diff --git a/src/input/Manager.cc b/src/input/Manager.cc index 24bc464daf..a97286162d 100644 --- a/src/input/Manager.cc +++ b/src/input/Manager.cc @@ -3,18 +3,20 @@ #include #include "Manager.h" +#include "ReaderFrontend.h" +#include "ReaderBackend.h" +#include "readers/Ascii.h" + #include "Event.h" #include "EventHandler.h" #include "NetVar.h" #include "Net.h" -#include "InputReader.h" - -#include "InputReaderAscii.h" - #include "CompHash.h" +#include "../threading/SerializationTypes.h" + using namespace input; using threading::Value; using threading::Field; @@ -99,7 +101,7 @@ Manager::TableFilter::~TableFilter() { struct Manager::ReaderInfo { EnumVal* id; EnumVal* type; - InputReader* reader; + ReaderFrontend* reader; //list events; // events we fire when "something" happens map filters; // filters that can prevent our actions @@ -132,38 +134,27 @@ bool Manager::ReaderInfo::HasFilter(int id) { } -struct InputReaderDefinition { +struct ReaderDefinition { bro_int_t type; // the type const char *name; // descriptive name for error messages bool (*init)(); // optional one-time inifializing function - InputReader* (*factory)(); // factory function for creating instances + ReaderBackend* (*factory)(ReaderFrontend* frontend); // factory function for creating instances }; -InputReaderDefinition input_readers[] = { - { BifEnum::Input::READER_ASCII, "Ascii", 0, InputReaderAscii::Instantiate }, +ReaderDefinition input_readers[] = { + { BifEnum::Input::READER_ASCII, "Ascii", 0, reader::Ascii::Instantiate }, // End marker - { BifEnum::Input::READER_DEFAULT, "None", 0, (InputReader* (*)())0 } + { BifEnum::Input::READER_DEFAULT, "None", 0, (ReaderBackend* (*)(ReaderFrontend* frontend))0 } }; Manager::Manager() { } -// create a new input reader object to be used at whomevers leisure lateron. -InputReader* Manager::CreateStream(EnumVal* id, RecordVal* description) -{ - InputReaderDefinition* ir = input_readers; - - RecordType* rtype = description->Type()->AsRecordType(); - if ( ! same_type(rtype, BifType::Record::Input::StreamDescription, 0) ) - { - reporter->Error("Streamdescription argument not of right type"); - return 0; - } +ReaderBackend* Manager::CreateBackend(ReaderFrontend* frontend, bro_int_t type) { + ReaderDefinition* ir = input_readers; - EnumVal* reader = description->LookupWithDefault(rtype->FieldOffset("reader"))->AsEnumVal(); - while ( true ) { if ( ir->type == BifEnum::Input::READER_DEFAULT ) { @@ -171,7 +162,7 @@ InputReader* Manager::CreateStream(EnumVal* id, RecordVal* description) return 0; } - if ( ir->type != reader->AsEnum() ) { + if ( ir->type != type ) { // no, didn't find the right one... ++ir; continue; @@ -201,9 +192,30 @@ InputReader* Manager::CreateStream(EnumVal* id, RecordVal* description) // all done. break. break; } - assert(ir->factory); - InputReader* reader_obj = (*ir->factory)(); + + ReaderBackend* backend = (*ir->factory)(frontend); + assert(backend); + + frontend->ty_name = ir->name; + return backend; +} + +// create a new input reader object to be used at whomevers leisure lateron. +ReaderFrontend* Manager::CreateStream(EnumVal* id, RecordVal* description) +{ + ReaderDefinition* ir = input_readers; + + RecordType* rtype = description->Type()->AsRecordType(); + if ( ! same_type(rtype, BifType::Record::Input::StreamDescription, 0) ) + { + reporter->Error("Streamdescription argument not of right type"); + return 0; + } + + EnumVal* reader = description->LookupWithDefault(rtype->FieldOffset("reader"))->AsEnumVal(); + + ReaderFrontend* reader_obj = new ReaderFrontend(id->AsEnum()); assert(reader_obj); // get the source... @@ -217,16 +229,16 @@ InputReader* Manager::CreateStream(EnumVal* id, RecordVal* description) readers.push_back(info); - int success = reader_obj->Init(source); - if ( success == false ) { + reader_obj->Init(source); + /* if ( success == false ) { assert( RemoveStream(id) ); return 0; - } - success = reader_obj->Update(); - if ( success == false ) { + } */ + reader_obj->Update(); + /* if ( success == false ) { assert ( RemoveStream(id) ); return 0; - } + } */ return reader_obj; @@ -306,7 +318,7 @@ bool Manager::AddEventFilter(EnumVal *id, RecordVal* fval) { } - vector fieldsV; // vector, because UnrollRecordType needs it + vector fieldsV; // vector, because UnrollRecordType needs it bool status = !UnrollRecordType(&fieldsV, fields, ""); @@ -316,7 +328,7 @@ bool Manager::AddEventFilter(EnumVal *id, RecordVal* fval) { } - LogField** logf = new LogField*[fieldsV.size()]; + Field** logf = new Field*[fieldsV.size()]; for ( unsigned int i = 0; i < fieldsV.size(); i++ ) { logf[i] = fieldsV[i]; } @@ -410,7 +422,7 @@ bool Manager::AddTableFilter(EnumVal *id, RecordVal* fval) { } - vector fieldsV; // vector, because we don't know the length beforehands + vector fieldsV; // vector, because we don't know the length beforehands bool status = !UnrollRecordType(&fieldsV, idx, ""); @@ -430,7 +442,7 @@ bool Manager::AddTableFilter(EnumVal *id, RecordVal* fval) { } - LogField** fields = new LogField*[fieldsV.size()]; + Field** fields = new Field*[fieldsV.size()]; for ( unsigned int i = 0; i < fieldsV.size(); i++ ) { fields[i] = fieldsV[i]; } @@ -538,12 +550,12 @@ bool Manager::RemoveStream(const EnumVal* id) { return true; } -bool Manager::UnrollRecordType(vector *fields, const RecordType *rec, const string& nameprepend) { +bool Manager::UnrollRecordType(vector *fields, const RecordType *rec, const string& nameprepend) { for ( int i = 0; i < rec->NumFields(); i++ ) { if ( !IsCompatibleType(rec->FieldType(i)) ) { - reporter->Error("Incompatible type \"%s\" in table definition for InputReader", type_name(rec->FieldType(i)->Tag())); + reporter->Error("Incompatible type \"%s\" in table definition for ReaderFrontend", type_name(rec->FieldType(i)->Tag())); return false; } @@ -557,7 +569,7 @@ bool Manager::UnrollRecordType(vector *fields, const RecordType *rec, } } else { - LogField* field = new LogField(); + Field* field = new Field(); field->name = nameprepend + rec->FieldName(i); field->type = rec->FieldType(i)->Tag(); if ( field->type == TYPE_TABLE ) { @@ -591,7 +603,9 @@ bool Manager::ForceUpdate(const EnumVal* id) return false; } - return i->reader->Update(); + i->reader->Update(); + + return true; // update is async :( } bool Manager::RemoveTableFilter(EnumVal* id, const string &name) { @@ -638,21 +652,21 @@ bool Manager::RemoveEventFilter(EnumVal* id, const string &name) { return true; } -Val* Manager::LogValToIndexVal(int num_fields, const RecordType *type, const LogVal* const *vals) { +Val* Manager::ValueToIndexVal(int num_fields, const RecordType *type, const Value* const *vals) { Val* idxval; int position = 0; if ( num_fields == 1 && type->FieldType(0)->Tag() != TYPE_RECORD ) { - idxval = LogValToVal(vals[0], type->FieldType(0)); + idxval = ValueToVal(vals[0], type->FieldType(0)); position = 1; } else { ListVal *l = new ListVal(TYPE_ANY); for ( int j = 0 ; j < type->NumFields(); j++ ) { if ( type->FieldType(j)->Tag() == TYPE_RECORD ) { - l->Append(LogValToRecordVal(vals, type->FieldType(j)->AsRecordType(), &position)); + l->Append(ValueToRecordVal(vals, type->FieldType(j)->AsRecordType(), &position)); } else { - l->Append(LogValToVal(vals[position], type->FieldType(j))); + l->Append(ValueToVal(vals[position], type->FieldType(j))); position++; } } @@ -666,7 +680,7 @@ Val* Manager::LogValToIndexVal(int num_fields, const RecordType *type, const Log } -void Manager::SendEntry(const InputReader* reader, int id, const LogVal* const *vals) { +void Manager::SendEntry(const ReaderFrontend* reader, int id, const Value* const *vals) { ReaderInfo *i = FindReader(reader); if ( i == 0 ) { reporter->InternalError("Unknown reader"); @@ -689,7 +703,7 @@ void Manager::SendEntry(const InputReader* reader, int id, const LogVal* const * } -void Manager::SendEntryTable(const InputReader* reader, int id, const LogVal* const *vals) { +void Manager::SendEntryTable(const ReaderFrontend* reader, int id, const Value* const *vals) { ReaderInfo *i = FindReader(reader); bool updated = false; @@ -701,12 +715,12 @@ void Manager::SendEntryTable(const InputReader* reader, int id, const LogVal* co TableFilter* filter = (TableFilter*) i->filters[id]; //reporter->Error("Hashing %d index fields", i->num_idx_fields); - HashKey* idxhash = HashLogVals(filter->num_idx_fields, vals); + HashKey* idxhash = HashValues(filter->num_idx_fields, vals); //reporter->Error("Result: %d", (uint64_t) idxhash->Hash()); //reporter->Error("Hashing %d val fields", i->num_val_fields); HashKey* valhash = 0; if ( filter->num_val_fields > 0 ) - HashLogVals(filter->num_val_fields, vals+filter->num_idx_fields); + HashValues(filter->num_val_fields, vals+filter->num_idx_fields); //reporter->Error("Result: %d", (uint64_t) valhash->Hash()); @@ -731,16 +745,16 @@ void Manager::SendEntryTable(const InputReader* reader, int id, const LogVal* co } - Val* idxval = LogValToIndexVal(filter->num_idx_fields, filter->itype, vals); + Val* idxval = ValueToIndexVal(filter->num_idx_fields, filter->itype, vals); Val* valval; int position = filter->num_idx_fields; if ( filter->num_val_fields == 0 ) { valval = 0; } else if ( filter->num_val_fields == 1 && !filter->want_record ) { - valval = LogValToVal(vals[position], filter->rtype->FieldType(0)); + valval = ValueToVal(vals[position], filter->rtype->FieldType(0)); } else { - valval = LogValToRecordVal(vals, filter->rtype, &position); + valval = ValueToRecordVal(vals, filter->rtype, &position); } @@ -757,7 +771,7 @@ void Manager::SendEntryTable(const InputReader* reader, int id, const LogVal* co EnumVal* ev; //Ref(idxval); int startpos = 0; - Val* predidx = LogValToRecordVal(vals, filter->itype, &startpos); + Val* predidx = ValueToRecordVal(vals, filter->itype, &startpos); Ref(valval); if ( updated ) { @@ -831,7 +845,7 @@ void Manager::SendEntryTable(const InputReader* reader, int id, const LogVal* co } -void Manager::EndCurrentSend(const InputReader* reader, int id) { +void Manager::EndCurrentSend(const ReaderFrontend* reader, int id) { ReaderInfo *i = FindReader(reader); if ( i == 0 ) { reporter->InternalError("Unknown reader"); @@ -914,7 +928,7 @@ void Manager::EndCurrentSend(const InputReader* reader, int id) { filter->currDict = new PDict(InputHash); } -void Manager::Put(const InputReader* reader, int id, const LogVal* const *vals) { +void Manager::Put(const ReaderFrontend* reader, int id, const Value* const *vals) { ReaderInfo *i = FindReader(reader); if ( i == 0 ) { reporter->InternalError("Unknown reader"); @@ -937,7 +951,7 @@ void Manager::Put(const InputReader* reader, int id, const LogVal* const *vals) } -void Manager::SendEventFilterEvent(const InputReader* reader, EnumVal* type, int id, const LogVal* const *vals) { +void Manager::SendEventFilterEvent(const ReaderFrontend* reader, EnumVal* type, int id, const Value* const *vals) { ReaderInfo *i = FindReader(reader); bool updated = false; @@ -956,15 +970,15 @@ void Manager::SendEventFilterEvent(const InputReader* reader, EnumVal* type, int int position = 0; if ( filter->want_record ) { - RecordVal * r = LogValToRecordVal(vals, filter->fields, &position); + RecordVal * r = ValueToRecordVal(vals, filter->fields, &position); out_vals.push_back(r); } else { for ( int j = 0; j < filter->fields->NumFields(); j++) { Val* val = 0; if ( filter->fields->FieldType(j)->Tag() == TYPE_RECORD ) { - val = LogValToRecordVal(vals, filter->fields->FieldType(j)->AsRecordType(), &position); + val = ValueToRecordVal(vals, filter->fields->FieldType(j)->AsRecordType(), &position); } else { - val = LogValToVal(vals[position], filter->fields->FieldType(j)); + val = ValueToVal(vals[position], filter->fields->FieldType(j)); position++; } out_vals.push_back(val); @@ -975,7 +989,7 @@ void Manager::SendEventFilterEvent(const InputReader* reader, EnumVal* type, int } -void Manager::PutTable(const InputReader* reader, int id, const LogVal* const *vals) { +void Manager::PutTable(const ReaderFrontend* reader, int id, const Value* const *vals) { ReaderInfo *i = FindReader(reader); assert(i); @@ -984,22 +998,22 @@ void Manager::PutTable(const InputReader* reader, int id, const LogVal* const *v assert(i->filters[id]->filter_type == TABLE_FILTER); TableFilter* filter = (TableFilter*) i->filters[id]; - Val* idxval = LogValToIndexVal(filter->num_idx_fields, filter->itype, vals); + Val* idxval = ValueToIndexVal(filter->num_idx_fields, filter->itype, vals); Val* valval; int position = filter->num_idx_fields; if ( filter->num_val_fields == 0 ) { valval = 0; } else if ( filter->num_val_fields == 1 && !filter->want_record ) { - valval = LogValToVal(vals[filter->num_idx_fields], filter->rtype->FieldType(filter->num_idx_fields)); + valval = ValueToVal(vals[filter->num_idx_fields], filter->rtype->FieldType(filter->num_idx_fields)); } else { - valval = LogValToRecordVal(vals, filter->rtype, &position); + valval = ValueToRecordVal(vals, filter->rtype, &position); } filter->tab->Assign(idxval, valval); } -void Manager::Clear(const InputReader* reader, int id) { +void Manager::Clear(const ReaderFrontend* reader, int id) { ReaderInfo *i = FindReader(reader); if ( i == 0 ) { reporter->InternalError("Unknown reader"); @@ -1014,7 +1028,7 @@ void Manager::Clear(const InputReader* reader, int id) { filter->tab->RemoveAll(); } -bool Manager::Delete(const InputReader* reader, int id, const LogVal* const *vals) { +bool Manager::Delete(const ReaderFrontend* reader, int id, const Value* const *vals) { ReaderInfo *i = FindReader(reader); if ( i == 0 ) { reporter->InternalError("Unknown reader"); @@ -1025,7 +1039,7 @@ bool Manager::Delete(const InputReader* reader, int id, const LogVal* const *val if ( i->filters[id]->filter_type == TABLE_FILTER ) { TableFilter* filter = (TableFilter*) i->filters[id]; - Val* idxval = LogValToIndexVal(filter->num_idx_fields, filter->itype, vals); + Val* idxval = ValueToIndexVal(filter->num_idx_fields, filter->itype, vals); return( filter->tab->Delete(idxval) != 0 ); } else if ( i->filters[id]->filter_type == EVENT_FILTER ) { EnumVal *type = new EnumVal(BifEnum::Input::EVENT_REMOVED, BifType::Enum::Input::Event); @@ -1037,12 +1051,12 @@ bool Manager::Delete(const InputReader* reader, int id, const LogVal* const *val } } -void Manager::Error(InputReader* reader, const char* msg) +void Manager::Error(ReaderFrontend* reader, const char* msg) { reporter->Error("error with input reader for %s: %s", reader->Source().c_str(), msg); } -bool Manager::SendEvent(const string& name, const int num_vals, const LogVal* const *vals) +bool Manager::SendEvent(const string& name, const int num_vals, const Value* const *vals) { EventHandler* handler = event_registry->Lookup(name.c_str()); if ( handler == 0 ) { @@ -1059,7 +1073,7 @@ bool Manager::SendEvent(const string& name, const int num_vals, const LogVal* co val_list* vl = new val_list; for ( int i = 0; i < num_vals; i++) { - vl->append(LogValToVal(vals[i], type->FieldType(i))); + vl->append(ValueToVal(vals[i], type->FieldType(i))); } mgr.Dispatch(new Event(handler, vl)); @@ -1118,7 +1132,7 @@ RecordVal* Manager::ListValToRecordVal(ListVal* list, RecordType *request_type, -RecordVal* Manager::LogValToRecordVal(const LogVal* const *vals, RecordType *request_type, int* position) { +RecordVal* Manager::ValueToRecordVal(const Value* const *vals, RecordType *request_type, int* position) { if ( position == 0 ) { reporter->InternalError("Need position"); return 0; @@ -1136,9 +1150,9 @@ RecordVal* Manager::LogValToRecordVal(const LogVal* const *vals, RecordType *req Val* fieldVal = 0; if ( request_type->FieldType(i)->Tag() == TYPE_RECORD ) { - fieldVal = LogValToRecordVal(vals, request_type->FieldType(i)->AsRecordType(), position); + fieldVal = ValueToRecordVal(vals, request_type->FieldType(i)->AsRecordType(), position); } else { - fieldVal = LogValToVal(vals[*position], request_type->FieldType(i)); + fieldVal = ValueToVal(vals[*position], request_type->FieldType(i)); (*position)++; } @@ -1150,7 +1164,7 @@ RecordVal* Manager::LogValToRecordVal(const LogVal* const *vals, RecordType *req } -int Manager::GetLogValLength(const LogVal* val) { +int Manager::GetValueLength(const Value* val) { int length = 0; switch (val->type) { @@ -1193,7 +1207,7 @@ int Manager::GetLogValLength(const LogVal* val) { case TYPE_TABLE: { for ( int i = 0; i < val->val.set_val.size; i++ ) { - length += GetLogValLength(val->val.set_val.vals[i]); + length += GetValueLength(val->val.set_val.vals[i]); } break; } @@ -1201,20 +1215,20 @@ int Manager::GetLogValLength(const LogVal* val) { case TYPE_VECTOR: { int j = val->val.vector_val.size; for ( int i = 0; i < j; i++ ) { - length += GetLogValLength(val->val.vector_val.vals[i]); + length += GetValueLength(val->val.vector_val.vals[i]); } break; } default: - reporter->InternalError("unsupported type %d for GetLogValLength", val->type); + reporter->InternalError("unsupported type %d for GetValueLength", val->type); } return length; } -int Manager::CopyLogVal(char *data, const int startpos, const LogVal* val) { +int Manager::CopyValue(char *data, const int startpos, const Value* val) { switch ( val->type ) { case TYPE_BOOL: case TYPE_INT: @@ -1276,7 +1290,7 @@ int Manager::CopyLogVal(char *data, const int startpos, const LogVal* val) { case TYPE_TABLE: { int length = 0; for ( int i = 0; i < val->val.set_val.size; i++ ) { - length += CopyLogVal(data, startpos+length, val->val.set_val.vals[i]); + length += CopyValue(data, startpos+length, val->val.set_val.vals[i]); } return length; break; @@ -1286,14 +1300,14 @@ int Manager::CopyLogVal(char *data, const int startpos, const LogVal* val) { int length = 0; int j = val->val.vector_val.size; for ( int i = 0; i < j; i++ ) { - length += CopyLogVal(data, startpos+length, val->val.vector_val.vals[i]); + length += CopyValue(data, startpos+length, val->val.vector_val.vals[i]); } return length; break; } default: - reporter->InternalError("unsupported type %d for CopyLogVal", val->type); + reporter->InternalError("unsupported type %d for CopyValue", val->type); return 0; } @@ -1302,12 +1316,12 @@ int Manager::CopyLogVal(char *data, const int startpos, const LogVal* val) { } -HashKey* Manager::HashLogVals(const int num_elements, const LogVal* const *vals) { +HashKey* Manager::HashValues(const int num_elements, const Value* const *vals) { int length = 0; for ( int i = 0; i < num_elements; i++ ) { - const LogVal* val = vals[i]; - length += GetLogValLength(val); + const Value* val = vals[i]; + length += GetValueLength(val); } //reporter->Error("Length: %d", length); @@ -1318,8 +1332,8 @@ HashKey* Manager::HashLogVals(const int num_elements, const LogVal* const *vals) reporter->InternalError("Could not malloc?"); } for ( int i = 0; i < num_elements; i++ ) { - const LogVal* val = vals[i]; - position += CopyLogVal(data, position, val); + const Value* val = vals[i]; + position += CopyValue(data, position, val); } assert(position == length); @@ -1328,7 +1342,7 @@ HashKey* Manager::HashLogVals(const int num_elements, const LogVal* const *vals) } -Val* Manager::LogValToVal(const LogVal* val, BroType* request_type) { +Val* Manager::ValueToVal(const Value* val, BroType* request_type) { if ( request_type->Tag() != TYPE_ANY && request_type->Tag() != val->type ) { reporter->InternalError("Typetags don't match: %d vs %d", request_type->Tag(), val->type); @@ -1384,7 +1398,7 @@ Val* Manager::LogValToVal(const LogVal* val, BroType* request_type) { SetType* s = new SetType(set_index, 0); TableVal* t = new TableVal(s); for ( int i = 0; i < val->val.set_val.size; i++ ) { - t->Assign(LogValToVal( val->val.set_val.vals[i], type ), 0); + t->Assign(ValueToVal( val->val.set_val.vals[i], type ), 0); } return t; break; @@ -1396,7 +1410,7 @@ Val* Manager::LogValToVal(const LogVal* val, BroType* request_type) { VectorType* vt = new VectorType(type->Ref()); VectorVal* v = new VectorVal(vt); for ( int i = 0; i < val->val.vector_val.size; i++ ) { - v->Assign(i, LogValToVal( val->val.set_val.vals[i], type ), 0); + v->Assign(i, ValueToVal( val->val.set_val.vals[i], type ), 0); } return v; @@ -1425,7 +1439,7 @@ Val* Manager::LogValToVal(const LogVal* val, BroType* request_type) { return NULL; } -Manager::ReaderInfo* Manager::FindReader(const InputReader* reader) +Manager::ReaderInfo* Manager::FindReader(const ReaderFrontend* reader) { for ( vector::iterator s = readers.begin(); s != readers.end(); ++s ) { @@ -1460,4 +1474,3 @@ string Manager::Hash(const string &input) { return out; } - diff --git a/src/input/Manager.h b/src/input/Manager.h index fe37efa08b..507af6468f 100644 --- a/src/input/Manager.h +++ b/src/input/Manager.h @@ -14,6 +14,7 @@ namespace input { class ReaderFrontend; +class ReaderBackend; class Manager { public: @@ -30,7 +31,8 @@ public: bool RemoveEventFilter(EnumVal* id, const string &name); protected: - + friend class ReaderFrontend; + // Reports an error for the given reader. void Error(ReaderFrontend* reader, const char* msg); @@ -42,6 +44,8 @@ protected: // for readers to write to input stream in indirect mode (manager is monitoring new/deleted values) void SendEntry(const ReaderFrontend* reader, int id, const threading::Value* const *vals); void EndCurrentSend(const ReaderFrontend* reader, int id); + + ReaderBackend* CreateBackend(ReaderFrontend* frontend, bro_int_t type); private: struct ReaderInfo; diff --git a/src/input/ReaderBackend.cc b/src/input/ReaderBackend.cc index b2bcedb2ad..8c996db4a1 100644 --- a/src/input/ReaderBackend.cc +++ b/src/input/ReaderBackend.cc @@ -1,13 +1,44 @@ // See the file "COPYING" in the main distribution directory for copyright. -#include "InputReader.h" +#include "ReaderBackend.h" +#include "ReaderFrontend.h" using threading::Value; using threading::Field; -namespace logging { +namespace input { -InputReader::InputReader(ReaderFrontend *arg_frontend) :MsgThread() +class ErrorMessage : public threading::OutputMessage { +public: + ErrorMessage(ReaderFrontend* reader, string message) + : threading::OutputMessage("Error", reader), + message(message) {} + + virtual bool Process() { + input_mgr->Error(object, message.c_str()); + return true; + } + +private: + string message; +} + +class PutMessage : public threading::OutputMessage { +public: + PutMessage(ReaderFrontend* reader, int id, const Value* const *val) + : threading::OutputMessage("Error", reader), + id(id), val(val) {} + + virtual bool Process() { + return input_mgr->Put(object, id, val); + } + +private: + int id; + Value* val; +} + +ReaderBackend::ReaderBackend(ReaderFrontend* arg_frontend) : MsgThread() { buf = 0; buf_len = 1024; @@ -18,38 +49,47 @@ InputReader::InputReader(ReaderFrontend *arg_frontend) :MsgThread() SetName(frontend->Name()); } -InputReader::~InputReader() +ReaderBackend::~ReaderBackend() { } -void InputReader::Error(const char *msg) +void ReaderBackend::Error(const string &msg) { - input_mgr->Error(this, msg); + SendOut(new ErrorMessage(frontend, msg); } -void InputReader::Error(const string &msg) +void ReaderBackend::Put(int id, const Value* const *val) { - input_mgr->Error(this, msg.c_str()); + SendOut(new PutMessage(frontend, id, val); } -void InputReader::Put(int id, const LogVal* const *val) +void ReaderBackend::Delete(int id, const Value* const *val) { - input_mgr->Put(this, id, val); + SendOut(new DeleteMessage(frontend, id, val); } -void InputReader::Clear(int id) +void ReaderBackend::Clear(int id) { - input_mgr->Clear(this, id); + SendOut(new ClearMessage(frontend, id); } -void InputReader::Delete(int id, const LogVal* const *val) +bool ReaderBackend::SendEvent(const string& name, const int num_vals, const Value* const *vals) { - input_mgr->Delete(this, id, val); + SendOut(new SendEventMessage(frontend, name, num_vals, vals); +} + +void ReaderBackend::EndCurrentSend(int id) +{ + SendOut(new EndCurrentSendMessage(frontent, id); } +void ReaderBackend::SendEntry(int id, const Value* const *vals) +{ + SendOut(new SendEntryMessage(frontend, id, vals); +} -bool InputReader::Init(string arg_source) +bool ReaderBackend::Init(string arg_source) { source = arg_source; @@ -58,35 +98,31 @@ bool InputReader::Init(string arg_source) return !disabled; } -bool InputReader::AddFilter(int id, int arg_num_fields, - const LogField* const * arg_fields) +bool ReaderBackend::AddFilter(int id, int arg_num_fields, + const Field* const * arg_fields) { return DoAddFilter(id, arg_num_fields, arg_fields); } -bool InputReader::RemoveFilter(int id) +bool ReaderBackend::RemoveFilter(int id) { return DoRemoveFilter(id); } -void InputReader::Finish() +void ReaderBackend::Finish() { DoFinish(); disabled = true; } -bool InputReader::Update() +bool ReaderBackend::Update() { return DoUpdate(); } -bool InputReader::SendEvent(const string& name, const int num_vals, const LogVal* const *vals) -{ - return input_mgr->SendEvent(name, num_vals, vals); -} // stolen from logwriter -const char* InputReader::Fmt(const char* format, ...) +const char* ReaderBackend::Fmt(const char* format, ...) { if ( ! buf ) buf = (char*) malloc(buf_len); @@ -111,14 +147,5 @@ const char* InputReader::Fmt(const char* format, ...) } -void InputReader::SendEntry(int id, const LogVal* const *vals) -{ - input_mgr->SendEntry(this, id, vals); -} - -void InputReader::EndCurrentSend(int id) -{ - input_mgr->EndCurrentSend(this, id); -} } diff --git a/src/input/ReaderBackend.h b/src/input/ReaderBackend.h index 7d2640b4fd..1fe44a09b2 100644 --- a/src/input/ReaderBackend.h +++ b/src/input/ReaderBackend.h @@ -1,26 +1,25 @@ // See the file "COPYING" in the main distribution directory for copyright. -// -// Same notes about thread safety as in LogWriter.h apply. - #ifndef INPUT_READERBACKEND_H #define INPUT_READERBACKEND_H -#include "InputMgr.h" #include "BroString.h" -#include "LogMgr.h" +#include "../threading/SerializationTypes.h" +#include "threading/MsgThread.h" namespace input { +class ReaderFrontend; + class ReaderBackend : public threading::MsgThread { public: - ReaderBackend(ReaderFrontend *frontend); + ReaderBackend(ReaderFrontend* frontend); virtual ~ReaderBackend(); bool Init(string arg_source); - bool AddFilter( int id, int arg_num_fields, const LogField* const* fields ); + bool AddFilter( int id, int arg_num_fields, const threading::Field* const* fields ); bool RemoveFilter ( int id ); @@ -32,7 +31,7 @@ protected: // Methods that have to be overwritten by the individual readers virtual bool DoInit(string arg_sources) = 0; - virtual bool DoAddFilter( int id, int arg_num_fields, const LogField* const* fields ) = 0; + virtual bool DoAddFilter( int id, int arg_num_fields, const threading::Field* const* fields ) = 0; virtual bool DoRemoveFilter( int id ) = 0; @@ -51,15 +50,15 @@ protected: // A thread-safe version of fmt(). (stolen from logwriter) const char* Fmt(const char* format, ...); - bool SendEvent(const string& name, const int num_vals, const LogVal* const *vals); + bool SendEvent(const string& name, const int num_vals, const threading::Value* const *vals); // Content-sendinf-functions (simple mode). Including table-specific stuff that simply is not used if we have no table - void Put(int id, const LogVal* const *val); - void Delete(int id, const LogVal* const *val); + void Put(int id, const threading::Value* const *val); + void Delete(int id, const threading::Value* const *val); void Clear(int id); // Table-functions (tracking mode): Only changed lines are propagated. - void SendEntry(int id, const LogVal* const *vals); + void SendEntry(int id, const threading::Value* const *vals); void EndCurrentSend(int id); diff --git a/src/input/ReaderFrontend.h b/src/input/ReaderFrontend.h index e69de29bb2..984ba30794 100644 --- a/src/input/ReaderFrontend.h +++ b/src/input/ReaderFrontend.h @@ -0,0 +1,54 @@ +// See the file "COPYING" in the main distribution directory for copyright. + +#ifndef INPUT_READERFRONTEND_H +#define INPUT_READERFRONTEND_H + +#include "Manager.h" + +#include "threading/MsgThread.h" + +namespace input { + +class ReaderBackend; + +class ReaderFrontend { +public: + ReaderFrontend(bro_int_t type); + + virtual ~ReaderFrontend(); + + void Init(string arg_source); + + void Update(); + + void AddFilter( int id, int arg_num_fields, const threading::Field* const* fields ); + + void Finish(); + + /** + * Returns a descriptive name for the reader, including the type of + * the backend and the source used. + * + * This method is safe to call from any thread. + */ + string Name() const; + + +protected: + friend class Manager; + + const string Source() const { return source; } + + string ty_name; // Name of the backend type. Set by the manager. + +private: + string source; + +}; + +} + + +#endif /* INPUT_READERFRONTEND_H */ + + diff --git a/src/input/readers/Ascii.h b/src/input/readers/Ascii.h index 1747f983e4..a3bf5c21a6 100644 --- a/src/input/readers/Ascii.h +++ b/src/input/readers/Ascii.h @@ -1,13 +1,15 @@ // See the file "COPYING" in the main distribution directory for copyright. -#ifndef INPUTREADERASCII_H -#define INPUTREADERASCII_H +#ifndef INPUT_READERS_ASCII_H +#define INPUT_READERS_ASCII_H -#include "InputReader.h" -#include #include #include +#include "../ReaderBackend.h" + +namespace input { namespace reader { + // Description for input field mapping struct FieldMapping { string name; @@ -28,18 +30,18 @@ struct FieldMapping { }; -class InputReaderAscii : public InputReader { +class Ascii : public ReaderBackend { public: - InputReaderAscii(); - ~InputReaderAscii(); + Ascii(ReaderFrontend* frontend); + ~Ascii(); - static InputReader* Instantiate() { return new InputReaderAscii; } + static ReaderBackend* Instantiate(ReaderFrontend* frontend) { return new Ascii(frontend); } protected: virtual bool DoInit(string path); - virtual bool DoAddFilter( int id, int arg_num_fields, const LogField* const* fields ); + virtual bool DoAddFilter( int id, int arg_num_fields, const threading::Field* const* fields ); virtual bool DoRemoveFilter ( int id ); @@ -52,7 +54,7 @@ private: struct Filter { unsigned int num_fields; - const LogField* const * fields; // raw mapping + const threading::Field* const * fields; // raw mapping // map columns in the file to columns to send back to the manager vector columnMap; @@ -64,7 +66,7 @@ private: TransportProto StringToProto(const string &proto); bool ReadHeader(); - LogVal* EntryToVal(string s, FieldMapping type); + threading::Value* EntryToVal(string s, FieldMapping type); bool GetLine(string& str); @@ -85,4 +87,7 @@ private: }; -#endif /* INPUTREADERASCII_H */ +} +} + +#endif /* INPUT_READERS_ASCII_H */ diff --git a/src/logging/Manager.cc b/src/logging/Manager.cc index add10b3f10..65a55dee02 100644 --- a/src/logging/Manager.cc +++ b/src/logging/Manager.cc @@ -14,7 +14,7 @@ #include "writers/Ascii.h" #include "writers/None.h" -#include "threading/SerializationTypes.h" +#include "../threading/SerializationTypes.h" using namespace logging; using threading::Value; diff --git a/src/threading/SerializationTypes.cc b/src/threading/SerializationTypes.cc index f74de6ce57..dc5a1a14f9 100644 --- a/src/threading/SerializationTypes.cc +++ b/src/threading/SerializationTypes.cc @@ -12,7 +12,8 @@ bool Field::Read(SerializationFormat* fmt) int t; int st; - bool success = (fmt->Read(&name, "name") && fmt->Read(&t, "type") && fmt->Read(&st, "subtype") ); + bool success = (fmt->Read(&name, "name") && fmt->Read(&secondary_name, "secondary_name") && + fmt->Read(&t, "type") && fmt->Read(&st, "subtype") ); type = (TypeTag) t; subtype = (TypeTag) st; @@ -21,7 +22,8 @@ bool Field::Read(SerializationFormat* fmt) bool Field::Write(SerializationFormat* fmt) const { - return (fmt->Write(name, "name") && fmt->Write((int)type, "type") && fmt->Write((int)subtype, "subtype")); + return (fmt->Write(name, "name") && fmt->Write(secondary_name, "secondary_name") && fmt->Write((int)type, "type") && + fmt->Write((int)subtype, "subtype")); } Value::~Value() diff --git a/src/threading/SerializationTypes.h b/src/threading/SerializationTypes.h index 11ceda929c..ffcf774842 100644 --- a/src/threading/SerializationTypes.h +++ b/src/threading/SerializationTypes.h @@ -13,6 +13,8 @@ namespace threading { */ struct Field { string name; //! Name of the field. + // needed by input framework. port fields have two names (one for the port, one for the type) - this specifies the secondary name. + string secondary_name; TypeTag type; //! Type of the field. TypeTag subtype; //! Inner type for sets.