Merge branch 'topic/robin/log-threads'

* topic/robin/log-threads: (42 commits)
  Two more tweaks to reliably terminate when reading from trace.
  This could be fixing the memory problems finally.
  Fix compile errors due to now-explicit IPAddr ctors and global IPFamily enum.
  Switching log buffer size back to normal
  Teaching cmake to always link in tcmalloc if it finds it.
  Extending queue statistics.
  Small fixes and tweaks.
  Don't assert during shutdown.
  Reverting accidental commit.
  Finetuning communication CPU usage.
  Adding new leak tests involving remote logging.
  Removing some no longer needed checks.
  Fixing problem logging remotely when local logging was turned off.
  Preventing busy looping when no threads have been spawned.
  Prevent manager from busy looping.
  Adding missing includes needed on FreeBSD.
  Updating submodule(s).
  Updating submodule(s).
  A number of bugfixes for the recent threading updates.
  Making exchange of addresses between threads thread-safe.
  ...
This commit is contained in:
Robin Sommer 2012-04-04 17:25:20 -07:00
commit 077089a047
69 changed files with 4171 additions and 1157 deletions

1271
src/logging/Manager.cc Normal file

File diff suppressed because it is too large Load diff

205
src/logging/Manager.h Normal file
View file

@ -0,0 +1,205 @@
// See the file "COPYING" in the main distribution directory for copyright.
//
// A class managing log writers and filters.
#ifndef LOGGING_MANAGER_H
#define LOGGING_MANAGER_H
#include "../Val.h"
#include "../EventHandler.h"
#include "../RemoteSerializer.h"
class SerializationFormat;
class RemoteSerializer;
class RotationTimer;
namespace logging {
class WriterBackend;
class WriterFrontend;
class RotationFinishedMessage;
/**
* Singleton class for managing log streams.
*/
class Manager {
public:
/**
* Constructor.
*/
Manager();
/**
* Destructor.
*/
~Manager();
/**
* Creates a new log stream.
*
* @param id The enum value corresponding the log stream.
*
* @param stream A record of script type \c Log::Stream.
*
* This method corresponds directly to the internal BiF defined in
* logging.bif, which just forwards here.
*/
bool CreateStream(EnumVal* id, RecordVal* stream);
/**
* Enables a log log stream.
*
* @param id The enum value corresponding the log stream.
*
* This method corresponds directly to the internal BiF defined in
* logging.bif, which just forwards here.
*/
bool EnableStream(EnumVal* id);
/**
* Disables a log stream.
*
* @param id The enum value corresponding the log stream.
*
* This methods corresponds directly to the internal BiF defined in
* logging.bif, which just forwards here.
*/
bool DisableStream(EnumVal* id);
/**
* Adds a filter to a log stream.
*
* @param id The enum value corresponding the log stream.
*
* @param filter A record of script type \c Log::Filter.
*
* This methods corresponds directly to the internal BiF defined in
* logging.bif, which just forwards here.
*/
bool AddFilter(EnumVal* id, RecordVal* filter);
/**
* Removes a filter from a log stream.
*
* @param id The enum value corresponding the log stream.
*
* @param name The name of the filter to remove.
*
* This methods corresponds directly to the internal BiF defined in
* logging.bif, which just forwards here.
*/
bool RemoveFilter(EnumVal* id, StringVal* name);
/**
* Removes a filter from a log stream.
*
* @param id The enum value corresponding the log stream.
*
* @param name The name of the filter to remove.
*
* This methods corresponds directly to the internal BiF defined in
* logging.bif, which just forwards here.
*/
bool RemoveFilter(EnumVal* id, string name);
/**
* Write a record to a log stream.
*
* @param id The enum value corresponding the log stream.
*
* @param colums A record of the type defined for the stream's
* columns.
*
* This methods corresponds directly to the internal BiF defined in
* logging.bif, which just forwards here.
*/
bool Write(EnumVal* id, RecordVal* columns);
/**
* Sets log streams buffering state. This adjusts all associated
* writers to the new state.
*
* @param id The enum value corresponding the log stream.
*
* @param enabled False to disable buffering (default is enabled).
*
* This methods corresponds directly to the internal BiF defined in
* logging.bif, which just forwards here.
*/
bool SetBuf(EnumVal* id, bool enabled);
/**
* Flushes a log stream. This flushed all associated writers.
*
* @param id The enum value corresponding the log stream.
*
* This methods corresponds directly to the internal BiF defined in
* logging.bif, which just forwards here.
*/
bool Flush(EnumVal* id);
/**
* Prepares the log manager to terminate. This will flush all log
* stream.
*/
void Terminate();
protected:
friend class WriterFrontend;
friend class RotationFinishedMessage;
friend class ::RemoteSerializer;
friend class ::RotationTimer;
// Instantiates a new WriterBackend of the given type (note that
// doing so creates a new thread!).
WriterBackend* CreateBackend(WriterFrontend* frontend, bro_int_t type);
//// Function also used by the RemoteSerializer.
// Takes ownership of fields.
WriterFrontend* CreateWriter(EnumVal* id, EnumVal* writer, string path,
int num_fields, const threading::Field* const* fields,
bool local, bool remote);
// Takes ownership of values..
bool Write(EnumVal* id, EnumVal* writer, string path,
int num_fields, threading::Value** vals);
// Announces all instantiated writers to peer.
void SendAllWritersTo(RemoteSerializer::PeerID peer);
// Signals that a file has been rotated.
bool FinishedRotation(WriterFrontend* writer, string new_name, string old_name,
double open, double close, bool terminating);
// Deletes the values as passed into Write().
void DeleteVals(int num_fields, threading::Value** vals);
private:
struct Filter;
struct Stream;
struct WriterInfo;
bool TraverseRecord(Stream* stream, Filter* filter, RecordType* rt,
TableVal* include, TableVal* exclude, string path, list<int> indices);
threading::Value** RecordToFilterVals(Stream* stream, Filter* filter,
RecordVal* columns);
threading::Value* ValToLogVal(Val* val, BroType* ty = 0);
Stream* FindStream(EnumVal* id);
void RemoveDisabledWriters(Stream* stream);
void InstallRotationTimer(WriterInfo* winfo);
void Rotate(WriterInfo* info);
Filter* FindFilter(EnumVal* id, StringVal* filter);
WriterInfo* FindWriter(WriterFrontend* writer);
vector<Stream *> streams; // Indexed by stream enum.
};
}
extern logging::Manager* log_mgr;
#endif

