Refactor SerializationFormat::EndWrite and ChunkedIO::Chunk mem mgmt.

SerializationFormat::EndWrite now transfers ownership of the buffer
to the caller instead of doing a memcpy.

ChunkedIO::Chunk is no longer a POD type, hopefully the ctor/dtor
make it easier to manage its associated memory.  It also now
tracks how to deallocate its buffer (i.e. delete vs. free).
This commit is contained in:
Jon Siwek 2014-03-18 14:42:38 -05:00
parent 07a4a8d147
commit 70131b5c84
9 changed files with 87 additions and 77 deletions

View file

@ -127,12 +127,7 @@ ChunkedIOFd::~ChunkedIOFd()
delete [] read_buffer;
delete [] write_buffer;
safe_close(fd);
if ( partial )
{
delete [] partial->data;
delete partial;
}
delete partial;
}
bool ChunkedIOFd::Write(Chunk* chunk)
@ -169,10 +164,9 @@ bool ChunkedIOFd::Write(Chunk* chunk)
while ( left )
{
Chunk* part = new Chunk;
uint32 sz = min<uint32>(BUFFER_SIZE - sizeof(uint32), left);
Chunk* part = new Chunk(new char[sz], sz);
part->len = min<uint32>(BUFFER_SIZE - sizeof(uint32), left);
part->data = new char[part->len];
memcpy(part->data, p, part->len);
left -= part->len;
p += part->len;
@ -181,9 +175,7 @@ bool ChunkedIOFd::Write(Chunk* chunk)
return false;
}
delete [] chunk->data;
delete chunk;
return true;
}
@ -239,7 +231,6 @@ bool ChunkedIOFd::PutIntoWriteBuffer(Chunk* chunk)
memcpy(write_buffer + write_len, chunk->data, len);
write_len += len;
delete [] chunk->data;
delete chunk;
if ( network_time - last_flush > 0.005 )
@ -362,9 +353,7 @@ ChunkedIO::Chunk* ChunkedIOFd::ExtractChunk()
read_pos += sizeof(uint32);
Chunk* chunk = new Chunk;
chunk->len = len;
chunk->data = new char[real_len];
Chunk* chunk = new Chunk(new char[real_len], len);
memcpy(chunk->data, read_buffer + read_pos, real_len);
read_pos += real_len;
@ -375,17 +364,13 @@ ChunkedIO::Chunk* ChunkedIOFd::ExtractChunk()
ChunkedIO::Chunk* ChunkedIOFd::ConcatChunks(Chunk* c1, Chunk* c2)
{
Chunk* c = new Chunk;
c->len = c1->len + c2->len;
c->data = new char[c->len];
uint32 sz = c1->len + c2->len;
Chunk* c = new Chunk(new char[sz], sz);
memcpy(c->data, c1->data, c1->len);
memcpy(c->data + c1->len, c2->data, c2->len);
delete [] c1->data;
delete c1;
delete [] c2->data;
delete c2;
return c;
@ -627,7 +612,6 @@ void ChunkedIOFd::Clear()
while ( pending_head )
{
ChunkQueue* next = pending_head->next;
delete [] pending_head->chunk->data;
delete pending_head->chunk;
delete pending_head;
pending_head = next;
@ -946,7 +930,6 @@ bool ChunkedIOSSL::Flush()
--stats.pending;
delete q;
delete [] c->data;
delete c;
write_state = LEN;
@ -1063,7 +1046,10 @@ bool ChunkedIOSSL::Read(Chunk** chunk, bool mayblock)
}
if ( ! read_chunk->data )
{
read_chunk->data = new char[read_chunk->len];
read_chunk->free_func = Chunk::free_func_delete;
}
if ( ! ReadData(read_chunk->data, read_chunk->len, &error) )
return ! error;
@ -1123,7 +1109,6 @@ void ChunkedIOSSL::Clear()
while ( write_head )
{
Queue* next = write_head->next;
delete [] write_head->chunk->data;
delete write_head->chunk;
delete write_head;
write_head = next;
@ -1231,12 +1216,13 @@ bool CompressedChunkedIO::Read(Chunk** chunk, bool may_block)
return false;
}
delete [] (*chunk)->data;
(*chunk)->free_func((*chunk)->data);
uncompressed_bytes_read += uncompressed_len;
(*chunk)->len = uncompressed_len;
(*chunk)->data = uncompressed;
(*chunk)->free_func = Chunk::free_func_delete;
return true;
}
@ -1280,8 +1266,9 @@ bool CompressedChunkedIO::Write(Chunk* chunk)
memcpy(compressed, chunk->data, chunk->len);
*(uint32*) (compressed + chunk->len) = 0; // uncompressed_length
delete [] chunk->data;
chunk->free_func(chunk->data);
chunk->data = compressed;
chunk->free_func = Chunk::free_func_delete;
chunk->len += 4;
DBG_LOG(DBG_CHUNKEDIO, "zlib write pass-through: size=%d", chunk->len);
@ -1322,8 +1309,9 @@ bool CompressedChunkedIO::Write(Chunk* chunk)
*(uint32*) zout.next_out = original_size; // uncompressed_length
delete [] chunk->data;
chunk->free_func(chunk->data);
chunk->data = compressed;
chunk->free_func = Chunk::free_func_delete;
chunk->len =
((char*) zout.next_out - compressed) + sizeof(uint32);

