compiles with basic new filter framework - but crashes on use.

This commit is contained in:
Bernhard Amann 2011-11-20 12:07:50 -08:00
parent e2c521fc4e
commit b3f01915fb
7 changed files with 283 additions and 302 deletions

View file

@ -14,9 +14,9 @@ export {
name: string; name: string;
## for tables ## for tables
idx: any &optional; idx: any;
val: any &optional; val: any;
destination: any &optional; destination: any;
want_record: bool &default=T; want_record: bool &default=T;
table_ev: any &optional; # event containing idx, val as values. table_ev: any &optional; # event containing idx, val as values.
@ -25,13 +25,13 @@ export {
pred: function(typ: Input::Event, left: any, right: any): bool &optional; pred: function(typ: Input::Event, left: any, right: any): bool &optional;
## for "normalized" events ## for "normalized" events
ev: any &optional; # ev: any &optional;
ev_description: any &optional; # ev_description: any &optional;
}; };
const no_filter: Filter = [$name="<not found>"]; # Sentinel. const no_filter: Filter = [$name="<not found>", $idx="", $val="", $destination=""]; # Sentinel.
global create_stream: function(id: Log::ID, description: Input::ReaderDescription) : bool; global create_stream: function(id: Log::ID, description: Input::StreamDescription) : bool;
global remove_stream: function(id: Log::ID) : bool; global remove_stream: function(id: Log::ID) : bool;
global force_update: function(id: Log::ID) : bool; global force_update: function(id: Log::ID) : bool;
global add_filter: function(id: Log::ID, filter: Input::Filter) : bool; global add_filter: function(id: Log::ID, filter: Input::Filter) : bool;
@ -47,7 +47,7 @@ module Input;
global filters: table[ID, string] of Filter; global filters: table[ID, string] of Filter;
function create_stream(id: Log::ID, description: Input::ReaderDescription) : bool function create_stream(id: Log::ID, description: Input::StreamDescription) : bool
{ {
return __create_stream(id, description); return __create_stream(id, description);
} }

View file