View file

@ -0,0 +1,281 @@
// See the file "COPYING" in the main distribution directory for copyright.
#include "util.h"
#include "threading/SerialTypes.h"
#include "WriterBackend.h"
#include "WriterFrontend.h"
// Messages sent from backend to frontend (i.e., "OutputMessages").
using threading::Value;
using threading::Field;
namespace logging {
class RotationFinishedMessage : public threading::OutputMessage<WriterFrontend>
{
public:
RotationFinishedMessage(WriterFrontend* writer, string new_name, string old_name,
double open, double close, bool terminating)
: threading::OutputMessage<WriterFrontend>("RotationFinished", writer),
new_name(new_name), old_name(old_name), open(open),
close(close), terminating(terminating) { }
virtual bool Process()
{
return log_mgr->FinishedRotation(Object(), new_name, old_name, open, close, terminating);
}
private:
string new_name;
string old_name;
double open;
double close;
bool terminating;
};
class FlushWriteBufferMessage : public threading::OutputMessage<WriterFrontend>
{
public:
FlushWriteBufferMessage(WriterFrontend* writer)
: threading::OutputMessage<WriterFrontend>("FlushWriteBuffer", writer) {}
virtual bool Process() { Object()->FlushWriteBuffer(); return true; }
};
class DisableMessage : public threading::OutputMessage<WriterFrontend>
{
public:
DisableMessage(WriterFrontend* writer)
: threading::OutputMessage<WriterFrontend>("Disable", writer) {}
virtual bool Process() { Object()->SetDisable(); return true; }
};
}
// Backend methods.
using namespace logging;
WriterBackend::WriterBackend(WriterFrontend* arg_frontend) : MsgThread()
{
path = "<path not yet set>";
num_fields = 0;
fields = 0;
buffering = true;
frontend = arg_frontend;
SetName(frontend->Name());
}
WriterBackend::~WriterBackend()
{
if ( fields )
{
for(int i = 0; i < num_fields; ++i)
delete fields[i];
delete [] fields;
}
}
void WriterBackend::DeleteVals(int num_writes, Value*** vals)
{
for ( int j = 0; j < num_writes; ++j )
{
// Note this code is duplicated in Manager::DeleteVals().
for ( int i = 0; i < num_fields; i++ )
delete vals[j][i];
delete [] vals[j];
}
delete [] vals;
}
bool WriterBackend::FinishedRotation(string new_name, string old_name,
double open, double close, bool terminating)
{
SendOut(new RotationFinishedMessage(frontend, new_name, old_name, open, close, terminating));
return true;
}
void WriterBackend::DisableFrontend()
{
SendOut(new DisableMessage(frontend));
}
bool WriterBackend::Init(string arg_path, int arg_num_fields, const Field* const* arg_fields)
{
path = arg_path;
num_fields = arg_num_fields;
fields = arg_fields;
string name = Fmt("%s/%s", path.c_str(), frontend->Name().c_str());
SetName(name);
if ( ! DoInit(arg_path, arg_num_fields, arg_fields) )
{
DisableFrontend();
return false;
}
return true;
}
bool WriterBackend::Write(int arg_num_fields, int num_writes, Value*** vals)
{
// Double-check that the arguments match. If we get this from remote,
// something might be mixed up.
if ( num_fields != arg_num_fields )
{
#ifdef DEBUG
const char* msg = Fmt("Number of fields don't match in WriterBackend::Write() (%d vs. %d)",
arg_num_fields, num_fields);
Debug(DBG_LOGGING, msg);
#endif
DeleteVals(num_writes, vals);
DisableFrontend();
return false;
}
#ifdef DEBUG
// Double-check all the types match.
for ( int j = 0; j < num_writes; j++ )
{
for ( int i = 0; i < num_fields; ++i )
{
if ( vals[j][i]->type != fields[i]->type )
{
const char* msg = Fmt("Field type doesn't match in WriterBackend::Write() (%d vs. %d)",
vals[j][i]->type, fields[i]->type);
Debug(DBG_LOGGING, msg);
DisableFrontend();
DeleteVals(num_writes, vals);
return false;
}
}
}
#endif
bool success = true;
for ( int j = 0; j < num_writes; j++ )
{
success = DoWrite(num_fields, fields, vals[j]);
if ( ! success )
break;
}
DeleteVals(num_writes, vals);
if ( ! success )
DisableFrontend();
return success;
}
bool WriterBackend::SetBuf(bool enabled)
{
if ( enabled == buffering )
// No change.
return true;
buffering = enabled;
if ( ! DoSetBuf(enabled) )
{
DisableFrontend();
return false;
}
return true;
}
bool WriterBackend::Rotate(string rotated_path, double open,
double close, bool terminating)
{
if ( ! DoRotate(rotated_path, open, close, terminating) )
{
DisableFrontend();
return false;
}
return true;
}
bool WriterBackend::Flush()
{
if ( ! DoFlush() )
{
DisableFrontend();
return false;
}
return true;
}
bool WriterBackend::Finish()
{
if ( ! DoFlush() )
{
DisableFrontend();
return false;
}
return true;
}
bool WriterBackend::DoHeartbeat(double network_time, double current_time)
{
MsgThread::DoHeartbeat(network_time, current_time);
SendOut(new FlushWriteBufferMessage(frontend));
return true;
}
string WriterBackend::Render(const threading::Value::addr_t& addr) const
{
if ( addr.family == IPv4 )
{
char s[INET_ADDRSTRLEN];
if ( inet_ntop(AF_INET, &addr.in.in4, s, INET_ADDRSTRLEN) == NULL )
return "<bad IPv4 address conversion>";
else
return s;
}
else
{
char s[INET6_ADDRSTRLEN];
if ( inet_ntop(AF_INET6, &addr.in.in6, s, INET6_ADDRSTRLEN) == NULL )
return "<bad IPv6 address conversion>";
else
return s;
}
}
string WriterBackend::Render(const threading::Value::subnet_t& subnet) const
{
char l[16];
if ( subnet.prefix.family == IPv4 )
modp_uitoa10(subnet.length - 96, l);
else
modp_uitoa10(subnet.length, l);
string s = Render(subnet.prefix) + "/" + l;
return s;
}

