This commit is contained in:
Robin Sommer 2012-02-03 02:41:10 -08:00
parent 29fc56105d
commit ffb4094d36
9 changed files with 35 additions and 40 deletions

View file

@ -21,7 +21,7 @@ struct WriterDefinition {
bro_int_t type; // The type. bro_int_t type; // The type.
const char *name; // Descriptive name for error messages. const char *name; // Descriptive name for error messages.
bool (*init)(); // An optional one-time initialization function. bool (*init)(); // An optional one-time initialization function.
WriterBackend* (*factory)(); // A factory function creating instances. WriterBackend* (*factory)(WriterFrontend* frontend); // A factory function creating instances.
}; };
// Static table defining all availabel log writers. // Static table defining all availabel log writers.
@ -30,7 +30,7 @@ WriterDefinition log_writers[] = {
{ BifEnum::Log::WRITER_ASCII, "Ascii", 0, writer::Ascii::Instantiate }, { BifEnum::Log::WRITER_ASCII, "Ascii", 0, writer::Ascii::Instantiate },
// End marker, don't touch. // End marker, don't touch.
{ BifEnum::Log::WRITER_DEFAULT, "None", 0, (WriterBackend* (*)())0 } { BifEnum::Log::WRITER_DEFAULT, "None", 0, (WriterBackend* (*)(WriterFrontend* frontend))0 }
}; };
struct Manager::Filter { struct Manager::Filter {
@ -436,7 +436,7 @@ Manager::~Manager()
delete *s; delete *s;
} }
WriterBackend* Manager::CreateBackend(bro_int_t type) WriterBackend* Manager::CreateBackend(WriterFrontend* frontend, bro_int_t type)
{ {
WriterDefinition* ld = log_writers; WriterDefinition* ld = log_writers;
@ -478,7 +478,7 @@ WriterBackend* Manager::CreateBackend(bro_int_t type)
assert(ld->factory); assert(ld->factory);
WriterBackend* backend = (*ld->factory)(); WriterBackend* backend = (*ld->factory)(frontend);
assert(backend); assert(backend);
return backend; return backend;
} }

View file

@ -259,7 +259,7 @@ protected:
// Instantiates a new WriterBackend of the given type (note that // Instantiates a new WriterBackend of the given type (note that
// doing so creates a new thread!). // doing so creates a new thread!).
WriterBackend* CreateBackend(bro_int_t type); WriterBackend* CreateBackend(WriterFrontend* frontend, bro_int_t type);
//// Function also used by the RemoteSerializer. //// Function also used by the RemoteSerializer.

View file

