diff --git a/scripts/base/frameworks/input/main.bro b/scripts/base/frameworks/input/main.bro index cac1aca54a..7e581070e6 100644 --- a/scripts/base/frameworks/input/main.bro +++ b/scripts/base/frameworks/input/main.bro @@ -4,11 +4,14 @@ module Input; export { + redef enum Input::ID += { TABLE_READ }; - + ## The default input reader used. Defaults to `READER_ASCII`. const default_reader = READER_ASCII &redef; + const default_mode = MANUAL &redef; + ## Stream decription type used for the `create_stream` method type StreamDescription: record { ## String that allows the reader to find the source. @@ -17,6 +20,9 @@ export { ## Reader to use for this steam reader: Reader &default=default_reader; + + ## Read mode to use for this stream + mode: Mode &default=default_mode; }; ## TableFilter description type used for the `add_tablefilter` method. diff --git a/src/input/Manager.cc b/src/input/Manager.cc index 6655ae5e82..d4e5cdaee9 100644 --- a/src/input/Manager.cc +++ b/src/input/Manager.cc @@ -685,18 +685,28 @@ bool Manager::RemoveEventFilter(EnumVal* id, const string &name) { return false; } - map::iterator it = i->filters.find(id->InternalInt()); - if ( it == i->filters.end() ) { + bool found = false; + int filterId; + for ( map::iterator it = i->filters.begin(); it != i->filters.end(); ++it ) { + if ( (*it).second->name == name ) { + found = true; + filterId = (*it).first; + + if ( (*it).second->filter_type != EVENT_FILTER ) { + reporter->Error("Trying to remove filter %s of wrong type", name.c_str()); + return false; + } + + break; + } + } + + if ( !found ) { + reporter->Error("Trying to remove nonexisting filter %s", name.c_str()); return false; } - if ( i->filters[id->InternalInt()]->filter_type != EVENT_FILTER ) { - // wrong type; - return false; - } - - delete (*it).second; - i->filters.erase(it); + i->reader->RemoveFilter(filterId); return true; } @@ -1136,11 +1146,6 @@ bool Manager::Delete(const ReaderFrontend* reader, int id, Value* *vals) { return success; } -void Manager::Error(ReaderFrontend* reader, const char* msg) -{ - reporter->Error("error with input reader for %s: %s", reader->Source().c_str(), msg); -} - bool Manager::SendEvent(const string& name, const int num_vals, Value* *vals) { EventHandler* handler = event_registry->Lookup(name.c_str()); diff --git a/src/input/Manager.h b/src/input/Manager.h index 9e35dd2199..be84ee416d 100644 --- a/src/input/Manager.h +++ b/src/input/Manager.h @@ -1,4 +1,6 @@ // See the file "COPYING" in the main distribution directory for copyright. +// +// Class for managing input streams and filters #ifndef INPUT_MANAGER_H #define INPUT_MANAGER_H @@ -16,18 +18,100 @@ namespace input { class ReaderFrontend; class ReaderBackend; +/** + * Singleton class for managing input streams. + */ class Manager { public: + /** + * Constructor. + */ Manager(); + + /** + * Destructor. + */ + ~Manager(); + /** + * Creates a new input stream. + * + * @param id The enum value corresponding the input stream. + * + * @param description A record of script type \c Input:StreamDescription. + * + * This method corresponds directly to the internal BiF defined in + * input.bif, which just forwards here. + */ ReaderFrontend* CreateStream(EnumVal* id, RecordVal* description); + + /** + * Force update on a input stream. + * Forces a re-read of the whole input source. + * Usually used, when an input stream is opened in managed mode. + * Otherwise, this can be used to trigger a input source check before a heartbeat message arrives. + * May be ignored by the reader. + * + * @param id The enum value corresponding the input stream. + * + * This method corresponds directly to the internal BiF defined in + * input.bif, which just forwards here. + */ bool ForceUpdate(const EnumVal* id); + + /** + * Deletes an existing input stream + * + * @param id The enum value corresponding the input stream. + * + * This method corresponds directly to the internal BiF defined in + * input.bif, which just forwards here. + */ bool RemoveStream(const EnumVal* id); + /** + * Add a filter to an input source, which will write the data from the data source into + * a Bro table. + * + * @param id The enum value corresponding the input stream. + * + * @param description A record of script type \c Input:TableFilter. + * + * This method corresponds directly to the internal BiF defined in + * input.bif, which just forwards here. + */ bool AddTableFilter(EnumVal *id, RecordVal* filter); + + /** + * Removes a tablefilter from the log stream + * + * @param id The enum value corresponding the input stream. + * + * This method corresponds directly to the internal BiF defined in + * input.bif, which just forwards here. + */ bool RemoveTableFilter(EnumVal* id, const string &name); + /** + * Add a filter to an input source, which sends events for read input data. + * + * @param id The enum value corresponding the input stream. + * + * @param description A record of script type \c Input:EventFilter. + * + * This method corresponds directly to the internal BiF defined in + * input.bif, which just forwards here. + */ bool AddEventFilter(EnumVal *id, RecordVal* filter); + + /** + * Removes a eventfilter from the log stream + * + * @param id The enum value corresponding the input stream. + * + * This method corresponds directly to the internal BiF defined in + * input.bif, which just forwards here. + */ bool RemoveEventFilter(EnumVal* id, const string &name); protected: @@ -41,46 +125,76 @@ protected: friend class FilterRemovedMessage; friend class ReaderFinishedMessage; - // Reports an error for the given reader. - void Error(ReaderFrontend* reader, const char* msg); - - // for readers to write to input stream in direct mode (reporting new/deleted values directly) + // For readers to write to input stream in direct mode (reporting new/deleted values directly) + // Functions take ownership of threading::Value fields void Put(const ReaderFrontend* reader, int id, threading::Value* *vals); void Clear(const ReaderFrontend* reader, int id); bool Delete(const ReaderFrontend* reader, int id, threading::Value* *vals); // for readers to write to input stream in indirect mode (manager is monitoring new/deleted values) + // Functions take ownership of threading::Value fields void SendEntry(const ReaderFrontend* reader, const int id, threading::Value* *vals); void EndCurrentSend(const ReaderFrontend* reader, const int id); + // Allows readers to directly send Bro events. + // The num_vals and vals must be the same the named event expects. + // Takes ownership of threading::Value fields bool SendEvent(const string& name, const int num_vals, threading::Value* *vals); + // Instantiates a new ReaderBackend of the given type (note that + // doing so creates a new thread!). ReaderBackend* CreateBackend(ReaderFrontend* frontend, bro_int_t type); + // Functions are called from the ReaderBackend to notify the manager, that a filter has been removed + // or a stream has been closed. + // Used to prevent race conditions where data for a specific filter is still in the queue when the + // RemoveFilter directive is executed by the main thread. + // This makes sure all data that has ben queued for a filter is still received. bool RemoveFilterContinuation(const ReaderFrontend* reader, const int filterId); bool RemoveStreamContinuation(const ReaderFrontend* reader); private: struct ReaderInfo; + // SendEntry implementation for Tablefilter int SendEntryTable(const ReaderFrontend* reader, int id, const threading::Value* const *vals); + + // Put implementation for Tablefilter int PutTable(const ReaderFrontend* reader, int id, const threading::Value* const *vals); + + // SendEntry and Put implementation for Eventfilter int SendEventFilterEvent(const ReaderFrontend* reader, EnumVal* type, int id, const threading::Value* const *vals); + // Checks is a bro type can be used for data reading. The equivalend in threading cannot be used, because we have support different types + // from the log framework bool IsCompatibleType(BroType* t, bool atomic_only=false); + // Check if a record is made up of compatible types and return a list of all fields that are in the record in order. + // Recursively unrolls records bool UnrollRecordType(vector *fields, const RecordType *rec, const string& nameprepend); + // Send events void SendEvent(EventHandlerPtr ev, const int numvals, ...); void SendEvent(EventHandlerPtr ev, list events); + // get a hashkey for a set of threading::Values HashKey* HashValues(const int num_elements, const threading::Value* const *vals); + + // Get the memory used by a specific value int GetValueLength(const threading::Value* val); + // Copies the raw data in a specific threading::Value to position sta int CopyValue(char *data, const int startpos, const threading::Value* val); + // Convert Threading::Value to an internal Bro Type (works also with Records) Val* ValueToVal(const threading::Value* val, BroType* request_type); + + // Convert Threading::Value to an internal Bro List type Val* ValueToIndexVal(int num_fields, const RecordType* type, const threading::Value* const *vals); + + // Converts a threading::value to a record type. mostly used by ValueToVal RecordVal* ValueToRecordVal(const threading::Value* const *vals, RecordType *request_type, int* position); + + // Converts a Bro ListVal to a RecordVal given the record type RecordVal* ListValToRecordVal(ListVal* list, RecordType *request_type, int* position); ReaderInfo* FindReader(const ReaderFrontend* reader); diff --git a/src/input/ReaderBackend.h b/src/input/ReaderBackend.h index de4a056c22..c6fbaac715 100644 --- a/src/input/ReaderBackend.h +++ b/src/input/ReaderBackend.h @@ -11,20 +11,90 @@ namespace input { class ReaderFrontend; +/** + * Base class for reader implementation. When the input:Manager creates a + * new input stream, it instantiates a ReaderFrontend. That then in turn + * creates a ReaderBackend of the right type. The frontend then forwards + * message over the backend as its methods are called. + * + * All of this methods must be called only from the corresponding child + * thread (the constructor is the one exception.) + */ class ReaderBackend : public threading::MsgThread { public: + /** + * Constructor. + * + * @param frontend The frontend reader that created this backend. The + * *only* purpose of this value is to be passed back via messages as + * a argument to callbacks. One must not otherwise access the + * frontend, it's running in a different thread. + * + * @param frontend pointer to the reader frontend + */ ReaderBackend(ReaderFrontend* frontend); + /** + * Destructor. + */ virtual ~ReaderBackend(); - + + /** + * One-time initialization of the reader to define the input source. + * + * @param arg_source A string left to the interpretation of the reader + * implementation; it corresponds to the value configured on the + * script-level for the input stream. + * + * @param num_fields The number of log fields for the stream. + * + * @param fields An array of size \a num_fields with the log fields. + * The methods takes ownership of the array. + * + * @return False if an error occured. + */ bool Init(string arg_source); + /** + * Add an input filter to the input stream + * + * @param id identifier of the input stream + * + * @param arg_num_fields number of fields contained in \a fields + * + * @param fields the types and names of the fields to be retrieved from the input source + * + * @return False if an error occured. + */ bool AddFilter( int id, int arg_num_fields, const threading::Field* const* fields ); + + /** + * Remove an input filter to the input stream + * + * @param id identifier of the input stream + * + * @return False if an error occured. + */ bool RemoveFilter ( int id ); + /** + * Finishes reading from this input stream in a regular fashion. Must not be + * called if an error has been indicated earlier. After calling this, + * no further reading from the stream can be performed + * + * @return False if an error occured. + */ void Finish(); + /** + * Force trigger an update of the input stream. + * The action that will be taken depends on the current read mode and the individual input backend + * + * An backend can choose to ignore this. + * + * @return False if an error occured. + */ bool Update(); /** @@ -34,30 +104,126 @@ public: void DisableFrontend(); protected: - // Methods that have to be overwritten by the individual readers + // Methods that have to be overwritten by the individual readers + + /** + * Reader-specific intialization method. + * + * A reader implementation must override this method. If it returns + * false, it will be assumed that a fatal error has occured that + * prevents the reader from further operation; it will then be + * disabled and eventually deleted. When returning false, an + * implementation should also call Error() to indicate what happened. + */ virtual bool DoInit(string arg_sources) = 0; + /** + * Reader-specific method to add a filter. + * + * A reader implementation must override this method. + */ virtual bool DoAddFilter( int id, int arg_num_fields, const threading::Field* const* fields ) = 0; + /** + * Reader-specific method to remove a filter. + * + * A reader implementation must override this method. + */ virtual bool DoRemoveFilter( int id ) = 0; + /** + * Reader-specific method implementing input finalization at + * termination. + * + * A reader implementation must override this method but it can just + * ignore calls if an input source must not be closed. + * + * After the method is called, the writer will be deleted. If an error occurs + * during shutdown, an implementation should also call Error() to indicate what + * happened. + */ virtual void DoFinish() = 0; - // update file contents to logmgr + /** + * Reader-specific method implementing the forced update trigger + * + * A reader implementation must override this method but it can just ignore + * calls, if a forced update does not fit the input source or the current input + * reading mode + */ virtual bool DoUpdate() = 0; - // The following methods return the information as passed to Init(). + /** + * Returns the input source as passed into the constructor. + */ const string Source() const { return source; } + /** + * Method allowing a reader to send a specified bro event. + * Vals must match the values expected by the bro event. + * + * @param name name of the bro event to send + * + * @param num_vals number of entries in \a vals + * + * @param vals the values to be given to the event + */ void SendEvent(const string& name, const int num_vals, threading::Value* *vals); - // Content-sendinf-functions (simple mode). Including table-specific stuff that simply is not used if we have no table + // Content-sending-functions (simple mode). Including table-specific stuff that simply is not used if we have no table + /** + * Method allowing a reader to send a list of values read for a specific filter back to the manager. + * + * If the filter points to a table, the values are inserted into the table; if it points to an event, the event is raised + * + * @param id the input filter id for which the values are sent + * + * @param val list of threading::Values expected by the filter + */ void Put(int id, threading::Value* *val); + + /** + * Method allowing a reader to delete a specific value from a bro table. + * + * If the receiving filter is an event, only a removed event is raised + * + * @param id the input filter id for which the values are sent + * + * @param val list of threading::Values expected by the filter + */ void Delete(int id, threading::Value* *val); + + /** + * Method allowing a reader to clear a value from a bro table. + * + * If the receiving filter is an event, this is ignored. + * + * @param id the input filter id for which the values are sent + */ void Clear(int id); - // Table-functions (tracking mode): Only changed lines are propagated. + // Content-sending-functions (tracking mode): Only changed lines are propagated. + + + /** + * Method allowing a reader to send a list of values read for a specific filter back to the manager. + * + * If the filter points to a table, the values are inserted into the table; if it points to an event, the event is raised. + * + * @param id the input filter id for which the values are sent + * + * @param val list of threading::Values expected by the filter + */ void SendEntry(int id, threading::Value* *vals); + + /** + * Method telling the manager, that the current list of entries sent by SendEntry is finished. + * + * For table filters, all entries that were not updated since the last EndCurrentSend will be deleted, because they are no longer + * present in the input source + * + * @param id the input filter id for which the values are sent + */ void EndCurrentSend(int id); @@ -68,11 +234,7 @@ private: string source; - // When an error occurs, this method is called to set a flag marking the - // writer as disabled. - bool disabled; - bool Disabled() { return disabled; } // For implementing Fmt(). char* buf; diff --git a/src/input/ReaderFrontend.h b/src/input/ReaderFrontend.h index 97433c8af6..c29071612d 100644 --- a/src/input/ReaderFrontend.h +++ b/src/input/ReaderFrontend.h @@ -51,15 +51,37 @@ public: */ void Init(string arg_source); + /** + * Force an update of the current input source. Actual action depends on + * the opening mode and on the input source. + * + * This method generates a message to the backend reader and triggers + * the corresponding message there. + * This method must only be called from the main thread. + */ void Update(); - /* * The method takes - * ownership of \a fields. */ - + /** + * Add a filter to the current input source. + * + * See ReaderBackend::AddFilter for arguments. + * + * The method takes ownership of \a fields + */ void AddFilter( const int id, const int arg_num_fields, const threading::Field* const* fields ); + /** + * Removes a filter to the current input source. + */ void RemoveFilter ( const int id ); + /** + * Finalizes writing to this tream. + * + * This method generates a message to the backend reader and triggers + * the corresponding message there. + * This method must only be called from the main thread. + */ void Finish(); /** @@ -92,6 +114,9 @@ public: protected: friend class Manager; + /** + * Returns the source as passed into the constructor + */ const string Source() const { return source; }; string ty_name; // Name of the backend type. Set by the manager. diff --git a/src/types.bif b/src/types.bif index 9256fe3bd0..1529319197 100644 --- a/src/types.bif +++ b/src/types.bif @@ -185,4 +185,10 @@ enum ID %{ Unknown, %} +enum Mode %{ + MANUAL, + REREAD, + STREAM, +%} + module GLOBAL;