319
src/logging/WriterBackend.h Normal file
View file

@ -0,0 +1,319 @@
// See the file "COPYING" in the main distribution directory for copyright.
//
// Bridge class between main process and writer threads.
#ifndef LOGGING_WRITERBACKEND_H
#define LOGGING_WRITERBACKEND_H
#include "Manager.h"
#include "threading/MsgThread.h"
namespace logging {
/**
* Base class for writer implementation. When the logging::Manager creates a
* new logging filter, it instantiates a WriterFrontend. That then in turn
* creates a WriterBackend of the right type. The frontend then forwards
* messages over the backend as its methods are called.
*
* All of this methods must be called only from the corresponding child
* thread (the constructor and destructor are the exceptions.)
*/
class WriterBackend : public threading::MsgThread
{
public:
/**
* Constructor.
*
* @param frontend The frontend writer that created this backend. The
* *only* purpose of this value is to be passed back via messages as
* a argument to callbacks. One must not otherwise access the
* frontend, it's running in a different thread.
*
* @param name A descriptive name for writer's type (e.g., \c Ascii).
*
*/
WriterBackend(WriterFrontend* frontend);
/**
* Destructor.
*/
virtual ~WriterBackend();
/**
* One-time initialization of the writer to define the logged fields.
*
* @param path A string left to the interpretation of the writer
* implementation; it corresponds to the value configured on the
* script-level for the logging filter.
*
* @param num_fields The number of log fields for the stream.
*
* @param fields An array of size \a num_fields with the log fields.
* The methods takes ownership of the array.
*
* @return False if an error occured.
*/
bool Init(string path, int num_fields, const threading::Field* const* fields);
/**
* Writes one log entry.
*
* @param num_fields: The number of log fields for this stream. The
* value must match what was passed to Init().
*
* @param An array of size \a num_fields with the log values. Their
* types musst match with the field passed to Init(). The method
* takes ownership of \a vals..
*
* Returns false if an error occured, in which case the writer must
* not be used any further.
*
* @return False if an error occured.
*/
bool Write(int num_fields, int num_writes, threading::Value*** vals);
/**
* Sets the buffering status for the writer, assuming the writer
* supports that. (If not, it will be ignored).
*
* @param enabled False if buffering is to be disabled (by default
* it's on).
*
* @return False if an error occured.
*/
bool SetBuf(bool enabled);
/**
* Flushes any currently buffered output, assuming the writer
* supports that. (If not, it will be ignored).
*
* @return False if an error occured.
*/
bool Flush();
/**
* Triggers rotation, if the writer supports that. (If not, it will
* be ignored).
*
* @return False if an error occured.
*/
bool Rotate(string rotated_path, double open, double close, bool terminating);
/**
* Finishes writing to this logger in a regularl fashion. Must not be
* called if an error has been indicated earlier. After calling this,
* no further writing must be performed.
*
* @return False if an error occured.
*/
bool Finish();
/**
* Disables the frontend that has instantiated this backend. Once
* disabled,the frontend will not send any further message over.
*/
void DisableFrontend();
/**
* Returns the log path as passed into the constructor.
*/
const string Path() const { return path; }
/**
* Returns the number of log fields as passed into the constructor.
*/
int NumFields() const { return num_fields; }
/**
* Returns the log fields as passed into the constructor.
*/
const threading::Field* const * Fields() const { return fields; }
/**
* Returns the current buffering state.
*
* @return True if buffering is enabled.
*/
bool IsBuf() { return buffering; }
/**
* Signals that a file has been rotated. This must be called by a
* writer's implementation of DoRotate() once rotation has finished.
*
* Most of the parameters should be passed through from DoRotate().
*
* @param new_name The filename of the rotated file.
*
* @param old_name The filename of the original file.
*
* @param open: The timestamp when the original file was opened.
*
* @param close: The timestamp when the origina file was closed.
*
* @param terminating: True if the original rotation request occured
* due to the main Bro process shutting down.
*/
bool FinishedRotation(string new_name, string old_name,
double open, double close, bool terminating);
/** Helper method to render an IP address as a string.
*
* @param addr The address.
*
* @return An ASCII representation of the address.
*/
string Render(const threading::Value::addr_t& addr) const;
/** Helper method to render an subnet value as a string.
*
* @param addr The address.
*
* @return An ASCII representation of the address.
*/
string Render(const threading::Value::subnet_t& subnet) const;
protected:
/**
* Writer-specific intialization method.
*
* A writer implementation must override this method. If it returns
* false, it will be assumed that a fatal error has occured that
* prevents the writer from further operation; it will then be
* disabled and eventually deleted. When returning false, an
* implementation should also call Error() to indicate what happened.
*/
virtual bool DoInit(string path, int num_fields,
const threading::Field* const* fields) = 0;
/**
* Writer-specific output method implementing recording of fone log
* entry.
*
* A writer implementation must override this method. If it returns
* false, it will be assumed that a fatal error has occured that
* prevents the writer from further operation; it will then be
* disabled and eventually deleted. When returning false, an
* implementation should also call Error() to indicate what happened.
*/
virtual bool DoWrite(int num_fields, const threading::Field* const* fields,
threading::Value** vals) = 0;
/**
* Writer-specific method implementing a change of fthe buffering
* state. If buffering is disabled, the writer should attempt to
* write out information as quickly as possible even if doing so may
* have a performance impact. If enabled (which is the default), it
* may buffer data as helpful and write it out later in a way
* optimized for performance. The current buffering state can be
* queried via IsBuf().
*
* A writer implementation must override this method but it can just
* ignore calls if buffering doesn't align with its semantics.
*
* If the method returns false, it will be assumed that a fatal error
* has occured that prevents the writer from further operation; it
* will then be disabled and eventually deleted. When returning
* false, an implementation should also call Error() to indicate what
* happened.
*/
virtual bool DoSetBuf(bool enabled) = 0;
/**
* Writer-specific method implementing flushing of its output.
*
* A writer implementation must override this method but it can just
* ignore calls if flushing doesn't align with its semantics.
*
* If the method returns false, it will be assumed that a fatal error
* has occured that prevents the writer from further operation; it
* will then be disabled and eventually deleted. When returning
* false, an implementation should also call Error() to indicate what
* happened.
*/
virtual bool DoFlush() = 0;
/**
* Writer-specific method implementing log rotation. Most directly
* this only applies to writers writing into files, which should then
* close the current file and open a new one. However, a writer may
* also trigger other apppropiate actions if semantics are similar. *
* Once rotation has finished, the implementation must call
* FinishedRotation() to signal the log manager that potential
* postprocessors can now run.
*
* A writer implementation must override this method but it can just
* ignore calls if flushing doesn't align with its semantics. It
* still needs to call FinishedRotation() though.
*
* If the method returns false, it will be assumed that a fatal error
* has occured that prevents the writer from further operation; it
* will then be disabled and eventually deleted. When returning
* false, an implementation should also call Error() to indicate what
* happened.
*
* @param rotate_path Reflects the path to where the rotated output
* is to be moved, with specifics depending on the writer. It should
* generally be interpreted in a way consistent with that of \c path
* as passed into DoInit(). As an example, for file-based output, \c
* rotate_path could be the original filename extended with a
* timestamp indicating the time of the rotation.
*
* @param open The network time when the *current* file was opened.
*
* @param close The network time when the *current* file was closed.
*
* @param terminating Indicates whether the rotation request occurs
* due the main Bro prcoess terminating (and not because we've
* reached a regularly scheduled time for rotation).
*/
virtual bool DoRotate(string rotated_path, double open, double close,
bool terminating) = 0;
/**
* Writer-specific method implementing log output finalization at
* termination. Not called when any of the other methods has
* previously signaled an error, i.e., executing this method signals
* a regular shutdown of the writer.
*
* A writer implementation must override this method but it can just
* ignore calls if flushing doesn't align with its semantics.
*
* If the method returns false, it will be assumed that a fatal error
* has occured that prevents the writer from further operation; it
* will then be disabled and eventually deleted. When returning
* false, an implementation should also call Error() to indicate what
* happened.
*/
virtual bool DoFinish() = 0;
/**
* Triggered by regular heartbeat messages from the main thread.
*
* This method can be overridden but once must call
* WriterBackend::DoHeartbeat().
*/
virtual bool DoHeartbeat(double network_time, double current_time);
private:
/**
* Deletes the values as passed into Write().
*/
void DeleteVals(int num_writes, threading::Value*** vals);
// Frontend that instantiated us. This object must not be access from
// this class, it's running in a different thread!
WriterFrontend* frontend;
string path; // Log path.
int num_fields; // Number of log fields.
const threading::Field* const* fields; // Log fields.
bool buffering; // True if buffering is enabled.
};
}
#endif

