diff --git a/src/input/Manager.cc b/src/input/Manager.cc index 102fd78d6f..8c8d6d8ba3 100644 --- a/src/input/Manager.cc +++ b/src/input/Manager.cc @@ -23,6 +23,11 @@ using namespace input; using threading::Value; using threading::Field; +/** + * InputHashes are used as Dictionaries to store the value and index hashes for all lines currently stored in a table. + * Index hash is stored as HashKey*, because it is thrown into other Bro functions that need the complex structure of it. + * For everything we do (with values), we just take the hash_t value and compare it directly with == + */ struct InputHash { hash_t valhash; HashKey* idxkey; @@ -41,7 +46,10 @@ static void input_hash_delete_func(void* val) { declare(PDict, InputHash); -class Manager::Filter { +/** + * Base stuff that every stream can do + */ +class Manager::Stream { public: string name; string source; @@ -49,25 +57,25 @@ public: int mode; - FilterType filter_type; // to distinguish between event and table filters + StreamType filter_type; // to distinguish between event and table filters EnumVal* type; ReaderFrontend* reader; RecordVal* description; - Filter(); - virtual ~Filter(); + Stream(); + virtual ~Stream(); }; -Manager::Filter::Filter() { +Manager::Stream::Stream() { type = 0; reader = 0; description = 0; removed = false; } -Manager::Filter::~Filter() { +Manager::Stream::~Stream() { if ( type ) Unref(type); if ( description ) @@ -77,7 +85,7 @@ Manager::Filter::~Filter() { delete(reader); } -class Manager::TableFilter: public Manager::Filter { +class Manager::TableStream: public Manager::Stream { public: unsigned int num_idx_fields; @@ -96,11 +104,11 @@ public: EventHandlerPtr event; - TableFilter(); - ~TableFilter(); + TableStream(); + ~TableStream(); }; -class Manager::EventFilter: public Manager::Filter { +class Manager::EventStream: public Manager::Stream { public: EventHandlerPtr event; @@ -108,11 +116,11 @@ public: unsigned int num_fields; bool want_record; - EventFilter(); - ~EventFilter(); + EventStream(); + ~EventStream(); }; -Manager::TableFilter::TableFilter() : Manager::Filter::Filter() { +Manager::TableStream::TableStream() : Manager::Stream::Stream() { filter_type = TABLE_FILTER; tab = 0; @@ -125,18 +133,18 @@ Manager::TableFilter::TableFilter() : Manager::Filter::Filter() { pred = 0; } -Manager::EventFilter::EventFilter() : Manager::Filter::Filter() { +Manager::EventStream::EventStream() : Manager::Stream::Stream() { fields = 0; filter_type = EVENT_FILTER; } -Manager::EventFilter::~EventFilter() { +Manager::EventStream::~EventStream() { if ( fields ) { Unref(fields); } } -Manager::TableFilter::~TableFilter() { +Manager::TableStream::~TableStream() { if ( tab ) Unref(tab); if ( itype ) @@ -176,7 +184,7 @@ Manager::Manager() } Manager::~Manager() { - for ( map::iterator s = readers.begin(); s != readers.end(); ++s ) { + for ( map::iterator s = readers.begin(); s != readers.end(); ++s ) { delete s->second; delete s->first; } @@ -233,7 +241,7 @@ ReaderBackend* Manager::CreateBackend(ReaderFrontend* frontend, bro_int_t type) } // create a new input reader object to be used at whomevers leisure lateron. -bool Manager::CreateStream(Filter* info, RecordVal* description) +bool Manager::CreateStream(Stream* info, RecordVal* description) { ReaderDefinition* ir = input_readers; @@ -249,7 +257,7 @@ bool Manager::CreateStream(Filter* info, RecordVal* description) Unref(name_val); { - Filter *i = FindFilter(name); + Stream *i = FindStream(name); if ( i != 0 ) { reporter->Error("Trying create already existing input stream %s", name.c_str()); return false; @@ -296,7 +304,7 @@ bool Manager::CreateEventStream(RecordVal* fval) { return false; } - EventFilter* filter = new EventFilter(); + EventStream* filter = new EventStream(); { bool res = CreateStream(filter, fval); if ( res == false ) { @@ -412,7 +420,7 @@ bool Manager::CreateTableStream(RecordVal* fval) { return false; } - TableFilter* filter = new TableFilter(); + TableStream* filter = new TableStream(); { bool res = CreateStream(filter, fval); if ( res == false ) { @@ -620,7 +628,7 @@ bool Manager::IsCompatibleType(BroType* t, bool atomic_only) bool Manager::RemoveStream(const string &name) { - Filter *i = FindFilter(name); + Stream *i = FindStream(name); if ( i == 0 ) { return false; // not found @@ -644,7 +652,7 @@ bool Manager::RemoveStream(const string &name) { } bool Manager::RemoveStreamContinuation(ReaderFrontend* reader) { - Filter *i = FindFilter(reader); + Stream *i = FindStream(reader); if ( i == 0 ) { reporter->Error("Stream not found in RemoveStreamContinuation"); @@ -712,7 +720,7 @@ bool Manager::UnrollRecordType(vector *fields, const RecordType *rec, co bool Manager::ForceUpdate(const string &name) { - Filter *i = FindFilter(name); + Stream *i = FindStream(name); if ( i == 0 ) { reporter->Error("Stream %s not found", name.c_str()); return false; @@ -786,7 +794,7 @@ Val* Manager::ValueToIndexVal(int num_fields, const RecordType *type, const Valu void Manager::SendEntry(ReaderFrontend* reader, Value* *vals) { - Filter *i = FindFilter(reader); + Stream *i = FindStream(reader); if ( i == 0 ) { reporter->InternalError("Unknown reader in SendEntry"); return; @@ -797,7 +805,7 @@ void Manager::SendEntry(ReaderFrontend* reader, Value* *vals) { readFields = SendEntryTable(i, vals); } else if ( i->filter_type == EVENT_FILTER ) { EnumVal *type = new EnumVal(BifEnum::Input::EVENT_NEW, BifType::Enum::Input::Event); - readFields = SendEventFilterEvent(i, type, vals); + readFields = SendEventStreamEvent(i, type, vals); } else { assert(false); } @@ -808,13 +816,13 @@ void Manager::SendEntry(ReaderFrontend* reader, Value* *vals) { delete [] vals; } -int Manager::SendEntryTable(Filter* i, const Value* const *vals) { +int Manager::SendEntryTable(Stream* i, const Value* const *vals) { bool updated = false; assert(i); assert(i->filter_type == TABLE_FILTER); - TableFilter* filter = (TableFilter*) i; + TableStream* filter = (TableStream*) i; HashKey* idxhash = HashValues(filter->num_idx_fields, vals); @@ -979,7 +987,7 @@ int Manager::SendEntryTable(Filter* i, const Value* const *vals) { void Manager::EndCurrentSend(ReaderFrontend* reader) { - Filter *i = FindFilter(reader); + Stream *i = FindStream(reader); if ( i == 0 ) { reporter->InternalError("Unknown reader in EndCurrentSend"); return; @@ -996,7 +1004,7 @@ void Manager::EndCurrentSend(ReaderFrontend* reader) { } assert(i->filter_type == TABLE_FILTER); - TableFilter* filter = (TableFilter*) i; + TableStream* filter = (TableStream*) i; // lastdict contains all deleted entries and should be empty apart from that IterCookie *c = filter->lastDict->InitForIteration(); @@ -1083,7 +1091,7 @@ void Manager::EndCurrentSend(ReaderFrontend* reader) { } void Manager::Put(ReaderFrontend* reader, Value* *vals) { - Filter *i = FindFilter(reader); + Stream *i = FindStream(reader); if ( i == 0 ) { reporter->InternalError("Unknown reader in Put"); return; @@ -1094,7 +1102,7 @@ void Manager::Put(ReaderFrontend* reader, Value* *vals) { readFields = PutTable(i, vals); } else if ( i->filter_type == EVENT_FILTER ) { EnumVal *type = new EnumVal(BifEnum::Input::EVENT_NEW, BifType::Enum::Input::Event); - readFields = SendEventFilterEvent(i, type, vals); + readFields = SendEventStreamEvent(i, type, vals); } else { assert(false); } @@ -1106,11 +1114,11 @@ void Manager::Put(ReaderFrontend* reader, Value* *vals) { } -int Manager::SendEventFilterEvent(Filter* i, EnumVal* type, const Value* const *vals) { +int Manager::SendEventStreamEvent(Stream* i, EnumVal* type, const Value* const *vals) { assert(i); assert(i->filter_type == EVENT_FILTER); - EventFilter* filter = (EventFilter*) i; + EventStream* filter = (EventStream*) i; Val *val; list out_vals; @@ -1143,11 +1151,11 @@ int Manager::SendEventFilterEvent(Filter* i, EnumVal* type, const Value* const * } -int Manager::PutTable(Filter* i, const Value* const *vals) { +int Manager::PutTable(Stream* i, const Value* const *vals) { assert(i); assert(i->filter_type == TABLE_FILTER); - TableFilter* filter = (TableFilter*) i; + TableStream* filter = (TableStream*) i; Val* idxval = ValueToIndexVal(filter->num_idx_fields, filter->itype, vals); Val* valval; @@ -1244,7 +1252,7 @@ int Manager::PutTable(Filter* i, const Value* const *vals) { // Todo:: perhaps throw some kind of clear-event? void Manager::Clear(ReaderFrontend* reader) { - Filter *i = FindFilter(reader); + Stream *i = FindStream(reader); if ( i == 0 ) { reporter->InternalError("Unknown reader in Clear"); return; @@ -1256,14 +1264,14 @@ void Manager::Clear(ReaderFrontend* reader) { #endif assert(i->filter_type == TABLE_FILTER); - TableFilter* filter = (TableFilter*) i; + TableStream* filter = (TableStream*) i; filter->tab->RemoveAll(); } // put interface: delete old entry from table. bool Manager::Delete(ReaderFrontend* reader, Value* *vals) { - Filter *i = FindFilter(reader); + Stream *i = FindStream(reader); if ( i == 0 ) { reporter->InternalError("Unknown reader in Delete"); return false; @@ -1273,7 +1281,7 @@ bool Manager::Delete(ReaderFrontend* reader, Value* *vals) { int readVals = 0; if ( i->filter_type == TABLE_FILTER ) { - TableFilter* filter = (TableFilter*) i; + TableStream* filter = (TableStream*) i; Val* idxval = ValueToIndexVal(filter->num_idx_fields, filter->itype, vals); assert(idxval != 0); readVals = filter->num_idx_fields + filter->num_val_fields; @@ -1320,7 +1328,7 @@ bool Manager::Delete(ReaderFrontend* reader, Value* *vals) { } } else if ( i->filter_type == EVENT_FILTER ) { EnumVal *type = new EnumVal(BifEnum::Input::EVENT_REMOVED, BifType::Enum::Input::Event); - readVals = SendEventFilterEvent(i, type, vals); + readVals = SendEventStreamEvent(i, type, vals); success = true; } else { assert(false); @@ -1840,9 +1848,9 @@ Val* Manager::ValueToVal(const Value* val, BroType* request_type) { return NULL; } -Manager::Filter* Manager::FindFilter(const string &name) +Manager::Stream* Manager::FindStream(const string &name) { - for ( map::iterator s = readers.begin(); s != readers.end(); ++s ) + for ( map::iterator s = readers.begin(); s != readers.end(); ++s ) { if ( (*s).second->name == name ) { @@ -1853,9 +1861,9 @@ Manager::Filter* Manager::FindFilter(const string &name) return 0; } -Manager::Filter* Manager::FindFilter(ReaderFrontend* reader) +Manager::Stream* Manager::FindStream(ReaderFrontend* reader) { - map::iterator s = readers.find(reader); + map::iterator s = readers.find(reader); if ( s != readers.end() ) { return s->second; } diff --git a/src/input/Manager.h b/src/input/Manager.h index c6dd40bd95..8f09828988 100644 --- a/src/input/Manager.h +++ b/src/input/Manager.h @@ -107,25 +107,25 @@ protected: // Functions are called from the ReaderBackend to notify the manager, that a filter has been removed // or a stream has been closed. // Used to prevent race conditions where data for a specific filter is still in the queue when the - // RemoveFilter directive is executed by the main thread. + // RemoveStream directive is executed by the main thread. // This makes sure all data that has ben queued for a filter is still received. bool RemoveStreamContinuation(ReaderFrontend* reader); private: - class Filter; - class TableFilter; - class EventFilter; + class Stream; + class TableStream; + class EventStream; - bool CreateStream(Filter*, RecordVal* description); + bool CreateStream(Stream*, RecordVal* description); // SendEntry implementation for Tablefilter - int SendEntryTable(Filter* i, const threading::Value* const *vals); + int SendEntryTable(Stream* i, const threading::Value* const *vals); // Put implementation for Tablefilter - int PutTable(Filter* i, const threading::Value* const *vals); + int PutTable(Stream* i, const threading::Value* const *vals); // SendEntry and Put implementation for Eventfilter - int SendEventFilterEvent(Filter* i, EnumVal* type, const threading::Value* const *vals); + int SendEventStreamEvent(Stream* i, EnumVal* type, const threading::Value* const *vals); // Checks is a bro type can be used for data reading. The equivalend in threading cannot be used, because we have support different types // from the log framework @@ -163,12 +163,12 @@ private: // Converts a Bro ListVal to a RecordVal given the record type RecordVal* ListValToRecordVal(ListVal* list, RecordType *request_type, int* position); - Filter* FindFilter(const string &name); - Filter* FindFilter(ReaderFrontend* reader); + Stream* FindStream(const string &name); + Stream* FindStream(ReaderFrontend* reader); - enum FilterType { TABLE_FILTER, EVENT_FILTER }; + enum StreamType { TABLE_FILTER, EVENT_FILTER }; - map readers; + map readers; };