diff --git a/src/input/Manager.cc b/src/input/Manager.cc index f9979fbe6e..1f5f17bba8 100644 --- a/src/input/Manager.cc +++ b/src/input/Manager.cc @@ -71,7 +71,7 @@ declare(PDict, InputHash); class Manager::Stream { public: string name; - string source; + ReaderBackend::ReaderInfo info; bool removed; ReaderMode mode; @@ -81,7 +81,6 @@ public: EnumVal* type; ReaderFrontend* reader; TableVal* config; - std::map configmap; RecordVal* description; @@ -330,8 +329,11 @@ bool Manager::CreateStream(Stream* info, RecordVal* description) info->reader = reader_obj; info->type = reader->AsEnumVal(); // ref'd by lookupwithdefault info->name = name; - info->source = source; info->config = config->AsTableVal(); // ref'd by LookupWithDefault + + ReaderBackend::ReaderInfo readerinfo; + readerinfo.source = source; + Ref(description); info->description = description; @@ -345,13 +347,15 @@ bool Manager::CreateStream(Stream* info, RecordVal* description) ListVal* index = info->config->RecoverIndex(k); string key = index->Index(0)->AsString()->CheckString(); string value = v->Value()->AsString()->CheckString(); - info->configmap.insert(std::make_pair(key, value)); + info->info.config.insert(std::make_pair(key, value)); Unref(index); delete k; } } + info->info = readerinfo; + DBG_LOG(DBG_INPUT, "Successfully created new input stream %s", name.c_str()); @@ -477,7 +481,7 @@ bool Manager::CreateEventStream(RecordVal* fval) assert(stream->reader); - stream->reader->Init(stream->source, stream->mode, stream->num_fields, logf, stream->configmap ); + stream->reader->Init(stream->info, stream->mode, stream->num_fields, logf ); readers[stream->reader] = stream; @@ -654,7 +658,7 @@ bool Manager::CreateTableStream(RecordVal* fval) assert(stream->reader); - stream->reader->Init(stream->source, stream->mode, fieldsV.size(), fields, stream->configmap ); + stream->reader->Init(stream->info, stream->mode, fieldsV.size(), fields ); readers[stream->reader] = stream; @@ -1234,7 +1238,7 @@ void Manager::EndCurrentSend(ReaderFrontend* reader) #endif // Send event that the current update is indeed finished. - SendEvent(update_finished, 2, new StringVal(i->name.c_str()), new StringVal(i->source.c_str())); + SendEvent(update_finished, 2, new StringVal(i->name.c_str()), new StringVal(i->info.source.c_str())); } void Manager::Put(ReaderFrontend* reader, Value* *vals) diff --git a/src/input/ReaderBackend.cc b/src/input/ReaderBackend.cc index 276b5d25b0..6ed70bced0 100644 --- a/src/input/ReaderBackend.cc +++ b/src/input/ReaderBackend.cc @@ -140,6 +140,49 @@ public: } }; +using namespace logging; + +bool ReaderBackend::ReaderInfo::Read(SerializationFormat* fmt) + { + int size; + + if ( ! (fmt->Read(&source, "source") && + fmt->Read(&size, "config_size")) ) + return false; + + config.clear(); + + while ( size ) + { + string value; + string key; + + if ( ! (fmt->Read(&value, "config-value") && fmt->Read(&value, "config-key")) ) + return false; + + config.insert(std::make_pair(value, key)); + } + + return true; + } + + +bool ReaderBackend::ReaderInfo::Write(SerializationFormat* fmt) const + { + int size = config.size(); + + if ( ! (fmt->Write(source, "source") && + fmt->Write(size, "config_size")) ) + return false; + + for ( config_map::const_iterator i = config.begin(); i != config.end(); ++i ) + { + if ( ! (fmt->Write(i->first, "config-value") && fmt->Write(i->second, "config-key")) ) + return false; + } + + return true; + } ReaderBackend::ReaderBackend(ReaderFrontend* arg_frontend) : MsgThread() { @@ -183,18 +226,18 @@ void ReaderBackend::SendEntry(Value* *vals) SendOut(new SendEntryMessage(frontend, vals)); } -bool ReaderBackend::Init(string arg_source, ReaderMode arg_mode, const int arg_num_fields, - const threading::Field* const* arg_fields, const std::map config) +bool ReaderBackend::Init(const ReaderInfo& arg_info, ReaderMode arg_mode, const int arg_num_fields, + const threading::Field* const* arg_fields) { - source = arg_source; + info = arg_info; mode = arg_mode; num_fields = arg_num_fields; fields = arg_fields; - SetName("InputReader/"+source); + SetName("InputReader/"+info.source); // disable if DoInit returns error. - int success = DoInit(arg_source, mode, arg_num_fields, arg_fields, config); + int success = DoInit(arg_info, mode, arg_num_fields, arg_fields); if ( ! success ) { diff --git a/src/input/ReaderBackend.h b/src/input/ReaderBackend.h index c23c68bf7e..d7d022d5fa 100644 --- a/src/input/ReaderBackend.h +++ b/src/input/ReaderBackend.h @@ -7,6 +7,8 @@ #include "threading/SerialTypes.h" #include "threading/MsgThread.h" +class RemoteSerializer; + namespace input { @@ -65,6 +67,35 @@ public: */ virtual ~ReaderBackend(); + /** + * A struct passing information to the reader at initialization time. + */ + struct ReaderInfo + { + typedef std::map config_map; + + /** + * A string left to the interpretation of the reader + * implementation; it corresponds to the value configured on + * the script-level for the logging filter. + */ + string source; + + /** + * A map of key/value pairs corresponding to the relevant + * filter's "config" table. + */ + config_map config; + + private: + friend class ::RemoteSerializer; + + // Note, these need to be adapted when changing the struct's + // fields. They serialize/deserialize the struct. + bool Read(SerializationFormat* fmt); + bool Write(SerializationFormat* fmt) const; + }; + /** * One-time initialization of the reader to define the input source. * @@ -84,7 +115,7 @@ public: * * @return False if an error occured. */ - bool Init(string source, ReaderMode mode, int num_fields, const threading::Field* const* fields, std::map config); + bool Init(const ReaderInfo& info, ReaderMode mode, int num_fields, const threading::Field* const* fields); /** * Finishes reading from this input stream in a regular fashion. Must @@ -112,6 +143,22 @@ public: */ void DisableFrontend(); + /** + * Returns the log fields as passed into the constructor. + */ + const threading::Field* const * Fields() const { return fields; } + + /** + * Returns the additional reader information into the constructor. + */ + const ReaderInfo& Info() const { return info; } + + /** + * Returns the number of log fields as passed into the constructor. + */ + int NumFields() const { return num_fields; } + + protected: // Methods that have to be overwritten by the individual readers @@ -133,7 +180,7 @@ protected: * provides accessor methods to get them later, and they are passed * in here only for convinience. */ - virtual bool DoInit(string path, ReaderMode mode, int arg_num_fields, const threading::Field* const* fields, const std::map config) = 0; + virtual bool DoInit(const ReaderInfo& info, ReaderMode mode, int arg_num_fields, const threading::Field* const* fields) = 0; /** * Reader-specific method implementing input finalization at @@ -162,26 +209,11 @@ protected: */ virtual bool DoUpdate() = 0; - /** - * Returns the input source as passed into Init()/. - */ - const string Source() const { return source; } - /** * Returns the reader mode as passed into Init(). */ const ReaderMode Mode() const { return mode; } - /** - * Returns the number of log fields as passed into Init(). - */ - unsigned int NumFields() const { return num_fields; } - - /** - * Returns the log fields as passed into Init(). - */ - const threading::Field* const * Fields() const { return fields; } - /** * Method allowing a reader to send a specified Bro event. Vals must * match the values expected by the bro event. @@ -282,7 +314,7 @@ private: // from this class, it's running in a different thread! ReaderFrontend* frontend; - string source; + ReaderInfo info; ReaderMode mode; unsigned int num_fields; const threading::Field* const * fields; // raw mapping diff --git a/src/input/ReaderFrontend.cc b/src/input/ReaderFrontend.cc index ec1630cd88..f92a8ec80c 100644 --- a/src/input/ReaderFrontend.cc +++ b/src/input/ReaderFrontend.cc @@ -11,22 +11,21 @@ namespace input { class InitMessage : public threading::InputMessage { public: - InitMessage(ReaderBackend* backend, const string source, ReaderMode mode, - const int num_fields, const threading::Field* const* fields, const std::map config) + InitMessage(ReaderBackend* backend, const ReaderBackend::ReaderInfo& info, ReaderMode mode, + const int num_fields, const threading::Field* const* fields) : threading::InputMessage("Init", backend), - source(source), mode(mode), num_fields(num_fields), fields(fields), config(config) { } + info(info), mode(mode), num_fields(num_fields), fields(fields) { } virtual bool Process() { - return Object()->Init(source, mode, num_fields, fields, config); + return Object()->Init(info, mode, num_fields, fields); } private: - const string source; + const ReaderBackend::ReaderInfo info; const ReaderMode mode; const int num_fields; const threading::Field* const* fields; - const std::map config; }; class UpdateMessage : public threading::InputMessage @@ -64,8 +63,8 @@ ReaderFrontend::~ReaderFrontend() { } -void ReaderFrontend::Init(string arg_source, ReaderMode mode, const int num_fields, - const threading::Field* const* fields, const std::map config) +void ReaderFrontend::Init(const ReaderBackend::ReaderInfo& arg_info, ReaderMode mode, const int arg_num_fields, + const threading::Field* const* arg_fields) { if ( disabled ) return; @@ -73,10 +72,12 @@ void ReaderFrontend::Init(string arg_source, ReaderMode mode, const int num_fiel if ( initialized ) reporter->InternalError("reader initialize twice"); - source = arg_source; + info = arg_info; + num_fields = arg_num_fields; + fields = arg_fields; initialized = true; - backend->SendIn(new InitMessage(backend, arg_source, mode, num_fields, fields, config)); + backend->SendIn(new InitMessage(backend, info, mode, num_fields, fields)); } void ReaderFrontend::Update() @@ -110,10 +111,10 @@ void ReaderFrontend::Close() string ReaderFrontend::Name() const { - if ( source.size() ) + if ( info.source.size() ) return ty_name; - return ty_name + "/" + source; + return ty_name + "/" + info.source; } } diff --git a/src/input/ReaderFrontend.h b/src/input/ReaderFrontend.h index 1240831ee6..fadf2cddb5 100644 --- a/src/input/ReaderFrontend.h +++ b/src/input/ReaderFrontend.h @@ -52,7 +52,7 @@ public: * * This method must only be called from the main thread. */ - void Init(string arg_source, ReaderMode mode, const int arg_num_fields, const threading::Field* const* fields, const std::map config); + void Init(const ReaderBackend::ReaderInfo& info, ReaderMode mode, const int arg_num_fields, const threading::Field* const* fields); /** * Force an update of the current input source. Actual action depends @@ -102,13 +102,23 @@ public: */ string Name() const; -protected: - friend class Manager; + /** + * Returns the additional reader information into the constructor. + */ + const ReaderBackend::ReaderInfo& Info() const { return info; } /** - * Returns the source as passed into the constructor. + * Returns the number of log fields as passed into the constructor. */ - const string& Source() const { return source; }; + int NumFields() const { return num_fields; } + + /** + * Returns the log fields as passed into the constructor. + */ + const threading::Field* const * Fields() const { return fields; } + +protected: + friend class Manager; /** * Returns the name of the backend's type. @@ -117,7 +127,9 @@ protected: private: ReaderBackend* backend; // The backend we have instanatiated. - string source; + ReaderBackend::ReaderInfo info; // Meta information as passed to Init(). + const threading::Field* const* fields; // The log fields. + int num_fields; // Information as passed to init(); string ty_name; // Backend type, set by manager. bool disabled; // True if disabled. bool initialized; // True if initialized. diff --git a/src/input/readers/Ascii.cc b/src/input/readers/Ascii.cc index 47bbe2a207..9e3ad28f9c 100644 --- a/src/input/readers/Ascii.cc +++ b/src/input/readers/Ascii.cc @@ -83,14 +83,14 @@ void Ascii::DoClose() } } -bool Ascii::DoInit(string path, ReaderMode mode, int num_fields, const Field* const* fields, const std::map config) +bool Ascii::DoInit(const ReaderInfo& info, ReaderMode mode, int num_fields, const Field* const* fields) { mtime = 0; - file = new ifstream(path.c_str()); + file = new ifstream(info.source.c_str()); if ( ! file->is_open() ) { - Error(Fmt("Init: cannot open %s", path.c_str())); + Error(Fmt("Init: cannot open %s", info.source.c_str())); delete(file); file = 0; return false; @@ -98,7 +98,7 @@ bool Ascii::DoInit(string path, ReaderMode mode, int num_fields, const Field* co if ( ReadHeader(false) == false ) { - Error(Fmt("Init: cannot open %s; headers are incorrect", path.c_str())); + Error(Fmt("Init: cannot open %s; headers are incorrect", info.source.c_str())); file->close(); delete(file); file = 0; @@ -147,7 +147,7 @@ bool Ascii::ReadHeader(bool useCached) //printf("Updating fields from description %s\n", line.c_str()); columnMap.clear(); - for ( unsigned int i = 0; i < NumFields(); i++ ) + for ( int i = 0; i < NumFields(); i++ ) { const Field* field = Fields()[i]; @@ -164,7 +164,7 @@ bool Ascii::ReadHeader(bool useCached) } Error(Fmt("Did not find requested field %s in input data file %s.", - field->name.c_str(), Source().c_str())); + field->name.c_str(), Info().source.c_str())); return false; } @@ -367,9 +367,9 @@ bool Ascii::DoUpdate() { // check if the file has changed struct stat sb; - if ( stat(Source().c_str(), &sb) == -1 ) + if ( stat(Info().source.c_str(), &sb) == -1 ) { - Error(Fmt("Could not get stat for %s", Source().c_str())); + Error(Fmt("Could not get stat for %s", Info().source.c_str())); return false; } @@ -403,10 +403,10 @@ bool Ascii::DoUpdate() file = 0; } - file = new ifstream(Source().c_str()); + file = new ifstream(Info().source.c_str()); if ( ! file->is_open() ) { - Error(Fmt("cannot open %s", Source().c_str())); + Error(Fmt("cannot open %s", Info().source.c_str())); return false; } @@ -490,7 +490,7 @@ bool Ascii::DoUpdate() } //printf("fpos: %d, second.num_fields: %d\n", fpos, (*it).second.num_fields); - assert ( (unsigned int) fpos == NumFields() ); + assert ( fpos == NumFields() ); if ( Mode() == MODE_STREAM ) Put(fields); diff --git a/src/input/readers/Ascii.h b/src/input/readers/Ascii.h index c17c5220ed..bb7e7a1ce2 100644 --- a/src/input/readers/Ascii.h +++ b/src/input/readers/Ascii.h @@ -38,7 +38,7 @@ public: static ReaderBackend* Instantiate(ReaderFrontend* frontend) { return new Ascii(frontend); } protected: - virtual bool DoInit(string path, ReaderMode mode, int arg_num_fields, const threading::Field* const* fields, const std::map config); + virtual bool DoInit(const ReaderInfo& info, ReaderMode mode, int arg_num_fields, const threading::Field* const* fields); virtual void DoClose(); virtual bool DoUpdate(); virtual bool DoHeartbeat(double network_time, double current_time); diff --git a/src/input/readers/Benchmark.cc b/src/input/readers/Benchmark.cc index 37888b095f..1b4d39ddf1 100644 --- a/src/input/readers/Benchmark.cc +++ b/src/input/readers/Benchmark.cc @@ -36,9 +36,9 @@ void Benchmark::DoClose() { } -bool Benchmark::DoInit(string path, ReaderMode mode, int num_fields, const Field* const* fields, const std::map config) +bool Benchmark::DoInit(const ReaderInfo& info, ReaderMode mode, int num_fields, const Field* const* fields) { - num_lines = atoi(path.c_str()); + num_lines = atoi(info.source.c_str()); if ( autospread != 0.0 ) autospread_time = (int) ( (double) 1000000 / (autospread * (double) num_lines) ); @@ -80,7 +80,7 @@ bool Benchmark::DoUpdate() for ( int i = 0; i < linestosend; i++ ) { Value** field = new Value*[NumFields()]; - for (unsigned int j = 0; j < NumFields(); j++ ) + for (int j = 0; j < NumFields(); j++ ) field[j] = EntryToVal(Fields()[j]->type, Fields()[j]->subtype); if ( Mode() == MODE_STREAM ) diff --git a/src/input/readers/Benchmark.h b/src/input/readers/Benchmark.h index e806b9ca4a..0f940873e4 100644 --- a/src/input/readers/Benchmark.h +++ b/src/input/readers/Benchmark.h @@ -18,7 +18,7 @@ public: static ReaderBackend* Instantiate(ReaderFrontend* frontend) { return new Benchmark(frontend); } protected: - virtual bool DoInit(string path, ReaderMode mode, int arg_num_fields, const threading::Field* const* fields, const std::map config); + virtual bool DoInit(const ReaderInfo& info, ReaderMode mode, int arg_num_fields, const threading::Field* const* fields); virtual void DoClose(); virtual bool DoUpdate(); virtual bool DoHeartbeat(double network_time, double current_time); diff --git a/src/input/readers/Raw.cc b/src/input/readers/Raw.cc index 9971aa1aa3..2fb7e92c40 100644 --- a/src/input/readers/Raw.cc +++ b/src/input/readers/Raw.cc @@ -100,15 +100,15 @@ bool Raw::CloseInput() return true; } -bool Raw::DoInit(string path, ReaderMode mode, int num_fields, const Field* const* fields, const std::map config) +bool Raw::DoInit(const ReaderInfo& info, ReaderMode mode, int num_fields, const Field* const* fields) { - fname = path; + fname = info.source; mtime = 0; execute = false; firstrun = true; bool result; - if ( path.length() == 0 ) + if ( info.source.length() == 0 ) { Error("No source path provided"); return false; @@ -129,11 +129,11 @@ bool Raw::DoInit(string path, ReaderMode mode, int num_fields, const Field* cons } // do Initialization - char last = path[path.length()-1]; + char last = info.source[info.source.length()-1]; if ( last == '|' ) { execute = true; - fname = path.substr(0, fname.length() - 1); + fname = info.source.substr(0, fname.length() - 1); if ( (mode != MODE_MANUAL) ) { diff --git a/src/input/readers/Raw.h b/src/input/readers/Raw.h index fb6b94410b..7d1351e728 100644 --- a/src/input/readers/Raw.h +++ b/src/input/readers/Raw.h @@ -22,7 +22,7 @@ public: static ReaderBackend* Instantiate(ReaderFrontend* frontend) { return new Raw(frontend); } protected: - virtual bool DoInit(string path, ReaderMode mode, int arg_num_fields, const threading::Field* const* fields, const std::map config); + virtual bool DoInit(const ReaderInfo& info, ReaderMode mode, int arg_num_fields, const threading::Field* const* fields); virtual void DoClose(); virtual bool DoUpdate(); virtual bool DoHeartbeat(double network_time, double current_time);