From b3f01915fbca99ddda434ae791b143150d27fcf7 Mon Sep 17 00:00:00 2001 From: Bernhard Amann Date: Sun, 20 Nov 2011 12:07:50 -0800 Subject: [PATCH] compiles with basic new filter framework - but crashes on use. --- scripts/base/frameworks/input/main.bro | 16 +- src/InputMgr.cc | 255 +++++++++++------------- src/InputMgr.h | 2 +- src/InputReader.cc | 14 +- src/InputReader.h | 3 +- src/InputReaderAscii.cc | 265 ++++++++++++------------- src/InputReaderAscii.h | 30 ++- 7 files changed, 283 insertions(+), 302 deletions(-) diff --git a/scripts/base/frameworks/input/main.bro b/scripts/base/frameworks/input/main.bro index d9c0812498..9d83d73ec6 100644 --- a/scripts/base/frameworks/input/main.bro +++ b/scripts/base/frameworks/input/main.bro @@ -14,9 +14,9 @@ export { name: string; ## for tables - idx: any &optional; - val: any &optional; - destination: any &optional; + idx: any; + val: any; + destination: any; want_record: bool &default=T; table_ev: any &optional; # event containing idx, val as values. @@ -25,13 +25,13 @@ export { pred: function(typ: Input::Event, left: any, right: any): bool &optional; ## for "normalized" events - ev: any &optional; - ev_description: any &optional; + # ev: any &optional; + # ev_description: any &optional; }; - const no_filter: Filter = [$name=""]; # Sentinel. + const no_filter: Filter = [$name="", $idx="", $val="", $destination=""]; # Sentinel. - global create_stream: function(id: Log::ID, description: Input::ReaderDescription) : bool; + global create_stream: function(id: Log::ID, description: Input::StreamDescription) : bool; global remove_stream: function(id: Log::ID) : bool; global force_update: function(id: Log::ID) : bool; global add_filter: function(id: Log::ID, filter: Input::Filter) : bool; @@ -47,7 +47,7 @@ module Input; global filters: table[ID, string] of Filter; -function create_stream(id: Log::ID, description: Input::ReaderDescription) : bool +function create_stream(id: Log::ID, description: Input::StreamDescription) : bool { return __create_stream(id, description); } diff --git a/src/InputMgr.cc b/src/InputMgr.cc index f9250f6f0f..403d656140 100644 --- a/src/InputMgr.cc +++ b/src/InputMgr.cc @@ -16,18 +16,20 @@ #include "CompHash.h" -class InputHash { -public: +struct InputHash { HashKey* valhash; HashKey* idxkey; // does not need ref or whatever - if it is present here, it is also still present in the TableVal. }; declare(PDict, InputHash); -struct InputMgr::Filter { +class InputMgr::Filter { +public: EnumVal* id; string name; + //int filter_type; // to distinguish between event and table filters + unsigned int num_idx_fields; unsigned int num_val_fields; bool want_record; @@ -68,6 +70,8 @@ struct InputMgr::ReaderInfo { //list events; // events we fire when "something" happens map filters; // filters that can prevent our actions + bool HasFilter(int id); + ~ReaderInfo(); }; @@ -80,6 +84,15 @@ InputMgr::ReaderInfo::~ReaderInfo() { delete(reader); } +bool InputMgr::ReaderInfo::HasFilter(int id) { + map::iterator it = filters.find(id); + if ( it == filters.end() ) { + return false; + } + return true; +} + + struct InputReaderDefinition { bro_int_t type; // the type const char *name; // descriptive name for error messages @@ -168,12 +181,12 @@ InputReader* InputMgr::CreateStream(EnumVal* id, RecordVal* description) int success = reader_obj->Init(source); if ( success == false ) { - assert( RemoveReader(id) ); + assert( RemoveStream(id) ); return 0; } success = reader_obj->Update(); if ( success == false ) { - assert ( RemoveReader(id) ); + assert ( RemoveStream(id) ); return 0; } @@ -224,6 +237,7 @@ bool InputMgr::AddFilter(EnumVal *id, RecordVal* fval) { fields[i] = fieldsV[i]; } + // FIXME: remove those funky 0-tests again as the idea was changed. Filter filter; filter.name = name->AsString()->CheckString(); filter.id = id->Ref()->AsEnumVal(); @@ -231,8 +245,8 @@ bool InputMgr::AddFilter(EnumVal *id, RecordVal* fval) { filter.num_idx_fields = idxfields; filter.num_val_fields = valfields; filter.tab = dst ? dst->Ref()->AsTableVal() : 0; - filter.rtype = rtype ? val->Ref()->AsRecordType() : 0; - filter.itype = itype ? idx->Ref()->AsRecordType() : 0; + filter.rtype = val ? val->Ref()->AsRecordType() : 0; + filter.itype = idx ? idx->Ref()->AsRecordType() : 0; // ya - well - we actually don't need them in every case... well, a few bytes of memory wasted filter.currDict = new PDict(InputHash); filter.lastDict = new PDict(InputHash); @@ -240,22 +254,11 @@ bool InputMgr::AddFilter(EnumVal *id, RecordVal* fval) { Unref(want_record); // ref'd by lookupwithdefault if ( valfields > 1 ) { - assert(info->want_record); + assert(filter.want_record); } i->filters[id->InternalInt()] = filter; - - // ok, now we have to alert the reader of our new filter with our funky new fields - // the id is handled in a ... well, to be honest, a little bit sneaky manner. - // the "problem" is, that we can have several filters in the reader for one filter in the log manager. - // this is due to the fact, that a filter can either output it's result as a table, as an event... - // ...or as an table _and_ an event. And... if we have a table and an event, we actually need two different sets - // of filters in the reader, because the fields for the table and the event may differ and I absolutely do not want - // to build a union of these values and figure it out later. - // hence -> filter id is multiplicated with 2. - // filterId*2 -> results for table - // filterId*2+1 -> results for event - i->AddFilter( id->InternalInt() * 2, fieldsV.size(), idxfields, fields ); + i->reader->AddFilter( id->InternalInt(), fieldsV.size(), fields ); return true; } @@ -387,31 +390,15 @@ bool InputMgr::RemoveFilter(EnumVal* id, const string &name) { return false; } -/* - std::list::iterator it = i->filters.begin(); - while ( it != i->filters.end() ) - { - if ( (*it).name == name ) { - it = i->filters.erase(it); - return true; - break; - } - else - ++it; - } - */ - map::iterator it = i->filters.find(id->InternalInt()); if ( it == i->filters.end() ) { return false; } - it->filters.erase(it); + i->filters.erase(it); return true; } - - Val* InputMgr::LogValToIndexVal(int num_fields, const RecordType *type, const LogVal* const *vals) { Val* idxval; int position = 0; @@ -449,27 +436,28 @@ void InputMgr::SendEntry(const InputReader* reader, int id, const LogVal* const bool updated = false; + assert(i->HasFilter(id)); //reporter->Error("Hashing %d index fields", i->num_idx_fields); - HashKey* idxhash = HashLogVals(i->num_idx_fields, vals); + HashKey* idxhash = HashLogVals(i->filters[id].num_idx_fields, vals); //reporter->Error("Result: %d", (uint64_t) idxhash->Hash()); //reporter->Error("Hashing %d val fields", i->num_val_fields); - HashKey* valhash = HashLogVals(i->num_val_fields, vals+i->num_idx_fields); + HashKey* valhash = HashLogVals(i->filters[id].num_val_fields, vals+i->filters[id].num_idx_fields); //reporter->Error("Result: %d", (uint64_t) valhash->Hash()); //reporter->Error("received entry with idxhash %d and valhash %d", (uint64_t) idxhash->Hash(), (uint64_t) valhash->Hash()); - InputHash *h = i->lastDict->Lookup(idxhash); + InputHash *h = i->filters[id].lastDict->Lookup(idxhash); if ( h != 0 ) { // seen before if ( h->valhash->Hash() == valhash->Hash() ) { // ok, double. - i->lastDict->Remove(idxhash); - i->currDict->Insert(idxhash, h); + i->filters[id].lastDict->Remove(idxhash); + i->filters[id].currDict->Insert(idxhash, h); return; } else { // updated - i->lastDict->Remove(idxhash); + i->filters[id].lastDict->Remove(idxhash); delete(h); updated = true; @@ -477,27 +465,22 @@ void InputMgr::SendEntry(const InputReader* reader, int id, const LogVal* const } - Val* idxval = LogValToIndexVal(i->num_idx_fields, i->itype, vals); + Val* idxval = LogValToIndexVal(i->filters[id].num_idx_fields, i->filters[id].itype, vals); Val* valval; - int position = i->num_idx_fields; - if ( i->num_val_fields == 1 && !i->want_record ) { - valval = LogValToVal(vals[i->num_idx_fields], i->rtype->FieldType(i->num_idx_fields)); + int position = i->filters[id].num_idx_fields; + if ( i->filters[id].num_val_fields == 1 && !i->filters[id].want_record ) { + valval = LogValToVal(vals[i->filters[id].num_idx_fields], i->filters[id].rtype->FieldType(i->filters[id].num_idx_fields)); } else { - RecordVal * r = new RecordVal(i->rtype); + RecordVal * r = new RecordVal(i->filters[id].rtype); - /* if ( i->rtype->NumFields() != (int) i->num_val_fields ) { - reporter->InternalError("Type mismatch"); - return; - } */ - - for ( int j = 0; j < i->rtype->NumFields(); j++) { + for ( int j = 0; j < i->filters[id].rtype->NumFields(); j++) { Val* val = 0; - if ( i->rtype->FieldType(j)->Tag() == TYPE_RECORD ) { - val = LogValToRecordVal(vals, i->rtype->FieldType(j)->AsRecordType(), &position); + if ( i->filters[id].rtype->FieldType(j)->Tag() == TYPE_RECORD ) { + val = LogValToRecordVal(vals, i->filters[id].rtype->FieldType(j)->AsRecordType(), &position); } else { - val = LogValToVal(vals[position], i->rtype->FieldType(j)); + val = LogValToVal(vals[position], i->filters[id].rtype->FieldType(j)); position++; } @@ -516,17 +499,12 @@ void InputMgr::SendEntry(const InputReader* reader, int id, const LogVal* const Val* oldval = 0; if ( updated == true ) { // in that case, we need the old value to send the event (if we send an event). - oldval = i->tab->Lookup(idxval); + oldval = i->filters[id].tab->Lookup(idxval); } - // call filters first do determine if we really add / change the entry - std::list::iterator it = i->filters.begin(); - while ( it != i->filters.end() ) { - if (! (*it).pred ) { - continue; - } - + // call filter first to determine if we really add / change the entry + if ( i->filters[id].pred ) { EnumVal* ev; Ref(idxval); Ref(valval); @@ -541,44 +519,45 @@ void InputMgr::SendEntry(const InputReader* reader, int id, const LogVal* const vl.append(ev); vl.append(idxval); vl.append(valval); - Val* v = (*it).pred->Call(&vl); + Val* v = i->filters[id].pred->Call(&vl); bool result = v->AsBool(); Unref(v); if ( result == false ) { if ( !updated ) { // throw away. Hence - we quit. And remove the entry from the current dictionary... - delete(i->currDict->RemoveEntry(idxhash)); + delete(i->filters[id].currDict->RemoveEntry(idxhash)); return; } else { // keep old one - i->currDict->Insert(idxhash, h); + i->filters[id].currDict->Insert(idxhash, h); return; } } - ++it; } //i->tab->Assign(idxval, valval); - HashKey* k = i->tab->ComputeHash(idxval); + HashKey* k = i->filters[id].tab->ComputeHash(idxval); if ( !k ) { reporter->InternalError("could not hash"); return; } - i->tab->Assign(idxval, k, valval); + i->filters[id].tab->Assign(idxval, k, valval); InputHash* ih = new InputHash(); - k = i->tab->ComputeHash(idxval); + k = i->filters[id].tab->ComputeHash(idxval); ih->idxkey = k; ih->valhash = valhash; //i->tab->Delete(k); - i->currDict->Insert(idxhash, ih); + i->filters[id].currDict->Insert(idxhash, ih); // send events now that we are kind of finished. + + /* FIXME: fix me. std::list::iterator filter_iterator = i->events.begin(); while ( filter_iterator != i->events.end() ) { EnumVal* ev; @@ -597,7 +576,7 @@ void InputMgr::SendEntry(const InputReader* reader, int id, const LogVal* const ++filter_iterator; - } + } */ } @@ -607,86 +586,74 @@ void InputMgr::EndCurrentSend(const InputReader* reader, int id) { reporter->InternalError("Unknown reader"); return; } + + assert(i->HasFilter(id)); + // lastdict contains all deleted entries and should be empty apart from that - IterCookie *c = i->lastDict->InitForIteration(); - i->lastDict->MakeRobustCookie(c); + IterCookie *c = i->filters[id].lastDict->InitForIteration(); + i->filters[id].lastDict->MakeRobustCookie(c); InputHash* ih; HashKey *lastDictIdxKey; //while ( ( ih = i->lastDict->NextEntry(c) ) ) { - while ( ( ih = i->lastDict->NextEntry(lastDictIdxKey, c) ) ) { - - if ( i->events.size() != 0 || i->filters.size() != 0 ) // we have a filter or an event - { + while ( ( ih = i->filters[id].lastDict->NextEntry(lastDictIdxKey, c) ) ) { - ListVal *idx = i->tab->RecoverIndex(ih->idxkey); + if ( i->filters[id].pred ) { + ListVal *idx = i->filters[id].tab->RecoverIndex(ih->idxkey); assert(idx != 0); - Val *val = i->tab->Lookup(idx); + Val *val = i->filters[id].tab->Lookup(idx); assert(val != 0); - { - bool doBreak = false; - // ask filter, if we want to expire this element... - std::list::iterator it = i->filters.begin(); - while ( it != i->filters.end() ) { - if (! (*it).pred ) { - continue; - } + bool doBreak = false; + // ask predicate, if we want to expire this element... - EnumVal* ev = new EnumVal(BifEnum::Input::EVENT_REMOVED, BifType::Enum::Input::Event); - Ref(idx); - Ref(val); + EnumVal* ev = new EnumVal(BifEnum::Input::EVENT_REMOVED, BifType::Enum::Input::Event); + Ref(idx); + Ref(val); - val_list vl(3); - vl.append(ev); - vl.append(idx); - vl.append(val); - Val* v = (*it).pred->Call(&vl); - bool result = v->AsBool(); - Unref(v); - - ++it; - - if ( result == false ) { - // Keep it. Hence - we quit and simply go to the next entry of lastDict - // ah well - and we have to add the entry to currDict... - i->currDict->Insert(lastDictIdxKey, i->lastDict->RemoveEntry(lastDictIdxKey)); - doBreak = true; - continue; - } - - } - - if ( doBreak ) { - continue; - } + val_list vl(3); + vl.append(ev); + vl.append(idx); + vl.append(val); + Val* v = i->filters[id].pred->Call(&vl); + bool result = v->AsBool(); + Unref(v); + + if ( result == false ) { + // Keep it. Hence - we quit and simply go to the next entry of lastDict + // ah well - and we have to add the entry to currDict... + i->filters[id].currDict->Insert(lastDictIdxKey, i->filters[id].lastDict->RemoveEntry(lastDictIdxKey)); + continue; } - + + // { - std::list::iterator it = i->events.begin(); - while ( it != i->events.end() ) { + /* FIXME: events + std::list::iterator it = i->filters[id].events.begin(); + while ( it != i->filters[id].events.end() ) { Ref(idx); Ref(val); EnumVal *ev = new EnumVal(BifEnum::Input::EVENT_REMOVED, BifType::Enum::Input::Event); SendEvent(*it, ev, idx, val); ++it; } + */ } } - i->tab->Delete(ih->idxkey); - i->lastDict->Remove(lastDictIdxKey); // deletex in next line + i->filters[id].tab->Delete(ih->idxkey); + i->filters[id].lastDict->Remove(lastDictIdxKey); // deletex in next line delete(ih); } - i->lastDict->Clear(); // should be empty... but... well... who knows... - delete(i->lastDict); + i->filters[id].lastDict->Clear(); // should be empty... but... well... who knows... + delete(i->filters[id].lastDict); - i->lastDict = i->currDict; - i->currDict = new PDict(InputHash); + i->filters[id].lastDict = i->filters[id].currDict; + i->filters[id].currDict = new PDict(InputHash); } void InputMgr::Put(const InputReader* reader, int id, const LogVal* const *vals) { @@ -696,22 +663,24 @@ void InputMgr::Put(const InputReader* reader, int id, const LogVal* const *vals) return; } - Val* idxval = LogValToIndexVal(i->num_idx_fields, i->itype, vals); + assert(i->HasFilter(id)); + + Val* idxval = LogValToIndexVal(i->filters[id].num_idx_fields, i->filters[id].itype, vals); Val* valval; - int position = i->num_idx_fields; - if ( i->num_val_fields == 1 && !i->want_record ) { - valval = LogValToVal(vals[i->num_idx_fields], i->rtype->FieldType(i->num_idx_fields)); + int position = i->filters[id].num_idx_fields; + if ( i->filters[id].num_val_fields == 1 && !i->filters[id].want_record ) { + valval = LogValToVal(vals[i->filters[id].num_idx_fields], i->filters[id].rtype->FieldType(i->filters[id].num_idx_fields)); } else { - RecordVal * r = new RecordVal(i->rtype); + RecordVal * r = new RecordVal(i->filters[id].rtype); - for ( int j = 0; j < i->rtype->NumFields(); j++) { + for ( int j = 0; j < i->filters[id].rtype->NumFields(); j++) { Val* val = 0; - if ( i->rtype->FieldType(j)->Tag() == TYPE_RECORD ) { - val = LogValToRecordVal(vals, i->rtype->FieldType(j)->AsRecordType(), &position); + if ( i->filters[id].rtype->FieldType(j)->Tag() == TYPE_RECORD ) { + val = LogValToRecordVal(vals, i->filters[id].rtype->FieldType(j)->AsRecordType(), &position); } else { - val = LogValToVal(vals[position], i->rtype->FieldType(j)); + val = LogValToVal(vals[position], i->filters[id].rtype->FieldType(j)); position++; } @@ -726,7 +695,7 @@ void InputMgr::Put(const InputReader* reader, int id, const LogVal* const *vals) valval = r; } - i->tab->Assign(idxval, valval); + i->filters[id].tab->Assign(idxval, valval); } void InputMgr::Clear(const InputReader* reader, int id) { @@ -735,20 +704,24 @@ void InputMgr::Clear(const InputReader* reader, int id) { reporter->InternalError("Unknown reader"); return; } - - i->tab->RemoveAll(); + + assert(i->HasFilter(id)); + + i->filters[id].tab->RemoveAll(); } -bool InputMgr::Delete(const InputReader* reader, const LogVal* const *vals) { +bool InputMgr::Delete(const InputReader* reader, int id, const LogVal* const *vals) { ReaderInfo *i = FindReader(reader); if ( i == 0 ) { reporter->InternalError("Unknown reader"); return false; } - - Val* idxval = LogValToIndexVal(i->num_idx_fields, i->itype, vals); - return ( i->tab->Delete(idxval) != 0 ); + assert(i->HasFilter(id)); + + Val* idxval = LogValToIndexVal(i->filters[id].num_idx_fields, i->filters[id].itype, vals); + + return ( i->filters[id].tab->Delete(idxval) != 0 ); } void InputMgr::Error(InputReader* reader, const char* msg) diff --git a/src/InputMgr.h b/src/InputMgr.h index 1cacf89143..5d531cd6fc 100644 --- a/src/InputMgr.h +++ b/src/InputMgr.h @@ -35,7 +35,7 @@ protected: void Error(InputReader* reader, const char* msg); // for readers to write to input stream in direct mode (reporting new/deleted values directly) - void Put(const InputReader* reader, int id. const LogVal* const *vals); + void Put(const InputReader* reader, int id, const LogVal* const *vals); void Clear(const InputReader* reader, int id); bool Delete(const InputReader* reader, int id, const LogVal* const *vals); diff --git a/src/InputReader.cc b/src/InputReader.cc index 1008cf1b67..1c65985fd6 100644 --- a/src/InputReader.cc +++ b/src/InputReader.cc @@ -26,17 +26,17 @@ void InputReader::Error(const string &msg) void InputReader::Put(int id, const LogVal* const *val) { - input_mgr->Put(this, int id, val); + input_mgr->Put(this, id, val); } void InputReader::Clear(int id) { - input_mgr->Clear(this, int id); + input_mgr->Clear(this, id); } void InputReader::Delete(int id, const LogVal* const *val) { - input_mgr->Delete(this, int id, val); + input_mgr->Delete(this, id, val); } @@ -52,12 +52,12 @@ bool InputReader::Init(string arg_source) bool InputReader::AddFilter(int id, int arg_num_fields, const LogField* const * arg_fields) { - return DoAddFilter(int id, arg_num_fields, arg_fields); + return DoAddFilter(id, arg_num_fields, arg_fields); } bool InputReader::RemoveFilter(int id) { - return DoRemoveFilter(int id); + return DoRemoveFilter(id); } void InputReader::Finish() @@ -105,10 +105,10 @@ const char* InputReader::Fmt(const char* format, ...) void InputReader::SendEntry(int id, const LogVal* const *vals) { - input_mgr->SendEntry(this, int id, vals); + input_mgr->SendEntry(this, id, vals); } void InputReader::EndCurrentSend(int id) { - input_mgr->EndCurrentSend(this, int id); + input_mgr->EndCurrentSend(this, id); } diff --git a/src/InputReader.h b/src/InputReader.h index 12f0bc9db4..6e3d689750 100644 --- a/src/InputReader.h +++ b/src/InputReader.h @@ -30,7 +30,8 @@ protected: virtual bool DoInit(string arg_sources) = 0; virtual bool DoAddFilter( int id, int arg_num_fields, const LogField* const* fields ) = 0; - virtual bool DoRemoveFilter( int id ); + + virtual bool DoRemoveFilter( int id ) = 0; virtual void DoFinish() = 0; diff --git a/src/InputReaderAscii.cc b/src/InputReaderAscii.cc index 4a0d4157bc..84feb74e61 100644 --- a/src/InputReaderAscii.cc +++ b/src/InputReaderAscii.cc @@ -28,6 +28,7 @@ FieldMapping FieldMapping::subType() { return FieldMapping(name, subtype, position); } + InputReaderAscii::InputReaderAscii() { file = 0; @@ -58,7 +59,7 @@ InputReaderAscii::~InputReaderAscii() void InputReaderAscii::DoFinish() { - columnMap.empty(); + filters.empty(); if ( file != 0 ) { file->close(); delete(file); @@ -66,7 +67,7 @@ void InputReaderAscii::DoFinish() } } -bool InputReaderAscii::DoInit(string path, int num_fields, int idx_fields, const LogField* const * fields) +bool InputReaderAscii::DoInit(string path) { fname = path; @@ -76,11 +77,39 @@ bool InputReaderAscii::DoInit(string path, int num_fields, int idx_fields, const return false; } + return true; +} - this->num_fields = num_fields; - this->idx_fields = idx_fields; - this->fields = fields; +bool InputReaderAscii::DoAddFilter( int id, int arg_num_fields, const LogField* const* fields ) { + if ( HasFilter(id) ) { + return false; // no, we don't want to add this a second time + } + Filter f; + f.num_fields = arg_num_fields; + f.fields = fields; + + filters[id] = f; + + return true; +} + +bool InputReaderAscii::DoRemoveFilter ( int id ) { + if (!HasFilter(id) ) { + return false; + } + + assert ( filters.erase(id) == 1 ); + + return true; +} + + +bool InputReaderAscii::HasFilter(int id) { + map::iterator it = filters.find(id); + if ( it == filters.end() ) { + return false; + } return true; } @@ -93,46 +122,47 @@ bool InputReaderAscii::ReadHeader() { return false; } - // split on tabs... - istringstream splitstream(line); - unsigned int currTab = 0; - int wantFields = 0; - while ( splitstream ) { - string s; - if ( !getline(splitstream, s, separator[0])) - break; - - // current found heading in s... compare if we want it - for ( unsigned int i = 0; i < num_fields; i++ ) { - const LogField* field = fields[i]; - if ( field->name == s ) { - // cool, found field. note position - FieldMapping f(field->name, field->type, field->subtype, i); - columnMap.push_back(f); - wantFields++; - break; // done with searching + for ( map::iterator it = filters.begin(); it != filters.end(); it++ ) { + // split on tabs... + istringstream splitstream(line); + unsigned int currTab = 0; + int wantFields = 0; + while ( splitstream ) { + string s; + if ( !getline(splitstream, s, separator[0])) + break; + + // current found heading in s... compare if we want it + for ( unsigned int i = 0; i < (*it).second.num_fields; i++ ) { + const LogField* field = (*it).second.fields[i]; + if ( field->name == s ) { + // cool, found field. note position + FieldMapping f(field->name, field->type, field->subtype, i); + (*it).second.columnMap.push_back(f); + wantFields++; + break; // done with searching + } } + + // look if we did push something... + if ( (*it).second.columnMap.size() == currTab ) { + // no, we didn't. note that... + FieldMapping empty; + (*it).second.columnMap.push_back(empty); + } + + // done + currTab++; + } + + if ( wantFields != (int) (*it).second.num_fields ) { + // we did not find all fields? + // :( + Error(Fmt("One of the requested fields could not be found in the input data file. Found %d fields, wanted %d. Filternum: %d", wantFields, (*it).second.num_fields, (*it).first)); + return false; } - - // look if we did push something... - if ( columnMap.size() == currTab ) { - // no, we didn't. note that... - FieldMapping empty; - columnMap.push_back(empty); - } - - // done - currTab++; - } - - if ( wantFields != (int) num_fields ) { - // we did not find all fields? - // :( - Error(Fmt("One of the requested fields could not be found in the input data file. Found %d fields, wanted %d", wantFields, num_fields)); - return false; } - // well, that seems to have worked... return true; } @@ -314,110 +344,77 @@ bool InputReaderAscii::DoUpdate() { return false; } - // TODO: all the stuff we need for a second reading. - // *cough* - // - // - - - // new keymap - //map *newKeyMap = new map(); - string line; while ( GetLine(line ) ) { - // split on tabs + + for ( map::iterator it = filters.begin(); it != filters.end(); it++ ) { - istringstream splitstream(line); - - LogVal** fields = new LogVal*[num_fields]; - //string string_fields[num_fields]; - - unsigned int currTab = 0; - unsigned int currField = 0; - while ( splitstream ) { - - string s; - if ( !getline(splitstream, s, separator[0]) ) - break; - + // split on tabs - if ( currTab >= columnMap.size() ) { - Error("Tabs in heading do not match tabs in data?"); - //disabled = true; - return false; - } - - FieldMapping currMapping = columnMap[currTab]; - currTab++; - - if ( currMapping.IsEmpty() ) { - // well, that was easy - continue; - } - - if ( currField >= num_fields ) { - Error("internal error - fieldnum greater as possible"); - return false; - } - - LogVal* val = EntryToVal(s, currMapping); - if ( val == 0 ) { - return false; - } - fields[currMapping.position] = val; - //string_fields[currMapping.position] = s; - - currField++; - } - - if ( currField != num_fields ) { - Error("curr_field != num_fields in DoUpdate. Columns in file do not match column definition."); - return false; - } - - - SendEntry(fields); - - /* - string indexstring = ""; - string valstring = ""; - for ( unsigned int i = 0; i < idx_fields; i++ ) { - indexstring.append(string_fields[i]); - } - - for ( unsigned int i = idx_fields; i < num_fields; i++ ) { - valstring.append(string_fields[i]); - } - - string valhash = Hash(valstring); - string indexhash = Hash(indexstring); - - if ( keyMap->find(indexhash) == keyMap->end() ) { - // new key - Put(fields); - } else if ( (*keyMap)[indexhash] != valhash ) { - // changed key - Put(fields); - keyMap->erase(indexhash); - } else { - // field not changed - keyMap->erase(indexhash); - } - - - (*newKeyMap)[indexhash] = valhash; - */ + istringstream splitstream(line); - for ( unsigned int i = 0; i < num_fields; i++ ) { - delete fields[i]; + LogVal** fields = new LogVal*[(*it).second.num_fields]; + //string string_fields[num_fields]; + + unsigned int currTab = 0; + unsigned int currField = 0; + while ( splitstream ) { + + string s; + if ( !getline(splitstream, s, separator[0]) ) + break; + + + if ( currTab >= (*it).second.columnMap.size() ) { + Error("Tabs in heading do not match tabs in data?"); + //disabled = true; + return false; + } + + FieldMapping currMapping = (*it).second.columnMap[currTab]; + currTab++; + + if ( currMapping.IsEmpty() ) { + // well, that was easy + continue; + } + + if ( currField >= (*it).second.num_fields ) { + Error("internal error - fieldnum greater as possible"); + return false; + } + + LogVal* val = EntryToVal(s, currMapping); + if ( val == 0 ) { + return false; + } + fields[currMapping.position] = val; + //string_fields[currMapping.position] = s; + + currField++; + } + + if ( currField != (*it).second.num_fields ) { + Error("curr_field != num_fields in DoUpdate. Columns in file do not match column definition."); + return false; + } + + + SendEntry((*it).first, fields); + + for ( unsigned int i = 0; i < (*it).second.num_fields; i++ ) { + delete fields[i]; + } + delete [] fields; } - delete [] fields; } //file->clear(); // remove end of file evil bits //file->seekg(0, ios::beg); // and seek to start. - EndCurrentSend(); + for ( map::iterator it = filters.begin(); it != filters.end(); it++ ) { + EndCurrentSend((*it).first); + } return true; } diff --git a/src/InputReaderAscii.h b/src/InputReaderAscii.h index c848c17110..01169a3cfc 100644 --- a/src/InputReaderAscii.h +++ b/src/InputReaderAscii.h @@ -34,13 +34,30 @@ public: protected: - virtual bool DoInit(string path, int arg_num_fields, int arg_idx_fields, - const LogField* const * fields); + virtual bool DoInit(string path); + + virtual bool DoAddFilter( int id, int arg_num_fields, const LogField* const* fields ); + + virtual bool DoRemoveFilter ( int id ); + virtual void DoFinish(); virtual bool DoUpdate(); private: + + struct Filter { + unsigned int num_fields; + + const LogField* const * fields; // raw mapping + + // map columns in the file to columns to send back to the manager + vector columnMap; + + }; + + bool HasFilter(int id); + bool ReadHeader(); LogVal* EntryToVal(string s, FieldMapping type); @@ -49,15 +66,8 @@ private: ifstream* file; string fname; - unsigned int num_fields; - unsigned int idx_fields; + map filters; - // map columns in the file to columns to send back to the manager - vector columnMap; - const LogField* const * fields; // raw mapping - - //map *keyMap; - // // Options set from the script-level. string separator;