diff --git a/src/input/Manager.cc b/src/input/Manager.cc index 1f5f17bba8..985e67302a 100644 --- a/src/input/Manager.cc +++ b/src/input/Manager.cc @@ -74,8 +74,6 @@ public: ReaderBackend::ReaderInfo info; bool removed; - ReaderMode mode; - StreamType stream_type; // to distinguish between event and table streams EnumVal* type; @@ -305,19 +303,21 @@ bool Manager::CreateStream(Stream* info, RecordVal* description) EnumVal* mode = description->LookupWithDefault(rtype->FieldOffset("mode"))->AsEnumVal(); Val* config = description->LookupWithDefault(rtype->FieldOffset("config")); + + ReaderBackend::ReaderInfo readerinfo; switch ( mode->InternalInt() ) { case 0: - info->mode = MODE_MANUAL; + readerinfo.mode = MODE_MANUAL; break; case 1: - info->mode = MODE_REREAD; + readerinfo.mode = MODE_REREAD; break; case 2: - info->mode = MODE_STREAM; + readerinfo.mode = MODE_STREAM; break; default: @@ -331,7 +331,6 @@ bool Manager::CreateStream(Stream* info, RecordVal* description) info->name = name; info->config = config->AsTableVal(); // ref'd by LookupWithDefault - ReaderBackend::ReaderInfo readerinfo; readerinfo.source = source; Ref(description); @@ -481,7 +480,7 @@ bool Manager::CreateEventStream(RecordVal* fval) assert(stream->reader); - stream->reader->Init(stream->info, stream->mode, stream->num_fields, logf ); + stream->reader->Init(stream->info, stream->num_fields, logf ); readers[stream->reader] = stream; @@ -658,7 +657,7 @@ bool Manager::CreateTableStream(RecordVal* fval) assert(stream->reader); - stream->reader->Init(stream->info, stream->mode, fieldsV.size(), fields ); + stream->reader->Init(stream->info, fieldsV.size(), fields ); readers[stream->reader] = stream; diff --git a/src/input/ReaderBackend.cc b/src/input/ReaderBackend.cc index 6ed70bced0..94120100ab 100644 --- a/src/input/ReaderBackend.cc +++ b/src/input/ReaderBackend.cc @@ -142,6 +142,10 @@ public: using namespace logging; +/* + * I don't think the input framework needs remote serialization. If it doesn't, kill this. If it does add ReaderMode. + + bool ReaderBackend::ReaderInfo::Read(SerializationFormat* fmt) { int size; @@ -184,6 +188,8 @@ bool ReaderBackend::ReaderInfo::Write(SerializationFormat* fmt) const return true; } + */ + ReaderBackend::ReaderBackend(ReaderFrontend* arg_frontend) : MsgThread() { disabled = true; // disabled will be set correcty in init. @@ -226,18 +232,17 @@ void ReaderBackend::SendEntry(Value* *vals) SendOut(new SendEntryMessage(frontend, vals)); } -bool ReaderBackend::Init(const ReaderInfo& arg_info, ReaderMode arg_mode, const int arg_num_fields, +bool ReaderBackend::Init(const ReaderInfo& arg_info, const int arg_num_fields, const threading::Field* const* arg_fields) { info = arg_info; - mode = arg_mode; num_fields = arg_num_fields; fields = arg_fields; SetName("InputReader/"+info.source); // disable if DoInit returns error. - int success = DoInit(arg_info, mode, arg_num_fields, arg_fields); + int success = DoInit(arg_info, arg_num_fields, arg_fields); if ( ! success ) { diff --git a/src/input/ReaderBackend.h b/src/input/ReaderBackend.h index d7d022d5fa..fd7ac769f2 100644 --- a/src/input/ReaderBackend.h +++ b/src/input/ReaderBackend.h @@ -7,8 +7,6 @@ #include "threading/SerialTypes.h" #include "threading/MsgThread.h" -class RemoteSerializer; - namespace input { @@ -87,6 +85,12 @@ public: */ config_map config; + /** + * The opening mode for the input source. + */ + ReaderMode mode; +/* + * I don't think the input framework needs remote serialization. If it doesn't, kill this. If it does add ReaderMode. private: friend class ::RemoteSerializer; @@ -94,16 +98,14 @@ public: // fields. They serialize/deserialize the struct. bool Read(SerializationFormat* fmt); bool Write(SerializationFormat* fmt) const; + + */ }; /** * One-time initialization of the reader to define the input source. * - * @param source A string left to the interpretation of the - * reader implementation; it corresponds to the value configured on - * the script-level for the input stream. - * - * @param mode The opening mode for the input source. + * @param @param info Meta information for the writer. * * @param num_fields Number of fields contained in \a fields. * @@ -115,7 +117,7 @@ public: * * @return False if an error occured. */ - bool Init(const ReaderInfo& info, ReaderMode mode, int num_fields, const threading::Field* const* fields); + bool Init(const ReaderInfo& info, int num_fields, const threading::Field* const* fields); /** * Finishes reading from this input stream in a regular fashion. Must @@ -180,7 +182,7 @@ protected: * provides accessor methods to get them later, and they are passed * in here only for convinience. */ - virtual bool DoInit(const ReaderInfo& info, ReaderMode mode, int arg_num_fields, const threading::Field* const* fields) = 0; + virtual bool DoInit(const ReaderInfo& info, int arg_num_fields, const threading::Field* const* fields) = 0; /** * Reader-specific method implementing input finalization at @@ -209,11 +211,6 @@ protected: */ virtual bool DoUpdate() = 0; - /** - * Returns the reader mode as passed into Init(). - */ - const ReaderMode Mode() const { return mode; } - /** * Method allowing a reader to send a specified Bro event. Vals must * match the values expected by the bro event. @@ -315,7 +312,6 @@ private: ReaderFrontend* frontend; ReaderInfo info; - ReaderMode mode; unsigned int num_fields; const threading::Field* const * fields; // raw mapping diff --git a/src/input/ReaderFrontend.cc b/src/input/ReaderFrontend.cc index f92a8ec80c..2c5d522c2f 100644 --- a/src/input/ReaderFrontend.cc +++ b/src/input/ReaderFrontend.cc @@ -11,19 +11,18 @@ namespace input { class InitMessage : public threading::InputMessage { public: - InitMessage(ReaderBackend* backend, const ReaderBackend::ReaderInfo& info, ReaderMode mode, + InitMessage(ReaderBackend* backend, const ReaderBackend::ReaderInfo& info, const int num_fields, const threading::Field* const* fields) : threading::InputMessage("Init", backend), - info(info), mode(mode), num_fields(num_fields), fields(fields) { } + info(info), num_fields(num_fields), fields(fields) { } virtual bool Process() { - return Object()->Init(info, mode, num_fields, fields); + return Object()->Init(info, num_fields, fields); } private: const ReaderBackend::ReaderInfo info; - const ReaderMode mode; const int num_fields; const threading::Field* const* fields; }; @@ -63,7 +62,7 @@ ReaderFrontend::~ReaderFrontend() { } -void ReaderFrontend::Init(const ReaderBackend::ReaderInfo& arg_info, ReaderMode mode, const int arg_num_fields, +void ReaderFrontend::Init(const ReaderBackend::ReaderInfo& arg_info, const int arg_num_fields, const threading::Field* const* arg_fields) { if ( disabled ) @@ -77,7 +76,7 @@ void ReaderFrontend::Init(const ReaderBackend::ReaderInfo& arg_info, ReaderMode fields = arg_fields; initialized = true; - backend->SendIn(new InitMessage(backend, info, mode, num_fields, fields)); + backend->SendIn(new InitMessage(backend, info, num_fields, fields)); } void ReaderFrontend::Update() diff --git a/src/input/ReaderFrontend.h b/src/input/ReaderFrontend.h index fadf2cddb5..35235ee2bc 100644 --- a/src/input/ReaderFrontend.h +++ b/src/input/ReaderFrontend.h @@ -52,7 +52,7 @@ public: * * This method must only be called from the main thread. */ - void Init(const ReaderBackend::ReaderInfo& info, ReaderMode mode, const int arg_num_fields, const threading::Field* const* fields); + void Init(const ReaderBackend::ReaderInfo& info, const int arg_num_fields, const threading::Field* const* fields); /** * Force an update of the current input source. Actual action depends diff --git a/src/input/readers/Ascii.cc b/src/input/readers/Ascii.cc index 9e3ad28f9c..1731bba872 100644 --- a/src/input/readers/Ascii.cc +++ b/src/input/readers/Ascii.cc @@ -83,7 +83,7 @@ void Ascii::DoClose() } } -bool Ascii::DoInit(const ReaderInfo& info, ReaderMode mode, int num_fields, const Field* const* fields) +bool Ascii::DoInit(const ReaderInfo& info, int num_fields, const Field* const* fields) { mtime = 0; @@ -362,7 +362,7 @@ Value* Ascii::EntryToVal(string s, FieldMapping field) // read the entire file and send appropriate thingies back to InputMgr bool Ascii::DoUpdate() { - switch ( Mode() ) { + switch ( Info().mode ) { case MODE_REREAD: { // check if the file has changed @@ -389,7 +389,7 @@ bool Ascii::DoUpdate() // - this is not that bad) if ( file && file->is_open() ) { - if ( Mode() == MODE_STREAM ) + if ( Info().mode == MODE_STREAM ) { file->clear(); // remove end of file evil bits if ( !ReadHeader(true) ) @@ -492,13 +492,13 @@ bool Ascii::DoUpdate() //printf("fpos: %d, second.num_fields: %d\n", fpos, (*it).second.num_fields); assert ( fpos == NumFields() ); - if ( Mode() == MODE_STREAM ) + if ( Info().mode == MODE_STREAM ) Put(fields); else SendEntry(fields); } - if ( Mode () != MODE_STREAM ) + if ( Info().mode != MODE_STREAM ) EndCurrentSend(); return true; @@ -508,7 +508,7 @@ bool Ascii::DoHeartbeat(double network_time, double current_time) { ReaderBackend::DoHeartbeat(network_time, current_time); - switch ( Mode() ) { + switch ( Info().mode ) { case MODE_MANUAL: // yay, we do nothing :) break; diff --git a/src/input/readers/Ascii.h b/src/input/readers/Ascii.h index bb7e7a1ce2..e1506cbe82 100644 --- a/src/input/readers/Ascii.h +++ b/src/input/readers/Ascii.h @@ -38,7 +38,7 @@ public: static ReaderBackend* Instantiate(ReaderFrontend* frontend) { return new Ascii(frontend); } protected: - virtual bool DoInit(const ReaderInfo& info, ReaderMode mode, int arg_num_fields, const threading::Field* const* fields); + virtual bool DoInit(const ReaderInfo& info, int arg_num_fields, const threading::Field* const* fields); virtual void DoClose(); virtual bool DoUpdate(); virtual bool DoHeartbeat(double network_time, double current_time); diff --git a/src/input/readers/Benchmark.cc b/src/input/readers/Benchmark.cc index 1b4d39ddf1..d8dcb543f4 100644 --- a/src/input/readers/Benchmark.cc +++ b/src/input/readers/Benchmark.cc @@ -36,7 +36,7 @@ void Benchmark::DoClose() { } -bool Benchmark::DoInit(const ReaderInfo& info, ReaderMode mode, int num_fields, const Field* const* fields) +bool Benchmark::DoInit(const ReaderInfo& info, int num_fields, const Field* const* fields) { num_lines = atoi(info.source.c_str()); @@ -83,7 +83,7 @@ bool Benchmark::DoUpdate() for (int j = 0; j < NumFields(); j++ ) field[j] = EntryToVal(Fields()[j]->type, Fields()[j]->subtype); - if ( Mode() == MODE_STREAM ) + if ( Info().mode == MODE_STREAM ) // do not do tracking, spread out elements over the second that we have... Put(field); else @@ -109,7 +109,7 @@ bool Benchmark::DoUpdate() } - if ( Mode() != MODE_STREAM ) + if ( Info().mode != MODE_STREAM ) EndCurrentSend(); return true; @@ -227,7 +227,7 @@ bool Benchmark::DoHeartbeat(double network_time, double current_time) num_lines += add; heartbeatstarttime = CurrTime(); - switch ( Mode() ) { + switch ( Info().mode ) { case MODE_MANUAL: // yay, we do nothing :) break; diff --git a/src/input/readers/Benchmark.h b/src/input/readers/Benchmark.h index 0f940873e4..bab564b12a 100644 --- a/src/input/readers/Benchmark.h +++ b/src/input/readers/Benchmark.h @@ -18,7 +18,7 @@ public: static ReaderBackend* Instantiate(ReaderFrontend* frontend) { return new Benchmark(frontend); } protected: - virtual bool DoInit(const ReaderInfo& info, ReaderMode mode, int arg_num_fields, const threading::Field* const* fields); + virtual bool DoInit(const ReaderInfo& info, int arg_num_fields, const threading::Field* const* fields); virtual void DoClose(); virtual bool DoUpdate(); virtual bool DoHeartbeat(double network_time, double current_time); diff --git a/src/input/readers/Raw.cc b/src/input/readers/Raw.cc index 2fb7e92c40..d4a761a931 100644 --- a/src/input/readers/Raw.cc +++ b/src/input/readers/Raw.cc @@ -66,7 +66,7 @@ bool Raw::OpenInput() // This is defined in input/fdstream.h in = new boost::fdistream(fileno(file)); - if ( execute && Mode() == MODE_STREAM ) + if ( execute && Info().mode == MODE_STREAM ) fcntl(fileno(file), F_SETFL, O_NONBLOCK); return true; @@ -100,7 +100,7 @@ bool Raw::CloseInput() return true; } -bool Raw::DoInit(const ReaderInfo& info, ReaderMode mode, int num_fields, const Field* const* fields) +bool Raw::DoInit(const ReaderInfo& info, int num_fields, const Field* const* fields) { fname = info.source; mtime = 0; @@ -135,10 +135,10 @@ bool Raw::DoInit(const ReaderInfo& info, ReaderMode mode, int num_fields, const execute = true; fname = info.source.substr(0, fname.length() - 1); - if ( (mode != MODE_MANUAL) ) + if ( (info.mode != MODE_MANUAL) ) { Error(Fmt("Unsupported read mode %d for source %s in execution mode", - mode, fname.c_str())); + info.mode, fname.c_str())); return false; } @@ -187,7 +187,7 @@ bool Raw::DoUpdate() else { - switch ( Mode() ) { + switch ( Info().mode ) { case MODE_REREAD: { // check if the file has changed @@ -210,7 +210,7 @@ bool Raw::DoUpdate() case MODE_MANUAL: case MODE_STREAM: - if ( Mode() == MODE_STREAM && file != NULL && in != NULL ) + if ( Info().mode == MODE_STREAM && file != NULL && in != NULL ) { //fpurge(file); in->clear(); // remove end of file evil bits @@ -254,7 +254,7 @@ bool Raw::DoHeartbeat(double network_time, double current_time) { ReaderBackend::DoHeartbeat(network_time, current_time); - switch ( Mode() ) { + switch ( Info().mode ) { case MODE_MANUAL: // yay, we do nothing :) break; diff --git a/src/input/readers/Raw.h b/src/input/readers/Raw.h index 7d1351e728..48912b70a7 100644 --- a/src/input/readers/Raw.h +++ b/src/input/readers/Raw.h @@ -22,7 +22,7 @@ public: static ReaderBackend* Instantiate(ReaderFrontend* frontend) { return new Raw(frontend); } protected: - virtual bool DoInit(const ReaderInfo& info, ReaderMode mode, int arg_num_fields, const threading::Field* const* fields); + virtual bool DoInit(const ReaderInfo& info, int arg_num_fields, const threading::Field* const* fields); virtual void DoClose(); virtual bool DoUpdate(); virtual bool DoHeartbeat(double network_time, double current_time);