View file

@ -0,0 +1,279 @@
#include "Net.h"
#include "threading/SerialTypes.h"
#include "WriterFrontend.h"
#include "WriterBackend.h"
using threading::Value;
using threading::Field;
namespace logging {
// Messages sent from frontend to backend (i.e., "InputMessages").
class InitMessage : public threading::InputMessage<WriterBackend>
{
public:
InitMessage(WriterBackend* backend, const string path, const int num_fields, const Field* const* fields)
: threading::InputMessage<WriterBackend>("Init", backend),
path(path), num_fields(num_fields), fields(fields) { }
virtual bool Process() { return Object()->Init(path, num_fields, fields); }
private:
const string path;
const int num_fields;
const Field * const* fields;
};
class RotateMessage : public threading::InputMessage<WriterBackend>
{
public:
RotateMessage(WriterBackend* backend, WriterFrontend* frontend, const string rotated_path, const double open,
const double close, const bool terminating)
: threading::InputMessage<WriterBackend>("Rotate", backend),
frontend(frontend),
rotated_path(rotated_path), open(open),
close(close), terminating(terminating) { }
virtual bool Process() { return Object()->Rotate(rotated_path, open, close, terminating); }
private:
WriterFrontend* frontend;
const string rotated_path;
const double open;
const double close;
const bool terminating;
};
class WriteMessage : public threading::InputMessage<WriterBackend>
{
public:
WriteMessage(WriterBackend* backend, int num_fields, int num_writes, Value*** vals)
: threading::InputMessage<WriterBackend>("Write", backend),
num_fields(num_fields), num_writes(num_writes), vals(vals) {}
virtual bool Process() { return Object()->Write(num_fields, num_writes, vals); }
private:
int num_fields;
int num_writes;
Value ***vals;
};
class SetBufMessage : public threading::InputMessage<WriterBackend>
{
public:
SetBufMessage(WriterBackend* backend, const bool enabled)
: threading::InputMessage<WriterBackend>("SetBuf", backend),
enabled(enabled) { }
virtual bool Process() { return Object()->SetBuf(enabled); }
private:
const bool enabled;
};
class FlushMessage : public threading::InputMessage<WriterBackend>
{
public:
FlushMessage(WriterBackend* backend)
: threading::InputMessage<WriterBackend>("Flush", backend) {}
virtual bool Process() { return Object()->Flush(); }
};
class FinishMessage : public threading::InputMessage<WriterBackend>
{
public:
FinishMessage(WriterBackend* backend)
: threading::InputMessage<WriterBackend>("Finish", backend) {}
virtual bool Process() { return Object()->Finish(); }
};
}
// Frontend methods.
using namespace logging;
WriterFrontend::WriterFrontend(EnumVal* arg_stream, EnumVal* arg_writer, bool arg_local, bool arg_remote)
{
stream = arg_stream;
writer = arg_writer;
Ref(stream);
Ref(writer);
disabled = initialized = false;
buf = true;
local = arg_local;
remote = arg_remote;
write_buffer = 0;
write_buffer_pos = 0;
ty_name = "<not set>";
if ( local )
{
backend = log_mgr->CreateBackend(this, writer->AsEnum());
assert(backend);
backend->Start();
}
else
backend = 0;
}
WriterFrontend::~WriterFrontend()
{
Unref(stream);
Unref(writer);
}
string WriterFrontend::Name() const
{
if ( path.size() )
return ty_name;
return ty_name + "/" + path;
}
void WriterFrontend::Stop()
{
FlushWriteBuffer();
SetDisable();
if ( backend )
backend->Stop();
}
void WriterFrontend::Init(string arg_path, int arg_num_fields, const Field* const * arg_fields)
{
if ( disabled )
return;
if ( initialized )
reporter->InternalError("writer initialize twice");
path = arg_path;
num_fields = arg_num_fields;
fields = arg_fields;
initialized = true;
if ( backend )
backend->SendIn(new InitMessage(backend, arg_path, arg_num_fields, arg_fields));
if ( remote )
remote_serializer->SendLogCreateWriter(stream,
writer,
arg_path,
arg_num_fields,
arg_fields);
}
void WriterFrontend::Write(int num_fields, Value** vals)
{
if ( disabled )
return;
if ( remote )
remote_serializer->SendLogWrite(stream,
writer,
path,
num_fields,
vals);
if ( ! backend )
{
DeleteVals(vals);
return;
}
if ( ! write_buffer )
{
// Need new buffer.
write_buffer = new Value**[WRITER_BUFFER_SIZE];
write_buffer_pos = 0;
}
write_buffer[write_buffer_pos++] = vals;
if ( write_buffer_pos >= WRITER_BUFFER_SIZE || ! buf || terminating )
// Buffer full (or no bufferin desired or termiating).
FlushWriteBuffer();
}
void WriterFrontend::FlushWriteBuffer()
{
if ( ! write_buffer_pos )
// Nothing to do.
return;
if ( backend )
backend->SendIn(new WriteMessage(backend, num_fields, write_buffer_pos, write_buffer));
// Clear buffer (no delete, we pass ownership to child thread.)
write_buffer = 0;
write_buffer_pos = 0;
}
void WriterFrontend::SetBuf(bool enabled)
{
if ( disabled )
return;
buf = enabled;
if ( backend )
backend->SendIn(new SetBufMessage(backend, enabled));
if ( ! buf )
// Make sure no longer buffer any still queued data.
FlushWriteBuffer();
}
void WriterFrontend::Flush()
{
if ( disabled )
return;
FlushWriteBuffer();
if ( backend )
backend->SendIn(new FlushMessage(backend));
}
void WriterFrontend::Rotate(string rotated_path, double open, double close, bool terminating)
{
if ( disabled )
return;
FlushWriteBuffer();
if ( backend )
backend->SendIn(new RotateMessage(backend, this, rotated_path, open, close, terminating));
}
void WriterFrontend::Finish()
{
if ( disabled )
return;
FlushWriteBuffer();
if ( backend )
backend->SendIn(new FinishMessage(backend));
}
void WriterFrontend::DeleteVals(Value** vals)
{
// Note this code is duplicated in Manager::DeleteVals().
for ( int i = 0; i < num_fields; i++ )
delete vals[i];
delete [] vals;
}

