diff --git a/doc/input.rst b/doc/input.rst index 78e96fe06e..e201af9fed 100644 --- a/doc/input.rst +++ b/doc/input.rst @@ -34,9 +34,11 @@ very similar to the abstracts used in the logging framework: Readers A reader defines the input format for the specific input stream. - At the moment, Bro comes with only one type of reader, which can - read the tab seperated ASCII logfiles that were generated by the + At the moment, Bro comes with two types of reader. The default reader is READER_ASCII, + which can read the tab seperated ASCII logfiles that were generated by the logging framework. + READER_RAW can files containing records separated by a character(like e.g. newline) and send + one event per line. Basics @@ -68,7 +70,21 @@ The fields that can be set when creating a stream are: ``reader`` The reader used for this stream. Default is ``READER_ASCII``. - + + ``mode`` + The mode in which the stream is opened. Possible values are ``MANUAL``, ``REREAD`` and ``STREAM``. + Default is ``MANUAL``. + ``MANUAL`` means, that the files is not updated after it has been read. Changes to the file will not + be reflected in the data bro knows. + ``REREAD`` means that the whole file is read again each time a change is found. This should be used for + files that are mapped to a table where individual lines can change. + ``STREAM`` means that the data from the file is streamed. Events / table entries will be generated as new + data is added to the file. + + ``autostart`` + If set to yes, the first update operation is triggered automatically after the first filter has been added to the stream. + This has to be set to false if several filters are added to the input source. + In this case Input::force_update has to be called manually once after all filters have been added. Filters ======= @@ -101,9 +117,6 @@ could be defined as follows: ... Input::add_eventfilter(Foo::INPUT, [$name="input", $fields=Val, $ev=line]); - - # read the file after all filters have been set - Input::force_update(Foo::INPUT); } The fields that can be set for an event filter are: @@ -156,7 +169,7 @@ an approach similar to this: Input::add_tablefilter(Foo::INPUT, [$name="ssh", $idx=Idx, $val=Val, $destination=conn_attempts]); - # read the file after all filters have been set + # read the file after all filters have been set (only needed if autostart is set to false) Input::force_update(Foo::INPUT); } diff --git a/scripts/base/frameworks/input/main.bro b/scripts/base/frameworks/input/main.bro index 445f947106..c6995121bd 100644 --- a/scripts/base/frameworks/input/main.bro +++ b/scripts/base/frameworks/input/main.bro @@ -182,9 +182,6 @@ function read_table(description: Input::StreamDescription, filter: Input::TableF if ( ok ) { ok = add_tablefilter(id, filter); } - if ( ok ) { - ok = force_update(id); - } if ( ok ) { ok = remove_stream(id); } else { diff --git a/src/input/Manager.cc b/src/input/Manager.cc index 27580e0e82..db98cb7a33 100644 --- a/src/input/Manager.cc +++ b/src/input/Manager.cc @@ -327,7 +327,6 @@ bool Manager::AddEventFilter(EnumVal *id, RecordVal* fval) { return false; } - Field** logf = new Field*[fieldsV.size()]; for ( unsigned int i = 0; i < fieldsV.size(); i++ ) { logf[i] = fieldsV[i]; @@ -380,6 +379,30 @@ bool Manager::AddTableFilter(EnumVal *id, RecordVal* fval) { } TableVal *dst = fval->LookupWithDefault(rtype->FieldOffset("destination"))->AsTableVal(); + // check if index fields match tabla description + { + int num = idx->NumFields(); + const type_list* tl = dst->Type()->AsTableType()->IndexTypes(); + + loop_over_list(*tl, j) + { + if ( j >= num ) { + reporter->Error("Table type has more indexes than index definition"); + return false; + } + + if ( !same_type(idx->FieldType(j), (*tl)[j]) ) { + reporter->Error("Table type does not match index type"); + return false; + } + } + + if ( num != j ) { + reporter->Error("Table has less elements than index definition"); + return false; + } + } + Val *want_record = fval->LookupWithDefault(rtype->FieldOffset("want_record")); Val* event_val = fval->LookupWithDefault(rtype->FieldOffset("ev")); @@ -571,7 +594,6 @@ bool Manager::RemoveStreamContinuation(const ReaderFrontend* reader) { reporter->Error("Stream not found in RemoveStreamContinuation"); return false; - } bool Manager::UnrollRecordType(vector *fields, const RecordType *rec, const string& nameprepend) { @@ -738,7 +760,6 @@ Val* Manager::ValueToIndexVal(int num_fields, const RecordType *type, const Valu idxval = l; } - //reporter->Error("Position: %d, num_fields: %d", position, num_fields); assert ( position == num_fields ); return idxval; @@ -771,8 +792,6 @@ void Manager::SendEntry(const ReaderFrontend* reader, const int id, Value* *vals delete vals[i]; } delete [] vals; - - } int Manager::SendEntryTable(const ReaderFrontend* reader, const int id, const Value* const *vals) { @@ -846,17 +865,15 @@ int Manager::SendEntryTable(const ReaderFrontend* reader, const int id, const Va } else { ev = new EnumVal(BifEnum::Input::EVENT_NEW, BifType::Enum::Input::Event); } + + bool result; + if ( filter->num_val_fields > 0 ) { // we have values + result = CallPred(filter->pred, 3, ev, predidx, valval); + } else { + // no values + result = CallPred(filter->pred, 2, ev, predidx); + } - val_list vl( 2 + (filter->num_val_fields > 0) ); // 2 if we don't have values, 3 otherwise. - vl.append(ev); - vl.append(predidx); - if ( filter->num_val_fields > 0 ) - vl.append(valval); - - Val* v = filter->pred->Call(&vl); - bool result = v->AsBool(); - Unref(v); - if ( result == false ) { if ( !updated ) { // throw away. Hence - we quit. And remove the entry from the current dictionary... @@ -968,14 +985,8 @@ void Manager::EndCurrentSend(const ReaderFrontend* reader, int id) { Ref(predidx); Ref(val); - val_list vl(3); - vl.append(ev); - vl.append(predidx); - vl.append(val); - Val* v = filter->pred->Call(&vl); - bool result = v->AsBool(); - Unref(v); - + bool result = CallPred(filter->pred, 3, ev, predidx, val); + if ( result == false ) { // Keep it. Hence - we quit and simply go to the next entry of lastDict // ah well - and we have to add the entry to currDict... @@ -1038,7 +1049,6 @@ void Manager::Put(const ReaderFrontend* reader, int id, Value* *vals) { } else { assert(false); } - } int Manager::SendEventFilterEvent(const ReaderFrontend* reader, EnumVal* type, int id, const Value* const *vals) { @@ -1132,18 +1142,15 @@ int Manager::PutTable(const ReaderFrontend* reader, int id, const Value* const * } else { ev = new EnumVal(BifEnum::Input::EVENT_NEW, BifType::Enum::Input::Event); } + + bool result; + if ( filter->num_val_fields > 0 ) { // we have values + result = CallPred(filter->pred, 3, ev, predidx, valval); + } else { + // no values + result = CallPred(filter->pred, 2, ev, predidx); + } - val_list vl( 2 + (filter->num_val_fields > 0) ); // 2 if we don't have values, 3 otherwise. - vl.append(ev); - vl.append(predidx); - if ( filter->num_val_fields > 0 ) - vl.append(valval); - - - Val* v = filter->pred->Call(&vl); - bool result = v->AsBool(); - Unref(v); - if ( result == false ) { // do nothing Unref(idxval); @@ -1154,7 +1161,6 @@ int Manager::PutTable(const ReaderFrontend* reader, int id, const Value* const * } - filter->tab->Assign(idxval, valval); if ( filter->event ) { @@ -1176,13 +1182,9 @@ int Manager::PutTable(const ReaderFrontend* reader, int id, const Value* const * SendEvent(filter->event, 3, ev, predidx, valval); } } - } - - - } else { // no predicates or other stuff @@ -1192,6 +1194,7 @@ int Manager::PutTable(const ReaderFrontend* reader, int id, const Value* const * return filter->num_idx_fields + filter->num_val_fields; } +// Todo:: perhaps throw some kind of clear-event? void Manager::Clear(const ReaderFrontend* reader, int id) { ReaderInfo *i = FindReader(reader); if ( i == 0 ) { @@ -1207,6 +1210,7 @@ void Manager::Clear(const ReaderFrontend* reader, int id) { filter->tab->RemoveAll(); } +// put interface: delete old entry from table. bool Manager::Delete(const ReaderFrontend* reader, int id, Value* *vals) { ReaderInfo *i = FindReader(reader); if ( i == 0 ) { @@ -1235,13 +1239,7 @@ bool Manager::Delete(const ReaderFrontend* reader, int id, Value* *vals) { int startpos = 0; Val* predidx = ValueToRecordVal(vals, filter->itype, &startpos); - val_list vl(3); - vl.append(ev); - vl.append(predidx); - vl.append(val); - Val* v = filter->pred->Call(&vl); - filterresult = v->AsBool(); - Unref(v); + filterresult = CallPred(filter->pred, 3, ev, predidx, val); if ( filterresult == false ) { // keep it. @@ -1285,6 +1283,26 @@ bool Manager::Delete(const ReaderFrontend* reader, int id, Value* *vals) { return success; } +bool Manager::CallPred(Func* pred_func, const int numvals, ...) +{ + bool result; + val_list vl(numvals); + + va_list lP; + va_start(lP, numvals); + for ( int i = 0; i < numvals; i++ ) + { + vl.append( va_arg(lP, Val*) ); + } + va_end(lP); + + Val* v = pred_func->Call(&vl); + result = v->AsBool(); + Unref(v); + + return(result); +} + bool Manager::SendEvent(const string& name, const int num_vals, Value* *vals) { EventHandler* handler = event_registry->Lookup(name.c_str()); @@ -1341,8 +1359,15 @@ void Manager::SendEvent(EventHandlerPtr ev, list events) mgr.QueueEvent(ev, vl, SOURCE_LOCAL); } - +// Convert a bro list value to a bro record value. I / we could think about moving this functionality to val.cc RecordVal* Manager::ListValToRecordVal(ListVal* list, RecordType *request_type, int* position) { + assert(position != 0 ); // we need the pointer to point to data; + + if ( request_type->Tag() != TYPE_RECORD ) { + reporter->InternalError("ListValToRecordVal called on non-record-value."); + return 0; + } + RecordVal* rec = new RecordVal(request_type->AsRecordType()); int maxpos = list->Length(); @@ -1364,20 +1389,14 @@ RecordVal* Manager::ListValToRecordVal(ListVal* list, RecordType *request_type, return rec; } - - +// Convert a threading value to a record value RecordVal* Manager::ValueToRecordVal(const Value* const *vals, RecordType *request_type, int* position) { - if ( position == 0 ) { - reporter->InternalError("Need position"); - return 0; - } + assert(position != 0); // we need the pointer to point to data. - /* if ( request_type->Tag() != TYPE_RECORD ) { - reporter->InternalError("I only work with records"); + reporter->InternalError("ValueToRecordVal called on non-record-value."); return 0; - } */ - + } RecordVal* rec = new RecordVal(request_type->AsRecordType()); for ( int i = 0; i < request_type->NumFields(); i++ ) { @@ -1394,11 +1413,12 @@ RecordVal* Manager::ValueToRecordVal(const Value* const *vals, RecordType *reque } return rec; - } - +// Count the length of the values +// used to create a correct length buffer for hashing later int Manager::GetValueLength(const Value* val) { + assert( val->present ); // presence has to be checked elsewhere int length = 0; switch (val->type) { @@ -1485,19 +1505,20 @@ int Manager::GetValueLength(const Value* val) { } +// Given a threading::value, copy the raw data bytes into *data and return how many bytes were copied. +// Used for hashing the values for lookup in the bro table int Manager::CopyValue(char *data, const int startpos, const Value* val) { + assert( val->present ); // presence has to be checked elsewhere + switch ( val->type ) { case TYPE_BOOL: case TYPE_INT: - //reporter->Error("Adding field content to pos %d: %lld", val->val.int_val, startpos); memcpy(data+startpos, (const void*) &(val->val.int_val), sizeof(val->val.int_val)); - //*(data+startpos) = val->val.int_val; return sizeof(val->val.int_val); break; case TYPE_COUNT: case TYPE_COUNTER: - //*(data+startpos) = val->val.uint_val; memcpy(data+startpos, (const void*) &(val->val.uint_val), sizeof(val->val.uint_val)); return sizeof(val->val.uint_val); break; @@ -1516,7 +1537,6 @@ int Manager::CopyValue(char *data, const int startpos, const Value* val) { case TYPE_DOUBLE: case TYPE_TIME: case TYPE_INTERVAL: - //*(data+startpos) = val->val.double_val; memcpy(data+startpos, (const void*) &(val->val.double_val), sizeof(val->val.double_val)); return sizeof(val->val.double_val); break; @@ -1598,12 +1618,11 @@ int Manager::CopyValue(char *data, const int startpos, const Value* val) { return 0; } - reporter->InternalError("internal error"); assert(false); return 0; - } +// Hash num_elements threading values and return the HashKey for them. At least one of the vals has to be ->present. HashKey* Manager::HashValues(const int num_elements, const Value* const *vals) { int length = 0; @@ -1633,10 +1652,9 @@ HashKey* Manager::HashValues(const int num_elements, const Value* const *vals) { assert(position == length); return new HashKey(data, length, key, true); - - } +// convert threading value to Bro value Val* Manager::ValueToVal(const Value* val, BroType* request_type) { if ( request_type->Tag() != TYPE_ANY && request_type->Tag() != val->type ) { @@ -1647,7 +1665,6 @@ Val* Manager::ValueToVal(const Value* val, BroType* request_type) { if ( !val->present ) { return 0; // unset field } - switch ( val->type ) { case TYPE_BOOL: @@ -1760,8 +1777,7 @@ Val* Manager::ValueToVal(const Value* val, BroType* request_type) { reporter->InternalError("unsupported type for input_read"); } - - reporter->InternalError("Impossible error"); + assert(false); return NULL; } @@ -1778,7 +1794,6 @@ Manager::ReaderInfo* Manager::FindReader(const ReaderFrontend* reader) return 0; } - Manager::ReaderInfo* Manager::FindReader(const EnumVal* id) { for ( vector::iterator s = readers.begin(); s != readers.end(); ++s ) diff --git a/src/input/Manager.h b/src/input/Manager.h index b4fc6cff7f..96ea0e43db 100644 --- a/src/input/Manager.h +++ b/src/input/Manager.h @@ -177,6 +177,9 @@ private: void SendEvent(EventHandlerPtr ev, const int numvals, ...); void SendEvent(EventHandlerPtr ev, list events); + // Call predicate function and return result + bool CallPred(Func* pred_func, const int numvals, ...); + // get a hashkey for a set of threading::Values HashKey* HashValues(const int num_elements, const threading::Value* const *vals);