@ -55,12 +55,13 @@ public:
using namespace logging; using namespace logging;
WriterBackend::WriterBackend(const string& name) : MsgThread(name) WriterBackend::WriterBackend(WriterFrontend* arg_frontend, const string& name) : MsgThread(name)
{ {
path = "<not set>"; path = "<not set>";
num_fields = 0; num_fields = 0;
fields = 0; fields = 0;
buffering = true; buffering = true;
frontend = arg_frontend;
} }
WriterBackend::~WriterBackend() WriterBackend::~WriterBackend()
@ -100,10 +101,8 @@ void WriterBackend::DisableFrontend()
SendOut(new DisableMessage(frontend)); SendOut(new DisableMessage(frontend));
} }
bool WriterBackend::Init(WriterFrontend* arg_frontend, string arg_path, int arg_num_fields, bool WriterBackend::Init(string arg_path, int arg_num_fields, const Field* const* arg_fields)
const Field* const* arg_fields)
{ {
frontend = arg_frontend;
path = arg_path; path = arg_path;
num_fields = arg_num_fields; num_fields = arg_num_fields;
fields = arg_fields; fields = arg_fields;
@ -227,6 +226,7 @@ bool WriterBackend::Finish()
bool WriterBackend::DoHeartbeat(double network_time, double current_time) bool WriterBackend::DoHeartbeat(double network_time, double current_time)
{ {
SendOut(new FlushWriteBufferMessage(frontend)); SendOut(new FlushWriteBufferMessage(frontend));
return true; return true;
} }

View file

@ -26,9 +26,15 @@ public:
/** /**
* Constructor. * Constructor.
* *
* @param frontend The frontend writer that created this backend. The
* *only* purpose of this value is to be passed back via messages as
* a argument to callbacks. One must not otherwise access the
* frontend, it's running in a different thread.
*
* @param name A descriptive name for writer's type (e.g., \c Ascii). * @param name A descriptive name for writer's type (e.g., \c Ascii).
*
*/ */
WriterBackend(const string& name); WriterBackend(WriterFrontend* frontend, const string& name);
/** /**
* Destructor. * Destructor.
@ -38,11 +44,6 @@ public:
/** /**
* One-time initialization of the writer to define the logged fields. * One-time initialization of the writer to define the logged fields.
* *
* @param frontend The frontend writer that created this backend. The
* *only* purpose of this value is to be passed back via messages as
* a argument to callbacks. One must not otherwise access the
* frontend, it's running in a different thread.
*
* @param path A string left to the interpretation of the writer * @param path A string left to the interpretation of the writer
* implementation; it corresponds to the value configured on the * implementation; it corresponds to the value configured on the
* script-level for the logging filter. * script-level for the logging filter.
@ -54,7 +55,7 @@ public:
* *
* @return False if an error occured. * @return False if an error occured.
*/ */
bool Init(WriterFrontend* frontend, string path, int num_fields, const Field* const* fields); bool Init(string path, int num_fields, const Field* const* fields);
/** /**
* Writes one log entry. * Writes one log entry.

View file

@ -9,14 +9,13 @@ namespace logging {
class InitMessage : public threading::InputMessage<WriterBackend> class InitMessage : public threading::InputMessage<WriterBackend>
{ {
public: public:
InitMessage(WriterBackend* backend, WriterFrontend* frontend, const string path, const int num_fields, const Field* const* fields) InitMessage(WriterBackend* backend, const string path, const int num_fields, const Field* const* fields)
: threading::InputMessage<WriterBackend>("Init", backend), : threading::InputMessage<WriterBackend>("Init", backend),
path(path), num_fields(num_fields), fields(fields) { } path(path), num_fields(num_fields), fields(fields) { }
virtual bool Process() { return Object()->Init(frontend, path, num_fields, fields); } virtual bool Process() { return Object()->Init(path, num_fields, fields); }
private: private:
WriterFrontend* frontend;
const string path; const string path;
const int num_fields; const int num_fields;
const Field * const* fields; const Field * const* fields;
@ -47,7 +46,7 @@ class WriteMessage : public threading::InputMessage<WriterBackend>
public: public:
WriteMessage(WriterBackend* backend, int num_fields, int num_writes, Value*** vals) WriteMessage(WriterBackend* backend, int num_fields, int num_writes, Value*** vals)
: threading::InputMessage<WriterBackend>("Write", backend), : threading::InputMessage<WriterBackend>("Write", backend),
num_fields(num_fields), vals(vals) {} num_fields(num_fields), num_writes(num_writes), vals(vals) {}
virtual bool Process() { return Object()->Write(num_fields, num_writes, vals); } virtual bool Process() { return Object()->Write(num_fields, num_writes, vals); }
@ -98,8 +97,9 @@ WriterFrontend::WriterFrontend(bro_int_t type)
{ {
disabled = initialized = false; disabled = initialized = false;
buf = true; buf = true;
write_buffer = 0;
write_buffer_pos = 0; write_buffer_pos = 0;
backend = log_mgr->CreateBackend(type); backend = log_mgr->CreateBackend(this, type);
assert(backend); assert(backend);
backend->Start(); backend->Start();
@ -129,7 +129,7 @@ void WriterFrontend::Init(string arg_path, int arg_num_fields, const Field* cons
fields = arg_fields; fields = arg_fields;
initialized = true; initialized = true;
backend->SendIn(new InitMessage(backend, this, arg_path, arg_num_fields, arg_fields)); backend->SendIn(new InitMessage(backend, arg_path, arg_num_fields, arg_fields));
} }
void WriterFrontend::Write(int num_fields, Value** vals) void WriterFrontend::Write(int num_fields, Value** vals)
@ -144,15 +144,12 @@ void WriterFrontend::Write(int num_fields, Value** vals)
write_buffer_pos = 0; write_buffer_pos = 0;
} }
if ( write_buffer_pos >= WRITER_BUFFER_SIZE )
// Buffer full.
FlushWriteBuffer();
write_buffer[write_buffer_pos++] = vals; write_buffer[write_buffer_pos++] = vals;
if ( ! buf ) if ( write_buffer_pos >= WRITER_BUFFER_SIZE || ! buf )
// Send out immediately if we don't want buffering. // Buffer full (or no bufferin desired).
FlushWriteBuffer(); FlushWriteBuffer();
} }
void WriterFrontend::FlushWriteBuffer() void WriterFrontend::FlushWriteBuffer()
@ -165,6 +162,7 @@ void WriterFrontend::FlushWriteBuffer()
// Clear buffer (no delete, we pass ownership to child thread.) // Clear buffer (no delete, we pass ownership to child thread.)
write_buffer = 0; write_buffer = 0;
write_buffer_pos = 0;
} }
void WriterFrontend::SetBuf(bool enabled) void WriterFrontend::SetBuf(bool enabled)

View file

@ -10,7 +10,7 @@
using namespace logging; using namespace logging;
using namespace writer; using namespace writer;
Ascii::Ascii() : WriterBackend("Ascii") Ascii::Ascii(WriterFrontend* frontend) : WriterBackend(frontend, "Ascii")
{ {
file = 0; file = 0;

View file

@ -11,10 +11,11 @@ namespace logging { namespace writer {
class Ascii : public WriterBackend { class Ascii : public WriterBackend {
public: public:
Ascii(); Ascii(WriterFrontend* frontend);
~Ascii(); ~Ascii();
static WriterBackend* Instantiate() { return new Ascii; } static WriterBackend* Instantiate(WriterFrontend* frontend)
{ return new Ascii(frontend); }
static string LogExt(); static string LogExt();
protected: protected:

View file

@ -11,10 +11,11 @@ namespace logging { namespace writer {
class None : public WriterBackend { class None : public WriterBackend {
public: public:
None() : WriterBackend("None") {} None(WriterFrontend* frontend) : WriterBackend(frontend, "None") {}
~None() {}; ~None() {};
static WriterBackend* Instantiate() { return new None; } static WriterBackend* Instantiate(WriterFrontend* frontend)
{ return new None(frontend); }
protected: protected:
virtual bool DoInit(string path, int num_fields, virtual bool DoInit(string path, int num_fields,

View file

@ -214,13 +214,7 @@ BasicOutputMessage* MsgThread::RetrieveOut()
BasicOutputMessage* msg = queue_out.Get(); BasicOutputMessage* msg = queue_out.Get();
assert(msg); assert(msg);
#ifdef DEBUG DBG_LOG(DBG_THREADING, "Retrieved '%s' from %s", msg->Name().c_str(), Name().c_str());
if ( msg->Name() != "DebugMessage" ) // Avoid recursion.
{
string s = Fmt("Retrieved '%s' from %s", msg->Name().c_str(), Name().c_str());
Debug(DBG_THREADING, s.c_str());
}
#endif
return msg; return msg;
} }