View file

@ -0,0 +1,222 @@
// See the file "COPYING" in the main distribution directory for copyright.
#ifndef LOGGING_WRITERFRONTEND_H
#define LOGGING_WRITERFRONTEND_H
#include "Manager.h"
#include "threading/MsgThread.h"
namespace logging {
class WriterBackend;
/**
* Bridge class between the logging::Manager and backend writer threads. The
* Manager instantiates one \a WriterFrontend for each open logging filter.
* Each frontend in turns instantiates a WriterBackend-derived class
* internally that's specific to the particular output format. That backend
* runs in a new thread, and it receives messages from the frontend that
* correspond to method called by the manager.
*
*/
class WriterFrontend {
public:
/**
* Constructor.
*
* stream: The logging stream.
*
* writer: The backend writer type, with the value corresponding to the
* script-level \c Log::Writer enum (e.g., \a WRITER_ASCII). The
* frontend will internally instantiate a WriterBackend of the
* corresponding type.
*
* local: If true, the writer will instantiate a local backend.
*
* remote: If true, the writer will forward all data to remote
* clients.
*
* Frontends must only be instantiated by the main thread.
*/
WriterFrontend(EnumVal* stream, EnumVal* writer, bool local, bool remote);
/**
* Destructor.
*
* Frontends must only be destroyed by the main thread.
*/
virtual ~WriterFrontend();
/**
* Stops all output to this writer. Calling this methods disables all
* message forwarding to the backend and stops the backend thread.
*
* This method must only be called from the main thread.
*/
void Stop();
/**
* Initializes the writer.
*
* This method generates a message to the backend writer and triggers
* the corresponding message there. If the backend method fails, it
* sends a message back that will asynchronously call Disable().
*
* See WriterBackend::Init() for arguments. The method takes
* ownership of \a fields.
*
* This method must only be called from the main thread.
*/
void Init(string path, int num_fields, const threading::Field* const* fields);
/**
* Write out a record.
*
* This method generates a message to the backend writer and triggers
* the corresponding message there. If the backend method fails, it
* sends a message back that will asynchronously call Disable().
*
* As an optimization, if buffering is enabled (which is the default)
* this method may buffer several writes and send them over to the
* backend in bulk with a single message. An explicit bulk write of
* all currently buffered data can be triggered with
* FlushWriteBuffer(). The backend writer triggers this with a
* message at every heartbeat.
*
* See WriterBackend::Writer() for arguments (except that this method
* takes only a single record, not an array). The method takes
* ownership of \a vals.
*
* This method must only be called from the main thread.
*/
void Write(int num_fields, threading::Value** vals);
/**
* Sets the buffering state.
*
* This method generates a message to the backend writer and triggers
* the corresponding message there. If the backend method fails, it
* sends a message back that will asynchronously call Disable().
*
* See WriterBackend::SetBuf() for arguments.
*
* This method must only be called from the main thread.
*/
void SetBuf(bool enabled);
/**
* Flushes the output.
*
* This method generates a message to the backend writer and triggers
* the corresponding message there. In addition, it also triggers
* FlushWriteBuffer(). If the backend method fails, it sends a
* message back that will asynchronously call Disable().
*
* This method must only be called from the main thread.
*/
void Flush();
/**
* Triggers log rotation.
*
* This method generates a message to the backend writer and triggers
* the corresponding message there. If the backend method fails, it
* sends a message back that will asynchronously call Disable().
*
* See WriterBackend::Rotate() for arguments.
*
* This method must only be called from the main thread.
*/
void Rotate(string rotated_path, double open, double close, bool terminating);
/**
* Finalizes writing to this tream.
*
* This method generates a message to the backend writer and triggers
* the corresponding message there. If the backend method fails, it
* sends a message back that will asynchronously call Disable().
*
* This method must only be called from the main thread.
*/
void Finish();
/**
* Explicitly triggers a transfer of all potentially buffered Write()
* operations over to the backend.
*
* This method must only be called from the main thread.
*/
void FlushWriteBuffer();
/**
* Disables the writer frontend. From now on, all method calls that
* would normally send message over to the backend, turn into no-ops.
* Note though that it does not stop the backend itself, use Stop()
* to do thast as well (this method is primarily for use as callback
* when the backend wants to disable the frontend).
*
* Disabled frontend will eventually be discarded by the
* logging::Manager.
*
* This method must only be called from the main thread.
*/
void SetDisable() { disabled = true; }
/**
* Returns true if the writer frontend has been disabled with SetDisable().
*/
bool Disabled() { return disabled; }
/**
* Returns the log path as passed into the constructor.
*/
const string Path() const { return path; }
/**
* Returns the number of log fields as passed into the constructor.
*/
int NumFields() const { return num_fields; }
/**
* Returns a descriptive name for the writer, including the type of
* the backend and the path used.
*
* This method is safe to call from any thread.
*/
string Name() const;
/**
* Returns the log fields as passed into the constructor.
*/
const threading::Field* const * Fields() const { return fields; }
protected:
friend class Manager;
void DeleteVals(threading::Value** vals);
EnumVal* stream;
EnumVal* writer;
WriterBackend* backend; // The backend we have instanatiated.
bool disabled; // True if disabled.
bool initialized; // True if initialized.
bool buf; // True if buffering is enabled (default).
bool local; // True if logging locally.
bool remote; // True if loggin remotely.
string ty_name; // Name of the backend type. Set by the manager.
string path; // The log path.
int num_fields; // The number of log fields.
const threading::Field* const* fields; // The log fields.
// Buffer for bulk writes.
static const int WRITER_BUFFER_SIZE = 1000;
int write_buffer_pos; // Position of next write in buffer.
threading::Value*** write_buffer; // Buffer of size WRITER_BUFFER_SIZE.
};
}
#endif

