diff --git a/scripts/base/frameworks/input/main.bro b/scripts/base/frameworks/input/main.bro index a52cd97b4b..c9ce0e321e 100644 --- a/scripts/base/frameworks/input/main.bro +++ b/scripts/base/frameworks/input/main.bro @@ -24,7 +24,7 @@ export { ## Read mode to use for this stream mode: Mode &default=default_mode; - ## Descriptive name. Used to remove a filter at a later time + ## Descriptive name. Used to remove a stream at a later time name: string; ## Special definitions for tables @@ -65,7 +65,7 @@ export { ## Read mode to use for this stream mode: Mode &default=default_mode; - ## Descriptive name. Used to remove a filter at a later time + ## Descriptive name. Used to remove a stream at a later time name: string; ## Special definitions for events diff --git a/src/input/Manager.cc b/src/input/Manager.cc index 3f7fcea078..3bae7dbb28 100644 --- a/src/input/Manager.cc +++ b/src/input/Manager.cc @@ -62,7 +62,7 @@ public: int mode; - StreamType filter_type; // to distinguish between event and table filters + StreamType stream_type; // to distinguish between event and table streams EnumVal* type; ReaderFrontend* reader; @@ -129,7 +129,7 @@ public: Manager::TableStream::TableStream() : Manager::Stream::Stream() { - filter_type = TABLE_FILTER; + stream_type = TABLE_FILTER; tab = 0; itype = 0; @@ -144,7 +144,7 @@ Manager::TableStream::TableStream() : Manager::Stream::Stream() Manager::EventStream::EventStream() : Manager::Stream::Stream() { fields = 0; - filter_type = EVENT_FILTER; + stream_type = EVENT_FILTER; } Manager::EventStream::~EventStream() @@ -322,16 +322,16 @@ bool Manager::CreateEventStream(RecordVal* fval) RecordType* rtype = fval->Type()->AsRecordType(); if ( ! same_type(rtype, BifType::Record::Input::EventDescription, 0) ) { - reporter->Error("filter argument not of right type"); + reporter->Error("EventDescription argument not of right type"); return false; } - EventStream* filter = new EventStream(); + EventStream* stream = new EventStream(); { - bool res = CreateStream(filter, fval); + bool res = CreateStream(stream, fval); if ( res == false ) { - delete filter; + delete stream; return false; } } @@ -428,19 +428,19 @@ bool Manager::CreateEventStream(RecordVal* fval) logf[i] = fieldsV[i]; Unref(fields); // ref'd by lookupwithdefault - filter->num_fields = fieldsV.size(); - filter->fields = fields->Ref()->AsRecordType(); - filter->event = event_registry->Lookup(event->GetID()->Name()); - filter->want_record = ( want_record->InternalInt() == 1 ); + stream->num_fields = fieldsV.size(); + stream->fields = fields->Ref()->AsRecordType(); + stream->event = event_registry->Lookup(event->GetID()->Name()); + stream->want_record = ( want_record->InternalInt() == 1 ); Unref(want_record); // ref'd by lookupwithdefault - assert(filter->reader); - filter->reader->Init(filter->source, filter->mode, filter->num_fields, logf ); + assert(stream->reader); + stream->reader->Init(stream->source, stream->mode, stream->num_fields, logf ); - readers[filter->reader] = filter; + readers[stream->reader] = stream; DBG_LOG(DBG_INPUT, "Successfully created event stream %s", - filter->name.c_str()); + stream->name.c_str()); return true; } @@ -450,16 +450,16 @@ bool Manager::CreateTableStream(RecordVal* fval) RecordType* rtype = fval->Type()->AsRecordType(); if ( ! same_type(rtype, BifType::Record::Input::TableDescription, 0) ) { - reporter->Error("filter argument not of right type"); + reporter->Error("TableDescription argument not of right type"); return false; } - TableStream* filter = new TableStream(); + TableStream* stream = new TableStream(); { - bool res = CreateStream(filter, fval); + bool res = CreateStream(stream, fval); if ( res == false ) { - delete filter; + delete stream; return false; } } @@ -587,40 +587,40 @@ bool Manager::CreateTableStream(RecordVal* fval) for ( unsigned int i = 0; i < fieldsV.size(); i++ ) fields[i] = fieldsV[i]; - filter->pred = pred ? pred->AsFunc() : 0; - filter->num_idx_fields = idxfields; - filter->num_val_fields = valfields; - filter->tab = dst->AsTableVal(); - filter->rtype = val ? val->AsRecordType() : 0; - filter->itype = idx->AsRecordType(); - filter->event = event ? event_registry->Lookup(event->GetID()->Name()) : 0; - filter->currDict = new PDict(InputHash); - filter->currDict->SetDeleteFunc(input_hash_delete_func); - filter->lastDict = new PDict(InputHash); - filter->lastDict->SetDeleteFunc(input_hash_delete_func); - filter->want_record = ( want_record->InternalInt() == 1 ); + stream->pred = pred ? pred->AsFunc() : 0; + stream->num_idx_fields = idxfields; + stream->num_val_fields = valfields; + stream->tab = dst->AsTableVal(); + stream->rtype = val ? val->AsRecordType() : 0; + stream->itype = idx->AsRecordType(); + stream->event = event ? event_registry->Lookup(event->GetID()->Name()) : 0; + stream->currDict = new PDict(InputHash); + stream->currDict->SetDeleteFunc(input_hash_delete_func); + stream->lastDict = new PDict(InputHash); + stream->lastDict->SetDeleteFunc(input_hash_delete_func); + stream->want_record = ( want_record->InternalInt() == 1 ); Unref(want_record); // ref'd by lookupwithdefault Unref(pred); if ( valfields > 1 ) { - if ( ! filter->want_record ) + if ( ! stream->want_record ) { - reporter->Error("Stream %s does not want a record (want_record=F), but has more then one value field. Aborting", filter->name.c_str()); - delete filter; + reporter->Error("Stream %s does not want a record (want_record=F), but has more then one value field. Aborting", stream->name.c_str()); + delete stream; return false; } } - assert(filter->reader); - filter->reader->Init(filter->source, filter->mode, fieldsV.size(), fields ); + assert(stream->reader); + stream->reader->Init(stream->source, stream->mode, fieldsV.size(), fields ); - readers[filter->reader] = filter; + readers[stream->reader] = stream; DBG_LOG(DBG_INPUT, "Successfully created table stream %s", - filter->name.c_str()); + stream->name.c_str()); return true; } @@ -872,9 +872,9 @@ void Manager::SendEntry(ReaderFrontend* reader, Value* *vals) } int readFields; - if ( i->filter_type == TABLE_FILTER ) + if ( i->stream_type == TABLE_FILTER ) readFields = SendEntryTable(i, vals); - else if ( i->filter_type == EVENT_FILTER ) + else if ( i->stream_type == EVENT_FILTER ) { EnumVal *type = new EnumVal(BifEnum::Input::EVENT_NEW, BifType::Enum::Input::Event); readFields = SendEventStreamEvent(i, type, vals); @@ -894,21 +894,21 @@ int Manager::SendEntryTable(Stream* i, const Value* const *vals) assert(i); - assert(i->filter_type == TABLE_FILTER); - TableStream* filter = (TableStream*) i; + assert(i->stream_type == TABLE_FILTER); + TableStream* stream = (TableStream*) i; - HashKey* idxhash = HashValues(filter->num_idx_fields, vals); + HashKey* idxhash = HashValues(stream->num_idx_fields, vals); if ( idxhash == 0 ) { reporter->Error("Could not hash line. Ignoring"); - return filter->num_val_fields + filter->num_idx_fields; + return stream->num_val_fields + stream->num_idx_fields; } hash_t valhash = 0; - if ( filter->num_val_fields > 0 ) + if ( stream->num_val_fields > 0 ) { - HashKey* valhashkey = HashValues(filter->num_val_fields, vals+filter->num_idx_fields); + HashKey* valhashkey = HashValues(stream->num_val_fields, vals+stream->num_idx_fields); if ( valhashkey == 0 ) { // empty line. index, but no values. // hence we also have no hash value... @@ -920,23 +920,23 @@ int Manager::SendEntryTable(Stream* i, const Value* const *vals) } } - InputHash *h = filter->lastDict->Lookup(idxhash); + InputHash *h = stream->lastDict->Lookup(idxhash); if ( h != 0 ) { // seen before - if ( filter->num_val_fields == 0 || h->valhash == valhash ) + if ( stream->num_val_fields == 0 || h->valhash == valhash ) { // ok, exact duplicate, move entry to new dicrionary and do nothing else. - filter->lastDict->Remove(idxhash); - filter->currDict->Insert(idxhash, h); + stream->lastDict->Remove(idxhash); + stream->currDict->Insert(idxhash, h); delete idxhash; - return filter->num_val_fields + filter->num_idx_fields; + return stream->num_val_fields + stream->num_idx_fields; } else { - assert( filter->num_val_fields > 0 ); + assert( stream->num_val_fields > 0 ); // entry was updated in some way - filter->lastDict->Remove(idxhash); + stream->lastDict->Remove(idxhash); // keep h for predicates updated = true; @@ -947,24 +947,24 @@ int Manager::SendEntryTable(Stream* i, const Value* const *vals) Val* valval; RecordVal* predidx = 0; - int position = filter->num_idx_fields; - if ( filter->num_val_fields == 0 ) + int position = stream->num_idx_fields; + if ( stream->num_val_fields == 0 ) valval = 0; - else if ( filter->num_val_fields == 1 && !filter->want_record ) - valval = ValueToVal(vals[position], filter->rtype->FieldType(0)); + else if ( stream->num_val_fields == 1 && !stream->want_record ) + valval = ValueToVal(vals[position], stream->rtype->FieldType(0)); else - valval = ValueToRecordVal(vals, filter->rtype, &position); + valval = ValueToRecordVal(vals, stream->rtype, &position); - // call filter first to determine if we really add / change the entry - if ( filter->pred ) + // call stream first to determine if we really add / change the entry + if ( stream->pred ) { EnumVal* ev; //Ref(idxval); int startpos = 0; - //Val* predidx = ListValToRecordVal(idxval->AsListVal(), filter->itype, &startpos); - predidx = ValueToRecordVal(vals, filter->itype, &startpos); - //ValueToRecordVal(vals, filter->itype, &startpos); + //Val* predidx = ListValToRecordVal(idxval->AsListVal(), stream->itype, &startpos); + predidx = ValueToRecordVal(vals, stream->itype, &startpos); + //ValueToRecordVal(vals, stream->itype, &startpos); if ( updated ) ev = new EnumVal(BifEnum::Input::EVENT_CHANGED, BifType::Enum::Input::Event); @@ -972,10 +972,10 @@ int Manager::SendEntryTable(Stream* i, const Value* const *vals) ev = new EnumVal(BifEnum::Input::EVENT_NEW, BifType::Enum::Input::Event); bool result; - if ( filter->num_val_fields > 0 ) // we have values - result = CallPred(filter->pred, 3, ev, predidx->Ref(), valval->Ref()); + if ( stream->num_val_fields > 0 ) // we have values + result = CallPred(stream->pred, 3, ev, predidx->Ref(), valval->Ref()); else // no values - result = CallPred(filter->pred, 2, ev, predidx->Ref()); + result = CallPred(stream->pred, 2, ev, predidx->Ref()); if ( result == false ) { @@ -985,17 +985,17 @@ int Manager::SendEntryTable(Stream* i, const Value* const *vals) { // throw away. Hence - we quit. And remove the entry from the current dictionary... // (but why should it be in there? assert this). - assert ( filter->currDict->RemoveEntry(idxhash) == 0 ); + assert ( stream->currDict->RemoveEntry(idxhash) == 0 ); delete idxhash; delete h; - return filter->num_val_fields + filter->num_idx_fields; + return stream->num_val_fields + stream->num_idx_fields; } else { // keep old one - filter->currDict->Insert(idxhash, h); + stream->currDict->Insert(idxhash, h); delete idxhash; - return filter->num_val_fields + filter->num_idx_fields; + return stream->num_val_fields + stream->num_idx_fields; } } @@ -1016,19 +1016,19 @@ int Manager::SendEntryTable(Stream* i, const Value* const *vals) // I think there is an unref missing here. But if I insert is, it crashes :) } else - idxval = ValueToIndexVal(filter->num_idx_fields, filter->itype, vals); + idxval = ValueToIndexVal(stream->num_idx_fields, stream->itype, vals); Val* oldval = 0; if ( updated == true ) { - assert(filter->num_val_fields > 0); + assert(stream->num_val_fields > 0); // in that case, we need the old value to send the event (if we send an event). - oldval = filter->tab->Lookup(idxval, false); + oldval = stream->tab->Lookup(idxval, false); } //i->tab->Assign(idxval, valval); assert(idxval); - HashKey* k = filter->tab->ComputeHash(idxval); + HashKey* k = stream->tab->ComputeHash(idxval); if ( !k ) { reporter->InternalError("could not hash"); @@ -1039,46 +1039,46 @@ int Manager::SendEntryTable(Stream* i, const Value* const *vals) ih->idxkey = new HashKey(k->Key(), k->Size(), k->Hash()); ih->valhash = valhash; - if ( filter->event && updated ) + if ( stream->event && updated ) Ref(oldval); // otherwise it is no longer accessible after the assignment - filter->tab->Assign(idxval, k, valval); + stream->tab->Assign(idxval, k, valval); Unref(idxval); // asssign does not consume idxval. if ( predidx != 0 ) Unref(predidx); - filter->currDict->Insert(idxhash, ih); + stream->currDict->Insert(idxhash, ih); delete idxhash; - if ( filter->event ) + if ( stream->event ) { EnumVal* ev; int startpos = 0; - Val* predidx = ValueToRecordVal(vals, filter->itype, &startpos); + Val* predidx = ValueToRecordVal(vals, stream->itype, &startpos); if ( updated ) { // in case of update send back the old value. - assert ( filter->num_val_fields > 0 ); + assert ( stream->num_val_fields > 0 ); ev = new EnumVal(BifEnum::Input::EVENT_CHANGED, BifType::Enum::Input::Event); assert ( oldval != 0 ); - SendEvent(filter->event, 4, filter->description->Ref(), ev, predidx, oldval); + SendEvent(stream->event, 4, stream->description->Ref(), ev, predidx, oldval); } else { ev = new EnumVal(BifEnum::Input::EVENT_NEW, BifType::Enum::Input::Event); - if ( filter->num_val_fields == 0 ) + if ( stream->num_val_fields == 0 ) { - Ref(filter->description); - SendEvent(filter->event, 3, filter->description->Ref(), ev, predidx); + Ref(stream->description); + SendEvent(stream->event, 3, stream->description->Ref(), ev, predidx); } else - SendEvent(filter->event, 4, filter->description->Ref(), ev, predidx, valval->Ref()); + SendEvent(stream->event, 4, stream->description->Ref(), ev, predidx, valval->Ref()); } } - return filter->num_val_fields + filter->num_idx_fields; + return stream->num_val_fields + stream->num_idx_fields; } @@ -1097,19 +1097,19 @@ void Manager::EndCurrentSend(ReaderFrontend* reader) i->name.c_str()); #endif - if ( i->filter_type == EVENT_FILTER ) // nothing to do.. + if ( i->stream_type == EVENT_FILTER ) // nothing to do.. return; - assert(i->filter_type == TABLE_FILTER); - TableStream* filter = (TableStream*) i; + assert(i->stream_type == TABLE_FILTER); + TableStream* stream = (TableStream*) i; // lastdict contains all deleted entries and should be empty apart from that - IterCookie *c = filter->lastDict->InitForIteration(); - filter->lastDict->MakeRobustCookie(c); + IterCookie *c = stream->lastDict->InitForIteration(); + stream->lastDict->MakeRobustCookie(c); InputHash* ih; HashKey *lastDictIdxKey; //while ( ( ih = i->lastDict->NextEntry(c) ) ) { - while ( ( ih = filter->lastDict->NextEntry(lastDictIdxKey, c) ) ) + while ( ( ih = stream->lastDict->NextEntry(lastDictIdxKey, c) ) ) { ListVal * idx = 0; Val *val = 0; @@ -1118,17 +1118,17 @@ void Manager::EndCurrentSend(ReaderFrontend* reader) EnumVal* ev = 0; int startpos = 0; - if ( filter->pred || filter->event ) + if ( stream->pred || stream->event ) { - idx = filter->tab->RecoverIndex(ih->idxkey); + idx = stream->tab->RecoverIndex(ih->idxkey); assert(idx != 0); - val = filter->tab->Lookup(idx); + val = stream->tab->Lookup(idx); assert(val != 0); - predidx = ListValToRecordVal(idx, filter->itype, &startpos); + predidx = ListValToRecordVal(idx, stream->itype, &startpos); ev = new EnumVal(BifEnum::Input::EVENT_REMOVED, BifType::Enum::Input::Event); } - if ( filter->pred ) + if ( stream->pred ) { // ask predicate, if we want to expire this element... @@ -1136,7 +1136,7 @@ void Manager::EndCurrentSend(ReaderFrontend* reader) Ref(predidx); Ref(val); - bool result = CallPred(filter->pred, 3, ev, predidx, val); + bool result = CallPred(stream->pred, 3, ev, predidx, val); if ( result == false ) { @@ -1144,37 +1144,37 @@ void Manager::EndCurrentSend(ReaderFrontend* reader) // ah well - and we have to add the entry to currDict... Unref(predidx); Unref(ev); - filter->currDict->Insert(lastDictIdxKey, filter->lastDict->RemoveEntry(lastDictIdxKey)); + stream->currDict->Insert(lastDictIdxKey, stream->lastDict->RemoveEntry(lastDictIdxKey)); delete lastDictIdxKey; continue; } } - if ( filter->event ) + if ( stream->event ) { Ref(predidx); Ref(val); Ref(ev); - SendEvent(filter->event, 3, ev, predidx, val); + SendEvent(stream->event, 3, ev, predidx, val); } - if ( predidx ) // if we have a filter or an event... + if ( predidx ) // if we have a stream or an event... Unref(predidx); if ( ev ) Unref(ev); - Unref(filter->tab->Delete(ih->idxkey)); - filter->lastDict->Remove(lastDictIdxKey); // delete in next line + Unref(stream->tab->Delete(ih->idxkey)); + stream->lastDict->Remove(lastDictIdxKey); // delete in next line delete lastDictIdxKey; delete(ih); } - filter->lastDict->Clear(); // should be empt. buti- well... who knows... - delete(filter->lastDict); + stream->lastDict->Clear(); // should be empt. buti- well... who knows... + delete(stream->lastDict); - filter->lastDict = filter->currDict; - filter->currDict = new PDict(InputHash); - filter->currDict->SetDeleteFunc(input_hash_delete_func); + stream->lastDict = stream->currDict; + stream->currDict = new PDict(InputHash); + stream->currDict->SetDeleteFunc(input_hash_delete_func); #ifdef DEBUG DBG_LOG(DBG_INPUT, "EndCurrentSend complete for stream %s, queueing update_finished event", @@ -1199,9 +1199,9 @@ void Manager::Put(ReaderFrontend* reader, Value* *vals) } int readFields; - if ( i->filter_type == TABLE_FILTER ) + if ( i->stream_type == TABLE_FILTER ) readFields = PutTable(i, vals); - else if ( i->filter_type == EVENT_FILTER ) + else if ( i->stream_type == EVENT_FILTER ) { EnumVal *type = new EnumVal(BifEnum::Input::EVENT_NEW, BifType::Enum::Input::Event); readFields = SendEventStreamEvent(i, type, vals); @@ -1219,44 +1219,44 @@ int Manager::SendEventStreamEvent(Stream* i, EnumVal* type, const Value* const * { assert(i); - assert(i->filter_type == EVENT_FILTER); - EventStream* filter = (EventStream*) i; + assert(i->stream_type == EVENT_FILTER); + EventStream* stream = (EventStream*) i; Val *val; list out_vals; - Ref(filter->description); - out_vals.push_back(filter->description); + Ref(stream->description); + out_vals.push_back(stream->description); // no tracking, send everything with a new event... //out_vals.push_back(new EnumVal(BifEnum::Input::EVENT_NEW, BifType::Enum::Input::Event)); out_vals.push_back(type); int position = 0; - if ( filter->want_record ) + if ( stream->want_record ) { - RecordVal * r = ValueToRecordVal(vals, filter->fields, &position); + RecordVal * r = ValueToRecordVal(vals, stream->fields, &position); out_vals.push_back(r); } else { - for ( int j = 0; j < filter->fields->NumFields(); j++) + for ( int j = 0; j < stream->fields->NumFields(); j++) { Val* val = 0; - if ( filter->fields->FieldType(j)->Tag() == TYPE_RECORD ) + if ( stream->fields->FieldType(j)->Tag() == TYPE_RECORD ) val = ValueToRecordVal(vals, - filter->fields->FieldType(j)->AsRecordType(), + stream->fields->FieldType(j)->AsRecordType(), &position); else { - val = ValueToVal(vals[position], filter->fields->FieldType(j)); + val = ValueToVal(vals[position], stream->fields->FieldType(j)); position++; } out_vals.push_back(val); } } - SendEvent(filter->event, out_vals); + SendEvent(stream->event, out_vals); - return filter->fields->NumFields(); + return stream->fields->NumFields(); } @@ -1264,31 +1264,31 @@ int Manager::PutTable(Stream* i, const Value* const *vals) { assert(i); - assert(i->filter_type == TABLE_FILTER); - TableStream* filter = (TableStream*) i; + assert(i->stream_type == TABLE_FILTER); + TableStream* stream = (TableStream*) i; - Val* idxval = ValueToIndexVal(filter->num_idx_fields, filter->itype, vals); + Val* idxval = ValueToIndexVal(stream->num_idx_fields, stream->itype, vals); Val* valval; - int position = filter->num_idx_fields; - if ( filter->num_val_fields == 0 ) + int position = stream->num_idx_fields; + if ( stream->num_val_fields == 0 ) valval = 0; - else if ( filter->num_val_fields == 1 && filter->want_record == 0 ) - valval = ValueToVal(vals[position], filter->rtype->FieldType(0)); + else if ( stream->num_val_fields == 1 && stream->want_record == 0 ) + valval = ValueToVal(vals[position], stream->rtype->FieldType(0)); else - valval = ValueToRecordVal(vals, filter->rtype, &position); + valval = ValueToRecordVal(vals, stream->rtype, &position); // if we have a subscribed event, we need to figure out, if this is an update or not // same for predicates - if ( filter->pred || filter->event ) + if ( stream->pred || stream->event ) { bool updated = false; Val* oldval = 0; - if ( filter->num_val_fields > 0 ) + if ( stream->num_val_fields > 0 ) { // in that case, we need the old value to send the event (if we send an event). - oldval = filter->tab->Lookup(idxval, false); + oldval = stream->tab->Lookup(idxval, false); } if ( oldval != 0 ) @@ -1300,11 +1300,11 @@ int Manager::PutTable(Stream* i, const Value* const *vals) // predicate if we want the update or not - if ( filter->pred ) + if ( stream->pred ) { EnumVal* ev; int startpos = 0; - Val* predidx = ValueToRecordVal(vals, filter->itype, &startpos); + Val* predidx = ValueToRecordVal(vals, stream->itype, &startpos); Ref(valval); if ( updated ) @@ -1315,10 +1315,10 @@ int Manager::PutTable(Stream* i, const Value* const *vals) BifType::Enum::Input::Event); bool result; - if ( filter->num_val_fields > 0 ) // we have values - result = CallPred(filter->pred, 3, ev, predidx, valval); + if ( stream->num_val_fields > 0 ) // we have values + result = CallPred(stream->pred, 3, ev, predidx, valval); else // no values - result = CallPred(filter->pred, 2, ev, predidx); + result = CallPred(stream->pred, 2, ev, predidx); if ( result == false ) { @@ -1326,38 +1326,38 @@ int Manager::PutTable(Stream* i, const Value* const *vals) Unref(idxval); Unref(valval); Unref(oldval); - return filter->num_val_fields + filter->num_idx_fields; + return stream->num_val_fields + stream->num_idx_fields; } } - filter->tab->Assign(idxval, valval); + stream->tab->Assign(idxval, valval); - if ( filter->event ) + if ( stream->event ) { EnumVal* ev; int startpos = 0; - Val* predidx = ValueToRecordVal(vals, filter->itype, &startpos); + Val* predidx = ValueToRecordVal(vals, stream->itype, &startpos); if ( updated ) { // in case of update send back the old value. - assert ( filter->num_val_fields > 0 ); + assert ( stream->num_val_fields > 0 ); ev = new EnumVal(BifEnum::Input::EVENT_CHANGED, BifType::Enum::Input::Event); assert ( oldval != 0 ); - SendEvent(filter->event, 4, filter->description->Ref(), + SendEvent(stream->event, 4, stream->description->Ref(), ev, predidx, oldval); } else { ev = new EnumVal(BifEnum::Input::EVENT_NEW, BifType::Enum::Input::Event); - if ( filter->num_val_fields == 0 ) - SendEvent(filter->event, 4, filter->description->Ref(), + if ( stream->num_val_fields == 0 ) + SendEvent(stream->event, 4, stream->description->Ref(), ev, predidx); else - SendEvent(filter->event, 4, filter->description->Ref(), + SendEvent(stream->event, 4, stream->description->Ref(), ev, predidx, valval->Ref()); } @@ -1365,10 +1365,10 @@ int Manager::PutTable(Stream* i, const Value* const *vals) } else // no predicates or other stuff - filter->tab->Assign(idxval, valval); + stream->tab->Assign(idxval, valval); - return filter->num_idx_fields + filter->num_val_fields; + return stream->num_idx_fields + stream->num_val_fields; } // Todo:: perhaps throw some kind of clear-event? @@ -1386,10 +1386,10 @@ void Manager::Clear(ReaderFrontend* reader) i->name.c_str()); #endif - assert(i->filter_type == TABLE_FILTER); - TableStream* filter = (TableStream*) i; + assert(i->stream_type == TABLE_FILTER); + TableStream* stream = (TableStream*) i; - filter->tab->RemoveAll(); + stream->tab->RemoveAll(); } // put interface: delete old entry from table. @@ -1405,28 +1405,28 @@ bool Manager::Delete(ReaderFrontend* reader, Value* *vals) bool success = false; int readVals = 0; - if ( i->filter_type == TABLE_FILTER ) + if ( i->stream_type == TABLE_FILTER ) { - TableStream* filter = (TableStream*) i; - Val* idxval = ValueToIndexVal(filter->num_idx_fields, filter->itype, vals); + TableStream* stream = (TableStream*) i; + Val* idxval = ValueToIndexVal(stream->num_idx_fields, stream->itype, vals); assert(idxval != 0); - readVals = filter->num_idx_fields + filter->num_val_fields; - bool filterresult = true; + readVals = stream->num_idx_fields + stream->num_val_fields; + bool streamresult = true; - if ( filter->pred || filter->event ) + if ( stream->pred || stream->event ) { - Val *val = filter->tab->Lookup(idxval); + Val *val = stream->tab->Lookup(idxval); - if ( filter->pred ) + if ( stream->pred ) { Ref(val); EnumVal *ev = new EnumVal(BifEnum::Input::EVENT_REMOVED, BifType::Enum::Input::Event); int startpos = 0; - Val* predidx = ValueToRecordVal(vals, filter->itype, &startpos); + Val* predidx = ValueToRecordVal(vals, stream->itype, &startpos); - filterresult = CallPred(filter->pred, 3, ev, predidx, val); + streamresult = CallPred(stream->pred, 3, ev, predidx, val); - if ( filterresult == false ) + if ( streamresult == false ) { // keep it. Unref(idxval); @@ -1435,21 +1435,21 @@ bool Manager::Delete(ReaderFrontend* reader, Value* *vals) } - // only if filter = true -> no filtering - if ( filterresult && filter->event ) + // only if stream = true -> no streaming + if ( streamresult && stream->event ) { Ref(idxval); assert(val != 0); Ref(val); EnumVal *ev = new EnumVal(BifEnum::Input::EVENT_REMOVED, BifType::Enum::Input::Event); - SendEvent(filter->event, 4, filter->description->Ref(), ev, idxval, val); + SendEvent(stream->event, 4, stream->description->Ref(), ev, idxval, val); } } - // only if filter = true -> no filtering - if ( filterresult ) + // only if stream = true -> no streaming + if ( streamresult ) { - Val* retptr = filter->tab->Delete(idxval); + Val* retptr = stream->tab->Delete(idxval); success = ( retptr != 0 ); if ( !success ) reporter->Error("Internal error while deleting values from input table"); @@ -1458,7 +1458,7 @@ bool Manager::Delete(ReaderFrontend* reader, Value* *vals) } } - else if ( i->filter_type == EVENT_FILTER ) + else if ( i->stream_type == EVENT_FILTER ) { EnumVal *type = new EnumVal(BifEnum::Input::EVENT_REMOVED, BifType::Enum::Input::Event); readVals = SendEventStreamEvent(i, type, vals); diff --git a/src/input/Manager.h b/src/input/Manager.h index 0bdb2eb58d..d15febe0d6 100644 --- a/src/input/Manager.h +++ b/src/input/Manager.h @@ -1,6 +1,6 @@ // See the file "COPYING" in the main distribution directory for copyright. // -// Class for managing input streams and filters +// Class for managing input streams #ifndef INPUT_MANAGER_H #define INPUT_MANAGER_H @@ -34,12 +34,7 @@ public: ~Manager(); /** - * Creates a new input stream. - * Add a filter to an input source, which will write the data from the data source into - * a Bro table. - * Add a filter to an input source, which sends events for read input data. - * - * @param id The enum value corresponding the input stream. + * Creates a new input stream which will write the data from the data source into * * @param description A record of script type \c Input:StreamDescription. * @@ -47,6 +42,15 @@ public: * input.bif, which just forwards here. */ bool CreateTableStream(RecordVal* description); + + /** + * Creates a new input stream which sends events for read input data. + * + * @param description A record of script type \c Input:StreamDescription. + * + * This method corresponds directly to the internal BiF defined in + * input.bif, which just forwards here. + */ bool CreateEventStream(RecordVal* description); @@ -104,11 +108,11 @@ protected: // doing so creates a new thread!). ReaderBackend* CreateBackend(ReaderFrontend* frontend, bro_int_t type); - // Functions are called from the ReaderBackend to notify the manager, that a filter has been removed + // Functions are called from the ReaderBackend to notify the manager, that a stream has been removed // or a stream has been closed. - // Used to prevent race conditions where data for a specific filter is still in the queue when the + // Used to prevent race conditions where data for a specific stream is still in the queue when the // RemoveStream directive is executed by the main thread. - // This makes sure all data that has ben queued for a filter is still received. + // This makes sure all data that has ben queued for a stream is still received. bool RemoveStreamContinuation(ReaderFrontend* reader); private: @@ -118,13 +122,13 @@ private: bool CreateStream(Stream*, RecordVal* description); - // SendEntry implementation for Tablefilter + // SendEntry implementation for Table stream int SendEntryTable(Stream* i, const threading::Value* const *vals); - // Put implementation for Tablefilter + // Put implementation for Table stream int PutTable(Stream* i, const threading::Value* const *vals); - // SendEntry and Put implementation for Eventfilter + // SendEntry and Put implementation for Event stream int SendEventStreamEvent(Stream* i, EnumVal* type, const threading::Value* const *vals); // Checks is a bro type can be used for data reading. The equivalend in threading cannot be used, because we have support different types diff --git a/src/input/ReaderBackend.h b/src/input/ReaderBackend.h index ca54d8a204..b4d9101bc8 100644 --- a/src/input/ReaderBackend.h +++ b/src/input/ReaderBackend.h @@ -55,7 +55,8 @@ public: * * @param arg_num_fields number of fields contained in \a fields * - * @param fields the types and names of the fields to be retrieved from the input source + * @param fields the types and names of the fields to be retrieved + * from the input source * * @return False if an error occured. */ @@ -72,7 +73,8 @@ public: /** * Force trigger an update of the input stream. - * The action that will be taken depends on the current read mode and the individual input backend + * The action that will be taken depends on the current read mode and the + * individual input backend * * An backend can choose to ignore this. * @@ -90,8 +92,8 @@ protected: // Methods that have to be overwritten by the individual readers /** - * Reader-specific intialization method. Note that data may only be read from the input source - * after the Start function has been called. + * Reader-specific intialization method. Note that data may only be + * read from the input source after the Start function has been called. * * A reader implementation must override this method. If it returns * false, it will be assumed that a fatal error has occured that @@ -145,29 +147,32 @@ protected: */ void SendEvent(const string& name, const int num_vals, threading::Value* *vals); - // Content-sending-functions (simple mode). Including table-specific stuff that simply is not used if we have no table + // Content-sending-functions (simple mode). Including table-specific stuff that + // simply is not used if we have no table /** - * Method allowing a reader to send a list of values read for a specific filter back to the manager. + * Method allowing a reader to send a list of values read for a specific stream + * back to the manager. * - * If the filter points to a table, the values are inserted into the table; if it points to an event, the event is raised + * If the stream is a table stream, the values are inserted into the table; + * if it is an event stream, the event is raised. * - * @param val list of threading::Values expected by the filter + * @param val list of threading::Values expected by the stream */ void Put(threading::Value* *val); /** * Method allowing a reader to delete a specific value from a bro table. * - * If the receiving filter is an event, only a removed event is raised + * If the receiving stream is an event stream, only a removed event is raised * - * @param val list of threading::Values expected by the filter + * @param val list of threading::Values expected by the stream */ void Delete(threading::Value* *val); /** * Method allowing a reader to clear a value from a bro table. * - * If the receiving filter is an event, this is ignored. + * If the receiving stream is an event stream, this is ignored. * */ void Clear(); @@ -176,19 +181,22 @@ protected: /** - * Method allowing a reader to send a list of values read for a specific filter back to the manager. + * Method allowing a reader to send a list of values read for a specific stream + * back to the manager. * - * If the filter points to a table, the values are inserted into the table; if it points to an event, the event is raised. + * If the stream is a table stream, the values are inserted into the table; + * if it is an event stream, the event is raised. * - * @param val list of threading::Values expected by the filter + * @param val list of threading::Values expected by the stream */ void SendEntry(threading::Value* *vals); /** - * Method telling the manager, that the current list of entries sent by SendEntry is finished. + * Method telling the manager, that the current list of entries sent by SendEntry + * is finished. * - * For table filters, all entries that were not updated since the last EndCurrentSend will be deleted, because they are no longer - * present in the input source + * For table streams, all entries that were not updated since the last EndCurrentSend + * will be deleted, because they are no longer present in the input source * */ void EndCurrentSend(); diff --git a/src/input/readers/Ascii.cc b/src/input/readers/Ascii.cc index c798c21a5e..8223d6e201 100644 --- a/src/input/readers/Ascii.cc +++ b/src/input/readers/Ascii.cc @@ -398,7 +398,7 @@ bool Ascii::DoUpdate() if ( mode == STREAM ) { file->clear(); // remove end of file evil bits - if ( !ReadHeader(true) ) // in case filters changed + if ( !ReadHeader(true) ) return false; // header reading failed break; diff --git a/src/input/readers/Ascii.h b/src/input/readers/Ascii.h index 0953075bff..e5f3070724 100644 --- a/src/input/readers/Ascii.h +++ b/src/input/readers/Ascii.h @@ -74,7 +74,7 @@ private: string unset_field; - // keep a copy of the headerline to determine field locations when filters change + // keep a copy of the headerline to determine field locations when stream descriptions change string headerline; int mode; diff --git a/src/input/readers/Raw.h b/src/input/readers/Raw.h index 59f9202960..9f575bb89c 100644 --- a/src/input/readers/Raw.h +++ b/src/input/readers/Raw.h @@ -42,9 +42,6 @@ private: // Options set from the script-level. string separator; - // keep a copy of the headerline to determine field locations when filters change - string headerline; - int mode; bool execute; bool firstrun;