Fixing memory (and CPU) leak in log writer.

There was larger bug with remote logging when local writing was
disabled, as in the cluster.

Also fixing a number of smaller "potential" leaks.
This commit is contained in:
Robin Sommer 2011-08-05 12:02:25 -07:00
parent e738af73a0
commit eb508fae52
3 changed files with 68 additions and 22 deletions

View file

@ -398,13 +398,15 @@ LogMgr::Stream::~Stream()
{ {
WriterInfo* winfo = i->second; WriterInfo* winfo = i->second;
if ( ! winfo )
continue;
if ( winfo->rotation_timer ) if ( winfo->rotation_timer )
timer_mgr->Cancel(winfo->rotation_timer); timer_mgr->Cancel(winfo->rotation_timer);
Unref(winfo->type); Unref(winfo->type);
delete winfo->writer; delete winfo->writer;
delete i->second; delete winfo;
} }
for ( list<Filter*>::iterator f = filters.begin(); f != filters.end(); ++f ) for ( list<Filter*>::iterator f = filters.begin(); f != filters.end(); ++f )
@ -437,7 +439,7 @@ void LogMgr::RemoveDisabledWriters(Stream* stream)
for ( Stream::WriterMap::iterator j = stream->writers.begin(); j != stream->writers.end(); j++ ) for ( Stream::WriterMap::iterator j = stream->writers.begin(); j != stream->writers.end(); j++ )
{ {
if ( j->second->writer->Disabled() ) if ( j->second && j->second->writer->Disabled() )
{ {
delete j->second; delete j->second;
disabled.push_back(j->first); disabled.push_back(j->first);
@ -900,8 +902,8 @@ bool LogMgr::Write(EnumVal* id, RecordVal* columns)
LogWriter* writer = 0; LogWriter* writer = 0;
if ( w != stream->writers.end() ) if ( w != stream->writers.end() )
// We have a writer already. // We know this writer already.
writer = w->second->writer; writer = w->second ? w->second->writer : 0;
else else
{ {
@ -926,6 +928,11 @@ bool LogMgr::Write(EnumVal* id, RecordVal* columns)
return false; return false;
} }
} }
else
// Insert a null pointer into the map to make
// sure we don't try creating it again.
stream->writers.insert(Stream::WriterMap::value_type(
Stream::WriterPathPair(filter->writer->AsEnum(), path), 0));
if ( filter->remote ) if ( filter->remote )
remote_serializer->SendLogCreateWriter(stream->id, remote_serializer->SendLogCreateWriter(stream->id,
@ -937,24 +944,36 @@ bool LogMgr::Write(EnumVal* id, RecordVal* columns)
// Alright, can do the write now. // Alright, can do the write now.
LogVal** vals = RecordToFilterVals(stream, filter, columns); if ( filter->local || filter->remote )
{
LogVal** vals = RecordToFilterVals(stream, filter, columns);
if ( filter->remote ) if ( filter->remote )
remote_serializer->SendLogWrite(stream->id, remote_serializer->SendLogWrite(stream->id,
filter->writer, filter->writer,
path, path,
filter->num_fields, filter->num_fields,
vals); vals);
if ( filter->local )
{
assert(writer);
// Write takes ownership of vals.
if ( ! writer->Write(filter->num_fields, vals) )
error = true;
}
else
DeleteVals(filter->num_fields, vals);
}
if ( filter->local && ! writer->Write(filter->num_fields, vals) )
error = true;
#ifdef DEBUG #ifdef DEBUG
DBG_LOG(DBG_LOGGING, "Wrote record to filter '%s' on stream '%s'", DBG_LOG(DBG_LOGGING, "Wrote record to filter '%s' on stream '%s'",
filter->name.c_str(), stream->name.c_str()); filter->name.c_str(), stream->name.c_str());
#endif #endif
delete [] vals;
} }
Unref(columns); Unref(columns);
@ -1124,7 +1143,7 @@ LogWriter* LogMgr::CreateWriter(EnumVal* id, EnumVal* writer, string path,
Stream::WriterMap::iterator w = Stream::WriterMap::iterator w =
stream->writers.find(Stream::WriterPathPair(writer->AsEnum(), path)); stream->writers.find(Stream::WriterPathPair(writer->AsEnum(), path));
if ( w != stream->writers.end() ) if ( w != stream->writers.end() && w->second )
// If we already have a writer for this. That's fine, we just // If we already have a writer for this. That's fine, we just
// return it. // return it.
return w->second->writer; return w->second->writer;
@ -1194,6 +1213,14 @@ LogWriter* LogMgr::CreateWriter(EnumVal* id, EnumVal* writer, string path,
return writer_obj; return writer_obj;
} }
void LogMgr::DeleteVals(int num_fields, LogVal** vals)
{
for ( int i = 0; i < num_fields; i++ )
delete vals[i];
delete [] vals;
}
bool LogMgr::Write(EnumVal* id, EnumVal* writer, string path, int num_fields, bool LogMgr::Write(EnumVal* id, EnumVal* writer, string path, int num_fields,
LogVal** vals) LogVal** vals)
{ {
@ -1208,11 +1235,15 @@ bool LogMgr::Write(EnumVal* id, EnumVal* writer, string path, int num_fields,
DBG_LOG(DBG_LOGGING, "unknown stream %s in LogMgr::Write()", DBG_LOG(DBG_LOGGING, "unknown stream %s in LogMgr::Write()",
desc.Description()); desc.Description());
#endif #endif
DeleteVals(num_fields, vals);
return false; return false;
} }
if ( ! stream->enabled ) if ( ! stream->enabled )
{
DeleteVals(num_fields, vals);
return true; return true;
}
Stream::WriterMap::iterator w = Stream::WriterMap::iterator w =
stream->writers.find(Stream::WriterPathPair(writer->AsEnum(), path)); stream->writers.find(Stream::WriterPathPair(writer->AsEnum(), path));
@ -1226,10 +1257,11 @@ bool LogMgr::Write(EnumVal* id, EnumVal* writer, string path, int num_fields,
DBG_LOG(DBG_LOGGING, "unknown writer %s in LogMgr::Write()", DBG_LOG(DBG_LOGGING, "unknown writer %s in LogMgr::Write()",
desc.Description()); desc.Description());
#endif #endif
DeleteVals(num_fields, vals);
return false; return false;
} }
bool success = w->second->writer->Write(num_fields, vals); bool success = (w->second ? w->second->writer->Write(num_fields, vals) : true);
DBG_LOG(DBG_LOGGING, DBG_LOG(DBG_LOGGING,
"Wrote pre-filtered record to path '%s' on stream '%s' [%s]", "Wrote pre-filtered record to path '%s' on stream '%s' [%s]",
@ -1250,7 +1282,11 @@ void LogMgr::SendAllWritersTo(RemoteSerializer::PeerID peer)
for ( Stream::WriterMap::iterator i = stream->writers.begin(); for ( Stream::WriterMap::iterator i = stream->writers.begin();
i != stream->writers.end(); i++ ) i != stream->writers.end(); i++ )
{ {
if ( ! i->second )
continue;
LogWriter* writer = i->second->writer; LogWriter* writer = i->second->writer;
EnumVal writer_val(i->first.first, BifType::Enum::Log::Writer); EnumVal writer_val(i->first.first, BifType::Enum::Log::Writer);
remote_serializer->SendLogCreateWriter(peer, (*s)->id, remote_serializer->SendLogCreateWriter(peer, (*s)->id,
&writer_val, &writer_val,
@ -1269,7 +1305,10 @@ bool LogMgr::SetBuf(EnumVal* id, bool enabled)
for ( Stream::WriterMap::iterator i = stream->writers.begin(); for ( Stream::WriterMap::iterator i = stream->writers.begin();
i != stream->writers.end(); i++ ) i != stream->writers.end(); i++ )
i->second->writer->SetBuf(enabled); {
if ( i->second )
i->second->writer->SetBuf(enabled);
}
RemoveDisabledWriters(stream); RemoveDisabledWriters(stream);
@ -1287,7 +1326,10 @@ bool LogMgr::Flush(EnumVal* id)
for ( Stream::WriterMap::iterator i = stream->writers.begin(); for ( Stream::WriterMap::iterator i = stream->writers.begin();
i != stream->writers.end(); i++ ) i != stream->writers.end(); i++ )
i->second->writer->Flush(); {
if ( i->second )
i->second->writer->Flush();
}
RemoveDisabledWriters(stream); RemoveDisabledWriters(stream);

View file

@ -106,6 +106,9 @@ protected:
// Reports an error for the given writer. // Reports an error for the given writer.
void Error(LogWriter* writer, const char* msg); void Error(LogWriter* writer, const char* msg);
// Deletes the values as passed into Write().
void DeleteVals(int num_fields, LogVal** vals);
private: private:
struct Filter; struct Filter;
struct Stream; struct Stream;

View file

@ -47,6 +47,7 @@ bool LogWriter::Write(int arg_num_fields, LogVal** vals)
DBG_LOG(DBG_LOGGING, "Number of fields don't match in LogWriter::Write() (%d vs. %d)", DBG_LOG(DBG_LOGGING, "Number of fields don't match in LogWriter::Write() (%d vs. %d)",
arg_num_fields, num_fields); arg_num_fields, num_fields);
DeleteVals(vals);
return false; return false;
} }
@ -56,6 +57,7 @@ bool LogWriter::Write(int arg_num_fields, LogVal** vals)
{ {
DBG_LOG(DBG_LOGGING, "Field type doesn't match in LogWriter::Write() (%d vs. %d)", DBG_LOG(DBG_LOGGING, "Field type doesn't match in LogWriter::Write() (%d vs. %d)",
vals[i]->type, fields[i]->type); vals[i]->type, fields[i]->type);
DeleteVals(vals);
return false; return false;
} }
} }
@ -146,8 +148,7 @@ void LogWriter::Error(const char *msg)
void LogWriter::DeleteVals(LogVal** vals) void LogWriter::DeleteVals(LogVal** vals)
{ {
for ( int i = 0; i < num_fields; i++ ) log_mgr->DeleteVals(num_fields, vals);
delete vals[i];
} }
bool LogWriter::RunPostProcessor(string fname, string postprocessor, bool LogWriter::RunPostProcessor(string fname, string postprocessor,