From 227159fd04e19758ee4f96e825b420039b793e10 Mon Sep 17 00:00:00 2001 From: Bernhard Amann Date: Thu, 28 Jun 2012 15:08:35 -0700 Subject: [PATCH 1/5] make writer-info work when debugging is enabled --- src/logging/Manager.cc | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/logging/Manager.cc b/src/logging/Manager.cc index 23b6f070a1..69a38b1067 100644 --- a/src/logging/Manager.cc +++ b/src/logging/Manager.cc @@ -1261,14 +1261,14 @@ void Manager::InstallRotationTimer(WriterInfo* winfo) timer_mgr->Add(winfo->rotation_timer); DBG_LOG(DBG_LOGGING, "Scheduled rotation timer for %s to %.6f", - winfo->writer->Path().c_str(), winfo->rotation_timer->Time()); + winfo->writer->Name().c_str(), winfo->rotation_timer->Time()); } } void Manager::Rotate(WriterInfo* winfo) { DBG_LOG(DBG_LOGGING, "Rotating %s at %.6f", - winfo->writer->Path().c_str(), network_time); + winfo->writer->Name().c_str(), network_time); // Build a temporary path for the writer to move the file to. struct tm tm; @@ -1297,7 +1297,7 @@ bool Manager::FinishedRotation(WriterFrontend* writer, string new_name, string o return true; DBG_LOG(DBG_LOGGING, "Finished rotating %s at %.6f, new name %s", - writer->Path().c_str(), network_time, new_name.c_str()); + writer->Name().c_str(), network_time, new_name.c_str()); WriterInfo* winfo = FindWriter(writer); if ( ! winfo ) From f820ee9f5c31a4b1baae9fd164d335d2ca689568 Mon Sep 17 00:00:00 2001 From: Bernhard Amann Date: Thu, 28 Jun 2012 16:16:48 -0700 Subject: [PATCH 2/5] Introduce support for a table of key/value pairs with further configuration options, with the same userinterface as in the logging interface. Not really tested, but tests still work. --- scripts/base/frameworks/input/main.bro | 7 ++++ src/input/Manager.cc | 32 +++++++++++++++++-- src/input/ReaderBackend.cc | 4 +-- src/input/ReaderBackend.h | 7 ++-- src/input/ReaderFrontend.cc | 11 ++++--- src/input/ReaderFrontend.h | 2 +- src/input/readers/Ascii.cc | 2 +- src/input/readers/Ascii.h | 2 +- src/input/readers/Benchmark.cc | 2 +- src/input/readers/Benchmark.h | 2 +- src/input/readers/Raw.cc | 2 +- src/input/readers/Raw.h | 2 +- .../scripts.base.frameworks.input.event/out | 14 ++++++++ .../out | 2 ++ .../scripts.base.frameworks.input.raw/out | 16 ++++++++++ .../scripts.base.frameworks.input.reread/out | 16 ++++++++++ .../out | 32 +++++++++++++++++++ .../out | 16 ++++++++++ .../out | 28 ++++++++++++---- 19 files changed, 173 insertions(+), 26 deletions(-) diff --git a/scripts/base/frameworks/input/main.bro b/scripts/base/frameworks/input/main.bro index f5df72473f..68b291c2e9 100644 --- a/scripts/base/frameworks/input/main.bro +++ b/scripts/base/frameworks/input/main.bro @@ -53,6 +53,10 @@ export { ## really be executed. Parameters are the same as for the event. If true is ## returned, the update is performed. If false is returned, it is skipped. pred: function(typ: Input::Event, left: any, right: any): bool &optional; + + ## A key/value table that will be passed on the reader. + ## Interpretation of the values is left to the reader. + config: table[string] of string &default=table(); }; ## EventFilter description type used for the `event` method. @@ -85,6 +89,9 @@ export { ## The event will receive an Input::Event enum as the first element, and the fields as the following arguments. ev: any; + ## A key/value table that will be passed on the reader. + ## Interpretation of the values is left to the reader. + config: table[string] of string &default=table(); }; ## Create a new table input from a given source. Returns true on success. diff --git a/src/input/Manager.cc b/src/input/Manager.cc index 63fa59d0bc..f9979fbe6e 100644 --- a/src/input/Manager.cc +++ b/src/input/Manager.cc @@ -80,6 +80,8 @@ public: EnumVal* type; ReaderFrontend* reader; + TableVal* config; + std::map configmap; RecordVal* description; @@ -103,6 +105,9 @@ Manager::Stream::~Stream() if ( description ) Unref(description); + if ( config ) + Unref(config); + if ( reader ) delete(reader); } @@ -300,6 +305,7 @@ bool Manager::CreateStream(Stream* info, RecordVal* description) Unref(sourceval); EnumVal* mode = description->LookupWithDefault(rtype->FieldOffset("mode"))->AsEnumVal(); + Val* config = description->LookupWithDefault(rtype->FieldOffset("config")); switch ( mode->InternalInt() ) { @@ -325,8 +331,27 @@ bool Manager::CreateStream(Stream* info, RecordVal* description) info->type = reader->AsEnumVal(); // ref'd by lookupwithdefault info->name = name; info->source = source; + info->config = config->AsTableVal(); // ref'd by LookupWithDefault Ref(description); - info->description = description; + info->description = description; + + { + HashKey* k; + IterCookie* c = info->config->AsTable()->InitForIteration(); + + TableEntryVal* v; + while ( (v = info->config->AsTable()->NextEntry(k, c)) ) + { + ListVal* index = info->config->RecoverIndex(k); + string key = index->Index(0)->AsString()->CheckString(); + string value = v->Value()->AsString()->CheckString(); + info->configmap.insert(std::make_pair(key, value)); + Unref(index); + delete k; + } + + } + DBG_LOG(DBG_INPUT, "Successfully created new input stream %s", name.c_str()); @@ -451,7 +476,8 @@ bool Manager::CreateEventStream(RecordVal* fval) Unref(want_record); // ref'd by lookupwithdefault assert(stream->reader); - stream->reader->Init(stream->source, stream->mode, stream->num_fields, logf ); + + stream->reader->Init(stream->source, stream->mode, stream->num_fields, logf, stream->configmap ); readers[stream->reader] = stream; @@ -628,7 +654,7 @@ bool Manager::CreateTableStream(RecordVal* fval) assert(stream->reader); - stream->reader->Init(stream->source, stream->mode, fieldsV.size(), fields ); + stream->reader->Init(stream->source, stream->mode, fieldsV.size(), fields, stream->configmap ); readers[stream->reader] = stream; diff --git a/src/input/ReaderBackend.cc b/src/input/ReaderBackend.cc index e7626084a6..276b5d25b0 100644 --- a/src/input/ReaderBackend.cc +++ b/src/input/ReaderBackend.cc @@ -184,7 +184,7 @@ void ReaderBackend::SendEntry(Value* *vals) } bool ReaderBackend::Init(string arg_source, ReaderMode arg_mode, const int arg_num_fields, - const threading::Field* const* arg_fields) + const threading::Field* const* arg_fields, const std::map config) { source = arg_source; mode = arg_mode; @@ -194,7 +194,7 @@ bool ReaderBackend::Init(string arg_source, ReaderMode arg_mode, const int arg_n SetName("InputReader/"+source); // disable if DoInit returns error. - int success = DoInit(arg_source, mode, arg_num_fields, arg_fields); + int success = DoInit(arg_source, mode, arg_num_fields, arg_fields, config); if ( ! success ) { diff --git a/src/input/ReaderBackend.h b/src/input/ReaderBackend.h index a04508d252..c23c68bf7e 100644 --- a/src/input/ReaderBackend.h +++ b/src/input/ReaderBackend.h @@ -79,9 +79,12 @@ public: * @param fields The types and names of the fields to be retrieved * from the input source. * + * @param config A string map containing additional configuration options + * for the reader. + * * @return False if an error occured. */ - bool Init(string source, ReaderMode mode, int num_fields, const threading::Field* const* fields); + bool Init(string source, ReaderMode mode, int num_fields, const threading::Field* const* fields, std::map config); /** * Finishes reading from this input stream in a regular fashion. Must @@ -130,7 +133,7 @@ protected: * provides accessor methods to get them later, and they are passed * in here only for convinience. */ - virtual bool DoInit(string path, ReaderMode mode, int arg_num_fields, const threading::Field* const* fields) = 0; + virtual bool DoInit(string path, ReaderMode mode, int arg_num_fields, const threading::Field* const* fields, const std::map config) = 0; /** * Reader-specific method implementing input finalization at diff --git a/src/input/ReaderFrontend.cc b/src/input/ReaderFrontend.cc index a9a4c778dd..ec1630cd88 100644 --- a/src/input/ReaderFrontend.cc +++ b/src/input/ReaderFrontend.cc @@ -12,13 +12,13 @@ class InitMessage : public threading::InputMessage { public: InitMessage(ReaderBackend* backend, const string source, ReaderMode mode, - const int num_fields, const threading::Field* const* fields) + const int num_fields, const threading::Field* const* fields, const std::map config) : threading::InputMessage("Init", backend), - source(source), mode(mode), num_fields(num_fields), fields(fields) { } + source(source), mode(mode), num_fields(num_fields), fields(fields), config(config) { } virtual bool Process() { - return Object()->Init(source, mode, num_fields, fields); + return Object()->Init(source, mode, num_fields, fields, config); } private: @@ -26,6 +26,7 @@ private: const ReaderMode mode; const int num_fields; const threading::Field* const* fields; + const std::map config; }; class UpdateMessage : public threading::InputMessage @@ -64,7 +65,7 @@ ReaderFrontend::~ReaderFrontend() } void ReaderFrontend::Init(string arg_source, ReaderMode mode, const int num_fields, - const threading::Field* const* fields) + const threading::Field* const* fields, const std::map config) { if ( disabled ) return; @@ -75,7 +76,7 @@ void ReaderFrontend::Init(string arg_source, ReaderMode mode, const int num_fiel source = arg_source; initialized = true; - backend->SendIn(new InitMessage(backend, arg_source, mode, num_fields, fields)); + backend->SendIn(new InitMessage(backend, arg_source, mode, num_fields, fields, config)); } void ReaderFrontend::Update() diff --git a/src/input/ReaderFrontend.h b/src/input/ReaderFrontend.h index c61b194e24..1240831ee6 100644 --- a/src/input/ReaderFrontend.h +++ b/src/input/ReaderFrontend.h @@ -52,7 +52,7 @@ public: * * This method must only be called from the main thread. */ - void Init(string arg_source, ReaderMode mode, const int arg_num_fields, const threading::Field* const* fields); + void Init(string arg_source, ReaderMode mode, const int arg_num_fields, const threading::Field* const* fields, const std::map config); /** * Force an update of the current input source. Actual action depends diff --git a/src/input/readers/Ascii.cc b/src/input/readers/Ascii.cc index 186d765d21..47bbe2a207 100644 --- a/src/input/readers/Ascii.cc +++ b/src/input/readers/Ascii.cc @@ -83,7 +83,7 @@ void Ascii::DoClose() } } -bool Ascii::DoInit(string path, ReaderMode mode, int num_fields, const Field* const* fields) +bool Ascii::DoInit(string path, ReaderMode mode, int num_fields, const Field* const* fields, const std::map config) { mtime = 0; diff --git a/src/input/readers/Ascii.h b/src/input/readers/Ascii.h index 335616abfb..c17c5220ed 100644 --- a/src/input/readers/Ascii.h +++ b/src/input/readers/Ascii.h @@ -38,7 +38,7 @@ public: static ReaderBackend* Instantiate(ReaderFrontend* frontend) { return new Ascii(frontend); } protected: - virtual bool DoInit(string path, ReaderMode mode, int arg_num_fields, const threading::Field* const* fields); + virtual bool DoInit(string path, ReaderMode mode, int arg_num_fields, const threading::Field* const* fields, const std::map config); virtual void DoClose(); virtual bool DoUpdate(); virtual bool DoHeartbeat(double network_time, double current_time); diff --git a/src/input/readers/Benchmark.cc b/src/input/readers/Benchmark.cc index 5e4ef090f7..37888b095f 100644 --- a/src/input/readers/Benchmark.cc +++ b/src/input/readers/Benchmark.cc @@ -36,7 +36,7 @@ void Benchmark::DoClose() { } -bool Benchmark::DoInit(string path, ReaderMode mode, int num_fields, const Field* const* fields) +bool Benchmark::DoInit(string path, ReaderMode mode, int num_fields, const Field* const* fields, const std::map config) { num_lines = atoi(path.c_str()); diff --git a/src/input/readers/Benchmark.h b/src/input/readers/Benchmark.h index 6bb70781fd..e806b9ca4a 100644 --- a/src/input/readers/Benchmark.h +++ b/src/input/readers/Benchmark.h @@ -18,7 +18,7 @@ public: static ReaderBackend* Instantiate(ReaderFrontend* frontend) { return new Benchmark(frontend); } protected: - virtual bool DoInit(string path, ReaderMode mode, int arg_num_fields, const threading::Field* const* fields); + virtual bool DoInit(string path, ReaderMode mode, int arg_num_fields, const threading::Field* const* fields, const std::map config); virtual void DoClose(); virtual bool DoUpdate(); virtual bool DoHeartbeat(double network_time, double current_time); diff --git a/src/input/readers/Raw.cc b/src/input/readers/Raw.cc index 59899f32fc..9971aa1aa3 100644 --- a/src/input/readers/Raw.cc +++ b/src/input/readers/Raw.cc @@ -100,7 +100,7 @@ bool Raw::CloseInput() return true; } -bool Raw::DoInit(string path, ReaderMode mode, int num_fields, const Field* const* fields) +bool Raw::DoInit(string path, ReaderMode mode, int num_fields, const Field* const* fields, const std::map config) { fname = path; mtime = 0; diff --git a/src/input/readers/Raw.h b/src/input/readers/Raw.h index b9b45f0b20..fb6b94410b 100644 --- a/src/input/readers/Raw.h +++ b/src/input/readers/Raw.h @@ -22,7 +22,7 @@ public: static ReaderBackend* Instantiate(ReaderFrontend* frontend) { return new Raw(frontend); } protected: - virtual bool DoInit(string path, ReaderMode mode, int arg_num_fields, const threading::Field* const* fields); + virtual bool DoInit(string path, ReaderMode mode, int arg_num_fields, const threading::Field* const* fields, const std::map config); virtual void DoClose(); virtual bool DoUpdate(); virtual bool DoHeartbeat(double network_time, double current_time); diff --git a/testing/btest/Baseline/scripts.base.frameworks.input.event/out b/testing/btest/Baseline/scripts.base.frameworks.input.event/out index bb3b6d0a9e..5ccc9c0d1e 100644 --- a/testing/btest/Baseline/scripts.base.frameworks.input.event/out +++ b/testing/btest/Baseline/scripts.base.frameworks.input.event/out @@ -4,6 +4,8 @@ print A::description; print A::tpe; print A::i; print A::b; +}, config={ + }] Input::EVENT_NEW 1 @@ -14,6 +16,8 @@ print A::description; print A::tpe; print A::i; print A::b; +}, config={ + }] Input::EVENT_NEW 2 @@ -24,6 +28,8 @@ print A::description; print A::tpe; print A::i; print A::b; +}, config={ + }] Input::EVENT_NEW 3 @@ -34,6 +40,8 @@ print A::description; print A::tpe; print A::i; print A::b; +}, config={ + }] Input::EVENT_NEW 4 @@ -44,6 +52,8 @@ print A::description; print A::tpe; print A::i; print A::b; +}, config={ + }] Input::EVENT_NEW 5 @@ -54,6 +64,8 @@ print A::description; print A::tpe; print A::i; print A::b; +}, config={ + }] Input::EVENT_NEW 6 @@ -64,6 +76,8 @@ print A::description; print A::tpe; print A::i; print A::b; +}, config={ + }] Input::EVENT_NEW 7 diff --git a/testing/btest/Baseline/scripts.base.frameworks.input.executeraw/out b/testing/btest/Baseline/scripts.base.frameworks.input.executeraw/out index a38f3fce84..51543e143c 100644 --- a/testing/btest/Baseline/scripts.base.frameworks.input.executeraw/out +++ b/testing/btest/Baseline/scripts.base.frameworks.input.executeraw/out @@ -4,6 +4,8 @@ print outfile, description; print outfile, tpe; print outfile, s; close(outfile); +}, config={ + }] Input::EVENT_NEW 8 ../input.log diff --git a/testing/btest/Baseline/scripts.base.frameworks.input.raw/out b/testing/btest/Baseline/scripts.base.frameworks.input.raw/out index 55e7610e1e..611e5ec378 100644 --- a/testing/btest/Baseline/scripts.base.frameworks.input.raw/out +++ b/testing/btest/Baseline/scripts.base.frameworks.input.raw/out @@ -3,6 +3,8 @@ print A::description; print A::tpe; print A::s; +}, config={ + }] Input::EVENT_NEW sdfkh:KH;fdkncv;ISEUp34:Fkdj;YVpIODhfDF @@ -11,6 +13,8 @@ sdfkh:KH;fdkncv;ISEUp34:Fkdj;YVpIODhfDF print A::description; print A::tpe; print A::s; +}, config={ + }] Input::EVENT_NEW DSF"DFKJ"SDFKLh304yrsdkfj@#(*U$34jfDJup3UF @@ -19,6 +23,8 @@ DSF"DFKJ"SDFKLh304yrsdkfj@#(*U$34jfDJup3UF print A::description; print A::tpe; print A::s; +}, config={ + }] Input::EVENT_NEW q3r3057fdf @@ -27,6 +33,8 @@ q3r3057fdf print A::description; print A::tpe; print A::s; +}, config={ + }] Input::EVENT_NEW sdfs\d @@ -35,6 +43,8 @@ sdfs\d print A::description; print A::tpe; print A::s; +}, config={ + }] Input::EVENT_NEW @@ -43,6 +53,8 @@ Input::EVENT_NEW print A::description; print A::tpe; print A::s; +}, config={ + }] Input::EVENT_NEW dfsdf @@ -51,6 +63,8 @@ dfsdf print A::description; print A::tpe; print A::s; +}, config={ + }] Input::EVENT_NEW sdf @@ -59,6 +73,8 @@ sdf print A::description; print A::tpe; print A::s; +}, config={ + }] Input::EVENT_NEW 3rw43wRRERLlL#RWERERERE. diff --git a/testing/btest/Baseline/scripts.base.frameworks.input.reread/out b/testing/btest/Baseline/scripts.base.frameworks.input.reread/out index 5cce15f6c7..8b55ced2ac 100644 --- a/testing/btest/Baseline/scripts.base.frameworks.input.reread/out +++ b/testing/btest/Baseline/scripts.base.frameworks.input.reread/out @@ -46,6 +46,8 @@ print A::outfile, A::typ; print A::outfile, A::left; print A::outfile, A::right; return (T); +}, config={ + }] Type Input::EVENT_NEW @@ -139,6 +141,8 @@ print A::outfile, A::typ; print A::outfile, A::left; print A::outfile, A::right; return (T); +}, config={ + }] Type Input::EVENT_NEW @@ -244,6 +248,8 @@ print A::outfile, A::typ; print A::outfile, A::left; print A::outfile, A::right; return (T); +}, config={ + }] Type Input::EVENT_CHANGED @@ -469,6 +475,8 @@ print A::outfile, A::typ; print A::outfile, A::left; print A::outfile, A::right; return (T); +}, config={ + }] Type Input::EVENT_NEW @@ -592,6 +600,8 @@ print A::outfile, A::typ; print A::outfile, A::left; print A::outfile, A::right; return (T); +}, config={ + }] Type Input::EVENT_NEW @@ -715,6 +725,8 @@ print A::outfile, A::typ; print A::outfile, A::left; print A::outfile, A::right; return (T); +}, config={ + }] Type Input::EVENT_NEW @@ -838,6 +850,8 @@ print A::outfile, A::typ; print A::outfile, A::left; print A::outfile, A::right; return (T); +}, config={ + }] Type Input::EVENT_NEW @@ -961,6 +975,8 @@ print A::outfile, A::typ; print A::outfile, A::left; print A::outfile, A::right; return (T); +}, config={ + }] Type Input::EVENT_NEW diff --git a/testing/btest/Baseline/scripts.base.frameworks.input.rereadraw/out b/testing/btest/Baseline/scripts.base.frameworks.input.rereadraw/out index 9d62fdbef4..7dc81ba80d 100644 --- a/testing/btest/Baseline/scripts.base.frameworks.input.rereadraw/out +++ b/testing/btest/Baseline/scripts.base.frameworks.input.rereadraw/out @@ -3,6 +3,8 @@ print A::description; print A::tpe; print A::s; +}, config={ + }] Input::EVENT_NEW sdfkh:KH;fdkncv;ISEUp34:Fkdj;YVpIODhfDF @@ -11,6 +13,8 @@ sdfkh:KH;fdkncv;ISEUp34:Fkdj;YVpIODhfDF print A::description; print A::tpe; print A::s; +}, config={ + }] Input::EVENT_NEW DSF"DFKJ"SDFKLh304yrsdkfj@#(*U$34jfDJup3UF @@ -19,6 +23,8 @@ DSF"DFKJ"SDFKLh304yrsdkfj@#(*U$34jfDJup3UF print A::description; print A::tpe; print A::s; +}, config={ + }] Input::EVENT_NEW q3r3057fdf @@ -27,6 +33,8 @@ q3r3057fdf print A::description; print A::tpe; print A::s; +}, config={ + }] Input::EVENT_NEW sdfs\d @@ -35,6 +43,8 @@ sdfs\d print A::description; print A::tpe; print A::s; +}, config={ + }] Input::EVENT_NEW @@ -43,6 +53,8 @@ Input::EVENT_NEW print A::description; print A::tpe; print A::s; +}, config={ + }] Input::EVENT_NEW dfsdf @@ -51,6 +63,8 @@ dfsdf print A::description; print A::tpe; print A::s; +}, config={ + }] Input::EVENT_NEW sdf @@ -59,6 +73,8 @@ sdf print A::description; print A::tpe; print A::s; +}, config={ + }] Input::EVENT_NEW 3rw43wRRERLlL#RWERERERE. @@ -67,6 +83,8 @@ Input::EVENT_NEW print A::description; print A::tpe; print A::s; +}, config={ + }] Input::EVENT_NEW sdfkh:KH;fdkncv;ISEUp34:Fkdj;YVpIODhfDF @@ -75,6 +93,8 @@ sdfkh:KH;fdkncv;ISEUp34:Fkdj;YVpIODhfDF print A::description; print A::tpe; print A::s; +}, config={ + }] Input::EVENT_NEW DSF"DFKJ"SDFKLh304yrsdkfj@#(*U$34jfDJup3UF @@ -83,6 +103,8 @@ DSF"DFKJ"SDFKLh304yrsdkfj@#(*U$34jfDJup3UF print A::description; print A::tpe; print A::s; +}, config={ + }] Input::EVENT_NEW q3r3057fdf @@ -91,6 +113,8 @@ q3r3057fdf print A::description; print A::tpe; print A::s; +}, config={ + }] Input::EVENT_NEW sdfs\d @@ -99,6 +123,8 @@ sdfs\d print A::description; print A::tpe; print A::s; +}, config={ + }] Input::EVENT_NEW @@ -107,6 +133,8 @@ Input::EVENT_NEW print A::description; print A::tpe; print A::s; +}, config={ + }] Input::EVENT_NEW dfsdf @@ -115,6 +143,8 @@ dfsdf print A::description; print A::tpe; print A::s; +}, config={ + }] Input::EVENT_NEW sdf @@ -123,6 +153,8 @@ sdf print A::description; print A::tpe; print A::s; +}, config={ + }] Input::EVENT_NEW 3rw43wRRERLlL#RWERERERE. diff --git a/testing/btest/Baseline/scripts.base.frameworks.input.streamraw/out b/testing/btest/Baseline/scripts.base.frameworks.input.streamraw/out index 07a3ffdba5..1bf8d4cfef 100644 --- a/testing/btest/Baseline/scripts.base.frameworks.input.streamraw/out +++ b/testing/btest/Baseline/scripts.base.frameworks.input.streamraw/out @@ -10,6 +10,8 @@ close(A::outfile); Input::remove(input); } +}, config={ + }] Input::EVENT_NEW sdfkh:KH;fdkncv;ISEUp34:Fkdj;YVpIODhfDF @@ -25,6 +27,8 @@ close(A::outfile); Input::remove(input); } +}, config={ + }] Input::EVENT_NEW DSF"DFKJ"SDFKLh304yrsdkfj@#(*U$34jfDJup3UF @@ -40,6 +44,8 @@ close(A::outfile); Input::remove(input); } +}, config={ + }] Input::EVENT_NEW q3r3057fdf @@ -55,6 +61,8 @@ close(A::outfile); Input::remove(input); } +}, config={ + }] Input::EVENT_NEW sdfs\d @@ -70,6 +78,8 @@ close(A::outfile); Input::remove(input); } +}, config={ + }] Input::EVENT_NEW @@ -85,6 +95,8 @@ close(A::outfile); Input::remove(input); } +}, config={ + }] Input::EVENT_NEW dfsdf @@ -100,6 +112,8 @@ close(A::outfile); Input::remove(input); } +}, config={ + }] Input::EVENT_NEW sdf @@ -115,6 +129,8 @@ close(A::outfile); Input::remove(input); } +}, config={ + }] Input::EVENT_NEW 3rw43wRRERLlL#RWERERERE. diff --git a/testing/btest/Baseline/scripts.base.frameworks.input.tableevent/out b/testing/btest/Baseline/scripts.base.frameworks.input.tableevent/out index a1bbb9bbe4..28bf77f057 100644 --- a/testing/btest/Baseline/scripts.base.frameworks.input.tableevent/out +++ b/testing/btest/Baseline/scripts.base.frameworks.input.tableevent/out @@ -12,7 +12,9 @@ print description; print tpe; print left; print right; -}, pred=] +}, pred=, config={ + +}] Input::EVENT_NEW [i=1] T @@ -30,7 +32,9 @@ print description; print tpe; print left; print right; -}, pred=] +}, pred=, config={ + +}] Input::EVENT_NEW [i=2] T @@ -48,7 +52,9 @@ print description; print tpe; print left; print right; -}, pred=] +}, pred=, config={ + +}] Input::EVENT_NEW [i=3] F @@ -66,7 +72,9 @@ print description; print tpe; print left; print right; -}, pred=] +}, pred=, config={ + +}] Input::EVENT_NEW [i=4] F @@ -84,7 +92,9 @@ print description; print tpe; print left; print right; -}, pred=] +}, pred=, config={ + +}] Input::EVENT_NEW [i=5] F @@ -102,7 +112,9 @@ print description; print tpe; print left; print right; -}, pred=] +}, pred=, config={ + +}] Input::EVENT_NEW [i=6] F @@ -120,7 +132,9 @@ print description; print tpe; print left; print right; -}, pred=] +}, pred=, config={ + +}] Input::EVENT_NEW [i=7] T From 3559a39d59b43b040d5fb6cd5a6c081990e902c0 Mon Sep 17 00:00:00 2001 From: Bernhard Amann Date: Mon, 2 Jul 2012 10:03:28 -0700 Subject: [PATCH 3/5] introduce reader-info struct analogous to writer-info. All tests still pass. --- src/input/Manager.cc | 18 +++++---- src/input/ReaderBackend.cc | 53 +++++++++++++++++++++++--- src/input/ReaderBackend.h | 68 +++++++++++++++++++++++++--------- src/input/ReaderFrontend.cc | 25 +++++++------ src/input/ReaderFrontend.h | 24 +++++++++--- src/input/readers/Ascii.cc | 22 +++++------ src/input/readers/Ascii.h | 2 +- src/input/readers/Benchmark.cc | 6 +-- src/input/readers/Benchmark.h | 2 +- src/input/readers/Raw.cc | 10 ++--- src/input/readers/Raw.h | 2 +- 11 files changed, 162 insertions(+), 70 deletions(-) diff --git a/src/input/Manager.cc b/src/input/Manager.cc index f9979fbe6e..1f5f17bba8 100644 --- a/src/input/Manager.cc +++ b/src/input/Manager.cc @@ -71,7 +71,7 @@ declare(PDict, InputHash); class Manager::Stream { public: string name; - string source; + ReaderBackend::ReaderInfo info; bool removed; ReaderMode mode; @@ -81,7 +81,6 @@ public: EnumVal* type; ReaderFrontend* reader; TableVal* config; - std::map configmap; RecordVal* description; @@ -330,8 +329,11 @@ bool Manager::CreateStream(Stream* info, RecordVal* description) info->reader = reader_obj; info->type = reader->AsEnumVal(); // ref'd by lookupwithdefault info->name = name; - info->source = source; info->config = config->AsTableVal(); // ref'd by LookupWithDefault + + ReaderBackend::ReaderInfo readerinfo; + readerinfo.source = source; + Ref(description); info->description = description; @@ -345,13 +347,15 @@ bool Manager::CreateStream(Stream* info, RecordVal* description) ListVal* index = info->config->RecoverIndex(k); string key = index->Index(0)->AsString()->CheckString(); string value = v->Value()->AsString()->CheckString(); - info->configmap.insert(std::make_pair(key, value)); + info->info.config.insert(std::make_pair(key, value)); Unref(index); delete k; } } + info->info = readerinfo; + DBG_LOG(DBG_INPUT, "Successfully created new input stream %s", name.c_str()); @@ -477,7 +481,7 @@ bool Manager::CreateEventStream(RecordVal* fval) assert(stream->reader); - stream->reader->Init(stream->source, stream->mode, stream->num_fields, logf, stream->configmap ); + stream->reader->Init(stream->info, stream->mode, stream->num_fields, logf ); readers[stream->reader] = stream; @@ -654,7 +658,7 @@ bool Manager::CreateTableStream(RecordVal* fval) assert(stream->reader); - stream->reader->Init(stream->source, stream->mode, fieldsV.size(), fields, stream->configmap ); + stream->reader->Init(stream->info, stream->mode, fieldsV.size(), fields ); readers[stream->reader] = stream; @@ -1234,7 +1238,7 @@ void Manager::EndCurrentSend(ReaderFrontend* reader) #endif // Send event that the current update is indeed finished. - SendEvent(update_finished, 2, new StringVal(i->name.c_str()), new StringVal(i->source.c_str())); + SendEvent(update_finished, 2, new StringVal(i->name.c_str()), new StringVal(i->info.source.c_str())); } void Manager::Put(ReaderFrontend* reader, Value* *vals) diff --git a/src/input/ReaderBackend.cc b/src/input/ReaderBackend.cc index 276b5d25b0..6ed70bced0 100644 --- a/src/input/ReaderBackend.cc +++ b/src/input/ReaderBackend.cc @@ -140,6 +140,49 @@ public: } }; +using namespace logging; + +bool ReaderBackend::ReaderInfo::Read(SerializationFormat* fmt) + { + int size; + + if ( ! (fmt->Read(&source, "source") && + fmt->Read(&size, "config_size")) ) + return false; + + config.clear(); + + while ( size ) + { + string value; + string key; + + if ( ! (fmt->Read(&value, "config-value") && fmt->Read(&value, "config-key")) ) + return false; + + config.insert(std::make_pair(value, key)); + } + + return true; + } + + +bool ReaderBackend::ReaderInfo::Write(SerializationFormat* fmt) const + { + int size = config.size(); + + if ( ! (fmt->Write(source, "source") && + fmt->Write(size, "config_size")) ) + return false; + + for ( config_map::const_iterator i = config.begin(); i != config.end(); ++i ) + { + if ( ! (fmt->Write(i->first, "config-value") && fmt->Write(i->second, "config-key")) ) + return false; + } + + return true; + } ReaderBackend::ReaderBackend(ReaderFrontend* arg_frontend) : MsgThread() { @@ -183,18 +226,18 @@ void ReaderBackend::SendEntry(Value* *vals) SendOut(new SendEntryMessage(frontend, vals)); } -bool ReaderBackend::Init(string arg_source, ReaderMode arg_mode, const int arg_num_fields, - const threading::Field* const* arg_fields, const std::map config) +bool ReaderBackend::Init(const ReaderInfo& arg_info, ReaderMode arg_mode, const int arg_num_fields, + const threading::Field* const* arg_fields) { - source = arg_source; + info = arg_info; mode = arg_mode; num_fields = arg_num_fields; fields = arg_fields; - SetName("InputReader/"+source); + SetName("InputReader/"+info.source); // disable if DoInit returns error. - int success = DoInit(arg_source, mode, arg_num_fields, arg_fields, config); + int success = DoInit(arg_info, mode, arg_num_fields, arg_fields); if ( ! success ) { diff --git a/src/input/ReaderBackend.h b/src/input/ReaderBackend.h index c23c68bf7e..d7d022d5fa 100644 --- a/src/input/ReaderBackend.h +++ b/src/input/ReaderBackend.h @@ -7,6 +7,8 @@ #include "threading/SerialTypes.h" #include "threading/MsgThread.h" +class RemoteSerializer; + namespace input { @@ -65,6 +67,35 @@ public: */ virtual ~ReaderBackend(); + /** + * A struct passing information to the reader at initialization time. + */ + struct ReaderInfo + { + typedef std::map config_map; + + /** + * A string left to the interpretation of the reader + * implementation; it corresponds to the value configured on + * the script-level for the logging filter. + */ + string source; + + /** + * A map of key/value pairs corresponding to the relevant + * filter's "config" table. + */ + config_map config; + + private: + friend class ::RemoteSerializer; + + // Note, these need to be adapted when changing the struct's + // fields. They serialize/deserialize the struct. + bool Read(SerializationFormat* fmt); + bool Write(SerializationFormat* fmt) const; + }; + /** * One-time initialization of the reader to define the input source. * @@ -84,7 +115,7 @@ public: * * @return False if an error occured. */ - bool Init(string source, ReaderMode mode, int num_fields, const threading::Field* const* fields, std::map config); + bool Init(const ReaderInfo& info, ReaderMode mode, int num_fields, const threading::Field* const* fields); /** * Finishes reading from this input stream in a regular fashion. Must @@ -112,6 +143,22 @@ public: */ void DisableFrontend(); + /** + * Returns the log fields as passed into the constructor. + */ + const threading::Field* const * Fields() const { return fields; } + + /** + * Returns the additional reader information into the constructor. + */ + const ReaderInfo& Info() const { return info; } + + /** + * Returns the number of log fields as passed into the constructor. + */ + int NumFields() const { return num_fields; } + + protected: // Methods that have to be overwritten by the individual readers @@ -133,7 +180,7 @@ protected: * provides accessor methods to get them later, and they are passed * in here only for convinience. */ - virtual bool DoInit(string path, ReaderMode mode, int arg_num_fields, const threading::Field* const* fields, const std::map config) = 0; + virtual bool DoInit(const ReaderInfo& info, ReaderMode mode, int arg_num_fields, const threading::Field* const* fields) = 0; /** * Reader-specific method implementing input finalization at @@ -162,26 +209,11 @@ protected: */ virtual bool DoUpdate() = 0; - /** - * Returns the input source as passed into Init()/. - */ - const string Source() const { return source; } - /** * Returns the reader mode as passed into Init(). */ const ReaderMode Mode() const { return mode; } - /** - * Returns the number of log fields as passed into Init(). - */ - unsigned int NumFields() const { return num_fields; } - - /** - * Returns the log fields as passed into Init(). - */ - const threading::Field* const * Fields() const { return fields; } - /** * Method allowing a reader to send a specified Bro event. Vals must * match the values expected by the bro event. @@ -282,7 +314,7 @@ private: // from this class, it's running in a different thread! ReaderFrontend* frontend; - string source; + ReaderInfo info; ReaderMode mode; unsigned int num_fields; const threading::Field* const * fields; // raw mapping diff --git a/src/input/ReaderFrontend.cc b/src/input/ReaderFrontend.cc index ec1630cd88..f92a8ec80c 100644 --- a/src/input/ReaderFrontend.cc +++ b/src/input/ReaderFrontend.cc @@ -11,22 +11,21 @@ namespace input { class InitMessage : public threading::InputMessage { public: - InitMessage(ReaderBackend* backend, const string source, ReaderMode mode, - const int num_fields, const threading::Field* const* fields, const std::map config) + InitMessage(ReaderBackend* backend, const ReaderBackend::ReaderInfo& info, ReaderMode mode, + const int num_fields, const threading::Field* const* fields) : threading::InputMessage("Init", backend), - source(source), mode(mode), num_fields(num_fields), fields(fields), config(config) { } + info(info), mode(mode), num_fields(num_fields), fields(fields) { } virtual bool Process() { - return Object()->Init(source, mode, num_fields, fields, config); + return Object()->Init(info, mode, num_fields, fields); } private: - const string source; + const ReaderBackend::ReaderInfo info; const ReaderMode mode; const int num_fields; const threading::Field* const* fields; - const std::map config; }; class UpdateMessage : public threading::InputMessage @@ -64,8 +63,8 @@ ReaderFrontend::~ReaderFrontend() { } -void ReaderFrontend::Init(string arg_source, ReaderMode mode, const int num_fields, - const threading::Field* const* fields, const std::map config) +void ReaderFrontend::Init(const ReaderBackend::ReaderInfo& arg_info, ReaderMode mode, const int arg_num_fields, + const threading::Field* const* arg_fields) { if ( disabled ) return; @@ -73,10 +72,12 @@ void ReaderFrontend::Init(string arg_source, ReaderMode mode, const int num_fiel if ( initialized ) reporter->InternalError("reader initialize twice"); - source = arg_source; + info = arg_info; + num_fields = arg_num_fields; + fields = arg_fields; initialized = true; - backend->SendIn(new InitMessage(backend, arg_source, mode, num_fields, fields, config)); + backend->SendIn(new InitMessage(backend, info, mode, num_fields, fields)); } void ReaderFrontend::Update() @@ -110,10 +111,10 @@ void ReaderFrontend::Close() string ReaderFrontend::Name() const { - if ( source.size() ) + if ( info.source.size() ) return ty_name; - return ty_name + "/" + source; + return ty_name + "/" + info.source; } } diff --git a/src/input/ReaderFrontend.h b/src/input/ReaderFrontend.h index 1240831ee6..fadf2cddb5 100644 --- a/src/input/ReaderFrontend.h +++ b/src/input/ReaderFrontend.h @@ -52,7 +52,7 @@ public: * * This method must only be called from the main thread. */ - void Init(string arg_source, ReaderMode mode, const int arg_num_fields, const threading::Field* const* fields, const std::map config); + void Init(const ReaderBackend::ReaderInfo& info, ReaderMode mode, const int arg_num_fields, const threading::Field* const* fields); /** * Force an update of the current input source. Actual action depends @@ -102,13 +102,23 @@ public: */ string Name() const; -protected: - friend class Manager; + /** + * Returns the additional reader information into the constructor. + */ + const ReaderBackend::ReaderInfo& Info() const { return info; } /** - * Returns the source as passed into the constructor. + * Returns the number of log fields as passed into the constructor. */ - const string& Source() const { return source; }; + int NumFields() const { return num_fields; } + + /** + * Returns the log fields as passed into the constructor. + */ + const threading::Field* const * Fields() const { return fields; } + +protected: + friend class Manager; /** * Returns the name of the backend's type. @@ -117,7 +127,9 @@ protected: private: ReaderBackend* backend; // The backend we have instanatiated. - string source; + ReaderBackend::ReaderInfo info; // Meta information as passed to Init(). + const threading::Field* const* fields; // The log fields. + int num_fields; // Information as passed to init(); string ty_name; // Backend type, set by manager. bool disabled; // True if disabled. bool initialized; // True if initialized. diff --git a/src/input/readers/Ascii.cc b/src/input/readers/Ascii.cc index 47bbe2a207..9e3ad28f9c 100644 --- a/src/input/readers/Ascii.cc +++ b/src/input/readers/Ascii.cc @@ -83,14 +83,14 @@ void Ascii::DoClose() } } -bool Ascii::DoInit(string path, ReaderMode mode, int num_fields, const Field* const* fields, const std::map config) +bool Ascii::DoInit(const ReaderInfo& info, ReaderMode mode, int num_fields, const Field* const* fields) { mtime = 0; - file = new ifstream(path.c_str()); + file = new ifstream(info.source.c_str()); if ( ! file->is_open() ) { - Error(Fmt("Init: cannot open %s", path.c_str())); + Error(Fmt("Init: cannot open %s", info.source.c_str())); delete(file); file = 0; return false; @@ -98,7 +98,7 @@ bool Ascii::DoInit(string path, ReaderMode mode, int num_fields, const Field* co if ( ReadHeader(false) == false ) { - Error(Fmt("Init: cannot open %s; headers are incorrect", path.c_str())); + Error(Fmt("Init: cannot open %s; headers are incorrect", info.source.c_str())); file->close(); delete(file); file = 0; @@ -147,7 +147,7 @@ bool Ascii::ReadHeader(bool useCached) //printf("Updating fields from description %s\n", line.c_str()); columnMap.clear(); - for ( unsigned int i = 0; i < NumFields(); i++ ) + for ( int i = 0; i < NumFields(); i++ ) { const Field* field = Fields()[i]; @@ -164,7 +164,7 @@ bool Ascii::ReadHeader(bool useCached) } Error(Fmt("Did not find requested field %s in input data file %s.", - field->name.c_str(), Source().c_str())); + field->name.c_str(), Info().source.c_str())); return false; } @@ -367,9 +367,9 @@ bool Ascii::DoUpdate() { // check if the file has changed struct stat sb; - if ( stat(Source().c_str(), &sb) == -1 ) + if ( stat(Info().source.c_str(), &sb) == -1 ) { - Error(Fmt("Could not get stat for %s", Source().c_str())); + Error(Fmt("Could not get stat for %s", Info().source.c_str())); return false; } @@ -403,10 +403,10 @@ bool Ascii::DoUpdate() file = 0; } - file = new ifstream(Source().c_str()); + file = new ifstream(Info().source.c_str()); if ( ! file->is_open() ) { - Error(Fmt("cannot open %s", Source().c_str())); + Error(Fmt("cannot open %s", Info().source.c_str())); return false; } @@ -490,7 +490,7 @@ bool Ascii::DoUpdate() } //printf("fpos: %d, second.num_fields: %d\n", fpos, (*it).second.num_fields); - assert ( (unsigned int) fpos == NumFields() ); + assert ( fpos == NumFields() ); if ( Mode() == MODE_STREAM ) Put(fields); diff --git a/src/input/readers/Ascii.h b/src/input/readers/Ascii.h index c17c5220ed..bb7e7a1ce2 100644 --- a/src/input/readers/Ascii.h +++ b/src/input/readers/Ascii.h @@ -38,7 +38,7 @@ public: static ReaderBackend* Instantiate(ReaderFrontend* frontend) { return new Ascii(frontend); } protected: - virtual bool DoInit(string path, ReaderMode mode, int arg_num_fields, const threading::Field* const* fields, const std::map config); + virtual bool DoInit(const ReaderInfo& info, ReaderMode mode, int arg_num_fields, const threading::Field* const* fields); virtual void DoClose(); virtual bool DoUpdate(); virtual bool DoHeartbeat(double network_time, double current_time); diff --git a/src/input/readers/Benchmark.cc b/src/input/readers/Benchmark.cc index 37888b095f..1b4d39ddf1 100644 --- a/src/input/readers/Benchmark.cc +++ b/src/input/readers/Benchmark.cc @@ -36,9 +36,9 @@ void Benchmark::DoClose() { } -bool Benchmark::DoInit(string path, ReaderMode mode, int num_fields, const Field* const* fields, const std::map config) +bool Benchmark::DoInit(const ReaderInfo& info, ReaderMode mode, int num_fields, const Field* const* fields) { - num_lines = atoi(path.c_str()); + num_lines = atoi(info.source.c_str()); if ( autospread != 0.0 ) autospread_time = (int) ( (double) 1000000 / (autospread * (double) num_lines) ); @@ -80,7 +80,7 @@ bool Benchmark::DoUpdate() for ( int i = 0; i < linestosend; i++ ) { Value** field = new Value*[NumFields()]; - for (unsigned int j = 0; j < NumFields(); j++ ) + for (int j = 0; j < NumFields(); j++ ) field[j] = EntryToVal(Fields()[j]->type, Fields()[j]->subtype); if ( Mode() == MODE_STREAM ) diff --git a/src/input/readers/Benchmark.h b/src/input/readers/Benchmark.h index e806b9ca4a..0f940873e4 100644 --- a/src/input/readers/Benchmark.h +++ b/src/input/readers/Benchmark.h @@ -18,7 +18,7 @@ public: static ReaderBackend* Instantiate(ReaderFrontend* frontend) { return new Benchmark(frontend); } protected: - virtual bool DoInit(string path, ReaderMode mode, int arg_num_fields, const threading::Field* const* fields, const std::map config); + virtual bool DoInit(const ReaderInfo& info, ReaderMode mode, int arg_num_fields, const threading::Field* const* fields); virtual void DoClose(); virtual bool DoUpdate(); virtual bool DoHeartbeat(double network_time, double current_time); diff --git a/src/input/readers/Raw.cc b/src/input/readers/Raw.cc index 9971aa1aa3..2fb7e92c40 100644 --- a/src/input/readers/Raw.cc +++ b/src/input/readers/Raw.cc @@ -100,15 +100,15 @@ bool Raw::CloseInput() return true; } -bool Raw::DoInit(string path, ReaderMode mode, int num_fields, const Field* const* fields, const std::map config) +bool Raw::DoInit(const ReaderInfo& info, ReaderMode mode, int num_fields, const Field* const* fields) { - fname = path; + fname = info.source; mtime = 0; execute = false; firstrun = true; bool result; - if ( path.length() == 0 ) + if ( info.source.length() == 0 ) { Error("No source path provided"); return false; @@ -129,11 +129,11 @@ bool Raw::DoInit(string path, ReaderMode mode, int num_fields, const Field* cons } // do Initialization - char last = path[path.length()-1]; + char last = info.source[info.source.length()-1]; if ( last == '|' ) { execute = true; - fname = path.substr(0, fname.length() - 1); + fname = info.source.substr(0, fname.length() - 1); if ( (mode != MODE_MANUAL) ) { diff --git a/src/input/readers/Raw.h b/src/input/readers/Raw.h index fb6b94410b..7d1351e728 100644 --- a/src/input/readers/Raw.h +++ b/src/input/readers/Raw.h @@ -22,7 +22,7 @@ public: static ReaderBackend* Instantiate(ReaderFrontend* frontend) { return new Raw(frontend); } protected: - virtual bool DoInit(string path, ReaderMode mode, int arg_num_fields, const threading::Field* const* fields, const std::map config); + virtual bool DoInit(const ReaderInfo& info, ReaderMode mode, int arg_num_fields, const threading::Field* const* fields); virtual void DoClose(); virtual bool DoUpdate(); virtual bool DoHeartbeat(double network_time, double current_time); From 7f83f157fcfe9c56ffb4a88065add8b303a99875 Mon Sep 17 00:00:00 2001 From: Bernhard Amann Date: Mon, 2 Jul 2012 10:41:02 -0700 Subject: [PATCH 4/5] add mode to readerinfo - no need to have it separately everywhere anymore. Disable remoteserialization of readerinfo - in contrast to the logging framework this is not needed here (I think). --- src/input/Manager.cc | 15 +++++++-------- src/input/ReaderBackend.cc | 11 ++++++++--- src/input/ReaderBackend.h | 26 +++++++++++--------------- src/input/ReaderFrontend.cc | 11 +++++------ src/input/ReaderFrontend.h | 2 +- src/input/readers/Ascii.cc | 12 ++++++------ src/input/readers/Ascii.h | 2 +- src/input/readers/Benchmark.cc | 8 ++++---- src/input/readers/Benchmark.h | 2 +- src/input/readers/Raw.cc | 14 +++++++------- src/input/readers/Raw.h | 2 +- 11 files changed, 52 insertions(+), 53 deletions(-) diff --git a/src/input/Manager.cc b/src/input/Manager.cc index 1f5f17bba8..985e67302a 100644 --- a/src/input/Manager.cc +++ b/src/input/Manager.cc @@ -74,8 +74,6 @@ public: ReaderBackend::ReaderInfo info; bool removed; - ReaderMode mode; - StreamType stream_type; // to distinguish between event and table streams EnumVal* type; @@ -305,19 +303,21 @@ bool Manager::CreateStream(Stream* info, RecordVal* description) EnumVal* mode = description->LookupWithDefault(rtype->FieldOffset("mode"))->AsEnumVal(); Val* config = description->LookupWithDefault(rtype->FieldOffset("config")); + + ReaderBackend::ReaderInfo readerinfo; switch ( mode->InternalInt() ) { case 0: - info->mode = MODE_MANUAL; + readerinfo.mode = MODE_MANUAL; break; case 1: - info->mode = MODE_REREAD; + readerinfo.mode = MODE_REREAD; break; case 2: - info->mode = MODE_STREAM; + readerinfo.mode = MODE_STREAM; break; default: @@ -331,7 +331,6 @@ bool Manager::CreateStream(Stream* info, RecordVal* description) info->name = name; info->config = config->AsTableVal(); // ref'd by LookupWithDefault - ReaderBackend::ReaderInfo readerinfo; readerinfo.source = source; Ref(description); @@ -481,7 +480,7 @@ bool Manager::CreateEventStream(RecordVal* fval) assert(stream->reader); - stream->reader->Init(stream->info, stream->mode, stream->num_fields, logf ); + stream->reader->Init(stream->info, stream->num_fields, logf ); readers[stream->reader] = stream; @@ -658,7 +657,7 @@ bool Manager::CreateTableStream(RecordVal* fval) assert(stream->reader); - stream->reader->Init(stream->info, stream->mode, fieldsV.size(), fields ); + stream->reader->Init(stream->info, fieldsV.size(), fields ); readers[stream->reader] = stream; diff --git a/src/input/ReaderBackend.cc b/src/input/ReaderBackend.cc index 6ed70bced0..94120100ab 100644 --- a/src/input/ReaderBackend.cc +++ b/src/input/ReaderBackend.cc @@ -142,6 +142,10 @@ public: using namespace logging; +/* + * I don't think the input framework needs remote serialization. If it doesn't, kill this. If it does add ReaderMode. + + bool ReaderBackend::ReaderInfo::Read(SerializationFormat* fmt) { int size; @@ -184,6 +188,8 @@ bool ReaderBackend::ReaderInfo::Write(SerializationFormat* fmt) const return true; } + */ + ReaderBackend::ReaderBackend(ReaderFrontend* arg_frontend) : MsgThread() { disabled = true; // disabled will be set correcty in init. @@ -226,18 +232,17 @@ void ReaderBackend::SendEntry(Value* *vals) SendOut(new SendEntryMessage(frontend, vals)); } -bool ReaderBackend::Init(const ReaderInfo& arg_info, ReaderMode arg_mode, const int arg_num_fields, +bool ReaderBackend::Init(const ReaderInfo& arg_info, const int arg_num_fields, const threading::Field* const* arg_fields) { info = arg_info; - mode = arg_mode; num_fields = arg_num_fields; fields = arg_fields; SetName("InputReader/"+info.source); // disable if DoInit returns error. - int success = DoInit(arg_info, mode, arg_num_fields, arg_fields); + int success = DoInit(arg_info, arg_num_fields, arg_fields); if ( ! success ) { diff --git a/src/input/ReaderBackend.h b/src/input/ReaderBackend.h index d7d022d5fa..fd7ac769f2 100644 --- a/src/input/ReaderBackend.h +++ b/src/input/ReaderBackend.h @@ -7,8 +7,6 @@ #include "threading/SerialTypes.h" #include "threading/MsgThread.h" -class RemoteSerializer; - namespace input { @@ -87,6 +85,12 @@ public: */ config_map config; + /** + * The opening mode for the input source. + */ + ReaderMode mode; +/* + * I don't think the input framework needs remote serialization. If it doesn't, kill this. If it does add ReaderMode. private: friend class ::RemoteSerializer; @@ -94,16 +98,14 @@ public: // fields. They serialize/deserialize the struct. bool Read(SerializationFormat* fmt); bool Write(SerializationFormat* fmt) const; + + */ }; /** * One-time initialization of the reader to define the input source. * - * @param source A string left to the interpretation of the - * reader implementation; it corresponds to the value configured on - * the script-level for the input stream. - * - * @param mode The opening mode for the input source. + * @param @param info Meta information for the writer. * * @param num_fields Number of fields contained in \a fields. * @@ -115,7 +117,7 @@ public: * * @return False if an error occured. */ - bool Init(const ReaderInfo& info, ReaderMode mode, int num_fields, const threading::Field* const* fields); + bool Init(const ReaderInfo& info, int num_fields, const threading::Field* const* fields); /** * Finishes reading from this input stream in a regular fashion. Must @@ -180,7 +182,7 @@ protected: * provides accessor methods to get them later, and they are passed * in here only for convinience. */ - virtual bool DoInit(const ReaderInfo& info, ReaderMode mode, int arg_num_fields, const threading::Field* const* fields) = 0; + virtual bool DoInit(const ReaderInfo& info, int arg_num_fields, const threading::Field* const* fields) = 0; /** * Reader-specific method implementing input finalization at @@ -209,11 +211,6 @@ protected: */ virtual bool DoUpdate() = 0; - /** - * Returns the reader mode as passed into Init(). - */ - const ReaderMode Mode() const { return mode; } - /** * Method allowing a reader to send a specified Bro event. Vals must * match the values expected by the bro event. @@ -315,7 +312,6 @@ private: ReaderFrontend* frontend; ReaderInfo info; - ReaderMode mode; unsigned int num_fields; const threading::Field* const * fields; // raw mapping diff --git a/src/input/ReaderFrontend.cc b/src/input/ReaderFrontend.cc index f92a8ec80c..2c5d522c2f 100644 --- a/src/input/ReaderFrontend.cc +++ b/src/input/ReaderFrontend.cc @@ -11,19 +11,18 @@ namespace input { class InitMessage : public threading::InputMessage { public: - InitMessage(ReaderBackend* backend, const ReaderBackend::ReaderInfo& info, ReaderMode mode, + InitMessage(ReaderBackend* backend, const ReaderBackend::ReaderInfo& info, const int num_fields, const threading::Field* const* fields) : threading::InputMessage("Init", backend), - info(info), mode(mode), num_fields(num_fields), fields(fields) { } + info(info), num_fields(num_fields), fields(fields) { } virtual bool Process() { - return Object()->Init(info, mode, num_fields, fields); + return Object()->Init(info, num_fields, fields); } private: const ReaderBackend::ReaderInfo info; - const ReaderMode mode; const int num_fields; const threading::Field* const* fields; }; @@ -63,7 +62,7 @@ ReaderFrontend::~ReaderFrontend() { } -void ReaderFrontend::Init(const ReaderBackend::ReaderInfo& arg_info, ReaderMode mode, const int arg_num_fields, +void ReaderFrontend::Init(const ReaderBackend::ReaderInfo& arg_info, const int arg_num_fields, const threading::Field* const* arg_fields) { if ( disabled ) @@ -77,7 +76,7 @@ void ReaderFrontend::Init(const ReaderBackend::ReaderInfo& arg_info, ReaderMode fields = arg_fields; initialized = true; - backend->SendIn(new InitMessage(backend, info, mode, num_fields, fields)); + backend->SendIn(new InitMessage(backend, info, num_fields, fields)); } void ReaderFrontend::Update() diff --git a/src/input/ReaderFrontend.h b/src/input/ReaderFrontend.h index fadf2cddb5..35235ee2bc 100644 --- a/src/input/ReaderFrontend.h +++ b/src/input/ReaderFrontend.h @@ -52,7 +52,7 @@ public: * * This method must only be called from the main thread. */ - void Init(const ReaderBackend::ReaderInfo& info, ReaderMode mode, const int arg_num_fields, const threading::Field* const* fields); + void Init(const ReaderBackend::ReaderInfo& info, const int arg_num_fields, const threading::Field* const* fields); /** * Force an update of the current input source. Actual action depends diff --git a/src/input/readers/Ascii.cc b/src/input/readers/Ascii.cc index 9e3ad28f9c..1731bba872 100644 --- a/src/input/readers/Ascii.cc +++ b/src/input/readers/Ascii.cc @@ -83,7 +83,7 @@ void Ascii::DoClose() } } -bool Ascii::DoInit(const ReaderInfo& info, ReaderMode mode, int num_fields, const Field* const* fields) +bool Ascii::DoInit(const ReaderInfo& info, int num_fields, const Field* const* fields) { mtime = 0; @@ -362,7 +362,7 @@ Value* Ascii::EntryToVal(string s, FieldMapping field) // read the entire file and send appropriate thingies back to InputMgr bool Ascii::DoUpdate() { - switch ( Mode() ) { + switch ( Info().mode ) { case MODE_REREAD: { // check if the file has changed @@ -389,7 +389,7 @@ bool Ascii::DoUpdate() // - this is not that bad) if ( file && file->is_open() ) { - if ( Mode() == MODE_STREAM ) + if ( Info().mode == MODE_STREAM ) { file->clear(); // remove end of file evil bits if ( !ReadHeader(true) ) @@ -492,13 +492,13 @@ bool Ascii::DoUpdate() //printf("fpos: %d, second.num_fields: %d\n", fpos, (*it).second.num_fields); assert ( fpos == NumFields() ); - if ( Mode() == MODE_STREAM ) + if ( Info().mode == MODE_STREAM ) Put(fields); else SendEntry(fields); } - if ( Mode () != MODE_STREAM ) + if ( Info().mode != MODE_STREAM ) EndCurrentSend(); return true; @@ -508,7 +508,7 @@ bool Ascii::DoHeartbeat(double network_time, double current_time) { ReaderBackend::DoHeartbeat(network_time, current_time); - switch ( Mode() ) { + switch ( Info().mode ) { case MODE_MANUAL: // yay, we do nothing :) break; diff --git a/src/input/readers/Ascii.h b/src/input/readers/Ascii.h index bb7e7a1ce2..e1506cbe82 100644 --- a/src/input/readers/Ascii.h +++ b/src/input/readers/Ascii.h @@ -38,7 +38,7 @@ public: static ReaderBackend* Instantiate(ReaderFrontend* frontend) { return new Ascii(frontend); } protected: - virtual bool DoInit(const ReaderInfo& info, ReaderMode mode, int arg_num_fields, const threading::Field* const* fields); + virtual bool DoInit(const ReaderInfo& info, int arg_num_fields, const threading::Field* const* fields); virtual void DoClose(); virtual bool DoUpdate(); virtual bool DoHeartbeat(double network_time, double current_time); diff --git a/src/input/readers/Benchmark.cc b/src/input/readers/Benchmark.cc index 1b4d39ddf1..d8dcb543f4 100644 --- a/src/input/readers/Benchmark.cc +++ b/src/input/readers/Benchmark.cc @@ -36,7 +36,7 @@ void Benchmark::DoClose() { } -bool Benchmark::DoInit(const ReaderInfo& info, ReaderMode mode, int num_fields, const Field* const* fields) +bool Benchmark::DoInit(const ReaderInfo& info, int num_fields, const Field* const* fields) { num_lines = atoi(info.source.c_str()); @@ -83,7 +83,7 @@ bool Benchmark::DoUpdate() for (int j = 0; j < NumFields(); j++ ) field[j] = EntryToVal(Fields()[j]->type, Fields()[j]->subtype); - if ( Mode() == MODE_STREAM ) + if ( Info().mode == MODE_STREAM ) // do not do tracking, spread out elements over the second that we have... Put(field); else @@ -109,7 +109,7 @@ bool Benchmark::DoUpdate() } - if ( Mode() != MODE_STREAM ) + if ( Info().mode != MODE_STREAM ) EndCurrentSend(); return true; @@ -227,7 +227,7 @@ bool Benchmark::DoHeartbeat(double network_time, double current_time) num_lines += add; heartbeatstarttime = CurrTime(); - switch ( Mode() ) { + switch ( Info().mode ) { case MODE_MANUAL: // yay, we do nothing :) break; diff --git a/src/input/readers/Benchmark.h b/src/input/readers/Benchmark.h index 0f940873e4..bab564b12a 100644 --- a/src/input/readers/Benchmark.h +++ b/src/input/readers/Benchmark.h @@ -18,7 +18,7 @@ public: static ReaderBackend* Instantiate(ReaderFrontend* frontend) { return new Benchmark(frontend); } protected: - virtual bool DoInit(const ReaderInfo& info, ReaderMode mode, int arg_num_fields, const threading::Field* const* fields); + virtual bool DoInit(const ReaderInfo& info, int arg_num_fields, const threading::Field* const* fields); virtual void DoClose(); virtual bool DoUpdate(); virtual bool DoHeartbeat(double network_time, double current_time); diff --git a/src/input/readers/Raw.cc b/src/input/readers/Raw.cc index 2fb7e92c40..d4a761a931 100644 --- a/src/input/readers/Raw.cc +++ b/src/input/readers/Raw.cc @@ -66,7 +66,7 @@ bool Raw::OpenInput() // This is defined in input/fdstream.h in = new boost::fdistream(fileno(file)); - if ( execute && Mode() == MODE_STREAM ) + if ( execute && Info().mode == MODE_STREAM ) fcntl(fileno(file), F_SETFL, O_NONBLOCK); return true; @@ -100,7 +100,7 @@ bool Raw::CloseInput() return true; } -bool Raw::DoInit(const ReaderInfo& info, ReaderMode mode, int num_fields, const Field* const* fields) +bool Raw::DoInit(const ReaderInfo& info, int num_fields, const Field* const* fields) { fname = info.source; mtime = 0; @@ -135,10 +135,10 @@ bool Raw::DoInit(const ReaderInfo& info, ReaderMode mode, int num_fields, const execute = true; fname = info.source.substr(0, fname.length() - 1); - if ( (mode != MODE_MANUAL) ) + if ( (info.mode != MODE_MANUAL) ) { Error(Fmt("Unsupported read mode %d for source %s in execution mode", - mode, fname.c_str())); + info.mode, fname.c_str())); return false; } @@ -187,7 +187,7 @@ bool Raw::DoUpdate() else { - switch ( Mode() ) { + switch ( Info().mode ) { case MODE_REREAD: { // check if the file has changed @@ -210,7 +210,7 @@ bool Raw::DoUpdate() case MODE_MANUAL: case MODE_STREAM: - if ( Mode() == MODE_STREAM && file != NULL && in != NULL ) + if ( Info().mode == MODE_STREAM && file != NULL && in != NULL ) { //fpurge(file); in->clear(); // remove end of file evil bits @@ -254,7 +254,7 @@ bool Raw::DoHeartbeat(double network_time, double current_time) { ReaderBackend::DoHeartbeat(network_time, current_time); - switch ( Mode() ) { + switch ( Info().mode ) { case MODE_MANUAL: // yay, we do nothing :) break; diff --git a/src/input/readers/Raw.h b/src/input/readers/Raw.h index 7d1351e728..48912b70a7 100644 --- a/src/input/readers/Raw.h +++ b/src/input/readers/Raw.h @@ -22,7 +22,7 @@ public: static ReaderBackend* Instantiate(ReaderFrontend* frontend) { return new Raw(frontend); } protected: - virtual bool DoInit(const ReaderInfo& info, ReaderMode mode, int arg_num_fields, const threading::Field* const* fields); + virtual bool DoInit(const ReaderInfo& info, int arg_num_fields, const threading::Field* const* fields); virtual void DoClose(); virtual bool DoUpdate(); virtual bool DoHeartbeat(double network_time, double current_time); From f65e3f5b9f55c3009ec81b2b9636074887f551d0 Mon Sep 17 00:00:00 2001 From: Bernhard Amann Date: Mon, 2 Jul 2012 11:07:50 -0700 Subject: [PATCH 5/5] fix small bug - now configuration actually is passed. --- src/input/Manager.cc | 13 ++++--------- 1 file changed, 4 insertions(+), 9 deletions(-) diff --git a/src/input/Manager.cc b/src/input/Manager.cc index 985e67302a..7b356fc05e 100644 --- a/src/input/Manager.cc +++ b/src/input/Manager.cc @@ -304,20 +304,18 @@ bool Manager::CreateStream(Stream* info, RecordVal* description) EnumVal* mode = description->LookupWithDefault(rtype->FieldOffset("mode"))->AsEnumVal(); Val* config = description->LookupWithDefault(rtype->FieldOffset("config")); - ReaderBackend::ReaderInfo readerinfo; - switch ( mode->InternalInt() ) { case 0: - readerinfo.mode = MODE_MANUAL; + info->info.mode = MODE_MANUAL; break; case 1: - readerinfo.mode = MODE_REREAD; + info->info.mode = MODE_REREAD; break; case 2: - readerinfo.mode = MODE_STREAM; + info->info.mode = MODE_STREAM; break; default: @@ -331,7 +329,7 @@ bool Manager::CreateStream(Stream* info, RecordVal* description) info->name = name; info->config = config->AsTableVal(); // ref'd by LookupWithDefault - readerinfo.source = source; + info->info.source = source; Ref(description); info->description = description; @@ -353,9 +351,6 @@ bool Manager::CreateStream(Stream* info, RecordVal* description) } - info->info = readerinfo; - - DBG_LOG(DBG_INPUT, "Successfully created new input stream %s", name.c_str());