diff --git a/NEWS b/NEWS index d9410e1c7c..0798920d8a 100644 --- a/NEWS +++ b/NEWS @@ -140,6 +140,11 @@ the full set. Bro now supports decapsulating tunnels directly for protocols it understands. +- ASCII logs now record the time when they were opened/closed at the + beginning and end of the file, respectively. The options + LogAscii::header_prefix and LogAscii::include_header have been + renamed to LogAscii::meta_prefix and LogAscii::include_meta, + respectively. Bro 2.0 ------- diff --git a/scripts/base/frameworks/logging/writers/ascii.bro b/scripts/base/frameworks/logging/writers/ascii.bro index fa1fcd6797..bacb0996d0 100644 --- a/scripts/base/frameworks/logging/writers/ascii.bro +++ b/scripts/base/frameworks/logging/writers/ascii.bro @@ -8,12 +8,13 @@ export { ## into files. This is primarily for debugging purposes. const output_to_stdout = F &redef; - ## If true, include a header line with column names and description - ## of the other ASCII logging options that were used. - const include_header = T &redef; + ## If true, include lines with log meta information such as column names with + ## types, the values of ASCII logging options that in use, and the time when the + ## file was opened and closes (the latter at the end). + const include_meta = T &redef; - ## Prefix for the header line if included. - const header_prefix = "#" &redef; + ## Prefix for lines with meta information. + const meta_prefix = "#" &redef; ## Separator between fields. const separator = "\t" &redef; diff --git a/src/input/Manager.cc b/src/input/Manager.cc index fc68343813..1c6b69e8ec 100644 --- a/src/input/Manager.cc +++ b/src/input/Manager.cc @@ -726,8 +726,6 @@ bool Manager::RemoveStream(Stream *i) i->removed = true; - i->reader->Close(); - DBG_LOG(DBG_INPUT, "Successfully queued removal of stream %s", i->name.c_str()); diff --git a/src/input/ReaderBackend.cc b/src/input/ReaderBackend.cc index dea554251e..84106a3c94 100644 --- a/src/input/ReaderBackend.cc +++ b/src/input/ReaderBackend.cc @@ -207,7 +207,7 @@ bool ReaderBackend::Init(const ReaderInfo& arg_info, const int arg_num_fields, return success; } -void ReaderBackend::Close() +bool ReaderBackend::OnFinish(double network_time) { DoClose(); disabled = true; // frontend disables itself when it gets the Close-message. @@ -221,6 +221,8 @@ void ReaderBackend::Close() delete [] (fields); fields = 0; } + + return true; } bool ReaderBackend::Update() @@ -243,10 +245,9 @@ void ReaderBackend::DisableFrontend() SendOut(new DisableMessage(frontend)); } -bool ReaderBackend::DoHeartbeat(double network_time, double current_time) +bool ReaderBackend::OnHeartbeat(double network_time, double current_time) { - MsgThread::DoHeartbeat(network_time, current_time); - return true; + return DoHeartbeat(network_time, current_time); } TransportProto ReaderBackend::StringToProto(const string &proto) diff --git a/src/input/ReaderBackend.h b/src/input/ReaderBackend.h index 820633254a..1e77a61f37 100644 --- a/src/input/ReaderBackend.h +++ b/src/input/ReaderBackend.h @@ -108,15 +108,6 @@ public: */ bool Init(const ReaderInfo& info, int num_fields, const threading::Field* const* fields); - /** - * Finishes reading from this input stream in a regular fashion. Must - * not be called if an error has been indicated earlier. After - * calling this, no further reading from the stream can be performed. - * - * @return False if an error occured. - */ - void Close(); - /** * Force trigger an update of the input stream. The action that will * be taken depends on the current read mode and the individual input @@ -149,6 +140,9 @@ public: */ int NumFields() const { return num_fields; } + // Overridden from MsgThread. + virtual bool OnHeartbeat(double network_time, double current_time); + virtual bool OnFinish(double network_time); protected: // Methods that have to be overwritten by the individual readers @@ -200,6 +194,11 @@ protected: */ virtual bool DoUpdate() = 0; + /** + * Triggered by regular heartbeat messages from the main thread. + */ + virtual bool DoHeartbeat(double network_time, double current_time) = 0; + /** * Method allowing a reader to send a specified Bro event. Vals must * match the values expected by the bro event. @@ -271,14 +270,6 @@ protected: */ void EndCurrentSend(); - /** - * Triggered by regular heartbeat messages from the main thread. - * - * This method can be overridden but once must call - * ReaderBackend::DoHeartbeat(). - */ - virtual bool DoHeartbeat(double network_time, double current_time); - /** * Convert a string into a TransportProto. This is just a utility * function for Readers. diff --git a/src/input/ReaderFrontend.cc b/src/input/ReaderFrontend.cc index e489147d36..7e4ef201b1 100644 --- a/src/input/ReaderFrontend.cc +++ b/src/input/ReaderFrontend.cc @@ -37,17 +37,6 @@ public: virtual bool Process() { return Object()->Update(); } }; -class CloseMessage : public threading::InputMessage -{ -public: - CloseMessage(ReaderBackend* backend) - : threading::InputMessage("Close", backend) - { } - - virtual bool Process() { Object()->Close(); return true; } -}; - - ReaderFrontend::ReaderFrontend(bro_int_t type) { disabled = initialized = false; @@ -93,21 +82,6 @@ void ReaderFrontend::Update() backend->SendIn(new UpdateMessage(backend)); } -void ReaderFrontend::Close() - { - if ( disabled ) - return; - - if ( ! initialized ) - { - reporter->Error("Tried to call finish on uninitialized reader"); - return; - } - - disabled = true; - backend->SendIn(new CloseMessage(backend)); - } - string ReaderFrontend::Name() const { if ( ! info.source.size() ) diff --git a/src/input/readers/Ascii.cc b/src/input/readers/Ascii.cc index dd1e742e5e..7f93a3138c 100644 --- a/src/input/readers/Ascii.cc +++ b/src/input/readers/Ascii.cc @@ -506,8 +506,6 @@ bool Ascii::DoUpdate() bool Ascii::DoHeartbeat(double network_time, double current_time) { - ReaderBackend::DoHeartbeat(network_time, current_time); - switch ( Info().mode ) { case MODE_MANUAL: // yay, we do nothing :) diff --git a/src/input/readers/Benchmark.cc b/src/input/readers/Benchmark.cc index d71901fa66..28afdc1c89 100644 --- a/src/input/readers/Benchmark.cc +++ b/src/input/readers/Benchmark.cc @@ -222,7 +222,6 @@ threading::Value* Benchmark::EntryToVal(TypeTag type, TypeTag subtype) bool Benchmark::DoHeartbeat(double network_time, double current_time) { - ReaderBackend::DoHeartbeat(network_time, current_time); num_lines = (int) ( (double) num_lines*multiplication_factor); num_lines += add; heartbeatstarttime = CurrTime(); diff --git a/src/input/readers/Raw.cc b/src/input/readers/Raw.cc index 1bae6cfa0c..f62e966883 100644 --- a/src/input/readers/Raw.cc +++ b/src/input/readers/Raw.cc @@ -252,8 +252,6 @@ bool Raw::DoUpdate() bool Raw::DoHeartbeat(double network_time, double current_time) { - ReaderBackend::DoHeartbeat(network_time, current_time); - switch ( Info().mode ) { case MODE_MANUAL: // yay, we do nothing :) diff --git a/src/logging.bif b/src/logging.bif index d25e89c33c..48e0edbb06 100644 --- a/src/logging.bif +++ b/src/logging.bif @@ -65,8 +65,8 @@ function Log::__flush%(id: Log::ID%): bool module LogAscii; const output_to_stdout: bool; -const include_header: bool; -const header_prefix: string; +const include_meta: bool; +const meta_prefix: string; const separator: string; const set_separator: string; const empty_field: string; diff --git a/src/logging/Manager.cc b/src/logging/Manager.cc index 0fea3d577d..1808b83738 100644 --- a/src/logging/Manager.cc +++ b/src/logging/Manager.cc @@ -771,6 +771,7 @@ bool Manager::Write(EnumVal* id, RecordVal* columns) WriterBackend::WriterInfo info; info.path = path; + info.network_time = network_time; HashKey* k; IterCookie* c = filter->config->AsTable()->InitForIteration(); @@ -1156,7 +1157,7 @@ bool Manager::Flush(EnumVal* id) for ( Stream::WriterMap::iterator i = stream->writers.begin(); i != stream->writers.end(); i++ ) - i->second->writer->Flush(); + i->second->writer->Flush(network_time); RemoveDisabledWriters(stream); diff --git a/src/logging/WriterBackend.cc b/src/logging/WriterBackend.cc index 00590208d5..68b0b506a1 100644 --- a/src/logging/WriterBackend.cc +++ b/src/logging/WriterBackend.cc @@ -18,7 +18,7 @@ namespace logging { class RotationFinishedMessage : public threading::OutputMessage { public: - RotationFinishedMessage(WriterFrontend* writer, string new_name, string old_name, + RotationFinishedMessage(WriterFrontend* writer, string new_name, string old_name, double open, double close, bool terminating) : threading::OutputMessage("RotationFinished", writer), new_name(new_name), old_name(old_name), open(open), @@ -260,9 +260,9 @@ bool WriterBackend::Rotate(string rotated_path, double open, return true; } -bool WriterBackend::Flush() +bool WriterBackend::Flush(double network_time) { - if ( ! DoFlush() ) + if ( ! DoFlush(network_time) ) { DisableFrontend(); return false; @@ -271,13 +271,15 @@ bool WriterBackend::Flush() return true; } -bool WriterBackend::DoHeartbeat(double network_time, double current_time) +bool WriterBackend::OnFinish(double network_time) { - MsgThread::DoHeartbeat(network_time, current_time); + return DoFinish(network_time); + } +bool WriterBackend::OnHeartbeat(double network_time, double current_time) + { SendOut(new FlushWriteBufferMessage(frontend)); - - return true; + return DoHeartbeat(network_time, current_time); } string WriterBackend::Render(const threading::Value::addr_t& addr) const diff --git a/src/logging/WriterBackend.h b/src/logging/WriterBackend.h index 6e65a8151a..33cde8679e 100644 --- a/src/logging/WriterBackend.h +++ b/src/logging/WriterBackend.h @@ -67,6 +67,11 @@ public: */ double rotation_base; + /** + * The network time when the writer is created. + */ + double network_time; + /** * A map of key/value pairs corresponding to the relevant * filter's "config" table. @@ -129,9 +134,11 @@ public: * Flushes any currently buffered output, assuming the writer * supports that. (If not, it will be ignored). * + * @param network_time The network time when the flush was triggered. + * * @return False if an error occured. */ - bool Flush(); + bool Flush(double network_time); /** * Triggers rotation, if the writer supports that. (If not, it will @@ -213,6 +220,10 @@ public: */ string Render(double d) const; + // Overridden from MsgThread. + virtual bool OnHeartbeat(double network_time, double current_time); + virtual bool OnFinish(double network_time); + protected: friend class FinishMessage; @@ -272,8 +283,10 @@ protected: * will then be disabled and eventually deleted. When returning * false, an implementation should also call Error() to indicate what * happened. + * + * @param network_time The network time when the flush was triggered. */ - virtual bool DoFlush() = 0; + virtual bool DoFlush(double network_time) = 0; /** * Writer-specific method implementing log rotation. Most directly @@ -314,20 +327,19 @@ protected: /** * Writer-specific method called just before the threading system is - * going to shutdown. + * going to shutdown. It is assumed that once this messages returns, + * the thread can be safely terminated. * - * This method can be overridden but one must call - * WriterBackend::DoFinish(). + * @param network_time The network time when the finish is triggered. */ - virtual bool DoFinish() { return MsgThread::DoFinish(); } - + virtual bool DoFinish(double network_time) = 0; /** * Triggered by regular heartbeat messages from the main thread. * - * This method can be overridden but one must call - * WriterBackend::DoHeartbeat(). + * This method can be overridden. Default implementation does + * nothing. */ - virtual bool DoHeartbeat(double network_time, double current_time); + virtual bool DoHeartbeat(double network_time, double current_time) = 0; private: /** diff --git a/src/logging/WriterFrontend.cc b/src/logging/WriterFrontend.cc index 21bde0d43c..577003926b 100644 --- a/src/logging/WriterFrontend.cc +++ b/src/logging/WriterFrontend.cc @@ -81,19 +81,13 @@ private: class FlushMessage : public threading::InputMessage { public: - FlushMessage(WriterBackend* backend) - : threading::InputMessage("Flush", backend) {} + FlushMessage(WriterBackend* backend, double network_time) + : threading::InputMessage("Flush", backend), + network_time(network_time) {} - virtual bool Process() { return Object()->Flush(); } -}; - -class FinishMessage : public threading::InputMessage -{ -public: - FinishMessage(WriterBackend* backend) - : threading::InputMessage("Finish", backend) {} - - virtual bool Process() { return Object()->DoFinish(); } + virtual bool Process() { return Object()->Flush(network_time); } +private: + double network_time; }; } @@ -240,7 +234,7 @@ void WriterFrontend::SetBuf(bool enabled) FlushWriteBuffer(); } -void WriterFrontend::Flush() +void WriterFrontend::Flush(double network_time) { if ( disabled ) return; @@ -248,7 +242,7 @@ void WriterFrontend::Flush() FlushWriteBuffer(); if ( backend ) - backend->SendIn(new FlushMessage(backend)); + backend->SendIn(new FlushMessage(backend, network_time)); } void WriterFrontend::Rotate(string rotated_path, double open, double close, bool terminating) @@ -266,17 +260,6 @@ void WriterFrontend::Rotate(string rotated_path, double open, double close, bool log_mgr->FinishedRotation(0, "", rotated_path, open, close, terminating); } -void WriterFrontend::Finish() - { - if ( disabled ) - return; - - FlushWriteBuffer(); - - if ( backend ) - backend->SendIn(new FinishMessage(backend)); - } - void WriterFrontend::DeleteVals(Value** vals) { // Note this code is duplicated in Manager::DeleteVals(). diff --git a/src/logging/WriterFrontend.h b/src/logging/WriterFrontend.h index 8a0dce4645..6581fb1c1b 100644 --- a/src/logging/WriterFrontend.h +++ b/src/logging/WriterFrontend.h @@ -114,8 +114,10 @@ public: * message back that will asynchronously call Disable(). * * This method must only be called from the main thread. + * + * @param network_time The network time when the flush was triggered. */ - void Flush(); + void Flush(double network_time); /** * Triggers log rotation. @@ -138,8 +140,10 @@ public: * sends a message back that will asynchronously call Disable(). * * This method must only be called from the main thread. + * + * @param network_time The network time when the finish was triggered. */ - void Finish(); + void Finish(double network_time); /** * Explicitly triggers a transfer of all potentially buffered Write() diff --git a/src/logging/writers/Ascii.cc b/src/logging/writers/Ascii.cc index 6e5ceef678..ab68cd77d8 100644 --- a/src/logging/writers/Ascii.cc +++ b/src/logging/writers/Ascii.cc @@ -18,7 +18,7 @@ Ascii::Ascii(WriterFrontend* frontend) : WriterBackend(frontend) file = 0; output_to_stdout = BifConst::LogAscii::output_to_stdout; - include_header = BifConst::LogAscii::include_header; + include_meta = BifConst::LogAscii::include_meta; separator_len = BifConst::LogAscii::separator->Len(); separator = new char[separator_len]; @@ -40,10 +40,10 @@ Ascii::Ascii(WriterFrontend* frontend) : WriterBackend(frontend) memcpy(unset_field, BifConst::LogAscii::unset_field->Bytes(), unset_field_len); - header_prefix_len = BifConst::LogAscii::header_prefix->Len(); - header_prefix = new char[header_prefix_len]; - memcpy(header_prefix, BifConst::LogAscii::header_prefix->Bytes(), - header_prefix_len); + meta_prefix_len = BifConst::LogAscii::meta_prefix->Len(); + meta_prefix = new char[meta_prefix_len]; + memcpy(meta_prefix, BifConst::LogAscii::meta_prefix->Bytes(), + meta_prefix_len); desc.EnableEscaping(); desc.AddEscapeSequence(separator, separator_len); @@ -51,24 +51,39 @@ Ascii::Ascii(WriterFrontend* frontend) : WriterBackend(frontend) Ascii::~Ascii() { + // Normally, the file will be closed here already via the Finish() + // message. But when we terminate abnormally, we may still have it + // open. if ( file ) - fclose(file); + CloseFile(0); delete [] separator; delete [] set_separator; delete [] empty_field; delete [] unset_field; - delete [] header_prefix; + delete [] meta_prefix; } bool Ascii::WriteHeaderField(const string& key, const string& val) { - string str = string(header_prefix, header_prefix_len) + + string str = string(meta_prefix, meta_prefix_len) + key + string(separator, separator_len) + val + "\n"; return (fwrite(str.c_str(), str.length(), 1, file) == 1); } +void Ascii::CloseFile(double t) + { + if ( ! file ) + return; + + if ( include_meta ) + WriteHeaderField("end", t ? Timestamp(t) : ""); + + fclose(file); + file = 0; + } + bool Ascii::DoInit(const WriterInfo& info, int num_fields, const Field* const * fields) { string path = info.path; @@ -81,17 +96,17 @@ bool Ascii::DoInit(const WriterInfo& info, int num_fields, const Field* const * if ( ! (file = fopen(fname.c_str(), "w")) ) { Error(Fmt("cannot open %s: %s", fname.c_str(), - strerror(errno))); + Strerror(errno))); return false; } - if ( include_header ) + if ( include_meta ) { string names; string types; - string str = string(header_prefix, header_prefix_len) + string str = string(meta_prefix, meta_prefix_len) + "separator " // Always use space as separator here. + get_escaped_string(string(separator, separator_len), false) + "\n"; @@ -105,8 +120,9 @@ bool Ascii::DoInit(const WriterInfo& info, int num_fields, const Field* const * string(empty_field, empty_field_len), false)) && WriteHeaderField("unset_field", get_escaped_string( string(unset_field, unset_field_len), false)) && - WriteHeaderField("path", get_escaped_string(path, false))) ) - goto write_error; + WriteHeaderField("path", get_escaped_string(path, false)) && + WriteHeaderField("start", Timestamp(info.network_time))) ) + goto write_error; for ( int i = 0; i < num_fields; ++i ) { @@ -128,21 +144,23 @@ bool Ascii::DoInit(const WriterInfo& info, int num_fields, const Field* const * return true; write_error: - Error(Fmt("error writing to %s: %s", fname.c_str(), strerror(errno))); + Error(Fmt("error writing to %s: %s", fname.c_str(), Strerror(errno))); return false; } -bool Ascii::DoFlush() +bool Ascii::DoFlush(double network_time) { fflush(file); return true; } -bool Ascii::DoFinish() +bool Ascii::DoFinish(double network_time) { - return WriterBackend::DoFinish(); + CloseFile(network_time); + return true; } + bool Ascii::DoWriteOne(ODesc* desc, Value* val, const Field* field) { if ( ! val->present ) @@ -307,16 +325,33 @@ bool Ascii::DoWrite(int num_fields, const Field* const * fields, desc.AddRaw("\n", 1); - if ( fwrite(desc.Bytes(), desc.Len(), 1, file) != 1 ) + const char* bytes = (const char*)desc.Bytes(); + int len = desc.Len(); + + // Make sure the line doesn't look like meta information. + if ( strncmp(bytes, meta_prefix, meta_prefix_len) == 0 ) { - Error(Fmt("error writing to %s: %s", fname.c_str(), strerror(errno))); - return false; + // It would so escape the first character. + char buf[16]; + snprintf(buf, sizeof(buf), "\\x%02x", bytes[0]); + if ( fwrite(buf, strlen(buf), 1, file) != 1 ) + goto write_error; + + ++bytes; + --len; } + if ( fwrite(bytes, len, 1, file) != 1 ) + goto write_error; + if ( IsBuf() ) fflush(file); return true; + +write_error: + Error(Fmt("error writing to %s: %s", fname.c_str(), Strerror(errno))); + return false; } bool Ascii::DoRotate(string rotated_path, double open, double close, bool terminating) @@ -325,8 +360,7 @@ bool Ascii::DoRotate(string rotated_path, double open, double close, bool termin if ( ! file || IsSpecial(Info().path) ) return true; - fclose(file); - file = 0; + CloseFile(close); string nname = rotated_path + "." + LogExt(); rename(fname.c_str(), nname.c_str()); @@ -346,9 +380,28 @@ bool Ascii::DoSetBuf(bool enabled) return true; } +bool Ascii::DoHeartbeat(double network_time, double current_time) + { + // Nothing to do. + return true; + } + string Ascii::LogExt() { const char* ext = getenv("BRO_LOG_SUFFIX"); if ( ! ext ) ext = "log"; return ext; } + +string Ascii::Timestamp(double t) + { + struct tm tm; + char buf[128]; + const char* const date_fmt = "%Y-%m-%d-%H-%M-%S"; + time_t teatime = time_t(t); + + localtime_r(&teatime, &tm); + strftime(buf, sizeof(buf), date_fmt, &tm); + return buf; + } + diff --git a/src/logging/writers/Ascii.h b/src/logging/writers/Ascii.h index a95e644d49..857954ce37 100644 --- a/src/logging/writers/Ascii.h +++ b/src/logging/writers/Ascii.h @@ -26,13 +26,16 @@ protected: virtual bool DoSetBuf(bool enabled); virtual bool DoRotate(string rotated_path, double open, double close, bool terminating); - virtual bool DoFlush(); - virtual bool DoFinish(); + virtual bool DoFlush(double network_time); + virtual bool DoFinish(double network_time); + virtual bool DoHeartbeat(double network_time, double current_time); private: bool IsSpecial(string path) { return path.find("/dev/") == 0; } bool DoWriteOne(ODesc* desc, threading::Value* val, const threading::Field* field); bool WriteHeaderField(const string& key, const string& value); + void CloseFile(double t); + string Timestamp(double t); FILE* file; string fname; @@ -40,7 +43,7 @@ private: // Options set from the script-level. bool output_to_stdout; - bool include_header; + bool include_meta; char* separator; int separator_len; @@ -54,8 +57,8 @@ private: char* unset_field; int unset_field_len; - char* header_prefix; - int header_prefix_len; + char* meta_prefix; + int meta_prefix_len; }; } diff --git a/src/logging/writers/DataSeries.cc b/src/logging/writers/DataSeries.cc index b34ea3412a..1978a8b781 100644 --- a/src/logging/writers/DataSeries.cc +++ b/src/logging/writers/DataSeries.cc @@ -311,7 +311,7 @@ bool DataSeries::DoInit(const WriterInfo& info, int num_fields, const threading: } else - Error(Fmt("cannot dump schema: %s", strerror(errno))); + Error(Fmt("cannot dump schema: %s", Strerror(errno))); } compress_type = Extent::compress_all; @@ -343,7 +343,7 @@ bool DataSeries::DoInit(const WriterInfo& info, int num_fields, const threading: return OpenLog(info.path); } -bool DataSeries::DoFlush() +bool DataSeries::DoFlush(double network_time) { // Flushing is handled by DataSeries automatically, so this function // doesn't do anything. @@ -366,11 +366,10 @@ void DataSeries::CloseLog() log_file = 0; } -bool DataSeries::DoFinish() +bool DataSeries::DoFinish(double network_time) { CloseLog(); - - return WriterBackend::DoFinish(); + return true; } bool DataSeries::DoWrite(int num_fields, const threading::Field* const * fields, @@ -420,4 +419,9 @@ bool DataSeries::DoSetBuf(bool enabled) return true; } +bool DataSeries::DoHeartbeat(double network_time, double current_time) +{ + return true; +} + #endif /* USE_DATASERIES */ diff --git a/src/logging/writers/DataSeries.h b/src/logging/writers/DataSeries.h index 0ae3572b76..31d17a1a7b 100644 --- a/src/logging/writers/DataSeries.h +++ b/src/logging/writers/DataSeries.h @@ -34,8 +34,9 @@ protected: virtual bool DoSetBuf(bool enabled); virtual bool DoRotate(string rotated_path, double open, double close, bool terminating); - virtual bool DoFlush(); - virtual bool DoFinish(); + virtual bool DoFlush(double network_time); + virtual bool DoFinish(double network_time); + virtual bool DoHeartbeat(double network_time, double current_time); private: static const size_t ROW_MIN = 2048; // Minimum extent size. diff --git a/src/logging/writers/None.h b/src/logging/writers/None.h index 7e2e4ef4eb..c6d7cba56a 100644 --- a/src/logging/writers/None.h +++ b/src/logging/writers/None.h @@ -26,8 +26,9 @@ protected: virtual bool DoSetBuf(bool enabled) { return true; } virtual bool DoRotate(string rotated_path, double open, double close, bool terminating); - virtual bool DoFlush() { return true; } - virtual bool DoFinish() { WriterBackend::DoFinish(); return true; } + virtual bool DoFlush(double network_time) { return true; } + virtual bool DoFinish(double network_time) { return true; } + virtual bool DoHeartbeat(double network_time, double current_time) { return true; } }; } diff --git a/src/threading/BasicThread.cc b/src/threading/BasicThread.cc index dfa4c28eda..88c4ac0965 100644 --- a/src/threading/BasicThread.cc +++ b/src/threading/BasicThread.cc @@ -78,24 +78,22 @@ const char* BasicThread::Fmt(const char* format, ...) return buf; } +const char* BasicThread::Strerror(int err) + { + static char buf[128] = ""; + strerror_r(err, buf, sizeof(buf)); + return buf; + } + void BasicThread::Start() { if ( started ) return; - int err = pthread_mutex_init(&terminate, 0); - if ( err != 0 ) - reporter->FatalError("Cannot create terminate mutex for thread %s: %s", name.c_str(), strerror(err)); - - // We use this like a binary semaphore and acquire it immediately. - err = pthread_mutex_lock(&terminate); + int err = pthread_create(&pthread, 0, BasicThread::launcher, this); if ( err != 0 ) - reporter->FatalError("Cannot aquire terminate mutex for thread %s: %s", name.c_str(), strerror(err)); - - err = pthread_create(&pthread, 0, BasicThread::launcher, this); - if ( err != 0 ) - reporter->FatalError("Cannot create thread %s:%s", name.c_str(), strerror(err)); + reporter->FatalError("Cannot create thread %s:%s", name.c_str(), Strerror(err)); DBG_LOG(DBG_THREADING, "Started thread %s", name.c_str()); @@ -114,12 +112,6 @@ void BasicThread::Stop() DBG_LOG(DBG_THREADING, "Signaling thread %s to terminate ...", name.c_str()); - // Signal that it's ok for the thread to exit now by unlocking the - // mutex. - int err = pthread_mutex_unlock(&terminate); - if ( err != 0 ) - reporter->FatalError("Failure flagging terminate condition for thread %s: %s", name.c_str(), strerror(err)); - terminating = true; OnStop(); @@ -130,16 +122,13 @@ void BasicThread::Join() if ( ! started ) return; - if ( ! terminating ) - Stop(); + assert(terminating); DBG_LOG(DBG_THREADING, "Joining thread %s ...", name.c_str()); if ( pthread_join(pthread, 0) != 0 ) reporter->FatalError("Failure joining thread %s", name.c_str()); - pthread_mutex_destroy(&terminate); - DBG_LOG(DBG_THREADING, "Done with thread %s", name.c_str()); pthread = 0; @@ -178,10 +167,6 @@ void* BasicThread::launcher(void *arg) // Run thread's main function. thread->Run(); - // Wait until somebody actually wants us to terminate. - if ( pthread_mutex_lock(&thread->terminate) != 0 ) - reporter->FatalError("Failure acquiring terminate mutex at end of thread %s", thread->Name().c_str()); - return 0; } diff --git a/src/threading/BasicThread.h b/src/threading/BasicThread.h index cc87ae03bc..d47eb5c3c3 100644 --- a/src/threading/BasicThread.h +++ b/src/threading/BasicThread.h @@ -96,6 +96,14 @@ public: */ const char* Fmt(const char* format, ...); + /** + * A version of strerror() that the thread can safely use. This is + * essentially a wrapper around strerror_r(). Note that it keeps a + * single static buffer internally so the result remains valid only + * until the next call. + */ + const char* Strerror(int err); + protected: friend class Manager; diff --git a/src/threading/MsgThread.cc b/src/threading/MsgThread.cc index 6a3d496325..81ef123661 100644 --- a/src/threading/MsgThread.cc +++ b/src/threading/MsgThread.cc @@ -16,9 +16,17 @@ namespace threading { class FinishMessage : public InputMessage { public: - FinishMessage(MsgThread* thread) : InputMessage("Finish", thread) { } + FinishMessage(MsgThread* thread, double network_time) : InputMessage("Finish", thread), + network_time(network_time) { } - virtual bool Process() { return Object()->DoFinish(); } + virtual bool Process() { + bool result = Object()->OnFinish(network_time); + Object()->Finished(); + return result; + } + +private: + double network_time; }; // A dummy message that's only purpose is unblock the current read operation @@ -39,7 +47,10 @@ public: : InputMessage("Heartbeat", thread) { network_time = arg_network_time; current_time = arg_current_time; } - virtual bool Process() { return Object()->DoHeartbeat(network_time, current_time); } + virtual bool Process() { + Object()->HeartbeatInChild(); + return Object()->OnHeartbeat(network_time, current_time); + } private: double network_time; @@ -146,8 +157,11 @@ MsgThread::MsgThread() : BasicThread() void MsgThread::OnStop() { + if ( finished ) + return; + // Signal thread to terminate and wait until it has acknowledged. - SendIn(new FinishMessage(this), true); + SendIn(new FinishMessage(this, network_time), true); int cnt = 0; while ( ! finished ) @@ -161,6 +175,8 @@ void MsgThread::OnStop() usleep(1000); } + Finished(); + // One more message to make sure the current queue read operation unblocks. SendIn(new UnblockMessage(this), true); } @@ -170,7 +186,7 @@ void MsgThread::Heartbeat() SendIn(new HeartbeatMessage(this, network_time, current_time())); } -bool MsgThread::DoHeartbeat(double network_time, double current_time) +void MsgThread::HeartbeatInChild() { string n = Name(); @@ -179,16 +195,13 @@ bool MsgThread::DoHeartbeat(double network_time, double current_time) cnt_sent_out - queue_out.Size()); SetOSName(n.c_str()); - - return true; } -bool MsgThread::DoFinish() +void MsgThread::Finished() { // This is thread-safe "enough", we're the only one ever writing // there. finished = true; - return true; } void MsgThread::Info(const char* msg) diff --git a/src/threading/MsgThread.h b/src/threading/MsgThread.h index a917f54396..67ab9517c5 100644 --- a/src/threading/MsgThread.h +++ b/src/threading/MsgThread.h @@ -189,39 +189,45 @@ protected: * * This is method is called regularly by the threading::Manager. * - * Can be overriden in derived classed to hook into the heart beat, - * but must call the parent implementation. Note that this method is - * always called by the main thread and must not access data of the - * child thread directly. See DoHeartbeat() if you want to do - * something on the child-side. + * Can be overriden in derived classed to hook into the heart beat + * sending, but must call the parent implementation. Note that this + * method is always called by the main thread and must not access + * data of the child thread directly. Implement OnHeartbeat() if you + * want to do something on the child-side. */ virtual void Heartbeat(); - /** - * Overriden from BasicThread. - * + /** Flags that the child process has finished processing. Called from child. */ - virtual void Run(); - virtual void OnStop(); + void Finished(); + + /** Internal heartbeat processing. Called from child. + */ + void HeartbeatInChild(); /** * Regulatly triggered for execution in the child thread. * - * When overriding, one must call the parent class' implementation. - * * network_time: The network_time when the heartbeat was trigger by * the main thread. * * current_time: Wall clock when the heartbeat was trigger by the * main thread. */ - virtual bool DoHeartbeat(double network_time, double current_time); + virtual bool OnHeartbeat(double network_time, double current_time) = 0; /** Triggered for execution in the child thread just before shutting threads down. * The child thread should finish its operations and then *must* * call this class' implementation. */ - virtual bool DoFinish(); + virtual bool OnFinish(double network_time) = 0; + + /** + * Overriden from BasicThread. + * + */ + virtual void Run(); + virtual void OnStop(); private: /** diff --git a/testing/btest/Baseline/scripts.base.frameworks.logging.ascii-empty/ssh-filtered.log b/testing/btest/Baseline/scripts.base.frameworks.logging.ascii-empty/ssh-filtered.log new file mode 100644 index 0000000000..a2610bb522 --- /dev/null +++ b/testing/btest/Baseline/scripts.base.frameworks.logging.ascii-empty/ssh-filtered.log @@ -0,0 +1,12 @@ +PREFIX<>separator | +PREFIX<>set_separator|, +PREFIX<>empty_field|EMPTY +PREFIX<>unset_field|NOT-SET +PREFIX<>path|ssh +PREFIX<>fields|t|id.orig_h|id.orig_p|id.resp_h|id.resp_p|status|country|b +PREFIX<>types|time|addr|port|addr|port|string|string|bool +1342126762.852986|1.2.3.4|1234|2.3.4.5|80|success|unknown|NOT-SET +1342126762.852986|1.2.3.4|1234|2.3.4.5|80|NOT-SET|US|NOT-SET +1342126762.852986|1.2.3.4|1234|2.3.4.5|80|failure|UK|NOT-SET +1342126762.852986|1.2.3.4|1234|2.3.4.5|80|NOT-SET|BR|NOT-SET +1342126762.852986|1.2.3.4|1234|2.3.4.5|80|failure|EMPTY|T diff --git a/testing/btest/Baseline/scripts.base.frameworks.logging.ascii-empty/ssh.log b/testing/btest/Baseline/scripts.base.frameworks.logging.ascii-empty/ssh.log deleted file mode 100644 index 10275205a5..0000000000 --- a/testing/btest/Baseline/scripts.base.frameworks.logging.ascii-empty/ssh.log +++ /dev/null @@ -1,12 +0,0 @@ -PREFIX<>separator | -PREFIX<>set_separator|, -PREFIX<>empty_field|EMPTY -PREFIX<>unset_field|NOT-SET -PREFIX<>path|ssh -PREFIX<>fields|t|id.orig_h|id.orig_p|id.resp_h|id.resp_p|status|country|b -PREFIX<>types|time|addr|port|addr|port|string|string|bool -1324314313.345323|1.2.3.4|1234|2.3.4.5|80|success|unknown|NOT-SET -1324314313.345323|1.2.3.4|1234|2.3.4.5|80|NOT-SET|US|NOT-SET -1324314313.345323|1.2.3.4|1234|2.3.4.5|80|failure|UK|NOT-SET -1324314313.345323|1.2.3.4|1234|2.3.4.5|80|NOT-SET|BR|NOT-SET -1324314313.345323|1.2.3.4|1234|2.3.4.5|80|failure|EMPTY|T diff --git a/testing/btest/Baseline/scripts.base.frameworks.logging.ascii-line-like-comment/test.log b/testing/btest/Baseline/scripts.base.frameworks.logging.ascii-line-like-comment/test.log new file mode 100644 index 0000000000..72df0d73d4 --- /dev/null +++ b/testing/btest/Baseline/scripts.base.frameworks.logging.ascii-line-like-comment/test.log @@ -0,0 +1,12 @@ +#separator \x09 +#set_separator , +#empty_field (empty) +#unset_field - +#path test +#start 2012-07-12-21-00-27 +#fields data c +#types string count +Test1 42 +\x23Kaputt 42 +Test2 42 +#end 2012-07-12-21-00-27 diff --git a/testing/btest/core/expr-exception.bro b/testing/btest/core/expr-exception.bro index 66f9b78c4b..9e84717935 100644 --- a/testing/btest/core/expr-exception.bro +++ b/testing/btest/core/expr-exception.bro @@ -2,7 +2,7 @@ # shouldn't abort Bro entirely, but just return from the function body. # # @TEST-EXEC: bro -r $TRACES/wikipedia.trace %INPUT >output -# @TEST-EXEC: TEST_DIFF_CANONIFIER=$SCRIPTS/diff-remove-abspath btest-diff reporter.log +# @TEST-EXEC: TEST_DIFF_CANONIFIER="$SCRIPTS/diff-remove-abspath | $SCRIPTS/diff-remove-timestamps" btest-diff reporter.log # @TEST-EXEC: btest-diff output event connection_established(c: connection) diff --git a/testing/btest/istate/events-ssl.bro b/testing/btest/istate/events-ssl.bro index d87d014a21..afbee3f6d9 100644 --- a/testing/btest/istate/events-ssl.bro +++ b/testing/btest/istate/events-ssl.bro @@ -6,10 +6,13 @@ # # @TEST-EXEC: btest-diff sender/http.log # @TEST-EXEC: btest-diff receiver/http.log -# @TEST-EXEC: cmp sender/http.log receiver/http.log # -# @TEST-EXEC: bro -x sender/events.bst | sed 's/^Event \[[-0-9.]*\] //g' | grep '^http_' | grep -v http_stats | sed 's/(.*$//g' >events.snd.log -# @TEST-EXEC: bro -x receiver/events.bst | sed 's/^Event \[[-0-9.]*\] //g' | grep '^http_' | grep -v http_stats | sed 's/(.*$//g' >events.rec.log +# @TEST-EXEC: cat sender/http.log $SCRIPTS/diff-remove-timestamps >sender.http.log +# @TEST-EXEC: cat receiver/http.log $SCRIPTS/diff-remove-timestamps >receiver.http.log +# @TEST-EXEC: cmp sender.http.log receiver.http.log +# +# @TEST-EXEC: bro -x sender/events.bst | sed 's/^Event \[[-0-9.]*\] //g' | grep '^http_' | grep -v http_stats | sed 's/(.*$//g' | $SCRIPTS/diff-remove-timestamps >events.snd.log +# @TEST-EXEC: bro -x receiver/events.bst | sed 's/^Event \[[-0-9.]*\] //g' | grep '^http_' | grep -v http_stats | sed 's/(.*$//g' | $SCRIPTS/diff-remove-timestamps >events.rec.log # @TEST-EXEC: btest-diff events.rec.log # @TEST-EXEC: btest-diff events.snd.log # @TEST-EXEC: cmp events.rec.log events.snd.log diff --git a/testing/btest/istate/events.bro b/testing/btest/istate/events.bro index fe588b5c3b..1f05dfc729 100644 --- a/testing/btest/istate/events.bro +++ b/testing/btest/istate/events.bro @@ -1,12 +1,15 @@ # @TEST-SERIALIZE: comm # -# @TEST-EXEC: btest-bg-run sender bro -C -r $TRACES/web.trace --pseudo-realtime ../sender.bro -# @TEST-EXEC: btest-bg-run receiver bro ../receiver.bro +# @TEST-EXEC: btest-bg-run sender bro -Bthreading,logging,comm -C -r $TRACES/web.trace --pseudo-realtime ../sender.bro +# @TEST-EXEC: btest-bg-run receiver bro -Bthreading,logging,comm ../receiver.bro # @TEST-EXEC: btest-bg-wait -k 20 # # @TEST-EXEC: btest-diff sender/http.log # @TEST-EXEC: btest-diff receiver/http.log -# @TEST-EXEC: cmp sender/http.log receiver/http.log +# +# @TEST-EXEC: cat sender/http.log $SCRIPTS/diff-remove-timestamps >sender.http.log +# @TEST-EXEC: cat receiver/http.log $SCRIPTS/diff-remove-timestamps >receiver.http.log +# @TEST-EXEC: cmp sender.http.log receiver.http.log # # @TEST-EXEC: bro -x sender/events.bst | sed 's/^Event \[[-0-9.]*\] //g' | grep '^http_' | grep -v http_stats | sed 's/(.*$//g' >events.snd.log # @TEST-EXEC: bro -x receiver/events.bst | sed 's/^Event \[[-0-9.]*\] //g' | grep '^http_' | grep -v http_stats | sed 's/(.*$//g' >events.rec.log diff --git a/testing/btest/scripts/base/frameworks/logging/ascii-empty.bro b/testing/btest/scripts/base/frameworks/logging/ascii-empty.bro index 9dace5d52a..0bb5900e30 100644 --- a/testing/btest/scripts/base/frameworks/logging/ascii-empty.bro +++ b/testing/btest/scripts/base/frameworks/logging/ascii-empty.bro @@ -1,12 +1,13 @@ # # @TEST-EXEC: bro -b %INPUT -# @TEST-EXEC: btest-diff ssh.log +# @TEST-EXEC: cat ssh.log | grep -v PREFIX.*20..- >ssh-filtered.log +# @TEST-EXEC: btest-diff ssh-filtered.log redef LogAscii::output_to_stdout = F; redef LogAscii::separator = "|"; redef LogAscii::empty_field = "EMPTY"; redef LogAscii::unset_field = "NOT-SET"; -redef LogAscii::header_prefix = "PREFIX<>"; +redef LogAscii::meta_prefix = "PREFIX<>"; module SSH; diff --git a/testing/btest/scripts/base/frameworks/logging/ascii-line-like-comment.bro b/testing/btest/scripts/base/frameworks/logging/ascii-line-like-comment.bro new file mode 100644 index 0000000000..4670811b2a --- /dev/null +++ b/testing/btest/scripts/base/frameworks/logging/ascii-line-like-comment.bro @@ -0,0 +1,23 @@ +# +# @TEST-EXEC: bro -b %INPUT +# @TEST-EXEC: btest-diff test.log + +module Test; + +export { + redef enum Log::ID += { LOG }; + + type Info: record { + data: string &log; + c: count &log &default=42; + }; +} + +event bro_init() +{ + Log::create_stream(Test::LOG, [$columns=Info]); + Log::write(Test::LOG, [$data="Test1"]); + Log::write(Test::LOG, [$data="#Kaputt"]); + Log::write(Test::LOG, [$data="Test2"]); +} + diff --git a/testing/btest/scripts/base/frameworks/logging/ascii-options.bro b/testing/btest/scripts/base/frameworks/logging/ascii-options.bro index 8c228c1384..474b179536 100644 --- a/testing/btest/scripts/base/frameworks/logging/ascii-options.bro +++ b/testing/btest/scripts/base/frameworks/logging/ascii-options.bro @@ -4,7 +4,7 @@ redef LogAscii::output_to_stdout = F; redef LogAscii::separator = "|"; -redef LogAscii::include_header = F; +redef LogAscii::include_meta = F; module SSH; diff --git a/testing/btest/scripts/base/frameworks/logging/remote-types.bro b/testing/btest/scripts/base/frameworks/logging/remote-types.bro index f1ef4f0c31..3f102e6319 100644 --- a/testing/btest/scripts/base/frameworks/logging/remote-types.bro +++ b/testing/btest/scripts/base/frameworks/logging/remote-types.bro @@ -1,10 +1,12 @@ # @TEST-SERIALIZE: comm # -# @TEST-EXEC: btest-bg-run sender bro --pseudo-realtime %INPUT ../sender.bro -# @TEST-EXEC: btest-bg-run receiver bro --pseudo-realtime %INPUT ../receiver.bro +# @TEST-EXEC: btest-bg-run sender bro -B threading,logging --pseudo-realtime %INPUT ../sender.bro +# @TEST-EXEC: btest-bg-run receiver bro -B threading,logging --pseudo-realtime %INPUT ../receiver.bro # @TEST-EXEC: btest-bg-wait -k 10 # @TEST-EXEC: btest-diff receiver/test.log -# @TEST-EXEC: cmp receiver/test.log sender/test.log +# @TEST-EXEC: cat receiver/test.log | egrep -v '#start|#end' >r.log +# @TEST-EXEC: cat sender/test.log | egrep -v '#start|#end' >s.log +# @TEST-EXEC: cmp r.log s.log # Remote version testing all types. diff --git a/testing/btest/scripts/base/frameworks/logging/remote.bro b/testing/btest/scripts/base/frameworks/logging/remote.bro index 8375d7915a..48683148f5 100644 --- a/testing/btest/scripts/base/frameworks/logging/remote.bro +++ b/testing/btest/scripts/base/frameworks/logging/remote.bro @@ -8,9 +8,11 @@ # @TEST-EXEC: btest-diff sender/test.log # @TEST-EXEC: btest-diff sender/test.failure.log # @TEST-EXEC: btest-diff sender/test.success.log -# @TEST-EXEC: cmp receiver/test.log sender/test.log -# @TEST-EXEC: cmp receiver/test.failure.log sender/test.failure.log -# @TEST-EXEC: cmp receiver/test.success.log sender/test.success.log +# @TEST-EXEC: ( cd sender && for i in *.log; do cat $i | $SCRIPTS/diff-remove-timestamps >c.$i; done ) +# @TEST-EXEC: ( cd receiver && for i in *.log; do cat $i | $SCRIPTS/diff-remove-timestamps >c.$i; done ) +# @TEST-EXEC: cmp receiver/c.test.log sender/c.test.log +# @TEST-EXEC: cmp receiver/c.test.failure.log sender/c.test.failure.log +# @TEST-EXEC: cmp receiver/c.test.success.log sender/c.test.success.log # This is the common part loaded by both sender and receiver. module Test; diff --git a/testing/btest/scripts/base/frameworks/notice/default-policy-order.test b/testing/btest/scripts/base/frameworks/notice/default-policy-order.test index 6e53bd3b54..d5d3f4c3fa 100644 --- a/testing/btest/scripts/base/frameworks/notice/default-policy-order.test +++ b/testing/btest/scripts/base/frameworks/notice/default-policy-order.test @@ -1,10 +1,10 @@ # This test checks that the default notice policy ordering does not # change from run to run. # @TEST-EXEC: bro -e '' -# @TEST-EXEC: mv notice_policy.log notice_policy.log.1 +# @TEST-EXEC: cat notice_policy.log | $SCRIPTS/diff-remove-timestamps > notice_policy.log.1 # @TEST-EXEC: bro -e '' -# @TEST-EXEC: mv notice_policy.log notice_policy.log.2 +# @TEST-EXEC: cat notice_policy.log | $SCRIPTS/diff-remove-timestamps > notice_policy.log.2 # @TEST-EXEC: bro -e '' -# @TEST-EXEC: mv notice_policy.log notice_policy.log.3 +# @TEST-EXEC: cat notice_policy.log | $SCRIPTS/diff-remove-timestamps > notice_policy.log.3 # @TEST-EXEC: diff notice_policy.log.1 notice_policy.log.2 # @TEST-EXEC: diff notice_policy.log.1 notice_policy.log.3 diff --git a/testing/scripts/diff-remove-timestamps b/testing/scripts/diff-remove-timestamps index 063f1e4900..2b029789de 100755 --- a/testing/scripts/diff-remove-timestamps +++ b/testing/scripts/diff-remove-timestamps @@ -1,5 +1,8 @@ #! /usr/bin/env bash # -# Replace anything which looks like timestamps with XXXs. +# Replace anything which looks like timestamps with XXXs (including the #start/end markers in logs). + +sed 's/[0-9]\{10\}\.[0-9]\{2,8\}/XXXXXXXXXX.XXXXXX/g' | \ +sed 's/^#\(start\|end\).20..-..-..-..-..-..$/#\1 XXXX-XX-XX-XX-XX-XX/g' | \ +grep -v '#start' | grep -v '#end' -sed 's/[0-9]\{10\}\.[0-9]\{2,8\}/XXXXXXXXXX.XXXXXX/g'