diff --git a/doc/input.rst b/doc/input.rst index 6a089c0635..2945918733 100644 --- a/doc/input.rst +++ b/doc/input.rst @@ -98,12 +98,12 @@ been completed. Because of this, it is, for example, possible to call will remain queued until the first read has been completed. Once the input framework finishes reading from a data source, it fires -the ``update_finished`` event. Once this event has been received all data +the ``end_of_data`` event. Once this event has been received all data from the input file is available in the table. .. code:: bro - event Input::update_finished(name: string, source: string) { + event Input::end_of_data(name: string, source: string) { # now all data is in the table print blacklist; } @@ -129,7 +129,7 @@ deal with changing data files. The first, very basic method is an explicit refresh of an input stream. When an input stream is open, the function ``force_update`` can be called. This will trigger a complete refresh of the table; any changed elements from the -file will be updated. After the update is finished the ``update_finished`` +file will be updated. After the update is finished the ``end_of_data`` event will be raised. In our example the call would look like: @@ -142,7 +142,7 @@ The input framework also supports two automatic refresh modes. The first mode continually checks if a file has been changed. If the file has been changed, it is re-read and the data in the Bro table is updated to reflect the current state. Each time a change has been detected and all the new data has been -read into the table, the ``update_finished`` event is raised. +read into the table, the ``end_of_data`` event is raised. The second mode is a streaming mode. This mode assumes that the source data file is an append-only file to which new data is continually appended. Bro @@ -150,7 +150,7 @@ continually checks for new data at the end of the file and will add the new data to the table. If newer lines in the file have the same index as previous lines, they will overwrite the values in the output table. Because of the nature of streaming reads (data is continually added to the table), -the ``update_finished`` event is never raised when using streaming reads. +the ``end_of_data`` event is never raised when using streaming reads. The reading mode can be selected by setting the ``mode`` option of the add_table call. Valid values are ``MANUAL`` (the default), ``REREAD`` diff --git a/scripts/base/frameworks/input/main.bro b/scripts/base/frameworks/input/main.bro index b5f44669c2..08ab0defb0 100644 --- a/scripts/base/frameworks/input/main.bro +++ b/scripts/base/frameworks/input/main.bro @@ -125,8 +125,8 @@ export { ## id: string value identifying the stream global force_update: function(id: string) : bool; - ## Event that is called, when the update of a specific source is finished - global update_finished: event(name: string, source:string); + ## Event that is called, when the end of a data source has been reached, usually after an update + global end_of_data: event(name: string, source:string); } @load base/input.bif diff --git a/src/input/Manager.cc b/src/input/Manager.cc index 83e9dc9bc5..b3906ff73c 100644 --- a/src/input/Manager.cc +++ b/src/input/Manager.cc @@ -196,7 +196,7 @@ Manager::TableStream::~TableStream() Manager::Manager() { - update_finished = internal_handler("Input::update_finished"); + end_of_data = internal_handler("Input::end_of_data"); } Manager::~Manager() @@ -1169,8 +1169,12 @@ void Manager::EndCurrentSend(ReaderFrontend* reader) DBG_LOG(DBG_INPUT, "Got EndCurrentSend stream %s", i->name.c_str()); #endif - if ( i->stream_type == EVENT_STREAM ) // nothing to do.. + if ( i->stream_type == EVENT_STREAM ) + { + // just signal the end of the data source + SendEndOfData(i); return; + } assert(i->stream_type == TABLE_STREAM); TableStream* stream = (TableStream*) i; @@ -1251,14 +1255,29 @@ void Manager::EndCurrentSend(ReaderFrontend* reader) stream->currDict->SetDeleteFunc(input_hash_delete_func); #ifdef DEBUG - DBG_LOG(DBG_INPUT, "EndCurrentSend complete for stream %s, queueing update_finished event", + DBG_LOG(DBG_INPUT, "EndCurrentSend complete for stream %s", i->name.c_str()); #endif - // Send event that the current update is indeed finished. - SendEvent(update_finished, 2, new StringVal(i->name.c_str()), new StringVal(i->info->source)); + SendEndOfData(i); } +void Manager::SendEndOfData(ReaderFrontend* reader) { + Stream *i = FindStream(reader); + + if ( i == 0 ) + { + reporter->InternalError("Unknown reader in SendEndOfData"); + return; + } + + SendEndOfData(i); +} + +void Manager::SendEndOfData(const Stream *i) { + SendEvent(end_of_data, 2, new StringVal(i->name.c_str()), new StringVal(i->info->source)); +} + void Manager::Put(ReaderFrontend* reader, Value* *vals) { Stream *i = FindStream(reader); diff --git a/src/input/Manager.h b/src/input/Manager.h index cc81df38b7..b7650d33c6 100644 --- a/src/input/Manager.h +++ b/src/input/Manager.h @@ -78,7 +78,7 @@ public: * input.bif, which just forwards here. */ bool RemoveStream(const string &id); - + protected: friend class ReaderFrontend; friend class PutMessage; @@ -89,6 +89,7 @@ protected: friend class EndCurrentSendMessage; friend class ReaderClosedMessage; friend class DisableMessage; + friend class EndOfDataMessage; // For readers to write to input stream in direct mode (reporting // new/deleted values directly). Functions take ownership of @@ -96,6 +97,9 @@ protected: void Put(ReaderFrontend* reader, threading::Value* *vals); void Clear(ReaderFrontend* reader); bool Delete(ReaderFrontend* reader, threading::Value* *vals); + // Trigger sending the End-of-Data event when the input source has + // finished reading. Just use in direct mode. + void SendEndOfData(ReaderFrontend* reader); // For readers to write to input stream in indirect mode (manager is // monitoring new/deleted values) Functions take ownership of @@ -119,7 +123,7 @@ protected: // main thread. This makes sure all data that has ben queued for a // stream is still received. bool RemoveStreamContinuation(ReaderFrontend* reader); - + /** * Deletes an existing input stream. * @@ -154,15 +158,18 @@ private: // equivalend in threading cannot be used, because we have support // different types from the log framework bool IsCompatibleType(BroType* t, bool atomic_only=false); - // Check if a record is made up of compatible types and return a list // of all fields that are in the record in order. Recursively unrolls // records bool UnrollRecordType(vector *fields, const RecordType *rec, const string& nameprepend, bool allow_file_func); + // Send events void SendEvent(EventHandlerPtr ev, const int numvals, ...); void SendEvent(EventHandlerPtr ev, list events); + + // Implementation of SendEndOfData (send end_of_data event) + void SendEndOfData(const Stream *i); // Call predicate function and return result. bool CallPred(Func* pred_func, const int numvals, ...); @@ -193,6 +200,7 @@ private: // Converts a Bro ListVal to a RecordVal given the record type. RecordVal* ListValToRecordVal(ListVal* list, RecordType *request_type, int* position); + Stream* FindStream(const string &name); Stream* FindStream(ReaderFrontend* reader); @@ -200,7 +208,7 @@ private: map readers; - EventHandlerPtr update_finished; + EventHandlerPtr end_of_data; }; diff --git a/src/input/ReaderBackend.cc b/src/input/ReaderBackend.cc index 81060be7d5..74f5306271 100644 --- a/src/input/ReaderBackend.cc +++ b/src/input/ReaderBackend.cc @@ -108,6 +108,20 @@ public: private: }; +class EndOfDataMessage : public threading::OutputMessage { +public: + EndOfDataMessage(ReaderFrontend* reader) + : threading::OutputMessage("EndOfData", reader) {} + + virtual bool Process() + { + input_mgr->SendEndOfData(Object()); + return true; + } + +private: +}; + class ReaderClosedMessage : public threading::OutputMessage { public: ReaderClosedMessage(ReaderFrontend* reader) @@ -183,6 +197,11 @@ void ReaderBackend::EndCurrentSend() SendOut(new EndCurrentSendMessage(frontend)); } +void ReaderBackend::EndOfData() + { + SendOut(new EndOfDataMessage(frontend)); + } + void ReaderBackend::SendEntry(Value* *vals) { SendOut(new SendEntryMessage(frontend, vals)); diff --git a/src/input/ReaderBackend.h b/src/input/ReaderBackend.h index 8ee14c808a..32c668fb0d 100644 --- a/src/input/ReaderBackend.h +++ b/src/input/ReaderBackend.h @@ -280,6 +280,16 @@ protected: * */ void Clear(); + + /** + * Method telling the manager that we finished reading the current + * data source. Will trigger an end_of_data event. + * + * Note: When using SendEntry/the tracking mode this is triggered + * automatically by EndCurrentSend(). Only use if not using the + * tracking mode. Otherwise the event will be sent twice. + */ + void EndOfData(); // Content-sending-functions (tracking mode): Only changed lines are propagated. diff --git a/testing/btest/Baseline/scripts.base.frameworks.input.event/out b/testing/btest/Baseline/scripts.base.frameworks.input.event/out index 49c1015198..c3f6d1ceba 100644 --- a/testing/btest/Baseline/scripts.base.frameworks.input.event/out +++ b/testing/btest/Baseline/scripts.base.frameworks.input.event/out @@ -4,13 +4,6 @@ print outfile, A::description; print outfile, A::tpe; print outfile, A::i; print outfile, A::b; -try = try + 1; -if (7 == try) -{ -close(outfile); -terminate(); -} - }, config={ }] @@ -23,13 +16,6 @@ print outfile, A::description; print outfile, A::tpe; print outfile, A::i; print outfile, A::b; -try = try + 1; -if (7 == try) -{ -close(outfile); -terminate(); -} - }, config={ }] @@ -42,13 +28,6 @@ print outfile, A::description; print outfile, A::tpe; print outfile, A::i; print outfile, A::b; -try = try + 1; -if (7 == try) -{ -close(outfile); -terminate(); -} - }, config={ }] @@ -61,13 +40,6 @@ print outfile, A::description; print outfile, A::tpe; print outfile, A::i; print outfile, A::b; -try = try + 1; -if (7 == try) -{ -close(outfile); -terminate(); -} - }, config={ }] @@ -80,13 +52,6 @@ print outfile, A::description; print outfile, A::tpe; print outfile, A::i; print outfile, A::b; -try = try + 1; -if (7 == try) -{ -close(outfile); -terminate(); -} - }, config={ }] @@ -99,13 +64,6 @@ print outfile, A::description; print outfile, A::tpe; print outfile, A::i; print outfile, A::b; -try = try + 1; -if (7 == try) -{ -close(outfile); -terminate(); -} - }, config={ }] @@ -118,16 +76,10 @@ print outfile, A::description; print outfile, A::tpe; print outfile, A::i; print outfile, A::b; -try = try + 1; -if (7 == try) -{ -close(outfile); -terminate(); -} - }, config={ }] Input::EVENT_NEW 7 T +End-of-data diff --git a/testing/btest/scripts/base/frameworks/input/basic.bro b/testing/btest/scripts/base/frameworks/input/basic.bro index faab303534..dfac84d062 100644 --- a/testing/btest/scripts/base/frameworks/input/basic.bro +++ b/testing/btest/scripts/base/frameworks/input/basic.bro @@ -55,7 +55,7 @@ event bro_init() Input::remove("ssh"); } -event Input::update_finished(name: string, source:string) +event Input::end_of_data(name: string, source:string) { print outfile, servers; print outfile, to_count(servers[-42]$ns); # try to actually use a string. If null-termination is wrong this will fail. diff --git a/testing/btest/scripts/base/frameworks/input/bignumber.bro b/testing/btest/scripts/base/frameworks/input/bignumber.bro index 250f84bbb2..5b93472551 100644 --- a/testing/btest/scripts/base/frameworks/input/bignumber.bro +++ b/testing/btest/scripts/base/frameworks/input/bignumber.bro @@ -37,7 +37,7 @@ event bro_init() Input::remove("ssh"); } -event Input::update_finished(name: string, source:string) +event Input::end_of_data(name: string, source:string) { print outfile, servers; close(outfile); diff --git a/testing/btest/scripts/base/frameworks/input/empty-values-hashing.bro b/testing/btest/scripts/base/frameworks/input/empty-values-hashing.bro index b66febba82..c8760b467e 100644 --- a/testing/btest/scripts/base/frameworks/input/empty-values-hashing.bro +++ b/testing/btest/scripts/base/frameworks/input/empty-values-hashing.bro @@ -73,7 +73,7 @@ event bro_init() } -event Input::update_finished(name: string, source: string) +event Input::end_of_data(name: string, source: string) { print outfile, "==========SERVERS============"; print outfile, servers; diff --git a/testing/btest/scripts/base/frameworks/input/emptyvals.bro b/testing/btest/scripts/base/frameworks/input/emptyvals.bro index a2a9ba3070..94b0f1b620 100644 --- a/testing/btest/scripts/base/frameworks/input/emptyvals.bro +++ b/testing/btest/scripts/base/frameworks/input/emptyvals.bro @@ -40,7 +40,7 @@ event bro_init() Input::remove("ssh"); } -event Input::update_finished(name: string, source:string) +event Input::end_of_data(name: string, source:string) { print outfile, servers; close(outfile); diff --git a/testing/btest/scripts/base/frameworks/input/event.bro b/testing/btest/scripts/base/frameworks/input/event.bro index d0088472e7..ba47d5e3f2 100644 --- a/testing/btest/scripts/base/frameworks/input/event.bro +++ b/testing/btest/scripts/base/frameworks/input/event.bro @@ -22,7 +22,6 @@ @load frameworks/communication/listen global outfile: file; -global try: count; module A; @@ -37,18 +36,18 @@ event line(description: Input::EventDescription, tpe: Input::Event, i: int, b: b print outfile, tpe; print outfile, i; print outfile, b; - try = try + 1; - if ( try == 7 ) - { - close(outfile); - terminate(); - } } event bro_init() { - try = 0; outfile = open("../out"); Input::add_event([$source="../input.log", $name="input", $fields=Val, $ev=line, $want_record=F]); Input::remove("input"); } + +event Input::end_of_data(name: string, source:string) + { + print outfile, "End-of-data"; + close(outfile); + terminate(); + } diff --git a/testing/btest/scripts/base/frameworks/input/invalidnumbers.bro b/testing/btest/scripts/base/frameworks/input/invalidnumbers.bro index 3c755f1d08..1deec605ae 100644 --- a/testing/btest/scripts/base/frameworks/input/invalidnumbers.bro +++ b/testing/btest/scripts/base/frameworks/input/invalidnumbers.bro @@ -41,7 +41,7 @@ event bro_init() Input::remove("ssh"); } -event Input::update_finished(name: string, source:string) +event Input::end_of_data(name: string, source:string) { print outfile, servers; terminate(); diff --git a/testing/btest/scripts/base/frameworks/input/onecolumn-norecord.bro b/testing/btest/scripts/base/frameworks/input/onecolumn-norecord.bro index 9707af7f94..c08b1420fb 100644 --- a/testing/btest/scripts/base/frameworks/input/onecolumn-norecord.bro +++ b/testing/btest/scripts/base/frameworks/input/onecolumn-norecord.bro @@ -38,7 +38,7 @@ event bro_init() Input::remove("input"); } -event Input::update_finished(name: string, source: string) +event Input::end_of_data(name: string, source: string) { print outfile, servers; close(outfile); diff --git a/testing/btest/scripts/base/frameworks/input/onecolumn-record.bro b/testing/btest/scripts/base/frameworks/input/onecolumn-record.bro index 18349f1515..9e420e75fe 100644 --- a/testing/btest/scripts/base/frameworks/input/onecolumn-record.bro +++ b/testing/btest/scripts/base/frameworks/input/onecolumn-record.bro @@ -38,7 +38,7 @@ event bro_init() Input::remove("input"); } -event Input::update_finished(name: string, source: string) +event Input::end_of_data(name: string, source: string) { print outfile, servers; close(outfile); diff --git a/testing/btest/scripts/base/frameworks/input/optional.bro b/testing/btest/scripts/base/frameworks/input/optional.bro index 23e0b1e4d1..2fe0e5c86f 100644 --- a/testing/btest/scripts/base/frameworks/input/optional.bro +++ b/testing/btest/scripts/base/frameworks/input/optional.bro @@ -48,7 +48,7 @@ event bro_init() Input::remove("input"); } -event Input::update_finished(name: string, source: string) +event Input::end_of_data(name: string, source: string) { print outfile, servers; close(outfile); diff --git a/testing/btest/scripts/base/frameworks/input/port.bro b/testing/btest/scripts/base/frameworks/input/port.bro index 2f061e9507..081c59559b 100644 --- a/testing/btest/scripts/base/frameworks/input/port.bro +++ b/testing/btest/scripts/base/frameworks/input/port.bro @@ -43,7 +43,7 @@ event bro_init() Input::remove("input"); } -event Input::update_finished(name: string, source: string) +event Input::end_of_data(name: string, source: string) { print outfile, servers[1.2.3.4]; print outfile, servers[1.2.3.5]; diff --git a/testing/btest/scripts/base/frameworks/input/predicate.bro b/testing/btest/scripts/base/frameworks/input/predicate.bro index fcd986c9a6..8fb33242e8 100644 --- a/testing/btest/scripts/base/frameworks/input/predicate.bro +++ b/testing/btest/scripts/base/frameworks/input/predicate.bro @@ -47,7 +47,7 @@ event bro_init() Input::remove("input"); } -event Input::update_finished(name: string, source: string) +event Input::end_of_data(name: string, source: string) { if ( 1 in servers ) print outfile, "VALID"; diff --git a/testing/btest/scripts/base/frameworks/input/predicatemodify.bro b/testing/btest/scripts/base/frameworks/input/predicatemodify.bro index 1d6a54fe38..17467bbc27 100644 --- a/testing/btest/scripts/base/frameworks/input/predicatemodify.bro +++ b/testing/btest/scripts/base/frameworks/input/predicatemodify.bro @@ -51,7 +51,7 @@ event bro_init() Input::remove("input"); } -event Input::update_finished(name: string, source: string) +event Input::end_of_data(name: string, source: string) { print outfile, servers; close(outfile); diff --git a/testing/btest/scripts/base/frameworks/input/predicatemodifyandreread.bro b/testing/btest/scripts/base/frameworks/input/predicatemodifyandreread.bro index 9b8758bf3f..5a9e993651 100644 --- a/testing/btest/scripts/base/frameworks/input/predicatemodifyandreread.bro +++ b/testing/btest/scripts/base/frameworks/input/predicatemodifyandreread.bro @@ -94,7 +94,7 @@ event bro_init() ]); } -event Input::update_finished(name: string, source: string) +event Input::end_of_data(name: string, source: string) { try = try + 1; print outfile, fmt("Update_finished for %s, try %d", name, try); diff --git a/testing/btest/scripts/base/frameworks/input/predicaterefusesecondsamerecord.bro b/testing/btest/scripts/base/frameworks/input/predicaterefusesecondsamerecord.bro index d572b30090..ba0b468cdc 100644 --- a/testing/btest/scripts/base/frameworks/input/predicaterefusesecondsamerecord.bro +++ b/testing/btest/scripts/base/frameworks/input/predicaterefusesecondsamerecord.bro @@ -48,7 +48,7 @@ event bro_init() Input::remove("input"); } -event Input::update_finished(name: string, source: string) +event Input::end_of_data(name: string, source: string) { print outfile, servers; close(outfile); diff --git a/testing/btest/scripts/base/frameworks/input/repeat.bro b/testing/btest/scripts/base/frameworks/input/repeat.bro index a5a914932c..a966ac064e 100644 --- a/testing/btest/scripts/base/frameworks/input/repeat.bro +++ b/testing/btest/scripts/base/frameworks/input/repeat.bro @@ -45,7 +45,7 @@ event bro_init() } } -event Input::update_finished(name: string, source: string) +event Input::end_of_data(name: string, source: string) { print outfile, name; print outfile, source; diff --git a/testing/btest/scripts/base/frameworks/input/reread.bro b/testing/btest/scripts/base/frameworks/input/reread.bro index 2db58fc6b0..11aa873f9d 100644 --- a/testing/btest/scripts/base/frameworks/input/reread.bro +++ b/testing/btest/scripts/base/frameworks/input/reread.bro @@ -123,7 +123,7 @@ event bro_init() } -event Input::update_finished(name: string, source: string) +event Input::end_of_data(name: string, source: string) { print outfile, "==========SERVERS============"; print outfile, servers; diff --git a/testing/btest/scripts/base/frameworks/input/set.bro b/testing/btest/scripts/base/frameworks/input/set.bro index 5215523ee3..b2b5cea323 100644 --- a/testing/btest/scripts/base/frameworks/input/set.bro +++ b/testing/btest/scripts/base/frameworks/input/set.bro @@ -38,7 +38,7 @@ event bro_init() Input::remove("ssh"); } -event Input::update_finished(name: string, source:string) +event Input::end_of_data(name: string, source:string) { print outfile, servers; close(outfile); diff --git a/testing/btest/scripts/base/frameworks/input/setseparator.bro b/testing/btest/scripts/base/frameworks/input/setseparator.bro index 44b9d08d54..b7148d80bd 100644 --- a/testing/btest/scripts/base/frameworks/input/setseparator.bro +++ b/testing/btest/scripts/base/frameworks/input/setseparator.bro @@ -38,7 +38,7 @@ event bro_init() Input::remove("ssh"); } -event Input::update_finished(name: string, source:string) +event Input::end_of_data(name: string, source:string) { print outfile, servers; close(outfile); diff --git a/testing/btest/scripts/base/frameworks/input/setspecialcases.bro b/testing/btest/scripts/base/frameworks/input/setspecialcases.bro index 239bdfe7e7..022eac9731 100644 --- a/testing/btest/scripts/base/frameworks/input/setspecialcases.bro +++ b/testing/btest/scripts/base/frameworks/input/setspecialcases.bro @@ -42,7 +42,7 @@ event bro_init() Input::remove("ssh"); } -event Input::update_finished(name: string, source:string) +event Input::end_of_data(name: string, source:string) { print outfile, servers; close(outfile); diff --git a/testing/btest/scripts/base/frameworks/input/subrecord.bro b/testing/btest/scripts/base/frameworks/input/subrecord.bro index 8c845a1842..512b8ec58f 100644 --- a/testing/btest/scripts/base/frameworks/input/subrecord.bro +++ b/testing/btest/scripts/base/frameworks/input/subrecord.bro @@ -62,7 +62,7 @@ event bro_init() Input::remove("ssh"); } -event Input::update_finished(name: string, source:string) +event Input::end_of_data(name: string, source:string) { print outfile, servers; close(outfile); diff --git a/testing/btest/scripts/base/frameworks/input/twotables.bro b/testing/btest/scripts/base/frameworks/input/twotables.bro index f404416049..83ae86cd46 100644 --- a/testing/btest/scripts/base/frameworks/input/twotables.bro +++ b/testing/btest/scripts/base/frameworks/input/twotables.bro @@ -113,7 +113,7 @@ event bro_init() } -event Input::update_finished(name: string, source: string) +event Input::end_of_data(name: string, source: string) { print fin_out, "==========SERVERS============"; #print fin_out, servers; diff --git a/testing/btest/scripts/base/frameworks/input/unsupported_types.bro b/testing/btest/scripts/base/frameworks/input/unsupported_types.bro index 7affa4065d..e1350f61a9 100644 --- a/testing/btest/scripts/base/frameworks/input/unsupported_types.bro +++ b/testing/btest/scripts/base/frameworks/input/unsupported_types.bro @@ -56,7 +56,7 @@ event bro_init() Input::remove("ssh"); } -event Input::update_finished(name: string, source:string) +event Input::end_of_data(name: string, source:string) { print outfile, servers; close(outfile);