Doing bulkd writes instead of individual writes now.

Also slight change to Writer API, going back to how the rotate methods
were before.
This commit is contained in:
Robin Sommer 2012-02-01 00:34:18 -08:00
parent a428645b2a
commit 4f0fc571ef
9 changed files with 155 additions and 67 deletions

View file

@ -31,13 +31,13 @@ private:
bool terminating;
};
class DisableMessage : public threading::OutputMessage<WriterFrontend>
class FlushWriteBufferMessage : public threading::OutputMessage<WriterFrontend>
{
public:
DisableMessage(WriterFrontend* writer)
: threading::OutputMessage<WriterFrontend>("Disable", writer) {}
FlushWriteBufferMessage(WriterFrontend* writer)
: threading::OutputMessage<WriterFrontend>("FlushWriteBuffer", writer) {}
virtual bool Process() { Object()->SetDisable(); return true; }
virtual bool Process() { Object()->FlushWriteBuffer(); return true; }
};
}
@ -65,25 +65,31 @@ WriterBackend::~WriterBackend()
}
}
void WriterBackend::DeleteVals(Value** vals)
void WriterBackend::DeleteVals(int num_writes, Value*** vals)
{
for ( int j = 0; j < num_writes; ++j )
{
// Note this code is duplicated in Manager::DeleteVals().
for ( int i = 0; i < num_fields; i++ )
delete vals[i];
delete vals[j][i];
delete [] vals[j];
}
delete [] vals;
}
bool WriterBackend::FinishedRotation(WriterFrontend* writer, string new_name, string old_name,
bool WriterBackend::FinishedRotation(string new_name, string old_name,
double open, double close, bool terminating)
{
SendOut(new RotationFinishedMessage(writer, new_name, old_name, open, close, terminating));
SendOut(new RotationFinishedMessage(frontend, new_name, old_name, open, close, terminating));
return true;
}
bool WriterBackend::Init(string arg_path, int arg_num_fields,
bool WriterBackend::Init(WriterFrontend* arg_frontend, string arg_path, int arg_num_fields,
const Field* const * arg_fields)
{
frontend = arg_frontend;
path = arg_path;
num_fields = arg_num_fields;
fields = arg_fields;
@ -94,7 +100,7 @@ bool WriterBackend::Init(string arg_path, int arg_num_fields,
return true;
}
bool WriterBackend::Write(int arg_num_fields, Value** vals)
bool WriterBackend::Write(int arg_num_fields, int num_writes, Value*** vals)
{
// Double-check that the arguments match. If we get this from remote,
// something might be mixed up.
@ -107,30 +113,42 @@ bool WriterBackend::Write(int arg_num_fields, Value** vals)
Debug(DBG_LOGGING, msg);
#endif
DeleteVals(vals);
DeleteVals(num_writes, vals);
return false;
}
#ifdef DEBUG
// Double-check all the types match.
for ( int j = 0; j < num_writes; j++ )
{
for ( int i = 0; i < num_fields; ++i )
{
if ( vals[i]->type != fields[i]->type )
if ( vals[j][i]->type != fields[i]->type )
{
#ifdef DEBUG
const char* msg = Fmt("Field type doesn't match in WriterBackend::Write() (%d vs. %d)",
vals[i]->type, fields[i]->type);
vals[j][i]->type, fields[i]->type);
Debug(DBG_LOGGING, msg);
#endif
DeleteVals(vals);
DeleteVals(num_writes, vals);
return false;
}
}
}
#endif
bool result = DoWrite(num_fields, fields, vals);
bool success = true;
DeleteVals(vals);
for ( int j = 0; j < num_writes; j++ )
{
success = DoWrite(num_fields, fields, vals[j]);
return result;
if ( ! success )
break;
}
DeleteVals(num_writes, vals);
return success;
}
bool WriterBackend::SetBuf(bool enabled)
@ -144,10 +162,10 @@ bool WriterBackend::SetBuf(bool enabled)
return DoSetBuf(enabled);
}
bool WriterBackend::Rotate(WriterFrontend* writer, string rotated_path,
double open, double close, bool terminating)
bool WriterBackend::Rotate(string rotated_path, double open,
double close, bool terminating)
{
return DoRotate(writer, rotated_path, open, close, terminating);
return DoRotate(rotated_path, open, close, terminating);
}
bool WriterBackend::Flush()
@ -159,3 +177,11 @@ bool WriterBackend::Finish()
{
return DoFinish();
}
bool WriterBackend::DoHeartbeat(double network_time, double current_time)
{
SendOut(new FlushWriteBufferMessage(frontend));
return true;
}

