cleanup, more sanity tests, a little bit more documentation

This commit is contained in:
Bernhard Amann 2012-03-11 20:43:26 -07:00
parent faf5c95752
commit 92555badd4
4 changed files with 110 additions and 82 deletions

View file

@ -34,9 +34,11 @@ very similar to the abstracts used in the logging framework:
Readers Readers
A reader defines the input format for the specific input stream. A reader defines the input format for the specific input stream.
At the moment, Bro comes with only one type of reader, which can At the moment, Bro comes with two types of reader. The default reader is READER_ASCII,
read the tab seperated ASCII logfiles that were generated by the which can read the tab seperated ASCII logfiles that were generated by the
logging framework. logging framework.
READER_RAW can files containing records separated by a character(like e.g. newline) and send
one event per line.
Basics Basics
@ -69,6 +71,20 @@ The fields that can be set when creating a stream are:
``reader`` ``reader``
The reader used for this stream. Default is ``READER_ASCII``. The reader used for this stream. Default is ``READER_ASCII``.
``mode``
The mode in which the stream is opened. Possible values are ``MANUAL``, ``REREAD`` and ``STREAM``.
Default is ``MANUAL``.
``MANUAL`` means, that the files is not updated after it has been read. Changes to the file will not
be reflected in the data bro knows.
``REREAD`` means that the whole file is read again each time a change is found. This should be used for
files that are mapped to a table where individual lines can change.
``STREAM`` means that the data from the file is streamed. Events / table entries will be generated as new
data is added to the file.
``autostart``
If set to yes, the first update operation is triggered automatically after the first filter has been added to the stream.
This has to be set to false if several filters are added to the input source.
In this case Input::force_update has to be called manually once after all filters have been added.
Filters Filters
======= =======
@ -101,9 +117,6 @@ could be defined as follows:
... ...
Input::add_eventfilter(Foo::INPUT, [$name="input", $fields=Val, $ev=line]); Input::add_eventfilter(Foo::INPUT, [$name="input", $fields=Val, $ev=line]);
# read the file after all filters have been set
Input::force_update(Foo::INPUT);
} }
The fields that can be set for an event filter are: The fields that can be set for an event filter are:
@ -156,7 +169,7 @@ an approach similar to this:
Input::add_tablefilter(Foo::INPUT, [$name="ssh", $idx=Idx, $val=Val, $destination=conn_attempts]); Input::add_tablefilter(Foo::INPUT, [$name="ssh", $idx=Idx, $val=Val, $destination=conn_attempts]);
# read the file after all filters have been set # read the file after all filters have been set (only needed if autostart is set to false)
Input::force_update(Foo::INPUT); Input::force_update(Foo::INPUT);
} }

View file

