diff --git a/src/input/Manager.cc b/src/input/Manager.cc index 79d42fe71f..6655ae5e82 100644 --- a/src/input/Manager.cc +++ b/src/input/Manager.cc @@ -534,7 +534,6 @@ bool Manager::RemoveStream(const EnumVal* id) { if ( (*s)->id == id ) { i = (*s); - readers.erase(s); // remove from vector break; } } @@ -545,11 +544,29 @@ bool Manager::RemoveStream(const EnumVal* id) { i->reader->Finish(); - delete(i); - return true; } +bool Manager::RemoveStreamContinuation(const ReaderFrontend* reader) { + ReaderInfo *i = 0; + + + for ( vector::iterator s = readers.begin(); s != readers.end(); ++s ) + { + if ( (*s)->reader && (*s)->reader == reader ) + { + i = *s; + delete(i); + readers.erase(s); + return true; + } + } + + reporter->Error("Stream not found in RemoveStreamContinuation"); + return false; + +} + bool Manager::UnrollRecordType(vector *fields, const RecordType *rec, const string& nameprepend) { for ( int i = 0; i < rec->NumFields(); i++ ) { @@ -615,20 +632,51 @@ bool Manager::RemoveTableFilter(EnumVal* id, const string &name) { return false; } - map::iterator it = i->filters.find(id->InternalInt()); - if ( it == i->filters.end() ) { + bool found = false; + int filterId; + + for ( map::iterator it = i->filters.begin(); it != i->filters.end(); ++it ) { + if ( (*it).second->name == name ) { + found = true; + filterId = (*it).first; + + if ( (*it).second->filter_type != TABLE_FILTER ) { + reporter->Error("Trying to remove filter %s of wrong type", name.c_str()); + return false; + } + + break; + } + } + + if ( !found ) { + reporter->Error("Trying to remove nonexisting filter %s", name.c_str()); return false; } - if ( i->filters[id->InternalInt()]->filter_type != TABLE_FILTER ) { - // wrong type; + i->reader->RemoveFilter(filterId); + + return true; +} + +bool Manager::RemoveFilterContinuation(const ReaderFrontend* reader, const int filterId) { + ReaderInfo *i = FindReader(reader); + if ( i == 0 ) { + reporter->Error("Reader not found"); + return false; + } + + map::iterator it = i->filters.find(filterId); + if ( it == i->filters.end() ) { + reporter->Error("Got RemoveFilterContinuation where filter nonexistant for %d", filterId); return false; } delete (*it).second; i->filters.erase(it); + return true; -} +} bool Manager::RemoveEventFilter(EnumVal* id, const string &name) { ReaderInfo *i = FindReader(id); diff --git a/src/input/Manager.h b/src/input/Manager.h index 45c07895f2..9e35dd2199 100644 --- a/src/input/Manager.h +++ b/src/input/Manager.h @@ -32,13 +32,14 @@ public: protected: friend class ReaderFrontend; - friend class ErrorMessage; friend class PutMessage; friend class DeleteMessage; friend class ClearMessage; friend class SendEventMessage; friend class SendEntryMessage; friend class EndCurrentSendMessage; + friend class FilterRemovedMessage; + friend class ReaderFinishedMessage; // Reports an error for the given reader. void Error(ReaderFrontend* reader, const char* msg); @@ -56,6 +57,8 @@ protected: ReaderBackend* CreateBackend(ReaderFrontend* frontend, bro_int_t type); + bool RemoveFilterContinuation(const ReaderFrontend* reader, const int filterId); + bool RemoveStreamContinuation(const ReaderFrontend* reader); private: struct ReaderInfo; diff --git a/src/input/ReaderBackend.cc b/src/input/ReaderBackend.cc index f9992f5f0e..5cb4fe34f2 100644 --- a/src/input/ReaderBackend.cc +++ b/src/input/ReaderBackend.cc @@ -9,21 +9,6 @@ using threading::Field; namespace input { -class ErrorMessage : public threading::OutputMessage { -public: - ErrorMessage(ReaderFrontend* reader, string message) - : threading::OutputMessage("Error", reader), - message(message) {} - - virtual bool Process() { - input_mgr->Error(Object(), message.c_str()); - return true; - } - -private: - string message; -}; - class PutMessage : public threading::OutputMessage { public: PutMessage(ReaderFrontend* reader, int id, Value* *val) @@ -104,7 +89,7 @@ private: class EndCurrentSendMessage : public threading::OutputMessage { public: - EndCurrentSendMessage(ReaderFrontend* reader, int id) + EndCurrentSendMessage(ReaderFrontend* reader, const int id) : threading::OutputMessage("EndCurrentSend", reader), id(id) {} @@ -114,9 +99,46 @@ public: } private: - int id; + const int id; }; +class FilterRemovedMessage : public threading::OutputMessage { +public: + FilterRemovedMessage(ReaderFrontend* reader, const int id) + : threading::OutputMessage("FilterRemoved", reader), + id(id) {} + + virtual bool Process() { + return input_mgr->RemoveFilterContinuation(Object(), id); + } + +private: + const int id; +}; + +class ReaderFinishedMessage : public threading::OutputMessage { +public: + ReaderFinishedMessage(ReaderFrontend* reader) + : threading::OutputMessage("ReaderFinished", reader) {} + + virtual bool Process() { + return input_mgr->RemoveStreamContinuation(Object()); + } + +private: +}; + + +class DisableMessage : public threading::OutputMessage +{ +public: + DisableMessage(ReaderFrontend* writer) + : threading::OutputMessage("Disable", writer) {} + + virtual bool Process() { Object()->SetDisable(); return true; } +}; + + ReaderBackend::ReaderBackend(ReaderFrontend* arg_frontend) : MsgThread() { buf = 0; @@ -133,18 +155,6 @@ ReaderBackend::~ReaderBackend() } -void ReaderBackend::Error(const string &msg) -{ - SendOut(new ErrorMessage(frontend, msg)); -} - -/* -void ReaderBackend::Error(const char *msg) -{ - SendOut(new ErrorMessage(frontend, string(msg))); -} */ - - void ReaderBackend::Put(int id, Value* *val) { SendOut(new PutMessage(frontend, id, val)); @@ -181,6 +191,11 @@ bool ReaderBackend::Init(string arg_source) // disable if DoInit returns error. disabled = !DoInit(arg_source); + + if ( disabled ) { + DisableFrontend(); + } + return !disabled; } @@ -192,13 +207,17 @@ bool ReaderBackend::AddFilter(int id, int arg_num_fields, bool ReaderBackend::RemoveFilter(int id) { - return DoRemoveFilter(id); + bool success = DoRemoveFilter(id); + SendOut(new FilterRemovedMessage(frontend, id)); + return success; // yes, I know, noone reads this. } void ReaderBackend::Finish() { DoFinish(); disabled = true; + DisableFrontend(); + SendOut(new ReaderFinishedMessage(frontend)); } bool ReaderBackend::Update() @@ -206,32 +225,9 @@ bool ReaderBackend::Update() return DoUpdate(); } - -// stolen from logwriter -const char* ReaderBackend::Fmt(const char* format, ...) - { - if ( ! buf ) - buf = (char*) malloc(buf_len); - - va_list al; - va_start(al, format); - int n = safe_vsnprintf(buf, buf_len, format, al); - va_end(al); - - if ( (unsigned int) n >= buf_len ) - { // Not enough room, grow the buffer. - buf_len = n + 32; - buf = (char*) realloc(buf, buf_len); - - // Is it portable to restart? - va_start(al, format); - n = safe_vsnprintf(buf, buf_len, format, al); - va_end(al); - } - - return buf; - } - - +void ReaderBackend::DisableFrontend() +{ + SendOut(new DisableMessage(frontend)); +} } diff --git a/src/input/ReaderBackend.h b/src/input/ReaderBackend.h index c12d187545..de4a056c22 100644 --- a/src/input/ReaderBackend.h +++ b/src/input/ReaderBackend.h @@ -26,6 +26,12 @@ public: void Finish(); bool Update(); + + /** + * Disables the frontend that has instantiated this backend. Once + * disabled,the frontend will not send any further message over. + */ + void DisableFrontend(); protected: // Methods that have to be overwritten by the individual readers @@ -40,16 +46,9 @@ protected: // update file contents to logmgr virtual bool DoUpdate() = 0; - // Reports an error to the user. - void Error(const string &msg); - //void Error(const char *msg); - // The following methods return the information as passed to Init(). const string Source() const { return source; } - // A thread-safe version of fmt(). (stolen from logwriter) - const char* Fmt(const char* format, ...); - void SendEvent(const string& name, const int num_vals, threading::Value* *vals); // Content-sendinf-functions (simple mode). Including table-specific stuff that simply is not used if we have no table diff --git a/src/input/ReaderFrontend.cc b/src/input/ReaderFrontend.cc index 0dac33d5e8..0fdf90d9ad 100644 --- a/src/input/ReaderFrontend.cc +++ b/src/input/ReaderFrontend.cc @@ -5,8 +5,9 @@ #include "ReaderBackend.h" #include "threading/MsgThread.h" -namespace input { +// FIXME: cleanup of disabled inputreaders is missing. we need this, because stuff can e.g. fail in init and might never be removed afterwards. +namespace input { class InitMessage : public threading::InputMessage { @@ -56,6 +57,19 @@ private: const threading::Field* const* fields; }; +class RemoveFilterMessage : public threading::InputMessage +{ +public: + RemoveFilterMessage(ReaderBackend* backend, const int id) + : threading::InputMessage("RemoveFilter", backend), + id(id) { } + + virtual bool Process() { return Object()->RemoveFilter(id); } + +private: + const int id; +}; + ReaderFrontend::ReaderFrontend(bro_int_t type) { disabled = initialized = false; @@ -103,15 +117,20 @@ void ReaderFrontend::AddFilter(const int id, const int arg_num_fields, const thr backend->SendIn(new AddFilterMessage(backend, id, arg_num_fields, fields)); } +void ReaderFrontend::RemoveFilter(const int id) { + if ( disabled ) + return; + + backend->SendIn(new RemoveFilterMessage(backend, id)); +} + string ReaderFrontend::Name() const - { +{ if ( source.size() ) return ty_name; return ty_name + "/" + source; - } - - +} } diff --git a/src/input/ReaderFrontend.h b/src/input/ReaderFrontend.h index 876082d9a6..97433c8af6 100644 --- a/src/input/ReaderFrontend.h +++ b/src/input/ReaderFrontend.h @@ -10,20 +10,77 @@ namespace input { class Manager; +/** + * Bridge class between the input::Manager and backend input threads. The + * Manager instantiates one \a ReaderFrontend for each open input stream. + * Each frontend in turns instantiates a ReaderBackend-derived class + * internally that's specific to the particular input format. That backend + * spawns a new thread, and it receives messages from the frontend that + * correspond to method called by the manager. + */ class ReaderFrontend { public: + /** + * Constructor. + * + * type: The backend writer type, with the value corresponding to the + * script-level \c Input::Reader enum (e.g., \a READER_ASCII). The + * frontend will internally instantiate a ReaderBackend of the + * corresponding type. + * + * Frontends must only be instantiated by the main thread. + */ ReaderFrontend(bro_int_t type); + /** + * Destructor. + * + * Frontends must only be destroyed by the main thread. + */ virtual ~ReaderFrontend(); + /** + * Initializes the reader. + * + * This method generates a message to the backend reader and triggers + * the corresponding message there. If the backend method fails, it + * sends a message back that will asynchronously call Disable(). + * + * See ReaderBackend::Init() for arguments. + * This method must only be called from the main thread. + */ void Init(string arg_source); void Update(); + /* * The method takes + * ownership of \a fields. */ + void AddFilter( const int id, const int arg_num_fields, const threading::Field* const* fields ); + void RemoveFilter ( const int id ); + void Finish(); + /** + * Disables the reader frontend. From now on, all method calls that + * would normally send message over to the backend, turn into no-ops. + * Note though that it does not stop the backend itself, use Finsh() + * to do that as well (this method is primarily for use as callback + * when the backend wants to disable the frontend). + * + * Disabled frontend will eventually be discarded by the + * input::Manager. + * + * This method must only be called from the main thread. + */ + void SetDisable() { disabled = true; } + + /** + * Returns true if the reader frontend has been disabled with SetDisable(). + */ + bool Disabled() { return disabled; } + /** * Returns a descriptive name for the reader, including the type of * the backend and the source used.