View file

@ -19,6 +19,12 @@ public:
virtual ~WriterBackend();
// One-time initialization of the writer to define the logged fields.
//
// "frontend" is the frontend writer that created this backend. The
// *only* purpose of this value is to be passed back via messages as
// a argument to callbacks. One must not otherwise access the
// frontend, it's running in a different thread.
//
// Interpretation of "path" is left to the writer, and will be
// corresponding the value configured on the script-level.
//
@ -27,7 +33,7 @@ public:
//
// The new instance takes ownership of "fields", and will delete them
// when done.
bool Init(string path, int num_fields, const Field* const * fields);
bool Init(WriterFrontend* frontend, string path, int num_fields, const Field* const * fields);
// Writes one log entry. The method takes ownership of "vals" and
// will return immediately after queueing the write request, which is
@ -38,7 +44,7 @@ public:
//
// Returns false if an error occured, in which case the writer must
// not be used any further.
bool Write(int num_fields, Value** vals);
bool Write(int num_fields, int num_writes, Value*** vals);
// Sets the buffering status for the writer, if the writer supports
// that. (If not, it will be ignored).
@ -50,7 +56,7 @@ public:
// Triggers rotation, if the writer supports that. (If not, it will
// be ignored).
bool Rotate(WriterFrontend* writer, string rotated_path, double open, double close, bool terminating);
bool Rotate(string rotated_path, double open, double close, bool terminating);
// Finishes writing to this logger regularly. Must not be called if
// an error has been indicated earlier. After calling this, no
@ -81,9 +87,10 @@ public:
//
// terminating: True if rotation request occured due to the main Bro
// process shutting down.
bool FinishedRotation(WriterFrontend* writer, string new_name, string old_name,
bool FinishedRotation(string new_name, string old_name,
double open, double close, bool terminating);
protected:
// Methods for writers to override. If any of these returs false, it
// will be assumed that a fatal error has occured that prevents the
@ -128,11 +135,6 @@ protected:
// RotationDone() to signal the log manager that potential
// postprocessors can now run.
//
// "writer" is the frontend writer that triggered the rotation. The
// *only* purpose of this value is to be passed into
// FinishedRotation() once done. You must not otherwise access the
// frontend, it's running in a different thread.
//
// "rotate_path" reflects the path to where the rotated output is to
// be moved, with specifics depending on the writer. It should
// generally be interpreted in a way consistent with that of "path"
@ -149,8 +151,8 @@ protected:
//
// A writer may ignore rotation requests if it doesn't fit with its
// semantics (but must still return true in that case).
virtual bool DoRotate(WriterFrontend* writer, string rotated_path,
double open, double close, bool terminating) = 0;
virtual bool DoRotate(string rotated_path, double open, double close,
bool terminating) = 0;
// Called once on termination. Not called when any of the other
// methods has previously signaled an error, i.e., executing this
@ -158,7 +160,9 @@ protected:
virtual bool DoFinish() = 0;
// Triggered by regular heartbeat messages from the main process.
virtual bool DoHeartbeat(double network_time, double current_time) { return true; };
//
// Note when overriding, you must call WriterBackend::DoHeartbeat().
virtual bool DoHeartbeat(double network_time, double current_time);
private:
friend class Manager;
@ -169,8 +173,9 @@ private:
bool Disabled() { return disabled; }
// Deletes the values as passed into Write().
void DeleteVals(Value** vals);
void DeleteVals(int num_writes, Value*** vals);
WriterFrontend* frontend;
string path;
int num_fields;
const Field* const * fields;

View file

@ -9,13 +9,14 @@ namespace logging {
class InitMessage : public threading::InputMessage<WriterBackend>
{
public:
InitMessage(WriterBackend* backend, const string path, const int num_fields, const Field* const *fields)
InitMessage(WriterBackend* backend, WriterFrontend* frontend, const string path, const int num_fields, const Field* const *fields)
: threading::InputMessage<WriterBackend>("Init", backend),
path(path), num_fields(num_fields), fields(fields) { }
virtual bool Process() { return Object()->Init(path, num_fields, fields); }
virtual bool Process() { return Object()->Init(frontend, path, num_fields, fields); }
private:
WriterFrontend* frontend;
const string path;
const int num_fields;
const Field * const* fields;
@ -31,7 +32,7 @@ public:
rotated_path(rotated_path), open(open),
close(close), terminating(terminating) { }
virtual bool Process() { return Object()->Rotate(frontend, rotated_path, open, close, terminating); }
virtual bool Process() { return Object()->Rotate(rotated_path, open, close, terminating); }
private:
WriterFrontend* frontend;
@ -44,16 +45,16 @@ private:
class WriteMessage : public threading::InputMessage<WriterBackend>
{
public:
WriteMessage(WriterBackend* backend, const int num_fields, Value **vals)
WriteMessage(WriterBackend* backend, int num_fields, int num_writes, Value*** vals)
: threading::InputMessage<WriterBackend>("Write", backend),
num_fields(num_fields), fields(fields), vals(vals) {}
num_fields(num_fields), vals(vals) {}
virtual bool Process() { return Object()->Write(num_fields, vals); }
virtual bool Process() { return Object()->Write(num_fields, num_writes, vals); }
private:
int num_fields;
Field* const* fields;
Value **vals;
int num_writes;
Value ***vals;
};
class SetBufMessage : public threading::InputMessage<WriterBackend>
@ -96,6 +97,8 @@ using namespace logging;
WriterFrontend::WriterFrontend(bro_int_t type)
{
disabled = initialized = false;
buf = true;
write_buffer_pos = 0;
backend = log_mgr->CreateBackend(type);
assert(backend);
@ -108,6 +111,7 @@ WriterFrontend::~WriterFrontend()
void WriterFrontend::Stop()
{
FlushWriteBuffer();
SetDisable();
backend->Stop();
}
@ -125,7 +129,7 @@ void WriterFrontend::Init(string arg_path, int arg_num_fields, const Field* cons
fields = arg_fields;
initialized = true;
backend->SendIn(new InitMessage(backend, arg_path, arg_num_fields, arg_fields));
backend->SendIn(new InitMessage(backend, this, arg_path, arg_num_fields, arg_fields));
}
void WriterFrontend::Write(int num_fields, Value** vals)
@ -133,7 +137,34 @@ void WriterFrontend::Write(int num_fields, Value** vals)
if ( disabled )
return;
backend->SendIn(new WriteMessage(backend, num_fields, vals));
if ( ! write_buffer )
{
// Need new buffer.
write_buffer = new Value**[WRITER_BUFFER_SIZE];
write_buffer_pos = 0;
}
if ( write_buffer_pos >= WRITER_BUFFER_SIZE )
// Buffer full.
FlushWriteBuffer();
write_buffer[write_buffer_pos++] = vals;
if ( ! buf )
// Send out immediately if we don't want buffering.
FlushWriteBuffer();
}
void WriterFrontend::FlushWriteBuffer()
{
if ( ! write_buffer_pos )
// Nothing to do.
return;
backend->SendIn(new WriteMessage(backend, num_fields, write_buffer_pos, write_buffer));
// Clear buffer (no delete, we pass ownership to child thread.)
write_buffer = 0;
}
void WriterFrontend::SetBuf(bool enabled)
@ -141,7 +172,13 @@ void WriterFrontend::SetBuf(bool enabled)
if ( disabled )
return;
buf = enabled;
backend->SendIn(new SetBufMessage(backend, enabled));
if ( ! buf )
// Make sure no longer buffer any still queued data.
FlushWriteBuffer();
}
void WriterFrontend::Flush()
@ -149,6 +186,7 @@ void WriterFrontend::Flush()
if ( disabled )
return;
FlushWriteBuffer();
backend->SendIn(new FlushMessage(backend));
}
@ -157,6 +195,7 @@ void WriterFrontend::Rotate(string rotated_path, double open, double close, bool
if ( disabled )
return;
FlushWriteBuffer();
backend->SendIn(new RotateMessage(backend, this, rotated_path, open, close, terminating));
}
@ -165,6 +204,7 @@ void WriterFrontend::Finish()
if ( disabled )
return;
FlushWriteBuffer();
backend->SendIn(new FinishMessage(backend));
}

