diff --git a/src/logging/WriterBackend.cc b/src/logging/WriterBackend.cc index 095490edc4..7c9c1d10ca 100644 --- a/src/logging/WriterBackend.cc +++ b/src/logging/WriterBackend.cc @@ -31,13 +31,13 @@ private: bool terminating; }; -class DisableMessage : public threading::OutputMessage +class FlushWriteBufferMessage : public threading::OutputMessage { public: - DisableMessage(WriterFrontend* writer) - : threading::OutputMessage("Disable", writer) {} + FlushWriteBufferMessage(WriterFrontend* writer) + : threading::OutputMessage("FlushWriteBuffer", writer) {} - virtual bool Process() { Object()->SetDisable(); return true; } + virtual bool Process() { Object()->FlushWriteBuffer(); return true; } }; } @@ -65,25 +65,31 @@ WriterBackend::~WriterBackend() } } -void WriterBackend::DeleteVals(Value** vals) +void WriterBackend::DeleteVals(int num_writes, Value*** vals) { - // Note this code is duplicated in Manager::DeleteVals(). - for ( int i = 0; i < num_fields; i++ ) - delete vals[i]; + for ( int j = 0; j < num_writes; ++j ) + { + // Note this code is duplicated in Manager::DeleteVals(). + for ( int i = 0; i < num_fields; i++ ) + delete vals[j][i]; + + delete [] vals[j]; + } delete [] vals; } -bool WriterBackend::FinishedRotation(WriterFrontend* writer, string new_name, string old_name, +bool WriterBackend::FinishedRotation(string new_name, string old_name, double open, double close, bool terminating) { - SendOut(new RotationFinishedMessage(writer, new_name, old_name, open, close, terminating)); + SendOut(new RotationFinishedMessage(frontend, new_name, old_name, open, close, terminating)); return true; } -bool WriterBackend::Init(string arg_path, int arg_num_fields, - const Field* const * arg_fields) +bool WriterBackend::Init(WriterFrontend* arg_frontend, string arg_path, int arg_num_fields, + const Field* const * arg_fields) { + frontend = arg_frontend; path = arg_path; num_fields = arg_num_fields; fields = arg_fields; @@ -94,7 +100,7 @@ bool WriterBackend::Init(string arg_path, int arg_num_fields, return true; } -bool WriterBackend::Write(int arg_num_fields, Value** vals) +bool WriterBackend::Write(int arg_num_fields, int num_writes, Value*** vals) { // Double-check that the arguments match. If we get this from remote, // something might be mixed up. @@ -107,30 +113,42 @@ bool WriterBackend::Write(int arg_num_fields, Value** vals) Debug(DBG_LOGGING, msg); #endif - DeleteVals(vals); + DeleteVals(num_writes, vals); return false; } - for ( int i = 0; i < num_fields; ++i ) - { - if ( vals[i]->type != fields[i]->type ) - { #ifdef DEBUG - const char* msg = Fmt("Field type doesn't match in WriterBackend::Write() (%d vs. %d)", - vals[i]->type, fields[i]->type); - Debug(DBG_LOGGING, msg); -#endif + // Double-check all the types match. + for ( int j = 0; j < num_writes; j++ ) + { + for ( int i = 0; i < num_fields; ++i ) + { + if ( vals[j][i]->type != fields[i]->type ) + { + const char* msg = Fmt("Field type doesn't match in WriterBackend::Write() (%d vs. %d)", + vals[j][i]->type, fields[i]->type); + Debug(DBG_LOGGING, msg); - DeleteVals(vals); - return false; + DeleteVals(num_writes, vals); + return false; + } } } +#endif - bool result = DoWrite(num_fields, fields, vals); + bool success = true; - DeleteVals(vals); + for ( int j = 0; j < num_writes; j++ ) + { + success = DoWrite(num_fields, fields, vals[j]); - return result; + if ( ! success ) + break; + } + + DeleteVals(num_writes, vals); + + return success; } bool WriterBackend::SetBuf(bool enabled) @@ -144,10 +162,10 @@ bool WriterBackend::SetBuf(bool enabled) return DoSetBuf(enabled); } -bool WriterBackend::Rotate(WriterFrontend* writer, string rotated_path, - double open, double close, bool terminating) +bool WriterBackend::Rotate(string rotated_path, double open, + double close, bool terminating) { - return DoRotate(writer, rotated_path, open, close, terminating); + return DoRotate(rotated_path, open, close, terminating); } bool WriterBackend::Flush() @@ -159,3 +177,11 @@ bool WriterBackend::Finish() { return DoFinish(); } + +bool WriterBackend::DoHeartbeat(double network_time, double current_time) + { + SendOut(new FlushWriteBufferMessage(frontend)); + return true; + } + + diff --git a/src/logging/WriterBackend.h b/src/logging/WriterBackend.h index d1e4634e6d..27f4fe45a5 100644 --- a/src/logging/WriterBackend.h +++ b/src/logging/WriterBackend.h @@ -19,6 +19,12 @@ public: virtual ~WriterBackend(); // One-time initialization of the writer to define the logged fields. + // + // "frontend" is the frontend writer that created this backend. The + // *only* purpose of this value is to be passed back via messages as + // a argument to callbacks. One must not otherwise access the + // frontend, it's running in a different thread. + // // Interpretation of "path" is left to the writer, and will be // corresponding the value configured on the script-level. // @@ -27,7 +33,7 @@ public: // // The new instance takes ownership of "fields", and will delete them // when done. - bool Init(string path, int num_fields, const Field* const * fields); + bool Init(WriterFrontend* frontend, string path, int num_fields, const Field* const * fields); // Writes one log entry. The method takes ownership of "vals" and // will return immediately after queueing the write request, which is @@ -38,7 +44,7 @@ public: // // Returns false if an error occured, in which case the writer must // not be used any further. - bool Write(int num_fields, Value** vals); + bool Write(int num_fields, int num_writes, Value*** vals); // Sets the buffering status for the writer, if the writer supports // that. (If not, it will be ignored). @@ -50,7 +56,7 @@ public: // Triggers rotation, if the writer supports that. (If not, it will // be ignored). - bool Rotate(WriterFrontend* writer, string rotated_path, double open, double close, bool terminating); + bool Rotate(string rotated_path, double open, double close, bool terminating); // Finishes writing to this logger regularly. Must not be called if // an error has been indicated earlier. After calling this, no @@ -81,9 +87,10 @@ public: // // terminating: True if rotation request occured due to the main Bro // process shutting down. - bool FinishedRotation(WriterFrontend* writer, string new_name, string old_name, + bool FinishedRotation(string new_name, string old_name, double open, double close, bool terminating); + protected: // Methods for writers to override. If any of these returs false, it // will be assumed that a fatal error has occured that prevents the @@ -128,11 +135,6 @@ protected: // RotationDone() to signal the log manager that potential // postprocessors can now run. // - // "writer" is the frontend writer that triggered the rotation. The - // *only* purpose of this value is to be passed into - // FinishedRotation() once done. You must not otherwise access the - // frontend, it's running in a different thread. - // // "rotate_path" reflects the path to where the rotated output is to // be moved, with specifics depending on the writer. It should // generally be interpreted in a way consistent with that of "path" @@ -149,8 +151,8 @@ protected: // // A writer may ignore rotation requests if it doesn't fit with its // semantics (but must still return true in that case). - virtual bool DoRotate(WriterFrontend* writer, string rotated_path, - double open, double close, bool terminating) = 0; + virtual bool DoRotate(string rotated_path, double open, double close, + bool terminating) = 0; // Called once on termination. Not called when any of the other // methods has previously signaled an error, i.e., executing this @@ -158,7 +160,9 @@ protected: virtual bool DoFinish() = 0; // Triggered by regular heartbeat messages from the main process. - virtual bool DoHeartbeat(double network_time, double current_time) { return true; }; + // + // Note when overriding, you must call WriterBackend::DoHeartbeat(). + virtual bool DoHeartbeat(double network_time, double current_time); private: friend class Manager; @@ -169,8 +173,9 @@ private: bool Disabled() { return disabled; } // Deletes the values as passed into Write(). - void DeleteVals(Value** vals); + void DeleteVals(int num_writes, Value*** vals); + WriterFrontend* frontend; string path; int num_fields; const Field* const * fields; diff --git a/src/logging/WriterFrontend.cc b/src/logging/WriterFrontend.cc index 92c93c1c56..2f7c1d6e7e 100644 --- a/src/logging/WriterFrontend.cc +++ b/src/logging/WriterFrontend.cc @@ -9,13 +9,14 @@ namespace logging { class InitMessage : public threading::InputMessage { public: - InitMessage(WriterBackend* backend, const string path, const int num_fields, const Field* const *fields) + InitMessage(WriterBackend* backend, WriterFrontend* frontend, const string path, const int num_fields, const Field* const *fields) : threading::InputMessage("Init", backend), path(path), num_fields(num_fields), fields(fields) { } - virtual bool Process() { return Object()->Init(path, num_fields, fields); } + virtual bool Process() { return Object()->Init(frontend, path, num_fields, fields); } private: + WriterFrontend* frontend; const string path; const int num_fields; const Field * const* fields; @@ -31,7 +32,7 @@ public: rotated_path(rotated_path), open(open), close(close), terminating(terminating) { } - virtual bool Process() { return Object()->Rotate(frontend, rotated_path, open, close, terminating); } + virtual bool Process() { return Object()->Rotate(rotated_path, open, close, terminating); } private: WriterFrontend* frontend; @@ -44,16 +45,16 @@ private: class WriteMessage : public threading::InputMessage { public: - WriteMessage(WriterBackend* backend, const int num_fields, Value **vals) + WriteMessage(WriterBackend* backend, int num_fields, int num_writes, Value*** vals) : threading::InputMessage("Write", backend), - num_fields(num_fields), fields(fields), vals(vals) {} + num_fields(num_fields), vals(vals) {} - virtual bool Process() { return Object()->Write(num_fields, vals); } + virtual bool Process() { return Object()->Write(num_fields, num_writes, vals); } private: int num_fields; - Field* const* fields; - Value **vals; + int num_writes; + Value ***vals; }; class SetBufMessage : public threading::InputMessage @@ -96,6 +97,8 @@ using namespace logging; WriterFrontend::WriterFrontend(bro_int_t type) { disabled = initialized = false; + buf = true; + write_buffer_pos = 0; backend = log_mgr->CreateBackend(type); assert(backend); @@ -108,6 +111,7 @@ WriterFrontend::~WriterFrontend() void WriterFrontend::Stop() { + FlushWriteBuffer(); SetDisable(); backend->Stop(); } @@ -125,7 +129,7 @@ void WriterFrontend::Init(string arg_path, int arg_num_fields, const Field* cons fields = arg_fields; initialized = true; - backend->SendIn(new InitMessage(backend, arg_path, arg_num_fields, arg_fields)); + backend->SendIn(new InitMessage(backend, this, arg_path, arg_num_fields, arg_fields)); } void WriterFrontend::Write(int num_fields, Value** vals) @@ -133,7 +137,34 @@ void WriterFrontend::Write(int num_fields, Value** vals) if ( disabled ) return; - backend->SendIn(new WriteMessage(backend, num_fields, vals)); + if ( ! write_buffer ) + { + // Need new buffer. + write_buffer = new Value**[WRITER_BUFFER_SIZE]; + write_buffer_pos = 0; + } + + if ( write_buffer_pos >= WRITER_BUFFER_SIZE ) + // Buffer full. + FlushWriteBuffer(); + + write_buffer[write_buffer_pos++] = vals; + + if ( ! buf ) + // Send out immediately if we don't want buffering. + FlushWriteBuffer(); + } + +void WriterFrontend::FlushWriteBuffer() + { + if ( ! write_buffer_pos ) + // Nothing to do. + return; + + backend->SendIn(new WriteMessage(backend, num_fields, write_buffer_pos, write_buffer)); + + // Clear buffer (no delete, we pass ownership to child thread.) + write_buffer = 0; } void WriterFrontend::SetBuf(bool enabled) @@ -141,7 +172,13 @@ void WriterFrontend::SetBuf(bool enabled) if ( disabled ) return; + buf = enabled; + backend->SendIn(new SetBufMessage(backend, enabled)); + + if ( ! buf ) + // Make sure no longer buffer any still queued data. + FlushWriteBuffer(); } void WriterFrontend::Flush() @@ -149,6 +186,7 @@ void WriterFrontend::Flush() if ( disabled ) return; + FlushWriteBuffer(); backend->SendIn(new FlushMessage(backend)); } @@ -157,6 +195,7 @@ void WriterFrontend::Rotate(string rotated_path, double open, double close, bool if ( disabled ) return; + FlushWriteBuffer(); backend->SendIn(new RotateMessage(backend, this, rotated_path, open, close, terminating)); } @@ -165,6 +204,7 @@ void WriterFrontend::Finish() if ( disabled ) return; + FlushWriteBuffer(); backend->SendIn(new FinishMessage(backend)); } diff --git a/src/logging/WriterFrontend.h b/src/logging/WriterFrontend.h index 1998429d38..ed1a674842 100644 --- a/src/logging/WriterFrontend.h +++ b/src/logging/WriterFrontend.h @@ -34,6 +34,7 @@ public: void Write(int num_fields, Value** vals); void SetBuf(bool enabled); void Flush(); + void FlushWriteBuffer(); void Rotate(string rotated_path, double open, double close, bool terminating); void Finish(); @@ -49,18 +50,22 @@ public: protected: friend class Manager; - WriterBackend* backend; bool disabled; bool initialized; + bool buf; string path; int num_fields; const Field* const * fields; + + // Buffer for bulk writes. + static const int WRITER_BUFFER_SIZE = 50; + + int write_buffer_pos; + Value*** write_buffer; }; } - - #endif diff --git a/src/logging/writers/Ascii.cc b/src/logging/writers/Ascii.cc index 70f513be3b..a1ceb6e217 100644 --- a/src/logging/writers/Ascii.cc +++ b/src/logging/writers/Ascii.cc @@ -317,8 +317,7 @@ bool Ascii::DoWrite(int num_fields, const Field* const * fields, return true; } -bool Ascii::DoRotate(WriterFrontend* writer, string rotated_path, double open, - double close, bool terminating) +bool Ascii::DoRotate(string rotated_path, double open, double close, bool terminating) { // Don't rotate special files or if there's not one currently open. if ( ! file || IsSpecial(Path()) ) @@ -330,7 +329,7 @@ bool Ascii::DoRotate(WriterFrontend* writer, string rotated_path, double open, string nname = rotated_path + "." + LogExt(); rename(fname.c_str(), nname.c_str()); - if ( ! FinishedRotation(writer, nname, fname, open, close, terminating) ) + if ( ! FinishedRotation(nname, fname, open, close, terminating) ) { Error(Fmt("error rotating %s to %s", fname.c_str(), nname.c_str())); return false; diff --git a/src/logging/writers/Ascii.h b/src/logging/writers/Ascii.h index 37fcfef267..0c627c68e9 100644 --- a/src/logging/writers/Ascii.h +++ b/src/logging/writers/Ascii.h @@ -23,8 +23,8 @@ protected: virtual bool DoWrite(int num_fields, const Field* const * fields, Value** vals); virtual bool DoSetBuf(bool enabled); - virtual bool DoRotate(WriterFrontend* writer, string rotated_path, - double open, double close, bool terminating); + virtual bool DoRotate(string rotated_path, double open, + double close, bool terminating); virtual bool DoFlush(); virtual bool DoFinish(); diff --git a/src/logging/writers/None.cc b/src/logging/writers/None.cc index e419d88a6b..a9a7872f85 100644 --- a/src/logging/writers/None.cc +++ b/src/logging/writers/None.cc @@ -4,10 +4,9 @@ using namespace logging; using namespace writer; -bool None::DoRotate(WriterFrontend* writer, string rotated_path, - double open, double close, bool terminating) +bool None::DoRotate(string rotated_path, double open, double close, bool terminating) { - if ( ! FinishedRotation(writer, string("/dev/null"), Path(), open, close, terminating)) + if ( ! FinishedRotation(string("/dev/null"), Path(), open, close, terminating)) { Error(Fmt("error rotating %s", Path().c_str())); return false; diff --git a/src/logging/writers/None.h b/src/logging/writers/None.h index 9b2ab6c698..9360ef44f6 100644 --- a/src/logging/writers/None.h +++ b/src/logging/writers/None.h @@ -23,8 +23,8 @@ protected: virtual bool DoWrite(int num_fields, const Field* const * fields, Value** vals) { return true; } virtual bool DoSetBuf(bool enabled) { return true; } - virtual bool DoRotate(WriterFrontend* writer, string rotated_path, - double open, double close, bool terminating); + virtual bool DoRotate(string rotated_path, double open, + double close, bool terminating); virtual bool DoFlush() { return true; } virtual bool DoFinish() { return true; } }; diff --git a/src/threading/MsgThread.h b/src/threading/MsgThread.h index 8f37041bb6..ec249e90ad 100644 --- a/src/threading/MsgThread.h +++ b/src/threading/MsgThread.h @@ -184,8 +184,11 @@ protected: * Triggers a heartbeat message being sent to the client thread. * * This is method is called regularly by the threading::Manager. + * + * Can be overriden in derived classed to hook into the heart beat, + * but must call the parent implementation. */ - void Heartbeat(); + virtual void Heartbeat(); /** * Overriden from BasicThread. @@ -194,6 +197,17 @@ protected: virtual void Run(); virtual void OnStop(); + /** + * Regulatly triggered for execution in the child thread. + * + * When overriding, one must call the parent class' implementation. + * + * network_time: The network_time when the heartbeat was trigger by + * the main thread. + * + * current_time: Wall clock when the heartbeat was trigger by the + * main thread. + */ virtual bool DoHeartbeat(double network_time, double current_time) { return true; } private: