diff --git a/src/Attr.cc b/src/Attr.cc index aed9165182..b877250f52 100644 --- a/src/Attr.cc +++ b/src/Attr.cc @@ -5,7 +5,7 @@ #include "Attr.h" #include "Expr.h" #include "Serializer.h" -#include "LogMgr.h" +#include "logging/Manager.h" const char* attr_name(attr_tag t) { @@ -416,7 +416,7 @@ void Attributes::CheckAttr(Attr* a) break; case ATTR_LOG: - if ( ! LogVal::IsCompatibleType(type) ) + if ( ! logging::Value::IsCompatibleType(type) ) Error("&log applied to a type that cannot be logged"); break; diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 0e29082db3..61a4847b70 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -213,6 +213,8 @@ binpac_target(syslog.pac ######################################################################## ## bro target +find_package (Threads) + # This macro stores associated headers for any C/C++ source files given # as arguments (past _var) as a list in the CMake variable named "_var". macro(COLLECT_HEADERS _var) @@ -334,10 +336,6 @@ set(bro_SRCS IRC.cc List.cc Reporter.cc - LogMgr.cc - LogWriter.cc - LogWriterAscii.cc - LogWriterNone.cc Login.cc MIME.cc NCP.cc @@ -409,6 +407,17 @@ set(bro_SRCS PacketDumper.cc strsep.c modp_numtoa.c + + threading/BasicThread.cc + threading/Manager.cc + threading/MsgThread.cc + + logging/Manager.cc + logging/WriterBackend.cc + logging/WriterFrontend.cc + logging/writers/Ascii.cc + logging/writers/None.cc + ${dns_SRCS} ${openssl_SRCS} ) @@ -421,7 +430,7 @@ add_definitions(-DBRO_BUILD_PATH="${CMAKE_CURRENT_BINARY_DIR}") add_executable(bro ${bro_SRCS} ${bro_HEADERS}) -target_link_libraries(bro ${brodeps}) +target_link_libraries(bro ${brodeps} ${CMAKE_THREAD_LIBS_INIT}) install(TARGETS bro DESTINATION bin) install(FILES ${INSTALL_BIF_OUTPUTS} DESTINATION ${BRO_SCRIPT_INSTALL_PATH}/base) diff --git a/src/DebugLogger.cc b/src/DebugLogger.cc index d60fdd70c8..c41a0552c6 100644 --- a/src/DebugLogger.cc +++ b/src/DebugLogger.cc @@ -15,7 +15,7 @@ DebugLogger::Stream DebugLogger::streams[NUM_DBGS] = { { "compressor", 0, false }, {"string", 0, false }, { "notifiers", 0, false }, { "main-loop", 0, false }, { "dpd", 0, false }, { "tm", 0, false }, - { "logging", 0, false } + { "logging", 0, false }, { "threading", 0, false } }; DebugLogger::DebugLogger(const char* filename) diff --git a/src/DebugLogger.h b/src/DebugLogger.h index a2dece5b3c..71e21bfa26 100644 --- a/src/DebugLogger.h +++ b/src/DebugLogger.h @@ -24,6 +24,7 @@ enum DebugStream { DBG_DPD, // Dynamic application detection framework DBG_TM, // Time-machine packet input via Brocolli DBG_LOGGING, // Logging streams + DBG_THREADING, // Threading system NUM_DBGS // Has to be last }; diff --git a/src/LogWriter.cc b/src/LogWriter.cc deleted file mode 100644 index 8584a0b0b5..0000000000 --- a/src/LogWriter.cc +++ /dev/null @@ -1,158 +0,0 @@ -// See the file "COPYING" in the main distribution directory for copyright. - -#include "util.h" -#include "LogWriter.h" - -LogWriter::LogWriter() - { - buf = 0; - buf_len = 1024; - buffering = true; - disabled = false; - } - -LogWriter::~LogWriter() - { - if ( buf ) - free(buf); - - for(int i = 0; i < num_fields; ++i) - delete fields[i]; - - delete [] fields; - } - -bool LogWriter::Init(string arg_path, int arg_num_fields, - const LogField* const * arg_fields) - { - path = arg_path; - num_fields = arg_num_fields; - fields = arg_fields; - - if ( ! DoInit(arg_path, arg_num_fields, arg_fields) ) - { - disabled = true; - return false; - } - - return true; - } - -bool LogWriter::Write(int arg_num_fields, LogVal** vals) - { - // Double-check that the arguments match. If we get this from remote, - // something might be mixed up. - if ( num_fields != arg_num_fields ) - { - DBG_LOG(DBG_LOGGING, "Number of fields don't match in LogWriter::Write() (%d vs. %d)", - arg_num_fields, num_fields); - - DeleteVals(vals); - return false; - } - - for ( int i = 0; i < num_fields; ++i ) - { - if ( vals[i]->type != fields[i]->type ) - { - DBG_LOG(DBG_LOGGING, "Field type doesn't match in LogWriter::Write() (%d vs. %d)", - vals[i]->type, fields[i]->type); - DeleteVals(vals); - return false; - } - } - - bool result = DoWrite(num_fields, fields, vals); - - DeleteVals(vals); - - if ( ! result ) - disabled = true; - - return result; - } - -bool LogWriter::SetBuf(bool enabled) - { - if ( enabled == buffering ) - // No change. - return true; - - buffering = enabled; - - if ( ! DoSetBuf(enabled) ) - { - disabled = true; - return false; - } - - return true; - } - -bool LogWriter::Rotate(string rotated_path, double open, - double close, bool terminating) - { - if ( ! DoRotate(rotated_path, open, close, terminating) ) - { - disabled = true; - return false; - } - - return true; - } - -bool LogWriter::Flush() - { - if ( ! DoFlush() ) - { - disabled = true; - return false; - } - - return true; - } - -void LogWriter::Finish() - { - DoFinish(); - } - -const char* LogWriter::Fmt(const char* format, ...) - { - if ( ! buf ) - buf = (char*) malloc(buf_len); - - va_list al; - va_start(al, format); - int n = safe_vsnprintf(buf, buf_len, format, al); - va_end(al); - - if ( (unsigned int) n >= buf_len ) - { // Not enough room, grow the buffer. - buf_len = n + 32; - buf = (char*) realloc(buf, buf_len); - - // Is it portable to restart? - va_start(al, format); - n = safe_vsnprintf(buf, buf_len, format, al); - va_end(al); - } - - return buf; - } - -void LogWriter::Error(const char *msg) - { - log_mgr->Error(this, msg); - } - -void LogWriter::DeleteVals(LogVal** vals) - { - log_mgr->DeleteVals(num_fields, vals); - } - -bool LogWriter::FinishedRotation(string new_name, string old_name, double open, - double close, bool terminating) - { - return log_mgr->FinishedRotation(this, new_name, old_name, open, close, terminating); - } diff --git a/src/LogWriterNone.cc b/src/LogWriterNone.cc deleted file mode 100644 index 592772afdb..0000000000 --- a/src/LogWriterNone.cc +++ /dev/null @@ -1,16 +0,0 @@ - -#include "LogWriterNone.h" - -bool LogWriterNone::DoRotate(string rotated_path, double open, - double close, bool terminating) - { - if ( ! FinishedRotation(string("/dev/null"), Path(), open, close, terminating)) - { - Error(Fmt("error rotating %s", Path().c_str())); - return false; - } - - return true; - } - - diff --git a/src/LogWriterNone.h b/src/LogWriterNone.h deleted file mode 100644 index 3811a19469..0000000000 --- a/src/LogWriterNone.h +++ /dev/null @@ -1,30 +0,0 @@ -// See the file "COPYING" in the main distribution directory for copyright. -// -// Dummy log writer that just discards everything (but still pretends to rotate). - -#ifndef LOGWRITERNONE_H -#define LOGWRITERNONE_H - -#include "LogWriter.h" - -class LogWriterNone : public LogWriter { -public: - LogWriterNone() {} - ~LogWriterNone() {}; - - static LogWriter* Instantiate() { return new LogWriterNone; } - -protected: - virtual bool DoInit(string path, int num_fields, - const LogField* const * fields) { return true; } - - virtual bool DoWrite(int num_fields, const LogField* const * fields, - LogVal** vals) { return true; } - virtual bool DoSetBuf(bool enabled) { return true; } - virtual bool DoRotate(string rotated_path, double open, double close, - bool terminating); - virtual bool DoFlush() { return true; } - virtual void DoFinish() {} -}; - -#endif diff --git a/src/RemoteSerializer.cc b/src/RemoteSerializer.cc index b72a6dcc1a..a75812b42b 100644 --- a/src/RemoteSerializer.cc +++ b/src/RemoteSerializer.cc @@ -183,8 +183,8 @@ #include "Sessions.h" #include "File.h" #include "Conn.h" -#include "LogMgr.h" #include "Reporter.h" +#include "logging/Manager.h" extern "C" { #include "setsignal.h" @@ -2476,7 +2476,7 @@ bool RemoteSerializer::ProcessRemotePrint() return true; } -bool RemoteSerializer::SendLogCreateWriter(EnumVal* id, EnumVal* writer, string path, int num_fields, const LogField* const * fields) +bool RemoteSerializer::SendLogCreateWriter(EnumVal* id, EnumVal* writer, string path, int num_fields, const logging::Field* const * fields) { loop_over_list(peers, i) { @@ -2486,7 +2486,7 @@ bool RemoteSerializer::SendLogCreateWriter(EnumVal* id, EnumVal* writer, string return true; } -bool RemoteSerializer::SendLogCreateWriter(PeerID peer_id, EnumVal* id, EnumVal* writer, string path, int num_fields, const LogField* const * fields) +bool RemoteSerializer::SendLogCreateWriter(PeerID peer_id, EnumVal* id, EnumVal* writer, string path, int num_fields, const logging::Field* const * fields) { SetErrorDescr("logging"); @@ -2540,7 +2540,7 @@ error: return false; } -bool RemoteSerializer::SendLogWrite(EnumVal* id, EnumVal* writer, string path, int num_fields, const LogVal* const * vals) +bool RemoteSerializer::SendLogWrite(EnumVal* id, EnumVal* writer, string path, int num_fields, const logging::Value* const * vals) { loop_over_list(peers, i) { @@ -2550,7 +2550,7 @@ bool RemoteSerializer::SendLogWrite(EnumVal* id, EnumVal* writer, string path, i return true; } -bool RemoteSerializer::SendLogWrite(Peer* peer, EnumVal* id, EnumVal* writer, string path, int num_fields, const LogVal* const * vals) +bool RemoteSerializer::SendLogWrite(Peer* peer, EnumVal* id, EnumVal* writer, string path, int num_fields, const logging::Value* const * vals) { if ( peer->phase != Peer::HANDSHAKE && peer->phase != Peer::RUNNING ) return false; @@ -2641,7 +2641,7 @@ bool RemoteSerializer::ProcessLogCreateWriter() EnumVal* id_val = 0; EnumVal* writer_val = 0; - LogField** fields = 0; + logging::Field** fields = 0; BinarySerializationFormat fmt; fmt.StartRead(current_args->data, current_args->len); @@ -2658,11 +2658,11 @@ bool RemoteSerializer::ProcessLogCreateWriter() if ( ! success ) goto error; - fields = new LogField* [num_fields]; + fields = new logging::Field* [num_fields]; for ( int i = 0; i < num_fields; i++ ) { - fields[i] = new LogField; + fields[i] = new logging::Field; if ( ! fields[i]->Read(&fmt) ) goto error; } @@ -2703,7 +2703,7 @@ bool RemoteSerializer::ProcessLogWrite() // Unserialize one entry. EnumVal* id_val = 0; EnumVal* writer_val = 0; - LogVal** vals = 0; + logging::Value** vals = 0; int id, writer; string path; @@ -2717,11 +2717,11 @@ bool RemoteSerializer::ProcessLogWrite() if ( ! success ) goto error; - vals = new LogVal* [num_fields]; + vals = new logging::Value* [num_fields]; for ( int i = 0; i < num_fields; i++ ) { - vals[i] = new LogVal; + vals[i] = new logging::Value; if ( ! vals[i]->Read(&fmt) ) goto error; } diff --git a/src/RemoteSerializer.h b/src/RemoteSerializer.h index b64fdcbe66..ba0bde7d41 100644 --- a/src/RemoteSerializer.h +++ b/src/RemoteSerializer.h @@ -14,8 +14,11 @@ // FIXME: Change this to network byte order class IncrementalSendTimer; -class LogField; -class LogVal; + +namespace logging { + class Field; + class Value; +} // This class handles the communication done in Bro's main loop. class RemoteSerializer : public Serializer, public IOSource { @@ -99,13 +102,13 @@ public: bool SendPrintHookEvent(BroFile* f, const char* txt, size_t len); // Send a request to create a writer on a remote side. - bool SendLogCreateWriter(PeerID peer, EnumVal* id, EnumVal* writer, string path, int num_fields, const LogField* const * fields); + bool SendLogCreateWriter(PeerID peer, EnumVal* id, EnumVal* writer, string path, int num_fields, const logging::Field* const * fields); // Broadcasts a request to create a writer. - bool SendLogCreateWriter(EnumVal* id, EnumVal* writer, string path, int num_fields, const LogField* const * fields); + bool SendLogCreateWriter(EnumVal* id, EnumVal* writer, string path, int num_fields, const logging::Field* const * fields); // Broadcast a log entry to everybody interested. - bool SendLogWrite(EnumVal* id, EnumVal* writer, string path, int num_fields, const LogVal* const * vals); + bool SendLogWrite(EnumVal* id, EnumVal* writer, string path, int num_fields, const logging::Value* const * vals); // Synchronzizes time with all connected peers. Returns number of // current sync-point, or -1 on error. @@ -300,7 +303,7 @@ protected: bool SendID(SerialInfo* info, Peer* peer, const ID& id); bool SendCapabilities(Peer* peer); bool SendPacket(SerialInfo* info, Peer* peer, const Packet& p); - bool SendLogWrite(Peer* peer, EnumVal* id, EnumVal* writer, string path, int num_fields, const LogVal* const * vals); + bool SendLogWrite(Peer* peer, EnumVal* id, EnumVal* writer, string path, int num_fields, const logging::Value* const * vals); void UnregisterHandlers(Peer* peer); void RaiseEvent(EventHandlerPtr event, Peer* peer, const char* arg = 0); diff --git a/src/logging.bif b/src/logging.bif index 31e1bebacd..c8960b4e38 100644 --- a/src/logging.bif +++ b/src/logging.bif @@ -3,8 +3,9 @@ module Log; %%{ -#include "LogMgr.h" #include "NetVar.h" + +#include "logging/Manager.h" %%} type Filter: record; diff --git a/src/LogMgr.cc b/src/logging/Manager.cc similarity index 87% rename from src/LogMgr.cc rename to src/logging/Manager.cc index 28e9a2ac1f..09c5030fdc 100644 --- a/src/LogMgr.cc +++ b/src/logging/Manager.cc @@ -2,33 +2,38 @@ #include -#include "LogMgr.h" -#include "Event.h" -#include "EventHandler.h" -#include "NetVar.h" -#include "Net.h" +#include "../Event.h" +#include "../EventHandler.h" +#include "../NetVar.h" +#include "../Net.h" -#include "LogWriterAscii.h" -#include "LogWriterNone.h" +#include "Manager.h" +#include "WriterFrontend.h" +#include "WriterBackend.h" + +#include "writers/Ascii.h" +#include "writers/None.h" + +using namespace logging; // Structure describing a log writer type. -struct LogWriterDefinition { +struct WriterDefinition { bro_int_t type; // The type. const char *name; // Descriptive name for error messages. bool (*init)(); // An optional one-time initialization function. - LogWriter* (*factory)(); // A factory function creating instances. + WriterBackend* (*factory)(); // A factory function creating instances. }; // Static table defining all availabel log writers. -LogWriterDefinition log_writers[] = { - { BifEnum::Log::WRITER_NONE, "None", 0, LogWriterNone::Instantiate }, - { BifEnum::Log::WRITER_ASCII, "Ascii", 0, LogWriterAscii::Instantiate }, +WriterDefinition log_writers[] = { + { BifEnum::Log::WRITER_NONE, "None", 0, writer::None::Instantiate }, + { BifEnum::Log::WRITER_ASCII, "Ascii", 0, writer::Ascii::Instantiate }, // End marker, don't touch. - { BifEnum::Log::WRITER_DEFAULT, "None", 0, (LogWriter* (*)())0 } + { BifEnum::Log::WRITER_DEFAULT, "None", 0, (WriterBackend* (*)())0 } }; -struct LogMgr::Filter { +struct Manager::Filter { string name; EnumVal* id; Func* pred; @@ -42,7 +47,7 @@ struct LogMgr::Filter { Func* postprocessor; int num_fields; - LogField** fields; + Field** fields; // Vector indexed by field number. Each element is a list of record // indices defining a path leading to the value across potential @@ -52,16 +57,16 @@ struct LogMgr::Filter { ~Filter(); }; -struct LogMgr::WriterInfo { +struct Manager::WriterInfo { EnumVal* type; double open_time; Timer* rotation_timer; double interval; Func* postprocessor; - LogWriter* writer; + WriterFrontend* writer; }; -struct LogMgr::Stream { +struct Manager::Stream { EnumVal* id; bool enabled; string name; @@ -78,7 +83,7 @@ struct LogMgr::Stream { ~Stream(); }; -bool LogField::Read(SerializationFormat* fmt) +bool Field::Read(SerializationFormat* fmt) { int t; int st; @@ -90,12 +95,12 @@ bool LogField::Read(SerializationFormat* fmt) return success; } -bool LogField::Write(SerializationFormat* fmt) const +bool Field::Write(SerializationFormat* fmt) const { return (fmt->Write(name, "name") && fmt->Write((int)type, "type") && fmt->Write((int)subtype, "subtype")); } -LogVal::~LogVal() +Value::~Value() { if ( (type == TYPE_ENUM || type == TYPE_STRING || type == TYPE_FILE || type == TYPE_FUNC) && present ) @@ -118,7 +123,7 @@ LogVal::~LogVal() } } -bool LogVal::IsCompatibleType(BroType* t, bool atomic_only) +bool Value::IsCompatibleType(BroType* t, bool atomic_only) { if ( ! t ) return false; @@ -169,7 +174,7 @@ bool LogVal::IsCompatibleType(BroType* t, bool atomic_only) return false; } -bool LogVal::Read(SerializationFormat* fmt) +bool Value::Read(SerializationFormat* fmt) { int ty; @@ -249,11 +254,11 @@ bool LogVal::Read(SerializationFormat* fmt) if ( ! fmt->Read(&val.set_val.size, "set_size") ) return false; - val.set_val.vals = new LogVal* [val.set_val.size]; + val.set_val.vals = new Value* [val.set_val.size]; for ( int i = 0; i < val.set_val.size; ++i ) { - val.set_val.vals[i] = new LogVal; + val.set_val.vals[i] = new Value; if ( ! val.set_val.vals[i]->Read(fmt) ) return false; @@ -267,11 +272,11 @@ bool LogVal::Read(SerializationFormat* fmt) if ( ! fmt->Read(&val.vector_val.size, "vector_size") ) return false; - val.vector_val.vals = new LogVal* [val.vector_val.size]; + val.vector_val.vals = new Value* [val.vector_val.size]; for ( int i = 0; i < val.vector_val.size; ++i ) { - val.vector_val.vals[i] = new LogVal; + val.vector_val.vals[i] = new Value; if ( ! val.vector_val.vals[i]->Read(fmt) ) return false; @@ -281,13 +286,13 @@ bool LogVal::Read(SerializationFormat* fmt) } default: - reporter->InternalError("unsupported type %s in LogVal::Write", type_name(type)); + reporter->InternalError("unsupported type %s in Value::Write", type_name(type)); } return false; } -bool LogVal::Write(SerializationFormat* fmt) const +bool Value::Write(SerializationFormat* fmt) const { if ( ! (fmt->Write((int)type, "type") && fmt->Write(present, "present")) ) @@ -382,13 +387,13 @@ bool LogVal::Write(SerializationFormat* fmt) const } default: - reporter->InternalError("unsupported type %s in LogVal::REad", type_name(type)); + reporter->InternalError("unsupported type %s in Value::REad", type_name(type)); } return false; } -LogMgr::Filter::~Filter() +Manager::Filter::~Filter() { for ( int i = 0; i < num_fields; ++i ) delete fields[i]; @@ -398,7 +403,7 @@ LogMgr::Filter::~Filter() Unref(path_val); } -LogMgr::Stream::~Stream() +Manager::Stream::~Stream() { Unref(columns); @@ -421,17 +426,64 @@ LogMgr::Stream::~Stream() delete *f; } -LogMgr::LogMgr() +Manager::Manager() { } -LogMgr::~LogMgr() +Manager::~Manager() { for ( vector::iterator s = streams.begin(); s != streams.end(); ++s ) delete *s; } -LogMgr::Stream* LogMgr::FindStream(EnumVal* id) +WriterBackend* Manager::CreateBackend(bro_int_t type) + { + WriterDefinition* ld = log_writers; + + while ( true ) + { + if ( ld->type == BifEnum::Log::WRITER_DEFAULT ) + { + reporter->Error("unknow writer when creating writer"); + return 0; + } + + if ( ld->type == type ) + break; + + if ( ! ld->factory ) + // Oops, we can't instantiate this guy. + return 0; + + // If the writer has an init function, call it. + if ( ld->init ) + { + if ( (*ld->init)() ) + // Clear the init function so that we won't + // call it again later. + ld->init = 0; + else + // Init failed, disable by deleting factory + // function. + ld->factory = 0; + + DBG_LOG(DBG_LOGGING, "failed to init writer class %s", + ld->name); + + return false; + } + + ++ld; + } + + assert(ld->factory); + + WriterBackend* backend = (*ld->factory)(); + assert(backend); + return backend; + } + +Manager::Stream* Manager::FindStream(EnumVal* id) { unsigned int idx = id->AsEnum(); @@ -441,7 +493,7 @@ LogMgr::Stream* LogMgr::FindStream(EnumVal* id) return streams[idx]; } -LogMgr::WriterInfo* LogMgr::FindWriter(LogWriter* writer) +Manager::WriterInfo* Manager::FindWriter(WriterFrontend* writer) { for ( vector::iterator s = streams.begin(); s != streams.end(); ++s ) { @@ -460,7 +512,7 @@ LogMgr::WriterInfo* LogMgr::FindWriter(LogWriter* writer) return 0; } -void LogMgr::RemoveDisabledWriters(Stream* stream) +void Manager::RemoveDisabledWriters(Stream* stream) { list disabled; @@ -468,6 +520,7 @@ void LogMgr::RemoveDisabledWriters(Stream* stream) { if ( j->second && j->second->writer->Disabled() ) { + j->second->writer->Stop(); delete j->second; disabled.push_back(j->first); } @@ -477,7 +530,7 @@ void LogMgr::RemoveDisabledWriters(Stream* stream) stream->writers.erase(*j); } -bool LogMgr::CreateStream(EnumVal* id, RecordVal* sval) +bool Manager::CreateStream(EnumVal* id, RecordVal* sval) { RecordType* rtype = sval->Type()->AsRecordType(); @@ -497,7 +550,7 @@ bool LogMgr::CreateStream(EnumVal* id, RecordVal* sval) if ( ! (columns->FieldDecl(i)->FindAttr(ATTR_LOG)) ) continue; - if ( ! LogVal::IsCompatibleType(columns->FieldType(i)) ) + if ( ! Value::IsCompatibleType(columns->FieldType(i)) ) { reporter->Error("type of field '%s' is not support for logging output", columns->FieldName(i)); @@ -569,7 +622,7 @@ bool LogMgr::CreateStream(EnumVal* id, RecordVal* sval) return true; } -bool LogMgr::EnableStream(EnumVal* id) +bool Manager::EnableStream(EnumVal* id) { Stream* stream = FindStream(id); @@ -585,7 +638,7 @@ bool LogMgr::EnableStream(EnumVal* id) return true; } -bool LogMgr::DisableStream(EnumVal* id) +bool Manager::DisableStream(EnumVal* id) { Stream* stream = FindStream(id); @@ -602,7 +655,7 @@ bool LogMgr::DisableStream(EnumVal* id) } // Helper for recursive record field unrolling. -bool LogMgr::TraverseRecord(Stream* stream, Filter* filter, RecordType* rt, +bool Manager::TraverseRecord(Stream* stream, Filter* filter, RecordType* rt, TableVal* include, TableVal* exclude, string path, list indices) { for ( int i = 0; i < rt->NumFields(); ++i ) @@ -696,9 +749,9 @@ bool LogMgr::TraverseRecord(Stream* stream, Filter* filter, RecordType* rt, filter->indices.push_back(new_indices); - filter->fields = (LogField**) + filter->fields = (Field**) realloc(filter->fields, - sizeof(LogField) * ++filter->num_fields); + sizeof(Field) * ++filter->num_fields); if ( ! filter->fields ) { @@ -706,14 +759,14 @@ bool LogMgr::TraverseRecord(Stream* stream, Filter* filter, RecordType* rt, return false; } - LogField* field = new LogField(); + Field* field = new Field(); field->name = new_path; field->type = t->Tag(); - if ( field->type == TYPE_TABLE ) + if ( field->type == TYPE_TABLE ) { field->subtype = t->AsSetType()->Indices()->PureType()->Tag(); - } - else if ( field->type == TYPE_VECTOR ) + } + else if ( field->type == TYPE_VECTOR ) { field->subtype = t->AsVectorType()->YieldType()->Tag(); } @@ -723,7 +776,7 @@ bool LogMgr::TraverseRecord(Stream* stream, Filter* filter, RecordType* rt, return true; } -bool LogMgr::AddFilter(EnumVal* id, RecordVal* fval) +bool Manager::AddFilter(EnumVal* id, RecordVal* fval) { RecordType* rtype = fval->Type()->AsRecordType(); @@ -819,7 +872,7 @@ bool LogMgr::AddFilter(EnumVal* id, RecordVal* fval) for ( int i = 0; i < filter->num_fields; i++ ) { - LogField* field = filter->fields[i]; + Field* field = filter->fields[i]; DBG_LOG(DBG_LOGGING, " field %10s: %s", field->name.c_str(), type_name(field->type)); } @@ -828,12 +881,12 @@ bool LogMgr::AddFilter(EnumVal* id, RecordVal* fval) return true; } -bool LogMgr::RemoveFilter(EnumVal* id, StringVal* name) +bool Manager::RemoveFilter(EnumVal* id, StringVal* name) { return RemoveFilter(id, name->AsString()->CheckString()); } -bool LogMgr::RemoveFilter(EnumVal* id, string name) +bool Manager::RemoveFilter(EnumVal* id, string name) { Stream* stream = FindStream(id); if ( ! stream ) @@ -860,7 +913,7 @@ bool LogMgr::RemoveFilter(EnumVal* id, string name) return true; } -bool LogMgr::Write(EnumVal* id, RecordVal* columns) +bool Manager::Write(EnumVal* id, RecordVal* columns) { bool error = false; @@ -980,7 +1033,7 @@ bool LogMgr::Write(EnumVal* id, RecordVal* columns) Stream::WriterMap::iterator w = stream->writers.find(Stream::WriterPathPair(filter->writer->AsEnum(), path)); - LogWriter* writer = 0; + WriterFrontend* writer = 0; if ( w != stream->writers.end() ) // We know this writer already. @@ -990,12 +1043,12 @@ bool LogMgr::Write(EnumVal* id, RecordVal* columns) { // No, need to create one. - // Copy the fields for LogWriter::Init() as it will take - // ownership. - LogField** arg_fields = new LogField*[filter->num_fields]; + // Copy the fields for WriterFrontend::Init() as it + // will take ownership. + Field** arg_fields = new Field*[filter->num_fields]; for ( int j = 0; j < filter->num_fields; ++j ) - arg_fields[j] = new LogField(*filter->fields[j]); + arg_fields[j] = new Field(*filter->fields[j]); if ( filter->remote ) remote_serializer->SendLogCreateWriter(stream->id, @@ -1034,7 +1087,7 @@ bool LogMgr::Write(EnumVal* id, RecordVal* columns) if ( filter->local || filter->remote ) { - LogVal** vals = RecordToFilterVals(stream, filter, columns); + Value** vals = RecordToFilterVals(stream, filter, columns); if ( filter->remote ) remote_serializer->SendLogWrite(stream->id, @@ -1045,11 +1098,9 @@ bool LogMgr::Write(EnumVal* id, RecordVal* columns) if ( filter->local ) { - assert(writer); - // Write takes ownership of vals. - if ( ! writer->Write(filter->num_fields, vals) ) - error = true; + assert(writer); + writer->Write(filter->num_fields, vals); } else @@ -1072,15 +1123,15 @@ bool LogMgr::Write(EnumVal* id, RecordVal* columns) return true; } -LogVal* LogMgr::ValToLogVal(Val* val, BroType* ty) +Value* Manager::ValToLogVal(Val* val, BroType* ty) { if ( ! ty ) ty = val->Type(); if ( ! val ) - return new LogVal(ty->Tag(), false); + return new Value(ty->Tag(), false); - LogVal* lval = new LogVal(ty->Tag()); + Value* lval = new Value(ty->Tag()); switch ( lval->type ) { case TYPE_BOOL: @@ -1160,7 +1211,7 @@ LogVal* LogMgr::ValToLogVal(Val* val, BroType* ty) set = new ListVal(TYPE_INT); lval->val.set_val.size = set->Length(); - lval->val.set_val.vals = new LogVal* [lval->val.set_val.size]; + lval->val.set_val.vals = new Value* [lval->val.set_val.size]; for ( int i = 0; i < lval->val.set_val.size; i++ ) lval->val.set_val.vals[i] = ValToLogVal(set->Index(i)); @@ -1174,7 +1225,7 @@ LogVal* LogMgr::ValToLogVal(Val* val, BroType* ty) VectorVal* vec = val->AsVectorVal(); lval->val.vector_val.size = vec->Size(); lval->val.vector_val.vals = - new LogVal* [lval->val.vector_val.size]; + new Value* [lval->val.vector_val.size]; for ( int i = 0; i < lval->val.vector_val.size; i++ ) { @@ -1193,10 +1244,10 @@ LogVal* LogMgr::ValToLogVal(Val* val, BroType* ty) return lval; } -LogVal** LogMgr::RecordToFilterVals(Stream* stream, Filter* filter, +Value** Manager::RecordToFilterVals(Stream* stream, Filter* filter, RecordVal* columns) { - LogVal** vals = new LogVal*[filter->num_fields]; + Value** vals = new Value*[filter->num_fields]; for ( int i = 0; i < filter->num_fields; ++i ) { @@ -1215,7 +1266,7 @@ LogVal** LogMgr::RecordToFilterVals(Stream* stream, Filter* filter, if ( ! val ) { // Value, or any of its parents, is not set. - vals[i] = new LogVal(filter->fields[i]->type, false); + vals[i] = new Value(filter->fields[i]->type, false); break; } } @@ -1227,8 +1278,8 @@ LogVal** LogMgr::RecordToFilterVals(Stream* stream, Filter* filter, return vals; } -LogWriter* LogMgr::CreateWriter(EnumVal* id, EnumVal* writer, string path, - int num_fields, LogField** fields) +WriterFrontend* Manager::CreateWriter(EnumVal* id, EnumVal* writer, string path, + int num_fields, Field** fields) { Stream* stream = FindStream(id); @@ -1244,56 +1295,10 @@ LogWriter* LogMgr::CreateWriter(EnumVal* id, EnumVal* writer, string path, // return it. return w->second->writer; - // Need to instantiate a new writer. + WriterFrontend* writer_obj = new WriterFrontend(writer->AsEnum()); + assert(writer_obj); - LogWriterDefinition* ld = log_writers; - - while ( true ) - { - if ( ld->type == BifEnum::Log::WRITER_DEFAULT ) - { - reporter->Error("unknow writer when creating writer"); - return 0; - } - - if ( ld->type == writer->AsEnum() ) - break; - - if ( ! ld->factory ) - // Oops, we can't instantiate this guy. - return 0; - - // If the writer has an init function, call it. - if ( ld->init ) - { - if ( (*ld->init)() ) - // Clear the init function so that we won't - // call it again later. - ld->init = 0; - else - // Init failed, disable by deleting factory - // function. - ld->factory = 0; - - DBG_LOG(DBG_LOGGING, "failed to init writer class %s", - ld->name); - - return false; - } - - ++ld; - } - - assert(ld->factory); - LogWriter* writer_obj = (*ld->factory)(); - - if ( ! writer_obj->Init(path, num_fields, fields) ) - { - DBG_LOG(DBG_LOGGING, "failed to init instance of writer %s", - ld->name); - - return 0; - } + writer_obj->Init(path, num_fields, fields); WriterInfo* winfo = new WriterInfo; winfo->type = writer->Ref()->AsEnumVal(); @@ -1338,16 +1343,17 @@ LogWriter* LogMgr::CreateWriter(EnumVal* id, EnumVal* writer, string path, return writer_obj; } -void LogMgr::DeleteVals(int num_fields, LogVal** vals) +void Manager::DeleteVals(int num_fields, Value** vals) { + // Note this code is duplicated in WriterBackend::DeleteVals(). for ( int i = 0; i < num_fields; i++ ) delete vals[i]; delete [] vals; } -bool LogMgr::Write(EnumVal* id, EnumVal* writer, string path, int num_fields, - LogVal** vals) +bool Manager::Write(EnumVal* id, EnumVal* writer, string path, int num_fields, + Value** vals) { Stream* stream = FindStream(id); @@ -1357,7 +1363,7 @@ bool LogMgr::Write(EnumVal* id, EnumVal* writer, string path, int num_fields, #ifdef DEBUG ODesc desc; id->Describe(&desc); - DBG_LOG(DBG_LOGGING, "unknown stream %s in LogMgr::Write()", + DBG_LOG(DBG_LOGGING, "unknown stream %s in Manager::Write()", desc.Description()); #endif DeleteVals(num_fields, vals); @@ -1379,23 +1385,24 @@ bool LogMgr::Write(EnumVal* id, EnumVal* writer, string path, int num_fields, #ifdef DEBUG ODesc desc; id->Describe(&desc); - DBG_LOG(DBG_LOGGING, "unknown writer %s in LogMgr::Write()", + DBG_LOG(DBG_LOGGING, "unknown writer %s in Manager::Write()", desc.Description()); #endif DeleteVals(num_fields, vals); return false; } - bool success = (w->second ? w->second->writer->Write(num_fields, vals) : true); + if ( w->second ) + w->second->writer->Write(num_fields, vals); DBG_LOG(DBG_LOGGING, - "Wrote pre-filtered record to path '%s' on stream '%s' [%s]", - path.c_str(), stream->name.c_str(), (success ? "ok" : "error")); + "Wrote pre-filtered record to path '%s' on stream '%s'", + path.c_str(), stream->name.c_str()); - return success; + return true; } -void LogMgr::SendAllWritersTo(RemoteSerializer::PeerID peer) +void Manager::SendAllWritersTo(RemoteSerializer::PeerID peer) { for ( vector::iterator s = streams.begin(); s != streams.end(); ++s ) { @@ -1410,7 +1417,7 @@ void LogMgr::SendAllWritersTo(RemoteSerializer::PeerID peer) if ( ! i->second ) continue; - LogWriter* writer = i->second->writer; + WriterFrontend* writer = i->second->writer; EnumVal writer_val(i->first.first, BifType::Enum::Log::Writer); remote_serializer->SendLogCreateWriter(peer, (*s)->id, @@ -1422,7 +1429,7 @@ void LogMgr::SendAllWritersTo(RemoteSerializer::PeerID peer) } } -bool LogMgr::SetBuf(EnumVal* id, bool enabled) +bool Manager::SetBuf(EnumVal* id, bool enabled) { Stream* stream = FindStream(id); if ( ! stream ) @@ -1440,7 +1447,7 @@ bool LogMgr::SetBuf(EnumVal* id, bool enabled) return true; } -bool LogMgr::Flush(EnumVal* id) +bool Manager::Flush(EnumVal* id) { Stream* stream = FindStream(id); if ( ! stream ) @@ -1461,7 +1468,7 @@ bool LogMgr::Flush(EnumVal* id) return true; } -void LogMgr::Error(LogWriter* writer, const char* msg) +void Manager::Error(WriterFrontend* writer, const char* msg) { reporter->Error("error with writer for %s: %s", writer->Path().c_str(), msg); @@ -1470,7 +1477,7 @@ void LogMgr::Error(LogWriter* writer, const char* msg) // Timer which on dispatching rotates the filter. class RotationTimer : public Timer { public: - RotationTimer(double t, LogMgr::WriterInfo* arg_winfo, bool arg_rotate) + RotationTimer(double t, Manager::WriterInfo* arg_winfo, bool arg_rotate) : Timer(t, TIMER_ROTATE) { winfo = arg_winfo; @@ -1482,7 +1489,7 @@ public: void Dispatch(double t, int is_expire); protected: - LogMgr::WriterInfo* winfo; + Manager::WriterInfo* winfo; bool rotate; }; @@ -1506,7 +1513,7 @@ void RotationTimer::Dispatch(double t, int is_expire) } } -void LogMgr::InstallRotationTimer(WriterInfo* winfo) +void Manager::InstallRotationTimer(WriterInfo* winfo) { if ( terminating ) return; @@ -1548,7 +1555,7 @@ void LogMgr::InstallRotationTimer(WriterInfo* winfo) } } -void LogMgr::Rotate(WriterInfo* winfo) +void Manager::Rotate(WriterInfo* winfo) { DBG_LOG(DBG_LOGGING, "Rotating %s at %.6f", winfo->writer->Path().c_str(), network_time); @@ -1568,7 +1575,7 @@ void LogMgr::Rotate(WriterInfo* winfo) winfo->writer->Rotate(tmp, winfo->open_time, network_time, terminating); } -bool LogMgr::FinishedRotation(LogWriter* writer, string new_name, string old_name, +bool Manager::FinishedRotation(WriterFrontend* writer, string new_name, string old_name, double open, double close, bool terminating) { DBG_LOG(DBG_LOGGING, "Finished rotating %s at %.6f, new name %s", diff --git a/src/LogMgr.h b/src/logging/Manager.h similarity index 68% rename from src/LogMgr.h rename to src/logging/Manager.h index 3eaba360d5..7fa2c271db 100644 --- a/src/LogMgr.h +++ b/src/logging/Manager.h @@ -2,24 +2,28 @@ // // A class managing log writers and filters. -#ifndef LOGMGR_H -#define LOGMGR_H +#ifndef LOGGING_MANAGER_H +#define LOGGING_MANAGER_H -#include "Val.h" -#include "EventHandler.h" -#include "RemoteSerializer.h" +#include "../Val.h" +#include "../EventHandler.h" +#include "../RemoteSerializer.h" class SerializationFormat; +class RemoteSerializer; +class RotationTimer; + +namespace logging { // Description of a log field. -struct LogField { +struct Field { string name; TypeTag type; // inner type of sets TypeTag subtype; - LogField() { subtype = TYPE_VOID; } - LogField(const LogField& other) + Field() { subtype = TYPE_VOID; } + Field(const Field& other) : name(other.name), type(other.type), subtype(other.subtype) { } // (Un-)serialize. @@ -28,13 +32,13 @@ struct LogField { }; // Values as logged by a writer. -struct LogVal { +struct Value { TypeTag type; bool present; // False for unset fields. // The following union is a subset of BroValUnion, including only the // types we can log directly. - struct set_t { bro_int_t size; LogVal** vals; }; + struct set_t { bro_int_t size; Value** vals; }; typedef set_t vec_t; union _val { @@ -48,9 +52,9 @@ struct LogVal { vec_t vector_val; } val; - LogVal(TypeTag arg_type = TYPE_ERROR, bool arg_present = true) + Value(TypeTag arg_type = TYPE_ERROR, bool arg_present = true) : type(arg_type), present(arg_present) {} - ~LogVal(); + ~Value(); // (Un-)serialize. bool Read(SerializationFormat* fmt); @@ -61,17 +65,17 @@ struct LogVal { static bool IsCompatibleType(BroType* t, bool atomic_only=false); private: - LogVal(const LogVal& other) { } + Value(const Value& other) { } }; -class LogWriter; -class RemoteSerializer; -class RotationTimer; +class WriterBackend; +class WriterFrontend; +class RotationFinishedMessage; -class LogMgr { +class Manager { public: - LogMgr(); - ~LogMgr(); + Manager(); + ~Manager(); // These correspond to the BiFs visible on the scripting layer. The // actual BiFs just forward here. @@ -86,19 +90,24 @@ public: bool Flush(EnumVal* id); // Flushes all writers.. protected: - friend class LogWriter; - friend class RemoteSerializer; - friend class RotationTimer; + friend class WriterFrontend; + friend class RotationFinishedMessage; + friend class ::RemoteSerializer; + friend class ::RotationTimer; + + // Instantiates a new WriterBackend of the given type (note that + // doing so creates a new thread!). + WriterBackend* CreateBackend(bro_int_t type); //// Function also used by the RemoteSerializer. // Takes ownership of fields. - LogWriter* CreateWriter(EnumVal* id, EnumVal* writer, string path, - int num_fields, LogField** fields); + WriterFrontend* CreateWriter(EnumVal* id, EnumVal* writer, string path, + int num_fields, Field** fields); // Takes ownership of values.. bool Write(EnumVal* id, EnumVal* writer, string path, - int num_fields, LogVal** vals); + int num_fields, Value** vals); // Announces all instantiated writers to peer. void SendAllWritersTo(RemoteSerializer::PeerID peer); @@ -106,14 +115,14 @@ protected: //// Functions safe to use by writers. // Signals that a file has been rotated. - bool FinishedRotation(LogWriter* writer, string new_name, string old_name, + bool FinishedRotation(WriterFrontend* writer, string new_name, string old_name, double open, double close, bool terminating); // Reports an error for the given writer. - void Error(LogWriter* writer, const char* msg); + void Error(WriterFrontend* writer, const char* msg); // Deletes the values as passed into Write(). - void DeleteVals(int num_fields, LogVal** vals); + void DeleteVals(int num_fields, Value** vals); private: struct Filter; @@ -123,20 +132,22 @@ private: bool TraverseRecord(Stream* stream, Filter* filter, RecordType* rt, TableVal* include, TableVal* exclude, string path, list indices); - LogVal** RecordToFilterVals(Stream* stream, Filter* filter, + Value** RecordToFilterVals(Stream* stream, Filter* filter, RecordVal* columns); - LogVal* ValToLogVal(Val* val, BroType* ty = 0); + Value* ValToLogVal(Val* val, BroType* ty = 0); Stream* FindStream(EnumVal* id); void RemoveDisabledWriters(Stream* stream); void InstallRotationTimer(WriterInfo* winfo); void Rotate(WriterInfo* info); Filter* FindFilter(EnumVal* id, StringVal* filter); - WriterInfo* FindWriter(LogWriter* writer); + WriterInfo* FindWriter(WriterFrontend* writer); vector streams; // Indexed by stream enum. }; -extern LogMgr* log_mgr; +} + +extern logging::Manager* log_mgr; #endif diff --git a/src/logging/WriterBackend.cc b/src/logging/WriterBackend.cc new file mode 100644 index 0000000000..095490edc4 --- /dev/null +++ b/src/logging/WriterBackend.cc @@ -0,0 +1,161 @@ +// See the file "COPYING" in the main distribution directory for copyright. + +#include "util.h" + +#include "WriterBackend.h" +#include "WriterFrontend.h" + +// Messages sent from backend to frontend (i.e., "OutputMessages"). + +namespace logging { + +class RotationFinishedMessage : public threading::OutputMessage +{ +public: + RotationFinishedMessage(WriterFrontend* writer, string new_name, string old_name, + double open, double close, bool terminating) + : threading::OutputMessage("RotationFinished", writer), + new_name(new_name), old_name(old_name), open(open), + close(close), terminating(terminating) { } + + virtual bool Process() + { + return log_mgr->FinishedRotation(Object(), new_name, old_name, open, close, terminating); + } + +private: + string new_name; + string old_name; + double open; + double close; + bool terminating; +}; + +class DisableMessage : public threading::OutputMessage +{ +public: + DisableMessage(WriterFrontend* writer) + : threading::OutputMessage("Disable", writer) {} + + virtual bool Process() { Object()->SetDisable(); return true; } +}; + +} + +// Backend methods. + +using namespace logging; + +WriterBackend::WriterBackend(const string& name) : MsgThread(name) + { + path = ""; + num_fields = 0; + fields = 0; + buffering = true; + } + +WriterBackend::~WriterBackend() + { + if ( fields ) + { + for(int i = 0; i < num_fields; ++i) + delete fields[i]; + + delete [] fields; + } + } + +void WriterBackend::DeleteVals(Value** vals) + { + // Note this code is duplicated in Manager::DeleteVals(). + for ( int i = 0; i < num_fields; i++ ) + delete vals[i]; + + delete [] vals; + } + +bool WriterBackend::FinishedRotation(WriterFrontend* writer, string new_name, string old_name, + double open, double close, bool terminating) + { + SendOut(new RotationFinishedMessage(writer, new_name, old_name, open, close, terminating)); + return true; + } + +bool WriterBackend::Init(string arg_path, int arg_num_fields, + const Field* const * arg_fields) + { + path = arg_path; + num_fields = arg_num_fields; + fields = arg_fields; + + if ( ! DoInit(arg_path, arg_num_fields, arg_fields) ) + return false; + + return true; + } + +bool WriterBackend::Write(int arg_num_fields, Value** vals) + { + // Double-check that the arguments match. If we get this from remote, + // something might be mixed up. + if ( num_fields != arg_num_fields ) + { + +#ifdef DEBUG + const char* msg = Fmt("Number of fields don't match in WriterBackend::Write() (%d vs. %d)", + arg_num_fields, num_fields); + Debug(DBG_LOGGING, msg); +#endif + + DeleteVals(vals); + return false; + } + + for ( int i = 0; i < num_fields; ++i ) + { + if ( vals[i]->type != fields[i]->type ) + { +#ifdef DEBUG + const char* msg = Fmt("Field type doesn't match in WriterBackend::Write() (%d vs. %d)", + vals[i]->type, fields[i]->type); + Debug(DBG_LOGGING, msg); +#endif + + DeleteVals(vals); + return false; + } + } + + bool result = DoWrite(num_fields, fields, vals); + + DeleteVals(vals); + + return result; + } + +bool WriterBackend::SetBuf(bool enabled) + { + if ( enabled == buffering ) + // No change. + return true; + + buffering = enabled; + + return DoSetBuf(enabled); + } + +bool WriterBackend::Rotate(WriterFrontend* writer, string rotated_path, + double open, double close, bool terminating) + { + return DoRotate(writer, rotated_path, open, close, terminating); + } + +bool WriterBackend::Flush() + { + return DoFlush(); + } + +bool WriterBackend::Finish() + { + return DoFinish(); + } diff --git a/src/LogWriter.h b/src/logging/WriterBackend.h similarity index 70% rename from src/LogWriter.h rename to src/logging/WriterBackend.h index 1d2f9fa4b2..d1e4634e6d 100644 --- a/src/LogWriter.h +++ b/src/logging/WriterBackend.h @@ -1,32 +1,22 @@ // See the file "COPYING" in the main distribution directory for copyright. // -// Interface API for a log writer backend. The LogMgr creates a separate -// writer instance of pair of (writer type, output path). -// -// Note thay classes derived from LogWriter must be fully thread-safe and not -// use any non-thread-safe Bro functionality (which includes almost -// everything ...). In particular, do not use fmt() but LogWriter::Fmt()!. -// -// The one exception to this rule is the constructor: it is guaranteed to be -// executed inside the main thread and can thus in particular access global -// script variables. +// Bridge class between main process and writer threads. -#ifndef LOGWRITER_H -#define LOGWRITER_H +#ifndef LOGGING_WRITERBACKEND_H +#define LOGGING_WRITERBACKEND_H -#include "LogMgr.h" -#include "BroString.h" +#include "Manager.h" -class LogWriter { +#include "threading/MsgThread.h" + +namespace logging { + +// The backend runs in its own thread, separate from the main process. +class WriterBackend : public threading::MsgThread +{ public: - LogWriter(); - virtual ~LogWriter(); - - //// Interface methods to interact with the writer. Note that these - //// methods are not necessarily thread-safe and must be called only - //// from the main thread (which will typically mean only from the - //// LogMgr). In particular, they must not be called from the - //// writer's derived implementation. + WriterBackend(const string& name); + virtual ~WriterBackend(); // One-time initialization of the writer to define the logged fields. // Interpretation of "path" is left to the writer, and will be @@ -37,18 +27,18 @@ public: // // The new instance takes ownership of "fields", and will delete them // when done. - bool Init(string path, int num_fields, const LogField* const * fields); + bool Init(string path, int num_fields, const Field* const * fields); // Writes one log entry. The method takes ownership of "vals" and // will return immediately after queueing the write request, which is // potentially before output has actually been written out. // - // num_fields and the types of the LogVals must match what was passed + // num_fields and the types of the Values must match what was passed // to Init(). // // Returns false if an error occured, in which case the writer must // not be used any further. - bool Write(int num_fields, LogVal** vals); + bool Write(int num_fields, Value** vals); // Sets the buffering status for the writer, if the writer supports // that. (If not, it will be ignored). @@ -60,12 +50,12 @@ public: // Triggers rotation, if the writer supports that. (If not, it will // be ignored). - bool Rotate(string rotated_path, double open, double close, bool terminating); + bool Rotate(WriterFrontend* writer, string rotated_path, double open, double close, bool terminating); // Finishes writing to this logger regularly. Must not be called if // an error has been indicated earlier. After calling this, no // further writing must be performed. - void Finish(); + bool Finish(); //// Thread-safe methods that may be called from the writer //// implementation. @@ -73,24 +63,43 @@ public: // The following methods return the information as passed to Init(). const string Path() const { return path; } int NumFields() const { return num_fields; } - const LogField* const * Fields() const { return fields; } + const Field* const * Fields() const { return fields; } + + // Returns the current buffering state. + bool IsBuf() { return buffering; } + + // Signals to the log manager that a file has been rotated. + // + // writer: The frontend writer that triggered the rotation. This must + // be the value passed into DoRotate(). + // + // new_name: The filename of the rotated file. old_name: The filename + // of the origina file. + // + // open/close: The timestamps when the original file was opened and + // closed, respectively. + // + // terminating: True if rotation request occured due to the main Bro + // process shutting down. + bool FinishedRotation(WriterFrontend* writer, string new_name, string old_name, + double open, double close, bool terminating); protected: // Methods for writers to override. If any of these returs false, it // will be assumed that a fatal error has occured that prevents the // writer from further operation. It will then be disabled and - // deleted. When return false, the writer should also report the + // deleted. When returning false, the writer should also report the // error via Error(). Note that even if a writer does not support the // functionality for one these methods (like rotation), it must still // return true if that is not to be considered a fatal error. // // Called once for initialization of the writer. virtual bool DoInit(string path, int num_fields, - const LogField* const * fields) = 0; + const Field* const * fields) = 0; // Called once per log entry to record. - virtual bool DoWrite(int num_fields, const LogField* const * fields, - LogVal** vals) = 0; + virtual bool DoWrite(int num_fields, const Field* const * fields, + Value** vals) = 0; // Called when the buffering status for this writer is changed. If // buffering is disabled, the writer should attempt to write out @@ -119,6 +128,11 @@ protected: // RotationDone() to signal the log manager that potential // postprocessors can now run. // + // "writer" is the frontend writer that triggered the rotation. The + // *only* purpose of this value is to be passed into + // FinishedRotation() once done. You must not otherwise access the + // frontend, it's running in a different thread. + // // "rotate_path" reflects the path to where the rotated output is to // be moved, with specifics depending on the writer. It should // generally be interpreted in a way consistent with that of "path" @@ -135,52 +149,31 @@ protected: // // A writer may ignore rotation requests if it doesn't fit with its // semantics (but must still return true in that case). - virtual bool DoRotate(string rotated_path, double open, double close, - bool terminating) = 0; + virtual bool DoRotate(WriterFrontend* writer, string rotated_path, + double open, double close, bool terminating) = 0; // Called once on termination. Not called when any of the other // methods has previously signaled an error, i.e., executing this // method signals a regular shutdown of the writer. - virtual void DoFinish() = 0; + virtual bool DoFinish() = 0; - //// Methods for writers to use. These are thread-safe. - - // A thread-safe version of fmt(). - const char* Fmt(const char* format, ...); - - // Returns the current buffering state. - bool IsBuf() { return buffering; } - - // Reports an error to the user. - void Error(const char *msg); - - // Signals to the log manager that a file has been rotated. - // - // new_name: The filename of the rotated file. old_name: The filename - // of the origina file. - // - // open/close: The timestamps when the original file was opened and - // closed, respectively. - // - // terminating: True if rotation request occured due to the main Bro - // process shutting down. - bool FinishedRotation(string new_name, string old_name, double open, - double close, bool terminating); + // Triggered by regular heartbeat messages from the main process. + virtual bool DoHeartbeat(double network_time, double current_time) { return true; }; private: - friend class LogMgr; + friend class Manager; // When an error occurs, we call this method to set a flag marking - // the writer as disabled. The LogMgr will check the flag later and + // the writer as disabled. The Manager will check the flag later and // remove the writer. bool Disabled() { return disabled; } // Deletes the values as passed into Write(). - void DeleteVals(LogVal** vals); + void DeleteVals(Value** vals); string path; int num_fields; - const LogField* const * fields; + const Field* const * fields; bool buffering; bool disabled; @@ -189,4 +182,8 @@ private: unsigned int buf_len; }; + +} + #endif + diff --git a/src/logging/WriterFrontend.cc b/src/logging/WriterFrontend.cc new file mode 100644 index 0000000000..92c93c1c56 --- /dev/null +++ b/src/logging/WriterFrontend.cc @@ -0,0 +1,175 @@ + +#include "WriterFrontend.h" +#include "WriterBackend.h" + +namespace logging { + +// Messages sent from frontend to backend (i.e., "InputMessages"). + +class InitMessage : public threading::InputMessage +{ +public: + InitMessage(WriterBackend* backend, const string path, const int num_fields, const Field* const *fields) + : threading::InputMessage("Init", backend), + path(path), num_fields(num_fields), fields(fields) { } + + virtual bool Process() { return Object()->Init(path, num_fields, fields); } + +private: + const string path; + const int num_fields; + const Field * const* fields; +}; + +class RotateMessage : public threading::InputMessage +{ +public: + RotateMessage(WriterBackend* backend, WriterFrontend* frontend, const string rotated_path, const double open, + const double close, const bool terminating) + : threading::InputMessage("Rotate", backend), + frontend(frontend), + rotated_path(rotated_path), open(open), + close(close), terminating(terminating) { } + + virtual bool Process() { return Object()->Rotate(frontend, rotated_path, open, close, terminating); } + +private: + WriterFrontend* frontend; + const string rotated_path; + const double open; + const double close; + const bool terminating; +}; + +class WriteMessage : public threading::InputMessage +{ +public: + WriteMessage(WriterBackend* backend, const int num_fields, Value **vals) + : threading::InputMessage("Write", backend), + num_fields(num_fields), fields(fields), vals(vals) {} + + virtual bool Process() { return Object()->Write(num_fields, vals); } + +private: + int num_fields; + Field* const* fields; + Value **vals; +}; + +class SetBufMessage : public threading::InputMessage +{ +public: + SetBufMessage(WriterBackend* backend, const bool enabled) + : threading::InputMessage("SetBuf", backend), + enabled(enabled) { } + + virtual bool Process() { return Object()->SetBuf(enabled); } + +private: + const bool enabled; +}; + +class FlushMessage : public threading::InputMessage +{ +public: + FlushMessage(WriterBackend* backend) + : threading::InputMessage("Flush", backend) {} + + virtual bool Process() { return Object()->Flush(); } +}; + +class FinishMessage : public threading::InputMessage +{ +public: + FinishMessage(WriterBackend* backend) + : threading::InputMessage("Finish", backend) {} + + virtual bool Process() { return Object()->Finish(); } +}; + +} + +// Frontend methods. + +using namespace logging; + +WriterFrontend::WriterFrontend(bro_int_t type) + { + disabled = initialized = false; + backend = log_mgr->CreateBackend(type); + + assert(backend); + backend->Start(); + } + +WriterFrontend::~WriterFrontend() + { + } + +void WriterFrontend::Stop() + { + SetDisable(); + backend->Stop(); + } + +void WriterFrontend::Init(string arg_path, int arg_num_fields, const Field* const * arg_fields) + { + if ( disabled ) + return; + + if ( initialized ) + reporter->InternalError("writer initialize twice"); + + path = arg_path; + num_fields = arg_num_fields; + fields = arg_fields; + + initialized = true; + backend->SendIn(new InitMessage(backend, arg_path, arg_num_fields, arg_fields)); + } + +void WriterFrontend::Write(int num_fields, Value** vals) + { + if ( disabled ) + return; + + backend->SendIn(new WriteMessage(backend, num_fields, vals)); + } + +void WriterFrontend::SetBuf(bool enabled) + { + if ( disabled ) + return; + + backend->SendIn(new SetBufMessage(backend, enabled)); + } + +void WriterFrontend::Flush() + { + if ( disabled ) + return; + + backend->SendIn(new FlushMessage(backend)); + } + +void WriterFrontend::Rotate(string rotated_path, double open, double close, bool terminating) + { + if ( disabled ) + return; + + backend->SendIn(new RotateMessage(backend, this, rotated_path, open, close, terminating)); + } + +void WriterFrontend::Finish() + { + if ( disabled ) + return; + + backend->SendIn(new FinishMessage(backend)); + } + + + + + + diff --git a/src/logging/WriterFrontend.h b/src/logging/WriterFrontend.h new file mode 100644 index 0000000000..1998429d38 --- /dev/null +++ b/src/logging/WriterFrontend.h @@ -0,0 +1,66 @@ +// See the file "COPYING" in the main distribution directory for copyright. +// +// Bridge class between main process and writer threads. + +#ifndef LOGGING_WRITERFRONTEND_H +#define LOGGING_WRITERFRONTEND_H + +#include "Manager.h" + +#include "threading/MsgThread.h" + +namespace logging { + +class WriterBackend; + +class WriterFrontend { +public: + WriterFrontend(bro_int_t type); + virtual ~WriterFrontend(); + + // Disables the writers and stop the backend thread. + void Stop(); + + // Interface methods to interact with the writer from the main thread + // (and only from the main thread), typicalli from the log manager. + // All these methods forward (via inter-thread messaging) to the + // corresponding methods of an internally created WriterBackend. See + // there for documentation. + // + // If any of these operations fails, the writer will be automatically + // (but asynchronoulsy) disabled. + + void Init(string path, int num_fields, const Field* const * fields); + void Write(int num_fields, Value** vals); + void SetBuf(bool enabled); + void Flush(); + void Rotate(string rotated_path, double open, double close, bool terminating); + void Finish(); + + // Calling this disable the writer. All methods calls will be no-ops + // from now on. The Manager will eventually remove disabled writers. + void SetDisable() { disabled = true; } + bool Disabled() { return disabled; } + + const string Path() const { return path; } + int NumFields() const { return num_fields; } + const Field* const * Fields() const { return fields; } + +protected: + friend class Manager; + + + WriterBackend* backend; + bool disabled; + bool initialized; + + string path; + int num_fields; + const Field* const * fields; +}; + +} + + + +#endif diff --git a/src/LogWriterAscii.cc b/src/logging/writers/Ascii.cc similarity index 89% rename from src/LogWriterAscii.cc rename to src/logging/writers/Ascii.cc index d2c1d91370..70f513be3b 100644 --- a/src/LogWriterAscii.cc +++ b/src/logging/writers/Ascii.cc @@ -3,10 +3,14 @@ #include #include -#include "LogWriterAscii.h" -#include "NetVar.h" +#include "../../NetVar.h" -LogWriterAscii::LogWriterAscii() +#include "Ascii.h" + +using namespace logging; +using namespace writer; + +Ascii::Ascii() : WriterBackend("Ascii") { file = 0; @@ -42,7 +46,7 @@ LogWriterAscii::LogWriterAscii() desc.AddEscapeSequence(separator, separator_len); } -LogWriterAscii::~LogWriterAscii() +Ascii::~Ascii() { if ( file ) fclose(file); @@ -54,7 +58,7 @@ LogWriterAscii::~LogWriterAscii() delete [] header_prefix; } -bool LogWriterAscii::WriteHeaderField(const string& key, const string& val) +bool Ascii::WriteHeaderField(const string& key, const string& val) { string str = string(header_prefix, header_prefix_len) + key + string(separator, separator_len) + val + "\n"; @@ -62,8 +66,8 @@ bool LogWriterAscii::WriteHeaderField(const string& key, const string& val) return (fwrite(str.c_str(), str.length(), 1, file) == 1); } -bool LogWriterAscii::DoInit(string path, int num_fields, - const LogField* const * fields) +bool Ascii::DoInit(string path, int num_fields, + const Field* const * fields) { if ( output_to_stdout ) path = "/dev/stdout"; @@ -108,7 +112,7 @@ bool LogWriterAscii::DoInit(string path, int num_fields, types += string(separator, separator_len); } - const LogField* field = fields[i]; + const Field* field = fields[i]; names += field->name; types += type_name(field->type); if ( (field->type == TYPE_TABLE) || (field->type == TYPE_VECTOR) ) @@ -131,17 +135,18 @@ write_error: return false; } -bool LogWriterAscii::DoFlush() +bool Ascii::DoFlush() { fflush(file); return true; } -void LogWriterAscii::DoFinish() +bool Ascii::DoFinish() { + return true; } -bool LogWriterAscii::DoWriteOne(ODesc* desc, LogVal* val, const LogField* field) +bool Ascii::DoWriteOne(ODesc* desc, Value* val, const Field* field) { if ( ! val->present ) { @@ -281,8 +286,8 @@ bool LogWriterAscii::DoWriteOne(ODesc* desc, LogVal* val, const LogField* field) return true; } -bool LogWriterAscii::DoWrite(int num_fields, const LogField* const * fields, - LogVal** vals) +bool Ascii::DoWrite(int num_fields, const Field* const * fields, + Value** vals) { if ( ! file ) DoInit(Path(), NumFields(), Fields()); @@ -312,8 +317,8 @@ bool LogWriterAscii::DoWrite(int num_fields, const LogField* const * fields, return true; } -bool LogWriterAscii::DoRotate(string rotated_path, double open, - double close, bool terminating) +bool Ascii::DoRotate(WriterFrontend* writer, string rotated_path, double open, + double close, bool terminating) { // Don't rotate special files or if there's not one currently open. if ( ! file || IsSpecial(Path()) ) @@ -325,7 +330,7 @@ bool LogWriterAscii::DoRotate(string rotated_path, double open, string nname = rotated_path + "." + LogExt(); rename(fname.c_str(), nname.c_str()); - if ( ! FinishedRotation(nname, fname, open, close, terminating) ) + if ( ! FinishedRotation(writer, nname, fname, open, close, terminating) ) { Error(Fmt("error rotating %s to %s", fname.c_str(), nname.c_str())); return false; @@ -334,13 +339,13 @@ bool LogWriterAscii::DoRotate(string rotated_path, double open, return true; } -bool LogWriterAscii::DoSetBuf(bool enabled) +bool Ascii::DoSetBuf(bool enabled) { // Nothing to do. return true; } -string LogWriterAscii::LogExt() +string Ascii::LogExt() { const char* ext = getenv("BRO_LOG_SUFFIX"); if ( ! ext ) ext = "log"; diff --git a/src/LogWriterAscii.h b/src/logging/writers/Ascii.h similarity index 57% rename from src/LogWriterAscii.h rename to src/logging/writers/Ascii.h index 72127c8b1f..37fcfef267 100644 --- a/src/LogWriterAscii.h +++ b/src/logging/writers/Ascii.h @@ -2,33 +2,35 @@ // // Log writer for delimiter-separated ASCII logs. -#ifndef LOGWRITERASCII_H -#define LOGWRITERASCII_H +#ifndef LOGGING_WRITER_ASCII_H +#define LOGGING_WRITER_ASCII_H -#include "LogWriter.h" +#include "../WriterBackend.h" -class LogWriterAscii : public LogWriter { +namespace logging { namespace writer { + +class Ascii : public WriterBackend { public: - LogWriterAscii(); - ~LogWriterAscii(); + Ascii(); + ~Ascii(); - static LogWriter* Instantiate() { return new LogWriterAscii; } + static WriterBackend* Instantiate() { return new Ascii; } static string LogExt(); protected: virtual bool DoInit(string path, int num_fields, - const LogField* const * fields); - virtual bool DoWrite(int num_fields, const LogField* const * fields, - LogVal** vals); + const Field* const * fields); + virtual bool DoWrite(int num_fields, const Field* const * fields, + Value** vals); virtual bool DoSetBuf(bool enabled); - virtual bool DoRotate(string rotated_path, double open, double close, - bool terminating); + virtual bool DoRotate(WriterFrontend* writer, string rotated_path, + double open, double close, bool terminating); virtual bool DoFlush(); - virtual void DoFinish(); + virtual bool DoFinish(); private: bool IsSpecial(string path) { return path.find("/dev/") == 0; } - bool DoWriteOne(ODesc* desc, LogVal* val, const LogField* field); + bool DoWriteOne(ODesc* desc, Value* val, const Field* field); bool WriteHeaderField(const string& key, const string& value); FILE* file; @@ -55,4 +57,8 @@ private: int header_prefix_len; }; +} +} + + #endif diff --git a/src/logging/writers/None.cc b/src/logging/writers/None.cc new file mode 100644 index 0000000000..e419d88a6b --- /dev/null +++ b/src/logging/writers/None.cc @@ -0,0 +1,19 @@ + +#include "None.h" + +using namespace logging; +using namespace writer; + +bool None::DoRotate(WriterFrontend* writer, string rotated_path, + double open, double close, bool terminating) + { + if ( ! FinishedRotation(writer, string("/dev/null"), Path(), open, close, terminating)) + { + Error(Fmt("error rotating %s", Path().c_str())); + return false; + } + + return true; + } + + diff --git a/src/logging/writers/None.h b/src/logging/writers/None.h new file mode 100644 index 0000000000..9b2ab6c698 --- /dev/null +++ b/src/logging/writers/None.h @@ -0,0 +1,35 @@ +// See the file "COPYING" in the main distribution directory for copyright. +// +// Dummy log writer that just discards everything (but still pretends to rotate). + +#ifndef LOGGING_WRITER_NONE_H +#define LOGGING_WRITER_NONE_H + +#include "../WriterBackend.h" + +namespace logging { namespace writer { + +class None : public WriterBackend { +public: + None() : WriterBackend("None") {} + ~None() {}; + + static WriterBackend* Instantiate() { return new None; } + +protected: + virtual bool DoInit(string path, int num_fields, + const Field* const * fields) { return true; } + + virtual bool DoWrite(int num_fields, const Field* const * fields, + Value** vals) { return true; } + virtual bool DoSetBuf(bool enabled) { return true; } + virtual bool DoRotate(WriterFrontend* writer, string rotated_path, + double open, double close, bool terminating); + virtual bool DoFlush() { return true; } + virtual bool DoFinish() { return true; } +}; + +} +} + +#endif diff --git a/src/main.cc b/src/main.cc index bcc0498123..58a23e6c80 100644 --- a/src/main.cc +++ b/src/main.cc @@ -29,7 +29,6 @@ extern "C" void OPENSSL_add_all_algorithms_conf(void); #include "Event.h" #include "File.h" #include "Reporter.h" -#include "LogMgr.h" #include "Net.h" #include "NetVar.h" #include "Var.h" @@ -48,7 +47,10 @@ extern "C" void OPENSSL_add_all_algorithms_conf(void); #include "DPM.h" #include "BroDoc.h" #include "Brofiler.h" -#include "LogWriterAscii.h" + +#include "threading/Manager.h" +#include "logging/Manager.h" +#include "logging/writers/Ascii.h" #include "binpac_bro.h" @@ -75,7 +77,8 @@ char* writefile = 0; name_list prefixes; DNS_Mgr* dns_mgr; TimerMgr* timer_mgr; -LogMgr* log_mgr; +logging::Manager* log_mgr = 0; +threading::Manager* thread_mgr = 0; Stmt* stmts; EventHandlerPtr net_done = 0; RuleMatcher* rule_matcher = 0; @@ -197,7 +200,7 @@ void usage() fprintf(stderr, " $BRO_PREFIXES | prefix list (%s)\n", bro_prefixes()); fprintf(stderr, " $BRO_DNS_FAKE | disable DNS lookups (%s)\n", bro_dns_fake()); fprintf(stderr, " $BRO_SEED_FILE | file to load seeds from (not set)\n"); - fprintf(stderr, " $BRO_LOG_SUFFIX | ASCII log file extension (.%s)\n", LogWriterAscii::LogExt().c_str()); + fprintf(stderr, " $BRO_LOG_SUFFIX | ASCII log file extension (.%s)\n", logging::writer::Ascii::LogExt().c_str()); fprintf(stderr, " $BRO_PROFILER_FILE | Output file for script execution statistics (not set)\n"); exit(1); @@ -287,6 +290,8 @@ void terminate_bro() if ( remote_serializer ) remote_serializer->LogStats(); + thread_mgr->Terminate(); + delete timer_mgr; delete dns_mgr; delete persistence_serializer; @@ -299,6 +304,7 @@ void terminate_bro() delete remote_serializer; delete dpm; delete log_mgr; + delete thread_mgr; delete reporter; } @@ -661,7 +667,9 @@ int main(int argc, char** argv) set_processing_status("INITIALIZING", "main"); bro_start_time = current_time(true); + reporter = new Reporter(); + thread_mgr = new threading::Manager(); #ifdef DEBUG if ( debug_streams ) @@ -727,7 +735,7 @@ int main(int argc, char** argv) persistence_serializer = new PersistenceSerializer(); remote_serializer = new RemoteSerializer(); event_registry = new EventRegistry(); - log_mgr = new LogMgr(); + log_mgr = new logging::Manager(); if ( events_file ) event_player = new EventPlayer(events_file); @@ -1001,6 +1009,8 @@ int main(int argc, char** argv) have_pending_timers = ! reading_traces && timer_mgr->Size() > 0; + io_sources.Register(thread_mgr, true); + if ( io_sources.Size() > 0 || have_pending_timers ) { if ( profiling_logger ) diff --git a/src/threading/BasicThread.cc b/src/threading/BasicThread.cc new file mode 100644 index 0000000000..273a192de3 --- /dev/null +++ b/src/threading/BasicThread.cc @@ -0,0 +1,129 @@ + +#include +#include + +#include "BasicThread.h" +#include "Manager.h" + +using namespace threading; + +BasicThread::BasicThread(const string& arg_name) + { + started = false; + terminating = false; + pthread = 0; + + buf = 0; + buf_len = 1024; + + char tmp[128]; + snprintf(tmp, sizeof(tmp), "%s@%p", arg_name.c_str(), this); + name = string(tmp); + + thread_mgr->AddThread(this); + } + +BasicThread::~BasicThread() + { + } + +const char* BasicThread::Fmt(const char* format, ...) + { + if ( ! buf ) + buf = (char*) malloc(buf_len); + + va_list al; + va_start(al, format); + int n = safe_vsnprintf(buf, buf_len, format, al); + va_end(al); + + if ( (unsigned int) n >= buf_len ) + { // Not enough room, grow the buffer. + buf_len = n + 32; + buf = (char*) realloc(buf, buf_len); + + // Is it portable to restart? + va_start(al, format); + n = safe_vsnprintf(buf, buf_len, format, al); + va_end(al); + } + + return buf; + } + +void BasicThread::Start() + { + if ( sem_init(&terminate, 0, 0) != 0 ) + reporter->FatalError("Cannot create terminate semaphore for thread %s", name.c_str()); + + if ( pthread_create(&pthread, 0, BasicThread::launcher, this) != 0 ) + reporter->FatalError("Cannot create thread %s", name.c_str()); + + DBG_LOG(DBG_THREADING, "Started thread %s", name.c_str()); + + started = true; + + OnStart(); + } + +void BasicThread::Stop() + { + if ( ! started ) + return; + + if ( terminating ) + return; + + DBG_LOG(DBG_THREADING, "Signaling thread %s to terminate ...", name.c_str()); + + // Signal that it's ok for the thread to exit now. + if ( sem_post(&terminate) != 0 ) + reporter->FatalError("Failure flagging terminate condition for thread %s", name.c_str()); + + terminating = true; + + OnStop(); + } + +void BasicThread::Join() + { + if ( ! started ) + return; + + if ( ! terminating ) + Stop(); + + DBG_LOG(DBG_THREADING, "Joining thread %s ...", name.c_str()); + + if ( pthread_join(pthread, 0) != 0 ) + reporter->FatalError("Failure joining thread %s", name.c_str()); + + sem_destroy(&terminate); + + DBG_LOG(DBG_THREADING, "Done with thread %s", name.c_str()); + + pthread = 0; + } + +void* BasicThread::launcher(void *arg) + { + BasicThread* thread = (BasicThread *)arg; + + // Block signals in thread. We handle signals only in the main + // process. + sigset_t mask_set; + sigfillset(&mask_set); + int res = pthread_sigmask(SIG_BLOCK, &mask_set, 0); + assert(res == 0); // + + // Run thread's main function. + thread->Run(); + + // Wait until somebody actually wants us to terminate. + + if ( sem_wait(&thread->terminate) != 0 ) + reporter->FatalError("Failure flagging terminate condition for thread %s", thread->Name().c_str()); + + return 0; + } + diff --git a/src/threading/BasicThread.h b/src/threading/BasicThread.h new file mode 100644 index 0000000000..30a11b4505 --- /dev/null +++ b/src/threading/BasicThread.h @@ -0,0 +1,63 @@ + +#ifndef THREADING_BASICTHREAD_H +#define THREADING_BASICTHREAD_H + +#include +#include + +#include "Queue.h" +#include "util.h" + +using namespace std; + +namespace threading { + +class Manager; + +class BasicThread +{ +public: + BasicThread(const string& name); // Managed by manager, must not delete otherwise. + virtual ~BasicThread(); + + const string& Name() const { return name; } + + void Start(); // Spawns the thread and enters Run(). + void Stop(); // Signals the thread to terminate. + + bool Terminating() const { return terminating; } + + // A thread-safe version of fmt(). + const char* Fmt(const char* format, ...); + +protected: + virtual void Run() = 0; + + virtual void OnStart() {} + virtual void OnStop() {} + +private: + friend class Manager; + + static void* launcher(void *arg); + + // Used from the ThreadMgr. + void Join(); // Waits until the thread has terminated and then joins it. + + bool started; // Set to to true once running. + bool terminating; // Set to to true to signal termination. + string name; + + pthread_t pthread; + sem_t terminate; + + // For implementing Fmt(). + char* buf; + unsigned int buf_len; +}; + +} + +extern threading::Manager* thread_mgr; + +#endif diff --git a/src/threading/Manager.cc b/src/threading/Manager.cc new file mode 100644 index 0000000000..ed4d9cf623 --- /dev/null +++ b/src/threading/Manager.cc @@ -0,0 +1,104 @@ + +#include "Manager.h" + +using namespace threading; + +Manager::Manager() + { + DBG_LOG(DBG_THREADING, "Creating thread manager ..."); + + did_process = false; + next_beat = 0; + } + +Manager::~Manager() + { + if ( all_threads.size() ) + Terminate(); + } + +void Manager::Terminate() + { + DBG_LOG(DBG_THREADING, "Terminating thread manager ..."); + + // First process remaining thread output for the message threads. + do Process(); while ( did_process ); + + // Signal all to stop. + for ( all_thread_list::iterator i = all_threads.begin(); i != all_threads.end(); i++ ) + (*i)->Stop(); + + // Then join them all. + for ( all_thread_list::iterator i = all_threads.begin(); i != all_threads.end(); i++ ) + { + (*i)->Join(); + delete *i; + } + + all_threads.clear(); + msg_threads.clear(); + } + +void Manager::AddThread(BasicThread* thread) + { + DBG_LOG(DBG_THREADING, "Adding thread %s ...", thread->Name().c_str()); + all_threads.push_back(thread); + } + +void Manager::AddMsgThread(MsgThread* thread) + { + DBG_LOG(DBG_THREADING, "%s is a MsgThread ...", thread->Name().c_str()); + msg_threads.push_back(thread); + } + +void Manager::GetFds(int* read, int* write, int* except) + { + } + +double Manager::NextTimestamp(double* network_time) + { + if ( did_process || ! next_beat == 0 ) + // If we had something to process last time (or haven't had a + // chance to check yet), we want to check for more asap. + return timer_mgr->Time(); + + // Else we assume we don't have much to do at all and wait for the next heart beat. + return next_beat; + } + +void Manager::Process() + { + bool do_beat = (next_beat == 0 || network_time >= next_beat); + + did_process = false; + + for ( msg_thread_list::iterator i = msg_threads.begin(); i != msg_threads.end(); i++ ) + { + MsgThread* t = *i; + + if ( do_beat ) + t->Heartbeat(); + + if ( ! t->HasOut() ) + continue; + + Message* msg = t->RetrieveOut(); + + if ( msg->Process() ) + did_process = true; + + else + { + string s = msg->Name() + " failed, terminating thread"; + reporter->Error(s.c_str()); + t->Stop(); + } + + delete msg; + } + + if ( do_beat ) + next_beat = network_time + HEART_BEAT_INTERVAL; + } + + diff --git a/src/threading/Manager.h b/src/threading/Manager.h new file mode 100644 index 0000000000..aa7292ee81 --- /dev/null +++ b/src/threading/Manager.h @@ -0,0 +1,52 @@ + +#ifndef THREADING_MANAGER_H +#define THREADING_MANAGER_H + +#include + +#include "IOSource.h" + +#include "BasicThread.h" +#include "MsgThread.h" + +namespace threading { + +class Manager : public IOSource +{ +public: + Manager(); + ~Manager(); + + void Terminate(); + +protected: + friend class BasicThread; + friend class MsgThread; + + void AddThread(BasicThread* thread); + void AddMsgThread(MsgThread* thread); + + // IOSource interface. + virtual void GetFds(int* read, int* write, int* except); + virtual double NextTimestamp(double* network_time); + virtual void Process(); + virtual const char* Tag() { return "threading::Manager"; } + +private: + static const int HEART_BEAT_INTERVAL = 1; + + typedef std::list all_thread_list; + all_thread_list all_threads; + + typedef std::list msg_thread_list; + msg_thread_list msg_threads; + + bool did_process; + double next_beat; +}; + +} + +extern threading::Manager* thread_mgr; + +#endif diff --git a/src/threading/MsgThread.cc b/src/threading/MsgThread.cc new file mode 100644 index 0000000000..e2d81cf47f --- /dev/null +++ b/src/threading/MsgThread.cc @@ -0,0 +1,285 @@ + +#include "DebugLogger.h" + +#include "MsgThread.h" +#include "Manager.h" + +using namespace threading; + +static void strreplace(const string& s, const string& o, const string& n) + { + string r = s; + + while ( true ) + { + size_t i = r.find(o); + + if ( i == std::string::npos ) + break; + + r.replace(i, o.size(), n); + } + } + +namespace threading { + +// Standard messages. + +class TerminateMessage : public InputMessage +{ +public: + TerminateMessage(MsgThread* thread) : InputMessage("Terminate", thread) { } + + virtual bool Process() { return true; } +}; + +class ReporterMessage : public OutputMessage +{ +public: + enum Type { + INFO, WARNING, ERROR, FATAL_ERROR, FATAL_ERROR_WITH_CORE, + INTERNAL_WARNING, INTERNAL_ERROR + }; + + ReporterMessage(Type arg_type, MsgThread* thread, const string& arg_msg) + : OutputMessage("ReporterMessage", thread) + { type = arg_type; msg = arg_msg; } + + virtual bool Process(); + +private: + string msg; + Type type; +}; + +class HeartbeatMessage : public InputMessage +{ +public: + HeartbeatMessage(MsgThread* thread, double arg_network_time, double arg_current_time) + : InputMessage("Heartbeat", thread) + { network_time = arg_network_time; current_time = arg_current_time; } + + virtual bool Process() { return Object()->DoHeartbeat(network_time, current_time); } + +private: + double network_time; + double current_time; +}; + +#ifdef DEBUG +class DebugMessage : public OutputMessage +{ +public: + DebugMessage(DebugStream arg_stream, MsgThread* thread, const string& arg_msg) + : OutputMessage("DebugMessage", thread) + { stream = arg_stream; msg = arg_msg; } + + virtual bool Process() + { + string s = Object()->Name() + ": " + msg; + strreplace(s, "%", "%%"); + debug_logger.Log(stream, s.c_str()); + return true; + } +private: + string msg; + DebugStream stream; +}; +#endif + +} + +// Methods. + +Message::~Message() + { + } + +bool ReporterMessage::Process() + { + string s = Object()->Name() + ": " + msg; + strreplace(s, "%", "%%"); + + const char* cmsg = s.c_str(); + + switch ( type ) { + + case INFO: + reporter->Info(cmsg); + break; + + case WARNING: + reporter->Warning(cmsg); + break; + + case ERROR: + reporter->Error(cmsg); + break; + + case FATAL_ERROR: + reporter->FatalError(cmsg); + break; + + case FATAL_ERROR_WITH_CORE: + reporter->FatalErrorWithCore(cmsg); + break; + + case INTERNAL_WARNING: + reporter->InternalWarning(cmsg); + break; + + case INTERNAL_ERROR : + reporter->InternalError(cmsg); + break; + + default: + reporter->InternalError("unknown ReporterMessage type %d", type); + } + + return true; + } + +MsgThread::MsgThread(const string& name) : BasicThread(name) + { + cnt_sent_in = cnt_sent_out = 0; + thread_mgr->AddMsgThread(this); + } + +void MsgThread::OnStop() + { + // This is to unblock the current queue read operation. + SendIn(new TerminateMessage(this), true); + } + +void MsgThread::Heartbeat() + { + SendIn(new HeartbeatMessage(this, network_time, current_time())); + } + +void MsgThread::Info(const char* msg) + { + SendOut(new ReporterMessage(ReporterMessage::INFO, this, msg)); + } + +void MsgThread::Warning(const char* msg) + { + SendOut(new ReporterMessage(ReporterMessage::WARNING, this, msg)); + } + +void MsgThread::Error(const char* msg) + { + SendOut(new ReporterMessage(ReporterMessage::ERROR, this, msg)); + } + +void MsgThread::FatalError(const char* msg) + { + SendOut(new ReporterMessage(ReporterMessage::FATAL_ERROR, this, msg)); + } + +void MsgThread::FatalErrorWithCore(const char* msg) + { + SendOut(new ReporterMessage(ReporterMessage::FATAL_ERROR_WITH_CORE, this, msg)); + } + +void MsgThread::InternalWarning(const char* msg) + { + SendOut(new ReporterMessage(ReporterMessage::INTERNAL_WARNING, this, msg)); + } + +void MsgThread::InternalError(const char* msg) + { + SendOut(new ReporterMessage(ReporterMessage::INTERNAL_ERROR, this, msg)); + } + +#ifdef DEBUG + +void MsgThread::Debug(DebugStream stream, const char* msg) + { + SendOut(new DebugMessage(stream, this, msg)); + } + +#endif + +void MsgThread::SendIn(BasicInputMessage* msg, bool force) + { + if ( Terminating() && ! force ) + return; + + DBG_LOG(DBG_THREADING, "Sending '%s' to %s ...", msg->Name().c_str(), Name().c_str()); + + queue_in.Put(msg); + ++cnt_sent_in; + } + + +void MsgThread::SendOut(BasicOutputMessage* msg, bool force) + { + if ( Terminating() && ! force ) + return; + + queue_out.Put(msg); + ++cnt_sent_out; + } + +BasicOutputMessage* MsgThread::RetrieveOut() + { + BasicOutputMessage* msg = queue_out.Get(); + assert(msg); + +#ifdef DEBUG + if ( msg->Name() != "DebugMessage" ) // Avoid recursion. + { + string s = Fmt("Retrieved '%s' from %s", msg->Name().c_str(), Name().c_str()); + Debug(DBG_THREADING, s.c_str()); + } +#endif + + return msg; + } + +BasicInputMessage* MsgThread::RetrieveIn() + { + BasicInputMessage* msg = queue_in.Get(); + assert(msg); + +#ifdef DEBUG + string s = Fmt("Retrieved '%s' in %s", msg->Name().c_str(), Name().c_str()); + Debug(DBG_THREADING, s.c_str()); +#endif + + return msg; + } + +void MsgThread::Run() + { + while ( true ) + { + // When requested to terminate, we only do so when + // all input has been processed. + if ( Terminating() && ! queue_in.Ready() ) + break; + + BasicInputMessage* msg = RetrieveIn(); + + bool result = msg->Process(); + + if ( ! result ) + { + string s = msg->Name() + " failed, terminating thread"; + Error(s.c_str()); + Stop(); + break; + } + + delete msg; + } + } + +void MsgThread::GetStats(Stats* stats) + { + stats->sent_in = cnt_sent_in; + stats->sent_out = cnt_sent_out; + stats->pending_in = cnt_sent_in - queue_in.Size(); + stats->pending_out = cnt_sent_out - queue_out.Size(); + } + diff --git a/src/threading/MsgThread.h b/src/threading/MsgThread.h new file mode 100644 index 0000000000..2e976c1773 --- /dev/null +++ b/src/threading/MsgThread.h @@ -0,0 +1,157 @@ + +#ifndef THREADING_MSGTHREAD_H +#define THREADING_MSGTHREAD_H + +#include + +#include "DebugLogger.h" + +#include "BasicThread.h" +#include "Queue.h" + +namespace threading { + +class BasicInputMessage; +class BasicOutputMessage; +class HeartbeatMessage; + +class MsgThread : public BasicThread +{ +public: + MsgThread(const string& name); + + void SendIn(BasicInputMessage* msg) { return SendIn(msg, false); } + void SendOut(BasicOutputMessage* msg) { return SendOut(msg, false); } + + BasicOutputMessage* RetrieveOut(); + + // Report an informational message, nothing that needs specific + // attention. + void Info(const char* msg); + + // Report a warning that may indicate a problem. + void Warning(const char* msg); + + // Report a non-fatal error. Processing proceeds normally after the error + // has been reported. + void Error(const char* msg); + + // Report a fatal error. Bro will terminate after the message has been + // reported. + void FatalError(const char* msg); + + // Report a fatal error. Bro will terminate after the message has been + // reported and always generate a core dump. + void FatalErrorWithCore(const char* msg); + + // Report about a potential internal problem. Bro will continue + // normally. + void InternalWarning(const char* msg); + + // Report an internal program error. Bro will terminate with a core + // dump after the message has been reported. + void InternalError(const char* msg); + +#ifdef DEBUG + // Records a debug message for the given stream. + void Debug(DebugStream stream, const char* msg); +#endif + + void Heartbeat(); + + struct Stats + { + uint64_t sent_in; + uint64_t sent_out; + uint64_t pending_in; + uint64_t pending_out; + }; + + void GetStats(Stats* stats); + +protected: + friend class HeartbeatMessage; + + virtual void Run(); + virtual void OnStop(); + + virtual bool DoHeartbeat(double network_time, double current_time) { return true; } + +private: + friend class Manager; + + BasicInputMessage* RetrieveIn(); + + void SendIn(BasicInputMessage* msg, bool force); + void SendOut(BasicOutputMessage* msg, bool force); + + bool HasIn() { return queue_in.Ready(); } + bool HasOut() { return queue_out.Ready(); } + + Queue_ queue_in; + Queue_ queue_out; + + uint64_t cnt_sent_in; + uint64_t cnt_sent_out; +}; + +class Message +{ +public: + virtual ~Message(); + + const string& Name() const { return name; } + + virtual bool Process() = 0; // Thread will be terminated if returngin false. + +protected: + Message(const string& arg_name) { name = arg_name; } + +private: + string name; +}; + +class BasicInputMessage : public Message +{ +protected: + BasicInputMessage(const string& name) : Message(name) {} +}; + +class BasicOutputMessage : public Message +{ +protected: + BasicOutputMessage(const string& name) : Message(name) {} +}; + +template +class InputMessage : public BasicInputMessage +{ +public: + O* Object() const { return object; } + +protected: + InputMessage(const string& name, O* arg_object) : BasicInputMessage(name) + { object = arg_object; } + +private: + O* object; +}; + +template +class OutputMessage : public BasicOutputMessage +{ +public: + O* Object() const { return object; } + +protected: + OutputMessage(const string& name, O* arg_object) : BasicOutputMessage(name) + { object = arg_object; } + +private: + O* object; +}; + +} + + +#endif diff --git a/src/threading/Queue.h b/src/threading/Queue.h new file mode 100644 index 0000000000..49859dc051 --- /dev/null +++ b/src/threading/Queue.h @@ -0,0 +1,150 @@ + +#ifndef THREADING_QUEUE_H +#define THREADING_QUEUE_H + +#include +#include +#include +#include + +#include "Reporter.h" + +namespace threading { + +/** + * Just a simple threaded queue wrapper class. Uses multiple queues and reads / writes in rotary fashion in an attempt to limit contention. + * Due to locking granularity, bulk put / get is no faster than single put / get as long as FIFO guarantee is required. + */ + +template +class Queue_ +{ +public: + Queue_(); + ~Queue_(); + + T Get(); + void Put(T data); + bool Ready(); + uint64_t Size(); + +private: + static const int NUM_QUEUES = 8; + + pthread_mutex_t mutex[NUM_QUEUES]; // Mutex protected shared accesses. + pthread_cond_t has_data[NUM_QUEUES]; // Signals when data becomes available + std::queue messages[NUM_QUEUES]; // Actually holds the queued messages + + int read_ptr; // Where the next operation will read from + int write_ptr; // Where the next operation will write to + uint64_t size; +}; + +inline static void safe_lock(pthread_mutex_t* mutex) + { + if ( pthread_mutex_lock(mutex) != 0 ) + reporter->FatalErrorWithCore("cannot lock mutex"); + } + +inline static void safe_unlock(pthread_mutex_t* mutex) + { + if ( pthread_mutex_unlock(mutex) != 0 ) + reporter->FatalErrorWithCore("cannot unlock mutex"); + } + +template +inline Queue_::Queue_() + { + read_ptr = 0; + write_ptr = 0; + + for( int i = 0; i < NUM_QUEUES; ++i ) + { + if ( pthread_cond_init(&has_data[i], NULL) != 0 ) + reporter->FatalError("cannot init queue condition variable"); + + if ( pthread_mutex_init(&mutex[i], NULL) != 0 ) + reporter->FatalError("cannot init queue mutex"); + } + } + +template +inline Queue_::~Queue_() + { + for( int i = 0; i < NUM_QUEUES; ++i ) + { + pthread_cond_destroy(&has_data[i]); + pthread_mutex_destroy(&mutex[i]); + } + } + +template +inline T Queue_::Get() + { + safe_lock(&mutex[read_ptr]); + + int old_read_ptr = read_ptr; + + if ( messages[read_ptr].empty() ) + pthread_cond_wait(&has_data[read_ptr], &mutex[read_ptr]); + + T data = messages[read_ptr].front(); + messages[read_ptr].pop(); + --size; + + read_ptr = (read_ptr + 1) % NUM_QUEUES; + + safe_unlock(&mutex[old_read_ptr]); + + return data; + } + +template +inline void Queue_::Put(T data) + { + safe_lock(&mutex[write_ptr]); + + int old_write_ptr = write_ptr; + + bool need_signal = messages[write_ptr].empty(); + + messages[write_ptr].push(data); + ++size; + + if ( need_signal ) + pthread_cond_signal(&has_data[write_ptr]); + + write_ptr = (write_ptr + 1) % NUM_QUEUES; + + safe_unlock(&mutex[old_write_ptr]); + } + + +template +inline bool Queue_::Ready() + { + safe_lock(&mutex[read_ptr]); + + bool ret = (messages[read_ptr].size()); + + safe_unlock(&mutex[read_ptr]); + + return ret; + } + +template +inline uint64_t Queue_::Size() + { + safe_lock(&mutex[read_ptr]); + + uint64_t s = size; + + safe_unlock(&mutex[read_ptr]); + + return s; + } + +} + +#endif +