diff --git a/scripts/base/frameworks/input/main.bro b/scripts/base/frameworks/input/main.bro index c9ce0e321e..7a372dc120 100644 --- a/scripts/base/frameworks/input/main.bro +++ b/scripts/base/frameworks/input/main.bro @@ -4,7 +4,7 @@ module Input; export { - + ## The default input reader used. Defaults to `READER_ASCII`. const default_reader = READER_ASCII &redef; @@ -13,52 +13,56 @@ export { ## TableFilter description type used for the `table` method. type TableDescription: record { ## Common definitions for tables and events - + ## String that allows the reader to find the source. ## For `READER_ASCII`, this is the filename. source: string; - - ## Reader to use for this steam + + ## Reader to use for this stream reader: Reader &default=default_reader; ## Read mode to use for this stream mode: Mode &default=default_mode; ## Descriptive name. Used to remove a stream at a later time - name: string; + name: string; - ## Special definitions for tables + # Special definitions for tables - ## Table which will contain the data read by the input framework + ## Table which will receive the data read by the input framework destination: any; + ## Record that defines the values used as the index of the table idx: any; - ## Record that defines the values used as the values of the table + + ## Record that defines the values used as the elements of the table ## If val is undefined, destination has to be a set. val: any &optional; - ## Defines if the value of the table is a record (default), or a single value. - ## Val can only contain one element when this is set to false. + + ## Defines if the value of the table is a record (default), or a single value. Val + ## can only contain one element when this is set to false. want_record: bool &default=T; - ## The event that is raised each time a value is added to, changed in or removed from the table. - ## The event will receive an Input::Event enum as the first argument, the idx record as the second argument - ## and the value (record) as the third argument. - ev: any &optional; # event containing idx, val as values. + ## The event that is raised each time a value is added to, changed in or removed + ## from the table. The event will receive an Input::Event enum as the first + ## argument, the idx record as the second argument and the value (record) as the + ## third argument. + ev: any &optional; # event containing idx, val as values. - ## Predicate function, that can decide if an insertion, update or removal should really be executed. - ## Parameters are the same as for the event. If true is returned, the update is performed. If false - ## is returned, it is skipped + ## Predicate function that can decide if an insertion, update or removal should + ## really be executed. Parameters are the same as for the event. If true is + ## returned, the update is performed. If false is returned, it is skipped. pred: function(typ: Input::Event, left: any, right: any): bool &optional; }; ## EventFilter description type used for the `event` method. type EventDescription: record { ## Common definitions for tables and events - + ## String that allows the reader to find the source. ## For `READER_ASCII`, this is the filename. source: string; - + ## Reader to use for this steam reader: Reader &default=default_reader; @@ -66,19 +70,20 @@ export { mode: Mode &default=default_mode; ## Descriptive name. Used to remove a stream at a later time - name: string; + name: string; + + # Special definitions for events - ## Special definitions for events - ## Record describing the fields to be retrieved from the source input. fields: any; + ## If want_record if false (default), the event receives each value in fields as a seperate argument. ## If it is set to true, the event receives all fields in a signle record value. want_record: bool &default=F; ## The event that is rised each time a new line is received from the reader. ## The event will receive an Input::Event enum as the first element, and the fields as the following arguments. - ev: any; + ev: any; }; @@ -86,7 +91,7 @@ export { ## ## description: `TableDescription` record describing the source. global add_table: function(description: Input::TableDescription) : bool; - + ## Create a new event input from a given source. Returns true on success. ## ## description: `TableDescription` record describing the source. diff --git a/scripts/base/frameworks/input/readers/ascii.bro b/scripts/base/frameworks/input/readers/ascii.bro index 14c04757f7..7fca1ad795 100644 --- a/scripts/base/frameworks/input/readers/ascii.bro +++ b/scripts/base/frameworks/input/readers/ascii.bro @@ -1,4 +1,6 @@ ##! Interface for the ascii input reader. +##! +##! The defaults are set to match Bro's ASCII output. module InputAscii; diff --git a/src/input/Manager.cc b/src/input/Manager.cc index 3bae7dbb28..6cae5e2f34 100644 --- a/src/input/Manager.cc +++ b/src/input/Manager.cc @@ -23,27 +23,41 @@ using namespace input; using threading::Value; using threading::Field; +struct ReaderDefinition { + bro_int_t type; // The reader type. + const char *name; // Descriptive name for error messages. + bool (*init)(); // Optional one-time initializing function. + ReaderBackend* (*factory)(ReaderFrontend* frontend); // Factory function for creating instances. +}; + +ReaderDefinition input_readers[] = { + { BifEnum::Input::READER_ASCII, "Ascii", 0, reader::Ascii::Instantiate }, + { BifEnum::Input::READER_RAW, "Raw", 0, reader::Raw::Instantiate }, + { BifEnum::Input::READER_BENCHMARK, "Benchmark", 0, reader::Benchmark::Instantiate }, + + // End marker + { BifEnum::Input::READER_DEFAULT, "None", 0, (ReaderBackend* (*)(ReaderFrontend* frontend))0 } +}; + /** - * InputHashes are used as Dictionaries to store the value and index hashes for all - * lines currently stored in a table. Index hash is stored as HashKey*, because it is - * thrown into other Bro functions that need the complex structure of it. - * For everything we do (with values), we just take the hash_t value and compare it - * directly with == + * InputHashes are used as Dictionaries to store the value and index hashes + * for all lines currently stored in a table. Index hash is stored as + * HashKey*, because it is thrown into other Bro functions that need the + * complex structure of it. For everything we do (with values), we just take + * the hash_t value and compare it directly with "==" */ -struct InputHash - { +struct InputHash { hash_t valhash; - HashKey* idxkey; + HashKey* idxkey; ~InputHash(); - }; +}; -InputHash::~InputHash() +InputHash::~InputHash() { - if ( idxkey ) - delete idxkey; - } + delete idxkey; + } -static void input_hash_delete_func(void* val) +static void input_hash_delete_func(void* val) { InputHash* h = (InputHash*) val; delete h; @@ -52,14 +66,14 @@ static void input_hash_delete_func(void* val) declare(PDict, InputHash); /** - * Base stuff that every stream can do + * Base stuff that every stream can do. */ class Manager::Stream { public: string name; string source; bool removed; - + int mode; StreamType stream_type; // to distinguish between event and table streams @@ -73,23 +87,24 @@ public: virtual ~Stream(); }; -Manager::Stream::Stream() +Manager::Stream::Stream() { - type = 0; - reader = 0; - description = 0; + type = 0; + reader = 0; + description = 0; removed = false; } -Manager::Stream::~Stream() +Manager::Stream::~Stream() { - if ( type ) + if ( type ) Unref(type); - if ( description ) + + if ( description ) Unref(description); - if ( reader ) - delete(reader); + if ( reader ) + delete(reader); } class Manager::TableStream: public Manager::Stream { @@ -98,7 +113,7 @@ public: unsigned int num_idx_fields; unsigned int num_val_fields; bool want_record; - EventHandlerPtr table_event; + EventHandlerPtr table_event; TableVal* tab; RecordType* rtype; @@ -107,9 +122,9 @@ public: PDict(InputHash)* currDict; PDict(InputHash)* lastDict; - Func* pred; + Func* pred; - EventHandlerPtr event; + EventHandlerPtr event; TableStream(); ~TableStream(); @@ -122,15 +137,15 @@ public: RecordType* fields; unsigned int num_fields; - bool want_record; + bool want_record; EventStream(); ~EventStream(); }; -Manager::TableStream::TableStream() : Manager::Stream::Stream() +Manager::TableStream::TableStream() : Manager::Stream::Stream() { - stream_type = TABLE_FILTER; - + stream_type = TABLE_STREAM; + tab = 0; itype = 0; rtype = 0; @@ -141,61 +156,47 @@ Manager::TableStream::TableStream() : Manager::Stream::Stream() pred = 0; } -Manager::EventStream::EventStream() : Manager::Stream::Stream() +Manager::EventStream::EventStream() : Manager::Stream::Stream() { fields = 0; - stream_type = EVENT_FILTER; + stream_type = EVENT_STREAM; } -Manager::EventStream::~EventStream() +Manager::EventStream::~EventStream() { - if ( fields ) + if ( fields ) Unref(fields); } -Manager::TableStream::~TableStream() +Manager::TableStream::~TableStream() { if ( tab ) Unref(tab); - if ( itype ) + + if ( itype ) Unref(itype); + if ( rtype ) // can be 0 for sets Unref(rtype); - if ( currDict != 0 ) + if ( currDict != 0 ) { currDict->Clear(); delete currDict; } - if ( lastDict != 0 ) + if ( lastDict != 0 ) { lastDict->Clear();; delete lastDict; } - } - -struct ReaderDefinition { - bro_int_t type; // the type - const char *name; // descriptive name for error messages - bool (*init)(); // optional one-time inifializing function - ReaderBackend* (*factory)(ReaderFrontend* frontend); // factory function for creating instances -}; - -ReaderDefinition input_readers[] = { - { BifEnum::Input::READER_ASCII, "Ascii", 0, reader::Ascii::Instantiate }, - { BifEnum::Input::READER_RAW, "Raw", 0, reader::Raw::Instantiate }, - { BifEnum::Input::READER_BENCHMARK, "Benchmark", 0, reader::Benchmark::Instantiate }, - - // End marker - { BifEnum::Input::READER_DEFAULT, "None", 0, (ReaderBackend* (*)(ReaderFrontend* frontend))0 } -}; + } Manager::Manager() { } -Manager::~Manager() +Manager::~Manager() { for ( map::iterator s = readers.begin(); s != readers.end(); ++s ) { @@ -205,47 +206,48 @@ Manager::~Manager() } -ReaderBackend* Manager::CreateBackend(ReaderFrontend* frontend, bro_int_t type) +ReaderBackend* Manager::CreateBackend(ReaderFrontend* frontend, bro_int_t type) { ReaderDefinition* ir = input_readers; - while ( true ) + while ( true ) { - if ( ir->type == BifEnum::Input::READER_DEFAULT ) + if ( ir->type == BifEnum::Input::READER_DEFAULT ) { reporter->Error("The reader that was requested was not found and could not be initialized."); return 0; } - if ( ir->type != type ) + if ( ir->type != type ) { // no, didn't find the right one... ++ir; continue; } - + // call init function of writer if presnt - if ( ir->init ) + if ( ir->init ) { - if ( (*ir->init)() ) + if ( (*ir->init)() ) { //clear it to be not called again ir->init = 0; - } + } + else { // ohok. init failed, kill factory for all eternity ir->factory = 0; DBG_LOG(DBG_LOGGING, "Failed to init input class %s", ir->name); return 0; } - + } - - if ( !ir->factory ) + + if ( ! ir->factory ) // no factory? return 0; - + // all done. break. break; } @@ -259,45 +261,43 @@ ReaderBackend* Manager::CreateBackend(ReaderFrontend* frontend, bro_int_t type) return backend; } -// create a new input reader object to be used at whomevers leisure lateron. -bool Manager::CreateStream(Stream* info, RecordVal* description) +// Create a new input reader object to be used at whomevers leisure lateron. +bool Manager::CreateStream(Stream* info, RecordVal* description) { ReaderDefinition* ir = input_readers; - + RecordType* rtype = description->Type()->AsRecordType(); - if ( ! ( same_type(rtype, BifType::Record::Input::TableDescription, 0) + if ( ! ( same_type(rtype, BifType::Record::Input::TableDescription, 0) || same_type(rtype, BifType::Record::Input::EventDescription, 0) ) ) { reporter->Error("Streamdescription argument not of right type for new input stream"); return false; } - + Val* name_val = description->LookupWithDefault(rtype->FieldOffset("name")); string name = name_val->AsString()->CheckString(); Unref(name_val); + Stream *i = FindStream(name); + if ( i != 0 ) { - Stream *i = FindStream(name); - if ( i != 0 ) - { - reporter->Error("Trying create already existing input stream %s", - name.c_str()); - return false; - } + reporter->Error("Trying create already existing input stream %s", + name.c_str()); + return false; } EnumVal* reader = description->LookupWithDefault(rtype->FieldOffset("reader"))->AsEnumVal(); - ReaderFrontend* reader_obj = new ReaderFrontend(reader->InternalInt()); - assert(reader_obj); - - // get the source... + ReaderFrontend* reader_obj = new ReaderFrontend(reader->InternalInt()); + assert(reader_obj); + + // get the source ... Val* sourceval = description->LookupWithDefault(rtype->FieldOffset("source")); assert ( sourceval != 0 ); const BroString* bsource = sourceval->AsString(); string source((const char*) bsource->Bytes(), bsource->Len()); Unref(sourceval); - + EnumVal* mode = description->LookupWithDefault(rtype->FieldOffset("mode"))->AsEnumVal(); info->mode = mode->InternalInt(); Unref(mode); @@ -311,25 +311,23 @@ bool Manager::CreateStream(Stream* info, RecordVal* description) DBG_LOG(DBG_INPUT, "Successfully created new input stream %s", name.c_str()); - + return true; - } -bool Manager::CreateEventStream(RecordVal* fval) +bool Manager::CreateEventStream(RecordVal* fval) { - RecordType* rtype = fval->Type()->AsRecordType(); if ( ! same_type(rtype, BifType::Record::Input::EventDescription, 0) ) { reporter->Error("EventDescription argument not of right type"); return false; } - + EventStream* stream = new EventStream(); { bool res = CreateStream(stream, fval); - if ( res == false ) + if ( res == false ) { delete stream; return false; @@ -337,94 +335,93 @@ bool Manager::CreateEventStream(RecordVal* fval) } - RecordType *fields = fval->LookupWithDefault(rtype->FieldOffset("fields"))->AsType()->AsTypeType()->Type()->AsRecordType(); - - Val *want_record = fval->LookupWithDefault(rtype->FieldOffset("want_record")); + RecordType *fields = fval->LookupWithDefault(rtype->FieldOffset("fields"))->AsType()->AsTypeType()->Type()->AsRecordType(); + + Val *want_record = fval->LookupWithDefault(rtype->FieldOffset("want_record")); Val* event_val = fval->LookupWithDefault(rtype->FieldOffset("ev")); Func* event = event_val->AsFunc(); Unref(event_val); + FuncType* etype = event->FType()->AsFuncType(); + + if ( ! etype->IsEvent() ) { - FuncType* etype = event->FType()->AsFuncType(); - - if ( ! etype->IsEvent() ) + reporter->Error("stream event is a function, not an event"); + return false; + } + + const type_list* args = etype->ArgTypes()->Types(); + + if ( args->length() < 2 ) + { + reporter->Error("event takes not enough arguments"); + return false; + } + + if ( ! same_type((*args)[1], BifType::Enum::Input::Event, 0) ) + { + reporter->Error("events second attribute must be of type Input::Event"); + return false; + } + + if ( ! same_type((*args)[0], BifType::Record::Input::EventDescription, 0) ) + { + reporter->Error("events first attribute must be of type Input::EventDescription"); + return false; + } + + if ( want_record->InternalInt() == 0 ) + { + if ( args->length() != fields->NumFields() + 2 ) { - reporter->Error("stream event is a function, not an event"); + reporter->Error("event has wrong number of arguments"); return false; } - const type_list* args = etype->ArgTypes()->Types(); - - if ( args->length() < 2 ) + for ( int i = 0; i < fields->NumFields(); i++ ) { - reporter->Error("event takes not enough arguments"); - return false; - } - - if ( ! same_type((*args)[1], BifType::Enum::Input::Event, 0) ) - { - reporter->Error("events second attribute must be of type Input::Event"); - return false; - } - - if ( ! same_type((*args)[0], BifType::Record::Input::EventDescription, 0) ) - { - reporter->Error("events first attribute must be of type Input::EventDescription"); - return false; - } - - if ( want_record->InternalInt() == 0 ) - { - if ( args->length() != fields->NumFields() + 2 ) - { - reporter->Error("event has wrong number of arguments"); - return false; - } - - for ( int i = 0; i < fields->NumFields(); i++ ) - { - if ( !same_type((*args)[i+2], fields->FieldType(i) ) ) - { - reporter->Error("Incompatible type for event"); - return false; - } - } - - } - else if ( want_record->InternalInt() == 1 ) - { - if ( args->length() != 3 ) - { - reporter->Error("event has wrong number of arguments"); - return false; - } - - if ( !same_type((*args)[2], fields ) ) + if ( !same_type((*args)[i+2], fields->FieldType(i) ) ) { reporter->Error("Incompatible type for event"); return false; } - - } - else - assert(false); + } - } + } + + else if ( want_record->InternalInt() == 1 ) + { + if ( args->length() != 3 ) + { + reporter->Error("event has wrong number of arguments"); + return false; + } + + if ( !same_type((*args)[2], fields ) ) + { + reporter->Error("Incompatible type for event"); + return false; + } + + } + + else + assert(false); vector fieldsV; // vector, because UnrollRecordType needs it bool status = !UnrollRecordType(&fieldsV, fields, ""); - if ( status ) + if ( status ) { reporter->Error("Problem unrolling"); return false; } - + Field** logf = new Field*[fieldsV.size()]; - for ( unsigned int i = 0; i < fieldsV.size(); i++ ) + for ( unsigned int i = 0; i < fieldsV.size(); i++ ) logf[i] = fieldsV[i]; Unref(fields); // ref'd by lookupwithdefault @@ -445,7 +442,7 @@ bool Manager::CreateEventStream(RecordVal* fval) return true; } -bool Manager::CreateTableStream(RecordVal* fval) +bool Manager::CreateTableStream(RecordVal* fval) { RecordType* rtype = fval->Type()->AsRecordType(); if ( ! same_type(rtype, BifType::Record::Input::TableDescription, 0) ) @@ -457,7 +454,7 @@ bool Manager::CreateTableStream(RecordVal* fval) TableStream* stream = new TableStream(); { bool res = CreateStream(stream, fval); - if ( res == false ) + if ( res == false ) { delete stream; return false; @@ -468,8 +465,8 @@ bool Manager::CreateTableStream(RecordVal* fval) RecordType *idx = fval->LookupWithDefault(rtype->FieldOffset("idx"))->AsType()->AsTypeType()->Type()->AsRecordType(); RecordType *val = 0; - - if ( fval->LookupWithDefault(rtype->FieldOffset("val")) != 0 ) + + if ( fval->LookupWithDefault(rtype->FieldOffset("val")) != 0 ) { val = fval->LookupWithDefault(rtype->FieldOffset("val"))->AsType()->AsTypeType()->Type()->AsRecordType(); Unref(val); // The lookupwithdefault in the if-clause ref'ed val. @@ -478,30 +475,28 @@ bool Manager::CreateTableStream(RecordVal* fval) TableVal *dst = fval->LookupWithDefault(rtype->FieldOffset("destination"))->AsTableVal(); // check if index fields match table description + int num = idx->NumFields(); + const type_list* tl = dst->Type()->AsTableType()->IndexTypes(); + + loop_over_list(*tl, j) { - int num = idx->NumFields(); - const type_list* tl = dst->Type()->AsTableType()->IndexTypes(); - - loop_over_list(*tl, j) + if ( j >= num ) { - if ( j >= num ) - { - reporter->Error("Table type has more indexes than index definition"); - return false; - } - - if ( !same_type(idx->FieldType(j), (*tl)[j]) ) - { - reporter->Error("Table type does not match index type"); - return false; - } - } - - if ( num != j ) - { - reporter->Error("Table has less elements than index definition"); + reporter->Error("Table type has more indexes than index definition"); return false; } + + if ( ! same_type(idx->FieldType(j), (*tl)[j]) ) + { + reporter->Error("Table type does not match index type"); + return false; + } + } + + if ( num != j ) + { + reporter->Error("Table has less elements than index definition"); + return false; } Val *want_record = fval->LookupWithDefault(rtype->FieldOffset("want_record")); @@ -509,12 +504,12 @@ bool Manager::CreateTableStream(RecordVal* fval) Val* event_val = fval->LookupWithDefault(rtype->FieldOffset("ev")); Func* event = event_val ? event_val->AsFunc() : 0; Unref(event_val); - - if ( event ) + + if ( event ) { FuncType* etype = event->FType()->AsFuncType(); - - if ( ! etype->IsEvent() ) + + if ( ! etype->IsEvent() ) { reporter->Error("stream event is a function, not an event"); return false; @@ -522,37 +517,37 @@ bool Manager::CreateTableStream(RecordVal* fval) const type_list* args = etype->ArgTypes()->Types(); - if ( args->length() != 4 ) + if ( args->length() != 4 ) { reporter->Error("Table event must take 4 arguments"); return false; } - if ( ! same_type((*args)[0], BifType::Record::Input::TableDescription, 0) ) + if ( ! same_type((*args)[0], BifType::Record::Input::TableDescription, 0) ) { reporter->Error("table events first attribute must be of type Input::TableDescription"); return false; - } + } - if ( ! same_type((*args)[1], BifType::Enum::Input::Event, 0) ) + if ( ! same_type((*args)[1], BifType::Enum::Input::Event, 0) ) { reporter->Error("table events second attribute must be of type Input::Event"); return false; - } + } - if ( ! same_type((*args)[2], idx) ) + if ( ! same_type((*args)[2], idx) ) { reporter->Error("table events index attributes do not match"); return false; - } - - if ( want_record->InternalInt() == 1 && ! same_type((*args)[3], val) ) + } + + if ( want_record->InternalInt() == 1 && ! same_type((*args)[3], val) ) { reporter->Error("table events value attributes do not match"); return false; - } - else if ( want_record->InternalInt() == 0 - && !same_type((*args)[3], val->FieldType(0) ) ) + } + else if ( want_record->InternalInt() == 0 + && !same_type((*args)[3], val->FieldType(0) ) ) { reporter->Error("table events value attribute does not match"); return false; @@ -560,33 +555,32 @@ bool Manager::CreateTableStream(RecordVal* fval) assert(want_record->InternalInt() == 1 || want_record->InternalInt() == 0); - } + } vector fieldsV; // vector, because we don't know the length beforehands bool status = !UnrollRecordType(&fieldsV, idx, ""); int idxfields = fieldsV.size(); - + if ( val ) // if we are not a set status = status || !UnrollRecordType(&fieldsV, val, ""); int valfields = fieldsV.size() - idxfields; - if ( !val ) + if ( ! val ) assert(valfields == 0); - if ( status ) + if ( status ) { reporter->Error("Problem unrolling"); return false; } - - + Field** fields = new Field*[fieldsV.size()]; - for ( unsigned int i = 0; i < fieldsV.size(); i++ ) + for ( unsigned int i = 0; i < fieldsV.size(); i++ ) fields[i] = fieldsV[i]; - + stream->pred = pred ? pred->AsFunc() : 0; stream->num_idx_fields = idxfields; stream->num_val_fields = valfields; @@ -603,9 +597,9 @@ bool Manager::CreateTableStream(RecordVal* fval) Unref(want_record); // ref'd by lookupwithdefault Unref(pred); - if ( valfields > 1 ) + if ( valfields > 1 ) { - if ( ! stream->want_record ) + if ( ! stream->want_record ) { reporter->Error("Stream %s does not want a record (want_record=F), but has more then one value field. Aborting", stream->name.c_str()); delete stream; @@ -664,7 +658,7 @@ bool Manager::IsCompatibleType(BroType* t, bool atomic_only) { if ( atomic_only ) return false; - + return IsCompatibleType(t->AsVectorType()->YieldType(), true); } @@ -676,14 +670,14 @@ bool Manager::IsCompatibleType(BroType* t, bool atomic_only) } -bool Manager::RemoveStream(const string &name) +bool Manager::RemoveStream(const string &name) { Stream *i = FindStream(name); - if ( i == 0 ) + if ( i == 0 ) return false; // not found - if ( i->removed ) + if ( i->removed ) { reporter->Error("Stream %s is already queued for removal. Ignoring remove.", name.c_str()); return false; @@ -701,11 +695,11 @@ bool Manager::RemoveStream(const string &name) return true; } -bool Manager::RemoveStreamContinuation(ReaderFrontend* reader) +bool Manager::RemoveStreamContinuation(ReaderFrontend* reader) { Stream *i = FindStream(reader); - if ( i == 0 ) + if ( i == 0 ) { reporter->Error("Stream not found in RemoveStreamContinuation"); return false; @@ -718,49 +712,51 @@ bool Manager::RemoveStreamContinuation(ReaderFrontend* reader) readers.erase(reader); delete(i); + return true; } -bool Manager::UnrollRecordType(vector *fields, - const RecordType *rec, const string& nameprepend) +bool Manager::UnrollRecordType(vector *fields, + const RecordType *rec, const string& nameprepend) { - for ( int i = 0; i < rec->NumFields(); i++ ) + for ( int i = 0; i < rec->NumFields(); i++ ) { - if ( !IsCompatibleType(rec->FieldType(i)) ) + if ( ! IsCompatibleType(rec->FieldType(i)) ) { reporter->Error("Incompatible type \"%s\" in table definition for ReaderFrontend", type_name(rec->FieldType(i)->Tag())); return false; } - if ( rec->FieldType(i)->Tag() == TYPE_RECORD ) + if ( rec->FieldType(i)->Tag() == TYPE_RECORD ) { string prep = nameprepend + rec->FieldName(i) + "."; - - if ( !UnrollRecordType(fields, rec->FieldType(i)->AsRecordType(), prep) ) + + if ( !UnrollRecordType(fields, rec->FieldType(i)->AsRecordType(), prep) ) { return false; } - } - else + } + + else { Field* field = new Field(); field->name = nameprepend + rec->FieldName(i); - field->type = rec->FieldType(i)->Tag(); - if ( field->type == TYPE_TABLE ) - { + field->type = rec->FieldType(i)->Tag(); + + if ( field->type == TYPE_TABLE ) field->subtype = rec->FieldType(i)->AsSetType()->Indices()->PureType()->Tag(); - } - else if ( field->type == TYPE_VECTOR ) - { + + else if ( field->type == TYPE_VECTOR ) field->subtype = rec->FieldType(i)->AsVectorType()->YieldType()->Tag(); - } else if ( field->type == TYPE_PORT && - rec->FieldDecl(i)->FindAttr(ATTR_TYPE_COLUMN) ) + + else if ( field->type == TYPE_PORT && + rec->FieldDecl(i)->FindAttr(ATTR_TYPE_COLUMN) ) { // we have an annotation for the second column - + Val* c = rec->FieldDecl(i)->FindAttr(ATTR_TYPE_COLUMN)->AttrExpr()->Eval(0); assert(c); @@ -769,7 +765,7 @@ bool Manager::UnrollRecordType(vector *fields, field->secondary_name = c->AsStringVal()->AsString()->CheckString(); } - if ( rec->FieldDecl(i)->FindAttr(ATTR_OPTIONAL ) ) + if ( rec->FieldDecl(i)->FindAttr(ATTR_OPTIONAL ) ) field->optional = true; fields->push_back(field); @@ -782,30 +778,29 @@ bool Manager::UnrollRecordType(vector *fields, bool Manager::ForceUpdate(const string &name) { Stream *i = FindStream(name); - if ( i == 0 ) + if ( i == 0 ) { reporter->Error("Stream %s not found", name.c_str()); return false; } - - if ( i->removed ) + + if ( i->removed ) { reporter->Error("Stream %s is already queued for removal. Ignoring force update.", name.c_str()); return false; } - + i->reader->Update(); #ifdef DEBUG - DBG_LOG(DBG_INPUT, "Forcing update of stream %s", - name.c_str()); + DBG_LOG(DBG_INPUT, "Forcing update of stream %s", name.c_str()); #endif return true; // update is async :( } -Val* Manager::RecordValToIndexVal(RecordVal *r) +Val* Manager::RecordValToIndexVal(RecordVal *r) { Val* idxval; @@ -813,16 +808,15 @@ Val* Manager::RecordValToIndexVal(RecordVal *r) int num_fields = type->NumFields(); - if ( num_fields == 1 && type->FieldDecl(0)->type->Tag() != TYPE_RECORD ) - { + if ( num_fields == 1 && type->FieldDecl(0)->type->Tag() != TYPE_RECORD ) idxval = r->LookupWithDefault(0); - } - else + + else { ListVal *l = new ListVal(TYPE_ANY); - for ( int j = 0 ; j < num_fields; j++ ) + for ( int j = 0 ; j < num_fields; j++ ) l->Append(r->LookupWithDefault(j)); - + idxval = l; } @@ -831,23 +825,27 @@ Val* Manager::RecordValToIndexVal(RecordVal *r) } -Val* Manager::ValueToIndexVal(int num_fields, const RecordType *type, const Value* const *vals) +Val* Manager::ValueToIndexVal(int num_fields, const RecordType *type, const Value* const *vals) { Val* idxval; int position = 0; - if ( num_fields == 1 && type->FieldType(0)->Tag() != TYPE_RECORD ) { + if ( num_fields == 1 && type->FieldType(0)->Tag() != TYPE_RECORD ) + { idxval = ValueToVal(vals[0], type->FieldType(0)); position = 1; - } else { + } + + else + { ListVal *l = new ListVal(TYPE_ANY); - for ( int j = 0 ; j < type->NumFields(); j++ ) + for ( int j = 0 ; j < type->NumFields(); j++ ) { - if ( type->FieldType(j)->Tag() == TYPE_RECORD ) - l->Append(ValueToRecordVal(vals, + if ( type->FieldType(j)->Tag() == TYPE_RECORD ) + l->Append(ValueToRecordVal(vals, type->FieldType(j)->AsRecordType(), &position)); - else + else { l->Append(ValueToVal(vals[position], type->FieldType(j))); position++; @@ -862,66 +860,70 @@ Val* Manager::ValueToIndexVal(int num_fields, const RecordType *type, const Valu } -void Manager::SendEntry(ReaderFrontend* reader, Value* *vals) +void Manager::SendEntry(ReaderFrontend* reader, Value* *vals) { Stream *i = FindStream(reader); - if ( i == 0 ) + if ( i == 0 ) { reporter->InternalError("Unknown reader in SendEntry"); return; } - int readFields; - if ( i->stream_type == TABLE_FILTER ) + int readFields = 0; + + if ( i->stream_type == TABLE_STREAM ) readFields = SendEntryTable(i, vals); - else if ( i->stream_type == EVENT_FILTER ) + + else if ( i->stream_type == EVENT_STREAM ) { EnumVal *type = new EnumVal(BifEnum::Input::EVENT_NEW, BifType::Enum::Input::Event); - readFields = SendEventStreamEvent(i, type, vals); - } - else + readFields = SendEventStreamEvent(i, type, vals); + } + + else assert(false); - for ( int i = 0; i < readFields; i++ ) + for ( int i = 0; i < readFields; i++ ) delete vals[i]; - delete [] vals; + delete [] vals; } -int Manager::SendEntryTable(Stream* i, const Value* const *vals) +int Manager::SendEntryTable(Stream* i, const Value* const *vals) { bool updated = false; assert(i); - assert(i->stream_type == TABLE_FILTER); + assert(i->stream_type == TABLE_STREAM); TableStream* stream = (TableStream*) i; HashKey* idxhash = HashValues(stream->num_idx_fields, vals); - - if ( idxhash == 0 ) + + if ( idxhash == 0 ) { reporter->Error("Could not hash line. Ignoring"); return stream->num_val_fields + stream->num_idx_fields; - } - + } + hash_t valhash = 0; - if ( stream->num_val_fields > 0 ) + if ( stream->num_val_fields > 0 ) { HashKey* valhashkey = HashValues(stream->num_val_fields, vals+stream->num_idx_fields); - if ( valhashkey == 0 ) { + if ( valhashkey == 0 ) + { // empty line. index, but no values. // hence we also have no hash value... } - else + else { - valhash = valhashkey->Hash(); - delete(valhashkey); + valhash = valhashkey->Hash(); + delete(valhashkey); } } - InputHash *h = stream->lastDict->Lookup(idxhash); - if ( h != 0 ) + InputHash *h = stream->lastDict->Lookup(idxhash); + if ( h != 0 ) { // seen before if ( stream->num_val_fields == 0 || h->valhash == valhash ) @@ -932,41 +934,41 @@ int Manager::SendEntryTable(Stream* i, const Value* const *vals) delete idxhash; return stream->num_val_fields + stream->num_idx_fields; } - else + + else { assert( stream->num_val_fields > 0 ); // entry was updated in some way stream->lastDict->Remove(idxhash); // keep h for predicates updated = true; - + } } - Val* valval; RecordVal* predidx = 0; - + int position = stream->num_idx_fields; - if ( stream->num_val_fields == 0 ) + + if ( stream->num_val_fields == 0 ) valval = 0; - else if ( stream->num_val_fields == 1 && !stream->want_record ) + + else if ( stream->num_val_fields == 1 && !stream->want_record ) valval = ValueToVal(vals[position], stream->rtype->FieldType(0)); - else + + else valval = ValueToRecordVal(vals, stream->rtype, &position); // call stream first to determine if we really add / change the entry - if ( stream->pred ) + if ( stream->pred ) { EnumVal* ev; - //Ref(idxval); - int startpos = 0; - //Val* predidx = ListValToRecordVal(idxval->AsListVal(), stream->itype, &startpos); + int startpos = 0; predidx = ValueToRecordVal(vals, stream->itype, &startpos); - //ValueToRecordVal(vals, stream->itype, &startpos); - if ( updated ) + if ( updated ) ev = new EnumVal(BifEnum::Input::EVENT_CHANGED, BifType::Enum::Input::Event); else ev = new EnumVal(BifEnum::Input::EVENT_NEW, BifType::Enum::Input::Event); @@ -976,12 +978,13 @@ int Manager::SendEntryTable(Stream* i, const Value* const *vals) result = CallPred(stream->pred, 3, ev, predidx->Ref(), valval->Ref()); else // no values result = CallPred(stream->pred, 2, ev, predidx->Ref()); - - if ( result == false ) + + if ( result == false ) { Unref(predidx); Unref(valval); - if ( !updated ) + + if ( ! updated ) { // throw away. Hence - we quit. And remove the entry from the current dictionary... // (but why should it be in there? assert this). @@ -989,8 +992,9 @@ int Manager::SendEntryTable(Stream* i, const Value* const *vals) delete idxhash; delete h; return stream->num_val_fields + stream->num_idx_fields; - } - else + } + + else { // keep old one stream->currDict->Insert(idxhash, h); @@ -998,42 +1002,37 @@ int Manager::SendEntryTable(Stream* i, const Value* const *vals) return stream->num_val_fields + stream->num_idx_fields; } } - - } + } // now we don't need h anymore - if we are here, the entry is updated and a new h is created. - if ( h ) + if ( h ) { delete h; h = 0; } - + Val* idxval; - if ( predidx != 0 ) + if ( predidx != 0 ) { idxval = RecordValToIndexVal(predidx); // I think there is an unref missing here. But if I insert is, it crashes :) - } - else + } + else idxval = ValueToIndexVal(stream->num_idx_fields, stream->itype, vals); - + Val* oldval = 0; - if ( updated == true ) + if ( updated == true ) { assert(stream->num_val_fields > 0); // in that case, we need the old value to send the event (if we send an event). oldval = stream->tab->Lookup(idxval, false); } - //i->tab->Assign(idxval, valval); assert(idxval); HashKey* k = stream->tab->ComputeHash(idxval); - if ( !k ) - { + if ( ! k ) reporter->InternalError("could not hash"); - assert(false); - } InputHash* ih = new InputHash(); ih->idxkey = new HashKey(k->Key(), k->Size(), k->Hash()); @@ -1044,63 +1043,62 @@ int Manager::SendEntryTable(Stream* i, const Value* const *vals) stream->tab->Assign(idxval, k, valval); Unref(idxval); // asssign does not consume idxval. + if ( predidx != 0 ) Unref(predidx); stream->currDict->Insert(idxhash, ih); delete idxhash; - if ( stream->event ) + if ( stream->event ) { EnumVal* ev; int startpos = 0; Val* predidx = ValueToRecordVal(vals, stream->itype, &startpos); - if ( updated ) + if ( updated ) { // in case of update send back the old value. assert ( stream->num_val_fields > 0 ); ev = new EnumVal(BifEnum::Input::EVENT_CHANGED, BifType::Enum::Input::Event); assert ( oldval != 0 ); SendEvent(stream->event, 4, stream->description->Ref(), ev, predidx, oldval); - } - else + } + + else { ev = new EnumVal(BifEnum::Input::EVENT_NEW, BifType::Enum::Input::Event); - if ( stream->num_val_fields == 0 ) + if ( stream->num_val_fields == 0 ) { Ref(stream->description); SendEvent(stream->event, 3, stream->description->Ref(), ev, predidx); - } - else + } + else SendEvent(stream->event, 4, stream->description->Ref(), ev, predidx, valval->Ref()); - + } - } + } - - return stream->num_val_fields + stream->num_idx_fields; + return stream->num_val_fields + stream->num_idx_fields; } - -void Manager::EndCurrentSend(ReaderFrontend* reader) +void Manager::EndCurrentSend(ReaderFrontend* reader) { Stream *i = FindStream(reader); - if ( i == 0 ) + if ( i == 0 ) { reporter->InternalError("Unknown reader in EndCurrentSend"); return; } #ifdef DEBUG - DBG_LOG(DBG_INPUT, "Got EndCurrentSend stream %s", - i->name.c_str()); + DBG_LOG(DBG_INPUT, "Got EndCurrentSend stream %s", i->name.c_str()); #endif - if ( i->stream_type == EVENT_FILTER ) // nothing to do.. + if ( i->stream_type == EVENT_STREAM ) // nothing to do.. return; - assert(i->stream_type == TABLE_FILTER); + assert(i->stream_type == TABLE_STREAM); TableStream* stream = (TableStream*) i; // lastdict contains all deleted entries and should be empty apart from that @@ -1108,17 +1106,17 @@ void Manager::EndCurrentSend(ReaderFrontend* reader) stream->lastDict->MakeRobustCookie(c); InputHash* ih; HashKey *lastDictIdxKey; - //while ( ( ih = i->lastDict->NextEntry(c) ) ) { - while ( ( ih = stream->lastDict->NextEntry(lastDictIdxKey, c) ) ) + + while ( ( ih = stream->lastDict->NextEntry(lastDictIdxKey, c) ) ) { ListVal * idx = 0; Val *val = 0; - + Val* predidx = 0; EnumVal* ev = 0; int startpos = 0; - if ( stream->pred || stream->event ) + if ( stream->pred || stream->event ) { idx = stream->tab->RecoverIndex(ih->idxkey); assert(idx != 0); @@ -1128,7 +1126,7 @@ void Manager::EndCurrentSend(ReaderFrontend* reader) ev = new EnumVal(BifEnum::Input::EVENT_REMOVED, BifType::Enum::Input::Event); } - if ( stream->pred ) + if ( stream->pred ) { // ask predicate, if we want to expire this element... @@ -1138,7 +1136,7 @@ void Manager::EndCurrentSend(ReaderFrontend* reader) bool result = CallPred(stream->pred, 3, ev, predidx, val); - if ( result == false ) + if ( result == false ) { // Keep it. Hence - we quit and simply go to the next entry of lastDict // ah well - and we have to add the entry to currDict... @@ -1147,10 +1145,10 @@ void Manager::EndCurrentSend(ReaderFrontend* reader) stream->currDict->Insert(lastDictIdxKey, stream->lastDict->RemoveEntry(lastDictIdxKey)); delete lastDictIdxKey; continue; - } - } + } + } - if ( stream->event ) + if ( stream->event ) { Ref(predidx); Ref(val); @@ -1160,7 +1158,8 @@ void Manager::EndCurrentSend(ReaderFrontend* reader) if ( predidx ) // if we have a stream or an event... Unref(predidx); - if ( ev ) + + if ( ev ) Unref(ev); Unref(stream->tab->Delete(ih->idxkey)); @@ -1172,54 +1171,57 @@ void Manager::EndCurrentSend(ReaderFrontend* reader) stream->lastDict->Clear(); // should be empt. buti- well... who knows... delete(stream->lastDict); - stream->lastDict = stream->currDict; + stream->lastDict = stream->currDict; stream->currDict = new PDict(InputHash); stream->currDict->SetDeleteFunc(input_hash_delete_func); #ifdef DEBUG - DBG_LOG(DBG_INPUT, "EndCurrentSend complete for stream %s, queueing update_finished event", - i->name.c_str()); + DBG_LOG(DBG_INPUT, "EndCurrentSend complete for stream %s, queueing update_finished event", + i->name.c_str()); #endif // Send event that the current update is indeed finished. EventHandler* handler = event_registry->Lookup("Input::update_finished"); - if ( handler == 0 ) + if ( handler == 0 ) reporter->InternalError("Input::update_finished not found!"); SendEvent(handler, 2, new StringVal(i->name.c_str()), new StringVal(i->source.c_str())); } -void Manager::Put(ReaderFrontend* reader, Value* *vals) +void Manager::Put(ReaderFrontend* reader, Value* *vals) { Stream *i = FindStream(reader); - if ( i == 0 ) + if ( i == 0 ) { reporter->InternalError("Unknown reader in Put"); return; } - int readFields; - if ( i->stream_type == TABLE_FILTER ) + int readFields = 0; + + if ( i->stream_type == TABLE_STREAM ) readFields = PutTable(i, vals); - else if ( i->stream_type == EVENT_FILTER ) + + else if ( i->stream_type == EVENT_STREAM ) { EnumVal *type = new EnumVal(BifEnum::Input::EVENT_NEW, BifType::Enum::Input::Event); readFields = SendEventStreamEvent(i, type, vals); - } - else + } + + else assert(false); - - for ( int i = 0; i < readFields; i++ ) + + for ( int i = 0; i < readFields; i++ ) delete vals[i]; - delete [] vals; + delete [] vals; } -int Manager::SendEventStreamEvent(Stream* i, EnumVal* type, const Value* const *vals) +int Manager::SendEventStreamEvent(Stream* i, EnumVal* type, const Value* const *vals) { assert(i); - assert(i->stream_type == EVENT_FILTER); + assert(i->stream_type == EVENT_STREAM); EventStream* stream = (EventStream*) i; Val *val; @@ -1227,71 +1229,77 @@ int Manager::SendEventStreamEvent(Stream* i, EnumVal* type, const Value* const * Ref(stream->description); out_vals.push_back(stream->description); // no tracking, send everything with a new event... - //out_vals.push_back(new EnumVal(BifEnum::Input::EVENT_NEW, BifType::Enum::Input::Event)); out_vals.push_back(type); int position = 0; - if ( stream->want_record ) + + if ( stream->want_record ) { RecordVal * r = ValueToRecordVal(vals, stream->fields, &position); out_vals.push_back(r); } - else - { - for ( int j = 0; j < stream->fields->NumFields(); j++) + + else + { + for ( int j = 0; j < stream->fields->NumFields(); j++) { Val* val = 0; - if ( stream->fields->FieldType(j)->Tag() == TYPE_RECORD ) - val = ValueToRecordVal(vals, - stream->fields->FieldType(j)->AsRecordType(), + + if ( stream->fields->FieldType(j)->Tag() == TYPE_RECORD ) + val = ValueToRecordVal(vals, + stream->fields->FieldType(j)->AsRecordType(), &position); - else + + else { val = ValueToVal(vals[position], stream->fields->FieldType(j)); position++; } - out_vals.push_back(val); + + out_vals.push_back(val); } } SendEvent(stream->event, out_vals); return stream->fields->NumFields(); - } -int Manager::PutTable(Stream* i, const Value* const *vals) +int Manager::PutTable(Stream* i, const Value* const *vals) { assert(i); - assert(i->stream_type == TABLE_FILTER); - TableStream* stream = (TableStream*) i; + assert(i->stream_type == TABLE_STREAM); + TableStream* stream = (TableStream*) i; Val* idxval = ValueToIndexVal(stream->num_idx_fields, stream->itype, vals); Val* valval; int position = stream->num_idx_fields; - if ( stream->num_val_fields == 0 ) + + if ( stream->num_val_fields == 0 ) valval = 0; - else if ( stream->num_val_fields == 1 && stream->want_record == 0 ) + + else if ( stream->num_val_fields == 1 && stream->want_record == 0 ) valval = ValueToVal(vals[position], stream->rtype->FieldType(0)); - else + + else valval = ValueToRecordVal(vals, stream->rtype, &position); // if we have a subscribed event, we need to figure out, if this is an update or not // same for predicates - if ( stream->pred || stream->event ) + if ( stream->pred || stream->event ) { bool updated = false; Val* oldval = 0; - - if ( stream->num_val_fields > 0 ) + + if ( stream->num_val_fields > 0 ) { // in that case, we need the old value to send the event (if we send an event). oldval = stream->tab->Lookup(idxval, false); } - if ( oldval != 0 ) + if ( oldval != 0 ) { // it is an update updated = true; @@ -1300,27 +1308,27 @@ int Manager::PutTable(Stream* i, const Value* const *vals) // predicate if we want the update or not - if ( stream->pred ) + if ( stream->pred ) { EnumVal* ev; int startpos = 0; Val* predidx = ValueToRecordVal(vals, stream->itype, &startpos); Ref(valval); - if ( updated ) - ev = new EnumVal(BifEnum::Input::EVENT_CHANGED, + if ( updated ) + ev = new EnumVal(BifEnum::Input::EVENT_CHANGED, BifType::Enum::Input::Event); - else - ev = new EnumVal(BifEnum::Input::EVENT_NEW, + else + ev = new EnumVal(BifEnum::Input::EVENT_NEW, BifType::Enum::Input::Event); - + bool result; if ( stream->num_val_fields > 0 ) // we have values result = CallPred(stream->pred, 3, ev, predidx, valval); else // no values result = CallPred(stream->pred, 2, ev, predidx); - - if ( result == false ) + + if ( result == false ) { // do nothing Unref(idxval); @@ -1331,51 +1339,51 @@ int Manager::PutTable(Stream* i, const Value* const *vals) } - stream->tab->Assign(idxval, valval); + stream->tab->Assign(idxval, valval); - if ( stream->event ) - { + if ( stream->event ) + { EnumVal* ev; int startpos = 0; Val* predidx = ValueToRecordVal(vals, stream->itype, &startpos); - if ( updated ) - { + if ( updated ) + { // in case of update send back the old value. assert ( stream->num_val_fields > 0 ); - ev = new EnumVal(BifEnum::Input::EVENT_CHANGED, + ev = new EnumVal(BifEnum::Input::EVENT_CHANGED, BifType::Enum::Input::Event); assert ( oldval != 0 ); SendEvent(stream->event, 4, stream->description->Ref(), ev, predidx, oldval); - } - else + } + else { ev = new EnumVal(BifEnum::Input::EVENT_NEW, BifType::Enum::Input::Event); if ( stream->num_val_fields == 0 ) - SendEvent(stream->event, 4, stream->description->Ref(), + SendEvent(stream->event, 4, stream->description->Ref(), ev, predidx); else - SendEvent(stream->event, 4, stream->description->Ref(), + SendEvent(stream->event, 4, stream->description->Ref(), ev, predidx, valval->Ref()); } - + } - } + } + else // no predicates or other stuff stream->tab->Assign(idxval, valval); - return stream->num_idx_fields + stream->num_val_fields; } // Todo:: perhaps throw some kind of clear-event? -void Manager::Clear(ReaderFrontend* reader) +void Manager::Clear(ReaderFrontend* reader) { Stream *i = FindStream(reader); - if ( i == 0 ) + if ( i == 0 ) { reporter->InternalError("Unknown reader in Clear"); return; @@ -1386,17 +1394,17 @@ void Manager::Clear(ReaderFrontend* reader) i->name.c_str()); #endif - assert(i->stream_type == TABLE_FILTER); - TableStream* stream = (TableStream*) i; + assert(i->stream_type == TABLE_STREAM); + TableStream* stream = (TableStream*) i; stream->tab->RemoveAll(); } // put interface: delete old entry from table. -bool Manager::Delete(ReaderFrontend* reader, Value* *vals) +bool Manager::Delete(ReaderFrontend* reader, Value* *vals) { Stream *i = FindStream(reader); - if ( i == 0 ) + if ( i == 0 ) { reporter->InternalError("Unknown reader in Delete"); return false; @@ -1405,19 +1413,19 @@ bool Manager::Delete(ReaderFrontend* reader, Value* *vals) bool success = false; int readVals = 0; - if ( i->stream_type == TABLE_FILTER ) + if ( i->stream_type == TABLE_STREAM ) { - TableStream* stream = (TableStream*) i; + TableStream* stream = (TableStream*) i; Val* idxval = ValueToIndexVal(stream->num_idx_fields, stream->itype, vals); assert(idxval != 0); readVals = stream->num_idx_fields + stream->num_val_fields; bool streamresult = true; - if ( stream->pred || stream->event ) + if ( stream->pred || stream->event ) { Val *val = stream->tab->Lookup(idxval); - if ( stream->pred ) + if ( stream->pred ) { Ref(val); EnumVal *ev = new EnumVal(BifEnum::Input::EVENT_REMOVED, BifType::Enum::Input::Event); @@ -1426,7 +1434,7 @@ bool Manager::Delete(ReaderFrontend* reader, Value* *vals) streamresult = CallPred(stream->pred, 3, ev, predidx, val); - if ( streamresult == false ) + if ( streamresult == false ) { // keep it. Unref(idxval); @@ -1436,56 +1444,58 @@ bool Manager::Delete(ReaderFrontend* reader, Value* *vals) } // only if stream = true -> no streaming - if ( streamresult && stream->event ) + if ( streamresult && stream->event ) { Ref(idxval); assert(val != 0); - Ref(val); + Ref(val); EnumVal *ev = new EnumVal(BifEnum::Input::EVENT_REMOVED, BifType::Enum::Input::Event); SendEvent(stream->event, 4, stream->description->Ref(), ev, idxval, val); } } // only if stream = true -> no streaming - if ( streamresult ) + if ( streamresult ) { Val* retptr = stream->tab->Delete(idxval); success = ( retptr != 0 ); - if ( !success ) + if ( ! success ) reporter->Error("Internal error while deleting values from input table"); else Unref(retptr); } - - } - else if ( i->stream_type == EVENT_FILTER ) + + } + + else if ( i->stream_type == EVENT_STREAM ) { EnumVal *type = new EnumVal(BifEnum::Input::EVENT_REMOVED, BifType::Enum::Input::Event); - readVals = SendEventStreamEvent(i, type, vals); + readVals = SendEventStreamEvent(i, type, vals); success = true; } - else + + else { assert(false); return false; } - for ( int i = 0; i < readVals; i++ ) + for ( int i = 0; i < readVals; i++ ) delete vals[i]; - delete [] vals; + delete [] vals; return success; - } + } -bool Manager::CallPred(Func* pred_func, const int numvals, ...) +bool Manager::CallPred(Func* pred_func, const int numvals, ...) { bool result; val_list vl(numvals); - + va_list lP; va_start(lP, numvals); - for ( int i = 0; i < numvals; i++ ) + for ( int i = 0; i < numvals; i++ ) vl.append( va_arg(lP, Val*) ); va_end(lP); @@ -1497,10 +1507,10 @@ bool Manager::CallPred(Func* pred_func, const int numvals, ...) return(result); } -bool Manager::SendEvent(const string& name, const int num_vals, Value* *vals) +bool Manager::SendEvent(const string& name, const int num_vals, Value* *vals) { EventHandler* handler = event_registry->Lookup(name.c_str()); - if ( handler == 0 ) + if ( handler == 0 ) { reporter->Error("Event %s not found", name.c_str()); return false; @@ -1508,33 +1518,33 @@ bool Manager::SendEvent(const string& name, const int num_vals, Value* *vals) RecordType *type = handler->FType()->Args(); int num_event_vals = type->NumFields(); - if ( num_vals != num_event_vals ) + if ( num_vals != num_event_vals ) { reporter->Error("Wrong number of values for event %s", name.c_str()); return false; } val_list* vl = new val_list; - for ( int i = 0; i < num_vals; i++) + for ( int i = 0; i < num_vals; i++) vl->append(ValueToVal(vals[i], type->FieldType(i))); mgr.Dispatch(new Event(handler, vl)); - for ( int i = 0; i < num_vals; i++ ) + for ( int i = 0; i < num_vals; i++ ) delete vals[i]; - delete [] vals; + delete [] vals; return true; -} +} -void Manager::SendEvent(EventHandlerPtr ev, const int numvals, ...) +void Manager::SendEvent(EventHandlerPtr ev, const int numvals, ...) { val_list* vl = new val_list; - + va_list lP; va_start(lP, numvals); - for ( int i = 0; i < numvals; i++ ) + for ( int i = 0; i < numvals; i++ ) vl->append( va_arg(lP, Val*) ); va_end(lP); @@ -1545,8 +1555,8 @@ void Manager::SendEvent(EventHandlerPtr ev, const int numvals, ...) void Manager::SendEvent(EventHandlerPtr ev, list events) { val_list* vl = new val_list; - - for ( list::iterator i = events.begin(); i != events.end(); i++ ) + + for ( list::iterator i = events.begin(); i != events.end(); i++ ) { vl->append( *i ); } @@ -1554,31 +1564,31 @@ void Manager::SendEvent(EventHandlerPtr ev, list events) mgr.QueueEvent(ev, vl, SOURCE_LOCAL); } -// Convert a bro list value to a bro record value. +// Convert a bro list value to a bro record value. // I / we could think about moving this functionality to val.cc -RecordVal* Manager::ListValToRecordVal(ListVal* list, RecordType *request_type, int* position) +RecordVal* Manager::ListValToRecordVal(ListVal* list, RecordType *request_type, int* position) { assert(position != 0 ); // we need the pointer to point to data; - if ( request_type->Tag() != TYPE_RECORD ) + if ( request_type->Tag() != TYPE_RECORD ) { reporter->InternalError("ListValToRecordVal called on non-record-value."); return 0; - } + } RecordVal* rec = new RecordVal(request_type->AsRecordType()); assert(list != 0); int maxpos = list->Length(); - for ( int i = 0; i < request_type->NumFields(); i++ ) + for ( int i = 0; i < request_type->NumFields(); i++ ) { assert ( (*position) <= maxpos ); Val* fieldVal = 0; if ( request_type->FieldType(i)->Tag() == TYPE_RECORD ) - fieldVal = ListValToRecordVal(list, request_type->FieldType(i)->AsRecordType(), position); - else + fieldVal = ListValToRecordVal(list, request_type->FieldType(i)->AsRecordType(), position); + else { fieldVal = list->Index(*position); (*position)++; @@ -1592,24 +1602,23 @@ RecordVal* Manager::ListValToRecordVal(ListVal* list, RecordType *request_type, // Convert a threading value to a record value RecordVal* Manager::ValueToRecordVal(const Value* const *vals, - RecordType *request_type, int* position) + RecordType *request_type, int* position) { assert(position != 0); // we need the pointer to point to data. - if ( request_type->Tag() != TYPE_RECORD ) + if ( request_type->Tag() != TYPE_RECORD ) { reporter->InternalError("ValueToRecordVal called on non-record-value."); return 0; - } + } RecordVal* rec = new RecordVal(request_type->AsRecordType()); - for ( int i = 0; i < request_type->NumFields(); i++ ) + for ( int i = 0; i < request_type->NumFields(); i++ ) { - Val* fieldVal = 0; if ( request_type->FieldType(i)->Tag() == TYPE_RECORD ) - fieldVal = ValueToRecordVal(vals, request_type->FieldType(i)->AsRecordType(), position); - else + fieldVal = ValueToRecordVal(vals, request_type->FieldType(i)->AsRecordType(), position); + else { fieldVal = ValueToVal(vals[*position], request_type->FieldType(i)); (*position)++; @@ -1619,10 +1628,10 @@ RecordVal* Manager::ValueToRecordVal(const Value* const *vals, } return rec; - } + } -// Count the length of the values -// used to create a correct length buffer for hashing later +// Count the length of the values used to create a correct length buffer for +// hashing later int Manager::GetValueLength(const Value* val) { assert( val->present ); // presence has to be checked elsewhere int length = 0; @@ -1642,7 +1651,7 @@ int Manager::GetValueLength(const Value* val) { length += sizeof(val->val.port_val.port); length += sizeof(val->val.port_val.proto); break; - + case TYPE_DOUBLE: case TYPE_TIME: case TYPE_INTERVAL: @@ -1688,17 +1697,17 @@ int Manager::GetValueLength(const Value* val) { } break; - case TYPE_TABLE: + case TYPE_TABLE: { - for ( int i = 0; i < val->val.set_val.size; i++ ) + for ( int i = 0; i < val->val.set_val.size; i++ ) length += GetValueLength(val->val.set_val.vals[i]); break; } - case TYPE_VECTOR: + case TYPE_VECTOR: { int j = val->val.vector_val.size; - for ( int i = 0; i < j; i++ ) + for ( int i = 0; i < j; i++ ) length += GetValueLength(val->val.vector_val.vals[i]); break; } @@ -1708,12 +1717,12 @@ int Manager::GetValueLength(const Value* val) { } return length; - + } // Given a threading::value, copy the raw data bytes into *data and return how many bytes were copied. // Used for hashing the values for lookup in the bro table -int Manager::CopyValue(char *data, const int startpos, const Value* val) +int Manager::CopyValue(char *data, const int startpos, const Value* val) { assert( val->present ); // presence has to be checked elsewhere @@ -1722,42 +1731,37 @@ int Manager::CopyValue(char *data, const int startpos, const Value* val) case TYPE_INT: memcpy(data+startpos, (const void*) &(val->val.int_val), sizeof(val->val.int_val)); return sizeof(val->val.int_val); - break; case TYPE_COUNT: case TYPE_COUNTER: memcpy(data+startpos, (const void*) &(val->val.uint_val), sizeof(val->val.uint_val)); return sizeof(val->val.uint_val); - break; - case TYPE_PORT: + case TYPE_PORT: { int length = 0; - memcpy(data+startpos, (const void*) &(val->val.port_val.port), + memcpy(data+startpos, (const void*) &(val->val.port_val.port), sizeof(val->val.port_val.port)); length += sizeof(val->val.port_val.port); - memcpy(data+startpos+length, (const void*) &(val->val.port_val.proto), + memcpy(data+startpos+length, (const void*) &(val->val.port_val.proto), sizeof(val->val.port_val.proto)); length += sizeof(val->val.port_val.proto); return length; - break; } - + case TYPE_DOUBLE: case TYPE_TIME: case TYPE_INTERVAL: - memcpy(data+startpos, (const void*) &(val->val.double_val), + memcpy(data+startpos, (const void*) &(val->val.double_val), sizeof(val->val.double_val)); return sizeof(val->val.double_val); - break; case TYPE_STRING: case TYPE_ENUM: { memcpy(data+startpos, val->val.string_val->c_str(), val->val.string_val->length()); return val->val.string_val->size(); - break; } case TYPE_ADDR: @@ -1768,86 +1772,89 @@ int Manager::CopyValue(char *data, const int startpos, const Value* val) length = sizeof(val->val.addr_val.in.in4); memcpy(data + startpos, (const char*) &(val->val.addr_val.in.in4), length); break; + case IPv6: length = sizeof(val->val.addr_val.in.in6); memcpy(data + startpos, (const char*) &(val->val.addr_val.in.in6), length); break; + default: assert(false); } + return length; } - break; - - case TYPE_SUBNET: + + case TYPE_SUBNET: { int length; switch ( val->val.subnet_val.prefix.family ) { case IPv4: length = sizeof(val->val.addr_val.in.in4); - memcpy(data + startpos, + memcpy(data + startpos, (const char*) &(val->val.subnet_val.prefix.in.in4), length); break; + case IPv6: length = sizeof(val->val.addr_val.in.in6); - memcpy(data + startpos, + memcpy(data + startpos, (const char*) &(val->val.subnet_val.prefix.in.in4), length); break; + default: assert(false); } + int lengthlength = sizeof(val->val.subnet_val.length); - memcpy(data + startpos + length , + memcpy(data + startpos + length , (const char*) &(val->val.subnet_val.length), lengthlength); length += lengthlength; + return length; } - break; - case TYPE_TABLE: + case TYPE_TABLE: { int length = 0; int j = val->val.set_val.size; - for ( int i = 0; i < j; i++ ) + for ( int i = 0; i < j; i++ ) length += CopyValue(data, startpos+length, val->val.set_val.vals[i]); return length; - break; } - case TYPE_VECTOR: + case TYPE_VECTOR: { int length = 0; int j = val->val.vector_val.size; - for ( int i = 0; i < j; i++ ) + for ( int i = 0; i < j; i++ ) length += CopyValue(data, startpos+length, val->val.vector_val.vals[i]); return length; - break; } default: reporter->InternalError("unsupported type %d for CopyValue", val->type); return 0; } - + assert(false); return 0; } // Hash num_elements threading values and return the HashKey for them. At least one of the vals has to be ->present. -HashKey* Manager::HashValues(const int num_elements, const Value* const *vals) +HashKey* Manager::HashValues(const int num_elements, const Value* const *vals) { int length = 0; - for ( int i = 0; i < num_elements; i++ ) + for ( int i = 0; i < num_elements; i++ ) { const Value* val = vals[i]; if ( val->present ) length += GetValueLength(val); } - if ( length == 0 ) + if ( length == 0 ) { reporter->Error("Input reader sent line where all elements are null values. Ignoring line"); return NULL; @@ -1855,10 +1862,10 @@ HashKey* Manager::HashValues(const int num_elements, const Value* const *vals) int position = 0; char *data = (char*) malloc(length); - if ( data == 0 ) + if ( data == 0 ) reporter->InternalError("Could not malloc?"); - for ( int i = 0; i < num_elements; i++ ) + for ( int i = 0; i < num_elements; i++ ) { const Value* val = vals[i]; if ( val->present ) @@ -1873,16 +1880,16 @@ HashKey* Manager::HashValues(const int num_elements, const Value* const *vals) } // convert threading value to Bro value -Val* Manager::ValueToVal(const Value* val, BroType* request_type) +Val* Manager::ValueToVal(const Value* val, BroType* request_type) { - - if ( request_type->Tag() != TYPE_ANY && request_type->Tag() != val->type ) + + if ( request_type->Tag() != TYPE_ANY && request_type->Tag() != val->type ) { reporter->InternalError("Typetags don't match: %d vs %d", request_type->Tag(), val->type); return 0; } - if ( !val->present ) + if ( !val->present ) return 0; // unset field switch ( val->type ) { @@ -1894,24 +1901,20 @@ Val* Manager::ValueToVal(const Value* val, BroType* request_type) case TYPE_COUNT: case TYPE_COUNTER: return new Val(val->val.uint_val, val->type); - break; - + case TYPE_DOUBLE: case TYPE_TIME: case TYPE_INTERVAL: return new Val(val->val.double_val, val->type); - break; case TYPE_STRING: { BroString *s = new BroString(*(val->val.string_val)); return new StringVal(s); - break; } - + case TYPE_PORT: return new PortVal(val->val.port_val.port, val->val.port_val.proto); - break; case TYPE_ADDR: { @@ -1920,12 +1923,15 @@ Val* Manager::ValueToVal(const Value* val, BroType* request_type) case IPv4: addr = new IPAddr(val->val.addr_val.in.in4); break; + case IPv6: addr = new IPAddr(val->val.addr_val.in.in6); break; + default: assert(false); } + AddrVal* addrval = new AddrVal(*addr); delete addr; return addrval; @@ -1938,19 +1944,21 @@ Val* Manager::ValueToVal(const Value* val, BroType* request_type) case IPv4: addr = new IPAddr(val->val.subnet_val.prefix.in.in4); break; + case IPv6: addr = new IPAddr(val->val.subnet_val.prefix.in.in6); break; + default: assert(false); } + SubNetVal* subnetval = new SubNetVal(*addr, val->val.subnet_val.length); delete addr; return subnetval; - break; } - case TYPE_TABLE: + case TYPE_TABLE: { // all entries have to have the same type... BroType* type = request_type->AsTableType()->Indices()->PureType(); @@ -1958,7 +1966,7 @@ Val* Manager::ValueToVal(const Value* val, BroType* request_type) set_index->Append(type->Ref()); SetType* s = new SetType(set_index, 0); TableVal* t = new TableVal(s); - for ( int i = 0; i < val->val.set_val.size; i++ ) + for ( int i = 0; i < val->val.set_val.size; i++ ) { Val* assignval = ValueToVal( val->val.set_val.vals[i], type ); t->Assign(assignval, 0); @@ -1967,21 +1975,19 @@ Val* Manager::ValueToVal(const Value* val, BroType* request_type) Unref(s); return t; - break; } - case TYPE_VECTOR: + case TYPE_VECTOR: { // all entries have to have the same type... BroType* type = request_type->AsVectorType()->YieldType(); VectorType* vt = new VectorType(type->Ref()); VectorVal* v = new VectorVal(vt); - for ( int i = 0; i < val->val.vector_val.size; i++ ) + for ( int i = 0; i < val->val.vector_val.size; i++ ) v->Assign(i, ValueToVal( val->val.set_val.vals[i], type ), 0); Unref(vt); return v; - break; } case TYPE_ENUM: { @@ -1990,14 +1996,13 @@ Val* Manager::ValueToVal(const Value* val, BroType* request_type) string module = extract_module_name(val->val.string_val->c_str()); string var = extract_var_name(val->val.string_val->c_str()); bro_int_t index = request_type->AsEnumType()->Lookup(module, var.c_str()); - if ( index == -1 ) - reporter->InternalError("Value not found in enum mappimg. Module: %s, var: %s", + if ( index == -1 ) + reporter->InternalError("Value not found in enum mappimg. Module: %s, var: %s", module.c_str(), var.c_str()); - + return new EnumVal(index, request_type->Ref()->AsEnumType() ); - break; } - + default: reporter->InternalError("unsupported type for input_read"); @@ -2006,22 +2011,22 @@ Val* Manager::ValueToVal(const Value* val, BroType* request_type) assert(false); return NULL; } - + Manager::Stream* Manager::FindStream(const string &name) { for ( map::iterator s = readers.begin(); s != readers.end(); ++s ) { - if ( (*s).second->name == name ) + if ( (*s).second->name == name ) return (*s).second; } return 0; } -Manager::Stream* Manager::FindStream(ReaderFrontend* reader) +Manager::Stream* Manager::FindStream(ReaderFrontend* reader) { map::iterator s = readers.find(reader); - if ( s != readers.end() ) + if ( s != readers.end() ) return s->second; return 0; diff --git a/src/input/Manager.h b/src/input/Manager.h index d15febe0d6..984fcf3841 100644 --- a/src/input/Manager.h +++ b/src/input/Manager.h @@ -1,6 +1,6 @@ // See the file "COPYING" in the main distribution directory for copyright. // -// Class for managing input streams +// Class for managing input streams. #ifndef INPUT_MANAGER_H #define INPUT_MANAGER_H @@ -16,7 +16,7 @@ namespace input { class ReaderFrontend; -class ReaderBackend; +class ReaderBackend; /** * Singleton class for managing input streams. @@ -25,58 +25,60 @@ class Manager { public: /** * Constructor. - */ + */ Manager(); /** * Destructor. */ ~Manager(); - + /** - * Creates a new input stream which will write the data from the data source into + * Creates a new input stream which will write the data from the data + * source into a table. * - * @param description A record of script type \c Input:StreamDescription. + * @param description A record of script type \c + * Input:StreamDescription. * * This method corresponds directly to the internal BiF defined in * input.bif, which just forwards here. - */ - bool CreateTableStream(RecordVal* description); + */ + bool CreateTableStream(RecordVal* description); /** * Creates a new input stream which sends events for read input data. * - * @param description A record of script type \c Input:StreamDescription. + * @param description A record of script type \c + * Input:StreamDescription. * * This method corresponds directly to the internal BiF defined in * input.bif, which just forwards here. - */ - bool CreateEventStream(RecordVal* description); - + */ + bool CreateEventStream(RecordVal* description); /** - * Force update on a input stream. - * Forces a re-read of the whole input source. - * Usually used, when an input stream is opened in managed mode. - * Otherwise, this can be used to trigger a input source check before a heartbeat message arrives. - * May be ignored by the reader. + * Force update on a input stream. Forces a re-read of the whole + * input source. Usually used when an input stream is opened in + * managed mode. Otherwise, this can be used to trigger a input + * source check before a heartbeat message arrives. May be ignored by + * the reader. * - * @param id The enum value corresponding the input stream. + * @param id The enum value corresponding the input stream. * * This method corresponds directly to the internal BiF defined in * input.bif, which just forwards here. */ bool ForceUpdate(const string &id); - + /** - * Deletes an existing input stream + * Deletes an existing input stream. * - * @param id The enum value corresponding the input stream. + * @param id The enum value corresponding the input stream. * * This method corresponds directly to the internal BiF defined in * input.bif, which just forwards here. */ - bool RemoveStream(const string &id); + bool RemoveStream(const string &id); protected: friend class ReaderFrontend; @@ -88,90 +90,100 @@ protected: friend class EndCurrentSendMessage; friend class ReaderClosedMessage; - // For readers to write to input stream in direct mode (reporting new/deleted values directly) - // Functions take ownership of threading::Value fields + // For readers to write to input stream in direct mode (reporting + // new/deleted values directly). Functions take ownership of + // threading::Value fields. void Put(ReaderFrontend* reader, threading::Value* *vals); void Clear(ReaderFrontend* reader); bool Delete(ReaderFrontend* reader, threading::Value* *vals); - // for readers to write to input stream in indirect mode (manager is monitoring new/deleted values) - // Functions take ownership of threading::Value fields + // For readers to write to input stream in indirect mode (manager is + // monitoring new/deleted values) Functions take ownership of + // threading::Value fields. void SendEntry(ReaderFrontend* reader, threading::Value* *vals); void EndCurrentSend(ReaderFrontend* reader); - - // Allows readers to directly send Bro events. - // The num_vals and vals must be the same the named event expects. - // Takes ownership of threading::Value fields + + // Allows readers to directly send Bro events. The num_vals and vals + // must be the same the named event expects. Takes ownership of + // threading::Value fields. bool SendEvent(const string& name, const int num_vals, threading::Value* *vals); // Instantiates a new ReaderBackend of the given type (note that - // doing so creates a new thread!). - ReaderBackend* CreateBackend(ReaderFrontend* frontend, bro_int_t type); - - // Functions are called from the ReaderBackend to notify the manager, that a stream has been removed - // or a stream has been closed. - // Used to prevent race conditions where data for a specific stream is still in the queue when the - // RemoveStream directive is executed by the main thread. - // This makes sure all data that has ben queued for a stream is still received. + // doing so creates a new thread!). + ReaderBackend* CreateBackend(ReaderFrontend* frontend, bro_int_t type); + + // Function called from the ReaderBackend to notify the manager that + // a stream has been removed or a stream has been closed. Used to + // prevent race conditions where data for a specific stream is still + // in the queue when the RemoveStream directive is executed by the + // main thread. This makes sure all data that has ben queued for a + // stream is still received. bool RemoveStreamContinuation(ReaderFrontend* reader); - + private: class Stream; class TableStream; class EventStream; - - bool CreateStream(Stream*, RecordVal* description); - // SendEntry implementation for Table stream - int SendEntryTable(Stream* i, const threading::Value* const *vals); + bool CreateStream(Stream*, RecordVal* description); - // Put implementation for Table stream - int PutTable(Stream* i, const threading::Value* const *vals); + // SendEntry implementation for Table stream. + int SendEntryTable(Stream* i, const threading::Value* const *vals); - // SendEntry and Put implementation for Event stream + // Put implementation for Table stream. + int PutTable(Stream* i, const threading::Value* const *vals); + + // SendEntry and Put implementation for Event stream. int SendEventStreamEvent(Stream* i, EnumVal* type, const threading::Value* const *vals); - // Checks is a bro type can be used for data reading. The equivalend in threading cannot be used, because we have support different types - // from the log framework + // Checks that a Bro type can be used for data reading. The + // equivalend in threading cannot be used, because we have support + // different types from the log framework bool IsCompatibleType(BroType* t, bool atomic_only=false); - // Check if a record is made up of compatible types and return a list of all fields that are in the record in order. - // Recursively unrolls records + // Check if a record is made up of compatible types and return a list + // of all fields that are in the record in order. Recursively unrolls + // records bool UnrollRecordType(vector *fields, const RecordType *rec, const string& nameprepend); // Send events - void SendEvent(EventHandlerPtr ev, const int numvals, ...); - void SendEvent(EventHandlerPtr ev, list events); + void SendEvent(EventHandlerPtr ev, const int numvals, ...); + void SendEvent(EventHandlerPtr ev, list events); - // Call predicate function and return result + // Call predicate function and return result. bool CallPred(Func* pred_func, const int numvals, ...); - // get a hashkey for a set of threading::Values + // Get a hashkey for a set of threading::Values. HashKey* HashValues(const int num_elements, const threading::Value* const *vals); - // Get the memory used by a specific value + // Get the memory used by a specific value. int GetValueLength(const threading::Value* val); - // Copies the raw data in a specific threading::Value to position sta + + // Copies the raw data in a specific threading::Value to position + // startpos. int CopyValue(char *data, const int startpos, const threading::Value* val); - // Convert Threading::Value to an internal Bro Type (works also with Records) + // Convert Threading::Value to an internal Bro Type (works also with + // Records). Val* ValueToVal(const threading::Value* val, BroType* request_type); - // Convert Threading::Value to an internal Bro List type + // Convert Threading::Value to an internal Bro List type. Val* ValueToIndexVal(int num_fields, const RecordType* type, const threading::Value* const *vals); - // Converts a threading::value to a record type. mostly used by ValueToVal - RecordVal* ValueToRecordVal(const threading::Value* const *vals, RecordType *request_type, int* position); + // Converts a threading::value to a record type. Mostly used by + // ValueToVal. + RecordVal* ValueToRecordVal(const threading::Value* const *vals, RecordType *request_type, int* position); + Val* RecordValToIndexVal(RecordVal *r); - - // Converts a Bro ListVal to a RecordVal given the record type + + // Converts a Bro ListVal to a RecordVal given the record type. RecordVal* ListValToRecordVal(ListVal* list, RecordType *request_type, int* position); Stream* FindStream(const string &name); Stream* FindStream(ReaderFrontend* reader); - enum StreamType { TABLE_FILTER, EVENT_FILTER }; - + enum StreamType { TABLE_STREAM, EVENT_STREAM }; + map readers; }; diff --git a/src/input/ReaderBackend.cc b/src/input/ReaderBackend.cc index c625301383..328e0bc535 100644 --- a/src/input/ReaderBackend.cc +++ b/src/input/ReaderBackend.cc @@ -15,7 +15,7 @@ public: : threading::OutputMessage("Put", reader), val(val) {} - virtual bool Process() + virtual bool Process() { input_mgr->Put(Object(), val); return true; @@ -31,7 +31,7 @@ public: : threading::OutputMessage("Delete", reader), val(val) {} - virtual bool Process() + virtual bool Process() { return input_mgr->Delete(Object(), val); } @@ -45,7 +45,7 @@ public: ClearMessage(ReaderFrontend* reader) : threading::OutputMessage("Clear", reader) {} - virtual bool Process() + virtual bool Process() { input_mgr->Clear(Object()); return true; @@ -60,14 +60,14 @@ public: : threading::OutputMessage("SendEvent", reader), name(name), num_vals(num_vals), val(val) {} - virtual bool Process() + virtual bool Process() { bool success = input_mgr->SendEvent(name, num_vals, val); - if ( !success ) + if ( ! success ) reporter->Error("SendEvent for event %s failed", name.c_str()); - return true; // we do not want to die if sendEvent fails because the event did not return. + return true; // We do not want to die if sendEvent fails because the event did not return. } private: @@ -82,7 +82,7 @@ public: : threading::OutputMessage("SendEntry", reader), val(val) { } - virtual bool Process() + virtual bool Process() { input_mgr->SendEntry(Object(), val); return true; @@ -97,7 +97,7 @@ public: EndCurrentSendMessage(ReaderFrontend* reader) : threading::OutputMessage("EndCurrentSend", reader) {} - virtual bool Process() + virtual bool Process() { input_mgr->EndCurrentSend(Object()); return true; @@ -111,7 +111,7 @@ public: ReaderClosedMessage(ReaderFrontend* reader) : threading::OutputMessage("ReaderClosed", reader) {} - virtual bool Process() + virtual bool Process() { return input_mgr->RemoveStreamContinuation(Object()); } @@ -127,49 +127,46 @@ public: : threading::OutputMessage("Disable", writer) {} virtual bool Process() - { - Object()->SetDisable(); - return true; + { + Object()->SetDisable(); + return true; } }; ReaderBackend::ReaderBackend(ReaderFrontend* arg_frontend) : MsgThread() { - buf = 0; - buf_len = 1024; disabled = true; // disabled will be set correcty in init. - frontend = arg_frontend; SetName(frontend->Name()); } -ReaderBackend::~ReaderBackend() - { +ReaderBackend::~ReaderBackend() + { } -void ReaderBackend::Put(Value* *val) +void ReaderBackend::Put(Value* *val) { SendOut(new PutMessage(frontend, val)); } -void ReaderBackend::Delete(Value* *val) +void ReaderBackend::Delete(Value* *val) { SendOut(new DeleteMessage(frontend, val)); } -void ReaderBackend::Clear() +void ReaderBackend::Clear() { SendOut(new ClearMessage(frontend)); } -void ReaderBackend::SendEvent(const string& name, const int num_vals, Value* *vals) +void ReaderBackend::SendEvent(const string& name, const int num_vals, Value* *vals) { SendOut(new SendEventMessage(frontend, name, num_vals, vals)); - } + } -void ReaderBackend::EndCurrentSend() +void ReaderBackend::EndCurrentSend() { SendOut(new EndCurrentSendMessage(frontend)); } @@ -179,19 +176,19 @@ void ReaderBackend::SendEntry(Value* *vals) SendOut(new SendEntryMessage(frontend, vals)); } -bool ReaderBackend::Init(string arg_source, int mode, const int arg_num_fields, - const threading::Field* const* arg_fields) +bool ReaderBackend::Init(string arg_source, int mode, const int arg_num_fields, + const threading::Field* const* arg_fields) { source = arg_source; SetName("InputReader/"+source); num_fields = arg_num_fields; - fields = arg_fields; + fields = arg_fields; // disable if DoInit returns error. int success = DoInit(arg_source, mode, arg_num_fields, arg_fields); - if ( !success ) + if ( ! success ) { Error("Init failed"); DisableFrontend(); @@ -202,30 +199,30 @@ bool ReaderBackend::Init(string arg_source, int mode, const int arg_num_fields, return success; } -void ReaderBackend::Close() +void ReaderBackend::Close() { DoClose(); disabled = true; DisableFrontend(); SendOut(new ReaderClosedMessage(frontend)); - if ( fields != 0 ) + if ( fields != 0 ) { - for ( unsigned int i = 0; i < num_fields; i++ ) + for ( unsigned int i = 0; i < num_fields; i++ ) delete(fields[i]); - delete[] (fields); + delete [] (fields); fields = 0; } } -bool ReaderBackend::Update() +bool ReaderBackend::Update() { - if ( disabled ) + if ( disabled ) return false; bool success = DoUpdate(); - if ( !success ) + if ( ! success ) DisableFrontend(); return success; @@ -233,8 +230,9 @@ bool ReaderBackend::Update() void ReaderBackend::DisableFrontend() { - disabled = true; - // we also set disabled here, because there still may be other messages queued and we will dutifully ignore these from now + // We also set disabled here, because there still may be other + // messages queued and we will dutifully ignore these from now. + disabled = true; SendOut(new DisableMessage(frontend)); } @@ -244,9 +242,9 @@ bool ReaderBackend::DoHeartbeat(double network_time, double current_time) return true; } -TransportProto ReaderBackend::StringToProto(const string &proto) +TransportProto ReaderBackend::StringToProto(const string &proto) { - if ( proto == "unknown" ) + if ( proto == "unknown" ) return TRANSPORT_UNKNOWN; else if ( proto == "tcp" ) return TRANSPORT_TCP; @@ -261,8 +259,8 @@ TransportProto ReaderBackend::StringToProto(const string &proto) } -// more or less verbose copy from IPAddr.cc -- which uses reporter -Value::addr_t ReaderBackend::StringToAddr(const string &s) +// More or less verbose copy from IPAddr.cc -- which uses reporter. +Value::addr_t ReaderBackend::StringToAddr(const string &s) { Value::addr_t val; @@ -270,9 +268,9 @@ Value::addr_t ReaderBackend::StringToAddr(const string &s) { val.family = IPv4; - if ( inet_aton(s.c_str(), &(val.in.in4)) <= 0 ) + if ( inet_aton(s.c_str(), &(val.in.in4)) <= 0 ) { - Error(Fmt("Bad addres: %s", s.c_str())); + Error(Fmt("Bad address: %s", s.c_str())); memset(&val.in.in4.s_addr, 0, sizeof(val.in.in4.s_addr)); } @@ -283,7 +281,7 @@ Value::addr_t ReaderBackend::StringToAddr(const string &s) val.family = IPv6; if ( inet_pton(AF_INET6, s.c_str(), val.in.in6.s6_addr) <=0 ) { - Error(Fmt("Bad IP address: %s", s.c_str())); + Error(Fmt("Bad address: %s", s.c_str())); memset(val.in.in6.s6_addr, 0, sizeof(val.in.in6.s6_addr)); } } diff --git a/src/input/ReaderBackend.h b/src/input/ReaderBackend.h index b4d9101bc8..ae8437b08c 100644 --- a/src/input/ReaderBackend.h +++ b/src/input/ReaderBackend.h @@ -12,13 +12,13 @@ namespace input { class ReaderFrontend; /** - * Base class for reader implementation. When the input:Manager creates a - * new input stream, it instantiates a ReaderFrontend. That then in turn - * creates a ReaderBackend of the right type. The frontend then forwards - * message over the backend as its methods are called. + * Base class for reader implementation. When the input:Manager creates a new + * input stream, it instantiates a ReaderFrontend. That then in turn creates + * a ReaderBackend of the right type. The frontend then forwards messages + * over the backend as its methods are called. * - * All of this methods must be called only from the corresponding child - * thread (the constructor is the one exception.) + * All methods must be called only from the corresponding child thread (the + * constructor is the one exception.) */ class ReaderBackend : public threading::MsgThread { public: @@ -27,54 +27,51 @@ public: * * @param frontend The frontend reader that created this backend. The * *only* purpose of this value is to be passed back via messages as - * a argument to callbacks. One must not otherwise access the + * an argument to callbacks. One must not otherwise access the * frontend, it's running in a different thread. - * - * @param frontend pointer to the reader frontend - */ + */ ReaderBackend(ReaderFrontend* frontend); - + /** * Destructor. - */ + */ virtual ~ReaderBackend(); /** * One-time initialization of the reader to define the input source. * - * @param arg_source A string left to the interpretation of the reader - * implementation; it corresponds to the value configured on the - * script-level for the input stream. + * @param arg_source A string left to the interpretation of the + * reader implementation; it corresponds to the value configured on + * the script-level for the input stream. * - * @param num_fields The number of log fields for the stream. + * @param fields An array of size \a num_fields with the input + * fields. The method takes ownership of the array. * - * @param fields An array of size \a num_fields with the log fields. - * The methods takes ownership of the array. - * - * @param mode the opening mode for the input source + * @param mode The opening mode for the input source as one of the + * Input::Mode script constants. * - * @param arg_num_fields number of fields contained in \a fields + * @param arg_num_fields Number of fields contained in \a fields. * - * @param fields the types and names of the fields to be retrieved - * from the input source + * @param fields The types and names of the fields to be retrieved + * from the input source. * * @return False if an error occured. */ bool Init(string arg_source, int mode, int arg_num_fields, const threading::Field* const* fields); /** - * Finishes reading from this input stream in a regular fashion. Must not be - * called if an error has been indicated earlier. After calling this, - * no further reading from the stream can be performed + * Finishes reading from this input stream in a regular fashion. Must + * not be called if an error has been indicated earlier. After + * calling this, no further reading from the stream can be performed. * * @return False if an error occured. */ void Close(); /** - * Force trigger an update of the input stream. - * The action that will be taken depends on the current read mode and the - * individual input backend + * Force trigger an update of the input stream. The action that will + * be taken depends on the current read mode and the individual input + * backend. * * An backend can choose to ignore this. * @@ -84,16 +81,17 @@ public: /** * Disables the frontend that has instantiated this backend. Once - * disabled,the frontend will not send any further message over. + * disabled, the frontend will not send any further message over. */ - void DisableFrontend(); - + void DisableFrontend(); + protected: - // Methods that have to be overwritten by the individual readers - + // Methods that have to be overwritten by the individual readers + /** - * Reader-specific intialization method. Note that data may only be - * read from the input source after the Start function has been called. + * Reader-specific intialization method. Note that data may only be + * read from the input source after the Init() function has been + * called. * * A reader implementation must override this method. If it returns * false, it will be assumed that a fatal error has occured that @@ -105,39 +103,39 @@ protected: /** * Reader-specific method implementing input finalization at - * termination. + * termination. * * A reader implementation must override this method but it can just - * ignore calls if an input source must not be closed. + * ignore calls if an input source can't actually be closed. * - * After the method is called, the writer will be deleted. If an error occurs - * during shutdown, an implementation should also call Error() to indicate what - * happened. - */ + * After the method is called, the writer will be deleted. If an + * error occurs during shutdown, an implementation should also call + * Error() to indicate what happened. + */ virtual void DoClose() = 0; /** - * Reader-specific method implementing the forced update trigger + * Reader-specific method implementing the forced update trigger. * - * A reader implementation must override this method but it can just ignore - * calls, if a forced update does not fit the input source or the current input - * reading mode. + * A reader implementation must override this method but it can just + * ignore calls if a forced update does not fit the input source or + * the current input reading mode. * - * If it returns false, it will be assumed that a fatal error has occured - * that prevents the reader from further operation; it will then be - * disabled and eventually deleted. When returning false, an implementation - * should also call Error to indicate what happened. + * If it returns false, it will be assumed that a fatal error has + * occured that prevents the reader from further operation; it will + * then be disabled and eventually deleted. When returning false, an + * implementation should also call Error to indicate what happened. */ virtual bool DoUpdate() = 0; - + /** * Returns the input source as passed into the constructor. */ const string Source() const { return source; } /** - * Method allowing a reader to send a specified bro event. - * Vals must match the values expected by the bro event. + * Method allowing a reader to send a specified Bro event. Vals must + * match the values expected by the bro event. * * @param name name of the bro event to send * @@ -147,30 +145,33 @@ protected: */ void SendEvent(const string& name, const int num_vals, threading::Value* *vals); - // Content-sending-functions (simple mode). Including table-specific stuff that - // simply is not used if we have no table + // Content-sending-functions (simple mode). Including table-specific + // stuff that simply is not used if we have no table. + /** - * Method allowing a reader to send a list of values read for a specific stream - * back to the manager. + * Method allowing a reader to send a list of values read from a + * specific stream back to the manager in simple mode. * - * If the stream is a table stream, the values are inserted into the table; - * if it is an event stream, the event is raised. + * If the stream is a table stream, the values are inserted into the + * table; if it is an event stream, the event is raised. * * @param val list of threading::Values expected by the stream */ void Put(threading::Value* *val); /** - * Method allowing a reader to delete a specific value from a bro table. + * Method allowing a reader to delete a specific value from a Bro + * table. * - * If the receiving stream is an event stream, only a removed event is raised + * If the receiving stream is an event stream, only a removed event + * is raised. * * @param val list of threading::Values expected by the stream */ void Delete(threading::Value* *val); /** - * Method allowing a reader to clear a value from a bro table. + * Method allowing a reader to clear a Bro table. * * If the receiving stream is an event stream, this is ignored. * @@ -178,26 +179,25 @@ protected: void Clear(); // Content-sending-functions (tracking mode): Only changed lines are propagated. - /** - * Method allowing a reader to send a list of values read for a specific stream - * back to the manager. + * Method allowing a reader to send a list of values read from + * specific stream back to the manager in tracking mode. * - * If the stream is a table stream, the values are inserted into the table; - * if it is an event stream, the event is raised. + * If the stream is a table stream, the values are inserted into the + * table; if it is an event stream, the event is raised. * * @param val list of threading::Values expected by the stream */ void SendEntry(threading::Value* *vals); /** - * Method telling the manager, that the current list of entries sent by SendEntry - * is finished. - * - * For table streams, all entries that were not updated since the last EndCurrentSend - * will be deleted, because they are no longer present in the input source + * Method telling the manager, that the current list of entries sent + * by SendEntry is finished. * + * For table streams, all entries that were not updated since the + * last EndCurrentSend will be deleted, because they are no longer + * present in the input source */ void EndCurrentSend(); @@ -207,14 +207,14 @@ protected: * This method can be overridden but once must call * ReaderBackend::DoHeartbeat(). */ - virtual bool DoHeartbeat(double network_time, double current_time); + virtual bool DoHeartbeat(double network_time, double current_time); /** * Utility function for Readers - convert a string into a TransportProto * * @param proto the transport protocol */ - TransportProto StringToProto(const string &proto); + TransportProto StringToProto(const string &proto); /** * Utility function for Readers - convert a string into a Value::addr_t @@ -224,20 +224,16 @@ protected: threading::Value::addr_t StringToAddr(const string &addr); private: - // Frontend that instantiated us. This object must not be access from - // this class, it's running in a different thread! - ReaderFrontend* frontend; + // Frontend that instantiated us. This object must not be accessed + // from this class, it's running in a different thread! + ReaderFrontend* frontend; string source; - + bool disabled; - // For implementing Fmt(). - char* buf; - unsigned int buf_len; - unsigned int num_fields; - const threading::Field* const * fields; // raw mapping + const threading::Field* const * fields; // raw mapping }; } diff --git a/src/input/ReaderFrontend.cc b/src/input/ReaderFrontend.cc index f61fd357b9..75bb7fec50 100644 --- a/src/input/ReaderFrontend.cc +++ b/src/input/ReaderFrontend.cc @@ -12,11 +12,15 @@ namespace input { class InitMessage : public threading::InputMessage { public: - InitMessage(ReaderBackend* backend, const string source, const int mode, const int num_fields, const threading::Field* const* fields) + InitMessage(ReaderBackend* backend, const string source, const int mode, + const int num_fields, const threading::Field* const* fields) : threading::InputMessage("Init", backend), source(source), mode(mode), num_fields(num_fields), fields(fields) { } - virtual bool Process() { return Object()->Init(source, mode, num_fields, fields); } + virtual bool Process() + { + return Object()->Init(source, mode, num_fields, fields); + } private: const string source; @@ -46,7 +50,7 @@ public: }; -ReaderFrontend::ReaderFrontend(bro_int_t type) +ReaderFrontend::ReaderFrontend(bro_int_t type) { disabled = initialized = false; ty_name = ""; @@ -56,12 +60,12 @@ ReaderFrontend::ReaderFrontend(bro_int_t type) backend->Start(); } -ReaderFrontend::~ReaderFrontend() +ReaderFrontend::~ReaderFrontend() { } -void ReaderFrontend::Init(string arg_source, int mode, const int num_fields, - const threading::Field* const* fields) +void ReaderFrontend::Init(string arg_source, int mode, const int num_fields, + const threading::Field* const* fields) { if ( disabled ) return; @@ -73,14 +77,14 @@ void ReaderFrontend::Init(string arg_source, int mode, const int num_fields, initialized = true; backend->SendIn(new InitMessage(backend, arg_source, mode, num_fields, fields)); - } + } -void ReaderFrontend::Update() +void ReaderFrontend::Update() { - if ( disabled ) + if ( disabled ) return; - if ( !initialized ) + if ( ! initialized ) { reporter->Error("Tried to call update on uninitialized reader"); return; @@ -89,12 +93,12 @@ void ReaderFrontend::Update() backend->SendIn(new UpdateMessage(backend)); } -void ReaderFrontend::Close() +void ReaderFrontend::Close() { - if ( disabled ) + if ( disabled ) return; - - if ( !initialized ) + + if ( ! initialized ) { reporter->Error("Tried to call finish on uninitialized reader"); return; diff --git a/src/input/ReaderFrontend.h b/src/input/ReaderFrontend.h index 88cf60804e..c18e22a064 100644 --- a/src/input/ReaderFrontend.h +++ b/src/input/ReaderFrontend.h @@ -29,14 +29,14 @@ public: * corresponding type. * * Frontends must only be instantiated by the main thread. - */ + */ ReaderFrontend(bro_int_t type); /** * Destructor. * * Frontends must only be destroyed by the main thread. - */ + */ virtual ~ReaderFrontend(); /** @@ -47,37 +47,39 @@ public: * sends a message back that will asynchronously call Disable(). * * See ReaderBackend::Init() for arguments. + * * This method must only be called from the main thread. - */ + */ void Init(string arg_source, int mode, const int arg_num_fields, const threading::Field* const* fields); /** - * Force an update of the current input source. Actual action depends on - * the opening mode and on the input source. + * Force an update of the current input source. Actual action depends + * on the opening mode and on the input source. * * This method generates a message to the backend reader and triggers * the corresponding message there. + * * This method must only be called from the main thread. */ void Update(); /** - * Finalizes writing to this tream. + * Finalizes reading from this stream. * * This method generates a message to the backend reader and triggers - * the corresponding message there. - * This method must only be called from the main thread. - */ + * the corresponding message there. This method must only be called + * from the main thread. + */ void Close(); /** * Disables the reader frontend. From now on, all method calls that * would normally send message over to the backend, turn into no-ops. - * Note though that it does not stop the backend itself, use Finsh() + * Note though that it does not stop the backend itself, use Finish() * to do that as well (this method is primarily for use as callback * when the backend wants to disable the frontend). * - * Disabled frontend will eventually be discarded by the + * Disabled frontends will eventually be discarded by the * input::Manager. * * This method must only be called from the main thread. @@ -85,9 +87,10 @@ public: void SetDisable() { disabled = true; } /** - * Returns true if the reader frontend has been disabled with SetDisable(). + * Returns true if the reader frontend has been disabled with + * SetDisable(). */ - bool Disabled() { return disabled; } + bool Disabled() { return disabled; } /** * Returns a descriptive name for the reader, including the type of @@ -101,18 +104,21 @@ protected: friend class Manager; /** - * Returns the source as passed into the constructor + * Returns the source as passed into the constructor. */ - const string Source() const { return source; }; + const string& Source() const { return source; }; - string ty_name; // Name of the backend type. Set by the manager. + /** + * Returns the name of the backend's type. + */ + const string& TypeName() const { return ty_name; } private: - ReaderBackend* backend; // The backend we have instanatiated. + ReaderBackend* backend; // The backend we have instanatiated. string source; + string ty_name; // Backend type, set by manager. bool disabled; // True if disabled. - bool initialized; // True if initialized. - + bool initialized; // True if initialized. }; } diff --git a/src/input/readers/Ascii.cc b/src/input/readers/Ascii.cc index 8223d6e201..157ea90916 100644 --- a/src/input/readers/Ascii.cc +++ b/src/input/readers/Ascii.cc @@ -20,8 +20,7 @@ using namespace input::reader; using threading::Value; using threading::Field; - -FieldMapping::FieldMapping(const string& arg_name, const TypeTag& arg_type, int arg_position) +FieldMapping::FieldMapping(const string& arg_name, const TypeTag& arg_type, int arg_position) : name(arg_name), type(arg_type) { position = arg_position; @@ -29,8 +28,8 @@ FieldMapping::FieldMapping(const string& arg_name, const TypeTag& arg_type, int present = true; } -FieldMapping::FieldMapping(const string& arg_name, const TypeTag& arg_type, - const TypeTag& arg_subtype, int arg_position) +FieldMapping::FieldMapping(const string& arg_name, const TypeTag& arg_type, + const TypeTag& arg_subtype, int arg_position) : name(arg_name), type(arg_type), subtype(arg_subtype) { position = arg_position; @@ -38,14 +37,14 @@ FieldMapping::FieldMapping(const string& arg_name, const TypeTag& arg_type, present = true; } -FieldMapping::FieldMapping(const FieldMapping& arg) +FieldMapping::FieldMapping(const FieldMapping& arg) : name(arg.name), type(arg.type), subtype(arg.subtype), present(arg.present) { position = arg.position; secondary_position = arg.secondary_position; } -FieldMapping FieldMapping::subType() +FieldMapping FieldMapping::subType() { return FieldMapping(name, subtype, position); } @@ -54,23 +53,23 @@ Ascii::Ascii(ReaderFrontend *frontend) : ReaderBackend(frontend) { file = 0; - - separator.assign( (const char*) BifConst::InputAscii::separator->Bytes(), + separator.assign( (const char*) BifConst::InputAscii::separator->Bytes(), BifConst::InputAscii::separator->Len()); - if ( separator.size() != 1 ) + + if ( separator.size() != 1 ) Error("separator length has to be 1. Separator will be truncated."); set_separator.assign( (const char*) BifConst::InputAscii::set_separator->Bytes(), BifConst::InputAscii::set_separator->Len()); - if ( set_separator.size() != 1 ) + + if ( set_separator.size() != 1 ) Error("set_separator length has to be 1. Separator will be truncated."); - empty_field.assign( (const char*) BifConst::InputAscii::empty_field->Bytes(), + empty_field.assign( (const char*) BifConst::InputAscii::empty_field->Bytes(), BifConst::InputAscii::empty_field->Len()); - - unset_field.assign( (const char*) BifConst::InputAscii::unset_field->Bytes(), - BifConst::InputAscii::unset_field->Len()); + unset_field.assign( (const char*) BifConst::InputAscii::unset_field->Bytes(), + BifConst::InputAscii::unset_field->Len()); } Ascii::~Ascii() @@ -80,7 +79,7 @@ Ascii::~Ascii() void Ascii::DoClose() { - if ( file != 0 ) + if ( file != 0 ) { file->close(); delete(file); @@ -93,26 +92,26 @@ bool Ascii::DoInit(string path, int arg_mode, int arg_num_fields, const Field* c fname = path; mode = arg_mode; mtime = 0; - + num_fields = arg_num_fields; fields = arg_fields; - if ( ( mode != MANUAL ) && (mode != REREAD) && ( mode != STREAM ) ) + if ( (mode != MANUAL) && (mode != REREAD) && (mode != STREAM) ) { Error(Fmt("Unsupported read mode %d for source %s", mode, path.c_str())); return false; - } + } file = new ifstream(path.c_str()); - if ( !file->is_open() ) + if ( ! file->is_open() ) { Error(Fmt("Init: cannot open %s", fname.c_str())); delete(file); file = 0; return false; } - - if ( ReadHeader(false) == false ) + + if ( ReadHeader(false) == false ) { Error(Fmt("Init: cannot open %s; headers are incorrect", fname.c_str())); file->close(); @@ -120,22 +119,22 @@ bool Ascii::DoInit(string path, int arg_mode, int arg_num_fields, const Field* c file = 0; return false; } - + DoUpdate(); return true; } -bool Ascii::ReadHeader(bool useCached) +bool Ascii::ReadHeader(bool useCached) { // try to read the header line... string line; map ifields; - if ( !useCached ) + if ( ! useCached ) { - if ( !GetLine(line) ) + if ( ! GetLine(line) ) { Error("could not read first line"); return false; @@ -143,16 +142,17 @@ bool Ascii::ReadHeader(bool useCached) headerline = line; } - else + + else line = headerline; - + // construct list of field names. istringstream splitstream(line); int pos=0; - while ( splitstream ) + while ( splitstream ) { string s; - if ( !getline(splitstream, s, separator[0])) + if ( ! getline(splitstream, s, separator[0])) break; ifields[s] = pos; @@ -161,15 +161,15 @@ bool Ascii::ReadHeader(bool useCached) //printf("Updating fields from description %s\n", line.c_str()); columnMap.clear(); - - for ( unsigned int i = 0; i < num_fields; i++ ) + + for ( unsigned int i = 0; i < num_fields; i++ ) { const Field* field = fields[i]; - - map::iterator fit = ifields.find(field->name); - if ( fit == ifields.end() ) + + map::iterator fit = ifields.find(field->name); + if ( fit == ifields.end() ) { - if ( field->optional ) + if ( field->optional ) { // we do not really need this field. mark it as not present and always send an undef back. FieldMapping f(field->name, field->type, field->subtype, -1); @@ -178,38 +178,43 @@ bool Ascii::ReadHeader(bool useCached) continue; } - Error(Fmt("Did not find requested field %s in input data file %s.", field->name.c_str(), fname.c_str())); + Error(Fmt("Did not find requested field %s in input data file %s.", + field->name.c_str(), fname.c_str())); return false; } FieldMapping f(field->name, field->type, field->subtype, ifields[field->name]); - if ( field->secondary_name != "" ) + + if ( field->secondary_name != "" ) { - map::iterator fit2 = ifields.find(field->secondary_name); - if ( fit2 == ifields.end() ) + map::iterator fit2 = ifields.find(field->secondary_name); + if ( fit2 == ifields.end() ) { - Error(Fmt("Could not find requested port type field %s in input data file.", field->secondary_name.c_str())); + Error(Fmt("Could not find requested port type field %s in input data file.", + field->secondary_name.c_str())); return false; } + f.secondary_position = ifields[field->secondary_name]; } + columnMap.push_back(f); } - + // well, that seems to have worked... return true; } -bool Ascii::GetLine(string& str) +bool Ascii::GetLine(string& str) { while ( getline(*file, str) ) { - if ( str[0] != '#' ) + if ( str[0] != '#' ) return true; - if ( str.compare(0,8, "#fields\t") == 0 ) + if ( str.compare(0,8, "#fields\t") == 0 ) { str = str.substr(8); return true; @@ -220,14 +225,13 @@ bool Ascii::GetLine(string& str) } -Value* Ascii::EntryToVal(string s, FieldMapping field) +Value* Ascii::EntryToVal(string s, FieldMapping field) { - if ( s.compare(unset_field) == 0 ) // field is not set... return new Value(field.type, false); Value* val = new Value(field.type, true); - + switch ( field.type ) { case TYPE_ENUM: case TYPE_STRING: @@ -235,13 +239,14 @@ Value* Ascii::EntryToVal(string s, FieldMapping field) break; case TYPE_BOOL: - if ( s == "T" ) + if ( s == "T" ) val->val.int_val = 1; - else if ( s == "F" ) + else if ( s == "F" ) val->val.int_val = 0; - else + else { - Error(Fmt("Field: %s Invalid value for boolean: %s", field.name.c_str(), s.c_str())); + Error(Fmt("Field: %s Invalid value for boolean: %s", + field.name.c_str(), s.c_str())); return false; } break; @@ -266,13 +271,15 @@ Value* Ascii::EntryToVal(string s, FieldMapping field) val->val.port_val.proto = TRANSPORT_UNKNOWN; break; - case TYPE_SUBNET: + case TYPE_SUBNET: { size_t pos = s.find("/"); - if ( pos == s.npos ) { + if ( pos == s.npos ) + { Error(Fmt("Invalid value for subnet: %s", s.c_str())); return false; - } + } + int width = atoi(s.substr(pos+1).c_str()); string addr = s.substr(0, pos); @@ -281,7 +288,7 @@ Value* Ascii::EntryToVal(string s, FieldMapping field) break; } - case TYPE_ADDR: + case TYPE_ADDR: val->val.addr_val = StringToAddr(s); break; @@ -295,42 +302,42 @@ Value* Ascii::EntryToVal(string s, FieldMapping field) // how many entries do we have... unsigned int length = 1; for ( unsigned int i = 0; i < s.size(); i++ ) - if ( s[i] == ',') length++; + if ( s[i] == ',' ) length++; unsigned int pos = 0; - - if ( s.compare(empty_field) == 0 ) + + if ( s.compare(empty_field) == 0 ) length = 0; Value** lvals = new Value* [length]; - if ( field.type == TYPE_TABLE ) + if ( field.type == TYPE_TABLE ) { val->val.set_val.vals = lvals; val->val.set_val.size = length; } - else if ( field.type == TYPE_VECTOR ) + + else if ( field.type == TYPE_VECTOR ) { val->val.vector_val.vals = lvals; val->val.vector_val.size = length; - } - else - { - assert(false); } + else + assert(false); + if ( length == 0 ) break; //empty istringstream splitstream(s); - while ( splitstream ) + while ( splitstream ) { string element; - if ( !getline(splitstream, element, set_separator[0]) ) + if ( ! getline(splitstream, element, set_separator[0]) ) break; - if ( pos >= length ) + if ( pos >= length ) { Error(Fmt("Internal error while parsing set. pos %d >= length %d." " Element: %s", pos, length, element.c_str())); @@ -338,18 +345,18 @@ Value* Ascii::EntryToVal(string s, FieldMapping field) } Value* newval = EntryToVal(element, field.subType()); - if ( newval == 0 ) + if ( newval == 0 ) { Error("Error while reading set"); return 0; } + lvals[pos] = newval; pos++; } - - if ( pos != length ) + if ( pos != length ) { Error("Internal error while parsing set: did not find all elements"); return 0; @@ -358,24 +365,24 @@ Value* Ascii::EntryToVal(string s, FieldMapping field) break; } - default: Error(Fmt("unsupported field format %d for %s", field.type, field.name.c_str())); return 0; - } + } return val; } // read the entire file and send appropriate thingies back to InputMgr -bool Ascii::DoUpdate() +bool Ascii::DoUpdate() { switch ( mode ) { case REREAD: + { // check if the file has changed struct stat sb; - if ( stat(fname.c_str(), &sb) == -1 ) + if ( stat(fname.c_str(), &sb) == -1 ) { Error(Fmt("Could not get stat for %s", fname.c_str())); return false; @@ -388,54 +395,58 @@ bool Ascii::DoUpdate() // file changed. reread. // fallthrough + } + case MANUAL: case STREAM: - - // dirty, fix me. (well, apparently after trying seeking, etc + { + // dirty, fix me. (well, apparently after trying seeking, etc // - this is not that bad) - if ( file && file->is_open() ) + if ( file && file->is_open() ) { - if ( mode == STREAM ) + if ( mode == STREAM ) { file->clear(); // remove end of file evil bits - if ( !ReadHeader(true) ) + if ( !ReadHeader(true) ) return false; // header reading failed break; } file->close(); } + file = new ifstream(fname.c_str()); - if ( !file->is_open() ) + if ( !file->is_open() ) { Error(Fmt("cannot open %s", fname.c_str())); return false; } - - if ( ReadHeader(false) == false ) + if ( ReadHeader(false) == false ) { return false; } break; + } + default: assert(false); } string line; - while ( GetLine(line ) ) + while ( GetLine(line ) ) { // split on tabs istringstream splitstream(line); map stringfields; int pos = 0; - while ( splitstream ) + while ( splitstream ) { string s; - if ( !getline(splitstream, s, separator[0]) ) + if ( ! getline(splitstream, s, separator[0]) ) break; stringfields[pos] = s; @@ -444,7 +455,6 @@ bool Ascii::DoUpdate() pos--; // for easy comparisons of max element. - Value** fields = new Value*[num_fields]; int fpos = 0; @@ -453,33 +463,34 @@ bool Ascii::DoUpdate() fit++ ) { - if ( ! fit->present ) + if ( ! fit->present ) { // add non-present field fields[fpos] = new Value((*fit).type, false); fpos++; continue; } - + assert(fit->position >= 0 ); - if ( (*fit).position > pos || (*fit).secondary_position > pos ) + if ( (*fit).position > pos || (*fit).secondary_position > pos ) { - Error(Fmt("Not enough fields in line %s. Found %d fields, want positions %d and %d", line.c_str(), pos, (*fit).position, (*fit).secondary_position)); + Error(Fmt("Not enough fields in line %s. Found %d fields, want positions %d and %d", + line.c_str(), pos, (*fit).position, (*fit).secondary_position)); return false; } Value* val = EntryToVal(stringfields[(*fit).position], *fit); - if ( val == 0 ) + if ( val == 0 ) { Error("Could not convert String value to Val"); return false; } - - if ( (*fit).secondary_position != -1 ) + + if ( (*fit).secondary_position != -1 ) { // we have a port definition :) - assert(val->type == TYPE_PORT ); + assert(val->type == TYPE_PORT ); // Error(Fmt("Got type %d != PORT with secondary position!", val->type)); val->val.port_val.proto = StringToProto(stringfields[(*fit).secondary_position]); @@ -493,31 +504,33 @@ bool Ascii::DoUpdate() //printf("fpos: %d, second.num_fields: %d\n", fpos, (*it).second.num_fields); assert ( (unsigned int) fpos == num_fields ); - if ( mode == STREAM ) + if ( mode == STREAM ) Put(fields); else SendEntry(fields); } - if ( mode != STREAM ) + if ( mode != STREAM ) EndCurrentSend(); - + return true; } bool Ascii::DoHeartbeat(double network_time, double current_time) { ReaderBackend::DoHeartbeat(network_time, current_time); - + switch ( mode ) { case MANUAL: // yay, we do nothing :) break; + case REREAD: case STREAM: - Update(); // call update and not DoUpdate, because update + Update(); // call update and not DoUpdate, because update // checks disabled. break; + default: assert(false); } diff --git a/src/input/readers/Ascii.h b/src/input/readers/Ascii.h index e5f3070724..e5540c5467 100644 --- a/src/input/readers/Ascii.h +++ b/src/input/readers/Ascii.h @@ -14,73 +14,57 @@ namespace input { namespace reader { struct FieldMapping { string name; TypeTag type; - // internal type for sets and vectors - TypeTag subtype; + TypeTag subtype; // internal type for sets and vectors int position; - // for ports: pos of the second field - int secondary_position; + int secondary_position; // for ports: pos of the second field bool present; - FieldMapping(const string& arg_name, const TypeTag& arg_type, int arg_position); - FieldMapping(const string& arg_name, const TypeTag& arg_type, const TypeTag& arg_subtype, int arg_position); + FieldMapping(const string& arg_name, const TypeTag& arg_type, int arg_position); + FieldMapping(const string& arg_name, const TypeTag& arg_type, const TypeTag& arg_subtype, int arg_position); FieldMapping(const FieldMapping& arg); FieldMapping() { position = -1; secondary_position = -1; } FieldMapping subType(); - //bool IsEmpty() { return position == -1; } }; - class Ascii : public ReaderBackend { public: - Ascii(ReaderFrontend* frontend); - ~Ascii(); - - static ReaderBackend* Instantiate(ReaderFrontend* frontend) { return new Ascii(frontend); } - + Ascii(ReaderFrontend* frontend); + ~Ascii(); + + static ReaderBackend* Instantiate(ReaderFrontend* frontend) { return new Ascii(frontend); } + protected: - virtual bool DoInit(string path, int mode, int arg_num_fields, const threading::Field* const* fields); - virtual void DoClose(); - virtual bool DoUpdate(); private: - virtual bool DoHeartbeat(double network_time, double current_time); - unsigned int num_fields; - - const threading::Field* const * fields; // raw mapping - - // map columns in the file to columns to send back to the manager - vector columnMap; - bool ReadHeader(bool useCached); + bool GetLine(string& str); threading::Value* EntryToVal(string s, FieldMapping type); - bool GetLine(string& str); - + unsigned int num_fields; + const threading::Field* const *fields; // raw mapping + ifstream* file; string fname; + int mode; + time_t mtime; - // Options set from the script-level. - string separator; + // map columns in the file to columns to send back to the manager + vector columnMap; - string set_separator; - - string empty_field; - - string unset_field; - // keep a copy of the headerline to determine field locations when stream descriptions change string headerline; - int mode; - - time_t mtime; - + // Options set from the script-level. + string separator; + string set_separator; + string empty_field; + string unset_field; }; diff --git a/src/input/readers/Benchmark.cc b/src/input/readers/Benchmark.cc index 29f0070fec..c6cc1649eb 100644 --- a/src/input/readers/Benchmark.cc +++ b/src/input/readers/Benchmark.cc @@ -23,15 +23,14 @@ using threading::Field; Benchmark::Benchmark(ReaderFrontend *frontend) : ReaderBackend(frontend) { - multiplication_factor = double(BifConst::InputBenchmark::factor); - autospread = double(BifConst::InputBenchmark::autospread); + multiplication_factor = double(BifConst::InputBenchmark::factor); + autospread = double(BifConst::InputBenchmark::autospread); spread = int(BifConst::InputBenchmark::spread); add = int(BifConst::InputBenchmark::addfactor); autospread_time = 0; stopspreadat = int(BifConst::InputBenchmark::stopspreadat); timedspread = double(BifConst::InputBenchmark::timedspread); heart_beat_interval = double(BifConst::Threading::heart_beat_interval); - } Benchmark::~Benchmark() @@ -46,15 +45,15 @@ void Benchmark::DoClose() bool Benchmark::DoInit(string path, int arg_mode, int arg_num_fields, const Field* const* arg_fields) { mode = arg_mode; - + num_fields = arg_num_fields; fields = arg_fields; num_lines = atoi(path.c_str()); - + if ( autospread != 0.0 ) autospread_time = (int) ( (double) 1000000 / (autospread * (double) num_lines) ); - if ( ( mode != MANUAL ) && (mode != REREAD) && ( mode != STREAM ) ) + if ( (mode != MANUAL) && (mode != REREAD) && (mode != STREAM) ) { Error(Fmt("Unsupported read mode %d for source %s", mode, path.c_str())); return false; @@ -66,7 +65,7 @@ bool Benchmark::DoInit(string path, int arg_mode, int arg_num_fields, const Fiel return true; } -string Benchmark::RandomString(const int len) +string Benchmark::RandomString(const int len) { string s(len, ' '); @@ -75,13 +74,13 @@ string Benchmark::RandomString(const int len) "ABCDEFGHIJKLMNOPQRSTUVWXYZ" "abcdefghijklmnopqrstuvwxyz"; - for (int i = 0; i < len; ++i) - s[i] = values[rand() / (RAND_MAX / sizeof(values))]; + for (int i = 0; i < len; ++i) + s[i] = values[rand() / (RAND_MAX / sizeof(values))]; return s; } -double Benchmark::CurrTime() +double Benchmark::CurrTime() { struct timeval tv; assert ( gettimeofday(&tv, 0) >= 0 ); @@ -91,56 +90,57 @@ double Benchmark::CurrTime() // read the entire file and send appropriate thingies back to InputMgr -bool Benchmark::DoUpdate() +bool Benchmark::DoUpdate() { - int linestosend = num_lines * heart_beat_interval; - for ( int i = 0; i < linestosend; i++ ) + int linestosend = num_lines * heart_beat_interval; + for ( int i = 0; i < linestosend; i++ ) { Value** field = new Value*[num_fields]; - for (unsigned int j = 0; j < num_fields; j++ ) + for (unsigned int j = 0; j < num_fields; j++ ) field[j] = EntryToVal(fields[j]->type, fields[j]->subtype); - if ( mode == STREAM ) + if ( mode == STREAM ) // do not do tracking, spread out elements over the second that we have... Put(field); - else + else SendEntry(field); - - if ( stopspreadat == 0 || num_lines < stopspreadat ) + + if ( stopspreadat == 0 || num_lines < stopspreadat ) { - if ( spread != 0 ) + if ( spread != 0 ) usleep(spread); - if ( autospread_time != 0 ) + if ( autospread_time != 0 ) usleep( autospread_time ); } - if ( timedspread != 0.0 ) + if ( timedspread != 0.0 ) { double diff; - do + do diff = CurrTime() - heartbeatstarttime; - while ( diff/heart_beat_interval < i/(linestosend + while ( diff/heart_beat_interval < i/(linestosend + (linestosend * timedspread) ) ); } } - if ( mode != STREAM ) + if ( mode != STREAM ) EndCurrentSend(); return true; } -threading::Value* Benchmark::EntryToVal(TypeTag type, TypeTag subtype) +threading::Value* Benchmark::EntryToVal(TypeTag type, TypeTag subtype) { Value* val = new Value(type, true); // basically construct something random from the fields that we want. - + switch ( type ) { case TYPE_ENUM: assert(false); // no enums, please. + case TYPE_STRING: val->val.string_val = new string(RandomString(10)); break; @@ -172,14 +172,14 @@ threading::Value* Benchmark::EntryToVal(TypeTag type, TypeTag subtype) val->val.port_val.proto = TRANSPORT_UNKNOWN; break; - case TYPE_SUBNET: + case TYPE_SUBNET: { val->val.subnet_val.prefix = StringToAddr("192.168.17.1"); val->val.subnet_val.length = 16; } break; - case TYPE_ADDR: + case TYPE_ADDR: val->val.addr_val = StringToAddr("192.168.17.1"); break; @@ -195,26 +195,26 @@ threading::Value* Benchmark::EntryToVal(TypeTag type, TypeTag subtype) Value** lvals = new Value* [length]; - if ( type == TYPE_TABLE ) + if ( type == TYPE_TABLE ) { val->val.set_val.vals = lvals; val->val.set_val.size = length; - } - else if ( type == TYPE_VECTOR ) + } + else if ( type == TYPE_VECTOR ) { val->val.vector_val.vals = lvals; val->val.vector_val.size = length; - } - else + } + else assert(false); if ( length == 0 ) break; //empty - for ( unsigned int pos = 0; pos < length; pos++ ) + for ( unsigned int pos = 0; pos < length; pos++ ) { Value* newval = EntryToVal(subtype, TYPE_ENUM); - if ( newval == 0 ) + if ( newval == 0 ) { Error("Error while reading set"); return 0; @@ -229,7 +229,7 @@ threading::Value* Benchmark::EntryToVal(TypeTag type, TypeTag subtype) default: Error(Fmt("unsupported field format %d", type)); return 0; - } + } return val; @@ -247,9 +247,10 @@ bool Benchmark::DoHeartbeat(double network_time, double current_time) case MANUAL: // yay, we do nothing :) break; + case REREAD: case STREAM: - if ( multiplication_factor != 1 || add != 0 ) + if ( multiplication_factor != 1 || add != 0 ) { // we have to document at what time we changed the factor to what value. Value** v = new Value*[2]; @@ -261,10 +262,10 @@ bool Benchmark::DoHeartbeat(double network_time, double current_time) SendEvent("lines_changed", 2, v); } - if ( autospread != 0.0 ) + if ( autospread != 0.0 ) // because executing this in every loop is apparently too expensive. autospread_time = (int) ( (double) 1000000 / (autospread * (double) num_lines) ); - + Update(); // call update and not DoUpdate, because update actually checks disabled. SendEvent("HeartbeatDone", 0, 0); @@ -275,4 +276,3 @@ bool Benchmark::DoHeartbeat(double network_time, double current_time) return true; } - diff --git a/src/input/readers/Benchmark.h b/src/input/readers/Benchmark.h index b791dabe21..ec14dc6567 100644 --- a/src/input/readers/Benchmark.h +++ b/src/input/readers/Benchmark.h @@ -3,41 +3,37 @@ #ifndef INPUT_READERS_BENCHMARK_H #define INPUT_READERS_BENCHMARK_H - #include "../ReaderBackend.h" namespace input { namespace reader { +/** + * A benchmark reader to measure performance of the input framework. + */ class Benchmark : public ReaderBackend { public: - Benchmark(ReaderFrontend* frontend); - ~Benchmark(); - - static ReaderBackend* Instantiate(ReaderFrontend* frontend) { return new Benchmark(frontend); } - + Benchmark(ReaderFrontend* frontend); + ~Benchmark(); + + static ReaderBackend* Instantiate(ReaderFrontend* frontend) { return new Benchmark(frontend); } + protected: - virtual bool DoInit(string path, int mode, int arg_num_fields, const threading::Field* const* fields); - virtual void DoClose(); - virtual bool DoUpdate(); private: - virtual bool DoHeartbeat(double network_time, double current_time); - unsigned int num_fields; - double CurrTime(); - - const threading::Field* const * fields; // raw mapping - + string RandomString(const int len); threading::Value* EntryToVal(TypeTag Type, TypeTag subtype); + unsigned int num_fields; + const threading::Field* const * fields; // raw mapping + int mode; int num_lines; - double multiplication_factor; int spread; double autospread; @@ -47,9 +43,6 @@ private: double heartbeatstarttime; double timedspread; double heart_beat_interval; - - string RandomString(const int len); - }; diff --git a/src/input/readers/Raw.cc b/src/input/readers/Raw.cc index ce0b4f8a5f..6538da070b 100644 --- a/src/input/readers/Raw.cc +++ b/src/input/readers/Raw.cc @@ -28,8 +28,10 @@ Raw::Raw(ReaderFrontend *frontend) : ReaderBackend(frontend) file = 0; in = 0; - separator.assign( (const char*) BifConst::InputRaw::record_separator->Bytes(), BifConst::InputRaw::record_separator->Len()); - if ( separator.size() != 1 ) + separator.assign( (const char*) BifConst::InputRaw::record_separator->Bytes(), + BifConst::InputRaw::record_separator->Len()); + + if ( separator.size() != 1 ) Error("separator length has to be 1. Separator will be truncated."); } @@ -40,57 +42,56 @@ Raw::~Raw() void Raw::DoClose() { - if ( file != 0 ) + if ( file != 0 ) { Close(); } } -bool Raw::Open() +bool Raw::Open() { - if ( execute ) + if ( execute ) { file = popen(fname.c_str(), "r"); - if ( file == NULL ) + if ( file == NULL ) { Error(Fmt("Could not execute command %s", fname.c_str())); return false; } } - else + else { file = fopen(fname.c_str(), "r"); - if ( file == NULL ) + if ( file == NULL ) { Error(Fmt("Init: cannot open %s", fname.c_str())); return false; } } - + + // This is defined in input/fdstream.h in = new boost::fdistream(fileno(file)); - if ( execute && mode == STREAM ) - { + if ( execute && mode == STREAM ) fcntl(fileno(file), F_SETFL, O_NONBLOCK); - } return true; } bool Raw::Close() { - if ( file == NULL ) + if ( file == NULL ) { InternalError(Fmt("Trying to close closed file for stream %s", fname.c_str())); return false; } - if ( execute ) + if ( execute ) { delete(in); pclose(file); - } - else + } + else { delete(in); fclose(file); @@ -114,13 +115,13 @@ bool Raw::DoInit(string path, int arg_mode, int arg_num_fields, const Field* con num_fields = arg_num_fields; fields = arg_fields; - if ( path.length() == 0 ) + if ( path.length() == 0 ) { Error("No source path provided"); return false; } - - if ( arg_num_fields != 1 ) + + if ( arg_num_fields != 1 ) { Error("Filter for raw reader contains more than one field. " "Filters for the raw reader may only contain exactly one string field. " @@ -128,7 +129,7 @@ bool Raw::DoInit(string path, int arg_mode, int arg_num_fields, const Field* con return false; } - if ( fields[0]->type != TYPE_STRING ) + if ( fields[0]->type != TYPE_STRING ) { Error("Filter for raw reader contains a field that is not of type string."); return false; @@ -136,30 +137,32 @@ bool Raw::DoInit(string path, int arg_mode, int arg_num_fields, const Field* con // do Initialization char last = path[path.length()-1]; - if ( last == '|' ) + if ( last == '|' ) { execute = true; fname = path.substr(0, fname.length() - 1); - if ( ( mode != MANUAL ) && ( mode != STREAM ) ) { - Error(Fmt("Unsupported read mode %d for source %s in execution mode", mode, fname.c_str())); + if ( (mode != MANUAL) && (mode != STREAM) ) { + Error(Fmt("Unsupported read mode %d for source %s in execution mode", + mode, fname.c_str())); return false; - } - + } + result = Open(); } else { execute = false; - if ( ( mode != MANUAL ) && (mode != REREAD) && ( mode != STREAM ) ) + if ( (mode != MANUAL) && (mode != REREAD) && (mode != STREAM) ) { - Error(Fmt("Unsupported read mode %d for source %s", mode, fname.c_str())); + Error(Fmt("Unsupported read mode %d for source %s", + mode, fname.c_str())); return false; } - result = Open(); + result = Open(); } - if ( result == false ) + if ( result == false ) return result; #ifdef DEBUG @@ -176,80 +179,78 @@ bool Raw::DoInit(string path, int arg_mode, int arg_num_fields, const Field* con } -bool Raw::GetLine(string& str) +bool Raw::GetLine(string& str) { - if ( in->peek() == std::iostream::traits_type::eof() ) + if ( in->peek() == std::iostream::traits_type::eof() ) return false; - if ( in->eofbit == true || in->failbit == true ) + if ( in->eofbit == true || in->failbit == true ) return false; - while ( getline(*in, str, separator[0]) ) - return true; - - return false; + return getline(*in, str, separator[0]); } - // read the entire file and send appropriate thingies back to InputMgr -bool Raw::DoUpdate() +bool Raw::DoUpdate() { - if ( firstrun ) + if ( firstrun ) firstrun = false; + else { switch ( mode ) { - case REREAD: + case REREAD: + { + // check if the file has changed + struct stat sb; + if ( stat(fname.c_str(), &sb) == -1 ) { - // check if the file has changed - struct stat sb; - if ( stat(fname.c_str(), &sb) == -1 ) - { - Error(Fmt("Could not get stat for %s", fname.c_str())); - return false; - } - - if ( sb.st_mtime <= mtime ) - // no change - return true; - - mtime = sb.st_mtime; - // file changed. reread. - - // fallthrough + Error(Fmt("Could not get stat for %s", fname.c_str())); + return false; } - case MANUAL: - case STREAM: - if ( mode == STREAM && file != NULL && in != NULL ) - { - //fpurge(file); - in->clear(); // remove end of file evil bits - break; - } - Close(); - if ( !Open() ) - return false; + if ( sb.st_mtime <= mtime ) + // no change + return true; + mtime = sb.st_mtime; + // file changed. reread. + // + // fallthrough + } + + case MANUAL: + case STREAM: + if ( mode == STREAM && file != NULL && in != NULL ) + { + //fpurge(file); + in->clear(); // remove end of file evil bits break; - default: - assert(false); + } + Close(); + if ( ! Open() ) + return false; + + break; + + default: + assert(false); } } string line; - while ( GetLine(line) ) + while ( GetLine(line) ) { assert (num_fields == 1); - + Value** fields = new Value*[1]; // filter has exactly one text field. convert to it. Value* val = new Value(TYPE_STRING, true); val->val.string_val = new string(line); fields[0] = val; - + Put(fields); } @@ -260,7 +261,6 @@ bool Raw::DoUpdate() return true; } - bool Raw::DoHeartbeat(double network_time, double current_time) { ReaderBackend::DoHeartbeat(network_time, current_time); @@ -269,10 +269,11 @@ bool Raw::DoHeartbeat(double network_time, double current_time) case MANUAL: // yay, we do nothing :) break; + case REREAD: case STREAM: - Update(); // call update and not DoUpdate, because update - // checks disabled. + Update(); // call update and not DoUpdate, because update + // checks disabled. break; default: assert(false); diff --git a/src/input/readers/Raw.h b/src/input/readers/Raw.h index 9f575bb89c..3fa09309b0 100644 --- a/src/input/readers/Raw.h +++ b/src/input/readers/Raw.h @@ -10,51 +10,44 @@ namespace input { namespace reader { +/** + * A reader that returns a file (or the output of a command) as a single + * blob. + */ class Raw : public ReaderBackend { public: - Raw(ReaderFrontend* frontend); - ~Raw(); - - static ReaderBackend* Instantiate(ReaderFrontend* frontend) { return new Raw(frontend); } - + Raw(ReaderFrontend* frontend); + ~Raw(); + + static ReaderBackend* Instantiate(ReaderFrontend* frontend) { return new Raw(frontend); } + protected: - virtual bool DoInit(string path, int mode, int arg_num_fields, const threading::Field* const* fields); - virtual void DoClose(); - virtual bool DoUpdate(); private: - virtual bool DoHeartbeat(double network_time, double current_time); + bool Open(); bool Close(); - bool GetLine(string& str); - + + unsigned int num_fields; + const threading::Field* const * fields; // raw mapping + istream* in; - FILE* file; - string fname; - - // Options set from the script-level. - string separator; - int mode; bool execute; bool firstrun; - time_t mtime; - - unsigned int num_fields; - - const threading::Field* const * fields; // raw mapping + // Options set from the script-level. + string separator; }; - } } diff --git a/testing/btest/Baseline/scripts.base.frameworks.input.executeraw/out b/testing/btest/Baseline/scripts.base.frameworks.input.executeraw/out index 8611b35dd3..a38f3fce84 100644 --- a/testing/btest/Baseline/scripts.base.frameworks.input.executeraw/out +++ b/testing/btest/Baseline/scripts.base.frameworks.input.executeraw/out @@ -6,4 +6,4 @@ print outfile, s; close(outfile); }] Input::EVENT_NEW - 8 ../input.log +8 ../input.log diff --git a/testing/btest/scripts/base/frameworks/input/executeraw.bro b/testing/btest/scripts/base/frameworks/input/executeraw.bro index 6d07a9bf29..6df28d08ea 100644 --- a/testing/btest/scripts/base/frameworks/input/executeraw.bro +++ b/testing/btest/scripts/base/frameworks/input/executeraw.bro @@ -1,6 +1,7 @@ # # @TEST-EXEC: btest-bg-run bro bro -b %INPUT # @TEST-EXEC: btest-bg-wait -k 1 +# @TEST-EXEC: cat out.tmp | sed 's/^ *//g' >out # @TEST-EXEC: btest-diff out @TEST-START-FILE input.log @@ -31,7 +32,7 @@ event line(description: Input::EventDescription, tpe: Input::Event, s: string) { event bro_init() { - outfile = open ("../out"); + outfile = open ("../out.tmp"); Input::add_event([$source="wc -l ../input.log |", $reader=Input::READER_RAW, $name="input", $fields=Val, $ev=line]); Input::remove("input"); }