Threaded logging framework.

This is based on Gilbert's code but I ended up refactoring it quite a
bit. That's why I didn't do a direct merge but started with a new
branch and copied things over to adapt. It looks quite a bit different
now as I tried to generalize things a bit more to also support the
Input Framework.

The larger changes code are:

    - Moved all logging code into subdirectory src/logging/. Code
      here is in namespace "logging".

    - Moved all threading code into subdirectory src/threading/. Code
      here is in namespace "threading".

    - Introduced a central thread manager that tracks threads and is
      in charge of termination and (eventually) statistics.

    - Refactored logging independent threading code into base classes
      BasicThread and MsgThread. The former encapsulates all the
      pthread code with simple start/stop methods and provides a
      single Run() method to override.

      The latter is derived from BasicThread and adds bi-directional
      message passing between main and child threads. The hope is that
      the Input Framework can reuse this part quite directly.

    - A log writer is now split into a general WriterFrontend
      (LogEmissary in Gilbert's code) and a type-specific
      WriterBackend. Specific writers are implemented by deriving from
      the latter. (The plugin interface is almost unchanged compared
      to the 2.0 version.).

      Frontend and backend communicate via MsgThread's message
      passing.

    - MsgThread (and thus WriterBackend) has a Heartbeat() method that
      a thread can override to execute code on a regular basis. It's
      triggered roughly once a second by the main thread.

    - Integration into "the rest of Bro". Threads can send messages to
      the reporter and do debugging output; they are hooked into the
      I/O loop for sending messages back; and there's a new debugging
      stream "threading" that logs, well, threading activity.

This all seems to work for the most part, but it's not done yet.

TODO list:

    - Not all tests pass yet. In particular, diffs for the external
      tests seem to indicate some memory problem (no crashes, just an
      occasional weird character).

    - Only tested in --enable-debug mode.

    - Only tested on Linux.

    - Needs leak check.

    - Each log write is currently a single inter-thread message. Bring
      Gilbert's bulk writes back.

    - Code needs further cleanup.

    - Document the class API.

    - Document the internal structure of the logging framework.

    - Check for robustness: live traffic, aborting, signals, etc.

    - Add thread statistics to profile.log (most of the code is there).

    - Customize the OS-visible thread names on platforms that support it.
This commit is contained in:
Robin Sommer 2012-01-26 17:47:36 -08:00
parent 60ae6f01d1
commit e4e770d475
28 changed files with 1745 additions and 503 deletions

View file

@ -5,7 +5,7 @@
#include "Attr.h" #include "Attr.h"
#include "Expr.h" #include "Expr.h"
#include "Serializer.h" #include "Serializer.h"
#include "LogMgr.h" #include "logging/Manager.h"
const char* attr_name(attr_tag t) const char* attr_name(attr_tag t)
{ {
@ -416,7 +416,7 @@ void Attributes::CheckAttr(Attr* a)
break; break;
case ATTR_LOG: case ATTR_LOG:
if ( ! LogVal::IsCompatibleType(type) ) if ( ! logging::Value::IsCompatibleType(type) )
Error("&log applied to a type that cannot be logged"); Error("&log applied to a type that cannot be logged");
break; break;

View file

@ -213,6 +213,8 @@ binpac_target(syslog.pac
######################################################################## ########################################################################
## bro target ## bro target
find_package (Threads)
# This macro stores associated headers for any C/C++ source files given # This macro stores associated headers for any C/C++ source files given
# as arguments (past _var) as a list in the CMake variable named "_var". # as arguments (past _var) as a list in the CMake variable named "_var".
macro(COLLECT_HEADERS _var) macro(COLLECT_HEADERS _var)
@ -334,10 +336,6 @@ set(bro_SRCS
IRC.cc IRC.cc
List.cc List.cc
Reporter.cc Reporter.cc
LogMgr.cc
LogWriter.cc
LogWriterAscii.cc
LogWriterNone.cc
Login.cc Login.cc
MIME.cc MIME.cc
NCP.cc NCP.cc
@ -409,6 +407,17 @@ set(bro_SRCS
PacketDumper.cc PacketDumper.cc
strsep.c strsep.c
modp_numtoa.c modp_numtoa.c
threading/BasicThread.cc
threading/Manager.cc
threading/MsgThread.cc
logging/Manager.cc
logging/WriterBackend.cc
logging/WriterFrontend.cc
logging/writers/Ascii.cc
logging/writers/None.cc
${dns_SRCS} ${dns_SRCS}
${openssl_SRCS} ${openssl_SRCS}
) )
@ -421,7 +430,7 @@ add_definitions(-DBRO_BUILD_PATH="${CMAKE_CURRENT_BINARY_DIR}")
add_executable(bro ${bro_SRCS} ${bro_HEADERS}) add_executable(bro ${bro_SRCS} ${bro_HEADERS})
target_link_libraries(bro ${brodeps}) target_link_libraries(bro ${brodeps} ${CMAKE_THREAD_LIBS_INIT})
install(TARGETS bro DESTINATION bin) install(TARGETS bro DESTINATION bin)
install(FILES ${INSTALL_BIF_OUTPUTS} DESTINATION ${BRO_SCRIPT_INSTALL_PATH}/base) install(FILES ${INSTALL_BIF_OUTPUTS} DESTINATION ${BRO_SCRIPT_INSTALL_PATH}/base)

View file

@ -15,7 +15,7 @@ DebugLogger::Stream DebugLogger::streams[NUM_DBGS] = {
{ "compressor", 0, false }, {"string", 0, false }, { "compressor", 0, false }, {"string", 0, false },
{ "notifiers", 0, false }, { "main-loop", 0, false }, { "notifiers", 0, false }, { "main-loop", 0, false },
{ "dpd", 0, false }, { "tm", 0, false }, { "dpd", 0, false }, { "tm", 0, false },
{ "logging", 0, false } { "logging", 0, false }, { "threading", 0, false }
}; };
DebugLogger::DebugLogger(const char* filename) DebugLogger::DebugLogger(const char* filename)

View file

@ -24,6 +24,7 @@ enum DebugStream {
DBG_DPD, // Dynamic application detection framework DBG_DPD, // Dynamic application detection framework
DBG_TM, // Time-machine packet input via Brocolli DBG_TM, // Time-machine packet input via Brocolli
DBG_LOGGING, // Logging streams DBG_LOGGING, // Logging streams
DBG_THREADING, // Threading system
NUM_DBGS // Has to be last NUM_DBGS // Has to be last
}; };

View file

@ -1,158 +0,0 @@
// See the file "COPYING" in the main distribution directory for copyright.
#include "util.h"
#include "LogWriter.h"
LogWriter::LogWriter()
{
buf = 0;
buf_len = 1024;
buffering = true;
disabled = false;
}
LogWriter::~LogWriter()
{
if ( buf )
free(buf);
for(int i = 0; i < num_fields; ++i)
delete fields[i];
delete [] fields;
}
bool LogWriter::Init(string arg_path, int arg_num_fields,
const LogField* const * arg_fields)
{
path = arg_path;
num_fields = arg_num_fields;
fields = arg_fields;
if ( ! DoInit(arg_path, arg_num_fields, arg_fields) )
{
disabled = true;
return false;
}
return true;
}
bool LogWriter::Write(int arg_num_fields, LogVal** vals)
{
// Double-check that the arguments match. If we get this from remote,
// something might be mixed up.
if ( num_fields != arg_num_fields )
{
DBG_LOG(DBG_LOGGING, "Number of fields don't match in LogWriter::Write() (%d vs. %d)",
arg_num_fields, num_fields);
DeleteVals(vals);
return false;
}
for ( int i = 0; i < num_fields; ++i )
{
if ( vals[i]->type != fields[i]->type )
{
DBG_LOG(DBG_LOGGING, "Field type doesn't match in LogWriter::Write() (%d vs. %d)",
vals[i]->type, fields[i]->type);
DeleteVals(vals);
return false;
}
}
bool result = DoWrite(num_fields, fields, vals);
DeleteVals(vals);
if ( ! result )
disabled = true;
return result;
}
bool LogWriter::SetBuf(bool enabled)
{
if ( enabled == buffering )
// No change.
return true;
buffering = enabled;
if ( ! DoSetBuf(enabled) )
{
disabled = true;
return false;
}
return true;
}
bool LogWriter::Rotate(string rotated_path, double open,
double close, bool terminating)
{
if ( ! DoRotate(rotated_path, open, close, terminating) )
{
disabled = true;
return false;
}
return true;
}
bool LogWriter::Flush()
{
if ( ! DoFlush() )
{
disabled = true;
return false;
}
return true;
}
void LogWriter::Finish()
{
DoFinish();
}
const char* LogWriter::Fmt(const char* format, ...)
{
if ( ! buf )
buf = (char*) malloc(buf_len);
va_list al;
va_start(al, format);
int n = safe_vsnprintf(buf, buf_len, format, al);
va_end(al);
if ( (unsigned int) n >= buf_len )
{ // Not enough room, grow the buffer.
buf_len = n + 32;
buf = (char*) realloc(buf, buf_len);
// Is it portable to restart?
va_start(al, format);
n = safe_vsnprintf(buf, buf_len, format, al);
va_end(al);
}
return buf;
}
void LogWriter::Error(const char *msg)
{
log_mgr->Error(this, msg);
}
void LogWriter::DeleteVals(LogVal** vals)
{
log_mgr->DeleteVals(num_fields, vals);
}
bool LogWriter::FinishedRotation(string new_name, string old_name, double open,
double close, bool terminating)
{
return log_mgr->FinishedRotation(this, new_name, old_name, open, close, terminating);
}

View file

@ -1,16 +0,0 @@
#include "LogWriterNone.h"
bool LogWriterNone::DoRotate(string rotated_path, double open,
double close, bool terminating)
{
if ( ! FinishedRotation(string("/dev/null"), Path(), open, close, terminating))
{
Error(Fmt("error rotating %s", Path().c_str()));
return false;
}
return true;
}

View file

@ -1,30 +0,0 @@
// See the file "COPYING" in the main distribution directory for copyright.
//
// Dummy log writer that just discards everything (but still pretends to rotate).
#ifndef LOGWRITERNONE_H
#define LOGWRITERNONE_H
#include "LogWriter.h"
class LogWriterNone : public LogWriter {
public:
LogWriterNone() {}
~LogWriterNone() {};
static LogWriter* Instantiate() { return new LogWriterNone; }
protected:
virtual bool DoInit(string path, int num_fields,
const LogField* const * fields) { return true; }
virtual bool DoWrite(int num_fields, const LogField* const * fields,
LogVal** vals) { return true; }
virtual bool DoSetBuf(bool enabled) { return true; }
virtual bool DoRotate(string rotated_path, double open, double close,
bool terminating);
virtual bool DoFlush() { return true; }
virtual void DoFinish() {}
};
#endif

View file

@ -183,8 +183,8 @@
#include "Sessions.h" #include "Sessions.h"
#include "File.h" #include "File.h"
#include "Conn.h" #include "Conn.h"
#include "LogMgr.h"
#include "Reporter.h" #include "Reporter.h"
#include "logging/Manager.h"
extern "C" { extern "C" {
#include "setsignal.h" #include "setsignal.h"
@ -2476,7 +2476,7 @@ bool RemoteSerializer::ProcessRemotePrint()
return true; return true;
} }
bool RemoteSerializer::SendLogCreateWriter(EnumVal* id, EnumVal* writer, string path, int num_fields, const LogField* const * fields) bool RemoteSerializer::SendLogCreateWriter(EnumVal* id, EnumVal* writer, string path, int num_fields, const logging::Field* const * fields)
{ {
loop_over_list(peers, i) loop_over_list(peers, i)
{ {
@ -2486,7 +2486,7 @@ bool RemoteSerializer::SendLogCreateWriter(EnumVal* id, EnumVal* writer, string
return true; return true;
} }
bool RemoteSerializer::SendLogCreateWriter(PeerID peer_id, EnumVal* id, EnumVal* writer, string path, int num_fields, const LogField* const * fields) bool RemoteSerializer::SendLogCreateWriter(PeerID peer_id, EnumVal* id, EnumVal* writer, string path, int num_fields, const logging::Field* const * fields)
{ {
SetErrorDescr("logging"); SetErrorDescr("logging");
@ -2540,7 +2540,7 @@ error:
return false; return false;
} }
bool RemoteSerializer::SendLogWrite(EnumVal* id, EnumVal* writer, string path, int num_fields, const LogVal* const * vals) bool RemoteSerializer::SendLogWrite(EnumVal* id, EnumVal* writer, string path, int num_fields, const logging::Value* const * vals)
{ {
loop_over_list(peers, i) loop_over_list(peers, i)
{ {
@ -2550,7 +2550,7 @@ bool RemoteSerializer::SendLogWrite(EnumVal* id, EnumVal* writer, string path, i
return true; return true;
} }
bool RemoteSerializer::SendLogWrite(Peer* peer, EnumVal* id, EnumVal* writer, string path, int num_fields, const LogVal* const * vals) bool RemoteSerializer::SendLogWrite(Peer* peer, EnumVal* id, EnumVal* writer, string path, int num_fields, const logging::Value* const * vals)
{ {
if ( peer->phase != Peer::HANDSHAKE && peer->phase != Peer::RUNNING ) if ( peer->phase != Peer::HANDSHAKE && peer->phase != Peer::RUNNING )
return false; return false;
@ -2641,7 +2641,7 @@ bool RemoteSerializer::ProcessLogCreateWriter()
EnumVal* id_val = 0; EnumVal* id_val = 0;
EnumVal* writer_val = 0; EnumVal* writer_val = 0;
LogField** fields = 0; logging::Field** fields = 0;
BinarySerializationFormat fmt; BinarySerializationFormat fmt;
fmt.StartRead(current_args->data, current_args->len); fmt.StartRead(current_args->data, current_args->len);
@ -2658,11 +2658,11 @@ bool RemoteSerializer::ProcessLogCreateWriter()
if ( ! success ) if ( ! success )
goto error; goto error;
fields = new LogField* [num_fields]; fields = new logging::Field* [num_fields];
for ( int i = 0; i < num_fields; i++ ) for ( int i = 0; i < num_fields; i++ )
{ {
fields[i] = new LogField; fields[i] = new logging::Field;
if ( ! fields[i]->Read(&fmt) ) if ( ! fields[i]->Read(&fmt) )
goto error; goto error;
} }
@ -2703,7 +2703,7 @@ bool RemoteSerializer::ProcessLogWrite()
// Unserialize one entry. // Unserialize one entry.
EnumVal* id_val = 0; EnumVal* id_val = 0;
EnumVal* writer_val = 0; EnumVal* writer_val = 0;
LogVal** vals = 0; logging::Value** vals = 0;
int id, writer; int id, writer;
string path; string path;
@ -2717,11 +2717,11 @@ bool RemoteSerializer::ProcessLogWrite()
if ( ! success ) if ( ! success )
goto error; goto error;
vals = new LogVal* [num_fields]; vals = new logging::Value* [num_fields];
for ( int i = 0; i < num_fields; i++ ) for ( int i = 0; i < num_fields; i++ )
{ {
vals[i] = new LogVal; vals[i] = new logging::Value;
if ( ! vals[i]->Read(&fmt) ) if ( ! vals[i]->Read(&fmt) )
goto error; goto error;
} }

View file

@ -14,8 +14,11 @@
// FIXME: Change this to network byte order // FIXME: Change this to network byte order
class IncrementalSendTimer; class IncrementalSendTimer;
class LogField;
class LogVal; namespace logging {
class Field;
class Value;
}
// This class handles the communication done in Bro's main loop. // This class handles the communication done in Bro's main loop.
class RemoteSerializer : public Serializer, public IOSource { class RemoteSerializer : public Serializer, public IOSource {
@ -99,13 +102,13 @@ public:
bool SendPrintHookEvent(BroFile* f, const char* txt, size_t len); bool SendPrintHookEvent(BroFile* f, const char* txt, size_t len);
// Send a request to create a writer on a remote side. // Send a request to create a writer on a remote side.
bool SendLogCreateWriter(PeerID peer, EnumVal* id, EnumVal* writer, string path, int num_fields, const LogField* const * fields); bool SendLogCreateWriter(PeerID peer, EnumVal* id, EnumVal* writer, string path, int num_fields, const logging::Field* const * fields);
// Broadcasts a request to create a writer. // Broadcasts a request to create a writer.
bool SendLogCreateWriter(EnumVal* id, EnumVal* writer, string path, int num_fields, const LogField* const * fields); bool SendLogCreateWriter(EnumVal* id, EnumVal* writer, string path, int num_fields, const logging::Field* const * fields);
// Broadcast a log entry to everybody interested. // Broadcast a log entry to everybody interested.
bool SendLogWrite(EnumVal* id, EnumVal* writer, string path, int num_fields, const LogVal* const * vals); bool SendLogWrite(EnumVal* id, EnumVal* writer, string path, int num_fields, const logging::Value* const * vals);
// Synchronzizes time with all connected peers. Returns number of // Synchronzizes time with all connected peers. Returns number of
// current sync-point, or -1 on error. // current sync-point, or -1 on error.
@ -300,7 +303,7 @@ protected:
bool SendID(SerialInfo* info, Peer* peer, const ID& id); bool SendID(SerialInfo* info, Peer* peer, const ID& id);
bool SendCapabilities(Peer* peer); bool SendCapabilities(Peer* peer);
bool SendPacket(SerialInfo* info, Peer* peer, const Packet& p); bool SendPacket(SerialInfo* info, Peer* peer, const Packet& p);
bool SendLogWrite(Peer* peer, EnumVal* id, EnumVal* writer, string path, int num_fields, const LogVal* const * vals); bool SendLogWrite(Peer* peer, EnumVal* id, EnumVal* writer, string path, int num_fields, const logging::Value* const * vals);
void UnregisterHandlers(Peer* peer); void UnregisterHandlers(Peer* peer);
void RaiseEvent(EventHandlerPtr event, Peer* peer, const char* arg = 0); void RaiseEvent(EventHandlerPtr event, Peer* peer, const char* arg = 0);

View file

@ -3,8 +3,9 @@
module Log; module Log;
%%{ %%{
#include "LogMgr.h"
#include "NetVar.h" #include "NetVar.h"
#include "logging/Manager.h"
%%} %%}
type Filter: record; type Filter: record;

View file

@ -2,33 +2,38 @@
#include <algorithm> #include <algorithm>
#include "LogMgr.h" #include "../Event.h"
#include "Event.h" #include "../EventHandler.h"
#include "EventHandler.h" #include "../NetVar.h"
#include "NetVar.h" #include "../Net.h"
#include "Net.h"
#include "LogWriterAscii.h" #include "Manager.h"
#include "LogWriterNone.h" #include "WriterFrontend.h"
#include "WriterBackend.h"
#include "writers/Ascii.h"
#include "writers/None.h"
using namespace logging;
// Structure describing a log writer type. // Structure describing a log writer type.
struct LogWriterDefinition { struct WriterDefinition {
bro_int_t type; // The type. bro_int_t type; // The type.
const char *name; // Descriptive name for error messages. const char *name; // Descriptive name for error messages.
bool (*init)(); // An optional one-time initialization function. bool (*init)(); // An optional one-time initialization function.
LogWriter* (*factory)(); // A factory function creating instances. WriterBackend* (*factory)(); // A factory function creating instances.
}; };
// Static table defining all availabel log writers. // Static table defining all availabel log writers.
LogWriterDefinition log_writers[] = { WriterDefinition log_writers[] = {
{ BifEnum::Log::WRITER_NONE, "None", 0, LogWriterNone::Instantiate }, { BifEnum::Log::WRITER_NONE, "None", 0, writer::None::Instantiate },
{ BifEnum::Log::WRITER_ASCII, "Ascii", 0, LogWriterAscii::Instantiate }, { BifEnum::Log::WRITER_ASCII, "Ascii", 0, writer::Ascii::Instantiate },
// End marker, don't touch. // End marker, don't touch.
{ BifEnum::Log::WRITER_DEFAULT, "None", 0, (LogWriter* (*)())0 } { BifEnum::Log::WRITER_DEFAULT, "None", 0, (WriterBackend* (*)())0 }
}; };
struct LogMgr::Filter { struct Manager::Filter {
string name; string name;
EnumVal* id; EnumVal* id;
Func* pred; Func* pred;
@ -42,7 +47,7 @@ struct LogMgr::Filter {
Func* postprocessor; Func* postprocessor;
int num_fields; int num_fields;
LogField** fields; Field** fields;
// Vector indexed by field number. Each element is a list of record // Vector indexed by field number. Each element is a list of record
// indices defining a path leading to the value across potential // indices defining a path leading to the value across potential
@ -52,16 +57,16 @@ struct LogMgr::Filter {
~Filter(); ~Filter();
}; };
struct LogMgr::WriterInfo { struct Manager::WriterInfo {
EnumVal* type; EnumVal* type;
double open_time; double open_time;
Timer* rotation_timer; Timer* rotation_timer;
double interval; double interval;
Func* postprocessor; Func* postprocessor;
LogWriter* writer; WriterFrontend* writer;
}; };
struct LogMgr::Stream { struct Manager::Stream {
EnumVal* id; EnumVal* id;
bool enabled; bool enabled;
string name; string name;
@ -78,7 +83,7 @@ struct LogMgr::Stream {
~Stream(); ~Stream();
}; };
bool LogField::Read(SerializationFormat* fmt) bool Field::Read(SerializationFormat* fmt)
{ {
int t; int t;
int st; int st;
@ -90,12 +95,12 @@ bool LogField::Read(SerializationFormat* fmt)
return success; return success;
} }
bool LogField::Write(SerializationFormat* fmt) const bool Field::Write(SerializationFormat* fmt) const
{ {
return (fmt->Write(name, "name") && fmt->Write((int)type, "type") && fmt->Write((int)subtype, "subtype")); return (fmt->Write(name, "name") && fmt->Write((int)type, "type") && fmt->Write((int)subtype, "subtype"));
} }
LogVal::~LogVal() Value::~Value()
{ {
if ( (type == TYPE_ENUM || type == TYPE_STRING || type == TYPE_FILE || type == TYPE_FUNC) if ( (type == TYPE_ENUM || type == TYPE_STRING || type == TYPE_FILE || type == TYPE_FUNC)
&& present ) && present )
@ -118,7 +123,7 @@ LogVal::~LogVal()
} }
} }
bool LogVal::IsCompatibleType(BroType* t, bool atomic_only) bool Value::IsCompatibleType(BroType* t, bool atomic_only)
{ {
if ( ! t ) if ( ! t )
return false; return false;
@ -169,7 +174,7 @@ bool LogVal::IsCompatibleType(BroType* t, bool atomic_only)
return false; return false;
} }
bool LogVal::Read(SerializationFormat* fmt) bool Value::Read(SerializationFormat* fmt)
{ {
int ty; int ty;
@ -249,11 +254,11 @@ bool LogVal::Read(SerializationFormat* fmt)
if ( ! fmt->Read(&val.set_val.size, "set_size") ) if ( ! fmt->Read(&val.set_val.size, "set_size") )
return false; return false;
val.set_val.vals = new LogVal* [val.set_val.size]; val.set_val.vals = new Value* [val.set_val.size];
for ( int i = 0; i < val.set_val.size; ++i ) for ( int i = 0; i < val.set_val.size; ++i )
{ {
val.set_val.vals[i] = new LogVal; val.set_val.vals[i] = new Value;
if ( ! val.set_val.vals[i]->Read(fmt) ) if ( ! val.set_val.vals[i]->Read(fmt) )
return false; return false;
@ -267,11 +272,11 @@ bool LogVal::Read(SerializationFormat* fmt)
if ( ! fmt->Read(&val.vector_val.size, "vector_size") ) if ( ! fmt->Read(&val.vector_val.size, "vector_size") )
return false; return false;
val.vector_val.vals = new LogVal* [val.vector_val.size]; val.vector_val.vals = new Value* [val.vector_val.size];
for ( int i = 0; i < val.vector_val.size; ++i ) for ( int i = 0; i < val.vector_val.size; ++i )
{ {
val.vector_val.vals[i] = new LogVal; val.vector_val.vals[i] = new Value;
if ( ! val.vector_val.vals[i]->Read(fmt) ) if ( ! val.vector_val.vals[i]->Read(fmt) )
return false; return false;
@ -281,13 +286,13 @@ bool LogVal::Read(SerializationFormat* fmt)
} }
default: default:
reporter->InternalError("unsupported type %s in LogVal::Write", type_name(type)); reporter->InternalError("unsupported type %s in Value::Write", type_name(type));
} }
return false; return false;
} }
bool LogVal::Write(SerializationFormat* fmt) const bool Value::Write(SerializationFormat* fmt) const
{ {
if ( ! (fmt->Write((int)type, "type") && if ( ! (fmt->Write((int)type, "type") &&
fmt->Write(present, "present")) ) fmt->Write(present, "present")) )
@ -382,13 +387,13 @@ bool LogVal::Write(SerializationFormat* fmt) const
} }
default: default:
reporter->InternalError("unsupported type %s in LogVal::REad", type_name(type)); reporter->InternalError("unsupported type %s in Value::REad", type_name(type));
} }
return false; return false;
} }
LogMgr::Filter::~Filter() Manager::Filter::~Filter()
{ {
for ( int i = 0; i < num_fields; ++i ) for ( int i = 0; i < num_fields; ++i )
delete fields[i]; delete fields[i];
@ -398,7 +403,7 @@ LogMgr::Filter::~Filter()
Unref(path_val); Unref(path_val);
} }
LogMgr::Stream::~Stream() Manager::Stream::~Stream()
{ {
Unref(columns); Unref(columns);
@ -421,17 +426,64 @@ LogMgr::Stream::~Stream()
delete *f; delete *f;
} }
LogMgr::LogMgr() Manager::Manager()
{ {
} }
LogMgr::~LogMgr() Manager::~Manager()
{ {
for ( vector<Stream *>::iterator s = streams.begin(); s != streams.end(); ++s ) for ( vector<Stream *>::iterator s = streams.begin(); s != streams.end(); ++s )
delete *s; delete *s;
} }
LogMgr::Stream* LogMgr::FindStream(EnumVal* id) WriterBackend* Manager::CreateBackend(bro_int_t type)
{
WriterDefinition* ld = log_writers;
while ( true )
{
if ( ld->type == BifEnum::Log::WRITER_DEFAULT )
{
reporter->Error("unknow writer when creating writer");
return 0;
}
if ( ld->type == type )
break;
if ( ! ld->factory )
// Oops, we can't instantiate this guy.
return 0;
// If the writer has an init function, call it.
if ( ld->init )
{
if ( (*ld->init)() )
// Clear the init function so that we won't
// call it again later.
ld->init = 0;
else
// Init failed, disable by deleting factory
// function.
ld->factory = 0;
DBG_LOG(DBG_LOGGING, "failed to init writer class %s",
ld->name);
return false;
}
++ld;
}
assert(ld->factory);
WriterBackend* backend = (*ld->factory)();
assert(backend);
return backend;
}
Manager::Stream* Manager::FindStream(EnumVal* id)
{ {
unsigned int idx = id->AsEnum(); unsigned int idx = id->AsEnum();
@ -441,7 +493,7 @@ LogMgr::Stream* LogMgr::FindStream(EnumVal* id)
return streams[idx]; return streams[idx];
} }
LogMgr::WriterInfo* LogMgr::FindWriter(LogWriter* writer) Manager::WriterInfo* Manager::FindWriter(WriterFrontend* writer)
{ {
for ( vector<Stream *>::iterator s = streams.begin(); s != streams.end(); ++s ) for ( vector<Stream *>::iterator s = streams.begin(); s != streams.end(); ++s )
{ {
@ -460,7 +512,7 @@ LogMgr::WriterInfo* LogMgr::FindWriter(LogWriter* writer)
return 0; return 0;
} }
void LogMgr::RemoveDisabledWriters(Stream* stream) void Manager::RemoveDisabledWriters(Stream* stream)
{ {
list<Stream::WriterPathPair> disabled; list<Stream::WriterPathPair> disabled;
@ -468,6 +520,7 @@ void LogMgr::RemoveDisabledWriters(Stream* stream)
{ {
if ( j->second && j->second->writer->Disabled() ) if ( j->second && j->second->writer->Disabled() )
{ {
j->second->writer->Stop();
delete j->second; delete j->second;
disabled.push_back(j->first); disabled.push_back(j->first);
} }
@ -477,7 +530,7 @@ void LogMgr::RemoveDisabledWriters(Stream* stream)
stream->writers.erase(*j); stream->writers.erase(*j);
} }
bool LogMgr::CreateStream(EnumVal* id, RecordVal* sval) bool Manager::CreateStream(EnumVal* id, RecordVal* sval)
{ {
RecordType* rtype = sval->Type()->AsRecordType(); RecordType* rtype = sval->Type()->AsRecordType();
@ -497,7 +550,7 @@ bool LogMgr::CreateStream(EnumVal* id, RecordVal* sval)
if ( ! (columns->FieldDecl(i)->FindAttr(ATTR_LOG)) ) if ( ! (columns->FieldDecl(i)->FindAttr(ATTR_LOG)) )
continue; continue;
if ( ! LogVal::IsCompatibleType(columns->FieldType(i)) ) if ( ! Value::IsCompatibleType(columns->FieldType(i)) )
{ {
reporter->Error("type of field '%s' is not support for logging output", reporter->Error("type of field '%s' is not support for logging output",
columns->FieldName(i)); columns->FieldName(i));
@ -569,7 +622,7 @@ bool LogMgr::CreateStream(EnumVal* id, RecordVal* sval)
return true; return true;
} }
bool LogMgr::EnableStream(EnumVal* id) bool Manager::EnableStream(EnumVal* id)
{ {
Stream* stream = FindStream(id); Stream* stream = FindStream(id);
@ -585,7 +638,7 @@ bool LogMgr::EnableStream(EnumVal* id)
return true; return true;
} }
bool LogMgr::DisableStream(EnumVal* id) bool Manager::DisableStream(EnumVal* id)
{ {
Stream* stream = FindStream(id); Stream* stream = FindStream(id);
@ -602,7 +655,7 @@ bool LogMgr::DisableStream(EnumVal* id)
} }
// Helper for recursive record field unrolling. // Helper for recursive record field unrolling.
bool LogMgr::TraverseRecord(Stream* stream, Filter* filter, RecordType* rt, bool Manager::TraverseRecord(Stream* stream, Filter* filter, RecordType* rt,
TableVal* include, TableVal* exclude, string path, list<int> indices) TableVal* include, TableVal* exclude, string path, list<int> indices)
{ {
for ( int i = 0; i < rt->NumFields(); ++i ) for ( int i = 0; i < rt->NumFields(); ++i )
@ -696,9 +749,9 @@ bool LogMgr::TraverseRecord(Stream* stream, Filter* filter, RecordType* rt,
filter->indices.push_back(new_indices); filter->indices.push_back(new_indices);
filter->fields = (LogField**) filter->fields = (Field**)
realloc(filter->fields, realloc(filter->fields,
sizeof(LogField) * ++filter->num_fields); sizeof(Field) * ++filter->num_fields);
if ( ! filter->fields ) if ( ! filter->fields )
{ {
@ -706,14 +759,14 @@ bool LogMgr::TraverseRecord(Stream* stream, Filter* filter, RecordType* rt,
return false; return false;
} }
LogField* field = new LogField(); Field* field = new Field();
field->name = new_path; field->name = new_path;
field->type = t->Tag(); field->type = t->Tag();
if ( field->type == TYPE_TABLE ) if ( field->type == TYPE_TABLE )
{ {
field->subtype = t->AsSetType()->Indices()->PureType()->Tag(); field->subtype = t->AsSetType()->Indices()->PureType()->Tag();
} }
else if ( field->type == TYPE_VECTOR ) else if ( field->type == TYPE_VECTOR )
{ {
field->subtype = t->AsVectorType()->YieldType()->Tag(); field->subtype = t->AsVectorType()->YieldType()->Tag();
} }
@ -723,7 +776,7 @@ bool LogMgr::TraverseRecord(Stream* stream, Filter* filter, RecordType* rt,
return true; return true;
} }
bool LogMgr::AddFilter(EnumVal* id, RecordVal* fval) bool Manager::AddFilter(EnumVal* id, RecordVal* fval)
{ {
RecordType* rtype = fval->Type()->AsRecordType(); RecordType* rtype = fval->Type()->AsRecordType();
@ -819,7 +872,7 @@ bool LogMgr::AddFilter(EnumVal* id, RecordVal* fval)
for ( int i = 0; i < filter->num_fields; i++ ) for ( int i = 0; i < filter->num_fields; i++ )
{ {
LogField* field = filter->fields[i]; Field* field = filter->fields[i];
DBG_LOG(DBG_LOGGING, " field %10s: %s", DBG_LOG(DBG_LOGGING, " field %10s: %s",
field->name.c_str(), type_name(field->type)); field->name.c_str(), type_name(field->type));
} }
@ -828,12 +881,12 @@ bool LogMgr::AddFilter(EnumVal* id, RecordVal* fval)
return true; return true;
} }
bool LogMgr::RemoveFilter(EnumVal* id, StringVal* name) bool Manager::RemoveFilter(EnumVal* id, StringVal* name)
{ {
return RemoveFilter(id, name->AsString()->CheckString()); return RemoveFilter(id, name->AsString()->CheckString());
} }
bool LogMgr::RemoveFilter(EnumVal* id, string name) bool Manager::RemoveFilter(EnumVal* id, string name)
{ {
Stream* stream = FindStream(id); Stream* stream = FindStream(id);
if ( ! stream ) if ( ! stream )
@ -860,7 +913,7 @@ bool LogMgr::RemoveFilter(EnumVal* id, string name)
return true; return true;
} }
bool LogMgr::Write(EnumVal* id, RecordVal* columns) bool Manager::Write(EnumVal* id, RecordVal* columns)
{ {
bool error = false; bool error = false;
@ -980,7 +1033,7 @@ bool LogMgr::Write(EnumVal* id, RecordVal* columns)
Stream::WriterMap::iterator w = Stream::WriterMap::iterator w =
stream->writers.find(Stream::WriterPathPair(filter->writer->AsEnum(), path)); stream->writers.find(Stream::WriterPathPair(filter->writer->AsEnum(), path));
LogWriter* writer = 0; WriterFrontend* writer = 0;
if ( w != stream->writers.end() ) if ( w != stream->writers.end() )
// We know this writer already. // We know this writer already.
@ -990,12 +1043,12 @@ bool LogMgr::Write(EnumVal* id, RecordVal* columns)
{ {
// No, need to create one. // No, need to create one.
// Copy the fields for LogWriter::Init() as it will take // Copy the fields for WriterFrontend::Init() as it
// ownership. // will take ownership.
LogField** arg_fields = new LogField*[filter->num_fields]; Field** arg_fields = new Field*[filter->num_fields];
for ( int j = 0; j < filter->num_fields; ++j ) for ( int j = 0; j < filter->num_fields; ++j )
arg_fields[j] = new LogField(*filter->fields[j]); arg_fields[j] = new Field(*filter->fields[j]);
if ( filter->remote ) if ( filter->remote )
remote_serializer->SendLogCreateWriter(stream->id, remote_serializer->SendLogCreateWriter(stream->id,
@ -1034,7 +1087,7 @@ bool LogMgr::Write(EnumVal* id, RecordVal* columns)
if ( filter->local || filter->remote ) if ( filter->local || filter->remote )
{ {
LogVal** vals = RecordToFilterVals(stream, filter, columns); Value** vals = RecordToFilterVals(stream, filter, columns);
if ( filter->remote ) if ( filter->remote )
remote_serializer->SendLogWrite(stream->id, remote_serializer->SendLogWrite(stream->id,
@ -1045,11 +1098,9 @@ bool LogMgr::Write(EnumVal* id, RecordVal* columns)
if ( filter->local ) if ( filter->local )
{ {
assert(writer);
// Write takes ownership of vals. // Write takes ownership of vals.
if ( ! writer->Write(filter->num_fields, vals) ) assert(writer);
error = true; writer->Write(filter->num_fields, vals);
} }
else else
@ -1072,15 +1123,15 @@ bool LogMgr::Write(EnumVal* id, RecordVal* columns)
return true; return true;
} }
LogVal* LogMgr::ValToLogVal(Val* val, BroType* ty) Value* Manager::ValToLogVal(Val* val, BroType* ty)
{ {
if ( ! ty ) if ( ! ty )
ty = val->Type(); ty = val->Type();
if ( ! val ) if ( ! val )
return new LogVal(ty->Tag(), false); return new Value(ty->Tag(), false);
LogVal* lval = new LogVal(ty->Tag()); Value* lval = new Value(ty->Tag());
switch ( lval->type ) { switch ( lval->type ) {
case TYPE_BOOL: case TYPE_BOOL:
@ -1160,7 +1211,7 @@ LogVal* LogMgr::ValToLogVal(Val* val, BroType* ty)
set = new ListVal(TYPE_INT); set = new ListVal(TYPE_INT);
lval->val.set_val.size = set->Length(); lval->val.set_val.size = set->Length();
lval->val.set_val.vals = new LogVal* [lval->val.set_val.size]; lval->val.set_val.vals = new Value* [lval->val.set_val.size];
for ( int i = 0; i < lval->val.set_val.size; i++ ) for ( int i = 0; i < lval->val.set_val.size; i++ )
lval->val.set_val.vals[i] = ValToLogVal(set->Index(i)); lval->val.set_val.vals[i] = ValToLogVal(set->Index(i));
@ -1174,7 +1225,7 @@ LogVal* LogMgr::ValToLogVal(Val* val, BroType* ty)
VectorVal* vec = val->AsVectorVal(); VectorVal* vec = val->AsVectorVal();
lval->val.vector_val.size = vec->Size(); lval->val.vector_val.size = vec->Size();
lval->val.vector_val.vals = lval->val.vector_val.vals =
new LogVal* [lval->val.vector_val.size]; new Value* [lval->val.vector_val.size];
for ( int i = 0; i < lval->val.vector_val.size; i++ ) for ( int i = 0; i < lval->val.vector_val.size; i++ )
{ {
@ -1193,10 +1244,10 @@ LogVal* LogMgr::ValToLogVal(Val* val, BroType* ty)
return lval; return lval;
} }
LogVal** LogMgr::RecordToFilterVals(Stream* stream, Filter* filter, Value** Manager::RecordToFilterVals(Stream* stream, Filter* filter,
RecordVal* columns) RecordVal* columns)
{ {
LogVal** vals = new LogVal*[filter->num_fields]; Value** vals = new Value*[filter->num_fields];
for ( int i = 0; i < filter->num_fields; ++i ) for ( int i = 0; i < filter->num_fields; ++i )
{ {
@ -1215,7 +1266,7 @@ LogVal** LogMgr::RecordToFilterVals(Stream* stream, Filter* filter,
if ( ! val ) if ( ! val )
{ {
// Value, or any of its parents, is not set. // Value, or any of its parents, is not set.
vals[i] = new LogVal(filter->fields[i]->type, false); vals[i] = new Value(filter->fields[i]->type, false);
break; break;
} }
} }
@ -1227,8 +1278,8 @@ LogVal** LogMgr::RecordToFilterVals(Stream* stream, Filter* filter,
return vals; return vals;
} }
LogWriter* LogMgr::CreateWriter(EnumVal* id, EnumVal* writer, string path, WriterFrontend* Manager::CreateWriter(EnumVal* id, EnumVal* writer, string path,
int num_fields, LogField** fields) int num_fields, Field** fields)
{ {
Stream* stream = FindStream(id); Stream* stream = FindStream(id);
@ -1244,56 +1295,10 @@ LogWriter* LogMgr::CreateWriter(EnumVal* id, EnumVal* writer, string path,
// return it. // return it.
return w->second->writer; return w->second->writer;
// Need to instantiate a new writer. WriterFrontend* writer_obj = new WriterFrontend(writer->AsEnum());
assert(writer_obj);
LogWriterDefinition* ld = log_writers; writer_obj->Init(path, num_fields, fields);
while ( true )
{
if ( ld->type == BifEnum::Log::WRITER_DEFAULT )
{
reporter->Error("unknow writer when creating writer");
return 0;
}
if ( ld->type == writer->AsEnum() )
break;
if ( ! ld->factory )
// Oops, we can't instantiate this guy.
return 0;
// If the writer has an init function, call it.
if ( ld->init )
{
if ( (*ld->init)() )
// Clear the init function so that we won't
// call it again later.
ld->init = 0;
else
// Init failed, disable by deleting factory
// function.
ld->factory = 0;
DBG_LOG(DBG_LOGGING, "failed to init writer class %s",
ld->name);
return false;
}
++ld;
}
assert(ld->factory);
LogWriter* writer_obj = (*ld->factory)();
if ( ! writer_obj->Init(path, num_fields, fields) )
{
DBG_LOG(DBG_LOGGING, "failed to init instance of writer %s",
ld->name);
return 0;
}
WriterInfo* winfo = new WriterInfo; WriterInfo* winfo = new WriterInfo;
winfo->type = writer->Ref()->AsEnumVal(); winfo->type = writer->Ref()->AsEnumVal();
@ -1338,16 +1343,17 @@ LogWriter* LogMgr::CreateWriter(EnumVal* id, EnumVal* writer, string path,
return writer_obj; return writer_obj;
} }
void LogMgr::DeleteVals(int num_fields, LogVal** vals) void Manager::DeleteVals(int num_fields, Value** vals)
{ {
// Note this code is duplicated in WriterBackend::DeleteVals().
for ( int i = 0; i < num_fields; i++ ) for ( int i = 0; i < num_fields; i++ )
delete vals[i]; delete vals[i];
delete [] vals; delete [] vals;
} }
bool LogMgr::Write(EnumVal* id, EnumVal* writer, string path, int num_fields, bool Manager::Write(EnumVal* id, EnumVal* writer, string path, int num_fields,
LogVal** vals) Value** vals)
{ {
Stream* stream = FindStream(id); Stream* stream = FindStream(id);
@ -1357,7 +1363,7 @@ bool LogMgr::Write(EnumVal* id, EnumVal* writer, string path, int num_fields,
#ifdef DEBUG #ifdef DEBUG
ODesc desc; ODesc desc;
id->Describe(&desc); id->Describe(&desc);
DBG_LOG(DBG_LOGGING, "unknown stream %s in LogMgr::Write()", DBG_LOG(DBG_LOGGING, "unknown stream %s in Manager::Write()",
desc.Description()); desc.Description());
#endif #endif
DeleteVals(num_fields, vals); DeleteVals(num_fields, vals);
@ -1379,23 +1385,24 @@ bool LogMgr::Write(EnumVal* id, EnumVal* writer, string path, int num_fields,
#ifdef DEBUG #ifdef DEBUG
ODesc desc; ODesc desc;
id->Describe(&desc); id->Describe(&desc);
DBG_LOG(DBG_LOGGING, "unknown writer %s in LogMgr::Write()", DBG_LOG(DBG_LOGGING, "unknown writer %s in Manager::Write()",
desc.Description()); desc.Description());
#endif #endif
DeleteVals(num_fields, vals); DeleteVals(num_fields, vals);
return false; return false;
} }
bool success = (w->second ? w->second->writer->Write(num_fields, vals) : true); if ( w->second )
w->second->writer->Write(num_fields, vals);
DBG_LOG(DBG_LOGGING, DBG_LOG(DBG_LOGGING,
"Wrote pre-filtered record to path '%s' on stream '%s' [%s]", "Wrote pre-filtered record to path '%s' on stream '%s'",
path.c_str(), stream->name.c_str(), (success ? "ok" : "error")); path.c_str(), stream->name.c_str());
return success; return true;
} }
void LogMgr::SendAllWritersTo(RemoteSerializer::PeerID peer) void Manager::SendAllWritersTo(RemoteSerializer::PeerID peer)
{ {
for ( vector<Stream *>::iterator s = streams.begin(); s != streams.end(); ++s ) for ( vector<Stream *>::iterator s = streams.begin(); s != streams.end(); ++s )
{ {
@ -1410,7 +1417,7 @@ void LogMgr::SendAllWritersTo(RemoteSerializer::PeerID peer)
if ( ! i->second ) if ( ! i->second )
continue; continue;
LogWriter* writer = i->second->writer; WriterFrontend* writer = i->second->writer;
EnumVal writer_val(i->first.first, BifType::Enum::Log::Writer); EnumVal writer_val(i->first.first, BifType::Enum::Log::Writer);
remote_serializer->SendLogCreateWriter(peer, (*s)->id, remote_serializer->SendLogCreateWriter(peer, (*s)->id,
@ -1422,7 +1429,7 @@ void LogMgr::SendAllWritersTo(RemoteSerializer::PeerID peer)
} }
} }
bool LogMgr::SetBuf(EnumVal* id, bool enabled) bool Manager::SetBuf(EnumVal* id, bool enabled)
{ {
Stream* stream = FindStream(id); Stream* stream = FindStream(id);
if ( ! stream ) if ( ! stream )
@ -1440,7 +1447,7 @@ bool LogMgr::SetBuf(EnumVal* id, bool enabled)
return true; return true;
} }
bool LogMgr::Flush(EnumVal* id) bool Manager::Flush(EnumVal* id)
{ {
Stream* stream = FindStream(id); Stream* stream = FindStream(id);
if ( ! stream ) if ( ! stream )
@ -1461,7 +1468,7 @@ bool LogMgr::Flush(EnumVal* id)
return true; return true;
} }
void LogMgr::Error(LogWriter* writer, const char* msg) void Manager::Error(WriterFrontend* writer, const char* msg)
{ {
reporter->Error("error with writer for %s: %s", reporter->Error("error with writer for %s: %s",
writer->Path().c_str(), msg); writer->Path().c_str(), msg);
@ -1470,7 +1477,7 @@ void LogMgr::Error(LogWriter* writer, const char* msg)
// Timer which on dispatching rotates the filter. // Timer which on dispatching rotates the filter.
class RotationTimer : public Timer { class RotationTimer : public Timer {
public: public:
RotationTimer(double t, LogMgr::WriterInfo* arg_winfo, bool arg_rotate) RotationTimer(double t, Manager::WriterInfo* arg_winfo, bool arg_rotate)
: Timer(t, TIMER_ROTATE) : Timer(t, TIMER_ROTATE)
{ {
winfo = arg_winfo; winfo = arg_winfo;
@ -1482,7 +1489,7 @@ public:
void Dispatch(double t, int is_expire); void Dispatch(double t, int is_expire);
protected: protected:
LogMgr::WriterInfo* winfo; Manager::WriterInfo* winfo;
bool rotate; bool rotate;
}; };
@ -1506,7 +1513,7 @@ void RotationTimer::Dispatch(double t, int is_expire)
} }
} }
void LogMgr::InstallRotationTimer(WriterInfo* winfo) void Manager::InstallRotationTimer(WriterInfo* winfo)
{ {
if ( terminating ) if ( terminating )
return; return;
@ -1548,7 +1555,7 @@ void LogMgr::InstallRotationTimer(WriterInfo* winfo)
} }
} }
void LogMgr::Rotate(WriterInfo* winfo) void Manager::Rotate(WriterInfo* winfo)
{ {
DBG_LOG(DBG_LOGGING, "Rotating %s at %.6f", DBG_LOG(DBG_LOGGING, "Rotating %s at %.6f",
winfo->writer->Path().c_str(), network_time); winfo->writer->Path().c_str(), network_time);
@ -1568,7 +1575,7 @@ void LogMgr::Rotate(WriterInfo* winfo)
winfo->writer->Rotate(tmp, winfo->open_time, network_time, terminating); winfo->writer->Rotate(tmp, winfo->open_time, network_time, terminating);
} }
bool LogMgr::FinishedRotation(LogWriter* writer, string new_name, string old_name, bool Manager::FinishedRotation(WriterFrontend* writer, string new_name, string old_name,
double open, double close, bool terminating) double open, double close, bool terminating)
{ {
DBG_LOG(DBG_LOGGING, "Finished rotating %s at %.6f, new name %s", DBG_LOG(DBG_LOGGING, "Finished rotating %s at %.6f, new name %s",

View file

@ -2,24 +2,28 @@
// //
// A class managing log writers and filters. // A class managing log writers and filters.
#ifndef LOGMGR_H #ifndef LOGGING_MANAGER_H
#define LOGMGR_H #define LOGGING_MANAGER_H
#include "Val.h" #include "../Val.h"
#include "EventHandler.h" #include "../EventHandler.h"
#include "RemoteSerializer.h" #include "../RemoteSerializer.h"
class SerializationFormat; class SerializationFormat;
class RemoteSerializer;
class RotationTimer;
namespace logging {
// Description of a log field. // Description of a log field.
struct LogField { struct Field {
string name; string name;
TypeTag type; TypeTag type;
// inner type of sets // inner type of sets
TypeTag subtype; TypeTag subtype;
LogField() { subtype = TYPE_VOID; } Field() { subtype = TYPE_VOID; }
LogField(const LogField& other) Field(const Field& other)
: name(other.name), type(other.type), subtype(other.subtype) { } : name(other.name), type(other.type), subtype(other.subtype) { }
// (Un-)serialize. // (Un-)serialize.
@ -28,13 +32,13 @@ struct LogField {
}; };
// Values as logged by a writer. // Values as logged by a writer.
struct LogVal { struct Value {
TypeTag type; TypeTag type;
bool present; // False for unset fields. bool present; // False for unset fields.
// The following union is a subset of BroValUnion, including only the // The following union is a subset of BroValUnion, including only the
// types we can log directly. // types we can log directly.
struct set_t { bro_int_t size; LogVal** vals; }; struct set_t { bro_int_t size; Value** vals; };
typedef set_t vec_t; typedef set_t vec_t;
union _val { union _val {
@ -48,9 +52,9 @@ struct LogVal {
vec_t vector_val; vec_t vector_val;
} val; } val;
LogVal(TypeTag arg_type = TYPE_ERROR, bool arg_present = true) Value(TypeTag arg_type = TYPE_ERROR, bool arg_present = true)
: type(arg_type), present(arg_present) {} : type(arg_type), present(arg_present) {}
~LogVal(); ~Value();
// (Un-)serialize. // (Un-)serialize.
bool Read(SerializationFormat* fmt); bool Read(SerializationFormat* fmt);
@ -61,17 +65,17 @@ struct LogVal {
static bool IsCompatibleType(BroType* t, bool atomic_only=false); static bool IsCompatibleType(BroType* t, bool atomic_only=false);
private: private:
LogVal(const LogVal& other) { } Value(const Value& other) { }
}; };
class LogWriter; class WriterBackend;
class RemoteSerializer; class WriterFrontend;
class RotationTimer; class RotationFinishedMessage;
class LogMgr { class Manager {
public: public:
LogMgr(); Manager();
~LogMgr(); ~Manager();
// These correspond to the BiFs visible on the scripting layer. The // These correspond to the BiFs visible on the scripting layer. The
// actual BiFs just forward here. // actual BiFs just forward here.
@ -86,19 +90,24 @@ public:
bool Flush(EnumVal* id); // Flushes all writers.. bool Flush(EnumVal* id); // Flushes all writers..
protected: protected:
friend class LogWriter; friend class WriterFrontend;
friend class RemoteSerializer; friend class RotationFinishedMessage;
friend class RotationTimer; friend class ::RemoteSerializer;
friend class ::RotationTimer;
// Instantiates a new WriterBackend of the given type (note that
// doing so creates a new thread!).
WriterBackend* CreateBackend(bro_int_t type);
//// Function also used by the RemoteSerializer. //// Function also used by the RemoteSerializer.
// Takes ownership of fields. // Takes ownership of fields.
LogWriter* CreateWriter(EnumVal* id, EnumVal* writer, string path, WriterFrontend* CreateWriter(EnumVal* id, EnumVal* writer, string path,
int num_fields, LogField** fields); int num_fields, Field** fields);
// Takes ownership of values.. // Takes ownership of values..
bool Write(EnumVal* id, EnumVal* writer, string path, bool Write(EnumVal* id, EnumVal* writer, string path,
int num_fields, LogVal** vals); int num_fields, Value** vals);
// Announces all instantiated writers to peer. // Announces all instantiated writers to peer.
void SendAllWritersTo(RemoteSerializer::PeerID peer); void SendAllWritersTo(RemoteSerializer::PeerID peer);
@ -106,14 +115,14 @@ protected:
//// Functions safe to use by writers. //// Functions safe to use by writers.
// Signals that a file has been rotated. // Signals that a file has been rotated.
bool FinishedRotation(LogWriter* writer, string new_name, string old_name, bool FinishedRotation(WriterFrontend* writer, string new_name, string old_name,
double open, double close, bool terminating); double open, double close, bool terminating);
// Reports an error for the given writer. // Reports an error for the given writer.
void Error(LogWriter* writer, const char* msg); void Error(WriterFrontend* writer, const char* msg);
// Deletes the values as passed into Write(). // Deletes the values as passed into Write().
void DeleteVals(int num_fields, LogVal** vals); void DeleteVals(int num_fields, Value** vals);
private: private:
struct Filter; struct Filter;
@ -123,20 +132,22 @@ private:
bool TraverseRecord(Stream* stream, Filter* filter, RecordType* rt, bool TraverseRecord(Stream* stream, Filter* filter, RecordType* rt,
TableVal* include, TableVal* exclude, string path, list<int> indices); TableVal* include, TableVal* exclude, string path, list<int> indices);
LogVal** RecordToFilterVals(Stream* stream, Filter* filter, Value** RecordToFilterVals(Stream* stream, Filter* filter,
RecordVal* columns); RecordVal* columns);
LogVal* ValToLogVal(Val* val, BroType* ty = 0); Value* ValToLogVal(Val* val, BroType* ty = 0);
Stream* FindStream(EnumVal* id); Stream* FindStream(EnumVal* id);
void RemoveDisabledWriters(Stream* stream); void RemoveDisabledWriters(Stream* stream);
void InstallRotationTimer(WriterInfo* winfo); void InstallRotationTimer(WriterInfo* winfo);
void Rotate(WriterInfo* info); void Rotate(WriterInfo* info);
Filter* FindFilter(EnumVal* id, StringVal* filter); Filter* FindFilter(EnumVal* id, StringVal* filter);
WriterInfo* FindWriter(LogWriter* writer); WriterInfo* FindWriter(WriterFrontend* writer);
vector<Stream *> streams; // Indexed by stream enum. vector<Stream *> streams; // Indexed by stream enum.
}; };
extern LogMgr* log_mgr; }
extern logging::Manager* log_mgr;
#endif #endif

View file

@ -0,0 +1,161 @@
// See the file "COPYING" in the main distribution directory for copyright.
#include "util.h"
#include "WriterBackend.h"
#include "WriterFrontend.h"
// Messages sent from backend to frontend (i.e., "OutputMessages").
namespace logging {
class RotationFinishedMessage : public threading::OutputMessage<WriterFrontend>
{
public:
RotationFinishedMessage(WriterFrontend* writer, string new_name, string old_name,
double open, double close, bool terminating)
: threading::OutputMessage<WriterFrontend>("RotationFinished", writer),
new_name(new_name), old_name(old_name), open(open),
close(close), terminating(terminating) { }
virtual bool Process()
{
return log_mgr->FinishedRotation(Object(), new_name, old_name, open, close, terminating);
}
private:
string new_name;
string old_name;
double open;
double close;
bool terminating;
};
class DisableMessage : public threading::OutputMessage<WriterFrontend>
{
public:
DisableMessage(WriterFrontend* writer)
: threading::OutputMessage<WriterFrontend>("Disable", writer) {}
virtual bool Process() { Object()->SetDisable(); return true; }
};
}
// Backend methods.
using namespace logging;
WriterBackend::WriterBackend(const string& name) : MsgThread(name)
{
path = "<not set>";
num_fields = 0;
fields = 0;
buffering = true;
}
WriterBackend::~WriterBackend()
{
if ( fields )
{
for(int i = 0; i < num_fields; ++i)
delete fields[i];
delete [] fields;
}
}
void WriterBackend::DeleteVals(Value** vals)
{
// Note this code is duplicated in Manager::DeleteVals().
for ( int i = 0; i < num_fields; i++ )
delete vals[i];
delete [] vals;
}
bool WriterBackend::FinishedRotation(WriterFrontend* writer, string new_name, string old_name,
double open, double close, bool terminating)
{
SendOut(new RotationFinishedMessage(writer, new_name, old_name, open, close, terminating));
return true;
}
bool WriterBackend::Init(string arg_path, int arg_num_fields,
const Field* const * arg_fields)
{
path = arg_path;
num_fields = arg_num_fields;
fields = arg_fields;
if ( ! DoInit(arg_path, arg_num_fields, arg_fields) )
return false;
return true;
}
bool WriterBackend::Write(int arg_num_fields, Value** vals)
{
// Double-check that the arguments match. If we get this from remote,
// something might be mixed up.
if ( num_fields != arg_num_fields )
{
#ifdef DEBUG
const char* msg = Fmt("Number of fields don't match in WriterBackend::Write() (%d vs. %d)",
arg_num_fields, num_fields);
Debug(DBG_LOGGING, msg);
#endif
DeleteVals(vals);
return false;
}
for ( int i = 0; i < num_fields; ++i )
{
if ( vals[i]->type != fields[i]->type )
{
#ifdef DEBUG
const char* msg = Fmt("Field type doesn't match in WriterBackend::Write() (%d vs. %d)",
vals[i]->type, fields[i]->type);
Debug(DBG_LOGGING, msg);
#endif
DeleteVals(vals);
return false;
}
}
bool result = DoWrite(num_fields, fields, vals);
DeleteVals(vals);
return result;
}
bool WriterBackend::SetBuf(bool enabled)
{
if ( enabled == buffering )
// No change.
return true;
buffering = enabled;
return DoSetBuf(enabled);
}
bool WriterBackend::Rotate(WriterFrontend* writer, string rotated_path,
double open, double close, bool terminating)
{
return DoRotate(writer, rotated_path, open, close, terminating);
}
bool WriterBackend::Flush()
{
return DoFlush();
}
bool WriterBackend::Finish()
{
return DoFinish();
}

View file

@ -1,32 +1,22 @@
// See the file "COPYING" in the main distribution directory for copyright. // See the file "COPYING" in the main distribution directory for copyright.
// //
// Interface API for a log writer backend. The LogMgr creates a separate // Bridge class between main process and writer threads.
// writer instance of pair of (writer type, output path).
//
// Note thay classes derived from LogWriter must be fully thread-safe and not
// use any non-thread-safe Bro functionality (which includes almost
// everything ...). In particular, do not use fmt() but LogWriter::Fmt()!.
//
// The one exception to this rule is the constructor: it is guaranteed to be
// executed inside the main thread and can thus in particular access global
// script variables.
#ifndef LOGWRITER_H #ifndef LOGGING_WRITERBACKEND_H
#define LOGWRITER_H #define LOGGING_WRITERBACKEND_H
#include "LogMgr.h" #include "Manager.h"
#include "BroString.h"
class LogWriter { #include "threading/MsgThread.h"
namespace logging {
// The backend runs in its own thread, separate from the main process.
class WriterBackend : public threading::MsgThread
{
public: public:
LogWriter(); WriterBackend(const string& name);
virtual ~LogWriter(); virtual ~WriterBackend();
//// Interface methods to interact with the writer. Note that these
//// methods are not necessarily thread-safe and must be called only
//// from the main thread (which will typically mean only from the
//// LogMgr). In particular, they must not be called from the
//// writer's derived implementation.
// One-time initialization of the writer to define the logged fields. // One-time initialization of the writer to define the logged fields.
// Interpretation of "path" is left to the writer, and will be // Interpretation of "path" is left to the writer, and will be
@ -37,18 +27,18 @@ public:
// //
// The new instance takes ownership of "fields", and will delete them // The new instance takes ownership of "fields", and will delete them
// when done. // when done.
bool Init(string path, int num_fields, const LogField* const * fields); bool Init(string path, int num_fields, const Field* const * fields);
// Writes one log entry. The method takes ownership of "vals" and // Writes one log entry. The method takes ownership of "vals" and
// will return immediately after queueing the write request, which is // will return immediately after queueing the write request, which is
// potentially before output has actually been written out. // potentially before output has actually been written out.
// //
// num_fields and the types of the LogVals must match what was passed // num_fields and the types of the Values must match what was passed
// to Init(). // to Init().
// //
// Returns false if an error occured, in which case the writer must // Returns false if an error occured, in which case the writer must
// not be used any further. // not be used any further.
bool Write(int num_fields, LogVal** vals); bool Write(int num_fields, Value** vals);
// Sets the buffering status for the writer, if the writer supports // Sets the buffering status for the writer, if the writer supports
// that. (If not, it will be ignored). // that. (If not, it will be ignored).
@ -60,12 +50,12 @@ public:
// Triggers rotation, if the writer supports that. (If not, it will // Triggers rotation, if the writer supports that. (If not, it will
// be ignored). // be ignored).
bool Rotate(string rotated_path, double open, double close, bool terminating); bool Rotate(WriterFrontend* writer, string rotated_path, double open, double close, bool terminating);
// Finishes writing to this logger regularly. Must not be called if // Finishes writing to this logger regularly. Must not be called if
// an error has been indicated earlier. After calling this, no // an error has been indicated earlier. After calling this, no
// further writing must be performed. // further writing must be performed.
void Finish(); bool Finish();
//// Thread-safe methods that may be called from the writer //// Thread-safe methods that may be called from the writer
//// implementation. //// implementation.
@ -73,24 +63,43 @@ public:
// The following methods return the information as passed to Init(). // The following methods return the information as passed to Init().
const string Path() const { return path; } const string Path() const { return path; }
int NumFields() const { return num_fields; } int NumFields() const { return num_fields; }
const LogField* const * Fields() const { return fields; } const Field* const * Fields() const { return fields; }
// Returns the current buffering state.
bool IsBuf() { return buffering; }
// Signals to the log manager that a file has been rotated.
//
// writer: The frontend writer that triggered the rotation. This must
// be the value passed into DoRotate().
//
// new_name: The filename of the rotated file. old_name: The filename
// of the origina file.
//
// open/close: The timestamps when the original file was opened and
// closed, respectively.
//
// terminating: True if rotation request occured due to the main Bro
// process shutting down.
bool FinishedRotation(WriterFrontend* writer, string new_name, string old_name,
double open, double close, bool terminating);
protected: protected:
// Methods for writers to override. If any of these returs false, it // Methods for writers to override. If any of these returs false, it
// will be assumed that a fatal error has occured that prevents the // will be assumed that a fatal error has occured that prevents the
// writer from further operation. It will then be disabled and // writer from further operation. It will then be disabled and
// deleted. When return false, the writer should also report the // deleted. When returning false, the writer should also report the
// error via Error(). Note that even if a writer does not support the // error via Error(). Note that even if a writer does not support the
// functionality for one these methods (like rotation), it must still // functionality for one these methods (like rotation), it must still
// return true if that is not to be considered a fatal error. // return true if that is not to be considered a fatal error.
// //
// Called once for initialization of the writer. // Called once for initialization of the writer.
virtual bool DoInit(string path, int num_fields, virtual bool DoInit(string path, int num_fields,
const LogField* const * fields) = 0; const Field* const * fields) = 0;
// Called once per log entry to record. // Called once per log entry to record.
virtual bool DoWrite(int num_fields, const LogField* const * fields, virtual bool DoWrite(int num_fields, const Field* const * fields,
LogVal** vals) = 0; Value** vals) = 0;
// Called when the buffering status for this writer is changed. If // Called when the buffering status for this writer is changed. If
// buffering is disabled, the writer should attempt to write out // buffering is disabled, the writer should attempt to write out
@ -119,6 +128,11 @@ protected:
// RotationDone() to signal the log manager that potential // RotationDone() to signal the log manager that potential
// postprocessors can now run. // postprocessors can now run.
// //
// "writer" is the frontend writer that triggered the rotation. The
// *only* purpose of this value is to be passed into
// FinishedRotation() once done. You must not otherwise access the
// frontend, it's running in a different thread.
//
// "rotate_path" reflects the path to where the rotated output is to // "rotate_path" reflects the path to where the rotated output is to
// be moved, with specifics depending on the writer. It should // be moved, with specifics depending on the writer. It should
// generally be interpreted in a way consistent with that of "path" // generally be interpreted in a way consistent with that of "path"
@ -135,52 +149,31 @@ protected:
// //
// A writer may ignore rotation requests if it doesn't fit with its // A writer may ignore rotation requests if it doesn't fit with its
// semantics (but must still return true in that case). // semantics (but must still return true in that case).
virtual bool DoRotate(string rotated_path, double open, double close, virtual bool DoRotate(WriterFrontend* writer, string rotated_path,
bool terminating) = 0; double open, double close, bool terminating) = 0;
// Called once on termination. Not called when any of the other // Called once on termination. Not called when any of the other
// methods has previously signaled an error, i.e., executing this // methods has previously signaled an error, i.e., executing this
// method signals a regular shutdown of the writer. // method signals a regular shutdown of the writer.
virtual void DoFinish() = 0; virtual bool DoFinish() = 0;
//// Methods for writers to use. These are thread-safe. // Triggered by regular heartbeat messages from the main process.
virtual bool DoHeartbeat(double network_time, double current_time) { return true; };
// A thread-safe version of fmt().
const char* Fmt(const char* format, ...);
// Returns the current buffering state.
bool IsBuf() { return buffering; }
// Reports an error to the user.
void Error(const char *msg);
// Signals to the log manager that a file has been rotated.
//
// new_name: The filename of the rotated file. old_name: The filename
// of the origina file.
//
// open/close: The timestamps when the original file was opened and
// closed, respectively.
//
// terminating: True if rotation request occured due to the main Bro
// process shutting down.
bool FinishedRotation(string new_name, string old_name, double open,
double close, bool terminating);
private: private:
friend class LogMgr; friend class Manager;
// When an error occurs, we call this method to set a flag marking // When an error occurs, we call this method to set a flag marking
// the writer as disabled. The LogMgr will check the flag later and // the writer as disabled. The Manager will check the flag later and
// remove the writer. // remove the writer.
bool Disabled() { return disabled; } bool Disabled() { return disabled; }
// Deletes the values as passed into Write(). // Deletes the values as passed into Write().
void DeleteVals(LogVal** vals); void DeleteVals(Value** vals);
string path; string path;
int num_fields; int num_fields;
const LogField* const * fields; const Field* const * fields;
bool buffering; bool buffering;
bool disabled; bool disabled;
@ -189,4 +182,8 @@ private:
unsigned int buf_len; unsigned int buf_len;
}; };
}
#endif #endif

View file

@ -0,0 +1,175 @@
#include "WriterFrontend.h"
#include "WriterBackend.h"
namespace logging {
// Messages sent from frontend to backend (i.e., "InputMessages").
class InitMessage : public threading::InputMessage<WriterBackend>
{
public:
InitMessage(WriterBackend* backend, const string path, const int num_fields, const Field* const *fields)
: threading::InputMessage<WriterBackend>("Init", backend),
path(path), num_fields(num_fields), fields(fields) { }
virtual bool Process() { return Object()->Init(path, num_fields, fields); }
private:
const string path;
const int num_fields;
const Field * const* fields;
};
class RotateMessage : public threading::InputMessage<WriterBackend>
{
public:
RotateMessage(WriterBackend* backend, WriterFrontend* frontend, const string rotated_path, const double open,
const double close, const bool terminating)
: threading::InputMessage<WriterBackend>("Rotate", backend),
frontend(frontend),
rotated_path(rotated_path), open(open),
close(close), terminating(terminating) { }
virtual bool Process() { return Object()->Rotate(frontend, rotated_path, open, close, terminating); }
private:
WriterFrontend* frontend;
const string rotated_path;
const double open;
const double close;
const bool terminating;
};
class WriteMessage : public threading::InputMessage<WriterBackend>
{
public:
WriteMessage(WriterBackend* backend, const int num_fields, Value **vals)
: threading::InputMessage<WriterBackend>("Write", backend),
num_fields(num_fields), fields(fields), vals(vals) {}
virtual bool Process() { return Object()->Write(num_fields, vals); }
private:
int num_fields;
Field* const* fields;
Value **vals;
};
class SetBufMessage : public threading::InputMessage<WriterBackend>
{
public:
SetBufMessage(WriterBackend* backend, const bool enabled)
: threading::InputMessage<WriterBackend>("SetBuf", backend),
enabled(enabled) { }
virtual bool Process() { return Object()->SetBuf(enabled); }
private:
const bool enabled;
};
class FlushMessage : public threading::InputMessage<WriterBackend>
{
public:
FlushMessage(WriterBackend* backend)
: threading::InputMessage<WriterBackend>("Flush", backend) {}
virtual bool Process() { return Object()->Flush(); }
};
class FinishMessage : public threading::InputMessage<WriterBackend>
{
public:
FinishMessage(WriterBackend* backend)
: threading::InputMessage<WriterBackend>("Finish", backend) {}
virtual bool Process() { return Object()->Finish(); }
};
}
// Frontend methods.
using namespace logging;
WriterFrontend::WriterFrontend(bro_int_t type)
{
disabled = initialized = false;
backend = log_mgr->CreateBackend(type);
assert(backend);
backend->Start();
}
WriterFrontend::~WriterFrontend()
{
}
void WriterFrontend::Stop()
{
SetDisable();
backend->Stop();
}
void WriterFrontend::Init(string arg_path, int arg_num_fields, const Field* const * arg_fields)
{
if ( disabled )
return;
if ( initialized )
reporter->InternalError("writer initialize twice");
path = arg_path;
num_fields = arg_num_fields;
fields = arg_fields;
initialized = true;
backend->SendIn(new InitMessage(backend, arg_path, arg_num_fields, arg_fields));
}
void WriterFrontend::Write(int num_fields, Value** vals)
{
if ( disabled )
return;
backend->SendIn(new WriteMessage(backend, num_fields, vals));
}
void WriterFrontend::SetBuf(bool enabled)
{
if ( disabled )
return;
backend->SendIn(new SetBufMessage(backend, enabled));
}
void WriterFrontend::Flush()
{
if ( disabled )
return;
backend->SendIn(new FlushMessage(backend));
}
void WriterFrontend::Rotate(string rotated_path, double open, double close, bool terminating)
{
if ( disabled )
return;
backend->SendIn(new RotateMessage(backend, this, rotated_path, open, close, terminating));
}
void WriterFrontend::Finish()
{
if ( disabled )
return;
backend->SendIn(new FinishMessage(backend));
}

View file

@ -0,0 +1,66 @@
// See the file "COPYING" in the main distribution directory for copyright.
//
// Bridge class between main process and writer threads.
#ifndef LOGGING_WRITERFRONTEND_H
#define LOGGING_WRITERFRONTEND_H
#include "Manager.h"
#include "threading/MsgThread.h"
namespace logging {
class WriterBackend;
class WriterFrontend {
public:
WriterFrontend(bro_int_t type);
virtual ~WriterFrontend();
// Disables the writers and stop the backend thread.
void Stop();
// Interface methods to interact with the writer from the main thread
// (and only from the main thread), typicalli from the log manager.
// All these methods forward (via inter-thread messaging) to the
// corresponding methods of an internally created WriterBackend. See
// there for documentation.
//
// If any of these operations fails, the writer will be automatically
// (but asynchronoulsy) disabled.
void Init(string path, int num_fields, const Field* const * fields);
void Write(int num_fields, Value** vals);
void SetBuf(bool enabled);
void Flush();
void Rotate(string rotated_path, double open, double close, bool terminating);
void Finish();
// Calling this disable the writer. All methods calls will be no-ops
// from now on. The Manager will eventually remove disabled writers.
void SetDisable() { disabled = true; }
bool Disabled() { return disabled; }
const string Path() const { return path; }
int NumFields() const { return num_fields; }
const Field* const * Fields() const { return fields; }
protected:
friend class Manager;
WriterBackend* backend;
bool disabled;
bool initialized;
string path;
int num_fields;
const Field* const * fields;
};
}
#endif

View file

@ -3,10 +3,14 @@
#include <string> #include <string>
#include <errno.h> #include <errno.h>
#include "LogWriterAscii.h" #include "../../NetVar.h"
#include "NetVar.h"
LogWriterAscii::LogWriterAscii() #include "Ascii.h"
using namespace logging;
using namespace writer;
Ascii::Ascii() : WriterBackend("Ascii")
{ {
file = 0; file = 0;
@ -42,7 +46,7 @@ LogWriterAscii::LogWriterAscii()
desc.AddEscapeSequence(separator, separator_len); desc.AddEscapeSequence(separator, separator_len);
} }
LogWriterAscii::~LogWriterAscii() Ascii::~Ascii()
{ {
if ( file ) if ( file )
fclose(file); fclose(file);
@ -54,7 +58,7 @@ LogWriterAscii::~LogWriterAscii()
delete [] header_prefix; delete [] header_prefix;
} }
bool LogWriterAscii::WriteHeaderField(const string& key, const string& val) bool Ascii::WriteHeaderField(const string& key, const string& val)
{ {
string str = string(header_prefix, header_prefix_len) + string str = string(header_prefix, header_prefix_len) +
key + string(separator, separator_len) + val + "\n"; key + string(separator, separator_len) + val + "\n";
@ -62,8 +66,8 @@ bool LogWriterAscii::WriteHeaderField(const string& key, const string& val)
return (fwrite(str.c_str(), str.length(), 1, file) == 1); return (fwrite(str.c_str(), str.length(), 1, file) == 1);
} }
bool LogWriterAscii::DoInit(string path, int num_fields, bool Ascii::DoInit(string path, int num_fields,
const LogField* const * fields) const Field* const * fields)
{ {
if ( output_to_stdout ) if ( output_to_stdout )
path = "/dev/stdout"; path = "/dev/stdout";
@ -108,7 +112,7 @@ bool LogWriterAscii::DoInit(string path, int num_fields,
types += string(separator, separator_len); types += string(separator, separator_len);
} }
const LogField* field = fields[i]; const Field* field = fields[i];
names += field->name; names += field->name;
types += type_name(field->type); types += type_name(field->type);
if ( (field->type == TYPE_TABLE) || (field->type == TYPE_VECTOR) ) if ( (field->type == TYPE_TABLE) || (field->type == TYPE_VECTOR) )
@ -131,17 +135,18 @@ write_error:
return false; return false;
} }
bool LogWriterAscii::DoFlush() bool Ascii::DoFlush()
{ {
fflush(file); fflush(file);
return true; return true;
} }
void LogWriterAscii::DoFinish() bool Ascii::DoFinish()
{ {
return true;
} }
bool LogWriterAscii::DoWriteOne(ODesc* desc, LogVal* val, const LogField* field) bool Ascii::DoWriteOne(ODesc* desc, Value* val, const Field* field)
{ {
if ( ! val->present ) if ( ! val->present )
{ {
@ -281,8 +286,8 @@ bool LogWriterAscii::DoWriteOne(ODesc* desc, LogVal* val, const LogField* field)
return true; return true;
} }
bool LogWriterAscii::DoWrite(int num_fields, const LogField* const * fields, bool Ascii::DoWrite(int num_fields, const Field* const * fields,
LogVal** vals) Value** vals)
{ {
if ( ! file ) if ( ! file )
DoInit(Path(), NumFields(), Fields()); DoInit(Path(), NumFields(), Fields());
@ -312,8 +317,8 @@ bool LogWriterAscii::DoWrite(int num_fields, const LogField* const * fields,
return true; return true;
} }
bool LogWriterAscii::DoRotate(string rotated_path, double open, bool Ascii::DoRotate(WriterFrontend* writer, string rotated_path, double open,
double close, bool terminating) double close, bool terminating)
{ {
// Don't rotate special files or if there's not one currently open. // Don't rotate special files or if there's not one currently open.
if ( ! file || IsSpecial(Path()) ) if ( ! file || IsSpecial(Path()) )
@ -325,7 +330,7 @@ bool LogWriterAscii::DoRotate(string rotated_path, double open,
string nname = rotated_path + "." + LogExt(); string nname = rotated_path + "." + LogExt();
rename(fname.c_str(), nname.c_str()); rename(fname.c_str(), nname.c_str());
if ( ! FinishedRotation(nname, fname, open, close, terminating) ) if ( ! FinishedRotation(writer, nname, fname, open, close, terminating) )
{ {
Error(Fmt("error rotating %s to %s", fname.c_str(), nname.c_str())); Error(Fmt("error rotating %s to %s", fname.c_str(), nname.c_str()));
return false; return false;
@ -334,13 +339,13 @@ bool LogWriterAscii::DoRotate(string rotated_path, double open,
return true; return true;
} }
bool LogWriterAscii::DoSetBuf(bool enabled) bool Ascii::DoSetBuf(bool enabled)
{ {
// Nothing to do. // Nothing to do.
return true; return true;
} }
string LogWriterAscii::LogExt() string Ascii::LogExt()
{ {
const char* ext = getenv("BRO_LOG_SUFFIX"); const char* ext = getenv("BRO_LOG_SUFFIX");
if ( ! ext ) ext = "log"; if ( ! ext ) ext = "log";

View file

@ -2,33 +2,35 @@
// //
// Log writer for delimiter-separated ASCII logs. // Log writer for delimiter-separated ASCII logs.
#ifndef LOGWRITERASCII_H #ifndef LOGGING_WRITER_ASCII_H
#define LOGWRITERASCII_H #define LOGGING_WRITER_ASCII_H
#include "LogWriter.h" #include "../WriterBackend.h"
class LogWriterAscii : public LogWriter { namespace logging { namespace writer {
class Ascii : public WriterBackend {
public: public:
LogWriterAscii(); Ascii();
~LogWriterAscii(); ~Ascii();
static LogWriter* Instantiate() { return new LogWriterAscii; } static WriterBackend* Instantiate() { return new Ascii; }
static string LogExt(); static string LogExt();
protected: protected:
virtual bool DoInit(string path, int num_fields, virtual bool DoInit(string path, int num_fields,
const LogField* const * fields); const Field* const * fields);
virtual bool DoWrite(int num_fields, const LogField* const * fields, virtual bool DoWrite(int num_fields, const Field* const * fields,
LogVal** vals); Value** vals);
virtual bool DoSetBuf(bool enabled); virtual bool DoSetBuf(bool enabled);
virtual bool DoRotate(string rotated_path, double open, double close, virtual bool DoRotate(WriterFrontend* writer, string rotated_path,
bool terminating); double open, double close, bool terminating);
virtual bool DoFlush(); virtual bool DoFlush();
virtual void DoFinish(); virtual bool DoFinish();
private: private:
bool IsSpecial(string path) { return path.find("/dev/") == 0; } bool IsSpecial(string path) { return path.find("/dev/") == 0; }
bool DoWriteOne(ODesc* desc, LogVal* val, const LogField* field); bool DoWriteOne(ODesc* desc, Value* val, const Field* field);
bool WriteHeaderField(const string& key, const string& value); bool WriteHeaderField(const string& key, const string& value);
FILE* file; FILE* file;
@ -55,4 +57,8 @@ private:
int header_prefix_len; int header_prefix_len;
}; };
}
}
#endif #endif

View file

@ -0,0 +1,19 @@
#include "None.h"
using namespace logging;
using namespace writer;
bool None::DoRotate(WriterFrontend* writer, string rotated_path,
double open, double close, bool terminating)
{
if ( ! FinishedRotation(writer, string("/dev/null"), Path(), open, close, terminating))
{
Error(Fmt("error rotating %s", Path().c_str()));
return false;
}
return true;
}

View file

@ -0,0 +1,35 @@
// See the file "COPYING" in the main distribution directory for copyright.
//
// Dummy log writer that just discards everything (but still pretends to rotate).
#ifndef LOGGING_WRITER_NONE_H
#define LOGGING_WRITER_NONE_H
#include "../WriterBackend.h"
namespace logging { namespace writer {
class None : public WriterBackend {
public:
None() : WriterBackend("None") {}
~None() {};
static WriterBackend* Instantiate() { return new None; }
protected:
virtual bool DoInit(string path, int num_fields,
const Field* const * fields) { return true; }
virtual bool DoWrite(int num_fields, const Field* const * fields,
Value** vals) { return true; }
virtual bool DoSetBuf(bool enabled) { return true; }
virtual bool DoRotate(WriterFrontend* writer, string rotated_path,
double open, double close, bool terminating);
virtual bool DoFlush() { return true; }
virtual bool DoFinish() { return true; }
};
}
}
#endif

View file

@ -29,7 +29,6 @@ extern "C" void OPENSSL_add_all_algorithms_conf(void);
#include "Event.h" #include "Event.h"
#include "File.h" #include "File.h"
#include "Reporter.h" #include "Reporter.h"
#include "LogMgr.h"
#include "Net.h" #include "Net.h"
#include "NetVar.h" #include "NetVar.h"
#include "Var.h" #include "Var.h"
@ -48,7 +47,10 @@ extern "C" void OPENSSL_add_all_algorithms_conf(void);
#include "DPM.h" #include "DPM.h"
#include "BroDoc.h" #include "BroDoc.h"
#include "Brofiler.h" #include "Brofiler.h"
#include "LogWriterAscii.h"
#include "threading/Manager.h"
#include "logging/Manager.h"
#include "logging/writers/Ascii.h"
#include "binpac_bro.h" #include "binpac_bro.h"
@ -75,7 +77,8 @@ char* writefile = 0;
name_list prefixes; name_list prefixes;
DNS_Mgr* dns_mgr; DNS_Mgr* dns_mgr;
TimerMgr* timer_mgr; TimerMgr* timer_mgr;
LogMgr* log_mgr; logging::Manager* log_mgr = 0;
threading::Manager* thread_mgr = 0;
Stmt* stmts; Stmt* stmts;
EventHandlerPtr net_done = 0; EventHandlerPtr net_done = 0;
RuleMatcher* rule_matcher = 0; RuleMatcher* rule_matcher = 0;
@ -197,7 +200,7 @@ void usage()
fprintf(stderr, " $BRO_PREFIXES | prefix list (%s)\n", bro_prefixes()); fprintf(stderr, " $BRO_PREFIXES | prefix list (%s)\n", bro_prefixes());
fprintf(stderr, " $BRO_DNS_FAKE | disable DNS lookups (%s)\n", bro_dns_fake()); fprintf(stderr, " $BRO_DNS_FAKE | disable DNS lookups (%s)\n", bro_dns_fake());
fprintf(stderr, " $BRO_SEED_FILE | file to load seeds from (not set)\n"); fprintf(stderr, " $BRO_SEED_FILE | file to load seeds from (not set)\n");
fprintf(stderr, " $BRO_LOG_SUFFIX | ASCII log file extension (.%s)\n", LogWriterAscii::LogExt().c_str()); fprintf(stderr, " $BRO_LOG_SUFFIX | ASCII log file extension (.%s)\n", logging::writer::Ascii::LogExt().c_str());
fprintf(stderr, " $BRO_PROFILER_FILE | Output file for script execution statistics (not set)\n"); fprintf(stderr, " $BRO_PROFILER_FILE | Output file for script execution statistics (not set)\n");
exit(1); exit(1);
@ -287,6 +290,8 @@ void terminate_bro()
if ( remote_serializer ) if ( remote_serializer )
remote_serializer->LogStats(); remote_serializer->LogStats();
thread_mgr->Terminate();
delete timer_mgr; delete timer_mgr;
delete dns_mgr; delete dns_mgr;
delete persistence_serializer; delete persistence_serializer;
@ -299,6 +304,7 @@ void terminate_bro()
delete remote_serializer; delete remote_serializer;
delete dpm; delete dpm;
delete log_mgr; delete log_mgr;
delete thread_mgr;
delete reporter; delete reporter;
} }
@ -661,7 +667,9 @@ int main(int argc, char** argv)
set_processing_status("INITIALIZING", "main"); set_processing_status("INITIALIZING", "main");
bro_start_time = current_time(true); bro_start_time = current_time(true);
reporter = new Reporter(); reporter = new Reporter();
thread_mgr = new threading::Manager();
#ifdef DEBUG #ifdef DEBUG
if ( debug_streams ) if ( debug_streams )
@ -727,7 +735,7 @@ int main(int argc, char** argv)
persistence_serializer = new PersistenceSerializer(); persistence_serializer = new PersistenceSerializer();
remote_serializer = new RemoteSerializer(); remote_serializer = new RemoteSerializer();
event_registry = new EventRegistry(); event_registry = new EventRegistry();
log_mgr = new LogMgr(); log_mgr = new logging::Manager();
if ( events_file ) if ( events_file )
event_player = new EventPlayer(events_file); event_player = new EventPlayer(events_file);
@ -1001,6 +1009,8 @@ int main(int argc, char** argv)
have_pending_timers = ! reading_traces && timer_mgr->Size() > 0; have_pending_timers = ! reading_traces && timer_mgr->Size() > 0;
io_sources.Register(thread_mgr, true);
if ( io_sources.Size() > 0 || have_pending_timers ) if ( io_sources.Size() > 0 || have_pending_timers )
{ {
if ( profiling_logger ) if ( profiling_logger )

View file

@ -0,0 +1,129 @@
#include <sys/signal.h>
#include <signal.h>
#include "BasicThread.h"
#include "Manager.h"
using namespace threading;
BasicThread::BasicThread(const string& arg_name)
{
started = false;
terminating = false;
pthread = 0;
buf = 0;
buf_len = 1024;
char tmp[128];
snprintf(tmp, sizeof(tmp), "%s@%p", arg_name.c_str(), this);
name = string(tmp);
thread_mgr->AddThread(this);
}
BasicThread::~BasicThread()
{
}
const char* BasicThread::Fmt(const char* format, ...)
{
if ( ! buf )
buf = (char*) malloc(buf_len);
va_list al;
va_start(al, format);
int n = safe_vsnprintf(buf, buf_len, format, al);
va_end(al);
if ( (unsigned int) n >= buf_len )
{ // Not enough room, grow the buffer.
buf_len = n + 32;
buf = (char*) realloc(buf, buf_len);
// Is it portable to restart?
va_start(al, format);
n = safe_vsnprintf(buf, buf_len, format, al);
va_end(al);
}
return buf;
}
void BasicThread::Start()
{
if ( sem_init(&terminate, 0, 0) != 0 )
reporter->FatalError("Cannot create terminate semaphore for thread %s", name.c_str());
if ( pthread_create(&pthread, 0, BasicThread::launcher, this) != 0 )
reporter->FatalError("Cannot create thread %s", name.c_str());
DBG_LOG(DBG_THREADING, "Started thread %s", name.c_str());
started = true;
OnStart();
}
void BasicThread::Stop()
{
if ( ! started )
return;
if ( terminating )
return;
DBG_LOG(DBG_THREADING, "Signaling thread %s to terminate ...", name.c_str());
// Signal that it's ok for the thread to exit now.
if ( sem_post(&terminate) != 0 )
reporter->FatalError("Failure flagging terminate condition for thread %s", name.c_str());
terminating = true;
OnStop();
}
void BasicThread::Join()
{
if ( ! started )
return;
if ( ! terminating )
Stop();
DBG_LOG(DBG_THREADING, "Joining thread %s ...", name.c_str());
if ( pthread_join(pthread, 0) != 0 )
reporter->FatalError("Failure joining thread %s", name.c_str());
sem_destroy(&terminate);
DBG_LOG(DBG_THREADING, "Done with thread %s", name.c_str());
pthread = 0;
}
void* BasicThread::launcher(void *arg)
{
BasicThread* thread = (BasicThread *)arg;
// Block signals in thread. We handle signals only in the main
// process.
sigset_t mask_set;
sigfillset(&mask_set);
int res = pthread_sigmask(SIG_BLOCK, &mask_set, 0);
assert(res == 0); //
// Run thread's main function.
thread->Run();
// Wait until somebody actually wants us to terminate.
if ( sem_wait(&thread->terminate) != 0 )
reporter->FatalError("Failure flagging terminate condition for thread %s", thread->Name().c_str());
return 0;
}

View file

@ -0,0 +1,63 @@
#ifndef THREADING_BASICTHREAD_H
#define THREADING_BASICTHREAD_H
#include <pthread.h>
#include <semaphore.h>
#include "Queue.h"
#include "util.h"
using namespace std;
namespace threading {
class Manager;
class BasicThread
{
public:
BasicThread(const string& name); // Managed by manager, must not delete otherwise.
virtual ~BasicThread();
const string& Name() const { return name; }
void Start(); // Spawns the thread and enters Run().
void Stop(); // Signals the thread to terminate.
bool Terminating() const { return terminating; }
// A thread-safe version of fmt().
const char* Fmt(const char* format, ...);
protected:
virtual void Run() = 0;
virtual void OnStart() {}
virtual void OnStop() {}
private:
friend class Manager;
static void* launcher(void *arg);
// Used from the ThreadMgr.
void Join(); // Waits until the thread has terminated and then joins it.
bool started; // Set to to true once running.
bool terminating; // Set to to true to signal termination.
string name;
pthread_t pthread;
sem_t terminate;
// For implementing Fmt().
char* buf;
unsigned int buf_len;
};
}
extern threading::Manager* thread_mgr;
#endif

104
src/threading/Manager.cc Normal file
View file

@ -0,0 +1,104 @@
#include "Manager.h"
using namespace threading;
Manager::Manager()
{
DBG_LOG(DBG_THREADING, "Creating thread manager ...");
did_process = false;
next_beat = 0;
}
Manager::~Manager()
{
if ( all_threads.size() )
Terminate();
}
void Manager::Terminate()
{
DBG_LOG(DBG_THREADING, "Terminating thread manager ...");
// First process remaining thread output for the message threads.
do Process(); while ( did_process );
// Signal all to stop.
for ( all_thread_list::iterator i = all_threads.begin(); i != all_threads.end(); i++ )
(*i)->Stop();
// Then join them all.
for ( all_thread_list::iterator i = all_threads.begin(); i != all_threads.end(); i++ )
{
(*i)->Join();
delete *i;
}
all_threads.clear();
msg_threads.clear();
}
void Manager::AddThread(BasicThread* thread)
{
DBG_LOG(DBG_THREADING, "Adding thread %s ...", thread->Name().c_str());
all_threads.push_back(thread);
}
void Manager::AddMsgThread(MsgThread* thread)
{
DBG_LOG(DBG_THREADING, "%s is a MsgThread ...", thread->Name().c_str());
msg_threads.push_back(thread);
}
void Manager::GetFds(int* read, int* write, int* except)
{
}
double Manager::NextTimestamp(double* network_time)
{
if ( did_process || ! next_beat == 0 )
// If we had something to process last time (or haven't had a
// chance to check yet), we want to check for more asap.
return timer_mgr->Time();
// Else we assume we don't have much to do at all and wait for the next heart beat.
return next_beat;
}
void Manager::Process()
{
bool do_beat = (next_beat == 0 || network_time >= next_beat);
did_process = false;
for ( msg_thread_list::iterator i = msg_threads.begin(); i != msg_threads.end(); i++ )
{
MsgThread* t = *i;
if ( do_beat )
t->Heartbeat();
if ( ! t->HasOut() )
continue;
Message* msg = t->RetrieveOut();
if ( msg->Process() )
did_process = true;
else
{
string s = msg->Name() + " failed, terminating thread";
reporter->Error(s.c_str());
t->Stop();
}
delete msg;
}
if ( do_beat )
next_beat = network_time + HEART_BEAT_INTERVAL;
}

52
src/threading/Manager.h Normal file
View file

@ -0,0 +1,52 @@
#ifndef THREADING_MANAGER_H
#define THREADING_MANAGER_H
#include <list>
#include "IOSource.h"
#include "BasicThread.h"
#include "MsgThread.h"
namespace threading {
class Manager : public IOSource
{
public:
Manager();
~Manager();
void Terminate();
protected:
friend class BasicThread;
friend class MsgThread;
void AddThread(BasicThread* thread);
void AddMsgThread(MsgThread* thread);
// IOSource interface.
virtual void GetFds(int* read, int* write, int* except);
virtual double NextTimestamp(double* network_time);
virtual void Process();
virtual const char* Tag() { return "threading::Manager"; }
private:
static const int HEART_BEAT_INTERVAL = 1;
typedef std::list<BasicThread*> all_thread_list;
all_thread_list all_threads;
typedef std::list<MsgThread*> msg_thread_list;
msg_thread_list msg_threads;
bool did_process;
double next_beat;
};
}
extern threading::Manager* thread_mgr;
#endif

285
src/threading/MsgThread.cc Normal file
View file

@ -0,0 +1,285 @@
#include "DebugLogger.h"
#include "MsgThread.h"
#include "Manager.h"
using namespace threading;
static void strreplace(const string& s, const string& o, const string& n)
{
string r = s;
while ( true )
{
size_t i = r.find(o);
if ( i == std::string::npos )
break;
r.replace(i, o.size(), n);
}
}
namespace threading {
// Standard messages.
class TerminateMessage : public InputMessage<MsgThread>
{
public:
TerminateMessage(MsgThread* thread) : InputMessage("Terminate", thread) { }
virtual bool Process() { return true; }
};
class ReporterMessage : public OutputMessage<MsgThread>
{
public:
enum Type {
INFO, WARNING, ERROR, FATAL_ERROR, FATAL_ERROR_WITH_CORE,
INTERNAL_WARNING, INTERNAL_ERROR
};
ReporterMessage(Type arg_type, MsgThread* thread, const string& arg_msg)
: OutputMessage<MsgThread>("ReporterMessage", thread)
{ type = arg_type; msg = arg_msg; }
virtual bool Process();
private:
string msg;
Type type;
};
class HeartbeatMessage : public InputMessage<MsgThread>
{
public:
HeartbeatMessage(MsgThread* thread, double arg_network_time, double arg_current_time)
: InputMessage("Heartbeat", thread)
{ network_time = arg_network_time; current_time = arg_current_time; }
virtual bool Process() { return Object()->DoHeartbeat(network_time, current_time); }
private:
double network_time;
double current_time;
};
#ifdef DEBUG
class DebugMessage : public OutputMessage<MsgThread>
{
public:
DebugMessage(DebugStream arg_stream, MsgThread* thread, const string& arg_msg)
: OutputMessage<MsgThread>("DebugMessage", thread)
{ stream = arg_stream; msg = arg_msg; }
virtual bool Process()
{
string s = Object()->Name() + ": " + msg;
strreplace(s, "%", "%%");
debug_logger.Log(stream, s.c_str());
return true;
}
private:
string msg;
DebugStream stream;
};
#endif
}
// Methods.
Message::~Message()
{
}
bool ReporterMessage::Process()
{
string s = Object()->Name() + ": " + msg;
strreplace(s, "%", "%%");
const char* cmsg = s.c_str();
switch ( type ) {
case INFO:
reporter->Info(cmsg);
break;
case WARNING:
reporter->Warning(cmsg);
break;
case ERROR:
reporter->Error(cmsg);
break;
case FATAL_ERROR:
reporter->FatalError(cmsg);
break;
case FATAL_ERROR_WITH_CORE:
reporter->FatalErrorWithCore(cmsg);
break;
case INTERNAL_WARNING:
reporter->InternalWarning(cmsg);
break;
case INTERNAL_ERROR :
reporter->InternalError(cmsg);
break;
default:
reporter->InternalError("unknown ReporterMessage type %d", type);
}
return true;
}
MsgThread::MsgThread(const string& name) : BasicThread(name)
{
cnt_sent_in = cnt_sent_out = 0;
thread_mgr->AddMsgThread(this);
}
void MsgThread::OnStop()
{
// This is to unblock the current queue read operation.
SendIn(new TerminateMessage(this), true);
}
void MsgThread::Heartbeat()
{
SendIn(new HeartbeatMessage(this, network_time, current_time()));
}
void MsgThread::Info(const char* msg)
{
SendOut(new ReporterMessage(ReporterMessage::INFO, this, msg));
}
void MsgThread::Warning(const char* msg)
{
SendOut(new ReporterMessage(ReporterMessage::WARNING, this, msg));
}
void MsgThread::Error(const char* msg)
{
SendOut(new ReporterMessage(ReporterMessage::ERROR, this, msg));
}
void MsgThread::FatalError(const char* msg)
{
SendOut(new ReporterMessage(ReporterMessage::FATAL_ERROR, this, msg));
}
void MsgThread::FatalErrorWithCore(const char* msg)
{
SendOut(new ReporterMessage(ReporterMessage::FATAL_ERROR_WITH_CORE, this, msg));
}
void MsgThread::InternalWarning(const char* msg)
{
SendOut(new ReporterMessage(ReporterMessage::INTERNAL_WARNING, this, msg));
}
void MsgThread::InternalError(const char* msg)
{
SendOut(new ReporterMessage(ReporterMessage::INTERNAL_ERROR, this, msg));
}
#ifdef DEBUG
void MsgThread::Debug(DebugStream stream, const char* msg)
{
SendOut(new DebugMessage(stream, this, msg));
}
#endif
void MsgThread::SendIn(BasicInputMessage* msg, bool force)
{
if ( Terminating() && ! force )
return;
DBG_LOG(DBG_THREADING, "Sending '%s' to %s ...", msg->Name().c_str(), Name().c_str());
queue_in.Put(msg);
++cnt_sent_in;
}
void MsgThread::SendOut(BasicOutputMessage* msg, bool force)
{
if ( Terminating() && ! force )
return;
queue_out.Put(msg);
++cnt_sent_out;
}
BasicOutputMessage* MsgThread::RetrieveOut()
{
BasicOutputMessage* msg = queue_out.Get();
assert(msg);
#ifdef DEBUG
if ( msg->Name() != "DebugMessage" ) // Avoid recursion.
{
string s = Fmt("Retrieved '%s' from %s", msg->Name().c_str(), Name().c_str());
Debug(DBG_THREADING, s.c_str());
}
#endif
return msg;
}
BasicInputMessage* MsgThread::RetrieveIn()
{
BasicInputMessage* msg = queue_in.Get();
assert(msg);
#ifdef DEBUG
string s = Fmt("Retrieved '%s' in %s", msg->Name().c_str(), Name().c_str());
Debug(DBG_THREADING, s.c_str());
#endif
return msg;
}
void MsgThread::Run()
{
while ( true )
{
// When requested to terminate, we only do so when
// all input has been processed.
if ( Terminating() && ! queue_in.Ready() )
break;
BasicInputMessage* msg = RetrieveIn();
bool result = msg->Process();
if ( ! result )
{
string s = msg->Name() + " failed, terminating thread";
Error(s.c_str());
Stop();
break;
}
delete msg;
}
}
void MsgThread::GetStats(Stats* stats)
{
stats->sent_in = cnt_sent_in;
stats->sent_out = cnt_sent_out;
stats->pending_in = cnt_sent_in - queue_in.Size();
stats->pending_out = cnt_sent_out - queue_out.Size();
}

157
src/threading/MsgThread.h Normal file
View file

@ -0,0 +1,157 @@
#ifndef THREADING_MSGTHREAD_H
#define THREADING_MSGTHREAD_H
#include <pthread.h>
#include "DebugLogger.h"
#include "BasicThread.h"
#include "Queue.h"
namespace threading {
class BasicInputMessage;
class BasicOutputMessage;
class HeartbeatMessage;
class MsgThread : public BasicThread
{
public:
MsgThread(const string& name);
void SendIn(BasicInputMessage* msg) { return SendIn(msg, false); }
void SendOut(BasicOutputMessage* msg) { return SendOut(msg, false); }
BasicOutputMessage* RetrieveOut();
// Report an informational message, nothing that needs specific
// attention.
void Info(const char* msg);
// Report a warning that may indicate a problem.
void Warning(const char* msg);
// Report a non-fatal error. Processing proceeds normally after the error
// has been reported.
void Error(const char* msg);
// Report a fatal error. Bro will terminate after the message has been
// reported.
void FatalError(const char* msg);
// Report a fatal error. Bro will terminate after the message has been
// reported and always generate a core dump.
void FatalErrorWithCore(const char* msg);
// Report about a potential internal problem. Bro will continue
// normally.
void InternalWarning(const char* msg);
// Report an internal program error. Bro will terminate with a core
// dump after the message has been reported.
void InternalError(const char* msg);
#ifdef DEBUG
// Records a debug message for the given stream.
void Debug(DebugStream stream, const char* msg);
#endif
void Heartbeat();
struct Stats
{
uint64_t sent_in;
uint64_t sent_out;
uint64_t pending_in;
uint64_t pending_out;
};
void GetStats(Stats* stats);
protected:
friend class HeartbeatMessage;
virtual void Run();
virtual void OnStop();
virtual bool DoHeartbeat(double network_time, double current_time) { return true; }
private:
friend class Manager;
BasicInputMessage* RetrieveIn();
void SendIn(BasicInputMessage* msg, bool force);
void SendOut(BasicOutputMessage* msg, bool force);
bool HasIn() { return queue_in.Ready(); }
bool HasOut() { return queue_out.Ready(); }
Queue_<BasicInputMessage *> queue_in;
Queue_<BasicOutputMessage *> queue_out;
uint64_t cnt_sent_in;
uint64_t cnt_sent_out;
};
class Message
{
public:
virtual ~Message();
const string& Name() const { return name; }
virtual bool Process() = 0; // Thread will be terminated if returngin false.
protected:
Message(const string& arg_name) { name = arg_name; }
private:
string name;
};
class BasicInputMessage : public Message
{
protected:
BasicInputMessage(const string& name) : Message(name) {}
};
class BasicOutputMessage : public Message
{
protected:
BasicOutputMessage(const string& name) : Message(name) {}
};
template<typename O>
class InputMessage : public BasicInputMessage
{
public:
O* Object() const { return object; }
protected:
InputMessage(const string& name, O* arg_object) : BasicInputMessage(name)
{ object = arg_object; }
private:
O* object;
};
template<typename O>
class OutputMessage : public BasicOutputMessage
{
public:
O* Object() const { return object; }
protected:
OutputMessage(const string& name, O* arg_object) : BasicOutputMessage(name)
{ object = arg_object; }
private:
O* object;
};
}
#endif

150
src/threading/Queue.h Normal file
View file

@ -0,0 +1,150 @@
#ifndef THREADING_QUEUE_H
#define THREADING_QUEUE_H
#include <pthread.h>
#include <queue>
#include <deque>
#include <stdint.h>
#include "Reporter.h"
namespace threading {
/**
* Just a simple threaded queue wrapper class. Uses multiple queues and reads / writes in rotary fashion in an attempt to limit contention.
* Due to locking granularity, bulk put / get is no faster than single put / get as long as FIFO guarantee is required.
*/
template<typename T>
class Queue_
{
public:
Queue_();
~Queue_();
T Get();
void Put(T data);
bool Ready();
uint64_t Size();
private:
static const int NUM_QUEUES = 8;
pthread_mutex_t mutex[NUM_QUEUES]; // Mutex protected shared accesses.
pthread_cond_t has_data[NUM_QUEUES]; // Signals when data becomes available
std::queue<T> messages[NUM_QUEUES]; // Actually holds the queued messages
int read_ptr; // Where the next operation will read from
int write_ptr; // Where the next operation will write to
uint64_t size;
};
inline static void safe_lock(pthread_mutex_t* mutex)
{
if ( pthread_mutex_lock(mutex) != 0 )
reporter->FatalErrorWithCore("cannot lock mutex");
}
inline static void safe_unlock(pthread_mutex_t* mutex)
{
if ( pthread_mutex_unlock(mutex) != 0 )
reporter->FatalErrorWithCore("cannot unlock mutex");
}
template<typename T>
inline Queue_<T>::Queue_()
{
read_ptr = 0;
write_ptr = 0;
for( int i = 0; i < NUM_QUEUES; ++i )
{
if ( pthread_cond_init(&has_data[i], NULL) != 0 )
reporter->FatalError("cannot init queue condition variable");
if ( pthread_mutex_init(&mutex[i], NULL) != 0 )
reporter->FatalError("cannot init queue mutex");
}
}
template<typename T>
inline Queue_<T>::~Queue_()
{
for( int i = 0; i < NUM_QUEUES; ++i )
{
pthread_cond_destroy(&has_data[i]);
pthread_mutex_destroy(&mutex[i]);
}
}
template<typename T>
inline T Queue_<T>::Get()
{
safe_lock(&mutex[read_ptr]);
int old_read_ptr = read_ptr;
if ( messages[read_ptr].empty() )
pthread_cond_wait(&has_data[read_ptr], &mutex[read_ptr]);
T data = messages[read_ptr].front();
messages[read_ptr].pop();
--size;
read_ptr = (read_ptr + 1) % NUM_QUEUES;
safe_unlock(&mutex[old_read_ptr]);
return data;
}
template<typename T>
inline void Queue_<T>::Put(T data)
{
safe_lock(&mutex[write_ptr]);
int old_write_ptr = write_ptr;
bool need_signal = messages[write_ptr].empty();
messages[write_ptr].push(data);
++size;
if ( need_signal )
pthread_cond_signal(&has_data[write_ptr]);
write_ptr = (write_ptr + 1) % NUM_QUEUES;
safe_unlock(&mutex[old_write_ptr]);
}
template<typename T>
inline bool Queue_<T>::Ready()
{
safe_lock(&mutex[read_ptr]);
bool ret = (messages[read_ptr].size());
safe_unlock(&mutex[read_ptr]);
return ret;
}
template<typename T>
inline uint64_t Queue_<T>::Size()
{
safe_lock(&mutex[read_ptr]);
uint64_t s = size;
safe_unlock(&mutex[read_ptr]);
return s;
}
}
#endif