diff --git a/src/input/Manager.cc b/src/input/Manager.cc index a97286162d..189a034b0f 100644 --- a/src/input/Manager.cc +++ b/src/input/Manager.cc @@ -680,7 +680,7 @@ Val* Manager::ValueToIndexVal(int num_fields, const RecordType *type, const Valu } -void Manager::SendEntry(const ReaderFrontend* reader, int id, const Value* const *vals) { +void Manager::SendEntry(const ReaderFrontend* reader, const int id, const Value* const *vals) { ReaderInfo *i = FindReader(reader); if ( i == 0 ) { reporter->InternalError("Unknown reader"); @@ -703,7 +703,7 @@ void Manager::SendEntry(const ReaderFrontend* reader, int id, const Value* const } -void Manager::SendEntryTable(const ReaderFrontend* reader, int id, const Value* const *vals) { +void Manager::SendEntryTable(const ReaderFrontend* reader, const int id, const Value* const *vals) { ReaderInfo *i = FindReader(reader); bool updated = false; diff --git a/src/input/Manager.h b/src/input/Manager.h index 507af6468f..a0b98294ca 100644 --- a/src/input/Manager.h +++ b/src/input/Manager.h @@ -14,7 +14,7 @@ namespace input { class ReaderFrontend; -class ReaderBackend; +class ReaderBackend; class Manager { public: @@ -32,6 +32,13 @@ public: protected: friend class ReaderFrontend; + friend class ErrorMessage; + friend class PutMessage; + friend class DeleteMessage; + friend class ClearMessage; + friend class SendEventMessage; + friend class SendEntryMessage; + friend class EndCurrentSendMessage; // Reports an error for the given reader. void Error(ReaderFrontend* reader, const char* msg); @@ -42,11 +49,14 @@ protected: bool Delete(const ReaderFrontend* reader, int id, const threading::Value* const *vals); // for readers to write to input stream in indirect mode (manager is monitoring new/deleted values) - void SendEntry(const ReaderFrontend* reader, int id, const threading::Value* const *vals); - void EndCurrentSend(const ReaderFrontend* reader, int id); + void SendEntry(const ReaderFrontend* reader, const int id, const threading::Value* const *vals); + void EndCurrentSend(const ReaderFrontend* reader, const int id); + + bool SendEvent(const string& name, const int num_vals, const threading::Value* const *vals); ReaderBackend* CreateBackend(ReaderFrontend* frontend, bro_int_t type); + private: struct ReaderInfo; @@ -60,7 +70,6 @@ private: void SendEvent(EventHandlerPtr ev, const int numvals, ...); void SendEvent(EventHandlerPtr ev, list events); - bool SendEvent(const string& name, const int num_vals, const threading::Value* const *vals); HashKey* HashValues(const int num_elements, const threading::Value* const *vals); int GetValueLength(const threading::Value* val); diff --git a/src/input/ReaderBackend.cc b/src/input/ReaderBackend.cc index 8c996db4a1..72c8f95d8e 100644 --- a/src/input/ReaderBackend.cc +++ b/src/input/ReaderBackend.cc @@ -2,6 +2,7 @@ #include "ReaderBackend.h" #include "ReaderFrontend.h" +#include "Manager.h" using threading::Value; using threading::Field; @@ -15,28 +16,106 @@ public: message(message) {} virtual bool Process() { - input_mgr->Error(object, message.c_str()); + input_mgr->Error(Object(), message.c_str()); return true; } private: string message; -} +}; class PutMessage : public threading::OutputMessage { public: PutMessage(ReaderFrontend* reader, int id, const Value* const *val) - : threading::OutputMessage("Error", reader), + : threading::OutputMessage("Put", reader), id(id), val(val) {} virtual bool Process() { - return input_mgr->Put(object, id, val); + input_mgr->Put(Object(), id, val); + return true; } private: int id; - Value* val; -} + const Value* const *val; +}; + +class DeleteMessage : public threading::OutputMessage { +public: + DeleteMessage(ReaderFrontend* reader, int id, const Value* const *val) + : threading::OutputMessage("Delete", reader), + id(id), val(val) {} + + virtual bool Process() { + return input_mgr->Delete(Object(), id, val); + } + +private: + int id; + const Value* const *val; +}; + +class ClearMessage : public threading::OutputMessage { +public: + ClearMessage(ReaderFrontend* reader, int id) + : threading::OutputMessage("Clear", reader), + id(id) {} + + virtual bool Process() { + input_mgr->Clear(Object(), id); + return true; + } + +private: + int id; +}; + +class SendEventMessage : public threading::OutputMessage { +public: + SendEventMessage(ReaderFrontend* reader, const string& name, const int num_vals, const Value* const *val) + : threading::OutputMessage("SendEvent", reader), + name(name), num_vals(num_vals), val(val) {} + + virtual bool Process() { + return input_mgr->SendEvent(name, num_vals, val); + } + +private: + const string name; + const int num_vals; + const Value* const *val; +}; + +class SendEntryMessage : public threading::OutputMessage { +public: + SendEntryMessage(ReaderFrontend* reader, const int id, const Value* const *val) + : threading::OutputMessage("SendEntry", reader), + id(id), val(val) {} + + virtual bool Process() { + input_mgr->SendEntry(Object(), id, val); + return true; + } + +private: + const int id; + const Value* const *val; +}; + +class EndCurrentSendMessage : public threading::OutputMessage { +public: + EndCurrentSendMessage(ReaderFrontend* reader, int id) + : threading::OutputMessage("SendEntry", reader), + id(id) {} + + virtual bool Process() { + input_mgr->EndCurrentSend(Object(), id); + return true; + } + +private: + int id; +}; ReaderBackend::ReaderBackend(ReaderFrontend* arg_frontend) : MsgThread() { @@ -56,37 +135,44 @@ ReaderBackend::~ReaderBackend() void ReaderBackend::Error(const string &msg) { - SendOut(new ErrorMessage(frontend, msg); + SendOut(new ErrorMessage(frontend, msg)); } +/* +void ReaderBackend::Error(const char *msg) +{ + SendOut(new ErrorMessage(frontend, string(msg))); +} */ + + void ReaderBackend::Put(int id, const Value* const *val) { - SendOut(new PutMessage(frontend, id, val); + SendOut(new PutMessage(frontend, id, val)); } void ReaderBackend::Delete(int id, const Value* const *val) { - SendOut(new DeleteMessage(frontend, id, val); + SendOut(new DeleteMessage(frontend, id, val)); } void ReaderBackend::Clear(int id) { - SendOut(new ClearMessage(frontend, id); + SendOut(new ClearMessage(frontend, id)); } -bool ReaderBackend::SendEvent(const string& name, const int num_vals, const Value* const *vals) +void ReaderBackend::SendEvent(const string& name, const int num_vals, const Value* const *vals) { - SendOut(new SendEventMessage(frontend, name, num_vals, vals); + SendOut(new SendEventMessage(frontend, name, num_vals, vals)); } void ReaderBackend::EndCurrentSend(int id) { - SendOut(new EndCurrentSendMessage(frontent, id); + SendOut(new EndCurrentSendMessage(frontend, id)); } void ReaderBackend::SendEntry(int id, const Value* const *vals) { - SendOut(new SendEntryMessage(frontend, id, vals); + SendOut(new SendEntryMessage(frontend, id, vals)); } bool ReaderBackend::Init(string arg_source) diff --git a/src/input/ReaderBackend.h b/src/input/ReaderBackend.h index 1fe44a09b2..a37daaf4b6 100644 --- a/src/input/ReaderBackend.h +++ b/src/input/ReaderBackend.h @@ -42,7 +42,7 @@ protected: // Reports an error to the user. void Error(const string &msg); - void Error(const char *msg); + //void Error(const char *msg); // The following methods return the information as passed to Init(). const string Source() const { return source; } @@ -50,7 +50,7 @@ protected: // A thread-safe version of fmt(). (stolen from logwriter) const char* Fmt(const char* format, ...); - bool SendEvent(const string& name, const int num_vals, const threading::Value* const *vals); + void SendEvent(const string& name, const int num_vals, const threading::Value* const *vals); // Content-sendinf-functions (simple mode). Including table-specific stuff that simply is not used if we have no table void Put(int id, const threading::Value* const *val); diff --git a/src/input/ReaderFrontend.cc b/src/input/ReaderFrontend.cc index 44638d90b3..a7f9a4d2f6 100644 --- a/src/input/ReaderFrontend.cc +++ b/src/input/ReaderFrontend.cc @@ -1,28 +1,117 @@ // See the file "COPYING" in the main distribution directory for copyright. -#ifndef INPUT_READERFRONTEND_H -#define INPUT_READERFRONTEND_H - #include "Manager.h" - +#include "ReaderFrontend.h" +#include "ReaderBackend.h" #include "threading/MsgThread.h" -namespace logging { - -class ReaderBackend; - -class ReaderFrontend { - - ReaderFrontend(bro_int_t type); - - virtual ~ReaderFrontend(); +namespace input { -protected: - friend class Manager; +class InitMessage : public threading::InputMessage +{ +public: + InitMessage(ReaderBackend* backend, const string source) + : threading::InputMessage("Init", backend), + source(source) { } + + virtual bool Process() { return Object()->Init(source); } + +private: + const string source; }; +class UpdateMessage : public threading::InputMessage +{ +public: + UpdateMessage(ReaderBackend* backend) + : threading::InputMessage("Update", backend) + { } + + virtual bool Process() { return Object()->Update(); } +}; + +class FinishMessage : public threading::InputMessage +{ +public: + FinishMessage(ReaderBackend* backend) + : threading::InputMessage("Finish", backend) + { } + + virtual bool Process() { Object()->Finish(); return true; } +}; + +class AddFilterMessage : public threading::InputMessage +{ +public: + AddFilterMessage(ReaderBackend* backend, const int id, const int num_fields, const threading::Field* const* fields) + : threading::InputMessage("AddFilter", backend), + id(id), num_fields(num_fields), fields(fields) { } + + virtual bool Process() { return Object()->AddFilter(id, num_fields, fields); } + +private: + const int id; + const int num_fields; + const threading::Field* const* fields; +}; + +ReaderFrontend::ReaderFrontend(bro_int_t type) { + disabled = initialized = false; + ty_name = ""; + backend = input_mgr->CreateBackend(this, type); + + assert(backend); + backend->Start(); +} + +ReaderFrontend::~ReaderFrontend() { +} + +void ReaderFrontend::Init(string arg_source) { + if ( disabled ) + return; + + if ( initialized ) + reporter->InternalError("writer initialize twice"); + + source = arg_source; + initialized = true; + + backend->SendIn(new InitMessage(backend, arg_source)); +} + +void ReaderFrontend::Update() { + if ( disabled ) + return; + + backend->SendIn(new UpdateMessage(backend)); +} + +void ReaderFrontend::Finish() { + if ( disabled ) + return; + + backend->SendIn(new FinishMessage(backend)); +} + +void ReaderFrontend::AddFilter(const int id, const int arg_num_fields, const threading::Field* const* fields) { + if ( disabled ) + return; + + backend->SendIn(new AddFilterMessage(backend, id, arg_num_fields, fields)); +} + +string ReaderFrontend::Name() const + { + if ( source.size() ) + return ty_name; + + return ty_name + "/" + source; + } + + + } -#endif /* INPUT_READERFRONTEND_H */ diff --git a/src/input/ReaderFrontend.h b/src/input/ReaderFrontend.h index 984ba30794..876082d9a6 100644 --- a/src/input/ReaderFrontend.h +++ b/src/input/ReaderFrontend.h @@ -3,13 +3,12 @@ #ifndef INPUT_READERFRONTEND_H #define INPUT_READERFRONTEND_H -#include "Manager.h" +#include "../threading/MsgThread.h" +#include "../threading/SerializationTypes.h" -#include "threading/MsgThread.h" +namespace input { -namespace input { - -class ReaderBackend; +class Manager; class ReaderFrontend { public: @@ -21,7 +20,7 @@ public: void Update(); - void AddFilter( int id, int arg_num_fields, const threading::Field* const* fields ); + void AddFilter( const int id, const int arg_num_fields, const threading::Field* const* fields ); void Finish(); @@ -32,17 +31,19 @@ public: * This method is safe to call from any thread. */ string Name() const; - protected: friend class Manager; - const string Source() const { return source; } + const string Source() const { return source; }; string ty_name; // Name of the backend type. Set by the manager. private: + ReaderBackend* backend; // The backend we have instanatiated. string source; + bool disabled; // True if disabled. + bool initialized; // True if initialized. }; diff --git a/src/input/readers/Ascii.cc b/src/input/readers/Ascii.cc index 257cb4cf71..e798f69a36 100644 --- a/src/input/readers/Ascii.cc +++ b/src/input/readers/Ascii.cc @@ -1,11 +1,17 @@ // See the file "COPYING" in the main distribution directory for copyright. -#include "InputReaderAscii.h" -#include "DebugLogger.h" +#include "Ascii.h" #include "NetVar.h" +#include #include +#include "../../threading/SerializationTypes.h" + +using namespace input::reader; +using threading::Value; +using threading::Field; + FieldMapping::FieldMapping(const string& arg_name, const TypeTag& arg_type, int arg_position) : name(arg_name), type(arg_type) { @@ -31,7 +37,7 @@ FieldMapping FieldMapping::subType() { return FieldMapping(name, subtype, position); } -InputReaderAscii::InputReaderAscii() +Ascii::Ascii(ReaderFrontend *frontend) : ReaderBackend(frontend) { file = 0; @@ -53,13 +59,13 @@ InputReaderAscii::InputReaderAscii() } -InputReaderAscii::~InputReaderAscii() +Ascii::~Ascii() { DoFinish(); } -void InputReaderAscii::DoFinish() +void Ascii::DoFinish() { filters.empty(); if ( file != 0 ) { @@ -69,7 +75,7 @@ void InputReaderAscii::DoFinish() } } -bool InputReaderAscii::DoInit(string path) +bool Ascii::DoInit(string path) { fname = path; @@ -82,7 +88,7 @@ bool InputReaderAscii::DoInit(string path) return true; } -bool InputReaderAscii::DoAddFilter( int id, int arg_num_fields, const LogField* const* fields ) { +bool Ascii::DoAddFilter( int id, int arg_num_fields, const Field* const* fields ) { if ( HasFilter(id) ) { return false; // no, we don't want to add this a second time } @@ -96,7 +102,7 @@ bool InputReaderAscii::DoAddFilter( int id, int arg_num_fields, const LogField* return true; } -bool InputReaderAscii::DoRemoveFilter ( int id ) { +bool Ascii::DoRemoveFilter ( int id ) { if (!HasFilter(id) ) { return false; } @@ -107,7 +113,7 @@ bool InputReaderAscii::DoRemoveFilter ( int id ) { } -bool InputReaderAscii::HasFilter(int id) { +bool Ascii::HasFilter(int id) { map::iterator it = filters.find(id); if ( it == filters.end() ) { return false; @@ -116,7 +122,7 @@ bool InputReaderAscii::HasFilter(int id) { } -bool InputReaderAscii::ReadHeader() { +bool Ascii::ReadHeader() { // try to read the header line... string line; if ( !GetLine(line) ) { @@ -142,7 +148,7 @@ bool InputReaderAscii::ReadHeader() { for ( map::iterator it = filters.begin(); it != filters.end(); it++ ) { for ( unsigned int i = 0; i < (*it).second.num_fields; i++ ) { - const LogField* field = (*it).second.fields[i]; + const Field* field = (*it).second.fields[i]; map::iterator fit = fields.find(field->name); if ( fit == fields.end() ) { @@ -169,7 +175,7 @@ bool InputReaderAscii::ReadHeader() { return true; } -bool InputReaderAscii::GetLine(string& str) { +bool Ascii::GetLine(string& str) { while ( getline(*file, str) ) { if ( str[0] != '#' ) { return true; @@ -184,7 +190,7 @@ bool InputReaderAscii::GetLine(string& str) { return false; } -TransportProto InputReaderAscii::StringToProto(const string &proto) { +TransportProto Ascii::StringToProto(const string &proto) { if ( proto == "unknown" ) { return TRANSPORT_UNKNOWN; } else if ( proto == "tcp" ) { @@ -202,12 +208,12 @@ TransportProto InputReaderAscii::StringToProto(const string &proto) { return TRANSPORT_UNKNOWN; } -LogVal* InputReaderAscii::EntryToVal(string s, FieldMapping field) { +Value* Ascii::EntryToVal(string s, FieldMapping field) { - LogVal* val = new LogVal(field.type, true); + Value* val = new Value(field.type, true); if ( s.compare(unset_field) == 0 ) { // field is not set... - return new LogVal(field.type, false); + return new Value(field.type, false); } switch ( field.type ) { @@ -306,7 +312,7 @@ LogVal* InputReaderAscii::EntryToVal(string s, FieldMapping field) { if ( s.compare(empty_field) == 0 ) length = 0; - LogVal** lvals = new LogVal* [length]; + Value** lvals = new Value* [length]; if ( field.type == TYPE_TABLE ) { val->val.set_val.vals = lvals; @@ -333,7 +339,7 @@ LogVal* InputReaderAscii::EntryToVal(string s, FieldMapping field) { break; } - LogVal* newval = EntryToVal(element, field.subType()); + Value* newval = EntryToVal(element, field.subType()); if ( newval == 0 ) { Error("Error while reading set"); return 0; @@ -365,7 +371,7 @@ LogVal* InputReaderAscii::EntryToVal(string s, FieldMapping field) { } // read the entire file and send appropriate thingies back to InputMgr -bool InputReaderAscii::DoUpdate() { +bool Ascii::DoUpdate() { // dirty, fix me. (well, apparently after trying seeking, etc - this is not that bad) if ( file && file->is_open() ) { @@ -405,7 +411,7 @@ bool InputReaderAscii::DoUpdate() { for ( map::iterator it = filters.begin(); it != filters.end(); it++ ) { - LogVal** fields = new LogVal*[(*it).second.num_fields]; + Value** fields = new Value*[(*it).second.num_fields]; int fpos = 0; for ( vector::iterator fit = (*it).second.columnMap.begin(); @@ -417,7 +423,7 @@ bool InputReaderAscii::DoUpdate() { return false; } - LogVal* val = EntryToVal(stringfields[(*fit).position], *fit); + Value* val = EntryToVal(stringfields[(*fit).position], *fit); if ( val == 0 ) { return false; }