make filters pointers (for inheritance)

This commit is contained in:
Bernhard Amann 2011-11-22 11:39:27 -08:00
parent f82bf3f35f
commit 3c40f00a53
7 changed files with 507 additions and 160 deletions

View file

@ -23,10 +23,19 @@ export {
## decision function, that decides if an insertion, update or removal should really be executed. ## decision function, that decides if an insertion, update or removal should really be executed.
## or events should be thought ## or events should be thought
pred: function(typ: Input::Event, left: any, right: any): bool &optional; pred: function(typ: Input::Event, left: any, right: any): bool &optional;
};
## for "normalized" events type EventFilter: record {
# ev: any &optional; ## descriptive name. for later removal
# ev_description: any &optional; name: string;
# the event
ev: any;
# record describing the fields
fields: any;
# does the event want the field unrolled (default) or as a simple record value?
want_record: bool &default=F;
}; };
#const no_filter: Filter = [$name="<not found>", $idx="", $val="", $destination=""]; # Sentinel. #const no_filter: Filter = [$name="<not found>", $idx="", $val="", $destination=""]; # Sentinel.
@ -36,6 +45,8 @@ export {
global force_update: function(id: Log::ID) : bool; global force_update: function(id: Log::ID) : bool;
global add_tablefilter: function(id: Log::ID, filter: Input::TableFilter) : bool; global add_tablefilter: function(id: Log::ID, filter: Input::TableFilter) : bool;
global remove_tablefilter: function(id: Log::ID, name: string) : bool; global remove_tablefilter: function(id: Log::ID, name: string) : bool;
global add_eventfilter: function(id: Log::ID, filter: Input::EventFilter) : bool;
global remove_eventfilter: function(id: Log::ID, name: string) : bool;
#global get_filter: function(id: ID, name: string) : Filter; #global get_filter: function(id: ID, name: string) : Filter;
} }
@ -74,6 +85,18 @@ function remove_tablefilter(id: Log::ID, name: string) : bool
return __remove_tablefilter(id, name); return __remove_tablefilter(id, name);
} }
function add_eventfilter(id: Log::ID, filter: Input::EventFilter) : bool
{
# filters[id, filter$name] = filter;
return __add_eventfilter(id, filter);
}
function remove_eventfilter(id: Log::ID, name: string) : bool
{
# delete filters[id, name];
return __remove_eventfilter(id, name);
}
#function get_filter(id: ID, name: string) : Filter #function get_filter(id: ID, name: string) : Filter
# { # {
# if ( [id, name] in filters ) # if ( [id, name] in filters )

View file

