Allow sets to be read from files, add a convenience function for reading a file once, and fix a bug in the destructor that could lead to a segfault.
Bernhard Amann 2011-11-29 14:32:53 -08:00
parent 2a6387129c
commit a68e6b9fa4
3 changed files with 75 additions and 15 deletions
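Taken together, the first file below adds an Input::read_table convenience wrapper that chains create_stream, add_tablefilter, force_update and remove_stream into one call. A minimal usage sketch, assuming a StreamDescription with a $source field and illustrative Idx/Val record types modelled on the test at the end of this commit (the file name and field values are placeholders, not part of the change):

type Idx: record { ip: addr; };
type Val: record { tag: string; };

global servers: table[addr] of Val = table();

event bro_init()
	{
	# One-shot read: read_table creates the stream, reads it once, and removes it again.
	Input::read_table([$source="servers.data"],
	                  [$name="servers-once", $idx=Idx, $val=Val, $destination=servers]);
	}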


@@ -4,6 +4,8 @@
module Input;
export {
redef enum Input::ID += { TABLE_READ };
## The default input reader used. Defaults to `READER_ASCII`.
const default_reader = READER_ASCII &redef;
@@ -27,7 +29,8 @@ export {
## Record that defines the values used as the index of the table
idx: any;
## Record that defines the values used as the values of the table
val: any;
## If val is undefined, the destination has to be a set.
val: any &optional;
## Defines whether the value of the table is a record (default) or a single value.
## val may only contain one element when this is set to F.
want_record: bool &default=T;
@@ -103,6 +106,14 @@ export {
global remove_eventfilter: function(id: Input::ID, name: string) : bool;
#global get_filter: function(id: ID, name: string) : Filter;
## Convenience function for reading a specific input source exactly once using
## exactly one tablefilter.
##
## description: `StreamDescription` record describing the source.
## filter: `TableFilter` record describing the filter.
global read_table: function(description: Input::StreamDescription, filter: Input::TableFilter) : bool;
}
@load base/input.bif
@@ -151,6 +162,27 @@ function remove_eventfilter(id: Input::ID, name: string) : bool
return __remove_eventfilter(id, name);
}
function read_table(description: Input::StreamDescription, filter: Input::TableFilter) : bool {
local ok: bool = T;
# Since we create and delete the stream ourselves, reusing this fixed ID should be fine, at least for single-threaded operation.
local id: Input::ID = Input::TABLE_READ;
ok = create_stream(id, description);
if ( ok ) {
ok = add_tablefilter(id, filter);
}
if ( ok ) {
ok = force_update(id);
}
if ( ok ) {
ok = remove_stream(id);
} else {
remove_stream(id);
}
return ok;
}
#function get_filter(id: ID, name: string) : Filter
# {
# if ( [id, name] in filters )
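Because val is now &optional, a table filter can omit it entirely when the destination is a set, in which case only the index fields are read from the source. A sketch of what that might look like, using a hypothetical stream ID and file name (the TableFilter field names match the ones defined above):

redef enum Input::ID += { SET_READ };    # hypothetical stream ID for this sketch

type Idx: record { ip: addr; };

global hosts: set[addr] = set();

event bro_init()
	{
	Input::create_stream(SET_READ, [$source="hosts.data"]);
	# No $val given: the destination is a set keyed by the Idx fields.
	Input::add_tablefilter(SET_READ, [$name="hosts", $idx=Idx, $destination=hosts]);
	Input::force_update(SET_READ);
	}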


@@ -86,6 +86,7 @@ InputMgr::Filter::~Filter() {
InputMgr::TableFilter::~TableFilter() {
Unref(tab);
Unref(itype);
if ( rtype ) // can be 0 for sets
Unref(rtype);
delete currDict;
@@ -110,6 +111,7 @@ InputMgr::ReaderInfo::~ReaderInfo() {
while ( it != filters.end() ) {
delete (*it).second;
++it;
}
Unref(type);
@@ -354,7 +356,10 @@ bool InputMgr::AddTableFilter(EnumVal *id, RecordVal* fval) {
Val* pred = fval->Lookup(rtype->FieldOffset("pred"));
RecordType *idx = fval->Lookup(rtype->FieldOffset("idx"))->AsType()->AsTypeType()->Type()->AsRecordType();
RecordType *val = fval->Lookup(rtype->FieldOffset("val"))->AsType()->AsTypeType()->Type()->AsRecordType();
RecordType *val = 0;
if ( fval->Lookup(rtype->FieldOffset("val")) != 0 ) {
val = fval->Lookup(rtype->FieldOffset("val"))->AsType()->AsTypeType()->Type()->AsRecordType();
}
TableVal *dst = fval->Lookup(rtype->FieldOffset("destination"))->AsTableVal();
Val *want_record = fval->LookupWithDefault(rtype->FieldOffset("want_record"));
@@ -408,9 +413,14 @@ bool InputMgr::AddTableFilter(EnumVal *id, RecordVal* fval) {
int idxfields = fieldsV.size();
if ( val ) // if we are not a set
status = status || !UnrollRecordType(&fieldsV, val, "");
int valfields = fieldsV.size() - idxfields;
if ( !val )
assert(valfields == 0);
if ( status ) {
reporter->Error("Problem unrolling");
return false;
@@ -429,7 +439,7 @@ bool InputMgr::AddTableFilter(EnumVal *id, RecordVal* fval) {
filter->num_idx_fields = idxfields;
filter->num_val_fields = valfields;
filter->tab = dst->Ref()->AsTableVal();
filter->rtype = val->Ref()->AsRecordType();
filter->rtype = val ? val->Ref()->AsRecordType() : 0;
filter->itype = idx->Ref()->AsRecordType();
filter->event = event ? event_registry->Lookup(event->GetID()->Name()) : 0;
filter->currDict = new PDict(InputHash);
@@ -681,7 +691,10 @@ void InputMgr::SendEntryTable(const InputReader* reader, int id, const LogVal* c
HashKey* idxhash = HashLogVals(filter->num_idx_fields, vals);
//reporter->Error("Result: %d", (uint64_t) idxhash->Hash());
//reporter->Error("Hashing %d val fields", i->num_val_fields);
HashKey* valhash = HashLogVals(filter->num_val_fields, vals+filter->num_idx_fields);
HashKey* valhash = 0;
if ( filter->num_val_fields > 0 )
valhash = HashLogVals(filter->num_val_fields, vals+filter->num_idx_fields);
//reporter->Error("Result: %d", (uint64_t) valhash->Hash());
//reporter->Error("received entry with idxhash %d and valhash %d", (uint64_t) idxhash->Hash(), (uint64_t) valhash->Hash());
@@ -689,12 +702,13 @@ void InputMgr::SendEntryTable(const InputReader* reader, int id, const LogVal* c
InputHash *h = filter->lastDict->Lookup(idxhash);
if ( h != 0 ) {
// seen before
if ( h->valhash->Hash() == valhash->Hash() ) {
// ok, double.
if ( filter->num_val_fields == 0 || h->valhash->Hash() == valhash->Hash() ) {
// ok, exact duplicate
filter->lastDict->Remove(idxhash);
filter->currDict->Insert(idxhash, h);
return;
} else {
assert( filter->num_val_fields > 0 );
// updated
filter->lastDict->Remove(idxhash);
delete(h);
@@ -708,7 +722,9 @@ void InputMgr::SendEntryTable(const InputReader* reader, int id, const LogVal* c
Val* valval;
int position = filter->num_idx_fields;
if ( filter->num_val_fields == 1 && !filter->want_record ) {
if ( filter->num_val_fields == 0 ) {
valval = 0;
} else if ( filter->num_val_fields == 1 && !filter->want_record ) {
valval = LogValToVal(vals[position], filter->rtype->FieldType(0));
} else {
RecordVal * r = new RecordVal(filter->rtype);
@@ -732,6 +748,7 @@ void InputMgr::SendEntryTable(const InputReader* reader, int id, const LogVal* c
Val* oldval = 0;
if ( updated == true ) {
assert(filter->num_val_fields > 0);
// in that case, we need the old value to send the event (if we send an event).
oldval = filter->tab->Lookup(idxval);
}
@@ -749,10 +766,12 @@ void InputMgr::SendEntryTable(const InputReader* reader, int id, const LogVal* c
ev = new EnumVal(BifEnum::Input::EVENT_NEW, BifType::Enum::Input::Event);
}
val_list vl(3);
val_list vl( 2 + (filter->num_val_fields > 0) ); // 2 if we don't have values, 3 otherwise.
vl.append(ev);
vl.append(idxval);
if ( filter->num_val_fields > 0 )
vl.append(valval);
Val* v = filter->pred->Call(&vl);
bool result = v->AsBool();
Unref(v);
@@ -794,6 +813,7 @@ void InputMgr::SendEntryTable(const InputReader* reader, int id, const LogVal* c
Ref(idxval);
if ( updated ) { // in case of update send back the old value.
assert ( filter->num_val_fields > 0 );
ev = new EnumVal(BifEnum::Input::EVENT_CHANGED, BifType::Enum::Input::Event);
assert ( oldval != 0 );
Ref(oldval);
@@ -801,9 +821,13 @@ void InputMgr::SendEntryTable(const InputReader* reader, int id, const LogVal* c
} else {
ev = new EnumVal(BifEnum::Input::EVENT_NEW, BifType::Enum::Input::Event);
Ref(valval);
if ( filter->num_val_fields == 0 ) {
SendEvent(filter->event, 2, ev, idxval);
} else {
SendEvent(filter->event, 3, ev, idxval, valval);
}
}
}
}
@@ -963,7 +987,9 @@ void InputMgr::PutTable(const InputReader* reader, int id, const LogVal* const *
Val* valval;
int position = filter->num_idx_fields;
if ( filter->num_val_fields == 1 && !filter->want_record ) {
if ( filter->num_val_fields == 0 ) {
valval = 0;
} else if ( filter->num_val_fields == 1 && !filter->want_record ) {
valval = LogValToVal(vals[filter->num_idx_fields], filter->rtype->FieldType(filter->num_idx_fields));
} else {
RecordVal * r = new RecordVal(filter->rtype);

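One visible consequence of the SendEvent changes above: when a filter has no value fields (i.e. the destination is a set), the filter's event is raised with only the change type and the index, without a value argument. A handler for the set case might therefore be declared like this (the event and record names are illustrative):

type Idx: record { ip: addr; };

event host_read(ev: Input::Event, idx: Idx)
	{
	print ev, idx;
	}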

@@ -49,4 +49,6 @@ event bro_init()
Input::add_tablefilter(A::INPUT, [$name="ssh", $idx=Idx, $val=Val, $destination=servers]);
Input::force_update(A::INPUT);
print servers;
Input::remove_tablefilter(A::INPUT, "ssh");
Input::remove_stream(A::INPUT);
}