diff --git a/src/InputMgr.cc b/src/InputMgr.cc index 648a933a22..5270c83d5a 100644 --- a/src/InputMgr.cc +++ b/src/InputMgr.cc @@ -13,6 +13,18 @@ #include "InputReaderAscii.h" +#include "CompHash.h" + + +class InputHash { +public: + HashKey* valhash; + HashKey* idxkey; // does not need ref or whatever - if it is present here, it is also still present in the TableVal. +}; + +declare(PDict, InputHash); + + struct InputMgr::ReaderInfo { EnumVal* id; EnumVal* type; @@ -24,6 +36,9 @@ struct InputMgr::ReaderInfo { RecordType* rtype; RecordType* itype; + PDict(InputHash)* currDict; + PDict(InputHash)* lastDict; + }; struct InputReaderDefinition { @@ -147,10 +162,20 @@ InputReader* InputMgr::CreateReader(EnumVal* id, RecordVal* description) info->itype = idx; Ref(idx); readers.push_back(info); + info->currDict = new PDict(InputHash); + info->lastDict = new PDict(InputHash); - reader_obj->Init(source, fieldsV.size(), fields); - reader_obj->Update(); + int success = reader_obj->Init(source, fieldsV.size(), idxfields, fields); + if ( success == false ) { + RemoveReader(id); + return 0; + } + success = reader_obj->Update(); + if ( success == false ) { + RemoveReader(id); + return 0; + } return reader_obj; @@ -198,7 +223,6 @@ bool InputMgr::IsCompatibleType(BroType* t) return false; } - bool InputMgr::RemoveReader(EnumVal* id) { ReaderInfo *i = 0; for ( vector::iterator s = readers.begin(); s != readers.end(); ++s ) @@ -215,6 +239,9 @@ bool InputMgr::RemoveReader(EnumVal* id) { return false; // not found } + i->reader->Finish(); + + Unref(i->type); Unref(i->tab); Unref(i->itype); @@ -267,8 +294,7 @@ bool InputMgr::ForceUpdate(EnumVal* id) return false; } - i->reader->Update(); - return true; + return i->reader->Update(); } Val* InputMgr::LogValToIndexVal(int num_fields, const RecordType *type, const LogVal* const *vals) { @@ -297,6 +323,105 @@ Val* InputMgr::LogValToIndexVal(int num_fields, const RecordType *type, const Lo } + +void InputMgr::SendEntry(const InputReader* reader, const LogVal* const *vals) { + ReaderInfo *i = FindReader(reader); + if ( i == 0 ) { + reporter->InternalError("Unknown reader"); + return; + } + + HashKey* idxhash = HashLogVals(i->num_idx_fields, vals); + HashKey* valhash = HashLogVals(i->num_val_fields, vals+i->num_idx_fields); + + InputHash *h = i->lastDict->Lookup(idxhash); + if ( h != 0 ) { + // seen before + if ( h->valhash->Hash() == valhash->Hash() ) { + // ok, double. + i->lastDict->Remove(idxhash); + i->currDict->Insert(idxhash, h); + return; + } else { + // updated + i->lastDict->Remove(idxhash); + delete(h); + } + } + + + Val* idxval = LogValToIndexVal(i->num_idx_fields, i->itype, vals); + Val* valval; + + int position = i->num_idx_fields; + if ( i->num_val_fields == 1 ) { + valval = LogValToVal(vals[i->num_idx_fields]); + } else { + RecordVal * r = new RecordVal(i->rtype); + + /* if ( i->rtype->NumFields() != (int) i->num_val_fields ) { + reporter->InternalError("Type mismatch"); + return; + } */ + + for ( int j = 0; j < i->rtype->NumFields(); j++) { + + Val* val = 0; + if ( i->rtype->FieldType(j)->Tag() == TYPE_RECORD ) { + val = LogValToRecordVal(vals, i->rtype->FieldType(j)->AsRecordType(), &position); + } else { + val = LogValToVal(vals[position], i->rtype->FieldType(j)->Tag()); + position++; + } + + if ( val == 0 ) { + reporter->InternalError("conversion error"); + return; + } + + r->Assign(j,val); + + } + valval = r; + } + + //i->tab->Assign(idxval, valval); + HashKey* k = i->tab->ComputeHash(idxval); + if ( !k ) { + reporter->InternalError("could not hash"); + return; + } + + i->tab->Assign(idxval, k, valval); + InputHash* ih = new InputHash(); + ih->idxkey = k; + ih->valhash = valhash; + + i->currDict->Insert(idxhash, ih); + +} + +void InputMgr::EndCurrentSend(const InputReader* reader) { + ReaderInfo *i = FindReader(reader); + if ( i == 0 ) { + reporter->InternalError("Unknown reader"); + return; + } + + // lastdict contains all deleted entries + IterCookie *c = i->lastDict->InitForIteration(); + InputHash* ih; + while ( ( ih = i->lastDict->NextEntry(c )) ) { + i->tab->Delete(ih->idxkey); + } + + i->lastDict->Clear(); + delete(i->lastDict); + + i->lastDict = i->currDict; + i->currDict = new PDict(InputHash); +} + void InputMgr::Put(const InputReader* reader, const LogVal* const *vals) { ReaderInfo *i = FindReader(reader); if ( i == 0 ) { @@ -418,6 +543,95 @@ Val* InputMgr::LogValToRecordVal(const LogVal* const *vals, RecordType *request_ } +HashKey* InputMgr::HashLogVals(const int num_elements, const LogVal* const *vals) { + int length = 0; + + for ( int i = 0; i < num_elements; i++ ) { + const LogVal* val = vals[i]; + switch (val->type) { + case TYPE_BOOL: + case TYPE_INT: + length += sizeof(val->val.int_val); + break; + + case TYPE_COUNT: + case TYPE_COUNTER: + case TYPE_PORT: + length += sizeof(val->val.uint_val); + break; + + case TYPE_DOUBLE: + case TYPE_TIME: + case TYPE_INTERVAL: + length += sizeof(val->val.double_val); + break; + + case TYPE_STRING: + { + length += val->val.string_val->size(); + break; + } + + case TYPE_ADDR: + length += NUM_ADDR_WORDS*sizeof(uint32_t); + break; + + default: + reporter->InternalError("unsupported type for hashlogvals"); + } + + } + + int position = 0; + char *data = (char*) malloc(length); + for ( int i = 0; i < num_elements; i++ ) { + const LogVal* val = vals[i]; + switch ( val->type ) { + case TYPE_BOOL: + case TYPE_INT: + *(data+position) = val->val.int_val; + position += sizeof(val->val.int_val); + break; + + case TYPE_COUNT: + case TYPE_COUNTER: + case TYPE_PORT: + *(data+position) = val->val.uint_val; + position += sizeof(val->val.uint_val); + break; + + case TYPE_DOUBLE: + case TYPE_TIME: + case TYPE_INTERVAL: + *(data+position) = val->val.double_val; + position += sizeof(val->val.double_val); + break; + + case TYPE_STRING: + { + memcpy(data+position, val->val.string_val->c_str(), val->val.string_val->length()); + position += val->val.string_val->size(); + break; + } + + case TYPE_ADDR: + memcpy(data+position, val->val.addr_val, NUM_ADDR_WORDS*sizeof(uint32_t)); + position += NUM_ADDR_WORDS*sizeof(uint32_t); + break; + + default: + reporter->InternalError("unsupported type for hashlogvals2"); + } + + + } + + assert(position == length); + return new HashKey(data, length); + + +} + Val* InputMgr::LogValToVal(const LogVal* val, TypeTag request_type) { if ( request_type != TYPE_ANY && request_type != val->type ) { @@ -495,3 +709,11 @@ InputMgr::ReaderInfo* InputMgr::FindReader(const EnumVal* id) } +string InputMgr::Hash(const string &input) { + unsigned char digest[16]; + hash_md5(input.length(), (const unsigned char*) input.c_str(), digest); + string out((const char*) digest, 16); + return out; +} + + diff --git a/src/InputMgr.h b/src/InputMgr.h index d5f732935c..d147fa262a 100644 --- a/src/InputMgr.h +++ b/src/InputMgr.h @@ -15,9 +15,10 @@ class InputReader; + class InputMgr { public: - InputMgr(); + InputMgr(); InputReader* CreateReader(EnumVal* id, RecordVal* description); bool ForceUpdate(EnumVal* id); @@ -32,6 +33,9 @@ protected: void Put(const InputReader* reader, const LogVal* const *vals); void Clear(const InputReader* reader); bool Delete(const InputReader* reader, const LogVal* const *vals); + + void SendEntry(const InputReader* reader, const LogVal* const *vals); + void EndCurrentSend(const InputReader* reader); private: struct ReaderInfo; @@ -40,6 +44,8 @@ private: bool UnrollRecordType(vector *fields, const RecordType *rec, const string& nameprepend); + HashKey* HashLogVals(const int num_elements, const LogVal* const *vals); + Val* LogValToVal(const LogVal* val, TypeTag request_type = TYPE_ANY); Val* LogValToIndexVal(int num_fields, const RecordType* type, const LogVal* const *vals); Val* LogValToRecordVal(const LogVal* const *vals, RecordType *request_type, int* position); @@ -49,6 +55,9 @@ private: ReaderInfo* FindReader(const EnumVal* id); vector readers; + + string Hash(const string &input); + }; extern InputMgr* input_mgr; diff --git a/src/InputReader.cc b/src/InputReader.cc index 1facc57c7f..494df3fb81 100644 --- a/src/InputReader.cc +++ b/src/InputReader.cc @@ -41,20 +41,22 @@ void InputReader::Delete(const LogVal* const *val) } -bool InputReader::Init(string arg_source, int arg_num_fields, +bool InputReader::Init(string arg_source, int arg_num_fields, int arg_idx_fields, const LogField* const * arg_fields) { source = arg_source; num_fields = arg_num_fields; + index_fields = arg_idx_fields; fields = arg_fields; // disable if DoInit returns error. - disabled = !DoInit(arg_source, arg_num_fields, arg_fields); + disabled = !DoInit(arg_source, arg_num_fields, arg_idx_fields, arg_fields); return !disabled; } void InputReader::Finish() { DoFinish(); + disabled = true; } bool InputReader::Update() { @@ -91,3 +93,10 @@ const char* InputReader::Fmt(const char* format, ...) } +void InputReader::SendEntry(const LogVal* const *vals) { + input_mgr->SendEntry(this, vals); +} + +void InputReader::EndCurrentSend() { + input_mgr->EndCurrentSend(this); +} diff --git a/src/InputReader.h b/src/InputReader.h index 0e93344e1a..b547d29506 100644 --- a/src/InputReader.h +++ b/src/InputReader.h @@ -15,7 +15,7 @@ public: InputReader(); virtual ~InputReader(); - bool Init(string arg_source, int num_fields, const LogField* const* fields); + bool Init(string arg_source, int arg_num_fields, int arg_idx_fields, const LogField* const* fields); void Finish(); @@ -23,7 +23,7 @@ public: protected: // Methods that have to be overwritten by the individual readers - virtual bool DoInit(string arg_source, int num_fields, const LogField* const * fields) = 0; + virtual bool DoInit(string arg_source, int arg_num_fields, int arg_idx_fields, const LogField* const * fields) = 0; virtual void DoFinish() = 0; @@ -46,11 +46,16 @@ protected: void Clear(); void Delete(const LogVal* const *val); + void SendEntry(const LogVal* const *vals); + void EndCurrentSend(); + + private: friend class InputMgr; string source; int num_fields; + int index_fields; const LogField* const * fields; // When an error occurs, this method is called to set a flag marking the diff --git a/src/InputReaderAscii.cc b/src/InputReaderAscii.cc index e4d581c0d8..e434f7e750 100644 --- a/src/InputReaderAscii.cc +++ b/src/InputReaderAscii.cc @@ -21,17 +21,26 @@ InputReaderAscii::InputReaderAscii() { //DBG_LOG(DBG_LOGGING, "input reader initialized"); file = 0; + + //keyMap = new map(); } InputReaderAscii::~InputReaderAscii() { + DoFinish(); } void InputReaderAscii::DoFinish() { + columnMap.empty(); + if ( file != 0 ) { + file->close(); + delete(file); + file = 0; + } } -bool InputReaderAscii::DoInit(string path, int num_fields, const LogField* const * fields) +bool InputReaderAscii::DoInit(string path, int num_fields, int idx_fields, const LogField* const * fields) { fname = path; @@ -47,6 +56,9 @@ bool InputReaderAscii::DoInit(string path, int num_fields, const LogField* const Error("could not read first line"); return false; } + + this->num_fields = num_fields; + this->idx_fields = idx_fields; // split on tabs... istringstream splitstream(line); @@ -83,12 +95,10 @@ bool InputReaderAscii::DoInit(string path, int num_fields, const LogField* const if ( wantFields != num_fields ) { // we did not find all fields? // :( - Error("wantFields != num_fields"); + Error("One of the requested fields could not be found in the input data file"); return false; } - - this->num_fields = num_fields; // well, that seems to have worked... return true; @@ -101,6 +111,9 @@ bool InputReaderAscii::DoUpdate() { // + // new keymap + //map *newKeyMap = new map(); + string line; while ( getline(*file, line ) ) { // split on tabs @@ -109,10 +122,12 @@ bool InputReaderAscii::DoUpdate() { string s; LogVal** fields = new LogVal*[num_fields]; + //string string_fields[num_fields]; unsigned int currTab = 0; unsigned int currField = 0; while ( splitstream ) { + if ( !getline(splitstream, s, '\t') ) break; @@ -146,8 +161,11 @@ bool InputReaderAscii::DoUpdate() { case TYPE_BOOL: if ( s == "T" ) { val->val.int_val = 1; - } else { + } else if ( s == "F" ) { val->val.int_val = 0; + } else { + Error(Fmt("Invalid value for boolean: %s", s.c_str())); + return false; } break; @@ -173,9 +191,15 @@ bool InputReaderAscii::DoUpdate() { val->val.subnet_val.width = atoi(width.c_str()); string addr = s.substr(0, pos); s = addr; - // fallthrough + // NOTE: dottet_to_addr BREAKS THREAD SAFETY! it uses reporter. + // Solve this some other time.... + val->val.subnet_val.net = dotted_to_addr(s.c_str()); + break; + } case TYPE_ADDR: { + // NOTE: dottet_to_addr BREAKS THREAD SAFETY! it uses reporter. + // Solve this some other time.... addr_type t = dotted_to_addr(s.c_str()); #ifdef BROv6 copy_addr(t, val->val.addr_val); @@ -193,19 +217,57 @@ bool InputReaderAscii::DoUpdate() { } fields[currMapping.position] = val; + //string_fields[currMapping.position] = s; currField++; } if ( currField != num_fields ) { - Error("curr_field != num_fields in DoUpdate"); + Error("curr_field != num_fields in DoUpdate. Columns in file do not match column definition."); return false; } - // ok, now we have built our line. send it back to the input manager - Put(fields); + + SendEntry(fields); + + /* + string indexstring = ""; + string valstring = ""; + for ( unsigned int i = 0; i < idx_fields; i++ ) { + indexstring.append(string_fields[i]); + } + + for ( unsigned int i = idx_fields; i < num_fields; i++ ) { + valstring.append(string_fields[i]); + } + + string valhash = Hash(valstring); + string indexhash = Hash(indexstring); + + if ( keyMap->find(indexhash) == keyMap->end() ) { + // new key + Put(fields); + } else if ( (*keyMap)[indexhash] != valhash ) { + // changed key + Put(fields); + keyMap->erase(indexhash); + } else { + // field not changed + keyMap->erase(indexhash); + } + + + (*newKeyMap)[indexhash] = valhash; + */ + + for ( unsigned int i = 0; i < num_fields; i++ ) { + delete fields[i]; + } + delete [] fields; } + + EndCurrentSend(); return true; } diff --git a/src/InputReaderAscii.h b/src/InputReaderAscii.h index 551a08b02e..c26a139dcd 100644 --- a/src/InputReaderAscii.h +++ b/src/InputReaderAscii.h @@ -30,7 +30,7 @@ public: protected: - virtual bool DoInit(string path, int num_fields, + virtual bool DoInit(string path, int arg_num_fields, int arg_idx_fields, const LogField* const * fields); virtual void DoFinish(); @@ -42,9 +42,12 @@ private: string fname; unsigned int num_fields; + unsigned int idx_fields; // map columns in the file to columns to send back to the manager vector columnMap; + + //map *keyMap; }; diff --git a/src/Val.h b/src/Val.h index d851be311b..3ae0bc3334 100644 --- a/src/Val.h +++ b/src/Val.h @@ -841,6 +841,9 @@ public: timer = 0; } + HashKey* ComputeHash(const Val* index) const + { return table_hash->ComputeHash(index, 1); } + protected: friend class Val; friend class StateAccess; @@ -851,8 +854,6 @@ protected: void CheckExpireAttr(attr_tag at); int ExpandCompoundAndInit(val_list* vl, int k, Val* new_val); int CheckAndAssign(Val* index, Val* new_val, Opcode op = OP_ASSIGN); - HashKey* ComputeHash(const Val* index) const - { return table_hash->ComputeHash(index, 1); } bool AddProperties(Properties arg_state); bool RemoveProperties(Properties arg_state);