filters have been called streams for eternity. And I always was too

lazy to change it everywhere...

Fix that.
This commit is contained in:
Bernhard Amann 2012-05-25 14:26:11 -07:00 committed by Robin Sommer
parent 61ce9b5412
commit 658b188dff
7 changed files with 217 additions and 208 deletions

View file

@ -24,7 +24,7 @@ export {
## Read mode to use for this stream ## Read mode to use for this stream
mode: Mode &default=default_mode; mode: Mode &default=default_mode;
## Descriptive name. Used to remove a filter at a later time ## Descriptive name. Used to remove a stream at a later time
name: string; name: string;
## Special definitions for tables ## Special definitions for tables
@ -65,7 +65,7 @@ export {
## Read mode to use for this stream ## Read mode to use for this stream
mode: Mode &default=default_mode; mode: Mode &default=default_mode;
## Descriptive name. Used to remove a filter at a later time ## Descriptive name. Used to remove a stream at a later time
name: string; name: string;
## Special definitions for events ## Special definitions for events

View file

@ -62,7 +62,7 @@ public:
int mode; int mode;
StreamType filter_type; // to distinguish between event and table filters StreamType stream_type; // to distinguish between event and table streams
EnumVal* type; EnumVal* type;
ReaderFrontend* reader; ReaderFrontend* reader;
@ -129,7 +129,7 @@ public:
Manager::TableStream::TableStream() : Manager::Stream::Stream() Manager::TableStream::TableStream() : Manager::Stream::Stream()
{ {
filter_type = TABLE_FILTER; stream_type = TABLE_FILTER;
tab = 0; tab = 0;
itype = 0; itype = 0;
@ -144,7 +144,7 @@ Manager::TableStream::TableStream() : Manager::Stream::Stream()
Manager::EventStream::EventStream() : Manager::Stream::Stream() Manager::EventStream::EventStream() : Manager::Stream::Stream()
{ {
fields = 0; fields = 0;
filter_type = EVENT_FILTER; stream_type = EVENT_FILTER;
} }
Manager::EventStream::~EventStream() Manager::EventStream::~EventStream()
@ -322,16 +322,16 @@ bool Manager::CreateEventStream(RecordVal* fval)
RecordType* rtype = fval->Type()->AsRecordType(); RecordType* rtype = fval->Type()->AsRecordType();
if ( ! same_type(rtype, BifType::Record::Input::EventDescription, 0) ) if ( ! same_type(rtype, BifType::Record::Input::EventDescription, 0) )
{ {
reporter->Error("filter argument not of right type"); reporter->Error("EventDescription argument not of right type");
return false; return false;
} }
EventStream* filter = new EventStream(); EventStream* stream = new EventStream();
{ {
bool res = CreateStream(filter, fval); bool res = CreateStream(stream, fval);
if ( res == false ) if ( res == false )
{ {
delete filter; delete stream;
return false; return false;
} }
} }
@ -428,19 +428,19 @@ bool Manager::CreateEventStream(RecordVal* fval)
logf[i] = fieldsV[i]; logf[i] = fieldsV[i];
Unref(fields); // ref'd by lookupwithdefault Unref(fields); // ref'd by lookupwithdefault
filter->num_fields = fieldsV.size(); stream->num_fields = fieldsV.size();
filter->fields = fields->Ref()->AsRecordType(); stream->fields = fields->Ref()->AsRecordType();
filter->event = event_registry->Lookup(event->GetID()->Name()); stream->event = event_registry->Lookup(event->GetID()->Name());
filter->want_record = ( want_record->InternalInt() == 1 ); stream->want_record = ( want_record->InternalInt() == 1 );
Unref(want_record); // ref'd by lookupwithdefault Unref(want_record); // ref'd by lookupwithdefault
assert(filter->reader); assert(stream->reader);
filter->reader->Init(filter->source, filter->mode, filter->num_fields, logf ); stream->reader->Init(stream->source, stream->mode, stream->num_fields, logf );
readers[filter->reader] = filter; readers[stream->reader] = stream;
DBG_LOG(DBG_INPUT, "Successfully created event stream %s", DBG_LOG(DBG_INPUT, "Successfully created event stream %s",
filter->name.c_str()); stream->name.c_str());
return true; return true;
} }
@ -450,16 +450,16 @@ bool Manager::CreateTableStream(RecordVal* fval)
RecordType* rtype = fval->Type()->AsRecordType(); RecordType* rtype = fval->Type()->AsRecordType();
if ( ! same_type(rtype, BifType::Record::Input::TableDescription, 0) ) if ( ! same_type(rtype, BifType::Record::Input::TableDescription, 0) )
{ {
reporter->Error("filter argument not of right type"); reporter->Error("TableDescription argument not of right type");
return false; return false;
} }
TableStream* filter = new TableStream(); TableStream* stream = new TableStream();
{ {
bool res = CreateStream(filter, fval); bool res = CreateStream(stream, fval);
if ( res == false ) if ( res == false )
{ {
delete filter; delete stream;
return false; return false;
} }
} }
@ -587,40 +587,40 @@ bool Manager::CreateTableStream(RecordVal* fval)
for ( unsigned int i = 0; i < fieldsV.size(); i++ ) for ( unsigned int i = 0; i < fieldsV.size(); i++ )
fields[i] = fieldsV[i]; fields[i] = fieldsV[i];
filter->pred = pred ? pred->AsFunc() : 0; stream->pred = pred ? pred->AsFunc() : 0;
filter->num_idx_fields = idxfields; stream->num_idx_fields = idxfields;
filter->num_val_fields = valfields; stream->num_val_fields = valfields;
filter->tab = dst->AsTableVal(); stream->tab = dst->AsTableVal();
filter->rtype = val ? val->AsRecordType() : 0; stream->rtype = val ? val->AsRecordType() : 0;
filter->itype = idx->AsRecordType(); stream->itype = idx->AsRecordType();
filter->event = event ? event_registry->Lookup(event->GetID()->Name()) : 0; stream->event = event ? event_registry->Lookup(event->GetID()->Name()) : 0;
filter->currDict = new PDict(InputHash); stream->currDict = new PDict(InputHash);
filter->currDict->SetDeleteFunc(input_hash_delete_func); stream->currDict->SetDeleteFunc(input_hash_delete_func);
filter->lastDict = new PDict(InputHash); stream->lastDict = new PDict(InputHash);
filter->lastDict->SetDeleteFunc(input_hash_delete_func); stream->lastDict->SetDeleteFunc(input_hash_delete_func);
filter->want_record = ( want_record->InternalInt() == 1 ); stream->want_record = ( want_record->InternalInt() == 1 );
Unref(want_record); // ref'd by lookupwithdefault Unref(want_record); // ref'd by lookupwithdefault
Unref(pred); Unref(pred);
if ( valfields > 1 ) if ( valfields > 1 )
{ {
if ( ! filter->want_record ) if ( ! stream->want_record )
{ {
reporter->Error("Stream %s does not want a record (want_record=F), but has more then one value field. Aborting", filter->name.c_str()); reporter->Error("Stream %s does not want a record (want_record=F), but has more then one value field. Aborting", stream->name.c_str());
delete filter; delete stream;
return false; return false;
} }
} }
assert(filter->reader); assert(stream->reader);
filter->reader->Init(filter->source, filter->mode, fieldsV.size(), fields ); stream->reader->Init(stream->source, stream->mode, fieldsV.size(), fields );
readers[filter->reader] = filter; readers[stream->reader] = stream;
DBG_LOG(DBG_INPUT, "Successfully created table stream %s", DBG_LOG(DBG_INPUT, "Successfully created table stream %s",
filter->name.c_str()); stream->name.c_str());
return true; return true;
} }
@ -872,9 +872,9 @@ void Manager::SendEntry(ReaderFrontend* reader, Value* *vals)
} }
int readFields; int readFields;
if ( i->filter_type == TABLE_FILTER ) if ( i->stream_type == TABLE_FILTER )
readFields = SendEntryTable(i, vals); readFields = SendEntryTable(i, vals);
else if ( i->filter_type == EVENT_FILTER ) else if ( i->stream_type == EVENT_FILTER )
{ {
EnumVal *type = new EnumVal(BifEnum::Input::EVENT_NEW, BifType::Enum::Input::Event); EnumVal *type = new EnumVal(BifEnum::Input::EVENT_NEW, BifType::Enum::Input::Event);
readFields = SendEventStreamEvent(i, type, vals); readFields = SendEventStreamEvent(i, type, vals);
@ -894,21 +894,21 @@ int Manager::SendEntryTable(Stream* i, const Value* const *vals)
assert(i); assert(i);
assert(i->filter_type == TABLE_FILTER); assert(i->stream_type == TABLE_FILTER);
TableStream* filter = (TableStream*) i; TableStream* stream = (TableStream*) i;
HashKey* idxhash = HashValues(filter->num_idx_fields, vals); HashKey* idxhash = HashValues(stream->num_idx_fields, vals);
if ( idxhash == 0 ) if ( idxhash == 0 )
{ {
reporter->Error("Could not hash line. Ignoring"); reporter->Error("Could not hash line. Ignoring");
return filter->num_val_fields + filter->num_idx_fields; return stream->num_val_fields + stream->num_idx_fields;
} }
hash_t valhash = 0; hash_t valhash = 0;
if ( filter->num_val_fields > 0 ) if ( stream->num_val_fields > 0 )
{ {
HashKey* valhashkey = HashValues(filter->num_val_fields, vals+filter->num_idx_fields); HashKey* valhashkey = HashValues(stream->num_val_fields, vals+stream->num_idx_fields);
if ( valhashkey == 0 ) { if ( valhashkey == 0 ) {
// empty line. index, but no values. // empty line. index, but no values.
// hence we also have no hash value... // hence we also have no hash value...
@ -920,23 +920,23 @@ int Manager::SendEntryTable(Stream* i, const Value* const *vals)
} }
} }
InputHash *h = filter->lastDict->Lookup(idxhash); InputHash *h = stream->lastDict->Lookup(idxhash);
if ( h != 0 ) if ( h != 0 )
{ {
// seen before // seen before
if ( filter->num_val_fields == 0 || h->valhash == valhash ) if ( stream->num_val_fields == 0 || h->valhash == valhash )
{ {
// ok, exact duplicate, move entry to new dicrionary and do nothing else. // ok, exact duplicate, move entry to new dicrionary and do nothing else.
filter->lastDict->Remove(idxhash); stream->lastDict->Remove(idxhash);
filter->currDict->Insert(idxhash, h); stream->currDict->Insert(idxhash, h);
delete idxhash; delete idxhash;
return filter->num_val_fields + filter->num_idx_fields; return stream->num_val_fields + stream->num_idx_fields;
} }
else else
{ {
assert( filter->num_val_fields > 0 ); assert( stream->num_val_fields > 0 );
// entry was updated in some way // entry was updated in some way
filter->lastDict->Remove(idxhash); stream->lastDict->Remove(idxhash);
// keep h for predicates // keep h for predicates
updated = true; updated = true;
@ -947,24 +947,24 @@ int Manager::SendEntryTable(Stream* i, const Value* const *vals)
Val* valval; Val* valval;
RecordVal* predidx = 0; RecordVal* predidx = 0;
int position = filter->num_idx_fields; int position = stream->num_idx_fields;
if ( filter->num_val_fields == 0 ) if ( stream->num_val_fields == 0 )
valval = 0; valval = 0;
else if ( filter->num_val_fields == 1 && !filter->want_record ) else if ( stream->num_val_fields == 1 && !stream->want_record )
valval = ValueToVal(vals[position], filter->rtype->FieldType(0)); valval = ValueToVal(vals[position], stream->rtype->FieldType(0));
else else
valval = ValueToRecordVal(vals, filter->rtype, &position); valval = ValueToRecordVal(vals, stream->rtype, &position);
// call filter first to determine if we really add / change the entry // call stream first to determine if we really add / change the entry
if ( filter->pred ) if ( stream->pred )
{ {
EnumVal* ev; EnumVal* ev;
//Ref(idxval); //Ref(idxval);
int startpos = 0; int startpos = 0;
//Val* predidx = ListValToRecordVal(idxval->AsListVal(), filter->itype, &startpos); //Val* predidx = ListValToRecordVal(idxval->AsListVal(), stream->itype, &startpos);
predidx = ValueToRecordVal(vals, filter->itype, &startpos); predidx = ValueToRecordVal(vals, stream->itype, &startpos);
//ValueToRecordVal(vals, filter->itype, &startpos); //ValueToRecordVal(vals, stream->itype, &startpos);
if ( updated ) if ( updated )
ev = new EnumVal(BifEnum::Input::EVENT_CHANGED, BifType::Enum::Input::Event); ev = new EnumVal(BifEnum::Input::EVENT_CHANGED, BifType::Enum::Input::Event);
@ -972,10 +972,10 @@ int Manager::SendEntryTable(Stream* i, const Value* const *vals)
ev = new EnumVal(BifEnum::Input::EVENT_NEW, BifType::Enum::Input::Event); ev = new EnumVal(BifEnum::Input::EVENT_NEW, BifType::Enum::Input::Event);
bool result; bool result;
if ( filter->num_val_fields > 0 ) // we have values if ( stream->num_val_fields > 0 ) // we have values
result = CallPred(filter->pred, 3, ev, predidx->Ref(), valval->Ref()); result = CallPred(stream->pred, 3, ev, predidx->Ref(), valval->Ref());
else // no values else // no values
result = CallPred(filter->pred, 2, ev, predidx->Ref()); result = CallPred(stream->pred, 2, ev, predidx->Ref());
if ( result == false ) if ( result == false )
{ {
@ -985,17 +985,17 @@ int Manager::SendEntryTable(Stream* i, const Value* const *vals)
{ {
// throw away. Hence - we quit. And remove the entry from the current dictionary... // throw away. Hence - we quit. And remove the entry from the current dictionary...
// (but why should it be in there? assert this). // (but why should it be in there? assert this).
assert ( filter->currDict->RemoveEntry(idxhash) == 0 ); assert ( stream->currDict->RemoveEntry(idxhash) == 0 );
delete idxhash; delete idxhash;
delete h; delete h;
return filter->num_val_fields + filter->num_idx_fields; return stream->num_val_fields + stream->num_idx_fields;
} }
else else
{ {
// keep old one // keep old one
filter->currDict->Insert(idxhash, h); stream->currDict->Insert(idxhash, h);
delete idxhash; delete idxhash;
return filter->num_val_fields + filter->num_idx_fields; return stream->num_val_fields + stream->num_idx_fields;
} }
} }
@ -1016,19 +1016,19 @@ int Manager::SendEntryTable(Stream* i, const Value* const *vals)
// I think there is an unref missing here. But if I insert is, it crashes :) // I think there is an unref missing here. But if I insert is, it crashes :)
} }
else else
idxval = ValueToIndexVal(filter->num_idx_fields, filter->itype, vals); idxval = ValueToIndexVal(stream->num_idx_fields, stream->itype, vals);
Val* oldval = 0; Val* oldval = 0;
if ( updated == true ) if ( updated == true )
{ {
assert(filter->num_val_fields > 0); assert(stream->num_val_fields > 0);
// in that case, we need the old value to send the event (if we send an event). // in that case, we need the old value to send the event (if we send an event).
oldval = filter->tab->Lookup(idxval, false); oldval = stream->tab->Lookup(idxval, false);
} }
//i->tab->Assign(idxval, valval); //i->tab->Assign(idxval, valval);
assert(idxval); assert(idxval);
HashKey* k = filter->tab->ComputeHash(idxval); HashKey* k = stream->tab->ComputeHash(idxval);
if ( !k ) if ( !k )
{ {
reporter->InternalError("could not hash"); reporter->InternalError("could not hash");
@ -1039,46 +1039,46 @@ int Manager::SendEntryTable(Stream* i, const Value* const *vals)
ih->idxkey = new HashKey(k->Key(), k->Size(), k->Hash()); ih->idxkey = new HashKey(k->Key(), k->Size(), k->Hash());
ih->valhash = valhash; ih->valhash = valhash;
if ( filter->event && updated ) if ( stream->event && updated )
Ref(oldval); // otherwise it is no longer accessible after the assignment Ref(oldval); // otherwise it is no longer accessible after the assignment
filter->tab->Assign(idxval, k, valval); stream->tab->Assign(idxval, k, valval);
Unref(idxval); // asssign does not consume idxval. Unref(idxval); // asssign does not consume idxval.
if ( predidx != 0 ) if ( predidx != 0 )
Unref(predidx); Unref(predidx);
filter->currDict->Insert(idxhash, ih); stream->currDict->Insert(idxhash, ih);
delete idxhash; delete idxhash;
if ( filter->event ) if ( stream->event )
{ {
EnumVal* ev; EnumVal* ev;
int startpos = 0; int startpos = 0;
Val* predidx = ValueToRecordVal(vals, filter->itype, &startpos); Val* predidx = ValueToRecordVal(vals, stream->itype, &startpos);
if ( updated ) if ( updated )
{ // in case of update send back the old value. { // in case of update send back the old value.
assert ( filter->num_val_fields > 0 ); assert ( stream->num_val_fields > 0 );
ev = new EnumVal(BifEnum::Input::EVENT_CHANGED, BifType::Enum::Input::Event); ev = new EnumVal(BifEnum::Input::EVENT_CHANGED, BifType::Enum::Input::Event);
assert ( oldval != 0 ); assert ( oldval != 0 );
SendEvent(filter->event, 4, filter->description->Ref(), ev, predidx, oldval); SendEvent(stream->event, 4, stream->description->Ref(), ev, predidx, oldval);
} }
else else
{ {
ev = new EnumVal(BifEnum::Input::EVENT_NEW, BifType::Enum::Input::Event); ev = new EnumVal(BifEnum::Input::EVENT_NEW, BifType::Enum::Input::Event);
if ( filter->num_val_fields == 0 ) if ( stream->num_val_fields == 0 )
{ {
Ref(filter->description); Ref(stream->description);
SendEvent(filter->event, 3, filter->description->Ref(), ev, predidx); SendEvent(stream->event, 3, stream->description->Ref(), ev, predidx);
} }
else else
SendEvent(filter->event, 4, filter->description->Ref(), ev, predidx, valval->Ref()); SendEvent(stream->event, 4, stream->description->Ref(), ev, predidx, valval->Ref());
} }
} }
return filter->num_val_fields + filter->num_idx_fields; return stream->num_val_fields + stream->num_idx_fields;
} }
@ -1097,19 +1097,19 @@ void Manager::EndCurrentSend(ReaderFrontend* reader)
i->name.c_str()); i->name.c_str());
#endif #endif
if ( i->filter_type == EVENT_FILTER ) // nothing to do.. if ( i->stream_type == EVENT_FILTER ) // nothing to do..
return; return;
assert(i->filter_type == TABLE_FILTER); assert(i->stream_type == TABLE_FILTER);
TableStream* filter = (TableStream*) i; TableStream* stream = (TableStream*) i;
// lastdict contains all deleted entries and should be empty apart from that // lastdict contains all deleted entries and should be empty apart from that
IterCookie *c = filter->lastDict->InitForIteration(); IterCookie *c = stream->lastDict->InitForIteration();
filter->lastDict->MakeRobustCookie(c); stream->lastDict->MakeRobustCookie(c);
InputHash* ih; InputHash* ih;
HashKey *lastDictIdxKey; HashKey *lastDictIdxKey;
//while ( ( ih = i->lastDict->NextEntry(c) ) ) { //while ( ( ih = i->lastDict->NextEntry(c) ) ) {
while ( ( ih = filter->lastDict->NextEntry(lastDictIdxKey, c) ) ) while ( ( ih = stream->lastDict->NextEntry(lastDictIdxKey, c) ) )
{ {
ListVal * idx = 0; ListVal * idx = 0;
Val *val = 0; Val *val = 0;
@ -1118,17 +1118,17 @@ void Manager::EndCurrentSend(ReaderFrontend* reader)
EnumVal* ev = 0; EnumVal* ev = 0;
int startpos = 0; int startpos = 0;
if ( filter->pred || filter->event ) if ( stream->pred || stream->event )
{ {
idx = filter->tab->RecoverIndex(ih->idxkey); idx = stream->tab->RecoverIndex(ih->idxkey);
assert(idx != 0); assert(idx != 0);
val = filter->tab->Lookup(idx); val = stream->tab->Lookup(idx);
assert(val != 0); assert(val != 0);
predidx = ListValToRecordVal(idx, filter->itype, &startpos); predidx = ListValToRecordVal(idx, stream->itype, &startpos);
ev = new EnumVal(BifEnum::Input::EVENT_REMOVED, BifType::Enum::Input::Event); ev = new EnumVal(BifEnum::Input::EVENT_REMOVED, BifType::Enum::Input::Event);
} }
if ( filter->pred ) if ( stream->pred )
{ {
// ask predicate, if we want to expire this element... // ask predicate, if we want to expire this element...
@ -1136,7 +1136,7 @@ void Manager::EndCurrentSend(ReaderFrontend* reader)
Ref(predidx); Ref(predidx);
Ref(val); Ref(val);
bool result = CallPred(filter->pred, 3, ev, predidx, val); bool result = CallPred(stream->pred, 3, ev, predidx, val);
if ( result == false ) if ( result == false )
{ {
@ -1144,37 +1144,37 @@ void Manager::EndCurrentSend(ReaderFrontend* reader)
// ah well - and we have to add the entry to currDict... // ah well - and we have to add the entry to currDict...
Unref(predidx); Unref(predidx);
Unref(ev); Unref(ev);
filter->currDict->Insert(lastDictIdxKey, filter->lastDict->RemoveEntry(lastDictIdxKey)); stream->currDict->Insert(lastDictIdxKey, stream->lastDict->RemoveEntry(lastDictIdxKey));
delete lastDictIdxKey; delete lastDictIdxKey;
continue; continue;
} }
} }
if ( filter->event ) if ( stream->event )
{ {
Ref(predidx); Ref(predidx);
Ref(val); Ref(val);
Ref(ev); Ref(ev);
SendEvent(filter->event, 3, ev, predidx, val); SendEvent(stream->event, 3, ev, predidx, val);
} }
if ( predidx ) // if we have a filter or an event... if ( predidx ) // if we have a stream or an event...
Unref(predidx); Unref(predidx);
if ( ev ) if ( ev )
Unref(ev); Unref(ev);
Unref(filter->tab->Delete(ih->idxkey)); Unref(stream->tab->Delete(ih->idxkey));
filter->lastDict->Remove(lastDictIdxKey); // delete in next line stream->lastDict->Remove(lastDictIdxKey); // delete in next line
delete lastDictIdxKey; delete lastDictIdxKey;
delete(ih); delete(ih);
} }
filter->lastDict->Clear(); // should be empt. buti- well... who knows... stream->lastDict->Clear(); // should be empt. buti- well... who knows...
delete(filter->lastDict); delete(stream->lastDict);
filter->lastDict = filter->currDict; stream->lastDict = stream->currDict;
filter->currDict = new PDict(InputHash); stream->currDict = new PDict(InputHash);
filter->currDict->SetDeleteFunc(input_hash_delete_func); stream->currDict->SetDeleteFunc(input_hash_delete_func);
#ifdef DEBUG #ifdef DEBUG
DBG_LOG(DBG_INPUT, "EndCurrentSend complete for stream %s, queueing update_finished event", DBG_LOG(DBG_INPUT, "EndCurrentSend complete for stream %s, queueing update_finished event",
@ -1199,9 +1199,9 @@ void Manager::Put(ReaderFrontend* reader, Value* *vals)
} }
int readFields; int readFields;
if ( i->filter_type == TABLE_FILTER ) if ( i->stream_type == TABLE_FILTER )
readFields = PutTable(i, vals); readFields = PutTable(i, vals);
else if ( i->filter_type == EVENT_FILTER ) else if ( i->stream_type == EVENT_FILTER )
{ {
EnumVal *type = new EnumVal(BifEnum::Input::EVENT_NEW, BifType::Enum::Input::Event); EnumVal *type = new EnumVal(BifEnum::Input::EVENT_NEW, BifType::Enum::Input::Event);
readFields = SendEventStreamEvent(i, type, vals); readFields = SendEventStreamEvent(i, type, vals);
@ -1219,44 +1219,44 @@ int Manager::SendEventStreamEvent(Stream* i, EnumVal* type, const Value* const *
{ {
assert(i); assert(i);
assert(i->filter_type == EVENT_FILTER); assert(i->stream_type == EVENT_FILTER);
EventStream* filter = (EventStream*) i; EventStream* stream = (EventStream*) i;
Val *val; Val *val;
list<Val*> out_vals; list<Val*> out_vals;
Ref(filter->description); Ref(stream->description);
out_vals.push_back(filter->description); out_vals.push_back(stream->description);
// no tracking, send everything with a new event... // no tracking, send everything with a new event...
//out_vals.push_back(new EnumVal(BifEnum::Input::EVENT_NEW, BifType::Enum::Input::Event)); //out_vals.push_back(new EnumVal(BifEnum::Input::EVENT_NEW, BifType::Enum::Input::Event));
out_vals.push_back(type); out_vals.push_back(type);
int position = 0; int position = 0;
if ( filter->want_record ) if ( stream->want_record )
{ {
RecordVal * r = ValueToRecordVal(vals, filter->fields, &position); RecordVal * r = ValueToRecordVal(vals, stream->fields, &position);
out_vals.push_back(r); out_vals.push_back(r);
} }
else else
{ {
for ( int j = 0; j < filter->fields->NumFields(); j++) for ( int j = 0; j < stream->fields->NumFields(); j++)
{ {
Val* val = 0; Val* val = 0;
if ( filter->fields->FieldType(j)->Tag() == TYPE_RECORD ) if ( stream->fields->FieldType(j)->Tag() == TYPE_RECORD )
val = ValueToRecordVal(vals, val = ValueToRecordVal(vals,
filter->fields->FieldType(j)->AsRecordType(), stream->fields->FieldType(j)->AsRecordType(),
&position); &position);
else else
{ {
val = ValueToVal(vals[position], filter->fields->FieldType(j)); val = ValueToVal(vals[position], stream->fields->FieldType(j));
position++; position++;
} }
out_vals.push_back(val); out_vals.push_back(val);
} }
} }
SendEvent(filter->event, out_vals); SendEvent(stream->event, out_vals);
return filter->fields->NumFields(); return stream->fields->NumFields();
} }
@ -1264,31 +1264,31 @@ int Manager::PutTable(Stream* i, const Value* const *vals)
{ {
assert(i); assert(i);
assert(i->filter_type == TABLE_FILTER); assert(i->stream_type == TABLE_FILTER);
TableStream* filter = (TableStream*) i; TableStream* stream = (TableStream*) i;
Val* idxval = ValueToIndexVal(filter->num_idx_fields, filter->itype, vals); Val* idxval = ValueToIndexVal(stream->num_idx_fields, stream->itype, vals);
Val* valval; Val* valval;
int position = filter->num_idx_fields; int position = stream->num_idx_fields;
if ( filter->num_val_fields == 0 ) if ( stream->num_val_fields == 0 )
valval = 0; valval = 0;
else if ( filter->num_val_fields == 1 && filter->want_record == 0 ) else if ( stream->num_val_fields == 1 && stream->want_record == 0 )
valval = ValueToVal(vals[position], filter->rtype->FieldType(0)); valval = ValueToVal(vals[position], stream->rtype->FieldType(0));
else else
valval = ValueToRecordVal(vals, filter->rtype, &position); valval = ValueToRecordVal(vals, stream->rtype, &position);
// if we have a subscribed event, we need to figure out, if this is an update or not // if we have a subscribed event, we need to figure out, if this is an update or not
// same for predicates // same for predicates
if ( filter->pred || filter->event ) if ( stream->pred || stream->event )
{ {
bool updated = false; bool updated = false;
Val* oldval = 0; Val* oldval = 0;
if ( filter->num_val_fields > 0 ) if ( stream->num_val_fields > 0 )
{ {
// in that case, we need the old value to send the event (if we send an event). // in that case, we need the old value to send the event (if we send an event).
oldval = filter->tab->Lookup(idxval, false); oldval = stream->tab->Lookup(idxval, false);
} }
if ( oldval != 0 ) if ( oldval != 0 )
@ -1300,11 +1300,11 @@ int Manager::PutTable(Stream* i, const Value* const *vals)
// predicate if we want the update or not // predicate if we want the update or not
if ( filter->pred ) if ( stream->pred )
{ {
EnumVal* ev; EnumVal* ev;
int startpos = 0; int startpos = 0;
Val* predidx = ValueToRecordVal(vals, filter->itype, &startpos); Val* predidx = ValueToRecordVal(vals, stream->itype, &startpos);
Ref(valval); Ref(valval);
if ( updated ) if ( updated )
@ -1315,10 +1315,10 @@ int Manager::PutTable(Stream* i, const Value* const *vals)
BifType::Enum::Input::Event); BifType::Enum::Input::Event);
bool result; bool result;
if ( filter->num_val_fields > 0 ) // we have values if ( stream->num_val_fields > 0 ) // we have values
result = CallPred(filter->pred, 3, ev, predidx, valval); result = CallPred(stream->pred, 3, ev, predidx, valval);
else // no values else // no values
result = CallPred(filter->pred, 2, ev, predidx); result = CallPred(stream->pred, 2, ev, predidx);
if ( result == false ) if ( result == false )
{ {
@ -1326,38 +1326,38 @@ int Manager::PutTable(Stream* i, const Value* const *vals)
Unref(idxval); Unref(idxval);
Unref(valval); Unref(valval);
Unref(oldval); Unref(oldval);
return filter->num_val_fields + filter->num_idx_fields; return stream->num_val_fields + stream->num_idx_fields;
} }
} }
filter->tab->Assign(idxval, valval); stream->tab->Assign(idxval, valval);
if ( filter->event ) if ( stream->event )
{ {
EnumVal* ev; EnumVal* ev;
int startpos = 0; int startpos = 0;
Val* predidx = ValueToRecordVal(vals, filter->itype, &startpos); Val* predidx = ValueToRecordVal(vals, stream->itype, &startpos);
if ( updated ) if ( updated )
{ {
// in case of update send back the old value. // in case of update send back the old value.
assert ( filter->num_val_fields > 0 ); assert ( stream->num_val_fields > 0 );
ev = new EnumVal(BifEnum::Input::EVENT_CHANGED, ev = new EnumVal(BifEnum::Input::EVENT_CHANGED,
BifType::Enum::Input::Event); BifType::Enum::Input::Event);
assert ( oldval != 0 ); assert ( oldval != 0 );
SendEvent(filter->event, 4, filter->description->Ref(), SendEvent(stream->event, 4, stream->description->Ref(),
ev, predidx, oldval); ev, predidx, oldval);
} }
else else
{ {
ev = new EnumVal(BifEnum::Input::EVENT_NEW, ev = new EnumVal(BifEnum::Input::EVENT_NEW,
BifType::Enum::Input::Event); BifType::Enum::Input::Event);
if ( filter->num_val_fields == 0 ) if ( stream->num_val_fields == 0 )
SendEvent(filter->event, 4, filter->description->Ref(), SendEvent(stream->event, 4, stream->description->Ref(),
ev, predidx); ev, predidx);
else else
SendEvent(filter->event, 4, filter->description->Ref(), SendEvent(stream->event, 4, stream->description->Ref(),
ev, predidx, valval->Ref()); ev, predidx, valval->Ref());
} }
@ -1365,10 +1365,10 @@ int Manager::PutTable(Stream* i, const Value* const *vals)
} }
else // no predicates or other stuff else // no predicates or other stuff
filter->tab->Assign(idxval, valval); stream->tab->Assign(idxval, valval);
return filter->num_idx_fields + filter->num_val_fields; return stream->num_idx_fields + stream->num_val_fields;
} }
// Todo:: perhaps throw some kind of clear-event? // Todo:: perhaps throw some kind of clear-event?
@ -1386,10 +1386,10 @@ void Manager::Clear(ReaderFrontend* reader)
i->name.c_str()); i->name.c_str());
#endif #endif
assert(i->filter_type == TABLE_FILTER); assert(i->stream_type == TABLE_FILTER);
TableStream* filter = (TableStream*) i; TableStream* stream = (TableStream*) i;
filter->tab->RemoveAll(); stream->tab->RemoveAll();
} }
// put interface: delete old entry from table. // put interface: delete old entry from table.
@ -1405,28 +1405,28 @@ bool Manager::Delete(ReaderFrontend* reader, Value* *vals)
bool success = false; bool success = false;
int readVals = 0; int readVals = 0;
if ( i->filter_type == TABLE_FILTER ) if ( i->stream_type == TABLE_FILTER )
{ {
TableStream* filter = (TableStream*) i; TableStream* stream = (TableStream*) i;
Val* idxval = ValueToIndexVal(filter->num_idx_fields, filter->itype, vals); Val* idxval = ValueToIndexVal(stream->num_idx_fields, stream->itype, vals);
assert(idxval != 0); assert(idxval != 0);
readVals = filter->num_idx_fields + filter->num_val_fields; readVals = stream->num_idx_fields + stream->num_val_fields;
bool filterresult = true; bool streamresult = true;
if ( filter->pred || filter->event ) if ( stream->pred || stream->event )
{ {
Val *val = filter->tab->Lookup(idxval); Val *val = stream->tab->Lookup(idxval);
if ( filter->pred ) if ( stream->pred )
{ {
Ref(val); Ref(val);
EnumVal *ev = new EnumVal(BifEnum::Input::EVENT_REMOVED, BifType::Enum::Input::Event); EnumVal *ev = new EnumVal(BifEnum::Input::EVENT_REMOVED, BifType::Enum::Input::Event);
int startpos = 0; int startpos = 0;
Val* predidx = ValueToRecordVal(vals, filter->itype, &startpos); Val* predidx = ValueToRecordVal(vals, stream->itype, &startpos);
filterresult = CallPred(filter->pred, 3, ev, predidx, val); streamresult = CallPred(stream->pred, 3, ev, predidx, val);
if ( filterresult == false ) if ( streamresult == false )
{ {
// keep it. // keep it.
Unref(idxval); Unref(idxval);
@ -1435,21 +1435,21 @@ bool Manager::Delete(ReaderFrontend* reader, Value* *vals)
} }
// only if filter = true -> no filtering // only if stream = true -> no streaming
if ( filterresult && filter->event ) if ( streamresult && stream->event )
{ {
Ref(idxval); Ref(idxval);
assert(val != 0); assert(val != 0);
Ref(val); Ref(val);
EnumVal *ev = new EnumVal(BifEnum::Input::EVENT_REMOVED, BifType::Enum::Input::Event); EnumVal *ev = new EnumVal(BifEnum::Input::EVENT_REMOVED, BifType::Enum::Input::Event);
SendEvent(filter->event, 4, filter->description->Ref(), ev, idxval, val); SendEvent(stream->event, 4, stream->description->Ref(), ev, idxval, val);
} }
} }
// only if filter = true -> no filtering // only if stream = true -> no streaming
if ( filterresult ) if ( streamresult )
{ {
Val* retptr = filter->tab->Delete(idxval); Val* retptr = stream->tab->Delete(idxval);
success = ( retptr != 0 ); success = ( retptr != 0 );
if ( !success ) if ( !success )
reporter->Error("Internal error while deleting values from input table"); reporter->Error("Internal error while deleting values from input table");
@ -1458,7 +1458,7 @@ bool Manager::Delete(ReaderFrontend* reader, Value* *vals)
} }
} }
else if ( i->filter_type == EVENT_FILTER ) else if ( i->stream_type == EVENT_FILTER )
{ {
EnumVal *type = new EnumVal(BifEnum::Input::EVENT_REMOVED, BifType::Enum::Input::Event); EnumVal *type = new EnumVal(BifEnum::Input::EVENT_REMOVED, BifType::Enum::Input::Event);
readVals = SendEventStreamEvent(i, type, vals); readVals = SendEventStreamEvent(i, type, vals);

View file

@ -1,6 +1,6 @@
// See the file "COPYING" in the main distribution directory for copyright. // See the file "COPYING" in the main distribution directory for copyright.
// //
// Class for managing input streams and filters // Class for managing input streams
#ifndef INPUT_MANAGER_H #ifndef INPUT_MANAGER_H
#define INPUT_MANAGER_H #define INPUT_MANAGER_H
@ -34,12 +34,7 @@ public:
~Manager(); ~Manager();
/** /**
* Creates a new input stream. * Creates a new input stream which will write the data from the data source into
* Add a filter to an input source, which will write the data from the data source into
* a Bro table.
* Add a filter to an input source, which sends events for read input data.
*
* @param id The enum value corresponding the input stream.
* *
* @param description A record of script type \c Input:StreamDescription. * @param description A record of script type \c Input:StreamDescription.
* *
@ -47,6 +42,15 @@ public:
* input.bif, which just forwards here. * input.bif, which just forwards here.
*/ */
bool CreateTableStream(RecordVal* description); bool CreateTableStream(RecordVal* description);
/**
* Creates a new input stream which sends events for read input data.
*
* @param description A record of script type \c Input:StreamDescription.
*
* This method corresponds directly to the internal BiF defined in
* input.bif, which just forwards here.
*/
bool CreateEventStream(RecordVal* description); bool CreateEventStream(RecordVal* description);
@ -104,11 +108,11 @@ protected:
// doing so creates a new thread!). // doing so creates a new thread!).
ReaderBackend* CreateBackend(ReaderFrontend* frontend, bro_int_t type); ReaderBackend* CreateBackend(ReaderFrontend* frontend, bro_int_t type);
// Functions are called from the ReaderBackend to notify the manager, that a filter has been removed // Functions are called from the ReaderBackend to notify the manager, that a stream has been removed
// or a stream has been closed. // or a stream has been closed.
// Used to prevent race conditions where data for a specific filter is still in the queue when the // Used to prevent race conditions where data for a specific stream is still in the queue when the
// RemoveStream directive is executed by the main thread. // RemoveStream directive is executed by the main thread.
// This makes sure all data that has ben queued for a filter is still received. // This makes sure all data that has ben queued for a stream is still received.
bool RemoveStreamContinuation(ReaderFrontend* reader); bool RemoveStreamContinuation(ReaderFrontend* reader);
private: private:
@ -118,13 +122,13 @@ private:
bool CreateStream(Stream*, RecordVal* description); bool CreateStream(Stream*, RecordVal* description);
// SendEntry implementation for Tablefilter // SendEntry implementation for Table stream
int SendEntryTable(Stream* i, const threading::Value* const *vals); int SendEntryTable(Stream* i, const threading::Value* const *vals);
// Put implementation for Tablefilter // Put implementation for Table stream
int PutTable(Stream* i, const threading::Value* const *vals); int PutTable(Stream* i, const threading::Value* const *vals);
// SendEntry and Put implementation for Eventfilter // SendEntry and Put implementation for Event stream
int SendEventStreamEvent(Stream* i, EnumVal* type, const threading::Value* const *vals); int SendEventStreamEvent(Stream* i, EnumVal* type, const threading::Value* const *vals);
// Checks is a bro type can be used for data reading. The equivalend in threading cannot be used, because we have support different types // Checks is a bro type can be used for data reading. The equivalend in threading cannot be used, because we have support different types

View file

@ -55,7 +55,8 @@ public:
* *
* @param arg_num_fields number of fields contained in \a fields * @param arg_num_fields number of fields contained in \a fields
* *
* @param fields the types and names of the fields to be retrieved from the input source * @param fields the types and names of the fields to be retrieved
* from the input source
* *
* @return False if an error occured. * @return False if an error occured.
*/ */
@ -72,7 +73,8 @@ public:
/** /**
* Force trigger an update of the input stream. * Force trigger an update of the input stream.
* The action that will be taken depends on the current read mode and the individual input backend * The action that will be taken depends on the current read mode and the
* individual input backend
* *
* An backend can choose to ignore this. * An backend can choose to ignore this.
* *
@ -90,8 +92,8 @@ protected:
// Methods that have to be overwritten by the individual readers // Methods that have to be overwritten by the individual readers
/** /**
* Reader-specific intialization method. Note that data may only be read from the input source * Reader-specific intialization method. Note that data may only be
* after the Start function has been called. * read from the input source after the Start function has been called.
* *
* A reader implementation must override this method. If it returns * A reader implementation must override this method. If it returns
* false, it will be assumed that a fatal error has occured that * false, it will be assumed that a fatal error has occured that
@ -145,29 +147,32 @@ protected:
*/ */
void SendEvent(const string& name, const int num_vals, threading::Value* *vals); void SendEvent(const string& name, const int num_vals, threading::Value* *vals);
// Content-sending-functions (simple mode). Including table-specific stuff that simply is not used if we have no table // Content-sending-functions (simple mode). Including table-specific stuff that
// simply is not used if we have no table
/** /**
* Method allowing a reader to send a list of values read for a specific filter back to the manager. * Method allowing a reader to send a list of values read for a specific stream
* back to the manager.
* *
* If the filter points to a table, the values are inserted into the table; if it points to an event, the event is raised * If the stream is a table stream, the values are inserted into the table;
* if it is an event stream, the event is raised.
* *
* @param val list of threading::Values expected by the filter * @param val list of threading::Values expected by the stream
*/ */
void Put(threading::Value* *val); void Put(threading::Value* *val);
/** /**
* Method allowing a reader to delete a specific value from a bro table. * Method allowing a reader to delete a specific value from a bro table.
* *
* If the receiving filter is an event, only a removed event is raised * If the receiving stream is an event stream, only a removed event is raised
* *
* @param val list of threading::Values expected by the filter * @param val list of threading::Values expected by the stream
*/ */
void Delete(threading::Value* *val); void Delete(threading::Value* *val);
/** /**
* Method allowing a reader to clear a value from a bro table. * Method allowing a reader to clear a value from a bro table.
* *
* If the receiving filter is an event, this is ignored. * If the receiving stream is an event stream, this is ignored.
* *
*/ */
void Clear(); void Clear();
@ -176,19 +181,22 @@ protected:
/** /**
* Method allowing a reader to send a list of values read for a specific filter back to the manager. * Method allowing a reader to send a list of values read for a specific stream
* back to the manager.
* *
* If the filter points to a table, the values are inserted into the table; if it points to an event, the event is raised. * If the stream is a table stream, the values are inserted into the table;
* if it is an event stream, the event is raised.
* *
* @param val list of threading::Values expected by the filter * @param val list of threading::Values expected by the stream
*/ */
void SendEntry(threading::Value* *vals); void SendEntry(threading::Value* *vals);
/** /**
* Method telling the manager, that the current list of entries sent by SendEntry is finished. * Method telling the manager, that the current list of entries sent by SendEntry
* is finished.
* *
* For table filters, all entries that were not updated since the last EndCurrentSend will be deleted, because they are no longer * For table streams, all entries that were not updated since the last EndCurrentSend
* present in the input source * will be deleted, because they are no longer present in the input source
* *
*/ */
void EndCurrentSend(); void EndCurrentSend();

View file

@ -398,7 +398,7 @@ bool Ascii::DoUpdate()
if ( mode == STREAM ) if ( mode == STREAM )
{ {
file->clear(); // remove end of file evil bits file->clear(); // remove end of file evil bits
if ( !ReadHeader(true) ) // in case filters changed if ( !ReadHeader(true) )
return false; // header reading failed return false; // header reading failed
break; break;

View file

@ -74,7 +74,7 @@ private:
string unset_field; string unset_field;
// keep a copy of the headerline to determine field locations when filters change // keep a copy of the headerline to determine field locations when stream descriptions change
string headerline; string headerline;
int mode; int mode;

View file

@ -42,9 +42,6 @@ private:
// Options set from the script-level. // Options set from the script-level.
string separator; string separator;
// keep a copy of the headerline to determine field locations when filters change
string headerline;
int mode; int mode;
bool execute; bool execute;
bool firstrun; bool firstrun;