hashing seems to work _correctly_ now...

This commit is contained in:
Bernhard Amann 2011-11-02 14:00:19 -07:00
parent f20125d22d
commit 638976791e
3 changed files with 72 additions and 16 deletions

View file

@ -290,10 +290,10 @@ bool InputMgr::ForceUpdate(EnumVal* id)
{ {
ReaderInfo *i = FindReader(id); ReaderInfo *i = FindReader(id);
if ( i == 0 ) { if ( i == 0 ) {
reporter->Error("Reader not found"); reporter->InternalError("Reader not found");
return false; return false;
} }
return i->reader->Update(); return i->reader->Update();
} }
@ -302,8 +302,9 @@ Val* InputMgr::LogValToIndexVal(int num_fields, const RecordType *type, const Lo
int position = 0; int position = 0;
if ( num_fields == 1 ) { if ( num_fields == 1 && type->FieldType(0)->Tag() != TYPE_RECORD ) {
idxval = LogValToVal(vals[0]); idxval = LogValToVal(vals[0]);
position = 1;
} else { } else {
ListVal *l = new ListVal(TYPE_ANY); ListVal *l = new ListVal(TYPE_ANY);
for ( int j = 0 ; j < type->NumFields(); j++ ) { for ( int j = 0 ; j < type->NumFields(); j++ ) {
@ -317,6 +318,7 @@ Val* InputMgr::LogValToIndexVal(int num_fields, const RecordType *type, const Lo
idxval = l; idxval = l;
} }
//reporter->Error("Position: %d, num_fields: %d", position, num_fields);
assert ( position == num_fields ); assert ( position == num_fields );
return idxval; return idxval;
@ -331,8 +333,15 @@ void InputMgr::SendEntry(const InputReader* reader, const LogVal* const *vals) {
return; return;
} }
reporter->Error("Hashing %d index fields", i->num_idx_fields);
HashKey* idxhash = HashLogVals(i->num_idx_fields, vals); HashKey* idxhash = HashLogVals(i->num_idx_fields, vals);
reporter->Error("Result: %d", (uint64_t) idxhash->Hash());
reporter->Error("Hashing %d val fields", i->num_val_fields);
HashKey* valhash = HashLogVals(i->num_val_fields, vals+i->num_idx_fields); HashKey* valhash = HashLogVals(i->num_val_fields, vals+i->num_idx_fields);
reporter->Error("Result: %d", (uint64_t) valhash->Hash());
//reporter->Error("received entry with idxhash %d and valhash %d", (uint64_t) idxhash->Hash(), (uint64_t) valhash->Hash());
InputHash *h = i->lastDict->Lookup(idxhash); InputHash *h = i->lastDict->Lookup(idxhash);
if ( h != 0 ) { if ( h != 0 ) {
@ -393,9 +402,12 @@ void InputMgr::SendEntry(const InputReader* reader, const LogVal* const *vals) {
} }
i->tab->Assign(idxval, k, valval); i->tab->Assign(idxval, k, valval);
InputHash* ih = new InputHash(); InputHash* ih = new InputHash();
k = i->tab->ComputeHash(idxval);
ih->idxkey = k; ih->idxkey = k;
ih->valhash = valhash; ih->valhash = valhash;
//i->tab->Delete(k);
i->currDict->Insert(idxhash, ih); i->currDict->Insert(idxhash, ih);
@ -407,11 +419,12 @@ void InputMgr::EndCurrentSend(const InputReader* reader) {
reporter->InternalError("Unknown reader"); reporter->InternalError("Unknown reader");
return; return;
} }
// lastdict contains all deleted entries and should be empty apart from that
// lastdict contains all deleted entries
IterCookie *c = i->lastDict->InitForIteration(); IterCookie *c = i->lastDict->InitForIteration();
InputHash* ih; InputHash* ih;
while ( ( ih = i->lastDict->NextEntry(c )) ) { reporter->Error("ending");
while ( ( ih = i->lastDict->NextEntry(c) ) ) {
reporter->Error("Expiring element");
i->tab->Delete(ih->idxkey); i->tab->Delete(ih->idxkey);
} }
@ -582,28 +595,37 @@ HashKey* InputMgr::HashLogVals(const int num_elements, const LogVal* const *vals
} }
//reporter->Error("Length: %d", length);
int position = 0; int position = 0;
char *data = (char*) malloc(length); char *data = (char*) malloc(length);
if ( data == 0 ) {
reporter->InternalError("Could not malloc?");
}
for ( int i = 0; i < num_elements; i++ ) { for ( int i = 0; i < num_elements; i++ ) {
const LogVal* val = vals[i]; const LogVal* val = vals[i];
switch ( val->type ) { switch ( val->type ) {
case TYPE_BOOL: case TYPE_BOOL:
case TYPE_INT: case TYPE_INT:
*(data+position) = val->val.int_val; //reporter->Error("Adding field content to pos %d: %lld", val->val.int_val, position);
memcpy(data+position, (const void*) &(val->val.int_val), sizeof(val->val.int_val));
//*(data+position) = val->val.int_val;
position += sizeof(val->val.int_val); position += sizeof(val->val.int_val);
break; break;
case TYPE_COUNT: case TYPE_COUNT:
case TYPE_COUNTER: case TYPE_COUNTER:
case TYPE_PORT: case TYPE_PORT:
*(data+position) = val->val.uint_val; //*(data+position) = val->val.uint_val;
memcpy(data+position, (const void*) &(val->val.uint_val), sizeof(val->val.uint_val));
position += sizeof(val->val.uint_val); position += sizeof(val->val.uint_val);
break; break;
case TYPE_DOUBLE: case TYPE_DOUBLE:
case TYPE_TIME: case TYPE_TIME:
case TYPE_INTERVAL: case TYPE_INTERVAL:
*(data+position) = val->val.double_val; //*(data+position) = val->val.double_val;
memcpy(data+position, (const void*) &(val->val.double_val), sizeof(val->val.double_val));
position += sizeof(val->val.double_val); position += sizeof(val->val.double_val);
break; break;
@ -685,7 +707,7 @@ InputMgr::ReaderInfo* InputMgr::FindReader(const InputReader* reader)
{ {
for ( vector<ReaderInfo *>::iterator s = readers.begin(); s != readers.end(); ++s ) for ( vector<ReaderInfo *>::iterator s = readers.begin(); s != readers.end(); ++s )
{ {
if ( (*s)->reader == reader ) if ( (*s)->reader && (*s)->reader == reader )
{ {
return *s; return *s;
} }
@ -699,7 +721,7 @@ InputMgr::ReaderInfo* InputMgr::FindReader(const EnumVal* id)
{ {
for ( vector<ReaderInfo *>::iterator s = readers.begin(); s != readers.end(); ++s ) for ( vector<ReaderInfo *>::iterator s = readers.begin(); s != readers.end(); ++s )
{ {
if ( (*s)->id == id ) if ( (*s)->id && (*s)->id->AsEnum() == id->AsEnum() )
{ {
return *s; return *s;
} }

View file

@ -50,6 +50,16 @@ bool InputReaderAscii::DoInit(string path, int num_fields, int idx_fields, const
return false; return false;
} }
this->num_fields = num_fields;
this->idx_fields = idx_fields;
this->fields = fields;
return true;
}
bool InputReaderAscii::ReadHeader() {
// try to read the header line... // try to read the header line...
string line; string line;
if ( !getline(*file, line) ) { if ( !getline(*file, line) ) {
@ -57,9 +67,6 @@ bool InputReaderAscii::DoInit(string path, int num_fields, int idx_fields, const
return false; return false;
} }
this->num_fields = num_fields;
this->idx_fields = idx_fields;
// split on tabs... // split on tabs...
istringstream splitstream(line); istringstream splitstream(line);
unsigned int currTab = 0; unsigned int currTab = 0;
@ -70,7 +77,7 @@ bool InputReaderAscii::DoInit(string path, int num_fields, int idx_fields, const
break; break;
// current found heading in s... compare if we want it // current found heading in s... compare if we want it
for ( int i = 0; i < num_fields; i++ ) { for ( unsigned int i = 0; i < num_fields; i++ ) {
const LogField* field = fields[i]; const LogField* field = fields[i];
if ( field->name == s ) { if ( field->name == s ) {
// cool, found field. note position // cool, found field. note position
@ -92,7 +99,7 @@ bool InputReaderAscii::DoInit(string path, int num_fields, int idx_fields, const
currTab++; currTab++;
} }
if ( wantFields != num_fields ) { if ( wantFields != (int) num_fields ) {
// we did not find all fields? // we did not find all fields?
// :( // :(
Error("One of the requested fields could not be found in the input data file"); Error("One of the requested fields could not be found in the input data file");
@ -106,9 +113,30 @@ bool InputReaderAscii::DoInit(string path, int num_fields, int idx_fields, const
// read the entire file and send appropriate thingies back to InputMgr // read the entire file and send appropriate thingies back to InputMgr
bool InputReaderAscii::DoUpdate() { bool InputReaderAscii::DoUpdate() {
// dirty, fix me. (well, apparently after trying seeking, etc - this is not that bad)
if ( file && file->is_open() ) {
file->close();
}
file = new ifstream(fname.c_str());
if ( !file->is_open() ) {
Error(Fmt("cannot open %s", fname.c_str()));
return false;
}
//
// file->seekg(0, ios::beg); // do not forget clear.
if ( ReadHeader() == false ) {
return false;
}
// TODO: all the stuff we need for a second reading. // TODO: all the stuff we need for a second reading.
// *cough* // *cough*
// //
//
// new keymap // new keymap
@ -152,6 +180,7 @@ bool InputReaderAscii::DoUpdate() {
} }
LogVal* val = new LogVal(currMapping.type, true); LogVal* val = new LogVal(currMapping.type, true);
//bzero(val, sizeof(LogVal));
switch ( currMapping.type ) { switch ( currMapping.type ) {
case TYPE_STRING: case TYPE_STRING:
@ -267,6 +296,8 @@ bool InputReaderAscii::DoUpdate() {
} }
//file->clear(); // remove end of file evil bits
//file->seekg(0, ios::beg); // and seek to start.
EndCurrentSend(); EndCurrentSend();
return true; return true;

View file

@ -37,6 +37,8 @@ protected:
virtual bool DoUpdate(); virtual bool DoUpdate();
private: private:
bool ReadHeader();
ifstream* file; ifstream* file;
string fname; string fname;
@ -46,6 +48,7 @@ private:
// map columns in the file to columns to send back to the manager // map columns in the file to columns to send back to the manager
vector<FieldMapping> columnMap; vector<FieldMapping> columnMap;
const LogField* const * fields; // raw mapping
//map<string, string> *keyMap; //map<string, string> *keyMap;