diff --git a/scripts/base/frameworks/input/main.bro b/scripts/base/frameworks/input/main.bro index 7e581070e6..445f947106 100644 --- a/scripts/base/frameworks/input/main.bro +++ b/scripts/base/frameworks/input/main.bro @@ -23,6 +23,9 @@ export { ## Read mode to use for this stream mode: Mode &default=default_mode; + + ## Automatically start the input stream after the first filter has been added + autostart: bool &default=T; }; ## TableFilter description type used for the `add_tablefilter` method. diff --git a/src/input/Manager.cc b/src/input/Manager.cc index d4e5cdaee9..4438a07c6c 100644 --- a/src/input/Manager.cc +++ b/src/input/Manager.cc @@ -214,6 +214,10 @@ ReaderFrontend* Manager::CreateStream(EnumVal* id, RecordVal* description) } EnumVal* reader = description->LookupWithDefault(rtype->FieldOffset("reader"))->AsEnumVal(); + EnumVal* mode = description->LookupWithDefault(rtype->FieldOffset("mode"))->AsEnumVal(); + Val *autostart = description->LookupWithDefault(rtype->FieldOffset("autostart")); + bool do_autostart = ( autostart->InternalInt() == 1 ); + Unref(autostart); // Ref'd by LookupWithDefault ReaderFrontend* reader_obj = new ReaderFrontend(reader->InternalInt()); assert(reader_obj); @@ -229,16 +233,7 @@ ReaderFrontend* Manager::CreateStream(EnumVal* id, RecordVal* description) readers.push_back(info); - reader_obj->Init(source); - /* if ( success == false ) { - assert( RemoveStream(id) ); - return 0; - } */ - reader_obj->Update(); - /* if ( success == false ) { - assert ( RemoveStream(id) ); - return 0; - } */ + reader_obj->Init(source, mode->InternalInt(), do_autostart); return reader_obj; @@ -785,7 +780,7 @@ int Manager::SendEntryTable(const ReaderFrontend* reader, const int id, const Va //reporter->Error("Hashing %d val fields", i->num_val_fields); HashKey* valhash = 0; if ( filter->num_val_fields > 0 ) - HashValues(filter->num_val_fields, vals+filter->num_idx_fields); + valhash = HashValues(filter->num_val_fields, vals+filter->num_idx_fields); //reporter->Error("Result: %d", (uint64_t) valhash->Hash()); @@ -794,6 +789,13 @@ int Manager::SendEntryTable(const ReaderFrontend* reader, const int id, const Va InputHash *h = filter->lastDict->Lookup(idxhash); if ( h != 0 ) { // seen before + + valhash->Hash(); + + h->valhash->Hash(); + + + if ( filter->num_val_fields == 0 || h->valhash->Hash() == valhash->Hash() ) { // ok, exact duplicate filter->lastDict->Remove(idxhash); diff --git a/src/input/ReaderBackend.cc b/src/input/ReaderBackend.cc index 5cb4fe34f2..cfc74d33a8 100644 --- a/src/input/ReaderBackend.cc +++ b/src/input/ReaderBackend.cc @@ -185,24 +185,42 @@ void ReaderBackend::SendEntry(int id, Value* *vals) SendOut(new SendEntryMessage(frontend, id, vals)); } -bool ReaderBackend::Init(string arg_source) +bool ReaderBackend::Init(string arg_source, int mode, bool arg_autostart) { source = arg_source; + autostart = arg_autostart; + SetName("InputReader/"+source); // disable if DoInit returns error. - disabled = !DoInit(arg_source); + disabled = !DoInit(arg_source, mode); if ( disabled ) { + Error("Init failed"); DisableFrontend(); } return !disabled; } +bool ReaderBackend::StartReading() { + int success = DoStartReading(); + + if ( success == false ) { + DisableFrontend(); + } + + return success; +} + bool ReaderBackend::AddFilter(int id, int arg_num_fields, const Field* const * arg_fields) { - return DoAddFilter(id, arg_num_fields, arg_fields); + bool success = DoAddFilter(id, arg_num_fields, arg_fields); + if ( success && autostart) { + autostart = false; + return StartReading(); + } + return success; } bool ReaderBackend::RemoveFilter(int id) @@ -230,4 +248,12 @@ void ReaderBackend::DisableFrontend() SendOut(new DisableMessage(frontend)); } +bool ReaderBackend::DoHeartbeat(double network_time, double current_time) +{ + MsgThread::DoHeartbeat(network_time, current_time); + + return true; +} + + } diff --git a/src/input/ReaderBackend.h b/src/input/ReaderBackend.h index c6fbaac715..e34db3e559 100644 --- a/src/input/ReaderBackend.h +++ b/src/input/ReaderBackend.h @@ -51,9 +51,24 @@ public: * @param fields An array of size \a num_fields with the log fields. * The methods takes ownership of the array. * + * @param mode the opening mode for the input source + * + * @param autostart automatically start the input source after the first filter has been added + * * @return False if an error occured. */ - bool Init(string arg_source); + bool Init(string arg_source, int mode, bool autostart); + + /** + * One-time start method of the reader. + * + * This method is called from the scripting layer, after all filters have been added. + * No data should be read before this method is called. + * + * If autostart in Init is set to true, this method is called automatically by the backend after + * the first filter has been added. + */ + bool StartReading(); /** * Add an input filter to the input stream @@ -107,7 +122,8 @@ protected: // Methods that have to be overwritten by the individual readers /** - * Reader-specific intialization method. + * Reader-specific intialization method. Note that data may only be read from the input source + * after the Start function has been called. * * A reader implementation must override this method. If it returns * false, it will be assumed that a fatal error has occured that @@ -115,7 +131,19 @@ protected: * disabled and eventually deleted. When returning false, an * implementation should also call Error() to indicate what happened. */ - virtual bool DoInit(string arg_sources) = 0; + virtual bool DoInit(string arg_sources, int mode) = 0; + + /** + * Reader-specific start method. After this function has been called, data may be read from + * the input source and be sent to the specified filters + * + * A reader implementation must override this method. + * If it returns false, it will be assumed that a fatal error has occured + * that prevents the reader from further operation; it will then be + * disabled and eventually deleted. When returning false, an implementation + * should also call Error to indicate what happened. + */ + virtual bool DoStartReading() = 0; /** * Reader-specific method to add a filter. @@ -225,7 +253,14 @@ protected: * @param id the input filter id for which the values are sent */ void EndCurrentSend(int id); - + + /** + * Triggered by regular heartbeat messages from the main thread. + * + * This method can be overridden but once must call + * ReaderBackend::DoHeartbeat(). + */ + virtual bool DoHeartbeat(double network_time, double current_time); private: // Frontend that instantiated us. This object must not be access from @@ -238,7 +273,8 @@ private: // For implementing Fmt(). char* buf; - unsigned int buf_len; + unsigned int buf_len; + bool autostart; }; } diff --git a/src/input/ReaderFrontend.cc b/src/input/ReaderFrontend.cc index 0fdf90d9ad..f7fc23bf72 100644 --- a/src/input/ReaderFrontend.cc +++ b/src/input/ReaderFrontend.cc @@ -12,14 +12,16 @@ namespace input { class InitMessage : public threading::InputMessage { public: - InitMessage(ReaderBackend* backend, const string source) + InitMessage(ReaderBackend* backend, const string source, const int mode, const bool autostart) : threading::InputMessage("Init", backend), - source(source) { } + source(source), mode(mode), autostart(autostart) { } - virtual bool Process() { return Object()->Init(source); } + virtual bool Process() { return Object()->Init(source, mode, autostart); } private: const string source; + const int mode; + const bool autostart; }; class UpdateMessage : public threading::InputMessage @@ -42,6 +44,16 @@ public: virtual bool Process() { Object()->Finish(); return true; } }; +class StartReadingMessage : public threading::InputMessage +{ +public: + StartReadingMessage(ReaderBackend* backend) + : threading::InputMessage("StartReading", backend) + { } + + virtual bool Process() { Object()->StartReading(); return true; } +}; + class AddFilterMessage : public threading::InputMessage { public: @@ -83,17 +95,17 @@ ReaderFrontend::ReaderFrontend(bro_int_t type) { ReaderFrontend::~ReaderFrontend() { } -void ReaderFrontend::Init(string arg_source) { +void ReaderFrontend::Init(string arg_source, int mode, bool autostart) { if ( disabled ) return; if ( initialized ) - reporter->InternalError("writer initialize twice"); + reporter->InternalError("reader initialize twice"); source = arg_source; initialized = true; - backend->SendIn(new InitMessage(backend, arg_source)); + backend->SendIn(new InitMessage(backend, arg_source, mode, autostart)); } void ReaderFrontend::Update() { @@ -132,6 +144,13 @@ string ReaderFrontend::Name() const return ty_name + "/" + source; } +void ReaderFrontend::StartReading() { + if ( disabled ) + return; + + backend->SendIn(new StartReadingMessage(backend)); +} + } diff --git a/src/input/ReaderFrontend.h b/src/input/ReaderFrontend.h index c29071612d..d67ca299c0 100644 --- a/src/input/ReaderFrontend.h +++ b/src/input/ReaderFrontend.h @@ -49,7 +49,18 @@ public: * See ReaderBackend::Init() for arguments. * This method must only be called from the main thread. */ - void Init(string arg_source); + void Init(string arg_source, int mode, bool autostart); + + /** + * Start the reader. + * + * This methods starts the reader, after all necessary filters have been added. + * It is not necessary to call this function, if autostart has been set. + * If autostart has been set, the reader will be initialized automatically after the first filter has been added + * + * This method must only be called from the main thread. + */ + void StartReading(); /** * Force an update of the current input source. Actual action depends on diff --git a/src/input/readers/Ascii.cc b/src/input/readers/Ascii.cc index 095d74bf11..cd1723e5e4 100644 --- a/src/input/readers/Ascii.cc +++ b/src/input/readers/Ascii.cc @@ -8,10 +8,14 @@ #include "../../threading/SerializationTypes.h" +#define MANUAL 0 +#define REREAD 1 + using namespace input::reader; using threading::Value; using threading::Field; + FieldMapping::FieldMapping(const string& arg_name, const TypeTag& arg_type, int arg_position) : name(arg_name), type(arg_type) { @@ -75,16 +79,41 @@ void Ascii::DoFinish() } } -bool Ascii::DoInit(string path) +bool Ascii::DoInit(string path, int arg_mode) { + started = false; fname = path; + mode = arg_mode; file = new ifstream(path.c_str()); if ( !file->is_open() ) { - Error(Fmt("cannot open %s", fname.c_str())); + Error(Fmt("Init: cannot open %s", fname.c_str())); return false; } + if ( ( mode != MANUAL ) && (mode != REREAD) ) { + Error(Fmt("Unsupported read mode %d for source %s", mode, path.c_str())); + return false; + } + + return true; +} + +bool Ascii::DoStartReading() { + if ( started == true ) { + Error("Started twice"); + return false; + } + + started = true; + switch ( mode ) { + case MANUAL: + DoUpdate(); + break; + default: + assert(false); + } + return true; } @@ -132,7 +161,7 @@ bool Ascii::ReadHeader() { map fields; - // construcr list of field names. + // construct list of field names. istringstream splitstream(line); int pos=0; while ( splitstream ) { @@ -146,6 +175,7 @@ bool Ascii::ReadHeader() { for ( map::iterator it = filters.begin(); it != filters.end(); it++ ) { + (*it).second.columnMap.clear(); for ( unsigned int i = 0; i < (*it).second.num_fields; i++ ) { const Field* field = (*it).second.fields[i]; @@ -372,7 +402,6 @@ Value* Ascii::EntryToVal(string s, FieldMapping field) { // read the entire file and send appropriate thingies back to InputMgr bool Ascii::DoUpdate() { - // dirty, fix me. (well, apparently after trying seeking, etc - this is not that bad) if ( file && file->is_open() ) { file->close(); @@ -418,6 +447,7 @@ bool Ascii::DoUpdate() { fit != (*it).second.columnMap.end(); fit++ ){ + if ( (*fit).position > pos || (*fit).secondary_position > pos ) { Error(Fmt("Not enough fields in line %s. Found %d fields, want positions %d and %d", line.c_str(), pos, (*fit).position, (*fit).secondary_position)); return false; @@ -455,6 +485,7 @@ bool Ascii::DoUpdate() { } + //file->clear(); // remove end of file evil bits //file->seekg(0, ios::beg); // and seek to start. @@ -463,3 +494,22 @@ bool Ascii::DoUpdate() { } return true; } + +bool Ascii::DoHeartbeat(double network_time, double current_time) +{ + ReaderBackend::DoHeartbeat(network_time, current_time); + + switch ( mode ) { + case MANUAL: + // yay, we do nothing :) + break; + case REREAD: + + + default: + assert(false); + } + + return true; +} + diff --git a/src/input/readers/Ascii.h b/src/input/readers/Ascii.h index a3bf5c21a6..766716e29d 100644 --- a/src/input/readers/Ascii.h +++ b/src/input/readers/Ascii.h @@ -39,7 +39,7 @@ public: protected: - virtual bool DoInit(string path); + virtual bool DoInit(string path, int mode); virtual bool DoAddFilter( int id, int arg_num_fields, const threading::Field* const* fields ); @@ -48,9 +48,13 @@ protected: virtual void DoFinish(); virtual bool DoUpdate(); + + virtual bool DoStartReading(); private: + virtual bool DoHeartbeat(double network_time, double current_time); + struct Filter { unsigned int num_fields; @@ -84,6 +88,10 @@ private: string unset_field; + int mode; + + bool started; + }; diff --git a/src/threading/Manager.cc b/src/threading/Manager.cc index 7b571e753c..472d10139a 100644 --- a/src/threading/Manager.cc +++ b/src/threading/Manager.cc @@ -106,7 +106,7 @@ void Manager::Process() Message* msg = t->RetrieveOut(); - if ( msg->Process() && network_time ) + if ( msg->Process() ) //&& network_time ) // FIXME: ask robin again if he needs this. makes input interface not work in bro_init. did_process = true; else diff --git a/src/types.bif b/src/types.bif index 1529319197..e2a47a7ece 100644 --- a/src/types.bif +++ b/src/types.bif @@ -186,9 +186,9 @@ enum ID %{ %} enum Mode %{ - MANUAL, - REREAD, - STREAM, + MANUAL = 0, + REREAD = 1, + STREAM = 2, %} module GLOBAL; diff --git a/testing/btest/scripts/base/frameworks/input/basic.bro b/testing/btest/scripts/base/frameworks/input/basic.bro index 3b75220625..156898edca 100644 --- a/testing/btest/scripts/base/frameworks/input/basic.bro +++ b/testing/btest/scripts/base/frameworks/input/basic.bro @@ -47,7 +47,6 @@ event bro_init() # first read in the old stuff into the table... Input::create_stream(A::INPUT, [$source="input.log"]); Input::add_tablefilter(A::INPUT, [$name="ssh", $idx=Idx, $val=Val, $destination=servers]); - Input::force_update(A::INPUT); Input::remove_tablefilter(A::INPUT, "ssh"); Input::remove_stream(A::INPUT); } diff --git a/testing/btest/scripts/base/frameworks/input/event.bro b/testing/btest/scripts/base/frameworks/input/event.bro index a07f0934a0..41eba1613c 100644 --- a/testing/btest/scripts/base/frameworks/input/event.bro +++ b/testing/btest/scripts/base/frameworks/input/event.bro @@ -38,5 +38,4 @@ event bro_init() { Input::create_stream(A::INPUT, [$source="input.log"]); Input::add_eventfilter(A::INPUT, [$name="input", $fields=Val, $ev=line]); - Input::force_update(A::INPUT); } diff --git a/testing/btest/scripts/base/frameworks/input/onecolumn-norecord.bro b/testing/btest/scripts/base/frameworks/input/onecolumn-norecord.bro index 712a877960..bcbba05a3e 100644 --- a/testing/btest/scripts/base/frameworks/input/onecolumn-norecord.bro +++ b/testing/btest/scripts/base/frameworks/input/onecolumn-norecord.bro @@ -33,7 +33,6 @@ event bro_init() # first read in the old stuff into the table... Input::create_stream(A::INPUT, [$source="input.log"]); Input::add_tablefilter(A::INPUT, [$name="input", $idx=Idx, $val=Val, $destination=servers, $want_record=F]); - Input::force_update(A::INPUT); } event Input::update_finished(id: Input::ID) { diff --git a/testing/btest/scripts/base/frameworks/input/onecolumn-record.bro b/testing/btest/scripts/base/frameworks/input/onecolumn-record.bro index 7b62ddcddd..1c532ba6a9 100644 --- a/testing/btest/scripts/base/frameworks/input/onecolumn-record.bro +++ b/testing/btest/scripts/base/frameworks/input/onecolumn-record.bro @@ -33,7 +33,6 @@ event bro_init() # first read in the old stuff into the table... Input::create_stream(A::INPUT, [$source="input.log"]); Input::add_tablefilter(A::INPUT, [$name="input", $idx=Idx, $val=Val, $destination=servers]); - Input::force_update(A::INPUT); } event Input::update_finished(id: Input::ID) { diff --git a/testing/btest/scripts/base/frameworks/input/port.bro b/testing/btest/scripts/base/frameworks/input/port.bro index 65d73c54f7..801d6bac3f 100644 --- a/testing/btest/scripts/base/frameworks/input/port.bro +++ b/testing/btest/scripts/base/frameworks/input/port.bro @@ -32,7 +32,6 @@ event bro_init() # first read in the old stuff into the table... Input::create_stream(A::INPUT, [$source="input.log"]); Input::add_tablefilter(A::INPUT, [$name="input", $idx=Idx, $val=Val, $destination=servers]); - Input::force_update(A::INPUT); print servers[1.2.3.4]; print servers[1.2.3.5]; print servers[1.2.3.6]; diff --git a/testing/btest/scripts/base/frameworks/input/predicate.bro b/testing/btest/scripts/base/frameworks/input/predicate.bro index bc1ab89bb2..009911e6a8 100644 --- a/testing/btest/scripts/base/frameworks/input/predicate.bro +++ b/testing/btest/scripts/base/frameworks/input/predicate.bro @@ -41,7 +41,6 @@ event bro_init() Input::add_tablefilter(A::INPUT, [$name="input", $idx=Idx, $val=Val, $destination=servers, $want_record=F, $pred(typ: Input::Event, left: Idx, right: bool) = { return right; } ]); - Input::force_update(A::INPUT); } event Input::update_finished(id: Input::ID) { diff --git a/testing/btest/scripts/base/frameworks/input/tableevent.bro b/testing/btest/scripts/base/frameworks/input/tableevent.bro index 36e8171689..0c86ac94b8 100644 --- a/testing/btest/scripts/base/frameworks/input/tableevent.bro +++ b/testing/btest/scripts/base/frameworks/input/tableevent.bro @@ -44,5 +44,4 @@ event bro_init() { Input::create_stream(A::LOG, [$source="input.log"]); Input::add_tablefilter(A::LOG, [$name="input", $idx=Idx, $val=Val, $destination=destination, $want_record=F,$ev=line]); - Input::force_update(A::LOG); } diff --git a/testing/btest/scripts/base/frameworks/input/twofilters.bro b/testing/btest/scripts/base/frameworks/input/twofilters.bro index d5bff0c5bb..260f73e58f 100644 --- a/testing/btest/scripts/base/frameworks/input/twofilters.bro +++ b/testing/btest/scripts/base/frameworks/input/twofilters.bro @@ -40,7 +40,7 @@ global done: bool = F; event bro_init() { # first read in the old stuff into the table... - Input::create_stream(A::INPUT, [$source="input.log"]); + Input::create_stream(A::INPUT, [$source="input.log", $autostart=F]); Input::add_tablefilter(A::INPUT, [$name="input", $idx=Idx, $val=Val, $destination=destination1, $want_record=F, $pred(typ: Input::Event, left: Idx, right: bool) = { return right; } ]);