View file

@ -0,0 +1,356 @@
// See the file "COPYING" in the main distribution directory for copyright.
#include <string>
#include <errno.h>
#include "NetVar.h"
#include "threading/SerialTypes.h"
#include "Ascii.h"
using namespace logging;
using namespace writer;
using threading::Value;
using threading::Field;
Ascii::Ascii(WriterFrontend* frontend) : WriterBackend(frontend)
{
file = 0;
output_to_stdout = BifConst::LogAscii::output_to_stdout;
include_header = BifConst::LogAscii::include_header;
separator_len = BifConst::LogAscii::separator->Len();
separator = new char[separator_len];
memcpy(separator, BifConst::LogAscii::separator->Bytes(),
separator_len);
set_separator_len = BifConst::LogAscii::set_separator->Len();
set_separator = new char[set_separator_len];
memcpy(set_separator, BifConst::LogAscii::set_separator->Bytes(),
set_separator_len);
empty_field_len = BifConst::LogAscii::empty_field->Len();
empty_field = new char[empty_field_len];
memcpy(empty_field, BifConst::LogAscii::empty_field->Bytes(),
empty_field_len);
unset_field_len = BifConst::LogAscii::unset_field->Len();
unset_field = new char[unset_field_len];
memcpy(unset_field, BifConst::LogAscii::unset_field->Bytes(),
unset_field_len);
header_prefix_len = BifConst::LogAscii::header_prefix->Len();
header_prefix = new char[header_prefix_len];
memcpy(header_prefix, BifConst::LogAscii::header_prefix->Bytes(),
header_prefix_len);
desc.EnableEscaping();
desc.AddEscapeSequence(separator, separator_len);
}
Ascii::~Ascii()
{
if ( file )
fclose(file);
delete [] separator;
delete [] set_separator;
delete [] empty_field;
delete [] unset_field;
delete [] header_prefix;
}
bool Ascii::WriteHeaderField(const string& key, const string& val)
{
string str = string(header_prefix, header_prefix_len) +
key + string(separator, separator_len) + val + "\n";
return (fwrite(str.c_str(), str.length(), 1, file) == 1);
}
bool Ascii::DoInit(string path, int num_fields,
const Field* const * fields)
{
if ( output_to_stdout )
path = "/dev/stdout";
fname = IsSpecial(path) ? path : path + "." + LogExt();
if ( ! (file = fopen(fname.c_str(), "w")) )
{
Error(Fmt("cannot open %s: %s", fname.c_str(),
strerror(errno)));
return false;
}
if ( include_header )
{
string str = string(header_prefix, header_prefix_len)
+ "separator " // Always use space as separator here.
+ get_escaped_string(string(separator, separator_len), false)
+ "\n";
if( fwrite(str.c_str(), str.length(), 1, file) != 1 )
goto write_error;
if ( ! (WriteHeaderField("set_separator", get_escaped_string(
string(set_separator, set_separator_len), false)) &&
WriteHeaderField("empty_field", get_escaped_string(
string(empty_field, empty_field_len), false)) &&
WriteHeaderField("unset_field", get_escaped_string(
string(unset_field, unset_field_len), false)) &&
WriteHeaderField("path", get_escaped_string(path, false))) )
goto write_error;
string names;
string types;
for ( int i = 0; i < num_fields; ++i )
{
if ( i > 0 )
{
names += string(separator, separator_len);
types += string(separator, separator_len);
}
const Field* field = fields[i];
names += field->name;
types += type_name(field->type);
if ( (field->type == TYPE_TABLE) || (field->type == TYPE_VECTOR) )
{
types += "[";
types += type_name(field->subtype);
types += "]";
}
}
if ( ! (WriteHeaderField("fields", names)
&& WriteHeaderField("types", types)) )
goto write_error;
}
return true;
write_error:
Error(Fmt("error writing to %s: %s", fname.c_str(), strerror(errno)));
return false;
}
bool Ascii::DoFlush()
{
fflush(file);
return true;
}
bool Ascii::DoFinish()
{
return true;
}
bool Ascii::DoWriteOne(ODesc* desc, Value* val, const Field* field)
{
if ( ! val->present )
{
desc->AddN(unset_field, unset_field_len);
return true;
}
switch ( val->type ) {
case TYPE_BOOL:
desc->Add(val->val.int_val ? "T" : "F");
break;
case TYPE_INT:
desc->Add(val->val.int_val);
break;
case TYPE_COUNT:
case TYPE_COUNTER:
desc->Add(val->val.uint_val);
break;
case TYPE_PORT:
desc->Add(val->val.port_val.port);
break;
case TYPE_SUBNET:
desc->Add(Render(val->val.subnet_val));
break;
case TYPE_ADDR:
desc->Add(Render(val->val.addr_val));
break;
case TYPE_TIME:
case TYPE_INTERVAL:
char buf[256];
modp_dtoa(val->val.double_val, buf, 6);
desc->Add(buf);
break;
case TYPE_DOUBLE:
desc->Add(val->val.double_val);
break;
case TYPE_ENUM:
case TYPE_STRING:
case TYPE_FILE:
case TYPE_FUNC:
{
int size = val->val.string_val->size();
const char* data = val->val.string_val->data();
if ( ! size )
{
desc->AddN(empty_field, empty_field_len);
break;
}
if ( size == unset_field_len && memcmp(data, unset_field, size) == 0 )
{
// The value we'd write out would match exactly the
// place-holder we use for unset optional fields. We
// escape the first character so that the output
// won't be ambigious.
static const char hex_chars[] = "0123456789abcdef";
char hex[6] = "\\x00";
hex[2] = hex_chars[((*data) & 0xf0) >> 4];
hex[3] = hex_chars[(*data) & 0x0f];
desc->AddRaw(hex, 4);
++data;
--size;
}
if ( size )
desc->AddN(data, size);
break;
}
case TYPE_TABLE:
{
if ( ! val->val.set_val.size )
{
desc->AddN(empty_field, empty_field_len);
break;
}
desc->AddEscapeSequence(set_separator, set_separator_len);
for ( int j = 0; j < val->val.set_val.size; j++ )
{
if ( j > 0 )
desc->AddRaw(set_separator, set_separator_len);
if ( ! DoWriteOne(desc, val->val.set_val.vals[j], field) )
{
desc->RemoveEscapeSequence(set_separator, set_separator_len);
return false;
}
}
desc->RemoveEscapeSequence(set_separator, set_separator_len);
break;
}
case TYPE_VECTOR:
{
if ( ! val->val.vector_val.size )
{
desc->AddN(empty_field, empty_field_len);
break;
}
desc->AddEscapeSequence(set_separator, set_separator_len);
for ( int j = 0; j < val->val.vector_val.size; j++ )
{
if ( j > 0 )
desc->AddRaw(set_separator, set_separator_len);
if ( ! DoWriteOne(desc, val->val.vector_val.vals[j], field) )
{
desc->RemoveEscapeSequence(set_separator, set_separator_len);
return false;
}
}
desc->RemoveEscapeSequence(set_separator, set_separator_len);
break;
}
default:
Error(Fmt("unsupported field format %d for %s", val->type,
field->name.c_str()));
return false;
}
return true;
}
bool Ascii::DoWrite(int num_fields, const Field* const * fields,
Value** vals)
{
if ( ! file )
DoInit(Path(), NumFields(), Fields());
desc.Clear();
for ( int i = 0; i < num_fields; i++ )
{
if ( i > 0 )
desc.AddRaw(separator, separator_len);
if ( ! DoWriteOne(&desc, vals[i], fields[i]) )
return false;
}
desc.AddRaw("\n", 1);
if ( fwrite(desc.Bytes(), desc.Len(), 1, file) != 1 )
{
Error(Fmt("error writing to %s: %s", fname.c_str(), strerror(errno)));
return false;
}
if ( IsBuf() )
fflush(file);
return true;
}
bool Ascii::DoRotate(string rotated_path, double open, double close, bool terminating)
{
// Don't rotate special files or if there's not one currently open.
if ( ! file || IsSpecial(Path()) )
return true;
fclose(file);
file = 0;
string nname = rotated_path + "." + LogExt();
rename(fname.c_str(), nname.c_str());
if ( ! FinishedRotation(nname, fname, open, close, terminating) )
{
Error(Fmt("error rotating %s to %s", fname.c_str(), nname.c_str()));
return false;
}
return true;
}
bool Ascii::DoSetBuf(bool enabled)
{
// Nothing to do.
return true;
}
string Ascii::LogExt()
{
const char* ext = getenv("BRO_LOG_SUFFIX");
if ( ! ext ) ext = "log";
return ext;
}