@ -16,18 +16,20 @@
#include "CompHash.h" #include "CompHash.h"
class InputHash { struct InputHash {
public:
HashKey* valhash; HashKey* valhash;
HashKey* idxkey; // does not need ref or whatever - if it is present here, it is also still present in the TableVal. HashKey* idxkey; // does not need ref or whatever - if it is present here, it is also still present in the TableVal.
}; };
declare(PDict, InputHash); declare(PDict, InputHash);
struct InputMgr::Filter { class InputMgr::Filter {
public:
EnumVal* id; EnumVal* id;
string name; string name;
//int filter_type; // to distinguish between event and table filters
unsigned int num_idx_fields; unsigned int num_idx_fields;
unsigned int num_val_fields; unsigned int num_val_fields;
bool want_record; bool want_record;
@ -68,6 +70,8 @@ struct InputMgr::ReaderInfo {
//list<string> events; // events we fire when "something" happens //list<string> events; // events we fire when "something" happens
map<int, InputMgr::Filter> filters; // filters that can prevent our actions map<int, InputMgr::Filter> filters; // filters that can prevent our actions
bool HasFilter(int id);
~ReaderInfo(); ~ReaderInfo();
}; };
@ -80,6 +84,15 @@ InputMgr::ReaderInfo::~ReaderInfo() {
delete(reader); delete(reader);
} }
bool InputMgr::ReaderInfo::HasFilter(int id) {
map<int, InputMgr::Filter>::iterator it = filters.find(id);
if ( it == filters.end() ) {
return false;
}
return true;
}
struct InputReaderDefinition { struct InputReaderDefinition {
bro_int_t type; // the type bro_int_t type; // the type
const char *name; // descriptive name for error messages const char *name; // descriptive name for error messages
@ -168,12 +181,12 @@ InputReader* InputMgr::CreateStream(EnumVal* id, RecordVal* description)
int success = reader_obj->Init(source); int success = reader_obj->Init(source);
if ( success == false ) { if ( success == false ) {
assert( RemoveReader(id) ); assert( RemoveStream(id) );
return 0; return 0;
} }
success = reader_obj->Update(); success = reader_obj->Update();
if ( success == false ) { if ( success == false ) {
assert ( RemoveReader(id) ); assert ( RemoveStream(id) );
return 0; return 0;
} }
@ -224,6 +237,7 @@ bool InputMgr::AddFilter(EnumVal *id, RecordVal* fval) {
fields[i] = fieldsV[i]; fields[i] = fieldsV[i];
} }
// FIXME: remove those funky 0-tests again as the idea was changed.
Filter filter; Filter filter;
filter.name = name->AsString()->CheckString(); filter.name = name->AsString()->CheckString();
filter.id = id->Ref()->AsEnumVal(); filter.id = id->Ref()->AsEnumVal();
@ -231,8 +245,8 @@ bool InputMgr::AddFilter(EnumVal *id, RecordVal* fval) {
filter.num_idx_fields = idxfields; filter.num_idx_fields = idxfields;
filter.num_val_fields = valfields; filter.num_val_fields = valfields;
filter.tab = dst ? dst->Ref()->AsTableVal() : 0; filter.tab = dst ? dst->Ref()->AsTableVal() : 0;
filter.rtype = rtype ? val->Ref()->AsRecordType() : 0; filter.rtype = val ? val->Ref()->AsRecordType() : 0;
filter.itype = itype ? idx->Ref()->AsRecordType() : 0; filter.itype = idx ? idx->Ref()->AsRecordType() : 0;
// ya - well - we actually don't need them in every case... well, a few bytes of memory wasted // ya - well - we actually don't need them in every case... well, a few bytes of memory wasted
filter.currDict = new PDict(InputHash); filter.currDict = new PDict(InputHash);
filter.lastDict = new PDict(InputHash); filter.lastDict = new PDict(InputHash);
@ -240,22 +254,11 @@ bool InputMgr::AddFilter(EnumVal *id, RecordVal* fval) {
Unref(want_record); // ref'd by lookupwithdefault Unref(want_record); // ref'd by lookupwithdefault
if ( valfields > 1 ) { if ( valfields > 1 ) {
assert(info->want_record); assert(filter.want_record);
} }
i->filters[id->InternalInt()] = filter; i->filters[id->InternalInt()] = filter;
i->reader->AddFilter( id->InternalInt(), fieldsV.size(), fields );
// ok, now we have to alert the reader of our new filter with our funky new fields
// the id is handled in a ... well, to be honest, a little bit sneaky manner.
// the "problem" is, that we can have several filters in the reader for one filter in the log manager.
// this is due to the fact, that a filter can either output it's result as a table, as an event...
// ...or as an table _and_ an event. And... if we have a table and an event, we actually need two different sets
// of filters in the reader, because the fields for the table and the event may differ and I absolutely do not want
// to build a union of these values and figure it out later.
// hence -> filter id is multiplicated with 2.
// filterId*2 -> results for table
// filterId*2+1 -> results for event
i->AddFilter( id->InternalInt() * 2, fieldsV.size(), idxfields, fields );
return true; return true;
} }
@ -387,31 +390,15 @@ bool InputMgr::RemoveFilter(EnumVal* id, const string &name) {
return false; return false;
} }
/*
std::list<InputMgr::Filter>::iterator it = i->filters.begin();
while ( it != i->filters.end() )
{
if ( (*it).name == name ) {
it = i->filters.erase(it);
return true;
break;
}
else
++it;
}
*/
map<int, InputMgr::Filter>::iterator it = i->filters.find(id->InternalInt()); map<int, InputMgr::Filter>::iterator it = i->filters.find(id->InternalInt());
if ( it == i->filters.end() ) { if ( it == i->filters.end() ) {
return false; return false;
} }
it->filters.erase(it); i->filters.erase(it);
return true; return true;
} }
Val* InputMgr::LogValToIndexVal(int num_fields, const RecordType *type, const LogVal* const *vals) { Val* InputMgr::LogValToIndexVal(int num_fields, const RecordType *type, const LogVal* const *vals) {
Val* idxval; Val* idxval;
int position = 0; int position = 0;
@ -449,27 +436,28 @@ void InputMgr::SendEntry(const InputReader* reader, int id, const LogVal* const
bool updated = false; bool updated = false;
assert(i->HasFilter(id));
//reporter->Error("Hashing %d index fields", i->num_idx_fields); //reporter->Error("Hashing %d index fields", i->num_idx_fields);
HashKey* idxhash = HashLogVals(i->num_idx_fields, vals); HashKey* idxhash = HashLogVals(i->filters[id].num_idx_fields, vals);
//reporter->Error("Result: %d", (uint64_t) idxhash->Hash()); //reporter->Error("Result: %d", (uint64_t) idxhash->Hash());
//reporter->Error("Hashing %d val fields", i->num_val_fields); //reporter->Error("Hashing %d val fields", i->num_val_fields);
HashKey* valhash = HashLogVals(i->num_val_fields, vals+i->num_idx_fields); HashKey* valhash = HashLogVals(i->filters[id].num_val_fields, vals+i->filters[id].num_idx_fields);
//reporter->Error("Result: %d", (uint64_t) valhash->Hash()); //reporter->Error("Result: %d", (uint64_t) valhash->Hash());
//reporter->Error("received entry with idxhash %d and valhash %d", (uint64_t) idxhash->Hash(), (uint64_t) valhash->Hash()); //reporter->Error("received entry with idxhash %d and valhash %d", (uint64_t) idxhash->Hash(), (uint64_t) valhash->Hash());
InputHash *h = i->lastDict->Lookup(idxhash); InputHash *h = i->filters[id].lastDict->Lookup(idxhash);
if ( h != 0 ) { if ( h != 0 ) {
// seen before // seen before
if ( h->valhash->Hash() == valhash->Hash() ) { if ( h->valhash->Hash() == valhash->Hash() ) {
// ok, double. // ok, double.
i->lastDict->Remove(idxhash); i->filters[id].lastDict->Remove(idxhash);
i->currDict->Insert(idxhash, h); i->filters[id].currDict->Insert(idxhash, h);
return; return;
} else { } else {
// updated // updated
i->lastDict->Remove(idxhash); i->filters[id].lastDict->Remove(idxhash);
delete(h); delete(h);
updated = true; updated = true;
@ -477,27 +465,22 @@ void InputMgr::SendEntry(const InputReader* reader, int id, const LogVal* const
} }
Val* idxval = LogValToIndexVal(i->num_idx_fields, i->itype, vals); Val* idxval = LogValToIndexVal(i->filters[id].num_idx_fields, i->filters[id].itype, vals);
Val* valval; Val* valval;
int position = i->num_idx_fields; int position = i->filters[id].num_idx_fields;
if ( i->num_val_fields == 1 && !i->want_record ) { if ( i->filters[id].num_val_fields == 1 && !i->filters[id].want_record ) {
valval = LogValToVal(vals[i->num_idx_fields], i->rtype->FieldType(i->num_idx_fields)); valval = LogValToVal(vals[i->filters[id].num_idx_fields], i->filters[id].rtype->FieldType(i->filters[id].num_idx_fields));
} else { } else {
RecordVal * r = new RecordVal(i->rtype); RecordVal * r = new RecordVal(i->filters[id].rtype);
/* if ( i->rtype->NumFields() != (int) i->num_val_fields ) { for ( int j = 0; j < i->filters[id].rtype->NumFields(); j++) {
reporter->InternalError("Type mismatch");
return;
} */
for ( int j = 0; j < i->rtype->NumFields(); j++) {
Val* val = 0; Val* val = 0;
if ( i->rtype->FieldType(j)->Tag() == TYPE_RECORD ) { if ( i->filters[id].rtype->FieldType(j)->Tag() == TYPE_RECORD ) {
val = LogValToRecordVal(vals, i->rtype->FieldType(j)->AsRecordType(), &position); val = LogValToRecordVal(vals, i->filters[id].rtype->FieldType(j)->AsRecordType(), &position);
} else { } else {
val = LogValToVal(vals[position], i->rtype->FieldType(j)); val = LogValToVal(vals[position], i->filters[id].rtype->FieldType(j));
position++; position++;
} }
@ -516,17 +499,12 @@ void InputMgr::SendEntry(const InputReader* reader, int id, const LogVal* const
Val* oldval = 0; Val* oldval = 0;
if ( updated == true ) { if ( updated == true ) {
// in that case, we need the old value to send the event (if we send an event). // in that case, we need the old value to send the event (if we send an event).
oldval = i->tab->Lookup(idxval); oldval = i->filters[id].tab->Lookup(idxval);
} }
// call filters first do determine if we really add / change the entry // call filter first to determine if we really add / change the entry
std::list<InputMgr::Filter>::iterator it = i->filters.begin(); if ( i->filters[id].pred ) {
while ( it != i->filters.end() ) {
if (! (*it).pred ) {
continue;
}
EnumVal* ev; EnumVal* ev;
Ref(idxval); Ref(idxval);
Ref(valval); Ref(valval);
@ -541,44 +519,45 @@ void InputMgr::SendEntry(const InputReader* reader, int id, const LogVal* const
vl.append(ev); vl.append(ev);
vl.append(idxval); vl.append(idxval);
vl.append(valval); vl.append(valval);
Val* v = (*it).pred->Call(&vl); Val* v = i->filters[id].pred->Call(&vl);
bool result = v->AsBool(); bool result = v->AsBool();
Unref(v); Unref(v);
if ( result == false ) { if ( result == false ) {
if ( !updated ) { if ( !updated ) {
// throw away. Hence - we quit. And remove the entry from the current dictionary... // throw away. Hence - we quit. And remove the entry from the current dictionary...
delete(i->currDict->RemoveEntry(idxhash)); delete(i->filters[id].currDict->RemoveEntry(idxhash));
return; return;
} else { } else {
// keep old one // keep old one
i->currDict->Insert(idxhash, h); i->filters[id].currDict->Insert(idxhash, h);
return; return;
} }
} }
++it;
} }
//i->tab->Assign(idxval, valval); //i->tab->Assign(idxval, valval);
HashKey* k = i->tab->ComputeHash(idxval); HashKey* k = i->filters[id].tab->ComputeHash(idxval);
if ( !k ) { if ( !k ) {
reporter->InternalError("could not hash"); reporter->InternalError("could not hash");
return; return;
} }
i->tab->Assign(idxval, k, valval); i->filters[id].tab->Assign(idxval, k, valval);
InputHash* ih = new InputHash(); InputHash* ih = new InputHash();
k = i->tab->ComputeHash(idxval); k = i->filters[id].tab->ComputeHash(idxval);
ih->idxkey = k; ih->idxkey = k;
ih->valhash = valhash; ih->valhash = valhash;
//i->tab->Delete(k); //i->tab->Delete(k);
i->currDict->Insert(idxhash, ih); i->filters[id].currDict->Insert(idxhash, ih);
// send events now that we are kind of finished. // send events now that we are kind of finished.
/* FIXME: fix me.
std::list<string>::iterator filter_iterator = i->events.begin(); std::list<string>::iterator filter_iterator = i->events.begin();
while ( filter_iterator != i->events.end() ) { while ( filter_iterator != i->events.end() ) {
EnumVal* ev; EnumVal* ev;
@ -597,7 +576,7 @@ void InputMgr::SendEntry(const InputReader* reader, int id, const LogVal* const
++filter_iterator; ++filter_iterator;
} } */
} }
@ -607,86 +586,74 @@ void InputMgr::EndCurrentSend(const InputReader* reader, int id) {
reporter->InternalError("Unknown reader"); reporter->InternalError("Unknown reader");
return; return;
} }
assert(i->HasFilter(id));
// lastdict contains all deleted entries and should be empty apart from that // lastdict contains all deleted entries and should be empty apart from that
IterCookie *c = i->lastDict->InitForIteration(); IterCookie *c = i->filters[id].lastDict->InitForIteration();
i->lastDict->MakeRobustCookie(c); i->filters[id].lastDict->MakeRobustCookie(c);
InputHash* ih; InputHash* ih;
HashKey *lastDictIdxKey; HashKey *lastDictIdxKey;
//while ( ( ih = i->lastDict->NextEntry(c) ) ) { //while ( ( ih = i->lastDict->NextEntry(c) ) ) {
while ( ( ih = i->lastDict->NextEntry(lastDictIdxKey, c) ) ) { while ( ( ih = i->filters[id].lastDict->NextEntry(lastDictIdxKey, c) ) ) {
if ( i->events.size() != 0 || i->filters.size() != 0 ) // we have a filter or an event
{
ListVal *idx = i->tab->RecoverIndex(ih->idxkey); if ( i->filters[id].pred ) {
ListVal *idx = i->filters[id].tab->RecoverIndex(ih->idxkey);
assert(idx != 0); assert(idx != 0);
Val *val = i->tab->Lookup(idx); Val *val = i->filters[id].tab->Lookup(idx);
assert(val != 0); assert(val != 0);
{ bool doBreak = false;
bool doBreak = false; // ask predicate, if we want to expire this element...
// ask filter, if we want to expire this element...
std::list<InputMgr::Filter>::iterator it = i->filters.begin();
while ( it != i->filters.end() ) {
if (! (*it).pred ) {
continue;
}
EnumVal* ev = new EnumVal(BifEnum::Input::EVENT_REMOVED, BifType::Enum::Input::Event); EnumVal* ev = new EnumVal(BifEnum::Input::EVENT_REMOVED, BifType::Enum::Input::Event);
Ref(idx); Ref(idx);
Ref(val); Ref(val);
val_list vl(3); val_list vl(3);
vl.append(ev); vl.append(ev);
vl.append(idx); vl.append(idx);
vl.append(val); vl.append(val);
Val* v = (*it).pred->Call(&vl); Val* v = i->filters[id].pred->Call(&vl);
bool result = v->AsBool(); bool result = v->AsBool();
Unref(v); Unref(v);
++it; if ( result == false ) {
// Keep it. Hence - we quit and simply go to the next entry of lastDict
if ( result == false ) { // ah well - and we have to add the entry to currDict...
// Keep it. Hence - we quit and simply go to the next entry of lastDict i->filters[id].currDict->Insert(lastDictIdxKey, i->filters[id].lastDict->RemoveEntry(lastDictIdxKey));
// ah well - and we have to add the entry to currDict... continue;
i->currDict->Insert(lastDictIdxKey, i->lastDict->RemoveEntry(lastDictIdxKey));
doBreak = true;
continue;
}
}
if ( doBreak ) {
continue;
}
} }
// //
{ {
std::list<string>::iterator it = i->events.begin(); /* FIXME: events
while ( it != i->events.end() ) { std::list<string>::iterator it = i->filters[id].events.begin();
while ( it != i->filters[id].events.end() ) {
Ref(idx); Ref(idx);
Ref(val); Ref(val);
EnumVal *ev = new EnumVal(BifEnum::Input::EVENT_REMOVED, BifType::Enum::Input::Event); EnumVal *ev = new EnumVal(BifEnum::Input::EVENT_REMOVED, BifType::Enum::Input::Event);
SendEvent(*it, ev, idx, val); SendEvent(*it, ev, idx, val);
++it; ++it;
} }
*/
} }
} }
i->tab->Delete(ih->idxkey); i->filters[id].tab->Delete(ih->idxkey);
i->lastDict->Remove(lastDictIdxKey); // deletex in next line i->filters[id].lastDict->Remove(lastDictIdxKey); // deletex in next line
delete(ih); delete(ih);
} }
i->lastDict->Clear(); // should be empty... but... well... who knows... i->filters[id].lastDict->Clear(); // should be empty... but... well... who knows...
delete(i->lastDict); delete(i->filters[id].lastDict);
i->lastDict = i->currDict; i->filters[id].lastDict = i->filters[id].currDict;
i->currDict = new PDict(InputHash); i->filters[id].currDict = new PDict(InputHash);
} }
void InputMgr::Put(const InputReader* reader, int id, const LogVal* const *vals) { void InputMgr::Put(const InputReader* reader, int id, const LogVal* const *vals) {
@ -696,22 +663,24 @@ void InputMgr::Put(const InputReader* reader, int id, const LogVal* const *vals)
return; return;
} }
Val* idxval = LogValToIndexVal(i->num_idx_fields, i->itype, vals); assert(i->HasFilter(id));
Val* idxval = LogValToIndexVal(i->filters[id].num_idx_fields, i->filters[id].itype, vals);
Val* valval; Val* valval;
int position = i->num_idx_fields; int position = i->filters[id].num_idx_fields;
if ( i->num_val_fields == 1 && !i->want_record ) { if ( i->filters[id].num_val_fields == 1 && !i->filters[id].want_record ) {
valval = LogValToVal(vals[i->num_idx_fields], i->rtype->FieldType(i->num_idx_fields)); valval = LogValToVal(vals[i->filters[id].num_idx_fields], i->filters[id].rtype->FieldType(i->filters[id].num_idx_fields));
} else { } else {
RecordVal * r = new RecordVal(i->rtype); RecordVal * r = new RecordVal(i->filters[id].rtype);
for ( int j = 0; j < i->rtype->NumFields(); j++) { for ( int j = 0; j < i->filters[id].rtype->NumFields(); j++) {
Val* val = 0; Val* val = 0;
if ( i->rtype->FieldType(j)->Tag() == TYPE_RECORD ) { if ( i->filters[id].rtype->FieldType(j)->Tag() == TYPE_RECORD ) {
val = LogValToRecordVal(vals, i->rtype->FieldType(j)->AsRecordType(), &position); val = LogValToRecordVal(vals, i->filters[id].rtype->FieldType(j)->AsRecordType(), &position);
} else { } else {
val = LogValToVal(vals[position], i->rtype->FieldType(j)); val = LogValToVal(vals[position], i->filters[id].rtype->FieldType(j));
position++; position++;
} }
@ -726,7 +695,7 @@ void InputMgr::Put(const InputReader* reader, int id, const LogVal* const *vals)
valval = r; valval = r;
} }
i->tab->Assign(idxval, valval); i->filters[id].tab->Assign(idxval, valval);
} }
void InputMgr::Clear(const InputReader* reader, int id) { void InputMgr::Clear(const InputReader* reader, int id) {
@ -735,20 +704,24 @@ void InputMgr::Clear(const InputReader* reader, int id) {
reporter->InternalError("Unknown reader"); reporter->InternalError("Unknown reader");
return; return;
} }
i->tab->RemoveAll(); assert(i->HasFilter(id));
i->filters[id].tab->RemoveAll();
} }
bool InputMgr::Delete(const InputReader* reader, const LogVal* const *vals) { bool InputMgr::Delete(const InputReader* reader, int id, const LogVal* const *vals) {
ReaderInfo *i = FindReader(reader); ReaderInfo *i = FindReader(reader);
if ( i == 0 ) { if ( i == 0 ) {
reporter->InternalError("Unknown reader"); reporter->InternalError("Unknown reader");
return false; return false;
} }
Val* idxval = LogValToIndexVal(i->num_idx_fields, i->itype, vals);
return ( i->tab->Delete(idxval) != 0 ); assert(i->HasFilter(id));
Val* idxval = LogValToIndexVal(i->filters[id].num_idx_fields, i->filters[id].itype, vals);
return ( i->filters[id].tab->Delete(idxval) != 0 );
} }
void InputMgr::Error(InputReader* reader, const char* msg) void InputMgr::Error(InputReader* reader, const char* msg)

View file

@ -35,7 +35,7 @@ protected:
void Error(InputReader* reader, const char* msg); void Error(InputReader* reader, const char* msg);
// for readers to write to input stream in direct mode (reporting new/deleted values directly) // for readers to write to input stream in direct mode (reporting new/deleted values directly)
void Put(const InputReader* reader, int id. const LogVal* const *vals); void Put(const InputReader* reader, int id, const LogVal* const *vals);
void Clear(const InputReader* reader, int id); void Clear(const InputReader* reader, int id);
bool Delete(const InputReader* reader, int id, const LogVal* const *vals); bool Delete(const InputReader* reader, int id, const LogVal* const *vals);

View file

@ -26,17 +26,17 @@ void InputReader::Error(const string &msg)
void InputReader::Put(int id, const LogVal* const *val) void InputReader::Put(int id, const LogVal* const *val)
{ {
input_mgr->Put(this, int id, val); input_mgr->Put(this, id, val);
} }
void InputReader::Clear(int id) void InputReader::Clear(int id)
{ {
input_mgr->Clear(this, int id); input_mgr->Clear(this, id);
} }
void InputReader::Delete(int id, const LogVal* const *val) void InputReader::Delete(int id, const LogVal* const *val)
{ {
input_mgr->Delete(this, int id, val); input_mgr->Delete(this, id, val);
} }
@ -52,12 +52,12 @@ bool InputReader::Init(string arg_source)
bool InputReader::AddFilter(int id, int arg_num_fields, bool InputReader::AddFilter(int id, int arg_num_fields,
const LogField* const * arg_fields) const LogField* const * arg_fields)
{ {
return DoAddFilter(int id, arg_num_fields, arg_fields); return DoAddFilter(id, arg_num_fields, arg_fields);
} }
bool InputReader::RemoveFilter(int id) bool InputReader::RemoveFilter(int id)
{ {
return DoRemoveFilter(int id); return DoRemoveFilter(id);
} }
void InputReader::Finish() void InputReader::Finish()
@ -105,10 +105,10 @@ const char* InputReader::Fmt(const char* format, ...)
void InputReader::SendEntry(int id, const LogVal* const *vals) void InputReader::SendEntry(int id, const LogVal* const *vals)
{ {
input_mgr->SendEntry(this, int id, vals); input_mgr->SendEntry(this, id, vals);
} }
void InputReader::EndCurrentSend(int id) void InputReader::EndCurrentSend(int id)
{ {
input_mgr->EndCurrentSend(this, int id); input_mgr->EndCurrentSend(this, id);
} }

View file

@ -30,7 +30,8 @@ protected:
virtual bool DoInit(string arg_sources) = 0; virtual bool DoInit(string arg_sources) = 0;
virtual bool DoAddFilter( int id, int arg_num_fields, const LogField* const* fields ) = 0; virtual bool DoAddFilter( int id, int arg_num_fields, const LogField* const* fields ) = 0;
virtual bool DoRemoveFilter( int id );
virtual bool DoRemoveFilter( int id ) = 0;
virtual void DoFinish() = 0; virtual void DoFinish() = 0;

View file

@ -28,6 +28,7 @@ FieldMapping FieldMapping::subType() {
return FieldMapping(name, subtype, position); return FieldMapping(name, subtype, position);
} }
InputReaderAscii::InputReaderAscii() InputReaderAscii::InputReaderAscii()
{ {
file = 0; file = 0;
@ -58,7 +59,7 @@ InputReaderAscii::~InputReaderAscii()
void InputReaderAscii::DoFinish() void InputReaderAscii::DoFinish()
{ {
columnMap.empty(); filters.empty();
if ( file != 0 ) { if ( file != 0 ) {
file->close(); file->close();
delete(file); delete(file);
@ -66,7 +67,7 @@ void InputReaderAscii::DoFinish()
} }
} }
bool InputReaderAscii::DoInit(string path, int num_fields, int idx_fields, const LogField* const * fields) bool InputReaderAscii::DoInit(string path)
{ {
fname = path; fname = path;
@ -76,11 +77,39 @@ bool InputReaderAscii::DoInit(string path, int num_fields, int idx_fields, const
return false; return false;
} }
return true;
}
this->num_fields = num_fields; bool InputReaderAscii::DoAddFilter( int id, int arg_num_fields, const LogField* const* fields ) {
this->idx_fields = idx_fields; if ( HasFilter(id) ) {
this->fields = fields; return false; // no, we don't want to add this a second time
}
Filter f;
f.num_fields = arg_num_fields;
f.fields = fields;
filters[id] = f;
return true;
}
bool InputReaderAscii::DoRemoveFilter ( int id ) {
if (!HasFilter(id) ) {
return false;
}
assert ( filters.erase(id) == 1 );
return true;
}
bool InputReaderAscii::HasFilter(int id) {
map<int, Filter>::iterator it = filters.find(id);
if ( it == filters.end() ) {
return false;
}
return true; return true;
} }
@ -93,46 +122,47 @@ bool InputReaderAscii::ReadHeader() {
return false; return false;
} }
// split on tabs... for ( map<int, Filter>::iterator it = filters.begin(); it != filters.end(); it++ ) {
istringstream splitstream(line); // split on tabs...
unsigned int currTab = 0; istringstream splitstream(line);
int wantFields = 0; unsigned int currTab = 0;
while ( splitstream ) { int wantFields = 0;
string s; while ( splitstream ) {
if ( !getline(splitstream, s, separator[0])) string s;
break; if ( !getline(splitstream, s, separator[0]))
break;
// current found heading in s... compare if we want it
for ( unsigned int i = 0; i < num_fields; i++ ) { // current found heading in s... compare if we want it
const LogField* field = fields[i]; for ( unsigned int i = 0; i < (*it).second.num_fields; i++ ) {
if ( field->name == s ) { const LogField* field = (*it).second.fields[i];
// cool, found field. note position if ( field->name == s ) {
FieldMapping f(field->name, field->type, field->subtype, i); // cool, found field. note position
columnMap.push_back(f); FieldMapping f(field->name, field->type, field->subtype, i);
wantFields++; (*it).second.columnMap.push_back(f);
break; // done with searching wantFields++;
break; // done with searching
}
} }
// look if we did push something...
if ( (*it).second.columnMap.size() == currTab ) {
// no, we didn't. note that...
FieldMapping empty;
(*it).second.columnMap.push_back(empty);
}
// done
currTab++;
}
if ( wantFields != (int) (*it).second.num_fields ) {
// we did not find all fields?
// :(
Error(Fmt("One of the requested fields could not be found in the input data file. Found %d fields, wanted %d. Filternum: %d", wantFields, (*it).second.num_fields, (*it).first));
return false;
} }
// look if we did push something...
if ( columnMap.size() == currTab ) {
// no, we didn't. note that...
FieldMapping empty;
columnMap.push_back(empty);
}
// done
currTab++;
}
if ( wantFields != (int) num_fields ) {
// we did not find all fields?
// :(
Error(Fmt("One of the requested fields could not be found in the input data file. Found %d fields, wanted %d", wantFields, num_fields));
return false;
} }
// well, that seems to have worked... // well, that seems to have worked...
return true; return true;
} }
@ -314,110 +344,77 @@ bool InputReaderAscii::DoUpdate() {
return false; return false;
} }
// TODO: all the stuff we need for a second reading.
// *cough*
//
//
// new keymap
//map<string, string> *newKeyMap = new map<string, string>();
string line; string line;
while ( GetLine(line ) ) { while ( GetLine(line ) ) {
// split on tabs
for ( map<int, Filter>::iterator it = filters.begin(); it != filters.end(); it++ ) {
istringstream splitstream(line); // split on tabs
LogVal** fields = new LogVal*[num_fields];
//string string_fields[num_fields];
unsigned int currTab = 0;
unsigned int currField = 0;
while ( splitstream ) {
string s;
if ( !getline(splitstream, s, separator[0]) )
break;
if ( currTab >= columnMap.size() ) { istringstream splitstream(line);
Error("Tabs in heading do not match tabs in data?");
//disabled = true;
return false;
}
FieldMapping currMapping = columnMap[currTab];
currTab++;
if ( currMapping.IsEmpty() ) {
// well, that was easy
continue;
}
if ( currField >= num_fields ) {
Error("internal error - fieldnum greater as possible");
return false;
}
LogVal* val = EntryToVal(s, currMapping);
if ( val == 0 ) {
return false;
}
fields[currMapping.position] = val;
//string_fields[currMapping.position] = s;
currField++;
}
if ( currField != num_fields ) {
Error("curr_field != num_fields in DoUpdate. Columns in file do not match column definition.");
return false;
}
SendEntry(fields);
/*
string indexstring = "";
string valstring = "";
for ( unsigned int i = 0; i < idx_fields; i++ ) {
indexstring.append(string_fields[i]);
}
for ( unsigned int i = idx_fields; i < num_fields; i++ ) {
valstring.append(string_fields[i]);
}
string valhash = Hash(valstring);
string indexhash = Hash(indexstring);
if ( keyMap->find(indexhash) == keyMap->end() ) {
// new key
Put(fields);
} else if ( (*keyMap)[indexhash] != valhash ) {
// changed key
Put(fields);
keyMap->erase(indexhash);
} else {
// field not changed
keyMap->erase(indexhash);
}
(*newKeyMap)[indexhash] = valhash;
*/
for ( unsigned int i = 0; i < num_fields; i++ ) { LogVal** fields = new LogVal*[(*it).second.num_fields];
delete fields[i]; //string string_fields[num_fields];
unsigned int currTab = 0;
unsigned int currField = 0;
while ( splitstream ) {
string s;
if ( !getline(splitstream, s, separator[0]) )
break;
if ( currTab >= (*it).second.columnMap.size() ) {
Error("Tabs in heading do not match tabs in data?");
//disabled = true;
return false;
}
FieldMapping currMapping = (*it).second.columnMap[currTab];
currTab++;
if ( currMapping.IsEmpty() ) {
// well, that was easy
continue;
}
if ( currField >= (*it).second.num_fields ) {
Error("internal error - fieldnum greater as possible");
return false;
}
LogVal* val = EntryToVal(s, currMapping);
if ( val == 0 ) {
return false;
}
fields[currMapping.position] = val;
//string_fields[currMapping.position] = s;
currField++;
}
if ( currField != (*it).second.num_fields ) {
Error("curr_field != num_fields in DoUpdate. Columns in file do not match column definition.");
return false;
}
SendEntry((*it).first, fields);
for ( unsigned int i = 0; i < (*it).second.num_fields; i++ ) {
delete fields[i];
}
delete [] fields;
} }
delete [] fields;
} }
//file->clear(); // remove end of file evil bits //file->clear(); // remove end of file evil bits
//file->seekg(0, ios::beg); // and seek to start. //file->seekg(0, ios::beg); // and seek to start.
EndCurrentSend(); for ( map<int, Filter>::iterator it = filters.begin(); it != filters.end(); it++ ) {
EndCurrentSend((*it).first);
}
return true; return true;
} }

