Internal refactoring to provide injection points for remotely received

log records.

Also added some additional type-safety check to later make sure that
what a peer sends is actually of the expected format.
This commit is contained in:
Robin Sommer 2011-02-28 21:43:48 -08:00
parent e6edc52d54
commit ba2c6f6139
4 changed files with 142 additions and 75 deletions

View file

@ -7,17 +7,17 @@
#include "LogWriterAscii.h" #include "LogWriterAscii.h"
struct LogWriterDefinition { struct LogWriterDefinition {
LogWriterType::Type type; // The type. bro_int_t type; // The type.
const char *name; // Descriptive name for error messages. const char *name; // Descriptive name for error messages.
bool (*init)(); // An optional one-time initialization function. bool (*init)(); // An optional one-time initialization function.
LogWriter* (*factory)(); // A factory function creating instances. LogWriter* (*factory)(); // A factory function creating instances.
}; };
LogWriterDefinition log_writers[] = { LogWriterDefinition log_writers[] = {
{ LogWriterType::Ascii, "Ascii", 0, LogWriterAscii::Instantiate }, { BifEnum::Log::WRITER_ASCII, "Ascii", 0, LogWriterAscii::Instantiate },
// End marker. // End marker.
{ LogWriterType::None, "None", 0, (LogWriter* (*)())0 } { BifEnum::Log::WRITER_DEFAULT, "None", 0, (LogWriter* (*)())0 }
}; };
struct LogMgr::Filter { struct LogMgr::Filter {
@ -26,7 +26,7 @@ struct LogMgr::Filter {
Func* path_func; Func* path_func;
string path; string path;
Val* path_val; Val* path_val;
LogWriterDefinition* writer; EnumVal* writer;
int num_fields; int num_fields;
LogField** fields; LogField** fields;
@ -36,14 +36,14 @@ struct LogMgr::Filter {
}; };
struct LogMgr::Stream { struct LogMgr::Stream {
int id; EnumVal* id;
string name; string name;
RecordType* columns; RecordType* columns;
EventHandlerPtr event; EventHandlerPtr event;
list<Filter*> filters; list<Filter*> filters;
typedef pair<int, string> IdPathPair; typedef pair<int, string> WriterPathPair;
typedef map<IdPathPair, LogWriter *> WriterMap; typedef map<WriterPathPair, LogWriter *> WriterMap;
WriterMap writers; // Writers indexed by id/path pair. WriterMap writers; // Writers indexed by id/path pair.
~Stream(); ~Stream();
@ -94,7 +94,7 @@ LogMgr::Stream* LogMgr::FindStream(EnumVal* id)
void LogMgr::RemoveDisabledWriters(Stream* stream) void LogMgr::RemoveDisabledWriters(Stream* stream)
{ {
list<Stream::IdPathPair> disabled; list<Stream::WriterPathPair> disabled;
for ( Stream::WriterMap::iterator j = stream->writers.begin(); j != stream->writers.end(); j++ ) for ( Stream::WriterMap::iterator j = stream->writers.begin(); j != stream->writers.end(); j++ )
{ {
@ -105,7 +105,7 @@ void LogMgr::RemoveDisabledWriters(Stream* stream)
} }
} }
for ( list<Stream::IdPathPair>::iterator j = disabled.begin(); j != disabled.end(); j++ ) for ( list<Stream::WriterPathPair>::iterator j = disabled.begin(); j != disabled.end(); j++ )
stream->writers.erase(*j); stream->writers.erase(*j);
} }
@ -164,11 +164,10 @@ bool LogMgr::CreateStream(EnumVal* id, RecordVal* sval)
// Create new stream. // Create new stream.
streams[idx] = new Stream; streams[idx] = new Stream;
streams[idx]->id = id->AsEnum(); streams[idx]->id = id->Ref()->AsEnumVal();
streams[idx]->name = id->Type()->AsEnumType()->Lookup(idx); streams[idx]->name = id->Type()->AsEnumType()->Lookup(idx);
streams[idx]->event = event ? event_registry->Lookup(event->GetID()->Name()) : 0; streams[idx]->event = event ? event_registry->Lookup(event->GetID()->Name()) : 0;
streams[idx]->columns = columns; streams[idx]->columns = columns->Ref()->AsRecordType();
columns->Ref();
DBG_LOG(DBG_LOGGING, "Created new logging stream '%s', raising event %s", streams[idx]->name.c_str(), event ? streams[idx]->event->Name() : "<none>"); DBG_LOG(DBG_LOGGING, "Created new logging stream '%s', raising event %s", streams[idx]->name.c_str(), event ? streams[idx]->event->Name() : "<none>");
@ -270,32 +269,6 @@ bool LogMgr::AddFilter(EnumVal* id, RecordVal* fval)
else else
writer = writer_val->AsEnum(); writer = writer_val->AsEnum();
LogWriterDefinition* ld;
for ( ld = log_writers; ld->type != LogWriterType::None; ++ld )
{
if ( ld->type == writer )
break;
}
if ( ld->type == LogWriterType::None )
internal_error("unknow writer in add_filter");
if ( ! ld->factory )
// Oops, we can't instantuate this guy.
return true; // Count as success, as we will have reported it earlier already.
// If the writer has an init function, call it.
if ( ld->init )
{
if ( (*ld->init)() )
// Clear the init function so that we won't call it again later.
ld->init = 0;
else
// Init failed, disable by deleting factory function.
ld->factory = 0;
return false;
}
// Create a new Filter instance. // Create a new Filter instance.
Val* pred = fval->Lookup(rtype->FieldOffset("pred")); Val* pred = fval->Lookup(rtype->FieldOffset("pred"));
@ -305,7 +278,7 @@ bool LogMgr::AddFilter(EnumVal* id, RecordVal* fval)
filter->name = fval->Lookup(rtype->FieldOffset("name"))->AsString()->CheckString(); filter->name = fval->Lookup(rtype->FieldOffset("name"))->AsString()->CheckString();
filter->pred = pred ? pred->AsFunc() : 0; filter->pred = pred ? pred->AsFunc() : 0;
filter->path_func = path_func ? path_func->AsFunc() : 0; filter->path_func = path_func ? path_func->AsFunc() : 0;
filter->writer = ld; filter->writer = id->Ref()->AsEnumVal();
// TODO: Check that the predciate is of the right type. // TODO: Check that the predciate is of the right type.
@ -349,7 +322,7 @@ bool LogMgr::AddFilter(EnumVal* id, RecordVal* fval)
#ifdef DEBUG #ifdef DEBUG
DBG_LOG(DBG_LOGGING, "Created new filter '%s' for stream '%s'", filter->name.c_str(), stream->name.c_str()); DBG_LOG(DBG_LOGGING, "Created new filter '%s' for stream '%s'", filter->name.c_str(), stream->name.c_str());
DBG_LOG(DBG_LOGGING, " writer : %s", ld->name); DBG_LOG(DBG_LOGGING, " writer : %d", filter->writer);
DBG_LOG(DBG_LOGGING, " path : %s", filter->path.c_str()); DBG_LOG(DBG_LOGGING, " path : %s", filter->path.c_str());
DBG_LOG(DBG_LOGGING, " path_func : %s", (filter->path_func ? "set" : "not set")); DBG_LOG(DBG_LOGGING, " path_func : %s", (filter->path_func ? "set" : "not set"));
DBG_LOG(DBG_LOGGING, " pred : %s", (filter->pred ? "set" : "not set")); DBG_LOG(DBG_LOGGING, " pred : %s", (filter->pred ? "set" : "not set"));
@ -454,14 +427,16 @@ bool LogMgr::Write(EnumVal* id, RecordVal* columns)
} }
// See if we already have a writer for this path. // See if we already have a writer for this path.
Stream::WriterMap::iterator w = stream->writers.find(Stream::IdPathPair(stream->id, path)); Stream::WriterMap::iterator w = stream->writers.find(Stream::WriterPathPair(filter->writer->AsEnum(), path));
LogWriter* writer = 0; LogWriter* writer = 0;
if ( w == stream->writers.end() )
if ( w != stream->writers.end() )
// We have a writer already.
writer = w->second;
else
{ {
// No, need to create one. // No, need to create one.
assert(filter->writer->factory);
writer = (*filter->writer->factory)();
// Copy the fields for LogWriter::Init() as it will take // Copy the fields for LogWriter::Init() as it will take
// ownership. // ownership.
@ -469,22 +444,18 @@ bool LogMgr::Write(EnumVal* id, RecordVal* columns)
for ( int j = 0; j < filter->num_fields; ++j ) for ( int j = 0; j < filter->num_fields; ++j )
arg_fields[j] = new LogField(*filter->fields[j]); arg_fields[j] = new LogField(*filter->fields[j]);
if ( ! writer->Init(path, filter->num_fields, arg_fields) ) writer = CreateWriter(stream->id, filter->writer, path, filter->num_fields, arg_fields);
if ( ! writer )
{ {
Unref(columns); Unref(columns);
return false; return false;
} }
stream->writers.insert(Stream::WriterMap::value_type(Stream::IdPathPair(stream->id, path), writer));
} }
else
// We have a writer already.
writer = w->second;
// Alright, can do the write now. // Alright, can do the write now.
LogVal** vals = RecordToFilterVals(filter, columns); LogVal** vals = RecordToFilterVals(filter, columns);
if ( ! writer->Write(vals) ) if ( ! writer->Write(filter->num_fields, vals) )
error = true; error = true;
#ifdef DEBUG #ifdef DEBUG
@ -506,6 +477,7 @@ LogVal** LogMgr::RecordToFilterVals(Filter* filter, RecordVal* columns)
for ( int i = 0; i < filter->num_fields; ++i ) for ( int i = 0; i < filter->num_fields; ++i )
{ {
TypeTag type = TYPE_ERROR;
Val* val = columns; Val* val = columns;
// For each field, first find the right value, which can potentially // For each field, first find the right value, which can potentially
@ -514,12 +486,13 @@ LogVal** LogMgr::RecordToFilterVals(Filter* filter, RecordVal* columns)
for ( list<int>::iterator j = indices.begin(); j != indices.end(); ++j ) for ( list<int>::iterator j = indices.begin(); j != indices.end(); ++j )
{ {
type = val->Type()->AsRecordType()->FieldType(*j)->Tag();
val = val->AsRecordVal()->Lookup(*j); val = val->AsRecordVal()->Lookup(*j);
if ( ! val ) if ( ! val )
{ {
// Value, or any of its parents, is not set. // Value, or any of its parents, is not set.
vals[i] = new LogVal(false); vals[i] = new LogVal(type, false);
break; break;
} }
} }
@ -531,26 +504,26 @@ LogVal** LogMgr::RecordToFilterVals(Filter* filter, RecordVal* columns)
case TYPE_BOOL: case TYPE_BOOL:
case TYPE_INT: case TYPE_INT:
case TYPE_ENUM: case TYPE_ENUM:
vals[i] = new LogVal(); vals[i] = new LogVal(type);
vals[i]->val.int_val = val->InternalInt(); vals[i]->val.int_val = val->InternalInt();
break; break;
case TYPE_COUNT: case TYPE_COUNT:
case TYPE_COUNTER: case TYPE_COUNTER:
case TYPE_PORT: case TYPE_PORT:
vals[i] = new LogVal(); vals[i] = new LogVal(type);
vals[i]->val.uint_val = val->InternalUnsigned(); vals[i]->val.uint_val = val->InternalUnsigned();
break; break;
case TYPE_SUBNET: case TYPE_SUBNET:
vals[i] = new LogVal(); vals[i] = new LogVal(type);
vals[i]->val.subnet_val = *val->AsSubNet(); vals[i]->val.subnet_val = *val->AsSubNet();
break; break;
case TYPE_NET: case TYPE_NET:
case TYPE_ADDR: case TYPE_ADDR:
{ {
vals[i] = new LogVal(); vals[i] = new LogVal(type);
addr_type t = val->AsAddr(); addr_type t = val->AsAddr();
copy_addr(&t, &vals[i]->val.addr_val); copy_addr(&t, &vals[i]->val.addr_val);
break; break;
@ -559,7 +532,7 @@ LogVal** LogMgr::RecordToFilterVals(Filter* filter, RecordVal* columns)
case TYPE_DOUBLE: case TYPE_DOUBLE:
case TYPE_TIME: case TYPE_TIME:
case TYPE_INTERVAL: case TYPE_INTERVAL:
vals[i] = new LogVal(); vals[i] = new LogVal(type);
vals[i]->val.double_val = val->InternalDouble(); vals[i]->val.double_val = val->InternalDouble();
break; break;
@ -567,7 +540,7 @@ LogVal** LogMgr::RecordToFilterVals(Filter* filter, RecordVal* columns)
{ {
const BroString* s = val->AsString(); const BroString* s = val->AsString();
LogVal* lval = (LogVal*) new char[sizeof(LogVal) + sizeof(log_string_type) + s->Len()]; LogVal* lval = (LogVal*) new char[sizeof(LogVal) + sizeof(log_string_type) + s->Len()];
new (lval) LogVal(); // Run ctor. new (lval) LogVal(type); // Run ctor.
lval->val.string_val.len = s->Len(); lval->val.string_val.len = s->Len();
memcpy(&lval->val.string_val.string, s->Bytes(), s->Len()); memcpy(&lval->val.string_val.string, s->Bytes(), s->Len());
vals[i] = lval; vals[i] = lval;
@ -582,6 +555,90 @@ LogVal** LogMgr::RecordToFilterVals(Filter* filter, RecordVal* columns)
return vals; return vals;
} }
LogWriter* LogMgr::CreateWriter(EnumVal* id, EnumVal* writer, string path, int num_fields, LogField** fields)
{
Stream* stream = FindStream(id);
if ( ! stream )
// Don't know this stream.
return false;
Stream::WriterMap::iterator w = stream->writers.find(Stream::WriterPathPair(writer->AsEnum(), path));
if ( w != stream->writers.end() )
// If we already have a writer for this. That's fine, we just return
// it.
return w->second;
// Need to instantiate a new writer.
LogWriterDefinition* ld = log_writers;
while ( true )
{
if ( ld->type == BifEnum::Log::WRITER_DEFAULT )
{
run_time("unknow writer when creating writer");
return 0;
}
if ( ld->type == writer->AsEnum() )
break;
if ( ! ld->factory )
// Oops, we can't instantuate this guy.
return 0;
// If the writer has an init function, call it.
if ( ld->init )
{
if ( (*ld->init)() )
// Clear the init function so that we won't call it again later.
ld->init = 0;
else
// Init failed, disable by deleting factory function.
ld->factory = 0;
return false;
}
++ld;
}
assert(ld->factory);
LogWriter* writer_obj = (*ld->factory)();
if ( ! writer_obj->Init(path, num_fields, fields) )
return 0;
stream->writers.insert(Stream::WriterMap::value_type(Stream::WriterPathPair(writer->AsEnum(), path), writer_obj));
return writer_obj;
}
bool LogMgr::Write(EnumVal* id, EnumVal* writer, string path, int num_fields, LogVal** vals)
{
Stream* stream = FindStream(id);
if ( ! stream )
// Don't know this stream.
return false;
Stream::WriterMap::iterator w = stream->writers.find(Stream::WriterPathPair(writer->AsEnum(), path));
if ( w == stream->writers.end() )
// Don't know this writer.
return false;
bool success = w->second->Write(num_fields, vals);
#ifdef DEBUG
DBG_LOG(DBG_LOGGING, "Wrote pre-filtered record to '%s' on stream '%s'", path.c_str(), stream->name.c_str());
#endif
return success;
}
bool LogMgr::SetBuf(EnumVal* id, bool enabled) bool LogMgr::SetBuf(EnumVal* id, bool enabled)
{ {
Stream* stream = FindStream(id); Stream* stream = FindStream(id);

View file

@ -7,14 +7,6 @@
#include "Val.h" #include "Val.h"
#include "EventHandler.h" #include "EventHandler.h"
// One value per writer type we have.
namespace LogWriterType {
enum Type {
None,
Ascii
};
};
struct LogField { struct LogField {
LogField() { } LogField() { }
LogField(const LogField& other) : name(other.name), type(other.type) { } LogField(const LogField& other) : name(other.name), type(other.type) { }
@ -30,8 +22,9 @@ struct log_string_type {
// All values that can be directly logged by a Writer. // All values that can be directly logged by a Writer.
struct LogVal { struct LogVal {
LogVal(bool arg_present = true) : present(arg_present) {} LogVal(TypeTag arg_type, bool arg_present = true) : type(arg_type), present(arg_present) {}
TypeTag type;
bool present; // If false, the field is unset (i.e., &optional and not initialzed). bool present; // If false, the field is unset (i.e., &optional and not initialzed).
// The following union is a subset of BroValUnion, including only the // The following union is a subset of BroValUnion, including only the
@ -47,6 +40,7 @@ struct LogVal {
}; };
class LogWriter; class LogWriter;
class RemoteSerializer;
class LogMgr { class LogMgr {
public: public:
@ -64,6 +58,12 @@ public:
protected: protected:
friend class LogWriter; friend class LogWriter;
friend class RemoteSerializer;
// These function are also used by the RemoteSerializer to inject
// received logs.
LogWriter* CreateWriter(EnumVal* id, EnumVal* writer, string path, int num_fields, LogField** fields);
bool Write(EnumVal* id, EnumVal* writer, string path, int num_fields, LogVal** vals);
/// Functions also used by the writers. /// Functions also used by the writers.

View file

@ -33,8 +33,17 @@ bool LogWriter::Init(string arg_path, int arg_num_fields, LogField** arg_fields)
return true; return true;
} }
bool LogWriter::Write(LogVal** vals) bool LogWriter::Write(int arg_num_fields, LogVal** vals)
{ {
// Double-check that the arguments match. If we get this from remote,
// something might be mixed up.
if ( num_fields != arg_num_fields )
return false;
for ( int i = 0; i < num_fields; ++i )
if ( vals[i]->type != fields[i]->type )
return false;
bool result = DoWrite(num_fields, fields, vals); bool result = DoWrite(num_fields, fields, vals);
DeleteVals(vals); DeleteVals(vals);

View file

@ -25,11 +25,12 @@ public:
// when done. // when done.
bool Init(string path, int num_fields, LogField** fields); bool Init(string path, int num_fields, LogField** fields);
// Writes one log entry. The method takes ownership of "vals" and will // Writes one log entry. The method takes ownership of "vals" and will
// return immediately after queueing the write request, potentially // return immediately after queueing the write request, potentially
// before the output has actually taken place. Returns false if an error // before the output has actually taken place. Returns false if an error
// occured, in which case the writer must not be used further. // occured, in which case the writer must not be used further. num_fields
bool Write(LogVal** vals); // and types must match what was passed to Init().
bool Write(int num_fields, LogVal** vals);
// Sets the buffering status for the writer, if the writer supports it. // Sets the buffering status for the writer, if the writer supports it.
bool SetBuf(bool enabled); bool SetBuf(bool enabled);