View file

@ -34,6 +34,7 @@ public:
void Write(int num_fields, Value** vals);
void SetBuf(bool enabled);
void Flush();
void FlushWriteBuffer();
void Rotate(string rotated_path, double open, double close, bool terminating);
void Finish();
@ -49,18 +50,22 @@ public:
protected:
friend class Manager;
WriterBackend* backend;
bool disabled;
bool initialized;
bool buf;
string path;
int num_fields;
const Field* const * fields;
// Buffer for bulk writes.
static const int WRITER_BUFFER_SIZE = 50;
int write_buffer_pos;
Value*** write_buffer;
};
}
#endif

View file

@ -317,8 +317,7 @@ bool Ascii::DoWrite(int num_fields, const Field* const * fields,
return true;
}
bool Ascii::DoRotate(WriterFrontend* writer, string rotated_path, double open,
double close, bool terminating)
bool Ascii::DoRotate(string rotated_path, double open, double close, bool terminating)
{
// Don't rotate special files or if there's not one currently open.
if ( ! file || IsSpecial(Path()) )
@ -330,7 +329,7 @@ bool Ascii::DoRotate(WriterFrontend* writer, string rotated_path, double open,
string nname = rotated_path + "." + LogExt();
rename(fname.c_str(), nname.c_str());
if ( ! FinishedRotation(writer, nname, fname, open, close, terminating) )
if ( ! FinishedRotation(nname, fname, open, close, terminating) )
{
Error(Fmt("error rotating %s to %s", fname.c_str(), nname.c_str()));
return false;

View file

@ -23,8 +23,8 @@ protected:
virtual bool DoWrite(int num_fields, const Field* const * fields,
Value** vals);
virtual bool DoSetBuf(bool enabled);
virtual bool DoRotate(WriterFrontend* writer, string rotated_path,
double open, double close, bool terminating);
virtual bool DoRotate(string rotated_path, double open,
double close, bool terminating);
virtual bool DoFlush();
virtual bool DoFinish();

View file

@ -4,10 +4,9 @@
using namespace logging;
using namespace writer;
bool None::DoRotate(WriterFrontend* writer, string rotated_path,
double open, double close, bool terminating)
bool None::DoRotate(string rotated_path, double open, double close, bool terminating)
{
if ( ! FinishedRotation(writer, string("/dev/null"), Path(), open, close, terminating))
if ( ! FinishedRotation(string("/dev/null"), Path(), open, close, terminating))
{
Error(Fmt("error rotating %s", Path().c_str()));
return false;

View file

@ -23,8 +23,8 @@ protected:
virtual bool DoWrite(int num_fields, const Field* const * fields,
Value** vals) { return true; }
virtual bool DoSetBuf(bool enabled) { return true; }
virtual bool DoRotate(WriterFrontend* writer, string rotated_path,
double open, double close, bool terminating);
virtual bool DoRotate(string rotated_path, double open,
double close, bool terminating);
virtual bool DoFlush() { return true; }
virtual bool DoFinish() { return true; }
};

View file

@ -184,8 +184,11 @@ protected:
* Triggers a heartbeat message being sent to the client thread.
*
* This is method is called regularly by the threading::Manager.
*
* Can be overriden in derived classed to hook into the heart beat,
* but must call the parent implementation.
*/
void Heartbeat();
virtual void Heartbeat();
/**
* Overriden from BasicThread.
@ -194,6 +197,17 @@ protected:
virtual void Run();
virtual void OnStop();
/**
* Regulatly triggered for execution in the child thread.
*
* When overriding, one must call the parent class' implementation.
*
* network_time: The network_time when the heartbeat was trigger by
* the main thread.
*
* current_time: Wall clock when the heartbeat was trigger by the
* main thread.
*/
virtual bool DoHeartbeat(double network_time, double current_time) { return true; }
private: