Merge remote-tracking branch 'remotes/origin/topic/robin/logging-fix'

* remotes/origin/topic/robin/logging-fix:
  Fixing crash in memory profiling.
  Fix compiler warning.
  Fixing memory (and CPU) leak in log writer.
  Fixing missing sync in cluster setup.
  Updating submodule(s).
Robin Sommer 2011-08-05 19:07:32 -07:00
commit e2d9a57947
10 changed files with 89 additions and 32 deletions

CHANGES

@@ -1,4 +1,15 @@
+1.6-dev-1044 | 2011-08-05 19:07:32 -0700
+* Fixing memory (and CPU) leak in log writer.
+* Fixing crash in memory profiling. (Robin Sommer)
+* Fix compiler warning. (Robin Sommer)
+* Fixing missing sync in cluster setup. (Robin Sommer)
 1.6-dev-1038 | 2011-08-05 18:25:44 -0700
 * Smaller updates to script docs and their generation. (Jon Siwek)

@@ -1 +1 @@
-1.6-dev-1038
+1.6-dev-1044

@@ -1 +1 @@
-Subproject commit 2455dbebc15f06b3cc5eccb701727baf5472cf24
+Subproject commit 6df97331bb74d02ef2252138b301e4ca14523962

@@ -1 +1 @@
-Subproject commit 2ad87692db1cb5b104937bdde8cd6e0447f8ad94
+Subproject commit bebd494cb6c2572561638a0bf67fa4e73c635330

@@ -70,7 +70,7 @@ event bro_init() &priority=9
 if ( n$node_type == PROXY && me$proxy == i )
 Communication::nodes["proxy"] = [$host=nodes[i]$ip, $p=nodes[i]$p,
-$connect=T, $retry=1mins,
+$connect=T, $retry=1mins, $sync=T,
 $class=node];
 if ( n$node_type == TIME_MACHINE && me?$time_machine && me$time_machine == i )

@@ -398,13 +398,15 @@ LogMgr::Stream::~Stream()
 {
 WriterInfo* winfo = i->second;
+if ( ! winfo )
+continue;
 if ( winfo->rotation_timer )
 timer_mgr->Cancel(winfo->rotation_timer);
 Unref(winfo->type);
 delete winfo->writer;
-delete i->second;
+delete winfo;
 }
 for ( list<Filter*>::iterator f = filters.begin(); f != filters.end(); ++f )
@@ -437,7 +439,7 @@ void LogMgr::RemoveDisabledWriters(Stream* stream)
 for ( Stream::WriterMap::iterator j = stream->writers.begin(); j != stream->writers.end(); j++ )
 {
-if ( j->second->writer->Disabled() )
+if ( j->second && j->second->writer->Disabled() )
 {
 delete j->second;
 disabled.push_back(j->first);
@@ -900,8 +902,8 @@ bool LogMgr::Write(EnumVal* id, RecordVal* columns)
 LogWriter* writer = 0;
 if ( w != stream->writers.end() )
-// We have a writer already.
-writer = w->second->writer;
+// We know this writer already.
+writer = w->second ? w->second->writer : 0;
 else
 {
@@ -926,6 +928,11 @@ bool LogMgr::Write(EnumVal* id, RecordVal* columns)
 return false;
 }
 }
+else
+// Insert a null pointer into the map to make
+// sure we don't try creating it again.
+stream->writers.insert(Stream::WriterMap::value_type(
+Stream::WriterPathPair(filter->writer->AsEnum(), path), 0));
 if ( filter->remote )
 remote_serializer->SendLogCreateWriter(stream->id,
@@ -937,6 +944,8 @@ bool LogMgr::Write(EnumVal* id, RecordVal* columns)
 // Alright, can do the write now.
 if ( filter->local || filter->remote )
 {
+LogVal** vals = RecordToFilterVals(stream, filter, columns);
 if ( filter->remote )
@@ -946,15 +955,25 @@ bool LogMgr::Write(EnumVal* id, RecordVal* columns)
 filter->num_fields,
 vals);
-if ( filter->local && ! writer->Write(filter->num_fields, vals) )
+if ( filter->local )
+{
+assert(writer);
+// Write takes ownership of vals.
+if ( ! writer->Write(filter->num_fields, vals) )
+error = true;
+}
+else
+DeleteVals(filter->num_fields, vals);
+}
 #ifdef DEBUG
 DBG_LOG(DBG_LOGGING, "Wrote record to filter '%s' on stream '%s'",
 filter->name.c_str(), stream->name.c_str());
 #endif
-delete [] vals;
-}
 Unref(columns);
@@ -1124,7 +1143,7 @@ LogWriter* LogMgr::CreateWriter(EnumVal* id, EnumVal* writer, string path,
 Stream::WriterMap::iterator w =
 stream->writers.find(Stream::WriterPathPair(writer->AsEnum(), path));
-if ( w != stream->writers.end() )
+if ( w != stream->writers.end() && w->second )
 // If we already have a writer for this. That's fine, we just
 // return it.
 return w->second->writer;
@@ -1194,6 +1213,14 @@ LogWriter* LogMgr::CreateWriter(EnumVal* id, EnumVal* writer, string path,
 return writer_obj;
 }
+void LogMgr::DeleteVals(int num_fields, LogVal** vals)
+{
+for ( int i = 0; i < num_fields; i++ )
+delete vals[i];
+delete [] vals;
+}
 bool LogMgr::Write(EnumVal* id, EnumVal* writer, string path, int num_fields,
 LogVal** vals)
 {
@@ -1208,11 +1235,15 @@ bool LogMgr::Write(EnumVal* id, EnumVal* writer, string path, int num_fields,
 DBG_LOG(DBG_LOGGING, "unknown stream %s in LogMgr::Write()",
 desc.Description());
 #endif
+DeleteVals(num_fields, vals);
 return false;
 }
 if ( ! stream->enabled )
 {
+DeleteVals(num_fields, vals);
 return true;
 }
 Stream::WriterMap::iterator w =
 stream->writers.find(Stream::WriterPathPair(writer->AsEnum(), path));
@@ -1226,10 +1257,11 @@ bool LogMgr::Write(EnumVal* id, EnumVal* writer, string path, int num_fields,
 DBG_LOG(DBG_LOGGING, "unknown writer %s in LogMgr::Write()",
 desc.Description());
 #endif
+DeleteVals(num_fields, vals);
 return false;
 }
-bool success = w->second->writer->Write(num_fields, vals);
+bool success = (w->second ? w->second->writer->Write(num_fields, vals) : true);
 DBG_LOG(DBG_LOGGING,
 "Wrote pre-filtered record to path '%s' on stream '%s' [%s]",
@@ -1250,7 +1282,11 @@ void LogMgr::SendAllWritersTo(RemoteSerializer::PeerID peer)
 for ( Stream::WriterMap::iterator i = stream->writers.begin();
 i != stream->writers.end(); i++ )
 {
+if ( ! i->second )
+continue;
 LogWriter* writer = i->second->writer;
 EnumVal writer_val(i->first.first, BifType::Enum::Log::Writer);
 remote_serializer->SendLogCreateWriter(peer, (*s)->id,
 &writer_val,
@@ -1269,7 +1305,10 @@ bool LogMgr::SetBuf(EnumVal* id, bool enabled)
 for ( Stream::WriterMap::iterator i = stream->writers.begin();
 i != stream->writers.end(); i++ )
+{
+if ( i->second )
 i->second->writer->SetBuf(enabled);
+}
 RemoveDisabledWriters(stream);
@@ -1287,7 +1326,10 @@ bool LogMgr::Flush(EnumVal* id)
 for ( Stream::WriterMap::iterator i = stream->writers.begin();
 i != stream->writers.end(); i++ )
+{
+if ( i->second )
 i->second->writer->Flush();
+}
 RemoveDisabledWriters(stream);
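
Taken together, the LogMgr hunks above apply two patterns: a writer whose creation failed is remembered as a null entry in the stream's writer map so later writes do not try to create it again, which in turn means every lookup and loop over the map has to tolerate null entries; and the new LogMgr::DeleteVals() frees both the individual values and the array itself on every path that drops a record. What follows is a minimal, standalone sketch of those two patterns, using simplified stand-in types (Val, Writer, WriterKey) rather than the actual Bro classes:

    #include <map>
    #include <new>
    #include <string>
    #include <utility>

    // Hypothetical stand-in for LogVal; the real code passes LogVal** arrays.
    typedef std::string Val;

    // Mirrors LogMgr::DeleteVals(): free the values *and* the array itself.
    void DeleteVals(int num_fields, Val** vals)
        {
        for ( int i = 0; i < num_fields; i++ )
            delete vals[i];

        delete [] vals;
        }

    // Hypothetical writer; like LogWriter::Write(), Write() takes ownership of vals.
    struct Writer
        {
        bool Write(int num_fields, Val** vals)
            {
            // ... emit the record ...
            DeleteVals(num_fields, vals);
            return true;
            }
        };

    typedef std::pair<int, std::string> WriterKey; // (writer enum, path)
    std::map<WriterKey, Writer*> writers;

    bool WriteRecord(const WriterKey& key, int num_fields, Val** vals)
        {
        std::map<WriterKey, Writer*>::iterator w = writers.find(key);

        if ( w == writers.end() )
            {
            Writer* writer = new (std::nothrow) Writer; // stands in for CreateWriter()

            // If creation failed, insert a null pointer so that the next
            // write does not try to create the same writer again.
            w = writers.insert(std::make_pair(key, writer)).first;
            }

        if ( ! w->second )
            {
            // Known-bad writer: drop the record, but still free the values.
            DeleteVals(num_fields, vals);
            return true;
            }

        // Write() takes ownership of vals.
        return w->second->Write(num_fields, vals);
        }

Keeping the failed key in the map as a null pointer, rather than erasing it, is what stops the per-write creation retry; the cost is that the null check must appear wherever the map is walked, which is exactly what the destructor, RemoveDisabledWriters(), SendAllWritersTo(), SetBuf() and Flush() hunks add.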

@@ -106,6 +106,9 @@ protected:
 // Reports an error for the given writer.
 void Error(LogWriter* writer, const char* msg);
+// Deletes the values as passed into Write().
+void DeleteVals(int num_fields, LogVal** vals);
 private:
 struct Filter;
 struct Stream;

@@ -47,6 +47,7 @@ bool LogWriter::Write(int arg_num_fields, LogVal** vals)
 DBG_LOG(DBG_LOGGING, "Number of fields don't match in LogWriter::Write() (%d vs. %d)",
 arg_num_fields, num_fields);
+DeleteVals(vals);
 return false;
 }
@@ -56,6 +57,7 @@ bool LogWriter::Write(int arg_num_fields, LogVal** vals)
 {
 DBG_LOG(DBG_LOGGING, "Field type doesn't match in LogWriter::Write() (%d vs. %d)",
 vals[i]->type, fields[i]->type);
+DeleteVals(vals);
 return false;
 }
 }
@@ -146,8 +148,7 @@ void LogWriter::Error(const char *msg)
 void LogWriter::DeleteVals(LogVal** vals)
 {
-for ( int i = 0; i < num_fields; i++ )
-delete vals[i];
+log_mgr->DeleteVals(num_fields, vals);
 }
 bool LogWriter::RunPostProcessor(string fname, string postprocessor,
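
The LogWriter hunks complete the ownership rule: once Write() is called it owns the vals array, so the early validation failures now free it too, and DeleteVals() delegates to LogMgr::DeleteVals() so that the elements and the array itself are released in one place. A small sketch of that rule, again with simplified stand-in types rather than the real LogVal/LogField classes:

    #include <cstdio>

    // Frees the values and the array; stands in for LogMgr::DeleteVals().
    static void DeleteVals(int num_fields, char** vals)
        {
        for ( int i = 0; i < num_fields; i++ )
            delete [] vals[i];

        delete [] vals;
        }

    // Sketch of LogWriter::Write()'s contract after the fix: the function owns
    // vals on every path, including the early validation failures that used to
    // return without freeing anything.
    bool Write(int num_fields, int expected_fields, char** vals)
        {
        if ( num_fields != expected_fields )
            {
            DeleteVals(num_fields, vals); // was leaked here before the fix
            return false;
            }

        for ( int i = 0; i < num_fields; i++ )
            std::printf("%s\n", vals[i]);

        DeleteVals(num_fields, vals); // the normal path frees as well
        return true;
        }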

@@ -3163,12 +3163,11 @@ unsigned int RecordVal::MemoryAllocation() const
 {
 unsigned int size = 0;
-for ( int i = 0; i < type->AsRecordType()->NumFields(); ++i )
-{
-Val* v = (*val.val_list_val)[i];
+const val_list* vl = AsRecord();
 // v might be nil for records that don't wind
 // up being set to a value.
+loop_over_list(*vl, i)
+{
+Val* v = (*vl)[i];
 if ( v )
 size += v->MemoryAllocation();
 }
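
The RecordVal::MemoryAllocation() hunk above is the memory-profiling crash fix: instead of indexing the value list by the record type's declared field count, it walks the list the record actually holds and skips entries that were never set. A generic sketch of the same defensive accounting, with a plain std::vector standing in for Bro's val_list:

    #include <cstddef>
    #include <vector>

    // Hypothetical value type that can report its own footprint.
    struct Val
        {
        virtual ~Val() { }
        virtual std::size_t MemoryAllocation() const { return sizeof(*this); }
        };

    // Sum the footprint of the values a record actually holds. Entries may be
    // null for fields that never got assigned, so each one is checked before use.
    std::size_t RecordMemoryAllocation(const std::vector<Val*>& vals)
        {
        std::size_t size = 0;

        for ( std::size_t i = 0; i < vals.size(); ++i )
            if ( vals[i] )
                size += vals[i]->MemoryAllocation();

        return size;
        }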

@@ -799,6 +799,7 @@ EnumVal* map_conn_type(TransportProto tp)
 // Cannot be reached;
 assert(false);
+return 0; // Make compiler happy.
 }
 %%}
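
The last hunk is the compiler-warning fix: assert(false) marks the fall-through in map_conn_type() as unreachable for the reader but not for the compiler, and it compiles away entirely when NDEBUG is defined, so without a trailing return the compiler can warn about control reaching the end of a non-void function. The conventional fix is a dummy return after the assert; a generic example of the same pattern:

    #include <cassert>

    enum Color { RED, GREEN, BLUE };

    const char* color_name(Color c)
        {
        switch ( c )
            {
            case RED:   return "red";
            case GREEN: return "green";
            case BLUE:  return "blue";
            }

        // Cannot be reached for valid enum values.
        assert(false);
        return 0; // Make compiler happy.
        }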