diff --git a/src/input/Manager.cc b/src/input/Manager.cc index 6b179f66a1..0fde16b87d 100644 --- a/src/input/Manager.cc +++ b/src/input/Manager.cc @@ -24,25 +24,30 @@ using threading::Value; using threading::Field; /** - * InputHashes are used as Dictionaries to store the value and index hashes for all lines currently stored in a table. - * Index hash is stored as HashKey*, because it is thrown into other Bro functions that need the complex structure of it. - * For everything we do (with values), we just take the hash_t value and compare it directly with == + * InputHashes are used as Dictionaries to store the value and index hashes for all + * lines currently stored in a table. Index hash is stored as HashKey*, because it is + * thrown into other Bro functions that need the complex structure of it. + * For everything we do (with values), we just take the hash_t value and compare it + * directly with == */ -struct InputHash { +struct InputHash + { hash_t valhash; HashKey* idxkey; ~InputHash(); -}; + }; -InputHash::~InputHash() { +InputHash::~InputHash() + { if ( idxkey ) delete idxkey; -} + } -static void input_hash_delete_func(void* val) { +static void input_hash_delete_func(void* val) + { InputHash* h = (InputHash*) val; delete h; -} + } declare(PDict, InputHash); @@ -68,14 +73,16 @@ public: virtual ~Stream(); }; -Manager::Stream::Stream() { +Manager::Stream::Stream() + { type = 0; reader = 0; description = 0; removed = false; -} + } -Manager::Stream::~Stream() { +Manager::Stream::~Stream() + { if ( type ) Unref(type); if ( description ) @@ -83,7 +90,7 @@ Manager::Stream::~Stream() { if ( reader ) delete(reader); -} + } class Manager::TableStream: public Manager::Stream { public: @@ -120,7 +127,8 @@ public: ~EventStream(); }; -Manager::TableStream::TableStream() : Manager::Stream::Stream() { +Manager::TableStream::TableStream() : Manager::Stream::Stream() + { filter_type = TABLE_FILTER; tab = 0; @@ -131,20 +139,22 @@ Manager::TableStream::TableStream() : Manager::Stream::Stream() { lastDict = 0; pred = 0; -} + } -Manager::EventStream::EventStream() : Manager::Stream::Stream() { +Manager::EventStream::EventStream() : Manager::Stream::Stream() + { fields = 0; filter_type = EVENT_FILTER; -} + } -Manager::EventStream::~EventStream() { - if ( fields ) { +Manager::EventStream::~EventStream() + { + if ( fields ) Unref(fields); - } -} + } -Manager::TableStream::~TableStream() { +Manager::TableStream::~TableStream() + { if ( tab ) Unref(tab); if ( itype ) @@ -152,22 +162,24 @@ Manager::TableStream::~TableStream() { if ( rtype ) // can be 0 for sets Unref(rtype); - if ( currDict != 0 ) { + if ( currDict != 0 ) + { currDict->Clear(); delete currDict; - } + } - if ( lastDict != 0 ) { + if ( lastDict != 0 ) + { lastDict->Clear();; delete lastDict; - } -} + } + } struct ReaderDefinition { bro_int_t type; // the type const char *name; // descriptive name for error messages bool (*init)(); // optional one-time inifializing function - ReaderBackend* (*factory)(ReaderFrontend* frontend); // factory function for creating instances + ReaderBackend* (*factory)(ReaderFrontend* frontend); // factory function for creating instances }; ReaderDefinition input_readers[] = { @@ -180,49 +192,55 @@ ReaderDefinition input_readers[] = { }; Manager::Manager() -{ -} - -Manager::~Manager() { - for ( map::iterator s = readers.begin(); s != readers.end(); ++s ) { - delete s->second; - delete s->first; + { } -} - -ReaderBackend* Manager::CreateBackend(ReaderFrontend* frontend, bro_int_t type) { - ReaderDefinition* ir = 
input_readers; - - while ( true ) { - if ( ir->type == BifEnum::Input::READER_DEFAULT ) +Manager::~Manager() + { + for ( map::iterator s = readers.begin(); s != readers.end(); ++s ) { - reporter->Error("The reader that was requested was not found and could not be initialized."); - return 0; + delete s->second; + delete s->first; } - if ( ir->type != type ) { + } + +ReaderBackend* Manager::CreateBackend(ReaderFrontend* frontend, bro_int_t type) + { + ReaderDefinition* ir = input_readers; + + while ( true ) + { + if ( ir->type == BifEnum::Input::READER_DEFAULT ) + { + reporter->Error("The reader that was requested was not found and could not be initialized."); + return 0; + } + + if ( ir->type != type ) + { // no, didn't find the right one... ++ir; continue; - } + } // call init function of writer if presnt if ( ir->init ) - { + { if ( (*ir->init)() ) { - //clear it to be not called again - ir->init = 0; - } else { + //clear it to be not called again + ir->init = 0; + } + else { // ohok. init failed, kill factory for all eternity ir->factory = 0; DBG_LOG(DBG_LOGGING, "Failed to init input class %s", ir->name); return 0; } - } + } if ( !ir->factory ) // no factory? @@ -230,7 +248,8 @@ ReaderBackend* Manager::CreateBackend(ReaderFrontend* frontend, bro_int_t type) // all done. break. break; - } + } + assert(ir->factory); ReaderBackend* backend = (*ir->factory)(frontend); @@ -238,31 +257,34 @@ ReaderBackend* Manager::CreateBackend(ReaderFrontend* frontend, bro_int_t type) frontend->ty_name = ir->name; return backend; -} + } // create a new input reader object to be used at whomevers leisure lateron. bool Manager::CreateStream(Stream* info, RecordVal* description) -{ + { ReaderDefinition* ir = input_readers; RecordType* rtype = description->Type()->AsRecordType(); - if ( ! ( same_type(rtype, BifType::Record::Input::TableDescription, 0) || same_type(rtype, BifType::Record::Input::EventDescription, 0) ) ) - { + if ( ! ( same_type(rtype, BifType::Record::Input::TableDescription, 0) + || same_type(rtype, BifType::Record::Input::EventDescription, 0) ) ) + { reporter->Error("Streamdescription argument not of right type for new input stream"); return false; - } + } Val* name_val = description->LookupWithDefault(rtype->FieldOffset("name")); string name = name_val->AsString()->CheckString(); Unref(name_val); - { + { Stream *i = FindStream(name); - if ( i != 0 ) { - reporter->Error("Trying create already existing input stream %s", name.c_str()); + if ( i != 0 ) + { + reporter->Error("Trying create already existing input stream %s", + name.c_str()); return false; + } } - } EnumVal* reader = description->LookupWithDefault(rtype->FieldOffset("reader"))->AsEnumVal(); Val *autostart = description->LookupWithDefault(rtype->FieldOffset("autostart")); @@ -293,25 +315,27 @@ bool Manager::CreateStream(Stream* info, RecordVal* description) return true; -} + } -bool Manager::CreateEventStream(RecordVal* fval) { +bool Manager::CreateEventStream(RecordVal* fval) + { RecordType* rtype = fval->Type()->AsRecordType(); if ( ! 
same_type(rtype, BifType::Record::Input::EventDescription, 0) ) - { + { reporter->Error("filter argument not of right type"); return false; - } + } EventStream* filter = new EventStream(); - { + { bool res = CreateStream(filter, fval); - if ( res == false ) { + if ( res == false ) + { delete filter; return false; + } } - } RecordType *fields = fval->LookupWithDefault(rtype->FieldOffset("fields"))->AsType()->AsTypeType()->Type()->AsRecordType(); @@ -322,77 +346,87 @@ bool Manager::CreateEventStream(RecordVal* fval) { Func* event = event_val->AsFunc(); Unref(event_val); - { + { FuncType* etype = event->FType()->AsFuncType(); - if ( ! etype->IsEvent() ) { + if ( ! etype->IsEvent() ) + { reporter->Error("stream event is a function, not an event"); return false; - } + } const type_list* args = etype->ArgTypes()->Types(); - if ( args->length() < 2 ) { + if ( args->length() < 2 ) + { reporter->Error("event takes not enough arguments"); return false; - } + } if ( ! same_type((*args)[1], BifType::Enum::Input::Event, 0) ) - { + { reporter->Error("events second attribute must be of type Input::Event"); return false; - } + } if ( ! same_type((*args)[0], BifType::Record::Input::EventDescription, 0) ) - { + { reporter->Error("events first attribute must be of type Input::EventDescription"); return false; - } + } - if ( want_record->InternalInt() == 0 ) { - if ( args->length() != fields->NumFields() + 2 ) { + if ( want_record->InternalInt() == 0 ) + { + if ( args->length() != fields->NumFields() + 2 ) + { reporter->Error("event has wrong number of arguments"); return false; - } + } - for ( int i = 0; i < fields->NumFields(); i++ ) { - if ( !same_type((*args)[i+2], fields->FieldType(i) ) ) { + for ( int i = 0; i < fields->NumFields(); i++ ) + { + if ( !same_type((*args)[i+2], fields->FieldType(i) ) ) + { reporter->Error("Incompatible type for event"); return false; + } } - } - } else if ( want_record->InternalInt() == 1 ) { - if ( args->length() != 3 ) { + } + else if ( want_record->InternalInt() == 1 ) + { + if ( args->length() != 3 ) + { reporter->Error("event has wrong number of arguments"); return false; - } + } - if ( !same_type((*args)[2], fields ) ) { + if ( !same_type((*args)[2], fields ) ) + { reporter->Error("Incompatible type for event"); return false; - } + } - } else { + } + else assert(false); - } - } + } vector fieldsV; // vector, because UnrollRecordType needs it bool status = !UnrollRecordType(&fieldsV, fields, ""); - if ( status ) { + if ( status ) + { reporter->Error("Problem unrolling"); return false; - } + } Field** logf = new Field*[fieldsV.size()]; - for ( unsigned int i = 0; i < fieldsV.size(); i++ ) { + for ( unsigned int i = 0; i < fieldsV.size(); i++ ) logf[i] = fieldsV[i]; - } Unref(fields); // ref'd by lookupwithdefault filter->num_fields = fieldsV.size(); @@ -412,56 +446,64 @@ bool Manager::CreateEventStream(RecordVal* fval) { return true; } -bool Manager::CreateTableStream(RecordVal* fval) { +bool Manager::CreateTableStream(RecordVal* fval) + { RecordType* rtype = fval->Type()->AsRecordType(); if ( ! 
same_type(rtype, BifType::Record::Input::TableDescription, 0) ) - { + { reporter->Error("filter argument not of right type"); return false; - } + } TableStream* filter = new TableStream(); - { + { bool res = CreateStream(filter, fval); - if ( res == false ) { + if ( res == false ) + { delete filter; return false; + } } - } Val* pred = fval->LookupWithDefault(rtype->FieldOffset("pred")); RecordType *idx = fval->LookupWithDefault(rtype->FieldOffset("idx"))->AsType()->AsTypeType()->Type()->AsRecordType(); RecordType *val = 0; - if ( fval->LookupWithDefault(rtype->FieldOffset("val")) != 0 ) { + + if ( fval->LookupWithDefault(rtype->FieldOffset("val")) != 0 ) + { val = fval->LookupWithDefault(rtype->FieldOffset("val"))->AsType()->AsTypeType()->Type()->AsRecordType(); Unref(val); // The lookupwithdefault in the if-clause ref'ed val. - } + } + TableVal *dst = fval->LookupWithDefault(rtype->FieldOffset("destination"))->AsTableVal(); // check if index fields match table description - { + { int num = idx->NumFields(); const type_list* tl = dst->Type()->AsTableType()->IndexTypes(); loop_over_list(*tl, j) { - if ( j >= num ) { + if ( j >= num ) + { reporter->Error("Table type has more indexes than index definition"); return false; - } + } - if ( !same_type(idx->FieldType(j), (*tl)[j]) ) { + if ( !same_type(idx->FieldType(j), (*tl)[j]) ) + { reporter->Error("Table type does not match index type"); return false; - } + } } - if ( num != j ) { + if ( num != j ) + { reporter->Error("Table has less elements than index definition"); return false; + } } - } Val *want_record = fval->LookupWithDefault(rtype->FieldOffset("want_record")); @@ -469,51 +511,57 @@ bool Manager::CreateTableStream(RecordVal* fval) { Func* event = event_val ? event_val->AsFunc() : 0; Unref(event_val); - if ( event ) { + if ( event ) + { FuncType* etype = event->FType()->AsFuncType(); - if ( ! etype->IsEvent() ) { + if ( ! etype->IsEvent() ) + { reporter->Error("stream event is a function, not an event"); return false; - } + } const type_list* args = etype->ArgTypes()->Types(); if ( args->length() != 4 ) - { + { reporter->Error("Table event must take 4 arguments"); return false; - } + } if ( ! same_type((*args)[0], BifType::Record::Input::TableDescription, 0) ) - { + { reporter->Error("table events first attribute must be of type Input::TableDescription"); return false; - } + } if ( ! same_type((*args)[1], BifType::Enum::Input::Event, 0) ) - { + { reporter->Error("table events second attribute must be of type Input::Event"); return false; - } + } if ( ! same_type((*args)[2], idx) ) - { + { reporter->Error("table events index attributes do not match"); return false; - } + } if ( want_record->InternalInt() == 1 && ! 
same_type((*args)[3], val) ) - { + { reporter->Error("table events value attributes do not match"); return false; - } else if ( want_record->InternalInt() == 0 && !same_type((*args)[3], val->FieldType(0) ) ) { + } + else if ( want_record->InternalInt() == 0 + && !same_type((*args)[3], val->FieldType(0) ) ) + { reporter->Error("table events value attribute does not match"); return false; - } + } + assert(want_record->InternalInt() == 1 || want_record->InternalInt() == 0); - } + } vector fieldsV; // vector, because we don't know the length beforehands @@ -529,16 +577,16 @@ bool Manager::CreateTableStream(RecordVal* fval) { if ( !val ) assert(valfields == 0); - if ( status ) { + if ( status ) + { reporter->Error("Problem unrolling"); return false; - } + } Field** fields = new Field*[fieldsV.size()]; - for ( unsigned int i = 0; i < fieldsV.size(); i++ ) { + for ( unsigned int i = 0; i < fieldsV.size(); i++ ) fields[i] = fieldsV[i]; - } filter->pred = pred ? pred->AsFunc() : 0; filter->num_idx_fields = idxfields; @@ -556,13 +604,15 @@ bool Manager::CreateTableStream(RecordVal* fval) { Unref(want_record); // ref'd by lookupwithdefault Unref(pred); - if ( valfields > 1 ) { - if ( ! filter->want_record ) { + if ( valfields > 1 ) + { + if ( ! filter->want_record ) + { reporter->Error("Stream %s does not want a record (want_record=F), but has more then one value field. Aborting", filter->name.c_str()); delete filter; return false; + } } - } assert(filter->reader); @@ -574,7 +624,7 @@ bool Manager::CreateTableStream(RecordVal* fval) { filter->name.c_str()); return true; -} + } bool Manager::IsCompatibleType(BroType* t, bool atomic_only) @@ -582,7 +632,7 @@ bool Manager::IsCompatibleType(BroType* t, bool atomic_only) if ( ! t ) return false; - switch ( t->Tag() ) { + switch ( t->Tag() ) { case TYPE_BOOL: case TYPE_INT: case TYPE_COUNT: @@ -624,20 +674,21 @@ bool Manager::IsCompatibleType(BroType* t, bool atomic_only) } return false; -} + } -bool Manager::RemoveStream(const string &name) { +bool Manager::RemoveStream(const string &name) + { Stream *i = FindStream(name); - if ( i == 0 ) { + if ( i == 0 ) return false; // not found - } - if ( i->removed ) { + if ( i->removed ) + { reporter->Error("Stream %s is already queued for removal. 
Ignoring remove.", name.c_str()); return false; - } + } i->removed = true; @@ -649,54 +700,66 @@ bool Manager::RemoveStream(const string &name) { #endif return true; -} - -bool Manager::RemoveStreamContinuation(ReaderFrontend* reader) { - Stream *i = FindStream(reader); - - if ( i == 0 ) { - reporter->Error("Stream not found in RemoveStreamContinuation"); - return false; } +bool Manager::RemoveStreamContinuation(ReaderFrontend* reader) + { + Stream *i = FindStream(reader); + + if ( i == 0 ) + { + reporter->Error("Stream not found in RemoveStreamContinuation"); + return false; + } #ifdef DEBUG DBG_LOG(DBG_INPUT, "Successfully executed removal of stream %s", i->name.c_str()); #endif + readers.erase(reader); delete(i); return true; -} + } -bool Manager::UnrollRecordType(vector *fields, const RecordType *rec, const string& nameprepend) { - for ( int i = 0; i < rec->NumFields(); i++ ) +bool Manager::UnrollRecordType(vector *fields, + const RecordType *rec, const string& nameprepend) { - if ( !IsCompatibleType(rec->FieldType(i)) ) { + for ( int i = 0; i < rec->NumFields(); i++ ) + { + + if ( !IsCompatibleType(rec->FieldType(i)) ) + { reporter->Error("Incompatible type \"%s\" in table definition for ReaderFrontend", type_name(rec->FieldType(i)->Tag())); return false; - } + } if ( rec->FieldType(i)->Tag() == TYPE_RECORD ) - { + { string prep = nameprepend + rec->FieldName(i) + "."; if ( !UnrollRecordType(fields, rec->FieldType(i)->AsRecordType(), prep) ) - { + { return false; - } + } - } else { + } + else + { Field* field = new Field(); field->name = nameprepend + rec->FieldName(i); field->type = rec->FieldType(i)->Tag(); - if ( field->type == TYPE_TABLE ) { + if ( field->type == TYPE_TABLE ) + { field->subtype = rec->FieldType(i)->AsSetType()->Indices()->PureType()->Tag(); - } else if ( field->type == TYPE_VECTOR ) { + } + else if ( field->type == TYPE_VECTOR ) + { field->subtype = rec->FieldType(i)->AsVectorType()->YieldType()->Tag(); - } else if ( field->type == TYPE_PORT && - rec->FieldDecl(i)->FindAttr(ATTR_TYPE_COLUMN) ) { + } else if ( field->type == TYPE_PORT && + rec->FieldDecl(i)->FindAttr(ATTR_TYPE_COLUMN) ) + { // we have an annotation for the second column Val* c = rec->FieldDecl(i)->FindAttr(ATTR_TYPE_COLUMN)->AttrExpr()->Eval(0); @@ -705,31 +768,32 @@ bool Manager::UnrollRecordType(vector *fields, const RecordType *rec, co assert(c->Type()->Tag() == TYPE_STRING); field->secondary_name = c->AsStringVal()->AsString()->CheckString(); - } + } - if ( rec->FieldDecl(i)->FindAttr(ATTR_OPTIONAL ) ) { + if ( rec->FieldDecl(i)->FindAttr(ATTR_OPTIONAL ) ) field->optional = true; - } fields->push_back(field); + } } - } return true; -} + } bool Manager::ForceUpdate(const string &name) -{ + { Stream *i = FindStream(name); - if ( i == 0 ) { + if ( i == 0 ) + { reporter->Error("Stream %s not found", name.c_str()); return false; - } + } - if ( i->removed ) { + if ( i->removed ) + { reporter->Error("Stream %s is already queued for removal. 
Ignoring force update.", name.c_str()); return false; - } + } i->reader->Update(); @@ -742,31 +806,34 @@ bool Manager::ForceUpdate(const string &name) } -Val* Manager::RecordValToIndexVal(RecordVal *r) { +Val* Manager::RecordValToIndexVal(RecordVal *r) + { Val* idxval; RecordType *type = r->Type()->AsRecordType(); int num_fields = type->NumFields(); - if ( num_fields == 1 && type->FieldDecl(0)->type->Tag() != TYPE_RECORD ) { + if ( num_fields == 1 && type->FieldDecl(0)->type->Tag() != TYPE_RECORD ) + { idxval = r->LookupWithDefault(0); - } else { + } + else + { ListVal *l = new ListVal(TYPE_ANY); - for ( int j = 0 ; j < num_fields; j++ ) { - //Val* rval = r->Lookup(j); - //assert(rval != 0); + for ( int j = 0 ; j < num_fields; j++ ) l->Append(r->LookupWithDefault(j)); - } + idxval = l; - } + } return idxval; -} + } -Val* Manager::ValueToIndexVal(int num_fields, const RecordType *type, const Value* const *vals) { +Val* Manager::ValueToIndexVal(int num_fields, const RecordType *type, const Value* const *vals) + { Val* idxval; int position = 0; @@ -776,47 +843,54 @@ Val* Manager::ValueToIndexVal(int num_fields, const RecordType *type, const Valu position = 1; } else { ListVal *l = new ListVal(TYPE_ANY); - for ( int j = 0 ; j < type->NumFields(); j++ ) { - if ( type->FieldType(j)->Tag() == TYPE_RECORD ) { - l->Append(ValueToRecordVal(vals, type->FieldType(j)->AsRecordType(), &position)); - } else { + for ( int j = 0 ; j < type->NumFields(); j++ ) + { + if ( type->FieldType(j)->Tag() == TYPE_RECORD ) + l->Append(ValueToRecordVal(vals, + type->FieldType(j)->AsRecordType(), &position)); + else + { l->Append(ValueToVal(vals[position], type->FieldType(j))); position++; + } } - } idxval = l; - } + } assert ( position == num_fields ); return idxval; -} + } -void Manager::SendEntry(ReaderFrontend* reader, Value* *vals) { +void Manager::SendEntry(ReaderFrontend* reader, Value* *vals) + { Stream *i = FindStream(reader); - if ( i == 0 ) { + if ( i == 0 ) + { reporter->InternalError("Unknown reader in SendEntry"); return; - } + } int readFields; - if ( i->filter_type == TABLE_FILTER ) { + if ( i->filter_type == TABLE_FILTER ) readFields = SendEntryTable(i, vals); - } else if ( i->filter_type == EVENT_FILTER ) { + else if ( i->filter_type == EVENT_FILTER ) + { EnumVal *type = new EnumVal(BifEnum::Input::EVENT_NEW, BifType::Enum::Input::Event); readFields = SendEventStreamEvent(i, type, vals); - } else { + } + else assert(false); - } - for ( int i = 0; i < readFields; i++ ) { + for ( int i = 0; i < readFields; i++ ) delete vals[i]; - } - delete [] vals; -} -int Manager::SendEntryTable(Stream* i, const Value* const *vals) { + delete [] vals; + } + +int Manager::SendEntryTable(Stream* i, const Value* const *vals) + { bool updated = false; assert(i); @@ -826,59 +900,66 @@ int Manager::SendEntryTable(Stream* i, const Value* const *vals) { HashKey* idxhash = HashValues(filter->num_idx_fields, vals); - if ( idxhash == 0 ) { + if ( idxhash == 0 ) + { reporter->Error("Could not hash line. Ignoring"); return filter->num_val_fields + filter->num_idx_fields; - } + } hash_t valhash = 0; - if ( filter->num_val_fields > 0 ) { + if ( filter->num_val_fields > 0 ) + { HashKey* valhashkey = HashValues(filter->num_val_fields, vals+filter->num_idx_fields); if ( valhashkey == 0 ) { // empty line. index, but no values. // hence we also have no hash value... 
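// The lastDict/currDict pair used below and in EndCurrentSend() amounts to a
// two-generation diff of the input source: every line seen during the current
// update is moved (or newly inserted) into currDict, and whatever is still
// left in lastDict once the update finishes was absent from the new data and
// is expired. A minimal sketch of that idea, using std::map and illustrative
// names rather than the real PDict/InputHash machinery:

#include <map>
#include <string>

struct Generations
	{
	std::map<std::string, size_t> last; // previous read: index key -> value hash
	std::map<std::string, size_t> curr; // read currently in progress

	// Called once per input line; returns true if the line is new or changed.
	bool See(const std::string& key, size_t valhash)
		{
		std::map<std::string, size_t>::iterator it = last.find(key);
		bool changed = ( it == last.end() || it->second != valhash );

		if ( it != last.end() )
			last.erase(it); // the entry survives into the new generation

		curr[key] = valhash;
		return changed;
		}

	// Called once the update is finished: everything still in 'last' vanished
	// from the source (the real code asks the predicate and raises
	// EVENT_REMOVED before dropping such entries), then the generations swap.
	void Sweep()
		{
		last.clear();
		curr.swap(last);
		}
	};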
- } else { + } + else + { valhash = valhashkey->Hash(); delete(valhashkey); + } } - } InputHash *h = filter->lastDict->Lookup(idxhash); - if ( h != 0 ) { + if ( h != 0 ) + { // seen before - if ( filter->num_val_fields == 0 || h->valhash == valhash ) { + if ( filter->num_val_fields == 0 || h->valhash == valhash ) + { // ok, exact duplicate, move entry to new dicrionary and do nothing else. filter->lastDict->Remove(idxhash); filter->currDict->Insert(idxhash, h); delete idxhash; return filter->num_val_fields + filter->num_idx_fields; - } else { + } + else + { assert( filter->num_val_fields > 0 ); // entry was updated in some way filter->lastDict->Remove(idxhash); // keep h for predicates updated = true; + } } - } Val* valval; RecordVal* predidx = 0; int position = filter->num_idx_fields; - if ( filter->num_val_fields == 0 ) { + if ( filter->num_val_fields == 0 ) valval = 0; - } else if ( filter->num_val_fields == 1 && !filter->want_record ) { + else if ( filter->num_val_fields == 1 && !filter->want_record ) valval = ValueToVal(vals[position], filter->rtype->FieldType(0)); - } else { + else valval = ValueToRecordVal(vals, filter->rtype, &position); - } - // call filter first to determine if we really add / change the entry - if ( filter->pred ) { + if ( filter->pred ) + { EnumVal* ev; //Ref(idxval); int startpos = 0; @@ -886,68 +967,74 @@ int Manager::SendEntryTable(Stream* i, const Value* const *vals) { predidx = ValueToRecordVal(vals, filter->itype, &startpos); //ValueToRecordVal(vals, filter->itype, &startpos); - if ( updated ) { + if ( updated ) ev = new EnumVal(BifEnum::Input::EVENT_CHANGED, BifType::Enum::Input::Event); - } else { + else ev = new EnumVal(BifEnum::Input::EVENT_NEW, BifType::Enum::Input::Event); - } bool result; - if ( filter->num_val_fields > 0 ) { // we have values + if ( filter->num_val_fields > 0 ) // we have values result = CallPred(filter->pred, 3, ev, predidx->Ref(), valval->Ref()); - } else { - // no values + else // no values result = CallPred(filter->pred, 2, ev, predidx->Ref()); - } - if ( result == false ) { + if ( result == false ) + { Unref(predidx); Unref(valval); - if ( !updated ) { + if ( !updated ) + { // throw away. Hence - we quit. And remove the entry from the current dictionary... // (but why should it be in there? assert this). assert ( filter->currDict->RemoveEntry(idxhash) == 0 ); delete idxhash; delete h; return filter->num_val_fields + filter->num_idx_fields; - } else { + } + else + { // keep old one filter->currDict->Insert(idxhash, h); delete idxhash; return filter->num_val_fields + filter->num_idx_fields; + } } - } - } + } // now we don't need h anymore - if we are here, the entry is updated and a new h is created. - if ( h ) { + if ( h ) + { delete h; h = 0; - } + } Val* idxval; - if ( predidx != 0 ) { + if ( predidx != 0 ) + { idxval = RecordValToIndexVal(predidx); // I think there is an unref missing here. But if I insert is, it crashes :) - } else { + } + else idxval = ValueToIndexVal(filter->num_idx_fields, filter->itype, vals); - } + Val* oldval = 0; - if ( updated == true ) { + if ( updated == true ) + { assert(filter->num_val_fields > 0); // in that case, we need the old value to send the event (if we send an event). 
oldval = filter->tab->Lookup(idxval, false); - } + } //i->tab->Assign(idxval, valval); assert(idxval); HashKey* k = filter->tab->ComputeHash(idxval); - if ( !k ) { + if ( !k ) + { reporter->InternalError("could not hash"); assert(false); - } + } InputHash* ih = new InputHash(); ih->idxkey = new HashKey(k->Key(), k->Size(), k->Hash()); @@ -955,57 +1042,64 @@ int Manager::SendEntryTable(Stream* i, const Value* const *vals) { if ( filter->event && updated ) Ref(oldval); // otherwise it is no longer accessible after the assignment + filter->tab->Assign(idxval, k, valval); Unref(idxval); // asssign does not consume idxval. - if ( predidx != 0 ) { + if ( predidx != 0 ) Unref(predidx); - } filter->currDict->Insert(idxhash, ih); delete idxhash; - if ( filter->event ) { + if ( filter->event ) + { EnumVal* ev; int startpos = 0; Val* predidx = ValueToRecordVal(vals, filter->itype, &startpos); - if ( updated ) { // in case of update send back the old value. + if ( updated ) + { // in case of update send back the old value. assert ( filter->num_val_fields > 0 ); ev = new EnumVal(BifEnum::Input::EVENT_CHANGED, BifType::Enum::Input::Event); assert ( oldval != 0 ); SendEvent(filter->event, 4, filter->description->Ref(), ev, predidx, oldval); - } else { + } + else + { ev = new EnumVal(BifEnum::Input::EVENT_NEW, BifType::Enum::Input::Event); - if ( filter->num_val_fields == 0 ) { + if ( filter->num_val_fields == 0 ) + { Ref(filter->description); SendEvent(filter->event, 3, filter->description->Ref(), ev, predidx); - } else { + } + else SendEvent(filter->event, 4, filter->description->Ref(), ev, predidx, valval->Ref()); + } - } - } + } return filter->num_val_fields + filter->num_idx_fields; -} + } -void Manager::EndCurrentSend(ReaderFrontend* reader) { +void Manager::EndCurrentSend(ReaderFrontend* reader) + { Stream *i = FindStream(reader); - if ( i == 0 ) { + + if ( i == 0 ) + { reporter->InternalError("Unknown reader in EndCurrentSend"); return; - } + } #ifdef DEBUG DBG_LOG(DBG_INPUT, "Got EndCurrentSend stream %s", i->name.c_str()); #endif - if ( i->filter_type == EVENT_FILTER ) { - // nothing to do.. + if ( i->filter_type == EVENT_FILTER ) // nothing to do.. return; - } assert(i->filter_type == TABLE_FILTER); TableStream* filter = (TableStream*) i; @@ -1016,8 +1110,8 @@ void Manager::EndCurrentSend(ReaderFrontend* reader) { InputHash* ih; HashKey *lastDictIdxKey; //while ( ( ih = i->lastDict->NextEntry(c) ) ) { - while ( ( ih = filter->lastDict->NextEntry(lastDictIdxKey, c) ) ) { - + while ( ( ih = filter->lastDict->NextEntry(lastDictIdxKey, c) ) ) + { ListVal * idx = 0; Val *val = 0; @@ -1025,16 +1119,18 @@ void Manager::EndCurrentSend(ReaderFrontend* reader) { EnumVal* ev = 0; int startpos = 0; - if ( filter->pred || filter->event ) { + if ( filter->pred || filter->event ) + { idx = filter->tab->RecoverIndex(ih->idxkey); assert(idx != 0); val = filter->tab->Lookup(idx); assert(val != 0); predidx = ListValToRecordVal(idx, filter->itype, &startpos); ev = new EnumVal(BifEnum::Input::EVENT_REMOVED, BifType::Enum::Input::Event); - } + } - if ( filter->pred ) { + if ( filter->pred ) + { // ask predicate, if we want to expire this element... Ref(ev); @@ -1043,7 +1139,8 @@ void Manager::EndCurrentSend(ReaderFrontend* reader) { bool result = CallPred(filter->pred, 3, ev, predidx, val); - if ( result == false ) { + if ( result == false ) + { // Keep it. Hence - we quit and simply go to the next entry of lastDict // ah well - and we have to add the entry to currDict... 
Unref(predidx); @@ -1051,15 +1148,16 @@ void Manager::EndCurrentSend(ReaderFrontend* reader) { filter->currDict->Insert(lastDictIdxKey, filter->lastDict->RemoveEntry(lastDictIdxKey)); delete lastDictIdxKey; continue; + } } - } - if ( filter->event ) { + if ( filter->event ) + { Ref(predidx); Ref(val); Ref(ev); SendEvent(filter->event, 3, ev, predidx, val); - } + } if ( predidx ) // if we have a filter or an event... Unref(predidx); @@ -1070,9 +1168,9 @@ void Manager::EndCurrentSend(ReaderFrontend* reader) { filter->lastDict->Remove(lastDictIdxKey); // delete in next line delete lastDictIdxKey; delete(ih); - } + } - filter->lastDict->Clear(); // should be empty->->-> but->->-> well->->-> who knows->->-> + filter->lastDict->Clear(); // should be empt. buti- well... who knows... delete(filter->lastDict); filter->lastDict = filter->currDict; @@ -1086,39 +1184,40 @@ void Manager::EndCurrentSend(ReaderFrontend* reader) { // Send event that the current update is indeed finished. EventHandler* handler = event_registry->Lookup("Input::update_finished"); - if ( handler == 0 ) { + if ( handler == 0 ) reporter->InternalError("Input::update_finished not found!"); - } - SendEvent(handler, 2, new StringVal(i->name.c_str()), new StringVal(i->source.c_str())); -} + } -void Manager::Put(ReaderFrontend* reader, Value* *vals) { +void Manager::Put(ReaderFrontend* reader, Value* *vals) + { Stream *i = FindStream(reader); - if ( i == 0 ) { + if ( i == 0 ) + { reporter->InternalError("Unknown reader in Put"); return; - } + } int readFields; - if ( i->filter_type == TABLE_FILTER ) { + if ( i->filter_type == TABLE_FILTER ) readFields = PutTable(i, vals); - } else if ( i->filter_type == EVENT_FILTER ) { + else if ( i->filter_type == EVENT_FILTER ) + { EnumVal *type = new EnumVal(BifEnum::Input::EVENT_NEW, BifType::Enum::Input::Event); readFields = SendEventStreamEvent(i, type, vals); - } else { + } + else assert(false); - } - for ( int i = 0; i < readFields; i++ ) { + for ( int i = 0; i < readFields; i++ ) delete vals[i]; - } + delete [] vals; + } -} - -int Manager::SendEventStreamEvent(Stream* i, EnumVal* type, const Value* const *vals) { +int Manager::SendEventStreamEvent(Stream* i, EnumVal* type, const Value* const *vals) + { assert(i); assert(i->filter_type == EVENT_FILTER); @@ -1133,29 +1232,37 @@ int Manager::SendEventStreamEvent(Stream* i, EnumVal* type, const Value* const * out_vals.push_back(type); int position = 0; - if ( filter->want_record ) { + if ( filter->want_record ) + { RecordVal * r = ValueToRecordVal(vals, filter->fields, &position); out_vals.push_back(r); - } else { - for ( int j = 0; j < filter->fields->NumFields(); j++) { + } + else + { + for ( int j = 0; j < filter->fields->NumFields(); j++) + { Val* val = 0; - if ( filter->fields->FieldType(j)->Tag() == TYPE_RECORD ) { - val = ValueToRecordVal(vals, filter->fields->FieldType(j)->AsRecordType(), &position); - } else { + if ( filter->fields->FieldType(j)->Tag() == TYPE_RECORD ) + val = ValueToRecordVal(vals, + filter->fields->FieldType(j)->AsRecordType(), + &position); + else + { val = ValueToVal(vals[position], filter->fields->FieldType(j)); position++; - } + } out_vals.push_back(val); + } } - } SendEvent(filter->event, out_vals); return filter->fields->NumFields(); -} + } -int Manager::PutTable(Stream* i, const Value* const *vals) { +int Manager::PutTable(Stream* i, const Value* const *vals) + { assert(i); assert(i->filter_type == TABLE_FILTER); @@ -1165,102 +1272,115 @@ int Manager::PutTable(Stream* i, const Value* const *vals) { Val* 
valval; int position = filter->num_idx_fields; - if ( filter->num_val_fields == 0 ) { + if ( filter->num_val_fields == 0 ) valval = 0; - } else if ( filter->num_val_fields == 1 && filter->want_record == 0 ) { + else if ( filter->num_val_fields == 1 && filter->want_record == 0 ) valval = ValueToVal(vals[position], filter->rtype->FieldType(0)); - } else { + else valval = ValueToRecordVal(vals, filter->rtype, &position); - } // if we have a subscribed event, we need to figure out, if this is an update or not // same for predicates - if ( filter->pred || filter->event ) { + if ( filter->pred || filter->event ) + { bool updated = false; Val* oldval = 0; - if ( filter->num_val_fields > 0 ) { + if ( filter->num_val_fields > 0 ) + { // in that case, we need the old value to send the event (if we send an event). oldval = filter->tab->Lookup(idxval, false); - } + } - if ( oldval != 0 ) { + if ( oldval != 0 ) + { // it is an update updated = true; Ref(oldval); // have to do that, otherwise it may disappear in assign - } + } // predicate if we want the update or not - if ( filter->pred ) { + if ( filter->pred ) + { EnumVal* ev; int startpos = 0; Val* predidx = ValueToRecordVal(vals, filter->itype, &startpos); Ref(valval); - if ( updated ) { - ev = new EnumVal(BifEnum::Input::EVENT_CHANGED, BifType::Enum::Input::Event); - } else { - ev = new EnumVal(BifEnum::Input::EVENT_NEW, BifType::Enum::Input::Event); - } + if ( updated ) + ev = new EnumVal(BifEnum::Input::EVENT_CHANGED, + BifType::Enum::Input::Event); + else + ev = new EnumVal(BifEnum::Input::EVENT_NEW, + BifType::Enum::Input::Event); bool result; - if ( filter->num_val_fields > 0 ) { // we have values + if ( filter->num_val_fields > 0 ) // we have values result = CallPred(filter->pred, 3, ev, predidx, valval); - } else { - // no values + else // no values result = CallPred(filter->pred, 2, ev, predidx); - } - if ( result == false ) { + if ( result == false ) + { // do nothing Unref(idxval); Unref(valval); Unref(oldval); return filter->num_val_fields + filter->num_idx_fields; - } + } - } + } filter->tab->Assign(idxval, valval); - if ( filter->event ) { + if ( filter->event ) + { EnumVal* ev; int startpos = 0; Val* predidx = ValueToRecordVal(vals, filter->itype, &startpos); - if ( updated ) { // in case of update send back the old value. + if ( updated ) + { + // in case of update send back the old value. 
assert ( filter->num_val_fields > 0 ); - ev = new EnumVal(BifEnum::Input::EVENT_CHANGED, BifType::Enum::Input::Event); + ev = new EnumVal(BifEnum::Input::EVENT_CHANGED, + BifType::Enum::Input::Event); assert ( oldval != 0 ); - SendEvent(filter->event, 4, filter->description->Ref(), ev, predidx, oldval); - } else { - ev = new EnumVal(BifEnum::Input::EVENT_NEW, BifType::Enum::Input::Event); - if ( filter->num_val_fields == 0 ) { - SendEvent(filter->event, 4, filter->description->Ref(), ev, predidx); - } else { - SendEvent(filter->event, 4, filter->description->Ref(), ev, predidx, valval->Ref()); + SendEvent(filter->event, 4, filter->description->Ref(), + ev, predidx, oldval); + } + else + { + ev = new EnumVal(BifEnum::Input::EVENT_NEW, + BifType::Enum::Input::Event); + if ( filter->num_val_fields == 0 ) + SendEvent(filter->event, 4, filter->description->Ref(), + ev, predidx); + else + SendEvent(filter->event, 4, filter->description->Ref(), + ev, predidx, valval->Ref()); } + } - } - - - } else { - // no predicates or other stuff + } + else // no predicates or other stuff filter->tab->Assign(idxval, valval); - } + return filter->num_idx_fields + filter->num_val_fields; -} + } // Todo:: perhaps throw some kind of clear-event? -void Manager::Clear(ReaderFrontend* reader) { +void Manager::Clear(ReaderFrontend* reader) + { Stream *i = FindStream(reader); - if ( i == 0 ) { + if ( i == 0 ) + { reporter->InternalError("Unknown reader in Clear"); return; - } + } #ifdef DEBUG DBG_LOG(DBG_INPUT, "Got Clear for stream %s", @@ -1271,30 +1391,35 @@ void Manager::Clear(ReaderFrontend* reader) { TableStream* filter = (TableStream*) i; filter->tab->RemoveAll(); -} + } // put interface: delete old entry from table. -bool Manager::Delete(ReaderFrontend* reader, Value* *vals) { +bool Manager::Delete(ReaderFrontend* reader, Value* *vals) + { Stream *i = FindStream(reader); - if ( i == 0 ) { + if ( i == 0 ) + { reporter->InternalError("Unknown reader in Delete"); return false; - } + } bool success = false; int readVals = 0; - if ( i->filter_type == TABLE_FILTER ) { + if ( i->filter_type == TABLE_FILTER ) + { TableStream* filter = (TableStream*) i; Val* idxval = ValueToIndexVal(filter->num_idx_fields, filter->itype, vals); assert(idxval != 0); readVals = filter->num_idx_fields + filter->num_val_fields; bool filterresult = true; - if ( filter->pred || filter->event ) { + if ( filter->pred || filter->event ) + { Val *val = filter->tab->Lookup(idxval); - if ( filter->pred ) { + if ( filter->pred ) + { Ref(val); EnumVal *ev = new EnumVal(BifEnum::Input::EVENT_REMOVED, BifType::Enum::Input::Event); int startpos = 0; @@ -1302,62 +1427,68 @@ bool Manager::Delete(ReaderFrontend* reader, Value* *vals) { filterresult = CallPred(filter->pred, 3, ev, predidx, val); - if ( filterresult == false ) { + if ( filterresult == false ) + { // keep it. 
Unref(idxval); success = true; + } + } - } - // only if filter = true -> no filtering - if ( filterresult && filter->event ) { + if ( filterresult && filter->event ) + { Ref(idxval); assert(val != 0); Ref(val); EnumVal *ev = new EnumVal(BifEnum::Input::EVENT_REMOVED, BifType::Enum::Input::Event); SendEvent(filter->event, 4, filter->description->Ref(), ev, idxval, val); + } } - } // only if filter = true -> no filtering - if ( filterresult ) { + if ( filterresult ) + { Val* retptr = filter->tab->Delete(idxval); success = ( retptr != 0 ); - if ( !success ) { + if ( !success ) reporter->Error("Internal error while deleting values from input table"); - } else { + else Unref(retptr); } - } - } else if ( i->filter_type == EVENT_FILTER ) { + + } + else if ( i->filter_type == EVENT_FILTER ) + { EnumVal *type = new EnumVal(BifEnum::Input::EVENT_REMOVED, BifType::Enum::Input::Event); readVals = SendEventStreamEvent(i, type, vals); success = true; - } else { + } + else + { assert(false); return false; - } + } - for ( int i = 0; i < readVals; i++ ) { + for ( int i = 0; i < readVals; i++ ) delete vals[i]; - } + delete [] vals; return success; -} + } bool Manager::CallPred(Func* pred_func, const int numvals, ...) -{ + { bool result; val_list vl(numvals); va_list lP; va_start(lP, numvals); for ( int i = 0; i < numvals; i++ ) - { vl.append( va_arg(lP, Val*) ); - } + va_end(lP); Val* v = pred_func->Call(&vl); @@ -1365,120 +1496,131 @@ bool Manager::CallPred(Func* pred_func, const int numvals, ...) Unref(v); return(result); -} + } bool Manager::SendEvent(const string& name, const int num_vals, Value* *vals) -{ + { EventHandler* handler = event_registry->Lookup(name.c_str()); - if ( handler == 0 ) { + if ( handler == 0 ) + { reporter->Error("Event %s not found", name.c_str()); return false; - } + } RecordType *type = handler->FType()->Args(); int num_event_vals = type->NumFields(); - if ( num_vals != num_event_vals ) { + if ( num_vals != num_event_vals ) + { reporter->Error("Wrong number of values for event %s", name.c_str()); return false; - } + } val_list* vl = new val_list; - for ( int i = 0; i < num_vals; i++) { + for ( int i = 0; i < num_vals; i++) vl->append(ValueToVal(vals[i], type->FieldType(i))); - } mgr.Dispatch(new Event(handler, vl)); - for ( int i = 0; i < num_vals; i++ ) { + for ( int i = 0; i < num_vals; i++ ) delete vals[i]; - } + delete [] vals; return true; } void Manager::SendEvent(EventHandlerPtr ev, const int numvals, ...) -{ + { val_list* vl = new val_list; va_list lP; va_start(lP, numvals); for ( int i = 0; i < numvals; i++ ) - { vl->append( va_arg(lP, Val*) ); - } + va_end(lP); mgr.QueueEvent(ev, vl, SOURCE_LOCAL); -} - -void Manager::SendEvent(EventHandlerPtr ev, list events) -{ - val_list* vl = new val_list; - - for ( list::iterator i = events.begin(); i != events.end(); i++ ) { - vl->append( *i ); } - mgr.QueueEvent(ev, vl, SOURCE_LOCAL); -} +void Manager::SendEvent(EventHandlerPtr ev, list events) + { + val_list* vl = new val_list; + + for ( list::iterator i = events.begin(); i != events.end(); i++ ) + { + vl->append( *i ); + } -// Convert a bro list value to a bro record value. I / we could think about moving this functionality to val.cc -RecordVal* Manager::ListValToRecordVal(ListVal* list, RecordType *request_type, int* position) { + mgr.QueueEvent(ev, vl, SOURCE_LOCAL); + } + +// Convert a bro list value to a bro record value. 
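// The conversion helpers below, together with UnrollRecordType() further up,
// all walk a possibly nested record with one shared running index: nested
// records are flattened into a single linear field list on the way in, and the
// flat value array is folded back into (possibly nested) RecordVals on the way
// out. A small self-contained sketch of that scheme, using plain structs and
// ints as hypothetical stand-ins for the Bro types:

#include <string>
#include <vector>

// A field description: an empty 'sub' means an atomic field, otherwise the
// field is itself a record (stand-in for RecordType).
struct FieldDesc
	{
	std::string name;
	std::vector<FieldDesc> sub;
	};

// Flatten nested fields into dotted names, as UnrollRecordType() does.
static void Flatten(const FieldDesc& f, const std::string& prefix, std::vector<std::string>* out)
	{
	if ( f.sub.empty() )
		{
		out->push_back(prefix + f.name);
		return;
		}

	for ( size_t i = 0; i < f.sub.size(); i++ )
		Flatten(f.sub[i], prefix + f.name + ".", out);
	}

// Rebuilding runs the same recursion in reverse: each atomic field consumes
// vals[*position] and advances the shared index, which is how the functions
// below keep nested records and the flat value array in sync.
static void Rebuild(const FieldDesc& f, const std::vector<int>& vals, int* position, std::vector<int>* out)
	{
	if ( f.sub.empty() )
		{
		out->push_back(vals[(*position)++]);
		return;
		}

	for ( size_t i = 0; i < f.sub.size(); i++ )
		Rebuild(f.sub[i], vals, position, out);
	}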
+// I / we could think about moving this functionality to val.cc +RecordVal* Manager::ListValToRecordVal(ListVal* list, RecordType *request_type, int* position) + { assert(position != 0 ); // we need the pointer to point to data; - if ( request_type->Tag() != TYPE_RECORD ) { + if ( request_type->Tag() != TYPE_RECORD ) + { reporter->InternalError("ListValToRecordVal called on non-record-value."); return 0; - } + } RecordVal* rec = new RecordVal(request_type->AsRecordType()); assert(list != 0); int maxpos = list->Length(); - for ( int i = 0; i < request_type->NumFields(); i++ ) { + for ( int i = 0; i < request_type->NumFields(); i++ ) + { assert ( (*position) <= maxpos ); Val* fieldVal = 0; - if ( request_type->FieldType(i)->Tag() == TYPE_RECORD ) { + if ( request_type->FieldType(i)->Tag() == TYPE_RECORD ) fieldVal = ListValToRecordVal(list, request_type->FieldType(i)->AsRecordType(), position); - } else { + else + { fieldVal = list->Index(*position); (*position)++; - } + } rec->Assign(i, fieldVal); - } + } return rec; -} + } // Convert a threading value to a record value -RecordVal* Manager::ValueToRecordVal(const Value* const *vals, RecordType *request_type, int* position) { +RecordVal* Manager::ValueToRecordVal(const Value* const *vals, + RecordType *request_type, int* position) + { assert(position != 0); // we need the pointer to point to data. - if ( request_type->Tag() != TYPE_RECORD ) { + if ( request_type->Tag() != TYPE_RECORD ) + { reporter->InternalError("ValueToRecordVal called on non-record-value."); return 0; - } + } RecordVal* rec = new RecordVal(request_type->AsRecordType()); - for ( int i = 0; i < request_type->NumFields(); i++ ) { + for ( int i = 0; i < request_type->NumFields(); i++ ) + { Val* fieldVal = 0; - if ( request_type->FieldType(i)->Tag() == TYPE_RECORD ) { + if ( request_type->FieldType(i)->Tag() == TYPE_RECORD ) fieldVal = ValueToRecordVal(vals, request_type->FieldType(i)->AsRecordType(), position); - } else { + else + { fieldVal = ValueToVal(vals[*position], request_type->FieldType(i)); (*position)++; - } + } rec->Assign(i, fieldVal); - } + } return rec; -} + } // Count the length of the values // used to create a correct length buffer for hashing later @@ -1495,7 +1637,7 @@ int Manager::GetValueLength(const Value* val) { case TYPE_COUNT: case TYPE_COUNTER: length += sizeof(val->val.uint_val); - break; + break; case TYPE_PORT: length += sizeof(val->val.port_val.port); @@ -1517,48 +1659,48 @@ int Manager::GetValueLength(const Value* val) { case TYPE_ADDR: { - switch ( val->val.addr_val.family ) { - case IPv4: - length += sizeof(val->val.addr_val.in.in4); - break; - case IPv6: - length += sizeof(val->val.addr_val.in.in6); - break; - default: - assert(false); - } - + switch ( val->val.addr_val.family ) { + case IPv4: + length += sizeof(val->val.addr_val.in.in4); + break; + case IPv6: + length += sizeof(val->val.addr_val.in.in6); + break; + default: + assert(false); + } } break; case TYPE_SUBNET: { - switch ( val->val.subnet_val.prefix.family ) { - case IPv4: - length += sizeof(val->val.subnet_val.prefix.in.in4)+sizeof(val->val.subnet_val.length); - break; - case IPv6: - length += sizeof(val->val.subnet_val.prefix.in.in6)+sizeof(val->val.subnet_val.length); - break; - default: - assert(false); - } - + switch ( val->val.subnet_val.prefix.family ) { + case IPv4: + length += sizeof(val->val.subnet_val.prefix.in.in4)+ + sizeof(val->val.subnet_val.length); + break; + case IPv6: + length += sizeof(val->val.subnet_val.prefix.in.in6)+ + sizeof(val->val.subnet_val.length); + 
break; + default: + assert(false); + } } break; - case TYPE_TABLE: { - for ( int i = 0; i < val->val.set_val.size; i++ ) { + case TYPE_TABLE: + { + for ( int i = 0; i < val->val.set_val.size; i++ ) length += GetValueLength(val->val.set_val.vals[i]); - } break; } - case TYPE_VECTOR: { + case TYPE_VECTOR: + { int j = val->val.vector_val.size; - for ( int i = 0; i < j; i++ ) { + for ( int i = 0; i < j; i++ ) length += GetValueLength(val->val.vector_val.vals[i]); - } break; } @@ -1572,7 +1714,8 @@ int Manager::GetValueLength(const Value* val) { // Given a threading::value, copy the raw data bytes into *data and return how many bytes were copied. // Used for hashing the values for lookup in the bro table -int Manager::CopyValue(char *data, const int startpos, const Value* val) { +int Manager::CopyValue(char *data, const int startpos, const Value* val) + { assert( val->present ); // presence has to be checked elsewhere switch ( val->type ) { @@ -1588,11 +1731,14 @@ int Manager::CopyValue(char *data, const int startpos, const Value* val) { return sizeof(val->val.uint_val); break; - case TYPE_PORT: { + case TYPE_PORT: + { int length = 0; - memcpy(data+startpos, (const void*) &(val->val.port_val.port), sizeof(val->val.port_val.port)); + memcpy(data+startpos, (const void*) &(val->val.port_val.port), + sizeof(val->val.port_val.port)); length += sizeof(val->val.port_val.port); - memcpy(data+startpos+length, (const void*) &(val->val.port_val.proto), sizeof(val->val.port_val.proto)); + memcpy(data+startpos+length, (const void*) &(val->val.port_val.proto), + sizeof(val->val.port_val.proto)); length += sizeof(val->val.port_val.proto); return length; break; @@ -1602,7 +1748,8 @@ int Manager::CopyValue(char *data, const int startpos, const Value* val) { case TYPE_DOUBLE: case TYPE_TIME: case TYPE_INTERVAL: - memcpy(data+startpos, (const void*) &(val->val.double_val), sizeof(val->val.double_val)); + memcpy(data+startpos, (const void*) &(val->val.double_val), + sizeof(val->val.double_val)); return sizeof(val->val.double_val); break; @@ -1616,64 +1763,66 @@ int Manager::CopyValue(char *data, const int startpos, const Value* val) { case TYPE_ADDR: { - int length; - switch ( val->val.addr_val.family ) { - case IPv4: - length = sizeof(val->val.addr_val.in.in4); - memcpy(data + startpos, (const char*) &(val->val.addr_val.in.in4), length); - break; - case IPv6: - length = sizeof(val->val.addr_val.in.in6); - memcpy(data + startpos, (const char*) &(val->val.addr_val.in.in6), length); - break; - default: - assert(false); - } - - return length; - + int length; + switch ( val->val.addr_val.family ) { + case IPv4: + length = sizeof(val->val.addr_val.in.in4); + memcpy(data + startpos, (const char*) &(val->val.addr_val.in.in4), length); + break; + case IPv6: + length = sizeof(val->val.addr_val.in.in6); + memcpy(data + startpos, (const char*) &(val->val.addr_val.in.in6), length); + break; + default: + assert(false); + } + return length; } break; case TYPE_SUBNET: { - int length; - switch ( val->val.subnet_val.prefix.family ) { - case IPv4: - length = sizeof(val->val.addr_val.in.in4); - memcpy(data + startpos, (const char*) &(val->val.subnet_val.prefix.in.in4), length); - break; - case IPv6: - length = sizeof(val->val.addr_val.in.in6); - memcpy(data + startpos, (const char*) &(val->val.subnet_val.prefix.in.in4), length); - break; - default: - assert(false); - } - int lengthlength = sizeof(val->val.subnet_val.length); - memcpy(data + startpos + length , (const char*) &(val->val.subnet_val.length), lengthlength); - length += 
lengthlength; - return length; - + int length; + switch ( val->val.subnet_val.prefix.family ) { + case IPv4: + length = sizeof(val->val.addr_val.in.in4); + memcpy(data + startpos, + (const char*) &(val->val.subnet_val.prefix.in.in4), length); + break; + case IPv6: + length = sizeof(val->val.addr_val.in.in6); + memcpy(data + startpos, + (const char*) &(val->val.subnet_val.prefix.in.in4), length); + break; + default: + assert(false); + } + int lengthlength = sizeof(val->val.subnet_val.length); + memcpy(data + startpos + length , + (const char*) &(val->val.subnet_val.length), lengthlength); + length += lengthlength; + return length; } break; - case TYPE_TABLE: { + case TYPE_TABLE: + { int length = 0; int j = val->val.set_val.size; - for ( int i = 0; i < j; i++ ) { + for ( int i = 0; i < j; i++ ) length += CopyValue(data, startpos+length, val->val.set_val.vals[i]); - } + return length; break; } - case TYPE_VECTOR: { + case TYPE_VECTOR: + { int length = 0; int j = val->val.vector_val.size; - for ( int i = 0; i < j; i++ ) { + for ( int i = 0; i < j; i++ ) length += CopyValue(data, startpos+length, val->val.vector_val.vals[i]); - } + return length; break; } @@ -1685,52 +1834,57 @@ int Manager::CopyValue(char *data, const int startpos, const Value* val) { assert(false); return 0; -} + } // Hash num_elements threading values and return the HashKey for them. At least one of the vals has to be ->present. -HashKey* Manager::HashValues(const int num_elements, const Value* const *vals) { +HashKey* Manager::HashValues(const int num_elements, const Value* const *vals) + { int length = 0; - for ( int i = 0; i < num_elements; i++ ) { + for ( int i = 0; i < num_elements; i++ ) + { const Value* val = vals[i]; if ( val->present ) length += GetValueLength(val); - } + } - if ( length == 0 ) { + if ( length == 0 ) + { reporter->Error("Input reader sent line where all elements are null values. 
Ignoring line"); return NULL; - } + } int position = 0; char *data = (char*) malloc(length); - if ( data == 0 ) { + if ( data == 0 ) reporter->InternalError("Could not malloc?"); - } - for ( int i = 0; i < num_elements; i++ ) { + + for ( int i = 0; i < num_elements; i++ ) + { const Value* val = vals[i]; if ( val->present ) position += CopyValue(data, position, val); - } + } HashKey *key = new HashKey(data, length); delete data; assert(position == length); return key; -} + } // convert threading value to Bro value -Val* Manager::ValueToVal(const Value* val, BroType* request_type) { +Val* Manager::ValueToVal(const Value* val, BroType* request_type) + { - if ( request_type->Tag() != TYPE_ANY && request_type->Tag() != val->type ) { + if ( request_type->Tag() != TYPE_ANY && request_type->Tag() != val->type ) + { reporter->InternalError("Typetags don't match: %d vs %d", request_type->Tag(), val->type); return 0; - } + } - if ( !val->present ) { + if ( !val->present ) return 0; // unset field - } switch ( val->type ) { case TYPE_BOOL: @@ -1762,72 +1916,73 @@ Val* Manager::ValueToVal(const Value* val, BroType* request_type) { case TYPE_ADDR: { - IPAddr* addr; - switch ( val->val.addr_val.family ) { - case IPv4: - addr = new IPAddr(val->val.addr_val.in.in4); - break; - case IPv6: - addr = new IPAddr(val->val.addr_val.in.in6); - break; - default: - assert(false); - } - AddrVal* addrval = new AddrVal(*addr); - delete addr; - return addrval; - + IPAddr* addr; + switch ( val->val.addr_val.family ) { + case IPv4: + addr = new IPAddr(val->val.addr_val.in.in4); + break; + case IPv6: + addr = new IPAddr(val->val.addr_val.in.in6); + break; + default: + assert(false); + } + AddrVal* addrval = new AddrVal(*addr); + delete addr; + return addrval; } case TYPE_SUBNET: { - IPAddr* addr; - switch ( val->val.subnet_val.prefix.family ) { - case IPv4: - addr = new IPAddr(val->val.subnet_val.prefix.in.in4); - break; - case IPv6: - addr = new IPAddr(val->val.subnet_val.prefix.in.in6); - break; - default: - assert(false); - } - SubNetVal* subnetval = new SubNetVal(*addr, val->val.subnet_val.length); - delete addr; - return subnetval; - + IPAddr* addr; + switch ( val->val.subnet_val.prefix.family ) { + case IPv4: + addr = new IPAddr(val->val.subnet_val.prefix.in.in4); + break; + case IPv6: + addr = new IPAddr(val->val.subnet_val.prefix.in.in6); + break; + default: + assert(false); } + SubNetVal* subnetval = new SubNetVal(*addr, val->val.subnet_val.length); + delete addr; + return subnetval; break; + } - case TYPE_TABLE: { + case TYPE_TABLE: + { // all entries have to have the same type... BroType* type = request_type->AsTableType()->Indices()->PureType(); TypeList* set_index = new TypeList(type->Ref()); set_index->Append(type->Ref()); SetType* s = new SetType(set_index, 0); TableVal* t = new TableVal(s); - for ( int i = 0; i < val->val.set_val.size; i++ ) { + for ( int i = 0; i < val->val.set_val.size; i++ ) + { Val* assignval = ValueToVal( val->val.set_val.vals[i], type ); t->Assign(assignval, 0); Unref(assignval); // idex is not consumed by assign. - } + } Unref(s); return t; break; } - case TYPE_VECTOR: { + case TYPE_VECTOR: + { // all entries have to have the same type... 
BroType* type = request_type->AsVectorType()->YieldType(); VectorType* vt = new VectorType(type->Ref()); VectorVal* v = new VectorVal(vt); - for ( int i = 0; i < val->val.vector_val.size; i++ ) { + for ( int i = 0; i < val->val.vector_val.size; i++ ) v->Assign(i, ValueToVal( val->val.set_val.vals[i], type ), 0); - } + Unref(vt); return v; - + break; } case TYPE_ENUM: { @@ -1836,9 +1991,10 @@ Val* Manager::ValueToVal(const Value* val, BroType* request_type) { string module = extract_module_name(val->val.string_val->c_str()); string var = extract_var_name(val->val.string_val->c_str()); bro_int_t index = request_type->AsEnumType()->Lookup(module, var.c_str()); - if ( index == -1 ) { - reporter->InternalError("Value not found in enum mappimg. Module: %s, var: %s", module.c_str(), var.c_str()); - } + if ( index == -1 ) + reporter->InternalError("Value not found in enum mappimg. Module: %s, var: %s", + module.c_str(), var.c_str()); + return new EnumVal(index, request_type->Ref()->AsEnumType() ); break; } @@ -1850,26 +2006,24 @@ Val* Manager::ValueToVal(const Value* val, BroType* request_type) { assert(false); return NULL; -} + } Manager::Stream* Manager::FindStream(const string &name) { for ( map::iterator s = readers.begin(); s != readers.end(); ++s ) { if ( (*s).second->name == name ) - { return (*s).second; } - } return 0; } Manager::Stream* Manager::FindStream(ReaderFrontend* reader) -{ + { map::iterator s = readers.find(reader); - if ( s != readers.end() ) { + if ( s != readers.end() ) return s->second; - } + return 0; -} + } diff --git a/src/input/ReaderBackend.cc b/src/input/ReaderBackend.cc index 27401ffcb8..c625301383 100644 --- a/src/input/ReaderBackend.cc +++ b/src/input/ReaderBackend.cc @@ -15,10 +15,11 @@ public: : threading::OutputMessage("Put", reader), val(val) {} - virtual bool Process() { + virtual bool Process() + { input_mgr->Put(Object(), val); return true; - } + } private: Value* *val; @@ -30,9 +31,10 @@ public: : threading::OutputMessage("Delete", reader), val(val) {} - virtual bool Process() { + virtual bool Process() + { return input_mgr->Delete(Object(), val); - } + } private: Value* *val; @@ -43,10 +45,11 @@ public: ClearMessage(ReaderFrontend* reader) : threading::OutputMessage("Clear", reader) {} - virtual bool Process() { + virtual bool Process() + { input_mgr->Clear(Object()); return true; - } + } private: }; @@ -57,14 +60,15 @@ public: : threading::OutputMessage("SendEvent", reader), name(name), num_vals(num_vals), val(val) {} - virtual bool Process() { + virtual bool Process() + { bool success = input_mgr->SendEvent(name, num_vals, val); if ( !success ) reporter->Error("SendEvent for event %s failed", name.c_str()); return true; // we do not want to die if sendEvent fails because the event did not return. 
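// Each *Message class in this file follows the same hand-off pattern: the
// reader thread wraps its result in a message object and queues it via
// SendOut(), and the main thread later runs Process(), which forwards the data
// to the input Manager. A stripped-down sketch of that hand-off, with
// illustrative names standing in for the threading::OutputMessage/MsgThread
// machinery (and without the locking the real queue needs):

#include <queue>

// A queued unit of work; Process() runs on the main thread.
class Message
	{
public:
	virtual ~Message()	{ }
	virtual bool Process() = 0;
	};

class OutQueue
	{
public:
	// Reader thread side: hand a finished result to the main thread.
	void SendOut(Message* msg)	{ pending.push(msg); }

	// Main thread side: drain queued messages, e.g. once per I/O loop pass.
	void Drain()
		{
		while ( ! pending.empty() )
			{
			Message* msg = pending.front();
			pending.pop();
			msg->Process();
			delete msg;
			}
		}

private:
	std::queue<Message*> pending; // unsynchronized in this sketch
	};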
- } + } private: const string name; @@ -78,10 +82,11 @@ public: : threading::OutputMessage("SendEntry", reader), val(val) { } - virtual bool Process() { + virtual bool Process() + { input_mgr->SendEntry(Object(), val); return true; - } + } private: Value* *val; @@ -92,10 +97,11 @@ public: EndCurrentSendMessage(ReaderFrontend* reader) : threading::OutputMessage("EndCurrentSend", reader) {} - virtual bool Process() { + virtual bool Process() + { input_mgr->EndCurrentSend(Object()); return true; - } + } private: }; @@ -105,9 +111,10 @@ public: ReaderClosedMessage(ReaderFrontend* reader) : threading::OutputMessage("ReaderClosed", reader) {} - virtual bool Process() { + virtual bool Process() + { return input_mgr->RemoveStreamContinuation(Object()); - } + } private: }; @@ -119,12 +126,16 @@ public: DisableMessage(ReaderFrontend* writer) : threading::OutputMessage("Disable", writer) {} - virtual bool Process() { Object()->SetDisable(); return true; } + virtual bool Process() + { + Object()->SetDisable(); + return true; + } }; ReaderBackend::ReaderBackend(ReaderFrontend* arg_frontend) : MsgThread() -{ + { buf = 0; buf_len = 1024; disabled = true; // disabled will be set correcty in init. @@ -132,45 +143,45 @@ ReaderBackend::ReaderBackend(ReaderFrontend* arg_frontend) : MsgThread() frontend = arg_frontend; SetName(frontend->Name()); -} + } ReaderBackend::~ReaderBackend() -{ - -} + { + } void ReaderBackend::Put(Value* *val) -{ + { SendOut(new PutMessage(frontend, val)); -} + } void ReaderBackend::Delete(Value* *val) -{ + { SendOut(new DeleteMessage(frontend, val)); -} + } void ReaderBackend::Clear() -{ + { SendOut(new ClearMessage(frontend)); -} + } void ReaderBackend::SendEvent(const string& name, const int num_vals, Value* *vals) -{ + { SendOut(new SendEventMessage(frontend, name, num_vals, vals)); -} + } void ReaderBackend::EndCurrentSend() -{ + { SendOut(new EndCurrentSendMessage(frontend)); -} + } void ReaderBackend::SendEntry(Value* *vals) -{ + { SendOut(new SendEntryMessage(frontend, vals)); -} + } -bool ReaderBackend::Init(string arg_source, int mode, const int arg_num_fields, const threading::Field* const* arg_fields) -{ +bool ReaderBackend::Init(string arg_source, int mode, const int arg_num_fields, + const threading::Field* const* arg_fields) + { source = arg_source; SetName("InputReader/"+source); @@ -180,89 +191,90 @@ bool ReaderBackend::Init(string arg_source, int mode, const int arg_num_fields, // disable if DoInit returns error. 
 int success = DoInit(arg_source, mode, arg_num_fields, arg_fields);
- if ( !success ) {
+ if ( !success )
+ {
 Error("Init failed");
 DisableFrontend();
- }
+ }

 disabled = !success;

 return success;
-}
+ }

 void ReaderBackend::Close()
-{
+ {
 DoClose();
 disabled = true;
 DisableFrontend();
 SendOut(new ReaderClosedMessage(frontend));

- if ( fields != 0 ) {
-
- for ( unsigned int i = 0; i < num_fields; i++ ) {
+ if ( fields != 0 )
+ {
+ for ( unsigned int i = 0; i < num_fields; i++ )
 delete(fields[i]);
- }

 delete[] (fields);
 fields = 0;
+ }
 }
-}

 bool ReaderBackend::Update()
-{
+ {
 if ( disabled )
 return false;

 bool success = DoUpdate();
- if ( !success ) {
+ if ( !success )
 DisableFrontend();
- }

 return success;
-}
+ }

 void ReaderBackend::DisableFrontend()
-{
- disabled = true; // we also set disabled here, because there still may be other messages queued and we will dutifully ignore these from now
+ {
+ disabled = true;
+ // we also set disabled here, because there still may be other messages queued and we will dutifully ignore these from now on
 SendOut(new DisableMessage(frontend));
-}
+ }

 bool ReaderBackend::DoHeartbeat(double network_time, double current_time)
-{
+ {
 MsgThread::DoHeartbeat(network_time, current_time);
- return true;
-}
-
-TransportProto ReaderBackend::StringToProto(const string &proto) {
- if ( proto == "unknown" ) {
- return TRANSPORT_UNKNOWN;
- } else if ( proto == "tcp" ) {
- return TRANSPORT_TCP;
- } else if ( proto == "udp" ) {
- return TRANSPORT_UDP;
- } else if ( proto == "icmp" ) {
- return TRANSPORT_ICMP;
 }
+TransportProto ReaderBackend::StringToProto(const string &proto)
+ {
+ if ( proto == "unknown" )
+ return TRANSPORT_UNKNOWN;
+ else if ( proto == "tcp" )
+ return TRANSPORT_TCP;
+ else if ( proto == "udp" )
+ return TRANSPORT_UDP;
+ else if ( proto == "icmp" )
+ return TRANSPORT_ICMP;
+
 Error(Fmt("Tried to parse invalid/unknown protocol: %s", proto.c_str()));
 return TRANSPORT_UNKNOWN;
-}
+ }

 // more or less verbose copy from IPAddr.cc -- which uses reporter
-Value::addr_t ReaderBackend::StringToAddr(const string &s) {
+Value::addr_t ReaderBackend::StringToAddr(const string &s)
+ {
 Value::addr_t val;

 if ( s.find(':') == std::string::npos ) // IPv4.
{ val.family = IPv4; - if ( inet_aton(s.c_str(), &(val.in.in4)) <= 0 ) { + if ( inet_aton(s.c_str(), &(val.in.in4)) <= 0 ) + { Error(Fmt("Bad addres: %s", s.c_str())); memset(&val.in.in4.s_addr, 0, sizeof(val.in.in4.s_addr)); - } + } } @@ -277,6 +289,6 @@ Value::addr_t ReaderBackend::StringToAddr(const string &s) { } return val; -} + } } diff --git a/src/input/ReaderFrontend.cc b/src/input/ReaderFrontend.cc index 6b3c2e6a67..f61fd357b9 100644 --- a/src/input/ReaderFrontend.cc +++ b/src/input/ReaderFrontend.cc @@ -46,19 +46,23 @@ public: }; -ReaderFrontend::ReaderFrontend(bro_int_t type) { +ReaderFrontend::ReaderFrontend(bro_int_t type) + { disabled = initialized = false; ty_name = ""; backend = input_mgr->CreateBackend(this, type); assert(backend); backend->Start(); -} + } -ReaderFrontend::~ReaderFrontend() { -} +ReaderFrontend::~ReaderFrontend() + { + } -void ReaderFrontend::Init(string arg_source, int mode, const int num_fields, const threading::Field* const* fields) { +void ReaderFrontend::Init(string arg_source, int mode, const int num_fields, + const threading::Field* const* fields) + { if ( disabled ) return; @@ -69,39 +73,43 @@ void ReaderFrontend::Init(string arg_source, int mode, const int num_fields, con initialized = true; backend->SendIn(new InitMessage(backend, arg_source, mode, num_fields, fields)); -} + } -void ReaderFrontend::Update() { +void ReaderFrontend::Update() + { if ( disabled ) return; - if ( !initialized ) { + if ( !initialized ) + { reporter->Error("Tried to call update on uninitialized reader"); return; - } + } backend->SendIn(new UpdateMessage(backend)); -} + } -void ReaderFrontend::Close() { +void ReaderFrontend::Close() + { if ( disabled ) return; - if ( !initialized ) { + if ( !initialized ) + { reporter->Error("Tried to call finish on uninitialized reader"); return; - } + } backend->SendIn(new CloseMessage(backend)); -} + } string ReaderFrontend::Name() const -{ + { if ( source.size() ) return ty_name; return ty_name + "/" + source; -} + } } diff --git a/src/input/readers/Ascii.cc b/src/input/readers/Ascii.cc index a167408a0e..c798c21a5e 100644 --- a/src/input/readers/Ascii.cc +++ b/src/input/readers/Ascii.cc @@ -23,69 +23,73 @@ using threading::Field; FieldMapping::FieldMapping(const string& arg_name, const TypeTag& arg_type, int arg_position) : name(arg_name), type(arg_type) -{ + { position = arg_position; secondary_position = -1; present = true; -} + } -FieldMapping::FieldMapping(const string& arg_name, const TypeTag& arg_type, const TypeTag& arg_subtype, int arg_position) +FieldMapping::FieldMapping(const string& arg_name, const TypeTag& arg_type, + const TypeTag& arg_subtype, int arg_position) : name(arg_name), type(arg_type), subtype(arg_subtype) -{ + { position = arg_position; secondary_position = -1; present = true; -} + } FieldMapping::FieldMapping(const FieldMapping& arg) : name(arg.name), type(arg.type), subtype(arg.subtype), present(arg.present) -{ + { position = arg.position; secondary_position = arg.secondary_position; -} + } -FieldMapping FieldMapping::subType() { +FieldMapping FieldMapping::subType() + { return FieldMapping(name, subtype, position); -} + } Ascii::Ascii(ReaderFrontend *frontend) : ReaderBackend(frontend) -{ + { file = 0; - //keyMap = new map(); - separator.assign( (const char*) BifConst::InputAscii::separator->Bytes(), BifConst::InputAscii::separator->Len()); - if ( separator.size() != 1 ) { + separator.assign( (const char*) BifConst::InputAscii::separator->Bytes(), + BifConst::InputAscii::separator->Len()); + if ( 
separator.size() != 1 ) Error("separator length has to be 1. Separator will be truncated."); - } - set_separator.assign( (const char*) BifConst::InputAscii::set_separator->Bytes(), BifConst::InputAscii::set_separator->Len()); - if ( set_separator.size() != 1 ) { + set_separator.assign( (const char*) BifConst::InputAscii::set_separator->Bytes(), + BifConst::InputAscii::set_separator->Len()); + if ( set_separator.size() != 1 ) Error("set_separator length has to be 1. Separator will be truncated."); - } - empty_field.assign( (const char*) BifConst::InputAscii::empty_field->Bytes(), BifConst::InputAscii::empty_field->Len()); - - unset_field.assign( (const char*) BifConst::InputAscii::unset_field->Bytes(), BifConst::InputAscii::unset_field->Len()); + empty_field.assign( (const char*) BifConst::InputAscii::empty_field->Bytes(), + BifConst::InputAscii::empty_field->Len()); + unset_field.assign( (const char*) BifConst::InputAscii::unset_field->Bytes(), + BifConst::InputAscii::unset_field->Len()); + } Ascii::~Ascii() -{ + { DoClose(); -} + } void Ascii::DoClose() -{ - if ( file != 0 ) { + { + if ( file != 0 ) + { file->close(); delete(file); file = 0; + } } -} bool Ascii::DoInit(string path, int arg_mode, int arg_num_fields, const Field* const* arg_fields) -{ + { fname = path; mode = arg_mode; mtime = 0; @@ -93,124 +97,135 @@ bool Ascii::DoInit(string path, int arg_mode, int arg_num_fields, const Field* c num_fields = arg_num_fields; fields = arg_fields; - if ( ( mode != MANUAL ) && (mode != REREAD) && ( mode != STREAM ) ) { + if ( ( mode != MANUAL ) && (mode != REREAD) && ( mode != STREAM ) ) + { Error(Fmt("Unsupported read mode %d for source %s", mode, path.c_str())); return false; - } + } file = new ifstream(path.c_str()); - if ( !file->is_open() ) { + if ( !file->is_open() ) + { Error(Fmt("Init: cannot open %s", fname.c_str())); delete(file); file = 0; return false; - } + } - if ( ReadHeader(false) == false ) { + if ( ReadHeader(false) == false ) + { Error(Fmt("Init: cannot open %s; headers are incorrect", fname.c_str())); file->close(); delete(file); file = 0; return false; - } + } DoUpdate(); return true; -} + } -bool Ascii::ReadHeader(bool useCached) { +bool Ascii::ReadHeader(bool useCached) + { // try to read the header line... string line; map ifields; - if ( !useCached ) { - if ( !GetLine(line) ) { + if ( !useCached ) + { + if ( !GetLine(line) ) + { Error("could not read first line"); return false; - } - - + } headerline = line; - - } else { + } + else line = headerline; - } // construct list of field names. istringstream splitstream(line); int pos=0; - while ( splitstream ) { + while ( splitstream ) + { string s; if ( !getline(splitstream, s, separator[0])) break; ifields[s] = pos; pos++; - } + } //printf("Updating fields from description %s\n", line.c_str()); columnMap.clear(); - for ( unsigned int i = 0; i < num_fields; i++ ) { + for ( unsigned int i = 0; i < num_fields; i++ ) + { const Field* field = fields[i]; map::iterator fit = ifields.find(field->name); - if ( fit == ifields.end() ) { - if ( field->optional ) { + if ( fit == ifields.end() ) + { + if ( field->optional ) + { // we do not really need this field. mark it as not present and always send an undef back. 
FieldMapping f(field->name, field->type, field->subtype, -1); f.present = false; columnMap.push_back(f); continue; - } + } Error(Fmt("Did not find requested field %s in input data file %s.", field->name.c_str(), fname.c_str())); return false; - } + } FieldMapping f(field->name, field->type, field->subtype, ifields[field->name]); - if ( field->secondary_name != "" ) { + if ( field->secondary_name != "" ) + { map::iterator fit2 = ifields.find(field->secondary_name); - if ( fit2 == ifields.end() ) { + if ( fit2 == ifields.end() ) + { Error(Fmt("Could not find requested port type field %s in input data file.", field->secondary_name.c_str())); return false; - } + } f.secondary_position = ifields[field->secondary_name]; - } + } columnMap.push_back(f); - } + } // well, that seems to have worked... return true; -} + } -bool Ascii::GetLine(string& str) { - while ( getline(*file, str) ) { - if ( str[0] != '#' ) { +bool Ascii::GetLine(string& str) + { + while ( getline(*file, str) ) + { + if ( str[0] != '#' ) return true; - } - if ( str.compare(0,8, "#fields\t") == 0 ) { + if ( str.compare(0,8, "#fields\t") == 0 ) + { str = str.substr(8); return true; + } } - } return false; -} - - -Value* Ascii::EntryToVal(string s, FieldMapping field) { - - if ( s.compare(unset_field) == 0 ) { // field is not set... - return new Value(field.type, false); } + +Value* Ascii::EntryToVal(string s, FieldMapping field) + { + + if ( s.compare(unset_field) == 0 ) // field is not set... + return new Value(field.type, false); + Value* val = new Value(field.type, true); switch ( field.type ) { @@ -220,14 +235,15 @@ Value* Ascii::EntryToVal(string s, FieldMapping field) { break; case TYPE_BOOL: - if ( s == "T" ) { + if ( s == "T" ) val->val.int_val = 1; - } else if ( s == "F" ) { + else if ( s == "F" ) val->val.int_val = 0; - } else { + else + { Error(Fmt("Field: %s Invalid value for boolean: %s", field.name.c_str(), s.c_str())); return false; - } + } break; case TYPE_INT: @@ -250,7 +266,8 @@ Value* Ascii::EntryToVal(string s, FieldMapping field) { val->val.port_val.proto = TRANSPORT_UNKNOWN; break; - case TYPE_SUBNET: { + case TYPE_SUBNET: + { size_t pos = s.find("/"); if ( pos == s.npos ) { Error(Fmt("Invalid value for subnet: %s", s.c_str())); @@ -261,8 +278,8 @@ Value* Ascii::EntryToVal(string s, FieldMapping field) { val->val.subnet_val.prefix = StringToAddr(addr); val->val.subnet_val.length = width; - } break; + } case TYPE_ADDR: val->val.addr_val = StringToAddr(s); @@ -287,47 +304,56 @@ Value* Ascii::EntryToVal(string s, FieldMapping field) { Value** lvals = new Value* [length]; - if ( field.type == TYPE_TABLE ) { + if ( field.type == TYPE_TABLE ) + { val->val.set_val.vals = lvals; val->val.set_val.size = length; - } else if ( field.type == TYPE_VECTOR ) { + } + else if ( field.type == TYPE_VECTOR ) + { val->val.vector_val.vals = lvals; val->val.vector_val.size = length; - } else { + } + else + { assert(false); - } + } if ( length == 0 ) break; //empty istringstream splitstream(s); - while ( splitstream ) { + while ( splitstream ) + { string element; if ( !getline(splitstream, element, set_separator[0]) ) break; - if ( pos >= length ) { - Error(Fmt("Internal error while parsing set. pos %d >= length %d. Element: %s", pos, length, element.c_str())); + if ( pos >= length ) + { + Error(Fmt("Internal error while parsing set. pos %d >= length %d." 
+ " Element: %s", pos, length, element.c_str())); break; - } + } Value* newval = EntryToVal(element, field.subType()); - if ( newval == 0 ) { + if ( newval == 0 ) + { Error("Error while reading set"); return 0; - } + } lvals[pos] = newval; pos++; - - } + } - if ( pos != length ) { + if ( pos != length ) + { Error("Internal error while parsing set: did not find all elements"); return 0; - } + } break; } @@ -340,24 +366,23 @@ Value* Ascii::EntryToVal(string s, FieldMapping field) { } return val; - -} + } // read the entire file and send appropriate thingies back to InputMgr -bool Ascii::DoUpdate() { +bool Ascii::DoUpdate() + { switch ( mode ) { case REREAD: // check if the file has changed struct stat sb; - if ( stat(fname.c_str(), &sb) == -1 ) { + if ( stat(fname.c_str(), &sb) == -1 ) + { Error(Fmt("Could not get stat for %s", fname.c_str())); return false; - } + } - if ( sb.st_mtime <= mtime ) { - // no change + if ( sb.st_mtime <= mtime ) // no change return true; - } mtime = sb.st_mtime; // file changed. reread. @@ -366,57 +391,56 @@ bool Ascii::DoUpdate() { case MANUAL: case STREAM: - // dirty, fix me. (well, apparently after trying seeking, etc - this is not that bad) - if ( file && file->is_open() ) { - if ( mode == STREAM ) { + // dirty, fix me. (well, apparently after trying seeking, etc + // - this is not that bad) + if ( file && file->is_open() ) + { + if ( mode == STREAM ) + { file->clear(); // remove end of file evil bits if ( !ReadHeader(true) ) // in case filters changed - { return false; // header reading failed - } + break; - } + } file->close(); - } + } file = new ifstream(fname.c_str()); - if ( !file->is_open() ) { + if ( !file->is_open() ) + { Error(Fmt("cannot open %s", fname.c_str())); return false; - } + } - if ( ReadHeader(false) == false ) { + if ( ReadHeader(false) == false ) + { return false; - } + } break; default: assert(false); - } - - - // - - // file->seekg(0, ios::beg); // do not forget clear. - - + } string line; - while ( GetLine(line ) ) { + while ( GetLine(line ) ) + { // split on tabs istringstream splitstream(line); map stringfields; int pos = 0; - while ( splitstream ) { + while ( splitstream ) + { string s; if ( !getline(splitstream, s, separator[0]) ) break; stringfields[pos] = s; pos++; - } + } pos--; // for easy comparisons of max element. @@ -426,69 +450,60 @@ bool Ascii::DoUpdate() { int fpos = 0; for ( vector::iterator fit = columnMap.begin(); fit != columnMap.end(); - fit++ ){ + fit++ ) + { - if ( ! fit->present ) { + if ( ! fit->present ) + { // add non-present field fields[fpos] = new Value((*fit).type, false); fpos++; continue; - } + } assert(fit->position >= 0 ); - if ( (*fit).position > pos || (*fit).secondary_position > pos ) { + if ( (*fit).position > pos || (*fit).secondary_position > pos ) + { Error(Fmt("Not enough fields in line %s. 
Found %d fields, want positions %d and %d", line.c_str(), pos, (*fit).position, (*fit).secondary_position)); return false; - } + } Value* val = EntryToVal(stringfields[(*fit).position], *fit); - if ( val == 0 ) { + if ( val == 0 ) + { Error("Could not convert String value to Val"); return false; - } + } - if ( (*fit).secondary_position != -1 ) { + if ( (*fit).secondary_position != -1 ) + { // we have a port definition :) assert(val->type == TYPE_PORT ); // Error(Fmt("Got type %d != PORT with secondary position!", val->type)); val->val.port_val.proto = StringToProto(stringfields[(*fit).secondary_position]); - } + } fields[fpos] = val; fpos++; - } + } //printf("fpos: %d, second.num_fields: %d\n", fpos, (*it).second.num_fields); assert ( (unsigned int) fpos == num_fields ); - if ( mode == STREAM ) { + if ( mode == STREAM ) Put(fields); - } else { + else SendEntry(fields); } - /* Do not do this, ownership changes to other thread - * for ( unsigned int i = 0; i < (*it).second.num_fields; i++ ) { - delete fields[i]; - } - delete [] fields; - */ - - } - - - //file->clear(); // remove end of file evil bits - //file->seekg(0, ios::beg); // and seek to start. - - if ( mode != STREAM ) { + if ( mode != STREAM ) EndCurrentSend(); - } - + return true; -} + } bool Ascii::DoHeartbeat(double network_time, double current_time) { @@ -500,12 +515,13 @@ bool Ascii::DoHeartbeat(double network_time, double current_time) break; case REREAD: case STREAM: - Update(); // call update and not DoUpdate, because update actually checks disabled. + Update(); // call update and not DoUpdate, because update + // checks disabled. break; default: assert(false); } return true; -} + } diff --git a/src/input/readers/Benchmark.cc b/src/input/readers/Benchmark.cc index deff2b038d..29f0070fec 100644 --- a/src/input/readers/Benchmark.cc +++ b/src/input/readers/Benchmark.cc @@ -22,7 +22,7 @@ using threading::Field; Benchmark::Benchmark(ReaderFrontend *frontend) : ReaderBackend(frontend) -{ + { multiplication_factor = double(BifConst::InputBenchmark::factor); autospread = double(BifConst::InputBenchmark::autospread); spread = int(BifConst::InputBenchmark::spread); @@ -32,19 +32,19 @@ Benchmark::Benchmark(ReaderFrontend *frontend) : ReaderBackend(frontend) timedspread = double(BifConst::InputBenchmark::timedspread); heart_beat_interval = double(BifConst::Threading::heart_beat_interval); -} + } Benchmark::~Benchmark() -{ + { DoClose(); -} + } void Benchmark::DoClose() -{ -} + { + } bool Benchmark::DoInit(string path, int arg_mode, int arg_num_fields, const Field* const* arg_fields) -{ + { mode = arg_mode; num_fields = arg_num_fields; @@ -54,18 +54,20 @@ bool Benchmark::DoInit(string path, int arg_mode, int arg_num_fields, const Fiel if ( autospread != 0.0 ) autospread_time = (int) ( (double) 1000000 / (autospread * (double) num_lines) ); - if ( ( mode != MANUAL ) && (mode != REREAD) && ( mode != STREAM ) ) { + if ( ( mode != MANUAL ) && (mode != REREAD) && ( mode != STREAM ) ) + { Error(Fmt("Unsupported read mode %d for source %s", mode, path.c_str())); return false; - } + } heartbeatstarttime = CurrTime(); DoUpdate(); return true; -} + } -string Benchmark::RandomString(const int len) { +string Benchmark::RandomString(const int len) + { string s(len, ' '); static const char values[] = @@ -73,65 +75,65 @@ string Benchmark::RandomString(const int len) { "ABCDEFGHIJKLMNOPQRSTUVWXYZ" "abcdefghijklmnopqrstuvwxyz"; - for (int i = 0; i < len; ++i) { + for (int i = 0; i < len; ++i) s[i] = values[rand() / (RAND_MAX / sizeof(values))]; - } 
return s; -} + } -double Benchmark::CurrTime() { +double Benchmark::CurrTime() + { struct timeval tv; assert ( gettimeofday(&tv, 0) >= 0 ); return double(tv.tv_sec) + double(tv.tv_usec) / 1e6; -} + } // read the entire file and send appropriate thingies back to InputMgr -bool Benchmark::DoUpdate() { +bool Benchmark::DoUpdate() + { int linestosend = num_lines * heart_beat_interval; - for ( int i = 0; i < linestosend; i++ ) { + for ( int i = 0; i < linestosend; i++ ) + { Value** field = new Value*[num_fields]; - for (unsigned int j = 0; j < num_fields; j++ ) { + for (unsigned int j = 0; j < num_fields; j++ ) field[j] = EntryToVal(fields[j]->type, fields[j]->subtype); - } - if ( mode == STREAM ) { + if ( mode == STREAM ) // do not do tracking, spread out elements over the second that we have... Put(field); - } else { + else SendEntry(field); - } - if ( stopspreadat == 0 || num_lines < stopspreadat ) { + if ( stopspreadat == 0 || num_lines < stopspreadat ) + { if ( spread != 0 ) usleep(spread); if ( autospread_time != 0 ) usleep( autospread_time ); - } + } - if ( timedspread != 0.0 ) { + if ( timedspread != 0.0 ) + { double diff; - do { + do diff = CurrTime() - heartbeatstarttime; - //printf("%d %f\n", i, diff); - //} while ( diff < i/threading::Manager::HEART_BEAT_INTERVAL*(num_lines + (num_lines * timedspread) ) ); - } while ( diff/heart_beat_interval < i/(linestosend + (linestosend * timedspread) ) ); - //} while ( diff < 0.8); - } + while ( diff/heart_beat_interval < i/(linestosend + + (linestosend * timedspread) ) ); + } } - if ( mode != STREAM ) { + if ( mode != STREAM ) EndCurrentSend(); - } return true; } -threading::Value* Benchmark::EntryToVal(TypeTag type, TypeTag subtype) { +threading::Value* Benchmark::EntryToVal(TypeTag type, TypeTag subtype) + { Value* val = new Value(type, true); // basically construct something random from the fields that we want. @@ -170,7 +172,8 @@ threading::Value* Benchmark::EntryToVal(TypeTag type, TypeTag subtype) { val->val.port_val.proto = TRANSPORT_UNKNOWN; break; - case TYPE_SUBNET: { + case TYPE_SUBNET: + { val->val.subnet_val.prefix = StringToAddr("192.168.17.1"); val->val.subnet_val.length = 16; } @@ -192,28 +195,32 @@ threading::Value* Benchmark::EntryToVal(TypeTag type, TypeTag subtype) { Value** lvals = new Value* [length]; - if ( type == TYPE_TABLE ) { + if ( type == TYPE_TABLE ) + { val->val.set_val.vals = lvals; val->val.set_val.size = length; - } else if ( type == TYPE_VECTOR ) { + } + else if ( type == TYPE_VECTOR ) + { val->val.vector_val.vals = lvals; val->val.vector_val.size = length; - } else { + } + else assert(false); - } if ( length == 0 ) break; //empty - for ( unsigned int pos = 0; pos < length; pos++ ) { - + for ( unsigned int pos = 0; pos < length; pos++ ) + { Value* newval = EntryToVal(subtype, TYPE_ENUM); - if ( newval == 0 ) { + if ( newval == 0 ) + { Error("Error while reading set"); return 0; - } + } lvals[pos] = newval; - } + } break; } @@ -226,20 +233,11 @@ threading::Value* Benchmark::EntryToVal(TypeTag type, TypeTag subtype) { return val; -} + } bool Benchmark::DoHeartbeat(double network_time, double current_time) { - /* - * This does not work the way I envisioned it, because the queueing is the problem. - printf("%f\n", CurrTime() - current_time); - if ( CurrTime() - current_time > 0.25 ) { - // event has hung for a time. refuse. 
- SendEvent("EndBenchmark", 0, 0); - return true; - } */ - ReaderBackend::DoHeartbeat(network_time, current_time); num_lines = (int) ( (double) num_lines*multiplication_factor); num_lines += add; @@ -251,7 +249,8 @@ bool Benchmark::DoHeartbeat(double network_time, double current_time) break; case REREAD: case STREAM: - if ( multiplication_factor != 1 || add != 0 ) { + if ( multiplication_factor != 1 || add != 0 ) + { // we have to document at what time we changed the factor to what value. Value** v = new Value*[2]; v[0] = new Value(TYPE_COUNT, true); @@ -260,12 +259,11 @@ bool Benchmark::DoHeartbeat(double network_time, double current_time) v[1]->val.double_val = CurrTime(); SendEvent("lines_changed", 2, v); - } + } - if ( autospread != 0.0 ) { - autospread_time = (int) ( (double) 1000000 / (autospread * (double) num_lines) ); + if ( autospread != 0.0 ) // because executing this in every loop is apparently too expensive. - } + autospread_time = (int) ( (double) 1000000 / (autospread * (double) num_lines) ); Update(); // call update and not DoUpdate, because update actually checks disabled. diff --git a/src/input/readers/Raw.cc b/src/input/readers/Raw.cc index f656be769c..43c782de29 100644 --- a/src/input/readers/Raw.cc +++ b/src/input/readers/Raw.cc @@ -24,79 +24,86 @@ using threading::Value; using threading::Field; Raw::Raw(ReaderFrontend *frontend) : ReaderBackend(frontend) -{ + { file = 0; in = 0; - //keyMap = new map(); - separator.assign( (const char*) BifConst::InputRaw::record_separator->Bytes(), BifConst::InputRaw::record_separator->Len()); - if ( separator.size() != 1 ) { + if ( separator.size() != 1 ) Error("separator length has to be 1. Separator will be truncated."); } -} - Raw::~Raw() -{ + { DoClose(); -} + } void Raw::DoClose() -{ - if ( file != 0 ) { + { + if ( file != 0 ) + { Close(); + } } -} bool Raw::Open() -{ - if ( execute ) { + { + if ( execute ) + { file = popen(fname.c_str(), "r"); - if ( file == NULL ) { + if ( file == NULL ) + { Error(Fmt("Could not execute command %s", fname.c_str())); return false; + } } - } else { + else + { file = fopen(fname.c_str(), "r"); - if ( file == NULL ) { + if ( file == NULL ) + { Error(Fmt("Init: cannot open %s", fname.c_str())); return false; + } } - } in = new boost::fdistream(fileno(file)); - if ( execute && mode == STREAM ) { + if ( execute && mode == STREAM ) + { fcntl(fileno(file), F_SETFL, O_NONBLOCK); - } + } return true; -} + } bool Raw::Close() -{ - if ( file == NULL ) { + { + if ( file == NULL ) + { InternalError(Fmt("Trying to close closed file for stream %s", fname.c_str())); return false; - } + } - if ( execute ) { + if ( execute ) + { delete(in); pclose(file); - } else { + } + else + { delete(in); fclose(file); - } + } in = NULL; file = NULL; return true; -} + } bool Raw::DoInit(string path, int arg_mode, int arg_num_fields, const Field* const* arg_fields) -{ + { fname = path; mode = arg_mode; mtime = 0; @@ -107,24 +114,30 @@ bool Raw::DoInit(string path, int arg_mode, int arg_num_fields, const Field* con num_fields = arg_num_fields; fields = arg_fields; - if ( path.length() == 0 ) { + if ( path.length() == 0 ) + { Error("No source path provided"); return false; - } + } - if ( arg_num_fields != 1 ) { - Error("Filter for raw reader contains more than one field. Filters for the raw reader may only contain exactly one string field. Filter ignored."); + if ( arg_num_fields != 1 ) + { + Error("Filter for raw reader contains more than one field. " + "Filters for the raw reader may only contain exactly one string field. 
" + "Filter ignored."); return false; - } + } - if ( fields[0]->type != TYPE_STRING ) { + if ( fields[0]->type != TYPE_STRING ) + { Error("Filter for raw reader contains a field that is not of type string."); return false; - } + } // do Initialization char last = path[path.length()-1]; - if ( last == '|' ) { + if ( last == '|' ) + { execute = true; fname = path.substr(0, fname.length() - 1); @@ -137,19 +150,17 @@ bool Raw::DoInit(string path, int arg_mode, int arg_num_fields, const Field* con } else { execute = false; - if ( ( mode != MANUAL ) && (mode != REREAD) && ( mode != STREAM ) ) { + if ( ( mode != MANUAL ) && (mode != REREAD) && ( mode != STREAM ) ) + { Error(Fmt("Unsupported read mode %d for source %s", mode, fname.c_str())); return false; - } + } result = Open(); + } - } - - if ( result == false ) { + if ( result == false ) return result; - } - #ifdef DEBUG Debug(DBG_INPUT, "Raw reader created, will perform first update"); @@ -162,62 +173,68 @@ bool Raw::DoInit(string path, int arg_mode, int arg_num_fields, const Field* con Debug(DBG_INPUT, "First update went through"); #endif return true; -} - - -bool Raw::GetLine(string& str) { - while ( getline(*in, str, separator[0]) ) { - return true; } + +bool Raw::GetLine(string& str) + { + while ( getline(*in, str, separator[0]) ) + return true; + return false; -} + } // read the entire file and send appropriate thingies back to InputMgr -bool Raw::DoUpdate() { - if ( firstrun ) { +bool Raw::DoUpdate() + { + if ( firstrun ) firstrun = false; - } else { + else + { switch ( mode ) { case REREAD: + { // check if the file has changed struct stat sb; - if ( stat(fname.c_str(), &sb) == -1 ) { + if ( stat(fname.c_str(), &sb) == -1 ) + { Error(Fmt("Could not get stat for %s", fname.c_str())); return false; - } + } - if ( sb.st_mtime <= mtime ) { + if ( sb.st_mtime <= mtime ) // no change return true; - } mtime = sb.st_mtime; // file changed. reread. // fallthrough + } case MANUAL: case STREAM: - if ( mode == STREAM && file != NULL && in != NULL ) { + if ( mode == STREAM && file != NULL && in != NULL ) + { //fpurge(file); in->clear(); // remove end of file evil bits break; - } + } Close(); - if ( !Open() ) { + if ( !Open() ) return false; - } + break; default: assert(false); } - } + } string line; - while ( GetLine(line) ) { + while ( GetLine(line) ) + { assert (num_fields == 1); Value** fields = new Value*[1]; @@ -228,14 +245,14 @@ bool Raw::DoUpdate() { fields[0] = val; Put(fields); - } + } return true; -} + } bool Raw::DoHeartbeat(double network_time, double current_time) -{ + { ReaderBackend::DoHeartbeat(network_time, current_time); switch ( mode ) { @@ -244,12 +261,12 @@ bool Raw::DoHeartbeat(double network_time, double current_time) break; case REREAD: case STREAM: - Update(); // call update and not DoUpdate, because update actually checks disabled. + Update(); // call update and not DoUpdate, because update + // checks disabled. break; default: assert(false); } return true; -} - + }