Fixing problem logging remotely when local logging was turned off.

For that, moved the remote logging from the Manager to the
WriterFrontend. That also simplifies the Manager a bit.
This commit is contained in:
Robin Sommer 2012-03-08 17:14:58 -08:00
parent 554a29b3ed
commit c0678e7e1f
6 changed files with 116 additions and 75 deletions

View file

@ -234,7 +234,7 @@ static const int PRINT_BUFFER_SIZE = 10 * 1024;
static const int SOCKBUF_SIZE = 1024 * 1024; static const int SOCKBUF_SIZE = 1024 * 1024;
// Buffer size for remote-log data. // Buffer size for remote-log data.
static const int LOG_BUFFER_SIZE = 50 * 1024; static const int LOG_BUFFER_SIZE = 512;
struct ping_args { struct ping_args {
uint32 seq; uint32 seq;
@ -2587,7 +2587,10 @@ bool RemoteSerializer::SendLogWrite(Peer* peer, EnumVal* id, EnumVal* writer, st
if ( len > (LOG_BUFFER_SIZE - peer->log_buffer_used) || (network_time - last_flush > 1.0) ) if ( len > (LOG_BUFFER_SIZE - peer->log_buffer_used) || (network_time - last_flush > 1.0) )
{ {
if ( ! FlushLogBuffer(peer) ) if ( ! FlushLogBuffer(peer) )
{
delete [] data;
return false; return false;
}
} }
// If the data is actually larger than our complete buffer, just send it out. // If the data is actually larger than our complete buffer, just send it out.
@ -2631,6 +2634,12 @@ bool RemoteSerializer::ProcessLogCreateWriter()
if ( current_peer->state == Peer::CLOSING ) if ( current_peer->state == Peer::CLOSING )
return false; return false;
#ifdef USE_PERFTOOLS
// Don't track allocations here, they'll be released only after the
// main loop exists. And it's just a tiny amount anyway.
HeapLeakChecker::Disabler disabler;
#endif
assert(current_args); assert(current_args);
EnumVal* id_val = 0; EnumVal* id_val = 0;
@ -2666,7 +2675,7 @@ bool RemoteSerializer::ProcessLogCreateWriter()
id_val = new EnumVal(id, BifType::Enum::Log::ID); id_val = new EnumVal(id, BifType::Enum::Log::ID);
writer_val = new EnumVal(writer, BifType::Enum::Log::Writer); writer_val = new EnumVal(writer, BifType::Enum::Log::Writer);
if ( ! log_mgr->CreateWriter(id_val, writer_val, path, num_fields, fields) ) if ( ! log_mgr->CreateWriter(id_val, writer_val, path, num_fields, fields, true, false) )
goto error; goto error;
Unref(id_val); Unref(id_val);

View file

@ -753,64 +753,25 @@ bool Manager::Write(EnumVal* id, RecordVal* columns)
for ( int j = 0; j < filter->num_fields; ++j ) for ( int j = 0; j < filter->num_fields; ++j )
arg_fields[j] = new Field(*filter->fields[j]); arg_fields[j] = new Field(*filter->fields[j]);
if ( filter->remote ) writer = CreateWriter(stream->id, filter->writer,
remote_serializer->SendLogCreateWriter(stream->id, path, filter->num_fields,
filter->writer, arg_fields, filter->local, filter->remote);
path,
filter->num_fields,
arg_fields);
if ( filter->local ) if ( ! writer )
{ {
writer = CreateWriter(stream->id, filter->writer, Unref(columns);
path, filter->num_fields, return false;
arg_fields);
if ( ! writer )
{
Unref(columns);
return false;
}
} }
else
{
// Insert a null pointer into the map to make
// sure we don't try creating it again.
stream->writers.insert(Stream::WriterMap::value_type(
Stream::WriterPathPair(filter->writer->AsEnum(), path), 0));
for( int i = 0; i < filter->num_fields; ++i)
delete arg_fields[i];
delete [] arg_fields;
}
} }
// Alright, can do the write now. // Alright, can do the write now.
if ( filter->local || filter->remote ) threading::Value** vals = RecordToFilterVals(stream, filter, columns);
{
threading::Value** vals = RecordToFilterVals(stream, filter, columns);
if ( filter->remote )
remote_serializer->SendLogWrite(stream->id,
filter->writer,
path,
filter->num_fields,
vals);
if ( filter->local )
{
// Write takes ownership of vals.
assert(writer);
writer->Write(filter->num_fields, vals);
}
else
DeleteVals(filter->num_fields, vals);
}
// Write takes ownership of vals.
assert(writer);
writer->Write(filter->num_fields, vals);
#ifdef DEBUG #ifdef DEBUG
DBG_LOG(DBG_LOGGING, "Wrote record to filter '%s' on stream '%s'", DBG_LOG(DBG_LOGGING, "Wrote record to filter '%s' on stream '%s'",
@ -976,7 +937,7 @@ Value** Manager::RecordToFilterVals(Stream* stream, Filter* filter,
} }
WriterFrontend* Manager::CreateWriter(EnumVal* id, EnumVal* writer, string path, WriterFrontend* Manager::CreateWriter(EnumVal* id, EnumVal* writer, string path,
int num_fields, const Field* const* fields) int num_fields, const Field* const* fields, bool local, bool remote)
{ {
Stream* stream = FindStream(id); Stream* stream = FindStream(id);
@ -992,7 +953,7 @@ WriterFrontend* Manager::CreateWriter(EnumVal* id, EnumVal* writer, string path,
// return it. // return it.
return w->second->writer; return w->second->writer;
WriterFrontend* writer_obj = new WriterFrontend(writer->AsEnum()); WriterFrontend* writer_obj = new WriterFrontend(id, writer, local, remote);
assert(writer_obj); assert(writer_obj);
writer_obj->Init(path, num_fields, fields); writer_obj->Init(path, num_fields, fields);

View file

@ -159,7 +159,8 @@ protected:
// Takes ownership of fields. // Takes ownership of fields.
WriterFrontend* CreateWriter(EnumVal* id, EnumVal* writer, string path, WriterFrontend* CreateWriter(EnumVal* id, EnumVal* writer, string path,
int num_fields, const threading::Field* const* fields); int num_fields, const threading::Field* const* fields,
bool local, bool remote);
// Takes ownership of values.. // Takes ownership of values..
bool Write(EnumVal* id, EnumVal* writer, string path, bool Write(EnumVal* id, EnumVal* writer, string path,

View file

@ -99,21 +99,36 @@ public:
using namespace logging; using namespace logging;
WriterFrontend::WriterFrontend(bro_int_t type) WriterFrontend::WriterFrontend(EnumVal* arg_stream, EnumVal* arg_writer, bool arg_local, bool arg_remote)
{ {
stream = arg_stream;
writer = arg_writer;
Ref(stream);
Ref(writer);
disabled = initialized = false; disabled = initialized = false;
buf = true; buf = true;
local = arg_local;
remote = arg_remote;
write_buffer = 0; write_buffer = 0;
write_buffer_pos = 0; write_buffer_pos = 0;
ty_name = "<not set>"; ty_name = "<not set>";
backend = log_mgr->CreateBackend(this, type);
assert(backend); if ( local )
backend->Start(); {
backend = log_mgr->CreateBackend(this, writer->AsEnum());
assert(backend);
backend->Start();
}
else
backend = 0;
} }
WriterFrontend::~WriterFrontend() WriterFrontend::~WriterFrontend()
{ {
Unref(stream);
Unref(writer);
} }
string WriterFrontend::Name() const string WriterFrontend::Name() const
@ -128,7 +143,9 @@ void WriterFrontend::Stop()
{ {
FlushWriteBuffer(); FlushWriteBuffer();
SetDisable(); SetDisable();
backend->Stop();
if ( backend )
backend->Stop();
} }
void WriterFrontend::Init(string arg_path, int arg_num_fields, const Field* const * arg_fields) void WriterFrontend::Init(string arg_path, int arg_num_fields, const Field* const * arg_fields)
@ -144,7 +161,17 @@ void WriterFrontend::Init(string arg_path, int arg_num_fields, const Field* cons
fields = arg_fields; fields = arg_fields;
initialized = true; initialized = true;
backend->SendIn(new InitMessage(backend, arg_path, arg_num_fields, arg_fields));
if ( backend )
backend->SendIn(new InitMessage(backend, arg_path, arg_num_fields, arg_fields));
if ( remote )
remote_serializer->SendLogCreateWriter(stream,
writer,
arg_path,
arg_num_fields,
arg_fields);
} }
void WriterFrontend::Write(int num_fields, Value** vals) void WriterFrontend::Write(int num_fields, Value** vals)
@ -152,6 +179,19 @@ void WriterFrontend::Write(int num_fields, Value** vals)
if ( disabled ) if ( disabled )
return; return;
if ( remote )
remote_serializer->SendLogWrite(stream,
writer,
path,
num_fields,
vals);
if ( ! backend )
{
DeleteVals(vals);
return;
}
if ( ! write_buffer ) if ( ! write_buffer )
{ {
// Need new buffer. // Need new buffer.
@ -173,7 +213,8 @@ void WriterFrontend::FlushWriteBuffer()
// Nothing to do. // Nothing to do.
return; return;
backend->SendIn(new WriteMessage(backend, num_fields, write_buffer_pos, write_buffer)); if ( backend )
backend->SendIn(new WriteMessage(backend, num_fields, write_buffer_pos, write_buffer));
// Clear buffer (no delete, we pass ownership to child thread.) // Clear buffer (no delete, we pass ownership to child thread.)
write_buffer = 0; write_buffer = 0;
@ -187,7 +228,8 @@ void WriterFrontend::SetBuf(bool enabled)
buf = enabled; buf = enabled;
backend->SendIn(new SetBufMessage(backend, enabled)); if ( backend )
backend->SendIn(new SetBufMessage(backend, enabled));
if ( ! buf ) if ( ! buf )
// Make sure no longer buffer any still queued data. // Make sure no longer buffer any still queued data.
@ -200,7 +242,9 @@ void WriterFrontend::Flush()
return; return;
FlushWriteBuffer(); FlushWriteBuffer();
backend->SendIn(new FlushMessage(backend));
if ( backend )
backend->SendIn(new FlushMessage(backend));
} }
void WriterFrontend::Rotate(string rotated_path, double open, double close, bool terminating) void WriterFrontend::Rotate(string rotated_path, double open, double close, bool terminating)
@ -209,7 +253,9 @@ void WriterFrontend::Rotate(string rotated_path, double open, double close, bool
return; return;
FlushWriteBuffer(); FlushWriteBuffer();
backend->SendIn(new RotateMessage(backend, this, rotated_path, open, close, terminating));
if ( backend )
backend->SendIn(new RotateMessage(backend, this, rotated_path, open, close, terminating));
} }
void WriterFrontend::Finish() void WriterFrontend::Finish()
@ -218,7 +264,18 @@ void WriterFrontend::Finish()
return; return;
FlushWriteBuffer(); FlushWriteBuffer();
backend->SendIn(new FinishMessage(backend));
if ( backend )
backend->SendIn(new FinishMessage(backend));
}
void WriterFrontend::DeleteVals(Value** vals)
{
// Note this code is duplicated in Manager::DeleteVals().
for ( int i = 0; i < num_fields; i++ )
delete vals[i];
delete [] vals;
} }

View file

@ -25,14 +25,21 @@ public:
/** /**
* Constructor. * Constructor.
* *
* type: The backend writer type, with the value corresponding to the * stream: The logging stream.
*
* writer: The backend writer type, with the value corresponding to the
* script-level \c Log::Writer enum (e.g., \a WRITER_ASCII). The * script-level \c Log::Writer enum (e.g., \a WRITER_ASCII). The
* frontend will internally instantiate a WriterBackend of the * frontend will internally instantiate a WriterBackend of the
* corresponding type. * corresponding type.
*
* local: If true, the writer will instantiate a local backend.
*
* remote: If true, the writer will forward all data to remote
* clients.
* *
* Frontends must only be instantiated by the main thread. * Frontends must only be instantiated by the main thread.
*/ */
WriterFrontend(bro_int_t type); WriterFrontend(EnumVal* stream, EnumVal* writer, bool local, bool remote);
/** /**
* Destructor. * Destructor.
@ -187,10 +194,17 @@ public:
protected: protected:
friend class Manager; friend class Manager;
void DeleteVals(threading::Value** vals);
EnumVal* stream;
EnumVal* writer;
WriterBackend* backend; // The backend we have instanatiated. WriterBackend* backend; // The backend we have instanatiated.
bool disabled; // True if disabled. bool disabled; // True if disabled.
bool initialized; // True if initialized. bool initialized; // True if initialized.
bool buf; // True if buffering is enabled (default). bool buf; // True if buffering is enabled (default).
bool local; // True if logging locally.
bool remote; // True if loggin remotely.
string ty_name; // Name of the backend type. Set by the manager. string ty_name; // Name of the backend type. Set by the manager.
string path; // The log path. string path; // The log path.

View file

@ -20,8 +20,8 @@ BasicThread::BasicThread()
terminating = false; terminating = false;
pthread = 0; pthread = 0;
buf = 0; buf_len = 2048;
buf_len = 1024; buf = (char*) malloc(buf_len);
name = Fmt("thread-%d", ++thread_counter); name = Fmt("thread-%d", ++thread_counter);
@ -57,9 +57,6 @@ void BasicThread::SetOSName(const string& name)
const char* BasicThread::Fmt(const char* format, ...) const char* BasicThread::Fmt(const char* format, ...)
{ {
if ( ! buf )
buf = (char*) malloc(buf_len);
va_list al; va_list al;
va_start(al, format); va_start(al, format);
int n = safe_vsnprintf(buf, buf_len, format, al); int n = safe_vsnprintf(buf, buf_len, format, al);
@ -67,13 +64,15 @@ const char* BasicThread::Fmt(const char* format, ...)
if ( (unsigned int) n >= buf_len ) if ( (unsigned int) n >= buf_len )
{ // Not enough room, grow the buffer. { // Not enough room, grow the buffer.
buf_len = n + 32; int tmp_len = n + 32;
buf = (char*) realloc(buf, buf_len); char* tmp = (char*) malloc(tmp_len);
// Is it portable to restart? // Is it portable to restart?
va_start(al, format); va_start(al, format);
n = safe_vsnprintf(buf, buf_len, format, al); n = safe_vsnprintf(tmp, tmp_len, format, al);
va_end(al); va_end(al);
free(tmp);
} }
return buf; return buf;