diff --git a/src/RemoteSerializer.cc b/src/RemoteSerializer.cc index 4b8f527f2b..c6b9623096 100644 --- a/src/RemoteSerializer.cc +++ b/src/RemoteSerializer.cc @@ -234,7 +234,7 @@ static const int PRINT_BUFFER_SIZE = 10 * 1024; static const int SOCKBUF_SIZE = 1024 * 1024; // Buffer size for remote-log data. -static const int LOG_BUFFER_SIZE = 50 * 1024; +static const int LOG_BUFFER_SIZE = 512; struct ping_args { uint32 seq; @@ -2587,7 +2587,10 @@ bool RemoteSerializer::SendLogWrite(Peer* peer, EnumVal* id, EnumVal* writer, st if ( len > (LOG_BUFFER_SIZE - peer->log_buffer_used) || (network_time - last_flush > 1.0) ) { if ( ! FlushLogBuffer(peer) ) + { + delete [] data; return false; + } } // If the data is actually larger than our complete buffer, just send it out. @@ -2631,6 +2634,12 @@ bool RemoteSerializer::ProcessLogCreateWriter() if ( current_peer->state == Peer::CLOSING ) return false; +#ifdef USE_PERFTOOLS + // Don't track allocations here, they'll be released only after the + // main loop exists. And it's just a tiny amount anyway. + HeapLeakChecker::Disabler disabler; +#endif + assert(current_args); EnumVal* id_val = 0; @@ -2666,7 +2675,7 @@ bool RemoteSerializer::ProcessLogCreateWriter() id_val = new EnumVal(id, BifType::Enum::Log::ID); writer_val = new EnumVal(writer, BifType::Enum::Log::Writer); - if ( ! log_mgr->CreateWriter(id_val, writer_val, path, num_fields, fields) ) + if ( ! log_mgr->CreateWriter(id_val, writer_val, path, num_fields, fields, true, false) ) goto error; Unref(id_val); diff --git a/src/logging/Manager.cc b/src/logging/Manager.cc index 0753296cb4..14fb3428fe 100644 --- a/src/logging/Manager.cc +++ b/src/logging/Manager.cc @@ -753,64 +753,25 @@ bool Manager::Write(EnumVal* id, RecordVal* columns) for ( int j = 0; j < filter->num_fields; ++j ) arg_fields[j] = new Field(*filter->fields[j]); - if ( filter->remote ) - remote_serializer->SendLogCreateWriter(stream->id, - filter->writer, - path, - filter->num_fields, - arg_fields); + writer = CreateWriter(stream->id, filter->writer, + path, filter->num_fields, + arg_fields, filter->local, filter->remote); - if ( filter->local ) + if ( ! writer ) { - writer = CreateWriter(stream->id, filter->writer, - path, filter->num_fields, - arg_fields); - - if ( ! writer ) - { - Unref(columns); - return false; - } + Unref(columns); + return false; } - else - { - // Insert a null pointer into the map to make - // sure we don't try creating it again. - stream->writers.insert(Stream::WriterMap::value_type( - Stream::WriterPathPair(filter->writer->AsEnum(), path), 0)); - for( int i = 0; i < filter->num_fields; ++i) - delete arg_fields[i]; - - delete [] arg_fields; - } } // Alright, can do the write now. - if ( filter->local || filter->remote ) - { - threading::Value** vals = RecordToFilterVals(stream, filter, columns); - - if ( filter->remote ) - remote_serializer->SendLogWrite(stream->id, - filter->writer, - path, - filter->num_fields, - vals); - - if ( filter->local ) - { - // Write takes ownership of vals. - assert(writer); - writer->Write(filter->num_fields, vals); - } - - else - DeleteVals(filter->num_fields, vals); - - } + threading::Value** vals = RecordToFilterVals(stream, filter, columns); + // Write takes ownership of vals. + assert(writer); + writer->Write(filter->num_fields, vals); #ifdef DEBUG DBG_LOG(DBG_LOGGING, "Wrote record to filter '%s' on stream '%s'", @@ -976,7 +937,7 @@ Value** Manager::RecordToFilterVals(Stream* stream, Filter* filter, } WriterFrontend* Manager::CreateWriter(EnumVal* id, EnumVal* writer, string path, - int num_fields, const Field* const* fields) + int num_fields, const Field* const* fields, bool local, bool remote) { Stream* stream = FindStream(id); @@ -992,7 +953,7 @@ WriterFrontend* Manager::CreateWriter(EnumVal* id, EnumVal* writer, string path, // return it. return w->second->writer; - WriterFrontend* writer_obj = new WriterFrontend(writer->AsEnum()); + WriterFrontend* writer_obj = new WriterFrontend(id, writer, local, remote); assert(writer_obj); writer_obj->Init(path, num_fields, fields); diff --git a/src/logging/Manager.h b/src/logging/Manager.h index d931bfaef8..bf097c5e1a 100644 --- a/src/logging/Manager.h +++ b/src/logging/Manager.h @@ -159,7 +159,8 @@ protected: // Takes ownership of fields. WriterFrontend* CreateWriter(EnumVal* id, EnumVal* writer, string path, - int num_fields, const threading::Field* const* fields); + int num_fields, const threading::Field* const* fields, + bool local, bool remote); // Takes ownership of values.. bool Write(EnumVal* id, EnumVal* writer, string path, diff --git a/src/logging/WriterFrontend.cc b/src/logging/WriterFrontend.cc index 02f1a188d8..26e8eaf22e 100644 --- a/src/logging/WriterFrontend.cc +++ b/src/logging/WriterFrontend.cc @@ -99,21 +99,36 @@ public: using namespace logging; -WriterFrontend::WriterFrontend(bro_int_t type) +WriterFrontend::WriterFrontend(EnumVal* arg_stream, EnumVal* arg_writer, bool arg_local, bool arg_remote) { + stream = arg_stream; + writer = arg_writer; + Ref(stream); + Ref(writer); + disabled = initialized = false; buf = true; + local = arg_local; + remote = arg_remote; write_buffer = 0; write_buffer_pos = 0; ty_name = ""; - backend = log_mgr->CreateBackend(this, type); - assert(backend); - backend->Start(); + if ( local ) + { + backend = log_mgr->CreateBackend(this, writer->AsEnum()); + assert(backend); + backend->Start(); + } + + else + backend = 0; } WriterFrontend::~WriterFrontend() { + Unref(stream); + Unref(writer); } string WriterFrontend::Name() const @@ -128,7 +143,9 @@ void WriterFrontend::Stop() { FlushWriteBuffer(); SetDisable(); - backend->Stop(); + + if ( backend ) + backend->Stop(); } void WriterFrontend::Init(string arg_path, int arg_num_fields, const Field* const * arg_fields) @@ -144,7 +161,17 @@ void WriterFrontend::Init(string arg_path, int arg_num_fields, const Field* cons fields = arg_fields; initialized = true; - backend->SendIn(new InitMessage(backend, arg_path, arg_num_fields, arg_fields)); + + if ( backend ) + backend->SendIn(new InitMessage(backend, arg_path, arg_num_fields, arg_fields)); + + if ( remote ) + remote_serializer->SendLogCreateWriter(stream, + writer, + arg_path, + arg_num_fields, + arg_fields); + } void WriterFrontend::Write(int num_fields, Value** vals) @@ -152,6 +179,19 @@ void WriterFrontend::Write(int num_fields, Value** vals) if ( disabled ) return; + if ( remote ) + remote_serializer->SendLogWrite(stream, + writer, + path, + num_fields, + vals); + + if ( ! backend ) + { + DeleteVals(vals); + return; + } + if ( ! write_buffer ) { // Need new buffer. @@ -173,7 +213,8 @@ void WriterFrontend::FlushWriteBuffer() // Nothing to do. return; - backend->SendIn(new WriteMessage(backend, num_fields, write_buffer_pos, write_buffer)); + if ( backend ) + backend->SendIn(new WriteMessage(backend, num_fields, write_buffer_pos, write_buffer)); // Clear buffer (no delete, we pass ownership to child thread.) write_buffer = 0; @@ -187,7 +228,8 @@ void WriterFrontend::SetBuf(bool enabled) buf = enabled; - backend->SendIn(new SetBufMessage(backend, enabled)); + if ( backend ) + backend->SendIn(new SetBufMessage(backend, enabled)); if ( ! buf ) // Make sure no longer buffer any still queued data. @@ -200,7 +242,9 @@ void WriterFrontend::Flush() return; FlushWriteBuffer(); - backend->SendIn(new FlushMessage(backend)); + + if ( backend ) + backend->SendIn(new FlushMessage(backend)); } void WriterFrontend::Rotate(string rotated_path, double open, double close, bool terminating) @@ -209,7 +253,9 @@ void WriterFrontend::Rotate(string rotated_path, double open, double close, bool return; FlushWriteBuffer(); - backend->SendIn(new RotateMessage(backend, this, rotated_path, open, close, terminating)); + + if ( backend ) + backend->SendIn(new RotateMessage(backend, this, rotated_path, open, close, terminating)); } void WriterFrontend::Finish() @@ -218,7 +264,18 @@ void WriterFrontend::Finish() return; FlushWriteBuffer(); - backend->SendIn(new FinishMessage(backend)); + + if ( backend ) + backend->SendIn(new FinishMessage(backend)); + } + +void WriterFrontend::DeleteVals(Value** vals) + { + // Note this code is duplicated in Manager::DeleteVals(). + for ( int i = 0; i < num_fields; i++ ) + delete vals[i]; + + delete [] vals; } diff --git a/src/logging/WriterFrontend.h b/src/logging/WriterFrontend.h index 4386a15f64..3e05d17c9e 100644 --- a/src/logging/WriterFrontend.h +++ b/src/logging/WriterFrontend.h @@ -25,14 +25,21 @@ public: /** * Constructor. * - * type: The backend writer type, with the value corresponding to the + * stream: The logging stream. + * + * writer: The backend writer type, with the value corresponding to the * script-level \c Log::Writer enum (e.g., \a WRITER_ASCII). The * frontend will internally instantiate a WriterBackend of the * corresponding type. + * + * local: If true, the writer will instantiate a local backend. + * + * remote: If true, the writer will forward all data to remote + * clients. * * Frontends must only be instantiated by the main thread. */ - WriterFrontend(bro_int_t type); + WriterFrontend(EnumVal* stream, EnumVal* writer, bool local, bool remote); /** * Destructor. @@ -187,10 +194,17 @@ public: protected: friend class Manager; + void DeleteVals(threading::Value** vals); + + EnumVal* stream; + EnumVal* writer; + WriterBackend* backend; // The backend we have instanatiated. bool disabled; // True if disabled. bool initialized; // True if initialized. bool buf; // True if buffering is enabled (default). + bool local; // True if logging locally. + bool remote; // True if loggin remotely. string ty_name; // Name of the backend type. Set by the manager. string path; // The log path. diff --git a/src/threading/BasicThread.cc b/src/threading/BasicThread.cc index 51c4f7a3bc..e590b13434 100644 --- a/src/threading/BasicThread.cc +++ b/src/threading/BasicThread.cc @@ -20,8 +20,8 @@ BasicThread::BasicThread() terminating = false; pthread = 0; - buf = 0; - buf_len = 1024; + buf_len = 2048; + buf = (char*) malloc(buf_len); name = Fmt("thread-%d", ++thread_counter); @@ -57,9 +57,6 @@ void BasicThread::SetOSName(const string& name) const char* BasicThread::Fmt(const char* format, ...) { - if ( ! buf ) - buf = (char*) malloc(buf_len); - va_list al; va_start(al, format); int n = safe_vsnprintf(buf, buf_len, format, al); @@ -67,13 +64,15 @@ const char* BasicThread::Fmt(const char* format, ...) if ( (unsigned int) n >= buf_len ) { // Not enough room, grow the buffer. - buf_len = n + 32; - buf = (char*) realloc(buf, buf_len); + int tmp_len = n + 32; + char* tmp = (char*) malloc(tmp_len); // Is it portable to restart? va_start(al, format); - n = safe_vsnprintf(buf, buf_len, format, al); + n = safe_vsnprintf(tmp, tmp_len, format, al); va_end(al); + + free(tmp); } return buf;