View file

@ -26,10 +26,27 @@ public:
ChunkedIO();
virtual ~ChunkedIO() { }
typedef struct {
struct Chunk {
typedef void (*FreeFunc)(char*);
static void free_func_free(char* data) { free(data); }
static void free_func_delete(char* data) { delete [] data; }
Chunk()
: data(), len(), free_func(free_func_delete)
{ }
Chunk(char* arg_data, uint32 arg_len,
FreeFunc arg_ff = free_func_delete)
: data(arg_data), len(arg_len), free_func(arg_ff)
{ }
~Chunk()
{ free_func(data); }
char* data;
uint32 len;
} Chunk;
FreeFunc free_func;
};
// Initialization before any I/O operation is performed. Returns false
// on any form of error.

View file

@ -372,10 +372,7 @@ static bool sendCMsg(ChunkedIO* io, char msg_type, RemoteSerializer::PeerID id)
CMsg* msg = (CMsg*) new char[sizeof(CMsg)];
new (msg) CMsg(msg_type, id);
ChunkedIO::Chunk* c = new ChunkedIO::Chunk;
c->len = sizeof(CMsg);
c->data = (char*) msg;
ChunkedIO::Chunk* c = new ChunkedIO::Chunk((char*)msg, sizeof(CMsg));
return io->Write(c);
}
@ -386,10 +383,7 @@ static ChunkedIO::Chunk* makeSerialMsg(RemoteSerializer::PeerID id)
CMsg* msg = (CMsg*) new char[sizeof(CMsg)];
new (msg) CMsg(MSG_SERIAL, id);
ChunkedIO::Chunk* c = new ChunkedIO::Chunk;
c->len = sizeof(CMsg);
c->data = (char*) msg;
ChunkedIO::Chunk* c = new ChunkedIO::Chunk((char*)msg, sizeof(CMsg));
return c;
}
@ -424,7 +418,7 @@ static bool sendToIO(ChunkedIO* io, ChunkedIO::Chunk* c)
}
static bool sendToIO(ChunkedIO* io, char msg_type, RemoteSerializer::PeerID id,
const char* str, int len = -1)
const char* str, int len = -1, bool delete_with_free = false)
{
if ( ! sendCMsg(io, msg_type, id) )
{
@ -432,9 +426,14 @@ static bool sendToIO(ChunkedIO* io, char msg_type, RemoteSerializer::PeerID id,
return false;
}
ChunkedIO::Chunk* c = new ChunkedIO::Chunk;
c->len = len >= 0 ? len : strlen(str) + 1;
c->data = const_cast<char*>(str);
uint32 sz = len >= 0 ? len : strlen(str) + 1;
ChunkedIO::Chunk* c = new ChunkedIO::Chunk(const_cast<char*>(str), sz);
if ( delete_with_free )
c->free_func = ChunkedIO::Chunk::free_func_free;
else
c->free_func = ChunkedIO::Chunk::free_func_delete;
return sendToIO(io, c);
}
@ -455,10 +454,8 @@ static bool sendToIO(ChunkedIO* io, char msg_type, RemoteSerializer::PeerID id,
for ( int i = 0; i < nargs; i++ )
args[i] = htonl(va_arg(ap, uint32));
ChunkedIO::Chunk* c = new ChunkedIO::Chunk;
c->len = sizeof(uint32) * nargs;
c->data = (char*) args;
ChunkedIO::Chunk* c = new ChunkedIO::Chunk((char*)args,
sizeof(uint32) * nargs);
return sendToIO(io, c);
}
@ -1529,7 +1526,6 @@ bool RemoteSerializer::Poll(bool may_block)
current_msgtype = msg->Type();
current_args = 0;
delete [] c->data;
delete c;
switch ( current_msgtype ) {
@ -1592,7 +1588,6 @@ bool RemoteSerializer::Poll(bool may_block)
msgstate = TYPE;
bool result = DoMessage();
delete [] current_args->data;
delete current_args;
current_args = 0;
@ -1787,9 +1782,7 @@ void RemoteSerializer::PeerConnected(Peer* peer)
*args++ = htonl(peer->our_runtime);
strcpy((char*) args, peer->our_class.c_str());
ChunkedIO::Chunk* c = new ChunkedIO::Chunk;
c->len = len;
c->data = data;
ChunkedIO::Chunk* c = new ChunkedIO::Chunk(data, len);
if ( peer->our_class.size() )
Log(LogInfo, fmt("sending class \"%s\"", peer->our_class.c_str()), peer);
@ -2568,8 +2561,8 @@ bool RemoteSerializer::SendLogCreateWriter(PeerID peer_id, EnumVal* id, EnumVal*
goto error;
c = new ChunkedIO::Chunk;
c->data = 0;
c->len = fmt.EndWrite(&c->data);
c->free_func = ChunkedIO::Chunk::free_func_free;
if ( ! SendToChild(c) )
goto error;
@ -2577,11 +2570,7 @@ bool RemoteSerializer::SendLogCreateWriter(PeerID peer_id, EnumVal* id, EnumVal*
return true;
error:
if ( c )
{
delete [] c->data;
delete c;
}
delete c;
FatalError(io->Error());
return false;
@ -2643,21 +2632,21 @@ bool RemoteSerializer::SendLogWrite(Peer* peer, EnumVal* id, EnumVal* writer, st
{
if ( ! FlushLogBuffer(peer) )
{
delete [] data;
free(data);
return false;
}
}
// If the data is actually larger than our complete buffer, just send it out.
if ( len > LOG_BUFFER_SIZE )
return SendToChild(MSG_LOG_WRITE, peer, data, len);
return SendToChild(MSG_LOG_WRITE, peer, data, len, true);
// Now we have space in the buffer, copy it into there.
memcpy(peer->log_buffer + peer->log_buffer_used, data, len);
peer->log_buffer_used += len;
assert(peer->log_buffer_used <= LOG_BUFFER_SIZE);
delete [] data;
free(data);
return true;
@ -3112,14 +3101,19 @@ bool RemoteSerializer::SendCMsgToChild(char msg_type, Peer* peer)
return true;
}
bool RemoteSerializer::SendToChild(char type, Peer* peer, char* str, int len)
bool RemoteSerializer::SendToChild(char type, Peer* peer, char* str, int len,
bool delete_with_free)
{
DEBUG_COMM(fmt("parent: (->child) %s (#%" PRI_SOURCE_ID ", %s)", msgToStr(type), peer ? peer->id : PEER_NONE, str));
if ( child_pid && sendToIO(io, type, peer ? peer->id : PEER_NONE, str, len) )
if ( child_pid && sendToIO(io, type, peer ? peer->id : PEER_NONE, str, len,
delete_with_free) )
return true;
delete [] str;
if ( delete_with_free )
free(str);
else
delete [] str;
if ( ! child_pid )
return false;
@ -3169,7 +3163,7 @@ bool RemoteSerializer::SendToChild(ChunkedIO::Chunk* c)
if ( child_pid && sendToIO(io, c) )
return true;
delete [] c->data;
c->free_func(c->data);
c->data = 0;
if ( ! child_pid )
@ -3545,7 +3539,6 @@ bool SocketComm::ProcessParentMessage()
parent_msgtype = msg->Type();
parent_args = 0;
delete [] c->data;
delete c;
switch ( parent_msgtype ) {
@ -3600,7 +3593,6 @@ bool SocketComm::ProcessParentMessage()
if ( parent_args )
{
delete [] parent_args->data;
delete parent_args;
parent_args = 0;
}
@ -3900,7 +3892,6 @@ bool SocketComm::ProcessRemoteMessage(SocketComm::Peer* peer)
peer->state = msg->Type();
}
delete [] c->data;
delete c;
break;

View file

@ -317,7 +317,8 @@ protected:
// Communication helpers
bool SendCMsgToChild(char msg_type, Peer* peer);
bool SendToChild(char type, Peer* peer, char* str, int len = -1);
bool SendToChild(char type, Peer* peer, char* str, int len = -1,
bool delete_with_free = false);
bool SendToChild(char type, Peer* peer, int nargs, ...); // can send uints32 only
bool SendToChild(ChunkedIO::Chunk* c);

View file

@ -5,6 +5,8 @@
#include "Serializer.h"
#include "Reporter.h"
const float SerializationFormat::GROWTH_FACTOR = 2.5;
SerializationFormat::SerializationFormat()
: output(), output_size(), output_pos(), input(), input_len(), input_pos(),
bytes_written(), bytes_read()
@ -49,9 +51,12 @@ void SerializationFormat::StartWrite()
uint32 SerializationFormat::EndWrite(char** data)
{
*data = new char[output_pos];
memcpy(*data, output, output_pos);
return output_pos;
uint32 rval = output_pos;
*data = output;
output = 0;
output_size = 0;
output_pos = 0;
return rval;
}
bool SerializationFormat::ReadData(void* b, size_t count)
@ -75,7 +80,7 @@ bool SerializationFormat::WriteData(const void* b, size_t count)
// Increase buffer if necessary.
while ( output_pos + count > output_size )
{
output_size += output_size * 1.5;
output_size *= GROWTH_FACTOR;
output = (char*)safe_realloc(output, output_size);
}

View file

@ -44,7 +44,15 @@ public:
// Serialization.
virtual void StartWrite();
virtual uint32 EndWrite(char** data); // passes ownership
/**
* Retrieves serialized data.
* @param data A pointer that will be assigned to point at the internal
* buffer containing serialized data. The memory should
* be reclaimed using "free()".
* @return The number of bytes in the buffer object assigned to \a data.
*/
virtual uint32 EndWrite(char** data);
virtual bool Write(int v, const char* tag) = 0;
virtual bool Write(uint16 v, const char* tag) = 0;
@ -74,6 +82,7 @@ protected:
bool WriteData(const void* buf, size_t count);
static const uint32 INITIAL_SIZE = 65536;
static const float GROWTH_FACTOR;
char* output;
uint32 output_size;
uint32 output_pos;

View file

@ -81,6 +81,7 @@ bool Serializer::EndSerialization(SerialInfo* info)
ChunkedIO::Chunk* chunk = new ChunkedIO::Chunk;
chunk->len = format->EndWrite(&chunk->data);
chunk->free_func = ChunkedIO::Chunk::free_func_free;
if ( ! io->Write(chunk) )
{
@ -282,7 +283,6 @@ int Serializer::Unserialize(UnserialInfo* info, bool block)
if ( ! info->chunk )
{ // only delete if we allocated it ourselves
delete [] chunk->data;
delete chunk;
}

View file

@ -131,7 +131,7 @@ BroType* BroType::Clone() const
sinfo.cache = false;
this->Serialize(&sinfo);
char* data = 0;
char* data;
uint32 len = form->EndWrite(&data);
form->StartRead(data, len);
@ -141,7 +141,7 @@ BroType* BroType::Clone() const
BroType* rval = this->Unserialize(&uinfo, false);
assert(rval != this);
delete [] data;
free(data);
return rval;
}

View file

@ -92,8 +92,7 @@ Val* Val::Clone() const
uinfo.cache = false;
Val* clone = Unserialize(&uinfo, type);
delete [] data;
free(data);
return clone;
}