@ -182,9 +182,6 @@ function read_table(description: Input::StreamDescription, filter: Input::TableF
if ( ok ) { if ( ok ) {
ok = add_tablefilter(id, filter); ok = add_tablefilter(id, filter);
} }
if ( ok ) {
ok = force_update(id);
}
if ( ok ) { if ( ok ) {
ok = remove_stream(id); ok = remove_stream(id);
} else { } else {

View file

@ -327,7 +327,6 @@ bool Manager::AddEventFilter(EnumVal *id, RecordVal* fval) {
return false; return false;
} }
Field** logf = new Field*[fieldsV.size()]; Field** logf = new Field*[fieldsV.size()];
for ( unsigned int i = 0; i < fieldsV.size(); i++ ) { for ( unsigned int i = 0; i < fieldsV.size(); i++ ) {
logf[i] = fieldsV[i]; logf[i] = fieldsV[i];
@ -380,6 +379,30 @@ bool Manager::AddTableFilter(EnumVal *id, RecordVal* fval) {
} }
TableVal *dst = fval->LookupWithDefault(rtype->FieldOffset("destination"))->AsTableVal(); TableVal *dst = fval->LookupWithDefault(rtype->FieldOffset("destination"))->AsTableVal();
// check if index fields match tabla description
{
int num = idx->NumFields();
const type_list* tl = dst->Type()->AsTableType()->IndexTypes();
loop_over_list(*tl, j)
{
if ( j >= num ) {
reporter->Error("Table type has more indexes than index definition");
return false;
}
if ( !same_type(idx->FieldType(j), (*tl)[j]) ) {
reporter->Error("Table type does not match index type");
return false;
}
}
if ( num != j ) {
reporter->Error("Table has less elements than index definition");
return false;
}
}
Val *want_record = fval->LookupWithDefault(rtype->FieldOffset("want_record")); Val *want_record = fval->LookupWithDefault(rtype->FieldOffset("want_record"));
Val* event_val = fval->LookupWithDefault(rtype->FieldOffset("ev")); Val* event_val = fval->LookupWithDefault(rtype->FieldOffset("ev"));
@ -571,7 +594,6 @@ bool Manager::RemoveStreamContinuation(const ReaderFrontend* reader) {
reporter->Error("Stream not found in RemoveStreamContinuation"); reporter->Error("Stream not found in RemoveStreamContinuation");
return false; return false;
} }
bool Manager::UnrollRecordType(vector<Field*> *fields, const RecordType *rec, const string& nameprepend) { bool Manager::UnrollRecordType(vector<Field*> *fields, const RecordType *rec, const string& nameprepend) {
@ -738,7 +760,6 @@ Val* Manager::ValueToIndexVal(int num_fields, const RecordType *type, const Valu
idxval = l; idxval = l;
} }
//reporter->Error("Position: %d, num_fields: %d", position, num_fields);
assert ( position == num_fields ); assert ( position == num_fields );
return idxval; return idxval;
@ -771,8 +792,6 @@ void Manager::SendEntry(const ReaderFrontend* reader, const int id, Value* *vals
delete vals[i]; delete vals[i];
} }
delete [] vals; delete [] vals;
} }
int Manager::SendEntryTable(const ReaderFrontend* reader, const int id, const Value* const *vals) { int Manager::SendEntryTable(const ReaderFrontend* reader, const int id, const Value* const *vals) {
@ -847,15 +866,13 @@ int Manager::SendEntryTable(const ReaderFrontend* reader, const int id, const Va
ev = new EnumVal(BifEnum::Input::EVENT_NEW, BifType::Enum::Input::Event); ev = new EnumVal(BifEnum::Input::EVENT_NEW, BifType::Enum::Input::Event);
} }
val_list vl( 2 + (filter->num_val_fields > 0) ); // 2 if we don't have values, 3 otherwise. bool result;
vl.append(ev); if ( filter->num_val_fields > 0 ) { // we have values
vl.append(predidx); result = CallPred(filter->pred, 3, ev, predidx, valval);
if ( filter->num_val_fields > 0 ) } else {
vl.append(valval); // no values
result = CallPred(filter->pred, 2, ev, predidx);
Val* v = filter->pred->Call(&vl); }
bool result = v->AsBool();
Unref(v);
if ( result == false ) { if ( result == false ) {
if ( !updated ) { if ( !updated ) {
@ -968,13 +985,7 @@ void Manager::EndCurrentSend(const ReaderFrontend* reader, int id) {
Ref(predidx); Ref(predidx);
Ref(val); Ref(val);
val_list vl(3); bool result = CallPred(filter->pred, 3, ev, predidx, val);
vl.append(ev);
vl.append(predidx);
vl.append(val);
Val* v = filter->pred->Call(&vl);
bool result = v->AsBool();
Unref(v);
if ( result == false ) { if ( result == false ) {
// Keep it. Hence - we quit and simply go to the next entry of lastDict // Keep it. Hence - we quit and simply go to the next entry of lastDict
@ -1038,7 +1049,6 @@ void Manager::Put(const ReaderFrontend* reader, int id, Value* *vals) {
} else { } else {
assert(false); assert(false);
} }
} }
int Manager::SendEventFilterEvent(const ReaderFrontend* reader, EnumVal* type, int id, const Value* const *vals) { int Manager::SendEventFilterEvent(const ReaderFrontend* reader, EnumVal* type, int id, const Value* const *vals) {
@ -1133,16 +1143,13 @@ int Manager::PutTable(const ReaderFrontend* reader, int id, const Value* const *
ev = new EnumVal(BifEnum::Input::EVENT_NEW, BifType::Enum::Input::Event); ev = new EnumVal(BifEnum::Input::EVENT_NEW, BifType::Enum::Input::Event);
} }
val_list vl( 2 + (filter->num_val_fields > 0) ); // 2 if we don't have values, 3 otherwise. bool result;
vl.append(ev); if ( filter->num_val_fields > 0 ) { // we have values
vl.append(predidx); result = CallPred(filter->pred, 3, ev, predidx, valval);
if ( filter->num_val_fields > 0 ) } else {
vl.append(valval); // no values
result = CallPred(filter->pred, 2, ev, predidx);
}
Val* v = filter->pred->Call(&vl);
bool result = v->AsBool();
Unref(v);
if ( result == false ) { if ( result == false ) {
// do nothing // do nothing
@ -1154,7 +1161,6 @@ int Manager::PutTable(const ReaderFrontend* reader, int id, const Value* const *
} }
filter->tab->Assign(idxval, valval); filter->tab->Assign(idxval, valval);
if ( filter->event ) { if ( filter->event ) {
@ -1176,13 +1182,9 @@ int Manager::PutTable(const ReaderFrontend* reader, int id, const Value* const *
SendEvent(filter->event, 3, ev, predidx, valval); SendEvent(filter->event, 3, ev, predidx, valval);
} }
} }
} }
} else { } else {
// no predicates or other stuff // no predicates or other stuff
@ -1192,6 +1194,7 @@ int Manager::PutTable(const ReaderFrontend* reader, int id, const Value* const *
return filter->num_idx_fields + filter->num_val_fields; return filter->num_idx_fields + filter->num_val_fields;
} }
// Todo:: perhaps throw some kind of clear-event?
void Manager::Clear(const ReaderFrontend* reader, int id) { void Manager::Clear(const ReaderFrontend* reader, int id) {
ReaderInfo *i = FindReader(reader); ReaderInfo *i = FindReader(reader);
if ( i == 0 ) { if ( i == 0 ) {
@ -1207,6 +1210,7 @@ void Manager::Clear(const ReaderFrontend* reader, int id) {
filter->tab->RemoveAll(); filter->tab->RemoveAll();
} }
// put interface: delete old entry from table.
bool Manager::Delete(const ReaderFrontend* reader, int id, Value* *vals) { bool Manager::Delete(const ReaderFrontend* reader, int id, Value* *vals) {
ReaderInfo *i = FindReader(reader); ReaderInfo *i = FindReader(reader);
if ( i == 0 ) { if ( i == 0 ) {
@ -1235,13 +1239,7 @@ bool Manager::Delete(const ReaderFrontend* reader, int id, Value* *vals) {
int startpos = 0; int startpos = 0;
Val* predidx = ValueToRecordVal(vals, filter->itype, &startpos); Val* predidx = ValueToRecordVal(vals, filter->itype, &startpos);
val_list vl(3); filterresult = CallPred(filter->pred, 3, ev, predidx, val);
vl.append(ev);
vl.append(predidx);
vl.append(val);
Val* v = filter->pred->Call(&vl);
filterresult = v->AsBool();
Unref(v);
if ( filterresult == false ) { if ( filterresult == false ) {
// keep it. // keep it.
@ -1285,6 +1283,26 @@ bool Manager::Delete(const ReaderFrontend* reader, int id, Value* *vals) {
return success; return success;
} }
bool Manager::CallPred(Func* pred_func, const int numvals, ...)
{
bool result;
val_list vl(numvals);
va_list lP;
va_start(lP, numvals);
for ( int i = 0; i < numvals; i++ )
{
vl.append( va_arg(lP, Val*) );
}
va_end(lP);
Val* v = pred_func->Call(&vl);
result = v->AsBool();
Unref(v);
return(result);
}
bool Manager::SendEvent(const string& name, const int num_vals, Value* *vals) bool Manager::SendEvent(const string& name, const int num_vals, Value* *vals)
{ {
EventHandler* handler = event_registry->Lookup(name.c_str()); EventHandler* handler = event_registry->Lookup(name.c_str());
@ -1341,8 +1359,15 @@ void Manager::SendEvent(EventHandlerPtr ev, list<Val*> events)
mgr.QueueEvent(ev, vl, SOURCE_LOCAL); mgr.QueueEvent(ev, vl, SOURCE_LOCAL);
} }
// Convert a bro list value to a bro record value. I / we could think about moving this functionality to val.cc
RecordVal* Manager::ListValToRecordVal(ListVal* list, RecordType *request_type, int* position) { RecordVal* Manager::ListValToRecordVal(ListVal* list, RecordType *request_type, int* position) {
assert(position != 0 ); // we need the pointer to point to data;
if ( request_type->Tag() != TYPE_RECORD ) {
reporter->InternalError("ListValToRecordVal called on non-record-value.");
return 0;
}
RecordVal* rec = new RecordVal(request_type->AsRecordType()); RecordVal* rec = new RecordVal(request_type->AsRecordType());
int maxpos = list->Length(); int maxpos = list->Length();
@ -1364,21 +1389,15 @@ RecordVal* Manager::ListValToRecordVal(ListVal* list, RecordType *request_type,
return rec; return rec;
} }
// Convert a threading value to a record value
RecordVal* Manager::ValueToRecordVal(const Value* const *vals, RecordType *request_type, int* position) { RecordVal* Manager::ValueToRecordVal(const Value* const *vals, RecordType *request_type, int* position) {
if ( position == 0 ) { assert(position != 0); // we need the pointer to point to data.
reporter->InternalError("Need position");
if ( request_type->Tag() != TYPE_RECORD ) {
reporter->InternalError("ValueToRecordVal called on non-record-value.");
return 0; return 0;
} }
/*
if ( request_type->Tag() != TYPE_RECORD ) {
reporter->InternalError("I only work with records");
return 0;
} */
RecordVal* rec = new RecordVal(request_type->AsRecordType()); RecordVal* rec = new RecordVal(request_type->AsRecordType());
for ( int i = 0; i < request_type->NumFields(); i++ ) { for ( int i = 0; i < request_type->NumFields(); i++ ) {
@ -1394,11 +1413,12 @@ RecordVal* Manager::ValueToRecordVal(const Value* const *vals, RecordType *reque
} }
return rec; return rec;
} }
// Count the length of the values
// used to create a correct length buffer for hashing later
int Manager::GetValueLength(const Value* val) { int Manager::GetValueLength(const Value* val) {
assert( val->present ); // presence has to be checked elsewhere
int length = 0; int length = 0;
switch (val->type) { switch (val->type) {
@ -1485,19 +1505,20 @@ int Manager::GetValueLength(const Value* val) {
} }
// Given a threading::value, copy the raw data bytes into *data and return how many bytes were copied.
// Used for hashing the values for lookup in the bro table
int Manager::CopyValue(char *data, const int startpos, const Value* val) { int Manager::CopyValue(char *data, const int startpos, const Value* val) {
assert( val->present ); // presence has to be checked elsewhere
switch ( val->type ) { switch ( val->type ) {
case TYPE_BOOL: case TYPE_BOOL:
case TYPE_INT: case TYPE_INT:
//reporter->Error("Adding field content to pos %d: %lld", val->val.int_val, startpos);
memcpy(data+startpos, (const void*) &(val->val.int_val), sizeof(val->val.int_val)); memcpy(data+startpos, (const void*) &(val->val.int_val), sizeof(val->val.int_val));
//*(data+startpos) = val->val.int_val;
return sizeof(val->val.int_val); return sizeof(val->val.int_val);
break; break;
case TYPE_COUNT: case TYPE_COUNT:
case TYPE_COUNTER: case TYPE_COUNTER:
//*(data+startpos) = val->val.uint_val;
memcpy(data+startpos, (const void*) &(val->val.uint_val), sizeof(val->val.uint_val)); memcpy(data+startpos, (const void*) &(val->val.uint_val), sizeof(val->val.uint_val));
return sizeof(val->val.uint_val); return sizeof(val->val.uint_val);
break; break;
@ -1516,7 +1537,6 @@ int Manager::CopyValue(char *data, const int startpos, const Value* val) {
case TYPE_DOUBLE: case TYPE_DOUBLE:
case TYPE_TIME: case TYPE_TIME:
case TYPE_INTERVAL: case TYPE_INTERVAL:
//*(data+startpos) = val->val.double_val;
memcpy(data+startpos, (const void*) &(val->val.double_val), sizeof(val->val.double_val)); memcpy(data+startpos, (const void*) &(val->val.double_val), sizeof(val->val.double_val));
return sizeof(val->val.double_val); return sizeof(val->val.double_val);
break; break;
@ -1598,12 +1618,11 @@ int Manager::CopyValue(char *data, const int startpos, const Value* val) {
return 0; return 0;
} }
reporter->InternalError("internal error");
assert(false); assert(false);
return 0; return 0;
} }
// Hash num_elements threading values and return the HashKey for them. At least one of the vals has to be ->present.
HashKey* Manager::HashValues(const int num_elements, const Value* const *vals) { HashKey* Manager::HashValues(const int num_elements, const Value* const *vals) {
int length = 0; int length = 0;
@ -1633,10 +1652,9 @@ HashKey* Manager::HashValues(const int num_elements, const Value* const *vals) {
assert(position == length); assert(position == length);
return new HashKey(data, length, key, true); return new HashKey(data, length, key, true);
} }
// convert threading value to Bro value
Val* Manager::ValueToVal(const Value* val, BroType* request_type) { Val* Manager::ValueToVal(const Value* val, BroType* request_type) {
if ( request_type->Tag() != TYPE_ANY && request_type->Tag() != val->type ) { if ( request_type->Tag() != TYPE_ANY && request_type->Tag() != val->type ) {
@ -1648,7 +1666,6 @@ Val* Manager::ValueToVal(const Value* val, BroType* request_type) {
return 0; // unset field return 0; // unset field
} }
switch ( val->type ) { switch ( val->type ) {
case TYPE_BOOL: case TYPE_BOOL:
case TYPE_INT: case TYPE_INT:
@ -1760,8 +1777,7 @@ Val* Manager::ValueToVal(const Value* val, BroType* request_type) {
reporter->InternalError("unsupported type for input_read"); reporter->InternalError("unsupported type for input_read");
} }
assert(false);
reporter->InternalError("Impossible error");
return NULL; return NULL;
} }
@ -1778,7 +1794,6 @@ Manager::ReaderInfo* Manager::FindReader(const ReaderFrontend* reader)
return 0; return 0;
} }
Manager::ReaderInfo* Manager::FindReader(const EnumVal* id) Manager::ReaderInfo* Manager::FindReader(const EnumVal* id)
{ {
for ( vector<ReaderInfo *>::iterator s = readers.begin(); s != readers.end(); ++s ) for ( vector<ReaderInfo *>::iterator s = readers.begin(); s != readers.end(); ++s )

View file

@ -177,6 +177,9 @@ private:
void SendEvent(EventHandlerPtr ev, const int numvals, ...); void SendEvent(EventHandlerPtr ev, const int numvals, ...);
void SendEvent(EventHandlerPtr ev, list<Val*> events); void SendEvent(EventHandlerPtr ev, list<Val*> events);
// Call predicate function and return result
bool CallPred(Func* pred_func, const int numvals, ...);
// get a hashkey for a set of threading::Values // get a hashkey for a set of threading::Values
HashKey* HashValues(const int num_elements, const threading::Value* const *vals); HashKey* HashValues(const int num_elements, const threading::Value* const *vals);