@ -28,7 +28,13 @@ public:
EnumVal* id; EnumVal* id;
string name; string name;
//int filter_type; // to distinguish between event and table filters FilterType filter_type; // to distinguish between event and table filters
virtual ~Filter();
};
class InputMgr::TableFilter: public InputMgr::Filter {
public:
unsigned int num_idx_fields; unsigned int num_idx_fields;
unsigned int num_val_fields; unsigned int num_val_fields;
@ -45,61 +51,42 @@ public:
Func* pred; Func* pred;
EventHandlerPtr event; EventHandlerPtr event;
RecordType* event_type;
// ~Filter(); TableFilter();
// Filter(); ~TableFilter();
// Filter(const InputMgr::Filter& filter);
void DoCleanup();
}; };
/* class InputMgr::EventFilter: public InputMgr::Filter {
InputMgr::Filter::Filter() { public:
EventHandlerPtr event;
RecordType* fields;
unsigned int num_fields;
bool want_record;
EventFilter();
};
InputMgr::TableFilter::TableFilter() {
filter_type = TABLE_FILTER;
tab = 0; tab = 0;
itype = 0; itype = 0;
rtype = 0; rtype = 0;
event_type = 0;
} }
InputMgr::Filter::Filter(const InputMgr::Filter& f) { InputMgr::EventFilter::EventFilter() {
id = f.id; filter_type = EVENT_FILTER;
id->Ref(); }
tab = f.tab; InputMgr::Filter::~Filter() {
if ( tab )
tab->Ref();
itype = f.itype;
if ( itype )
itype->Ref();
rtype = f.rtype;
if ( rtype )
Ref(rtype);
event_type = f.event_type;
if ( event_type )
Ref(event_type);
name = f.name;
num_idx_fields = f.num_idx_fields;
num_val_fields = f.num_val_fields;
want_record = f.want_record;
} */
void InputMgr::Filter::DoCleanup() {
Unref(id); Unref(id);
if ( tab ) }
Unref(tab);
if ( itype ) InputMgr::TableFilter::~TableFilter() {
Unref(itype); Unref(tab);
if ( rtype ) Unref(itype);
Unref(rtype); Unref(rtype);
if ( event_type)
Unref(event_type);
delete currDict; delete currDict;
delete lastDict; delete lastDict;
@ -111,7 +98,7 @@ struct InputMgr::ReaderInfo {
InputReader* reader; InputReader* reader;
//list<string> events; // events we fire when "something" happens //list<string> events; // events we fire when "something" happens
map<int, InputMgr::Filter> filters; // filters that can prevent our actions map<int, InputMgr::Filter*> filters; // filters that can prevent our actions
bool HasFilter(int id); bool HasFilter(int id);
@ -119,7 +106,11 @@ struct InputMgr::ReaderInfo {
}; };
InputMgr::ReaderInfo::~ReaderInfo() { InputMgr::ReaderInfo::~ReaderInfo() {
// all the contents of filters should delete themselves automatically... map<int, InputMgr::Filter*>::iterator it = filters.begin();
while ( it != filters.end() ) {
delete (*it).second;
}
Unref(type); Unref(type);
Unref(id); Unref(id);
@ -128,7 +119,7 @@ InputMgr::ReaderInfo::~ReaderInfo() {
} }
bool InputMgr::ReaderInfo::HasFilter(int id) { bool InputMgr::ReaderInfo::HasFilter(int id) {
map<int, InputMgr::Filter>::iterator it = filters.find(id); map<int, InputMgr::Filter*>::iterator it = filters.find(id);
if ( it == filters.end() ) { if ( it == filters.end() ) {
return false; return false;
} }
@ -236,6 +227,114 @@ InputReader* InputMgr::CreateStream(EnumVal* id, RecordVal* description)
} }
bool InputMgr::AddEventFilter(EnumVal *id, RecordVal* fval) {
ReaderInfo *i = FindReader(id);
if ( i == 0 ) {
reporter->Error("Stream not found");
return false;
}
RecordType* rtype = fval->Type()->AsRecordType();
if ( ! same_type(rtype, BifType::Record::Input::EventFilter, 0) )
{
reporter->Error("filter argument not of right type");
return false;
}
Val* name = fval->Lookup(rtype->FieldOffset("name"));
RecordType *fields = fval->Lookup(rtype->FieldOffset("fields"))->AsType()->AsTypeType()->Type()->AsRecordType();
Val *want_record = fval->LookupWithDefault(rtype->FieldOffset("want_record"));
Val* event_val = fval->Lookup(rtype->FieldOffset("ev"));
Func* event = event_val->AsFunc();
{
FuncType* etype = event->FType()->AsFuncType();
if ( ! etype->IsEvent() ) {
reporter->Error("stream event is a function, not an event");
return false;
}
const type_list* args = etype->ArgTypes()->Types();
if ( args->length() < 2 ) {
reporter->Error("event takes not enough arguments");
return false;
}
if ( ! same_type((*args)[0], BifType::Enum::Input::Event, 0) )
{
reporter->Error("events first attribute must be of type Input::Event");
return false;
}
if ( want_record->InternalInt() == 0 ) {
if ( args->length() != fields->NumFields() + 1 ) {
reporter->Error("events has wrong number of arguments");
return false;
}
for ( int i = 0; i < fields->NumFields(); i++ ) {
if ( !same_type((*args)[i+1], fields->FieldType(i) ) ) {
reporter->Error("Incompatible type for event");
return false;
}
}
} else if ( want_record->InternalInt() == 1 ) {
if ( args->length() != 2 ) {
reporter->Error("events has wrong number of arguments");
return false;
}
if ( !same_type((*args)[1], fields ) ) {
reporter->Error("Incompatible type for event");
return false;
}
} else {
assert(false);
}
}
vector<LogField*> fieldsV; // vector, because UnrollRecordType needs it
bool status = !UnrollRecordType(&fieldsV, fields, "");
if ( status ) {
reporter->Error("Problem unrolling");
return false;
}
LogField** logf = new LogField*[fieldsV.size()];
for ( unsigned int i = 0; i < fieldsV.size(); i++ ) {
logf[i] = fieldsV[i];
}
EventFilter* filter = new EventFilter();
filter->name = name->AsString()->CheckString();
filter->id = id->Ref()->AsEnumVal();
filter->num_fields = fieldsV.size();
filter->fields = fields->Ref()->AsRecordType();
filter->event = event_registry->Lookup(event->GetID()->Name());
filter->want_record = ( want_record->InternalInt() == 1 );
Unref(want_record); // ref'd by lookupwithdefault
int filterid = 0;
if ( i->filters.size() > 0 ) {
filterid = i->filters.rbegin()->first + 1; // largest element is at beginning of map-> new id = old id + 1->
}
i->filters[filterid] = filter;
i->reader->AddFilter( filterid, fieldsV.size(), logf );
return true;
}
bool InputMgr::AddTableFilter(EnumVal *id, RecordVal* fval) { bool InputMgr::AddTableFilter(EnumVal *id, RecordVal* fval) {
ReaderInfo *i = FindReader(id); ReaderInfo *i = FindReader(id);
if ( i == 0 ) { if ( i == 0 ) {
@ -299,6 +398,7 @@ bool InputMgr::AddTableFilter(EnumVal *id, RecordVal* fval) {
reporter->Error("table events value attribute does not match"); reporter->Error("table events value attribute does not match");
return false; return false;
} }
assert(want_record->InternalInt() == 1 || want_record->InternalInt() == 0);
} }
@ -322,28 +422,28 @@ bool InputMgr::AddTableFilter(EnumVal *id, RecordVal* fval) {
fields[i] = fieldsV[i]; fields[i] = fieldsV[i];
} }
Filter filter; TableFilter* filter = new TableFilter();
filter.name = name->AsString()->CheckString(); filter->name = name->AsString()->CheckString();
filter.id = id->Ref()->AsEnumVal(); filter->id = id->Ref()->AsEnumVal();
filter.pred = pred ? pred->AsFunc() : 0; filter->pred = pred ? pred->AsFunc() : 0;
filter.num_idx_fields = idxfields; filter->num_idx_fields = idxfields;
filter.num_val_fields = valfields; filter->num_val_fields = valfields;
filter.tab = dst->Ref()->AsTableVal(); filter->tab = dst->Ref()->AsTableVal();
filter.rtype = val->Ref()->AsRecordType(); filter->rtype = val->Ref()->AsRecordType();
filter.itype = idx->Ref()->AsRecordType(); filter->itype = idx->Ref()->AsRecordType();
filter.event = event ? event_registry->Lookup(event->GetID()->Name()) : 0; filter->event = event ? event_registry->Lookup(event->GetID()->Name()) : 0;
filter.currDict = new PDict(InputHash); filter->currDict = new PDict(InputHash);
filter.lastDict = new PDict(InputHash); filter->lastDict = new PDict(InputHash);
filter.want_record = ( want_record->InternalInt() == 1 ); filter->want_record = ( want_record->InternalInt() == 1 );
Unref(want_record); // ref'd by lookupwithdefault Unref(want_record); // ref'd by lookupwithdefault
if ( valfields > 1 ) { if ( valfields > 1 ) {
assert(filter.want_record); assert(filter->want_record);
} }
int filterid = 0; int filterid = 0;
if ( i->filters.size() > 0 ) { if ( i->filters.size() > 0 ) {
filterid = i->filters.rbegin()->first + 1; // largest element is at beginning of map. new id = old id + 1. filterid = i->filters.rbegin()->first + 1; // largest element is at beginning of map-> new id = old id + 1->
} }
i->filters[filterid] = filter; i->filters[filterid] = filter;
i->reader->AddFilter( filterid, fieldsV.size(), fields ); i->reader->AddFilter( filterid, fieldsV.size(), fields );
@ -478,13 +578,39 @@ bool InputMgr::RemoveTableFilter(EnumVal* id, const string &name) {
return false; return false;
} }
map<int, InputMgr::Filter>::iterator it = i->filters.find(id->InternalInt()); map<int, InputMgr::Filter*>::iterator it = i->filters.find(id->InternalInt());
if ( it == i->filters.end() ) { if ( it == i->filters.end() ) {
return false; return false;
} }
i->filters[id->InternalInt()].DoCleanup(); if ( i->filters[id->InternalInt()]->filter_type != TABLE_FILTER ) {
// wrong type;
return false;
}
delete (*it).second;
i->filters.erase(it);
return true;
}
bool InputMgr::RemoveEventFilter(EnumVal* id, const string &name) {
ReaderInfo *i = FindReader(id);
if ( i == 0 ) {
reporter->Error("Reader not found");
return false;
}
map<int, InputMgr::Filter*>::iterator it = i->filters.find(id->InternalInt());
if ( it == i->filters.end() ) {
return false;
}
if ( i->filters[id->InternalInt()]->filter_type != EVENT_FILTER ) {
// wrong type;
return false;
}
delete (*it).second;
i->filters.erase(it); i->filters.erase(it);
return true; return true;
} }
@ -524,30 +650,53 @@ void InputMgr::SendEntry(const InputReader* reader, int id, const LogVal* const
return; return;
} }
if ( !i->HasFilter(id) ) {
reporter->InternalError("Unknown filter");
return;
}
if ( i->filters[id]->filter_type == TABLE_FILTER ) {
SendEntryTable(reader, id, vals);
} else if ( i->filters[id]->filter_type == EVENT_FILTER ) {
EnumVal *type = new EnumVal(BifEnum::Input::EVENT_NEW, BifType::Enum::Input::Event);
SendEventFilterEvent(reader, type, id, vals);
} else {
assert(false);
}
}
void InputMgr::SendEntryTable(const InputReader* reader, int id, const LogVal* const *vals) {
ReaderInfo *i = FindReader(reader);
bool updated = false; bool updated = false;
assert(i);
assert(i->HasFilter(id)); assert(i->HasFilter(id));
assert(i->filters[id]->filter_type == TABLE_FILTER);
TableFilter* filter = (TableFilter*) i->filters[id];
//reporter->Error("Hashing %d index fields", i->num_idx_fields); //reporter->Error("Hashing %d index fields", i->num_idx_fields);
HashKey* idxhash = HashLogVals(i->filters[id].num_idx_fields, vals); HashKey* idxhash = HashLogVals(filter->num_idx_fields, vals);
//reporter->Error("Result: %d", (uint64_t) idxhash->Hash()); //reporter->Error("Result: %d", (uint64_t) idxhash->Hash());
//reporter->Error("Hashing %d val fields", i->num_val_fields); //reporter->Error("Hashing %d val fields", i->num_val_fields);
HashKey* valhash = HashLogVals(i->filters[id].num_val_fields, vals+i->filters[id].num_idx_fields); HashKey* valhash = HashLogVals(filter->num_val_fields, vals+filter->num_idx_fields);
//reporter->Error("Result: %d", (uint64_t) valhash->Hash()); //reporter->Error("Result: %d", (uint64_t) valhash->Hash());
//reporter->Error("received entry with idxhash %d and valhash %d", (uint64_t) idxhash->Hash(), (uint64_t) valhash->Hash()); //reporter->Error("received entry with idxhash %d and valhash %d", (uint64_t) idxhash->Hash(), (uint64_t) valhash->Hash());
InputHash *h = i->filters[id].lastDict->Lookup(idxhash); InputHash *h = filter->lastDict->Lookup(idxhash);
if ( h != 0 ) { if ( h != 0 ) {
// seen before // seen before
if ( h->valhash->Hash() == valhash->Hash() ) { if ( h->valhash->Hash() == valhash->Hash() ) {
// ok, double. // ok, double.
i->filters[id].lastDict->Remove(idxhash); filter->lastDict->Remove(idxhash);
i->filters[id].currDict->Insert(idxhash, h); filter->currDict->Insert(idxhash, h);
return; return;
} else { } else {
// updated // updated
i->filters[id].lastDict->Remove(idxhash); filter->lastDict->Remove(idxhash);
delete(h); delete(h);
updated = true; updated = true;
@ -555,30 +704,25 @@ void InputMgr::SendEntry(const InputReader* reader, int id, const LogVal* const
} }
Val* idxval = LogValToIndexVal(i->filters[id].num_idx_fields, i->filters[id].itype, vals); Val* idxval = LogValToIndexVal(filter->num_idx_fields, filter->itype, vals);
Val* valval; Val* valval;
int position = i->filters[id].num_idx_fields; int position = filter->num_idx_fields;
if ( i->filters[id].num_val_fields == 1 && !i->filters[id].want_record ) { if ( filter->num_val_fields == 1 && !filter->want_record ) {
valval = LogValToVal(vals[position], i->filters[id].rtype->FieldType(0)); valval = LogValToVal(vals[position], filter->rtype->FieldType(0));
} else { } else {
RecordVal * r = new RecordVal(i->filters[id].rtype); RecordVal * r = new RecordVal(filter->rtype);
for ( int j = 0; j < i->filters[id].rtype->NumFields(); j++) { for ( int j = 0; j < filter->rtype->NumFields(); j++) {
Val* val = 0; Val* val = 0;
if ( i->filters[id].rtype->FieldType(j)->Tag() == TYPE_RECORD ) { if ( filter->rtype->FieldType(j)->Tag() == TYPE_RECORD ) {
val = LogValToRecordVal(vals, i->filters[id].rtype->FieldType(j)->AsRecordType(), &position); val = LogValToRecordVal(vals, filter->rtype->FieldType(j)->AsRecordType(), &position);
} else { } else {
val = LogValToVal(vals[position], i->filters[id].rtype->FieldType(j)); val = LogValToVal(vals[position], filter->rtype->FieldType(j));
position++; position++;
} }
/* if ( val == 0 ) {
reporter->InternalError("conversion error");
return;
} */
r->Assign(j,val); r->Assign(j,val);
} }
@ -589,12 +733,12 @@ void InputMgr::SendEntry(const InputReader* reader, int id, const LogVal* const
Val* oldval = 0; Val* oldval = 0;
if ( updated == true ) { if ( updated == true ) {
// in that case, we need the old value to send the event (if we send an event). // in that case, we need the old value to send the event (if we send an event).
oldval = i->filters[id].tab->Lookup(idxval); oldval = filter->tab->Lookup(idxval);
} }
// call filter first to determine if we really add / change the entry // call filter first to determine if we really add / change the entry
if ( i->filters[id].pred ) { if ( filter->pred ) {
EnumVal* ev; EnumVal* ev;
Ref(idxval); Ref(idxval);
Ref(valval); Ref(valval);
@ -609,18 +753,18 @@ void InputMgr::SendEntry(const InputReader* reader, int id, const LogVal* const
vl.append(ev); vl.append(ev);
vl.append(idxval); vl.append(idxval);
vl.append(valval); vl.append(valval);
Val* v = i->filters[id].pred->Call(&vl); Val* v = filter->pred->Call(&vl);
bool result = v->AsBool(); bool result = v->AsBool();
Unref(v); Unref(v);
if ( result == false ) { if ( result == false ) {
if ( !updated ) { if ( !updated ) {
// throw away. Hence - we quit. And remove the entry from the current dictionary... // throw away. Hence - we quit. And remove the entry from the current dictionary...
delete(i->filters[id].currDict->RemoveEntry(idxhash)); delete(filter->currDict->RemoveEntry(idxhash));
return; return;
} else { } else {
// keep old one // keep old one
i->filters[id].currDict->Insert(idxhash, h); filter->currDict->Insert(idxhash, h);
return; return;
} }
} }
@ -629,24 +773,23 @@ void InputMgr::SendEntry(const InputReader* reader, int id, const LogVal* const
//i->tab->Assign(idxval, valval); //i->tab->Assign(idxval, valval);
HashKey* k = i->filters[id].tab->ComputeHash(idxval); HashKey* k = filter->tab->ComputeHash(idxval);
if ( !k ) { if ( !k ) {
reporter->InternalError("could not hash"); reporter->InternalError("could not hash");
return; return;
} }
reporter->Error("assigning"); filter->tab->Assign(idxval, k, valval);
i->filters[id].tab->Assign(idxval, k, valval);
InputHash* ih = new InputHash(); InputHash* ih = new InputHash();
k = i->filters[id].tab->ComputeHash(idxval); k = filter->tab->ComputeHash(idxval);
ih->idxkey = k; ih->idxkey = k;
ih->valhash = valhash; ih->valhash = valhash;
//i->tab->Delete(k); //i->tab->Delete(k);
i->filters[id].currDict->Insert(idxhash, ih); filter->currDict->Insert(idxhash, ih);
if ( i->filters[id].event ) { if ( filter->event ) {
EnumVal* ev; EnumVal* ev;
Ref(idxval); Ref(idxval);
@ -654,11 +797,11 @@ void InputMgr::SendEntry(const InputReader* reader, int id, const LogVal* const
ev = new EnumVal(BifEnum::Input::EVENT_CHANGED, BifType::Enum::Input::Event); ev = new EnumVal(BifEnum::Input::EVENT_CHANGED, BifType::Enum::Input::Event);
assert ( oldval != 0 ); assert ( oldval != 0 );
Ref(oldval); Ref(oldval);
SendEvent(i->filters[id].event, ev, idxval, oldval); SendEvent(filter->event, 3, ev, idxval, oldval);
} else { } else {
ev = new EnumVal(BifEnum::Input::EVENT_NEW, BifType::Enum::Input::Event); ev = new EnumVal(BifEnum::Input::EVENT_NEW, BifType::Enum::Input::Event);
Ref(valval); Ref(valval);
SendEvent(i->filters[id].event, ev, idxval, valval); SendEvent(filter->event, 3, ev, idxval, valval);
} }
} }
} }
@ -673,25 +816,33 @@ void InputMgr::EndCurrentSend(const InputReader* reader, int id) {
assert(i->HasFilter(id)); assert(i->HasFilter(id));
if ( i->filters[id]->filter_type == EVENT_FILTER ) {
// nothing to do..
return;
}
assert(i->filters[id]->filter_type == TABLE_FILTER);
TableFilter* filter = (TableFilter*) i->filters[id];
// lastdict contains all deleted entries and should be empty apart from that // lastdict contains all deleted entries and should be empty apart from that
IterCookie *c = i->filters[id].lastDict->InitForIteration(); IterCookie *c = filter->lastDict->InitForIteration();
i->filters[id].lastDict->MakeRobustCookie(c); filter->lastDict->MakeRobustCookie(c);
InputHash* ih; InputHash* ih;
HashKey *lastDictIdxKey; HashKey *lastDictIdxKey;
//while ( ( ih = i->lastDict->NextEntry(c) ) ) { //while ( ( ih = i->lastDict->NextEntry(c) ) ) {
while ( ( ih = i->filters[id].lastDict->NextEntry(lastDictIdxKey, c) ) ) { while ( ( ih = filter->lastDict->NextEntry(lastDictIdxKey, c) ) ) {
ListVal * idx; ListVal * idx;
Val *val; Val *val;
if ( i->filters[id].pred || i->filters[id].event ) { if ( filter->pred || filter->event ) {
idx = i->filters[id].tab->RecoverIndex(ih->idxkey); idx = filter->tab->RecoverIndex(ih->idxkey);
assert(idx != 0); assert(idx != 0);
val = i->filters[id].tab->Lookup(idx); val = filter->tab->Lookup(idx);
assert(val != 0); assert(val != 0);
} }
if ( i->filters[id].pred ) { if ( filter->pred ) {
bool doBreak = false; bool doBreak = false;
// ask predicate, if we want to expire this element... // ask predicate, if we want to expire this element...
@ -704,37 +855,37 @@ void InputMgr::EndCurrentSend(const InputReader* reader, int id) {
vl.append(ev); vl.append(ev);
vl.append(idx); vl.append(idx);
vl.append(val); vl.append(val);
Val* v = i->filters[id].pred->Call(&vl); Val* v = filter->pred->Call(&vl);
bool result = v->AsBool(); bool result = v->AsBool();
Unref(v); Unref(v);
if ( result == false ) { if ( result == false ) {
// Keep it. Hence - we quit and simply go to the next entry of lastDict // Keep it. Hence - we quit and simply go to the next entry of lastDict
// ah well - and we have to add the entry to currDict... // ah well - and we have to add the entry to currDict...
i->filters[id].currDict->Insert(lastDictIdxKey, i->filters[id].lastDict->RemoveEntry(lastDictIdxKey)); filter->currDict->Insert(lastDictIdxKey, filter->lastDict->RemoveEntry(lastDictIdxKey));
continue; continue;
} }
} }
if ( i->filters[id].event ) { if ( filter->event ) {
Ref(idx); Ref(idx);
Ref(val); Ref(val);
EnumVal *ev = new EnumVal(BifEnum::Input::EVENT_REMOVED, BifType::Enum::Input::Event); EnumVal *ev = new EnumVal(BifEnum::Input::EVENT_REMOVED, BifType::Enum::Input::Event);
SendEvent(i->filters[id].event, ev, idx, val); SendEvent(filter->event, 3, ev, idx, val);
} }
i->filters[id].tab->Delete(ih->idxkey); filter->tab->Delete(ih->idxkey);
i->filters[id].lastDict->Remove(lastDictIdxKey); // deletex in next line filter->lastDict->Remove(lastDictIdxKey); // deletex in next line
delete(ih); delete(ih);
} }
i->filters[id].lastDict->Clear(); // should be empty... but... well... who knows... filter->lastDict->Clear(); // should be empty->->-> but->->-> well->->-> who knows->->->
delete(i->filters[id].lastDict); delete(filter->lastDict);
i->filters[id].lastDict = i->filters[id].currDict; filter->lastDict = filter->currDict;
i->filters[id].currDict = new PDict(InputHash); filter->currDict = new PDict(InputHash);
} }
void InputMgr::Put(const InputReader* reader, int id, const LogVal* const *vals) { void InputMgr::Put(const InputReader* reader, int id, const LogVal* const *vals) {
@ -744,24 +895,86 @@ void InputMgr::Put(const InputReader* reader, int id, const LogVal* const *vals)
return; return;
} }
if ( !i->HasFilter(id) ) {
reporter->InternalError("Unknown filter");
return;
}
if ( i->filters[id]->filter_type == TABLE_FILTER ) {
PutTable(reader, id, vals);
} else if ( i->filters[id]->filter_type == EVENT_FILTER ) {
EnumVal *type = new EnumVal(BifEnum::Input::EVENT_NEW, BifType::Enum::Input::Event);
SendEventFilterEvent(reader, type, id, vals);
} else {
assert(false);
}
}
void InputMgr::SendEventFilterEvent(const InputReader* reader, EnumVal* type, int id, const LogVal* const *vals) {
ReaderInfo *i = FindReader(reader);
bool updated = false;
assert(i);
assert(i->HasFilter(id)); assert(i->HasFilter(id));
Val* idxval = LogValToIndexVal(i->filters[id].num_idx_fields, i->filters[id].itype, vals); assert(i->filters[id]->filter_type == EVENT_FILTER);
EventFilter* filter = (EventFilter*) i->filters[id];
Val *val;
list<Val*> out_vals;
// no tracking, send everything with a new event...
//out_vals.push_back(new EnumVal(BifEnum::Input::EVENT_NEW, BifType::Enum::Input::Event));
out_vals.push_back(type);
int position = 0;
if ( filter->want_record ) {
RecordVal * r = LogValToRecordVal(vals, filter->fields, &position);
out_vals.push_back(r);
} else {
for ( int j = 0; j < filter->fields->NumFields(); j++) {
Val* val = 0;
if ( filter->fields->FieldType(j)->Tag() == TYPE_RECORD ) {
val = LogValToRecordVal(vals, filter->fields->FieldType(j)->AsRecordType(), &position);
} else {
val = LogValToVal(vals[position], filter->fields->FieldType(j));
position++;
}
out_vals.push_back(val);
}
}
SendEvent(filter->event, out_vals);
}
void InputMgr::PutTable(const InputReader* reader, int id, const LogVal* const *vals) {
ReaderInfo *i = FindReader(reader);
assert(i);
assert(i->HasFilter(id));
assert(i->filters[id]->filter_type == TABLE_FILTER);
TableFilter* filter = (TableFilter*) i->filters[id];
Val* idxval = LogValToIndexVal(filter->num_idx_fields, filter->itype, vals);
Val* valval; Val* valval;
int position = i->filters[id].num_idx_fields; int position = filter->num_idx_fields;
if ( i->filters[id].num_val_fields == 1 && !i->filters[id].want_record ) { if ( filter->num_val_fields == 1 && !filter->want_record ) {
valval = LogValToVal(vals[i->filters[id].num_idx_fields], i->filters[id].rtype->FieldType(i->filters[id].num_idx_fields)); valval = LogValToVal(vals[filter->num_idx_fields], filter->rtype->FieldType(filter->num_idx_fields));
} else { } else {
RecordVal * r = new RecordVal(i->filters[id].rtype); RecordVal * r = new RecordVal(filter->rtype);
for ( int j = 0; j < i->filters[id].rtype->NumFields(); j++) { for ( int j = 0; j < filter->rtype->NumFields(); j++) {
Val* val = 0; Val* val = 0;
if ( i->filters[id].rtype->FieldType(j)->Tag() == TYPE_RECORD ) { if ( filter->rtype->FieldType(j)->Tag() == TYPE_RECORD ) {
val = LogValToRecordVal(vals, i->filters[id].rtype->FieldType(j)->AsRecordType(), &position); val = LogValToRecordVal(vals, filter->rtype->FieldType(j)->AsRecordType(), &position);
} else { } else {
val = LogValToVal(vals[position], i->filters[id].rtype->FieldType(j)); val = LogValToVal(vals[position], filter->rtype->FieldType(j));
position++; position++;
} }
@ -776,7 +989,7 @@ void InputMgr::Put(const InputReader* reader, int id, const LogVal* const *vals)
valval = r; valval = r;
} }
i->filters[id].tab->Assign(idxval, valval); filter->tab->Assign(idxval, valval);
} }
void InputMgr::Clear(const InputReader* reader, int id) { void InputMgr::Clear(const InputReader* reader, int id) {
@ -788,7 +1001,10 @@ void InputMgr::Clear(const InputReader* reader, int id) {
assert(i->HasFilter(id)); assert(i->HasFilter(id));
i->filters[id].tab->RemoveAll(); assert(i->filters[id]->filter_type == TABLE_FILTER);
TableFilter* filter = (TableFilter*) i->filters[id];
filter->tab->RemoveAll();
} }
bool InputMgr::Delete(const InputReader* reader, int id, const LogVal* const *vals) { bool InputMgr::Delete(const InputReader* reader, int id, const LogVal* const *vals) {
@ -800,9 +1016,18 @@ bool InputMgr::Delete(const InputReader* reader, int id, const LogVal* const *va
assert(i->HasFilter(id)); assert(i->HasFilter(id));
Val* idxval = LogValToIndexVal(i->filters[id].num_idx_fields, i->filters[id].itype, vals); if ( i->filters[id]->filter_type == TABLE_FILTER ) {
TableFilter* filter = (TableFilter*) i->filters[id];
return ( i->filters[id].tab->Delete(idxval) != 0 ); Val* idxval = LogValToIndexVal(filter->num_idx_fields, filter->itype, vals);
return( filter->tab->Delete(idxval) != 0 );
} else if ( i->filters[id]->filter_type == EVENT_FILTER ) {
EnumVal *type = new EnumVal(BifEnum::Input::EVENT_REMOVED, BifType::Enum::Input::Event);
SendEventFilterEvent(reader, type, id, vals);
return true;
} else {
assert(false);
return false;
}
} }
void InputMgr::Error(InputReader* reader, const char* msg) void InputMgr::Error(InputReader* reader, const char* msg)
@ -835,25 +1060,35 @@ bool InputMgr::SendEvent(const string& name, const int num_vals, const LogVal* c
return true; return true;
} }
void InputMgr::SendEvent(EventHandlerPtr ev, EnumVal* event, Val* left, Val* right) void InputMgr::SendEvent(EventHandlerPtr ev, const int numvals, ...)
{ {
//EventHandler* handler = event_registry->Lookup(name.c_str());
//if ( handler == 0 ) {
// reporter->Error("Event %s not found", name.c_str());
// return;
//}
val_list* vl = new val_list; val_list* vl = new val_list;
vl->append(event);
vl->append(left);
vl->append(right);
//mgr.Dispatch(new Event(handler, vl)); va_list lP;
va_start(lP, numvals);
for ( int i = 0; i < numvals; i++ )
{
vl->append( va_arg(lP, Val*) );
}
va_end(lP);
mgr.QueueEvent(ev, vl, SOURCE_LOCAL);
}
void InputMgr::SendEvent(EventHandlerPtr ev, list<Val*> events)
{
val_list* vl = new val_list;
for ( list<Val*>::iterator i = events.begin(); i != events.end(); i++ ) {
vl->append( *i );
}
mgr.QueueEvent(ev, vl, SOURCE_LOCAL); mgr.QueueEvent(ev, vl, SOURCE_LOCAL);
} }
Val* InputMgr::LogValToRecordVal(const LogVal* const *vals, RecordType *request_type, int* position) {
RecordVal* InputMgr::LogValToRecordVal(const LogVal* const *vals, RecordType *request_type, int* position) {
if ( position == 0 ) { if ( position == 0 ) {
reporter->InternalError("Need position"); reporter->InternalError("Need position");
return 0; return 0;

View file

@ -27,6 +27,8 @@ public:
bool AddTableFilter(EnumVal *id, RecordVal* filter); bool AddTableFilter(EnumVal *id, RecordVal* filter);
bool RemoveTableFilter(EnumVal* id, const string &name); bool RemoveTableFilter(EnumVal* id, const string &name);
bool AddEventFilter(EnumVal *id, RecordVal* filter);
bool RemoveEventFilter(EnumVal* id, const string &name);
protected: protected:
friend class InputReader; friend class InputReader;
@ -46,10 +48,17 @@ protected:
private: private:
struct ReaderInfo; struct ReaderInfo;
void SendEntryTable(const InputReader* reader, int id, const LogVal* const *vals);
void PutTable(const InputReader* reader, int id, const LogVal* const *vals);
void SendEventFilterEvent(const InputReader* reader, EnumVal* type, int id, const LogVal* const *vals);
bool IsCompatibleType(BroType* t, bool atomic_only=false); bool IsCompatibleType(BroType* t, bool atomic_only=false);
bool UnrollRecordType(vector<LogField*> *fields, const RecordType *rec, const string& nameprepend); bool UnrollRecordType(vector<LogField*> *fields, const RecordType *rec, const string& nameprepend);
void SendEvent(EventHandlerPtr ev, EnumVal* event, Val* left, Val* right);
void SendEvent(EventHandlerPtr ev, const int numvals, ...);
void SendEvent(EventHandlerPtr ev, list<Val*> events);
bool SendEvent(const string& name, const int num_vals, const LogVal* const *vals);
HashKey* HashLogVals(const int num_elements, const LogVal* const *vals); HashKey* HashLogVals(const int num_elements, const LogVal* const *vals);
int GetLogValLength(const LogVal* val); int GetLogValLength(const LogVal* val);
@ -57,9 +66,8 @@ private:
Val* LogValToVal(const LogVal* val, BroType* request_type); Val* LogValToVal(const LogVal* val, BroType* request_type);
Val* LogValToIndexVal(int num_fields, const RecordType* type, const LogVal* const *vals); Val* LogValToIndexVal(int num_fields, const RecordType* type, const LogVal* const *vals);
Val* LogValToRecordVal(const LogVal* const *vals, RecordType *request_type, int* position); RecordVal* LogValToRecordVal(const LogVal* const *vals, RecordType *request_type, int* position);
bool SendEvent(const string& name, const int num_vals, const LogVal* const *vals);
ReaderInfo* FindReader(const InputReader* reader); ReaderInfo* FindReader(const InputReader* reader);
ReaderInfo* FindReader(const EnumVal* id); ReaderInfo* FindReader(const EnumVal* id);
@ -68,7 +76,12 @@ private:
string Hash(const string &input); string Hash(const string &input);
struct Filter; class Filter;
class TableFilter;
class EventFilter;
enum FilterType { TABLE_FILTER, EVENT_FILTER };
}; };

View file

@ -9,6 +9,7 @@ module Input;
type StreamDescription: record; type StreamDescription: record;
type TableFilter: record; type TableFilter: record;
type EventFilter: record;
function Input::__create_stream%(id: Log::ID, description: Input::StreamDescription%) : bool function Input::__create_stream%(id: Log::ID, description: Input::StreamDescription%) : bool
%{ %{
@ -40,6 +41,18 @@ function Input::__remove_tablefilter%(id: Log::ID, name: string%) : bool
return new Val( res, TYPE_BOOL); return new Val( res, TYPE_BOOL);
%} %}
function Input::__add_eventfilter%(id: Log::ID, filter: Input::EventFilter%) : bool
%{
bool res = input_mgr->AddEventFilter(id->AsEnumVal(), filter->AsRecordVal());
return new Val( res, TYPE_BOOL );
%}
function Input::__remove_eventfilter%(id: Log::ID, name: string%) : bool
%{
bool res = input_mgr->RemoveEventFilter(id->AsEnumVal(), name->AsString()->CheckString());
return new Val( res, TYPE_BOOL);
%}
# Options for Ascii Reader # Options for Ascii Reader
module InputAscii; module InputAscii;

View file

@ -0,0 +1,21 @@
Input::EVENT_NEW
1
T
Input::EVENT_NEW
2
T
Input::EVENT_NEW
3
F
Input::EVENT_NEW
4
F
Input::EVENT_NEW
5
F
Input::EVENT_NEW
6
F
Input::EVENT_NEW
7
T

View file

@ -16,7 +16,6 @@
7 T 7 T
@TEST-END-FILE @TEST-END-FILE
redef InputAscii::empty_field = "EMPTY";
module A; module A;
@ -24,25 +23,20 @@ export {
redef enum Log::ID += { LOG }; redef enum Log::ID += { LOG };
} }
type Idx: record {
i: int;
};
type Val: record { type Val: record {
i: int;
b: bool; b: bool;
}; };
global destination: table[int] of Val = table(); event line(tpe: Input::Event, i: int, b: bool) {
event line(tpe: Input::Event, left: Idx, right: bool) {
print tpe; print tpe;
print left; print i;
print right; print b;
} }
event bro_init() event bro_init()
{ {
Input::create_stream(A::LOG, [$source="input.log"]); Input::create_stream(A::LOG, [$source="input.log"]);
Input::add_tablefilter(A::LOG, [$name="input", $idx=Idx, $val=Val, $destination=destination, $want_record=F,$ev=line]); Input::add_eventfilter(A::LOG, [$name="input", $fields=Val, $ev=line]);
Input::force_update(A::LOG); Input::force_update(A::LOG);
} }

View file

@ -0,0 +1,48 @@
#
# @TEST-EXEC: bro %INPUT >out
# @TEST-EXEC: btest-diff out
@TEST-START-FILE input.log
#separator \x09
#path ssh
#fields i b
#types int bool
1 T
2 T
3 F
4 F
5 F
6 F
7 T
@TEST-END-FILE
redef InputAscii::empty_field = "EMPTY";
module A;
export {
redef enum Log::ID += { LOG };
}
type Idx: record {
i: int;
};
type Val: record {
b: bool;
};
global destination: table[int] of Val = table();
event line(tpe: Input::Event, left: Idx, right: bool) {
print tpe;
print left;
print right;
}
event bro_init()
{
Input::create_stream(A::LOG, [$source="input.log"]);
Input::add_tablefilter(A::LOG, [$name="input", $idx=Idx, $val=Val, $destination=destination, $want_record=F,$ev=line]);
Input::force_update(A::LOG);
}