diff --git a/scripts/base/frameworks/input/main.bro b/scripts/base/frameworks/input/main.bro index c6995121bd..1df8563d94 100644 --- a/scripts/base/frameworks/input/main.bro +++ b/scripts/base/frameworks/input/main.bro @@ -5,7 +5,7 @@ module Input; export { - redef enum Input::ID += { TABLE_READ }; + redef enum Input::ID += { TABLE_READ, EVENT_READ }; ## The default input reader used. Defaults to `READER_ASCII`. const default_reader = READER_ASCII &redef; @@ -123,6 +123,8 @@ export { ## filter: the `TableFilter` record describing the filter. global read_table: function(description: Input::StreamDescription, filter: Input::TableFilter) : bool; + global read_event: function(description: Input::StreamDescription, filter: Input::EventFilter) : bool; + global update_finished: event(id: Input::ID); } @@ -182,6 +184,27 @@ function read_table(description: Input::StreamDescription, filter: Input::TableF if ( ok ) { ok = add_tablefilter(id, filter); } + if ( ok ) { + ok = remove_tablefilter(id, filter$name); + } + if ( ok ) { + ok = remove_stream(id); + } else { + remove_stream(id); + } + + return ok; +} + +function read_event(description: Input::StreamDescription, filter: Input::EventFilter) : bool { + local ok: bool = T; + # since we create and delete it ourselves this should be ok... at least for singlethreaded operation + local id: Input::ID = Input::EVENT_READ; + + ok = create_stream(id, description); + if ( ok ) { + ok = add_eventfilter(id, filter); + } if ( ok ) { ok = remove_stream(id); } else { diff --git a/src/DebugLogger.cc b/src/DebugLogger.cc index c41a0552c6..3394486ff2 100644 --- a/src/DebugLogger.cc +++ b/src/DebugLogger.cc @@ -15,7 +15,8 @@ DebugLogger::Stream DebugLogger::streams[NUM_DBGS] = { { "compressor", 0, false }, {"string", 0, false }, { "notifiers", 0, false }, { "main-loop", 0, false }, { "dpd", 0, false }, { "tm", 0, false }, - { "logging", 0, false }, { "threading", 0, false } + { "logging", 0, false }, {"input", 0, false }, + { "threading", 0, false } }; DebugLogger::DebugLogger(const char* filename) diff --git a/src/DebugLogger.h b/src/DebugLogger.h index 71e21bfa26..ca422072c5 100644 --- a/src/DebugLogger.h +++ b/src/DebugLogger.h @@ -24,6 +24,7 @@ enum DebugStream { DBG_DPD, // Dynamic application detection framework DBG_TM, // Time-machine packet input via Brocolli DBG_LOGGING, // Logging streams + DBG_INPUT, // Input streams DBG_THREADING, // Threading system NUM_DBGS // Has to be last diff --git a/src/input/Manager.cc b/src/input/Manager.cc index db98cb7a33..aa50453bdf 100644 --- a/src/input/Manager.cc +++ b/src/input/Manager.cc @@ -205,12 +205,24 @@ ReaderBackend* Manager::CreateBackend(ReaderFrontend* frontend, bro_int_t type) // create a new input reader object to be used at whomevers leisure lateron. ReaderFrontend* Manager::CreateStream(EnumVal* id, RecordVal* description) { + { + ReaderInfo *i = FindReader(id); + if ( i != 0 ) { + ODesc desc; + id->Describe(&desc); + reporter->Error("Trying create already existing input stream %s", desc.Description()); + return 0; + } + } + ReaderDefinition* ir = input_readers; RecordType* rtype = description->Type()->AsRecordType(); if ( ! same_type(rtype, BifType::Record::Input::StreamDescription, 0) ) { - reporter->Error("Streamdescription argument not of right type"); + ODesc desc; + id->Describe(&desc); + reporter->Error("Streamdescription argument not of right type for new input stream %s", desc.Description()); return 0; } @@ -239,6 +251,13 @@ ReaderFrontend* Manager::CreateStream(EnumVal* id, RecordVal* description) reader_obj->Init(source, mode->InternalInt(), do_autostart); +#ifdef DEBUG + ODesc desc; + id->Describe(&desc); + DBG_LOG(DBG_INPUT, "Successfully created new input stream %s", + desc.Description()); +#endif + return reader_obj; } @@ -503,6 +522,13 @@ bool Manager::AddTableFilter(EnumVal *id, RecordVal* fval) { i->filters[filterid] = filter; i->reader->AddFilter( filterid, fieldsV.size(), fields ); +#ifdef DEBUG + ODesc desc; + id->Describe(&desc); + DBG_LOG(DBG_INPUT, "Successfully created new table filter %s for stream", + filter->name.c_str(), desc.Description()); +#endif + return true; } @@ -574,6 +600,13 @@ bool Manager::RemoveStream(const EnumVal* id) { i->reader->Finish(); +#ifdef DEBUG + ODesc desc; + id->Describe(&desc); + DBG_LOG(DBG_INPUT, "Successfully queued removal of stream %s", + desc.Description()); +#endif + return true; } @@ -586,6 +619,12 @@ bool Manager::RemoveStreamContinuation(const ReaderFrontend* reader) { if ( (*s)->reader && (*s)->reader == reader ) { i = *s; +#ifdef DEBUG + ODesc desc; + i->id->Describe(&desc); + DBG_LOG(DBG_INPUT, "Successfully executed removal of stream %s", + desc.Description()); +#endif delete(i); readers.erase(s); return true; @@ -651,6 +690,13 @@ bool Manager::ForceUpdate(const EnumVal* id) i->reader->Update(); +#ifdef DEBUG + ODesc desc; + id->Describe(&desc); + DBG_LOG(DBG_INPUT, "Forcing update of stream %s", + desc.Description()); +#endif + return true; // update is async :( } @@ -685,6 +731,13 @@ bool Manager::RemoveTableFilter(EnumVal* id, const string &name) { i->reader->RemoveFilter(filterId); +#ifdef DEBUG + ODesc desc; + id->Describe(&desc); + DBG_LOG(DBG_INPUT, "Queued removal of tablefilter %s for stream %s", + name.c_str(), desc.Description()); +#endif + return true; } @@ -701,6 +754,13 @@ bool Manager::RemoveFilterContinuation(const ReaderFrontend* reader, const int f return false; } +#ifdef DEBUG + ODesc desc; + i->id->Describe(&desc); + DBG_LOG(DBG_INPUT, "Executed removal of (table|event)-filter %s for stream %s", + (*it).second->name.c_str(), desc.Description()); +#endif + delete (*it).second; i->filters.erase(it); @@ -736,6 +796,13 @@ bool Manager::RemoveEventFilter(EnumVal* id, const string &name) { } i->reader->RemoveFilter(filterId); + +#ifdef DEBUG + ODesc desc; + id->Describe(&desc); + DBG_LOG(DBG_INPUT, "Queued removal of eventfilter %s for stream %s", + name.c_str(), desc.Description()); +#endif return true; } @@ -948,6 +1015,13 @@ void Manager::EndCurrentSend(const ReaderFrontend* reader, int id) { assert(i->HasFilter(id)); +#ifdef DEBUG + ODesc desc; + i->id->Describe(&desc); + DBG_LOG(DBG_INPUT, "Got EndCurrentSend for filter %d and stream %s", + id, desc.Description()); +#endif + if ( i->filters[id]->filter_type == EVENT_FILTER ) { // nothing to do.. return; @@ -1018,6 +1092,11 @@ void Manager::EndCurrentSend(const ReaderFrontend* reader, int id) { filter->lastDict = filter->currDict; filter->currDict = new PDict(InputHash); +#ifdef DEBUG + DBG_LOG(DBG_INPUT, "EndCurrentSend complete for filter %d and stream %s, queueing update_finished event", + id, desc.Description()); +#endif + // Send event that the current update is indeed finished. EventHandler* handler = event_registry->Lookup("Input::update_finished"); if ( handler == 0 ) { @@ -1202,6 +1281,13 @@ void Manager::Clear(const ReaderFrontend* reader, int id) { return; } +#ifdef DEBUG + ODesc desc; + i->id->Describe(&desc); + DBG_LOG(DBG_INPUT, "Got Clear for filter %d and stream %s", + id, desc.Description()); +#endif + assert(i->HasFilter(id)); assert(i->filters[id]->filter_type == TABLE_FILTER);