View file

@ -0,0 +1,65 @@
// See the file "COPYING" in the main distribution directory for copyright.
//
// Log writer for delimiter-separated ASCII logs.
#ifndef LOGGING_WRITER_ASCII_H
#define LOGGING_WRITER_ASCII_H
#include "../WriterBackend.h"
namespace logging { namespace writer {
class Ascii : public WriterBackend {
public:
Ascii(WriterFrontend* frontend);
~Ascii();
static WriterBackend* Instantiate(WriterFrontend* frontend)
{ return new Ascii(frontend); }
static string LogExt();
protected:
virtual bool DoInit(string path, int num_fields,
const threading::Field* const* fields);
virtual bool DoWrite(int num_fields, const threading::Field* const* fields,
threading::Value** vals);
virtual bool DoSetBuf(bool enabled);
virtual bool DoRotate(string rotated_path, double open,
double close, bool terminating);
virtual bool DoFlush();
virtual bool DoFinish();
private:
bool IsSpecial(string path) { return path.find("/dev/") == 0; }
bool DoWriteOne(ODesc* desc, threading::Value* val, const threading::Field* field);
bool WriteHeaderField(const string& key, const string& value);
FILE* file;
string fname;
ODesc desc;
// Options set from the script-level.
bool output_to_stdout;
bool include_header;
char* separator;
int separator_len;
char* set_separator;
int set_separator_len;
char* empty_field;
int empty_field_len;
char* unset_field;
int unset_field_len;
char* header_prefix;
int header_prefix_len;
};
}
}
#endif