View file

@ -34,13 +34,30 @@ public:
protected: protected:
virtual bool DoInit(string path, int arg_num_fields, int arg_idx_fields, virtual bool DoInit(string path);
const LogField* const * fields);
virtual bool DoAddFilter( int id, int arg_num_fields, const LogField* const* fields );
virtual bool DoRemoveFilter ( int id );
virtual void DoFinish(); virtual void DoFinish();
virtual bool DoUpdate(); virtual bool DoUpdate();
private: private:
struct Filter {
unsigned int num_fields;
const LogField* const * fields; // raw mapping
// map columns in the file to columns to send back to the manager
vector<FieldMapping> columnMap;
};
bool HasFilter(int id);
bool ReadHeader(); bool ReadHeader();
LogVal* EntryToVal(string s, FieldMapping type); LogVal* EntryToVal(string s, FieldMapping type);
@ -49,15 +66,8 @@ private:
ifstream* file; ifstream* file;
string fname; string fname;
unsigned int num_fields; map<int, Filter> filters;
unsigned int idx_fields;
// map columns in the file to columns to send back to the manager
vector<FieldMapping> columnMap;
const LogField* const * fields; // raw mapping
//map<string, string> *keyMap;
//
// Options set from the script-level. // Options set from the script-level.
string separator; string separator;