re-enable table events

This commit is contained in:
Bernhard Amann 2011-11-21 19:01:07 -08:00
parent 77a517f2b5
commit 53af0544cc
5 changed files with 147 additions and 46 deletions

View file

@ -18,7 +18,7 @@ export {
val: any;
destination: any;
want_record: bool &default=T;
table_ev: any &optional; # event containing idx, val as values.
ev: any &optional; # event containing idx, val as values.
## decision function, that decides if an insertion, update or removal should really be executed.
## or events should be thought

View file

@ -214,7 +214,6 @@ InputReader* InputMgr::CreateStream(EnumVal* id, RecordVal* description)
const BroString* bsource = description->Lookup(rtype->FieldOffset("source"))->AsString();
string source((const char*) bsource->Bytes(), bsource->Len());
ReaderInfo* info = new ReaderInfo;
info->reader = reader_obj;
info->type = reader->AsEnumVal(); // ref'd by lookupwithdefault
@ -259,6 +258,50 @@ bool InputMgr::AddTableFilter(EnumVal *id, RecordVal* fval) {
RecordType *val = fval->Lookup(rtype->FieldOffset("val"))->AsType()->AsTypeType()->Type()->AsRecordType();
TableVal *dst = fval->Lookup(rtype->FieldOffset("destination"))->AsTableVal();
Val *want_record = fval->LookupWithDefault(rtype->FieldOffset("want_record"));
Val* event_val = fval->Lookup(rtype->FieldOffset("ev"));
Func* event = event_val ? event_val->AsFunc() : 0;
if ( event ) {
FuncType* etype = event->FType()->AsFuncType();
if ( ! etype->IsEvent() ) {
reporter->Error("stream event is a function, not an event");
return false;
}
const type_list* args = etype->ArgTypes()->Types();
if ( args->length() != 3 )
{
reporter->Error("Table event must take 3 arguments");
return false;
}
if ( ! same_type((*args)[0], BifType::Enum::Input::Event, 0) )
{
reporter->Error("table events first attribute must be of type Input::Event");
return false;
}
if ( ! same_type((*args)[1], idx) )
{
reporter->Error("table events index attributes do not match");
return false;
}
if ( want_record->InternalInt() == 1 && ! same_type((*args)[2], val) )
{
reporter->Error("table events value attributes do not match");
return false;
} else if ( want_record->InternalInt() == 0 && !same_type((*args)[2], val->FieldType(0) ) ) {
reporter->Error("table events value attribute does not match");
return false;
}
}
vector<LogField*> fieldsV; // vector, because we don't know the length beforehands
bool status = !UnrollRecordType(&fieldsV, idx, "");
@ -273,24 +316,22 @@ bool InputMgr::AddTableFilter(EnumVal *id, RecordVal* fval) {
return false;
}
Val *want_record = fval->LookupWithDefault(rtype->FieldOffset("want_record"));
LogField** fields = new LogField*[fieldsV.size()];
for ( unsigned int i = 0; i < fieldsV.size(); i++ ) {
fields[i] = fieldsV[i];
}
// FIXME: remove those funky 0-tests again as the idea was changed.
Filter filter;
filter.name = name->AsString()->CheckString();
filter.id = id->Ref()->AsEnumVal();
filter.pred = pred ? pred->AsFunc() : 0;
filter.num_idx_fields = idxfields;
filter.num_val_fields = valfields;
filter.tab = dst ? dst->Ref()->AsTableVal() : 0;
filter.rtype = val ? val->Ref()->AsRecordType() : 0;
filter.itype = idx ? idx->Ref()->AsRecordType() : 0;
// ya - well - we actually don't need them in every case... well, a few bytes of memory wasted
filter.tab = dst->Ref()->AsTableVal();
filter.rtype = val->Ref()->AsRecordType();
filter.itype = idx->Ref()->AsRecordType();
filter.event = event ? event_registry->Lookup(event->GetID()->Name()) : 0;
filter.currDict = new PDict(InputHash);
filter.lastDict = new PDict(InputHash);
filter.want_record = ( want_record->InternalInt() == 1 );
@ -601,11 +642,7 @@ void InputMgr::SendEntry(const InputReader* reader, int id, const LogVal* const
i->filters[id].currDict->Insert(idxhash, ih);
// send events now that we are kind of finished.
/* FIXME: fix me.
std::list<string>::iterator filter_iterator = i->events.begin();
while ( filter_iterator != i->events.end() ) {
if ( i->filters[id].event ) {
EnumVal* ev;
Ref(idxval);
@ -613,16 +650,13 @@ void InputMgr::SendEntry(const InputReader* reader, int id, const LogVal* const
ev = new EnumVal(BifEnum::Input::EVENT_CHANGED, BifType::Enum::Input::Event);
assert ( oldval != 0 );
Ref(oldval);
SendEvent(*filter_iterator, ev, idxval, oldval);
SendEvent(i->filters[id].event, ev, idxval, oldval);
} else {
ev = new EnumVal(BifEnum::Input::EVENT_NEW, BifType::Enum::Input::Event);
Ref(valval);
SendEvent(*filter_iterator, ev, idxval, valval);
SendEvent(i->filters[id].event, ev, idxval, valval);
}
++filter_iterator;
} */
}
}
@ -643,12 +677,17 @@ void InputMgr::EndCurrentSend(const InputReader* reader, int id) {
//while ( ( ih = i->lastDict->NextEntry(c) ) ) {
while ( ( ih = i->filters[id].lastDict->NextEntry(lastDictIdxKey, c) ) ) {
if ( i->filters[id].pred ) {
ListVal *idx = i->filters[id].tab->RecoverIndex(ih->idxkey);
assert(idx != 0);
Val *val = i->filters[id].tab->Lookup(idx);
assert(val != 0);
ListVal * idx;
Val *val;
if ( i->filters[id].pred || i->filters[id].event ) {
idx = i->filters[id].tab->RecoverIndex(ih->idxkey);
assert(idx != 0);
val = i->filters[id].tab->Lookup(idx);
assert(val != 0);
}
if ( i->filters[id].pred ) {
bool doBreak = false;
// ask predicate, if we want to expire this element...
@ -673,21 +712,13 @@ void InputMgr::EndCurrentSend(const InputReader* reader, int id) {
}
//
{
/* FIXME: events
std::list<string>::iterator it = i->filters[id].events.begin();
while ( it != i->filters[id].events.end() ) {
Ref(idx);
Ref(val);
EnumVal *ev = new EnumVal(BifEnum::Input::EVENT_REMOVED, BifType::Enum::Input::Event);
SendEvent(*it, ev, idx, val);
++it;
}
*/
}
}
if ( i->filters[id].event ) {
Ref(idx);
Ref(val);
EnumVal *ev = new EnumVal(BifEnum::Input::EVENT_REMOVED, BifType::Enum::Input::Event);
SendEvent(i->filters[id].event, ev, idx, val);
}
i->filters[id].tab->Delete(ih->idxkey);
@ -792,20 +823,21 @@ void InputMgr::SendEvent(const string& name, const int num_vals, const LogVal* c
mgr.Dispatch(new Event(handler, vl));
} */
void InputMgr::SendEvent(const string& name, EnumVal* event, Val* left, Val* right)
void InputMgr::SendEvent(EventHandlerPtr ev, EnumVal* event, Val* left, Val* right)
{
EventHandler* handler = event_registry->Lookup(name.c_str());
if ( handler == 0 ) {
reporter->Error("Event %s not found", name.c_str());
return;
}
//EventHandler* handler = event_registry->Lookup(name.c_str());
//if ( handler == 0 ) {
// reporter->Error("Event %s not found", name.c_str());
// return;
//}
val_list* vl = new val_list;
vl->append(event);
vl->append(left);
vl->append(right);
mgr.Dispatch(new Event(handler, vl));
//mgr.Dispatch(new Event(handler, vl));
mgr.QueueEvent(ev, vl, SOURCE_LOCAL);
}

View file

@ -49,7 +49,7 @@ private:
bool IsCompatibleType(BroType* t, bool atomic_only=false);
bool UnrollRecordType(vector<LogField*> *fields, const RecordType *rec, const string& nameprepend);
void SendEvent(const string& name, EnumVal* event, Val* left, Val* right);
void SendEvent(EventHandlerPtr ev, EnumVal* event, Val* left, Val* right);
HashKey* HashLogVals(const int num_elements, const LogVal* const *vals);
int GetLogValLength(const LogVal* val);

View file

@ -0,0 +1,21 @@
Input::EVENT_NEW
1
T
Input::EVENT_NEW
2
T
Input::EVENT_NEW
3
F
Input::EVENT_NEW
4
F
Input::EVENT_NEW
5
F
Input::EVENT_NEW
6
F
Input::EVENT_NEW
7
T

View file

@ -0,0 +1,48 @@
#
# @TEST-EXEC: bro %INPUT >out
# @TEST-EXEC: btest-diff out
@TEST-START-FILE input.log
#separator \x09
#path ssh
#fields i b
#types int bool
1 T
2 T
3 F
4 F
5 F
6 F
7 T
@TEST-END-FILE
redef InputAscii::empty_field = "EMPTY";
module A;
export {
redef enum Log::ID += { LOG };
}
type Idx: record {
i: int;
};
type Val: record {
b: bool;
};
global destination: table[int] of Val = table();
event line(tpe: Input::Event, left: Idx, right: bool) {
print tpe;
print left;
print right;
}
event bro_init()
{
Input::create_stream(A::LOG, [$source="input.log"]);
Input::add_tablefilter(A::LOG, [$name="input", $idx=Idx, $val=Val, $destination=destination, $want_record=F,$ev=line]);
Input::force_update(A::LOG);
}