View file

@ -0,0 +1,18 @@
#include "None.h"
using namespace logging;
using namespace writer;
bool None::DoRotate(string rotated_path, double open, double close, bool terminating)
{
if ( ! FinishedRotation(string("/dev/null"), Path(), open, close, terminating))
{
Error(Fmt("error rotating %s", Path().c_str()));
return false;
}
return true;
}

View file

@ -0,0 +1,36 @@
// See the file "COPYING" in the main distribution directory for copyright.
//
// Dummy log writer that just discards everything (but still pretends to rotate).
#ifndef LOGGING_WRITER_NONE_H
#define LOGGING_WRITER_NONE_H
#include "../WriterBackend.h"
namespace logging { namespace writer {
class None : public WriterBackend {
public:
None(WriterFrontend* frontend) : WriterBackend(frontend) {}
~None() {};
static WriterBackend* Instantiate(WriterFrontend* frontend)
{ return new None(frontend); }
protected:
virtual bool DoInit(string path, int num_fields,
const threading::Field* const * fields) { return true; }
virtual bool DoWrite(int num_fields, const threading::Field* const* fields,
threading::Value** vals) { return true; }
virtual bool DoSetBuf(bool enabled) { return true; }
virtual bool DoRotate(string rotated_path, double open,
double close, bool terminating);
virtual bool DoFlush() { return true; }
virtual bool DoFinish() { return true; }
};
}
}
#endif