little snag with hashing functionality...

This commit is contained in:
Bernhard Amann 2011-10-28 13:11:47 -07:00
parent 86730c13dd
commit f20125d22d
7 changed files with 333 additions and 22 deletions

View file

@ -13,6 +13,18 @@
#include "InputReaderAscii.h"
#include "CompHash.h"
class InputHash {
public:
HashKey* valhash;
HashKey* idxkey; // does not need ref or whatever - if it is present here, it is also still present in the TableVal.
};
declare(PDict, InputHash);
struct InputMgr::ReaderInfo {
EnumVal* id;
EnumVal* type;
@ -24,6 +36,9 @@ struct InputMgr::ReaderInfo {
RecordType* rtype;
RecordType* itype;
PDict(InputHash)* currDict;
PDict(InputHash)* lastDict;
};
struct InputReaderDefinition {
@ -147,10 +162,20 @@ InputReader* InputMgr::CreateReader(EnumVal* id, RecordVal* description)
info->itype = idx;
Ref(idx);
readers.push_back(info);
info->currDict = new PDict(InputHash);
info->lastDict = new PDict(InputHash);
reader_obj->Init(source, fieldsV.size(), fields);
reader_obj->Update();
int success = reader_obj->Init(source, fieldsV.size(), idxfields, fields);
if ( success == false ) {
RemoveReader(id);
return 0;
}
success = reader_obj->Update();
if ( success == false ) {
RemoveReader(id);
return 0;
}
return reader_obj;
@ -198,7 +223,6 @@ bool InputMgr::IsCompatibleType(BroType* t)
return false;
}
bool InputMgr::RemoveReader(EnumVal* id) {
ReaderInfo *i = 0;
for ( vector<ReaderInfo *>::iterator s = readers.begin(); s != readers.end(); ++s )
@ -215,6 +239,9 @@ bool InputMgr::RemoveReader(EnumVal* id) {
return false; // not found
}
i->reader->Finish();
Unref(i->type);
Unref(i->tab);
Unref(i->itype);
@ -267,8 +294,7 @@ bool InputMgr::ForceUpdate(EnumVal* id)
return false;
}
i->reader->Update();
return true;
return i->reader->Update();
}
Val* InputMgr::LogValToIndexVal(int num_fields, const RecordType *type, const LogVal* const *vals) {
@ -297,6 +323,105 @@ Val* InputMgr::LogValToIndexVal(int num_fields, const RecordType *type, const Lo
}
void InputMgr::SendEntry(const InputReader* reader, const LogVal* const *vals) {
ReaderInfo *i = FindReader(reader);
if ( i == 0 ) {
reporter->InternalError("Unknown reader");
return;
}
HashKey* idxhash = HashLogVals(i->num_idx_fields, vals);
HashKey* valhash = HashLogVals(i->num_val_fields, vals+i->num_idx_fields);
InputHash *h = i->lastDict->Lookup(idxhash);
if ( h != 0 ) {
// seen before
if ( h->valhash->Hash() == valhash->Hash() ) {
// ok, double.
i->lastDict->Remove(idxhash);
i->currDict->Insert(idxhash, h);
return;
} else {
// updated
i->lastDict->Remove(idxhash);
delete(h);
}
}
Val* idxval = LogValToIndexVal(i->num_idx_fields, i->itype, vals);
Val* valval;
int position = i->num_idx_fields;
if ( i->num_val_fields == 1 ) {
valval = LogValToVal(vals[i->num_idx_fields]);
} else {
RecordVal * r = new RecordVal(i->rtype);
/* if ( i->rtype->NumFields() != (int) i->num_val_fields ) {
reporter->InternalError("Type mismatch");
return;
} */
for ( int j = 0; j < i->rtype->NumFields(); j++) {
Val* val = 0;
if ( i->rtype->FieldType(j)->Tag() == TYPE_RECORD ) {
val = LogValToRecordVal(vals, i->rtype->FieldType(j)->AsRecordType(), &position);
} else {
val = LogValToVal(vals[position], i->rtype->FieldType(j)->Tag());
position++;
}
if ( val == 0 ) {
reporter->InternalError("conversion error");
return;
}
r->Assign(j,val);
}
valval = r;
}
//i->tab->Assign(idxval, valval);
HashKey* k = i->tab->ComputeHash(idxval);
if ( !k ) {
reporter->InternalError("could not hash");
return;
}
i->tab->Assign(idxval, k, valval);
InputHash* ih = new InputHash();
ih->idxkey = k;
ih->valhash = valhash;
i->currDict->Insert(idxhash, ih);
}
void InputMgr::EndCurrentSend(const InputReader* reader) {
ReaderInfo *i = FindReader(reader);
if ( i == 0 ) {
reporter->InternalError("Unknown reader");
return;
}
// lastdict contains all deleted entries
IterCookie *c = i->lastDict->InitForIteration();
InputHash* ih;
while ( ( ih = i->lastDict->NextEntry(c )) ) {
i->tab->Delete(ih->idxkey);
}
i->lastDict->Clear();
delete(i->lastDict);
i->lastDict = i->currDict;
i->currDict = new PDict(InputHash);
}
void InputMgr::Put(const InputReader* reader, const LogVal* const *vals) {
ReaderInfo *i = FindReader(reader);
if ( i == 0 ) {
@ -418,6 +543,95 @@ Val* InputMgr::LogValToRecordVal(const LogVal* const *vals, RecordType *request_
}
HashKey* InputMgr::HashLogVals(const int num_elements, const LogVal* const *vals) {
int length = 0;
for ( int i = 0; i < num_elements; i++ ) {
const LogVal* val = vals[i];
switch (val->type) {
case TYPE_BOOL:
case TYPE_INT:
length += sizeof(val->val.int_val);
break;
case TYPE_COUNT:
case TYPE_COUNTER:
case TYPE_PORT:
length += sizeof(val->val.uint_val);
break;
case TYPE_DOUBLE:
case TYPE_TIME:
case TYPE_INTERVAL:
length += sizeof(val->val.double_val);
break;
case TYPE_STRING:
{
length += val->val.string_val->size();
break;
}
case TYPE_ADDR:
length += NUM_ADDR_WORDS*sizeof(uint32_t);
break;
default:
reporter->InternalError("unsupported type for hashlogvals");
}
}
int position = 0;
char *data = (char*) malloc(length);
for ( int i = 0; i < num_elements; i++ ) {
const LogVal* val = vals[i];
switch ( val->type ) {
case TYPE_BOOL:
case TYPE_INT:
*(data+position) = val->val.int_val;
position += sizeof(val->val.int_val);
break;
case TYPE_COUNT:
case TYPE_COUNTER:
case TYPE_PORT:
*(data+position) = val->val.uint_val;
position += sizeof(val->val.uint_val);
break;
case TYPE_DOUBLE:
case TYPE_TIME:
case TYPE_INTERVAL:
*(data+position) = val->val.double_val;
position += sizeof(val->val.double_val);
break;
case TYPE_STRING:
{
memcpy(data+position, val->val.string_val->c_str(), val->val.string_val->length());
position += val->val.string_val->size();
break;
}
case TYPE_ADDR:
memcpy(data+position, val->val.addr_val, NUM_ADDR_WORDS*sizeof(uint32_t));
position += NUM_ADDR_WORDS*sizeof(uint32_t);
break;
default:
reporter->InternalError("unsupported type for hashlogvals2");
}
}
assert(position == length);
return new HashKey(data, length);
}
Val* InputMgr::LogValToVal(const LogVal* val, TypeTag request_type) {
if ( request_type != TYPE_ANY && request_type != val->type ) {
@ -495,3 +709,11 @@ InputMgr::ReaderInfo* InputMgr::FindReader(const EnumVal* id)
}
string InputMgr::Hash(const string &input) {
unsigned char digest[16];
hash_md5(input.length(), (const unsigned char*) input.c_str(), digest);
string out((const char*) digest, 16);
return out;
}

View file

@ -15,9 +15,10 @@
class InputReader;
class InputMgr {
public:
InputMgr();
InputMgr();
InputReader* CreateReader(EnumVal* id, RecordVal* description);
bool ForceUpdate(EnumVal* id);
@ -33,6 +34,9 @@ protected:
void Clear(const InputReader* reader);
bool Delete(const InputReader* reader, const LogVal* const *vals);
void SendEntry(const InputReader* reader, const LogVal* const *vals);
void EndCurrentSend(const InputReader* reader);
private:
struct ReaderInfo;
@ -40,6 +44,8 @@ private:
bool UnrollRecordType(vector<LogField*> *fields, const RecordType *rec, const string& nameprepend);
HashKey* HashLogVals(const int num_elements, const LogVal* const *vals);
Val* LogValToVal(const LogVal* val, TypeTag request_type = TYPE_ANY);
Val* LogValToIndexVal(int num_fields, const RecordType* type, const LogVal* const *vals);
Val* LogValToRecordVal(const LogVal* const *vals, RecordType *request_type, int* position);
@ -49,6 +55,9 @@ private:
ReaderInfo* FindReader(const EnumVal* id);
vector<ReaderInfo*> readers;
string Hash(const string &input);
};
extern InputMgr* input_mgr;

View file

@ -41,20 +41,22 @@ void InputReader::Delete(const LogVal* const *val)
}
bool InputReader::Init(string arg_source, int arg_num_fields,
bool InputReader::Init(string arg_source, int arg_num_fields, int arg_idx_fields,
const LogField* const * arg_fields)
{
source = arg_source;
num_fields = arg_num_fields;
index_fields = arg_idx_fields;
fields = arg_fields;
// disable if DoInit returns error.
disabled = !DoInit(arg_source, arg_num_fields, arg_fields);
disabled = !DoInit(arg_source, arg_num_fields, arg_idx_fields, arg_fields);
return !disabled;
}
void InputReader::Finish() {
DoFinish();
disabled = true;
}
bool InputReader::Update() {
@ -91,3 +93,10 @@ const char* InputReader::Fmt(const char* format, ...)
}
void InputReader::SendEntry(const LogVal* const *vals) {
input_mgr->SendEntry(this, vals);
}
void InputReader::EndCurrentSend() {
input_mgr->EndCurrentSend(this);
}

View file

@ -15,7 +15,7 @@ public:
InputReader();
virtual ~InputReader();
bool Init(string arg_source, int num_fields, const LogField* const* fields);
bool Init(string arg_source, int arg_num_fields, int arg_idx_fields, const LogField* const* fields);
void Finish();
@ -23,7 +23,7 @@ public:
protected:
// Methods that have to be overwritten by the individual readers
virtual bool DoInit(string arg_source, int num_fields, const LogField* const * fields) = 0;
virtual bool DoInit(string arg_source, int arg_num_fields, int arg_idx_fields, const LogField* const * fields) = 0;
virtual void DoFinish() = 0;
@ -46,11 +46,16 @@ protected:
void Clear();
void Delete(const LogVal* const *val);
void SendEntry(const LogVal* const *vals);
void EndCurrentSend();
private:
friend class InputMgr;
string source;
int num_fields;
int index_fields;
const LogField* const * fields;
// When an error occurs, this method is called to set a flag marking the

View file

@ -21,17 +21,26 @@ InputReaderAscii::InputReaderAscii()
{
//DBG_LOG(DBG_LOGGING, "input reader initialized");
file = 0;
//keyMap = new map<string, string>();
}
InputReaderAscii::~InputReaderAscii()
{
DoFinish();
}
void InputReaderAscii::DoFinish()
{
columnMap.empty();
if ( file != 0 ) {
file->close();
delete(file);
file = 0;
}
}
bool InputReaderAscii::DoInit(string path, int num_fields, const LogField* const * fields)
bool InputReaderAscii::DoInit(string path, int num_fields, int idx_fields, const LogField* const * fields)
{
fname = path;
@ -48,6 +57,9 @@ bool InputReaderAscii::DoInit(string path, int num_fields, const LogField* const
return false;
}
this->num_fields = num_fields;
this->idx_fields = idx_fields;
// split on tabs...
istringstream splitstream(line);
unsigned int currTab = 0;
@ -83,13 +95,11 @@ bool InputReaderAscii::DoInit(string path, int num_fields, const LogField* const
if ( wantFields != num_fields ) {
// we did not find all fields?
// :(
Error("wantFields != num_fields");
Error("One of the requested fields could not be found in the input data file");
return false;
}
this->num_fields = num_fields;
// well, that seems to have worked...
return true;
}
@ -101,6 +111,9 @@ bool InputReaderAscii::DoUpdate() {
//
// new keymap
//map<string, string> *newKeyMap = new map<string, string>();
string line;
while ( getline(*file, line ) ) {
// split on tabs
@ -109,10 +122,12 @@ bool InputReaderAscii::DoUpdate() {
string s;
LogVal** fields = new LogVal*[num_fields];
//string string_fields[num_fields];
unsigned int currTab = 0;
unsigned int currField = 0;
while ( splitstream ) {
if ( !getline(splitstream, s, '\t') )
break;
@ -146,8 +161,11 @@ bool InputReaderAscii::DoUpdate() {
case TYPE_BOOL:
if ( s == "T" ) {
val->val.int_val = 1;
} else {
} else if ( s == "F" ) {
val->val.int_val = 0;
} else {
Error(Fmt("Invalid value for boolean: %s", s.c_str()));
return false;
}
break;
@ -173,9 +191,15 @@ bool InputReaderAscii::DoUpdate() {
val->val.subnet_val.width = atoi(width.c_str());
string addr = s.substr(0, pos);
s = addr;
// fallthrough
// NOTE: dottet_to_addr BREAKS THREAD SAFETY! it uses reporter.
// Solve this some other time....
val->val.subnet_val.net = dotted_to_addr(s.c_str());
break;
}
case TYPE_ADDR: {
// NOTE: dottet_to_addr BREAKS THREAD SAFETY! it uses reporter.
// Solve this some other time....
addr_type t = dotted_to_addr(s.c_str());
#ifdef BROv6
copy_addr(t, val->val.addr_val);
@ -193,19 +217,57 @@ bool InputReaderAscii::DoUpdate() {
}
fields[currMapping.position] = val;
//string_fields[currMapping.position] = s;
currField++;
}
if ( currField != num_fields ) {
Error("curr_field != num_fields in DoUpdate");
Error("curr_field != num_fields in DoUpdate. Columns in file do not match column definition.");
return false;
}
// ok, now we have built our line. send it back to the input manager
Put(fields);
SendEntry(fields);
/*
string indexstring = "";
string valstring = "";
for ( unsigned int i = 0; i < idx_fields; i++ ) {
indexstring.append(string_fields[i]);
}
for ( unsigned int i = idx_fields; i < num_fields; i++ ) {
valstring.append(string_fields[i]);
}
string valhash = Hash(valstring);
string indexhash = Hash(indexstring);
if ( keyMap->find(indexhash) == keyMap->end() ) {
// new key
Put(fields);
} else if ( (*keyMap)[indexhash] != valhash ) {
// changed key
Put(fields);
keyMap->erase(indexhash);
} else {
// field not changed
keyMap->erase(indexhash);
}
(*newKeyMap)[indexhash] = valhash;
*/
for ( unsigned int i = 0; i < num_fields; i++ ) {
delete fields[i];
}
delete [] fields;
}
EndCurrentSend();
return true;
}

View file

@ -30,7 +30,7 @@ public:
protected:
virtual bool DoInit(string path, int num_fields,
virtual bool DoInit(string path, int arg_num_fields, int arg_idx_fields,
const LogField* const * fields);
virtual void DoFinish();
@ -42,10 +42,13 @@ private:
string fname;
unsigned int num_fields;
unsigned int idx_fields;
// map columns in the file to columns to send back to the manager
vector<FieldMapping> columnMap;
//map<string, string> *keyMap;
};

View file

@ -841,6 +841,9 @@ public:
timer = 0;
}
HashKey* ComputeHash(const Val* index) const
{ return table_hash->ComputeHash(index, 1); }
protected:
friend class Val;
friend class StateAccess;
@ -851,8 +854,6 @@ protected:
void CheckExpireAttr(attr_tag at);
int ExpandCompoundAndInit(val_list* vl, int k, Val* new_val);
int CheckAndAssign(Val* index, Val* new_val, Opcode op = OP_ASSIGN);
HashKey* ComputeHash(const Val* index) const
{ return table_hash->ComputeHash(index, 1); }
bool AddProperties(Properties arg_state);
bool RemoveProperties(Properties arg_state);