...forgotten in last commit.

This commit is contained in:
Bernhard Amann 2012-02-20 13:20:29 -08:00
parent 4126b458ca
commit 4f57817b1a
3 changed files with 191 additions and 33 deletions

View file

@ -818,13 +818,6 @@ int Manager::SendEntryTable(const ReaderFrontend* reader, const int id, const Va
} }
Val* oldval = 0;
if ( updated == true ) {
assert(filter->num_val_fields > 0);
// in that case, we need the old value to send the event (if we send an event).
oldval = filter->tab->Lookup(idxval);
}
// call filter first to determine if we really add / change the entry // call filter first to determine if we really add / change the entry
if ( filter->pred ) { if ( filter->pred ) {
@ -865,6 +858,13 @@ int Manager::SendEntryTable(const ReaderFrontend* reader, const int id, const Va
} }
Val* oldval = 0;
if ( updated == true ) {
assert(filter->num_val_fields > 0);
// in that case, we need the old value to send the event (if we send an event).
oldval = filter->tab->Lookup(idxval);
}
//i->tab->Assign(idxval, valval); //i->tab->Assign(idxval, valval);
HashKey* k = filter->tab->ComputeHash(idxval); HashKey* k = filter->tab->ComputeHash(idxval);
if ( !k ) { if ( !k ) {
@ -884,21 +884,22 @@ int Manager::SendEntryTable(const ReaderFrontend* reader, const int id, const Va
if ( filter->event ) { if ( filter->event ) {
EnumVal* ev; EnumVal* ev;
Ref(idxval); int startpos = 0;
Val* predidx = ValueToRecordVal(vals, filter->itype, &startpos);
if ( updated ) { // in case of update send back the old value. if ( updated ) { // in case of update send back the old value.
assert ( filter->num_val_fields > 0 ); assert ( filter->num_val_fields > 0 );
ev = new EnumVal(BifEnum::Input::EVENT_CHANGED, BifType::Enum::Input::Event); ev = new EnumVal(BifEnum::Input::EVENT_CHANGED, BifType::Enum::Input::Event);
assert ( oldval != 0 ); assert ( oldval != 0 );
Ref(oldval); Ref(oldval);
SendEvent(filter->event, 3, ev, idxval, oldval); SendEvent(filter->event, 3, ev, predidx, oldval);
} else { } else {
ev = new EnumVal(BifEnum::Input::EVENT_NEW, BifType::Enum::Input::Event); ev = new EnumVal(BifEnum::Input::EVENT_NEW, BifType::Enum::Input::Event);
Ref(valval); Ref(valval);
if ( filter->num_val_fields == 0 ) { if ( filter->num_val_fields == 0 ) {
SendEvent(filter->event, 3, ev, idxval); SendEvent(filter->event, 3, ev, predidx);
} else { } else {
SendEvent(filter->event, 3, ev, idxval, valval); SendEvent(filter->event, 3, ev, predidx, valval);
} }
} }
} }
@ -973,10 +974,11 @@ void Manager::EndCurrentSend(const ReaderFrontend* reader, int id) {
} }
if ( filter->event ) { if ( filter->event ) {
Ref(idx); int startpos = 0;
Val* predidx = ListValToRecordVal(idx, filter->itype, &startpos);
Ref(val); Ref(val);
EnumVal *ev = new EnumVal(BifEnum::Input::EVENT_REMOVED, BifType::Enum::Input::Event); EnumVal *ev = new EnumVal(BifEnum::Input::EVENT_REMOVED, BifType::Enum::Input::Event);
SendEvent(filter->event, 3, ev, idx, val); SendEvent(filter->event, 3, ev, predidx, val);
} }
filter->tab->Delete(ih->idxkey); filter->tab->Delete(ih->idxkey);
@ -991,8 +993,6 @@ void Manager::EndCurrentSend(const ReaderFrontend* reader, int id) {
filter->currDict = new PDict(InputHash); filter->currDict = new PDict(InputHash);
// Send event that the current update is indeed finished. // Send event that the current update is indeed finished.
EventHandler* handler = event_registry->Lookup("Input::update_finished"); EventHandler* handler = event_registry->Lookup("Input::update_finished");
if ( handler == 0 ) { if ( handler == 0 ) {
reporter->InternalError("Input::update_finished not found!"); reporter->InternalError("Input::update_finished not found!");
@ -1077,6 +1077,7 @@ int Manager::PutTable(const ReaderFrontend* reader, int id, const Value* const *
Val* idxval = ValueToIndexVal(filter->num_idx_fields, filter->itype, vals); Val* idxval = ValueToIndexVal(filter->num_idx_fields, filter->itype, vals);
Val* valval; Val* valval;
int position = filter->num_idx_fields; int position = filter->num_idx_fields;
if ( filter->num_val_fields == 0 ) { if ( filter->num_val_fields == 0 ) {
@ -1087,7 +1088,91 @@ int Manager::PutTable(const ReaderFrontend* reader, int id, const Value* const *
valval = ValueToRecordVal(vals, filter->rtype, &position); valval = ValueToRecordVal(vals, filter->rtype, &position);
} }
filter->tab->Assign(idxval, valval); // if we have a subscribed event, we need to figure out, if this is an update or not
// same for predicates
if ( filter->pred || filter->event ) {
bool updated = false;
Val* oldval = 0;
if ( filter->num_val_fields > 0 ) {
// in that case, we need the old value to send the event (if we send an event).
oldval = filter->tab->Lookup(idxval, false);
}
if ( oldval != 0 ) {
// it is an update
updated = true;
Ref(oldval); // have to do that, otherwise it may disappear in assign
}
// predicate if we want the update or not
if ( filter->pred ) {
EnumVal* ev;
int startpos = 0;
Val* predidx = ValueToRecordVal(vals, filter->itype, &startpos);
Ref(valval);
if ( updated ) {
ev = new EnumVal(BifEnum::Input::EVENT_CHANGED, BifType::Enum::Input::Event);
} else {
ev = new EnumVal(BifEnum::Input::EVENT_NEW, BifType::Enum::Input::Event);
}
val_list vl( 2 + (filter->num_val_fields > 0) ); // 2 if we don't have values, 3 otherwise.
vl.append(ev);
vl.append(predidx);
if ( filter->num_val_fields > 0 )
vl.append(valval);
Val* v = filter->pred->Call(&vl);
bool result = v->AsBool();
Unref(v);
if ( result == false ) {
// do nothing
Unref(idxval);
Unref(valval);
Unref(oldval);
return filter->num_val_fields + filter->num_idx_fields;
}
}
filter->tab->Assign(idxval, valval);
if ( filter->event ) {
EnumVal* ev;
int startpos = 0;
Val* predidx = ValueToRecordVal(vals, filter->itype, &startpos);
if ( updated ) { // in case of update send back the old value.
assert ( filter->num_val_fields > 0 );
ev = new EnumVal(BifEnum::Input::EVENT_CHANGED, BifType::Enum::Input::Event);
assert ( oldval != 0 );
SendEvent(filter->event, 3, ev, predidx, oldval);
} else {
ev = new EnumVal(BifEnum::Input::EVENT_NEW, BifType::Enum::Input::Event);
Ref(valval);
if ( filter->num_val_fields == 0 ) {
SendEvent(filter->event, 3, ev, predidx);
} else {
SendEvent(filter->event, 3, ev, predidx, valval);
}
}
}
} else {
// no predicates or other stuff
filter->tab->Assign(idxval, valval);
}
return filter->num_idx_fields + filter->num_val_fields; return filter->num_idx_fields + filter->num_val_fields;
} }
@ -1122,8 +1207,52 @@ bool Manager::Delete(const ReaderFrontend* reader, int id, Value* *vals) {
if ( i->filters[id]->filter_type == TABLE_FILTER ) { if ( i->filters[id]->filter_type == TABLE_FILTER ) {
TableFilter* filter = (TableFilter*) i->filters[id]; TableFilter* filter = (TableFilter*) i->filters[id];
Val* idxval = ValueToIndexVal(filter->num_idx_fields, filter->itype, vals); Val* idxval = ValueToIndexVal(filter->num_idx_fields, filter->itype, vals);
assert(idxval != 0);
readVals = filter->num_idx_fields + filter->num_val_fields; readVals = filter->num_idx_fields + filter->num_val_fields;
success = ( filter->tab->Delete(idxval) != 0 ); bool filterresult = true;
if ( filter->pred || filter->event ) {
Val *val = filter->tab->Lookup(idxval);
if ( filter->pred ) {
Ref(val);
EnumVal *ev = new EnumVal(BifEnum::Input::EVENT_REMOVED, BifType::Enum::Input::Event);
int startpos = 0;
Val* predidx = ValueToRecordVal(vals, filter->itype, &startpos);
val_list vl(3);
vl.append(ev);
vl.append(predidx);
vl.append(val);
Val* v = filter->pred->Call(&vl);
filterresult = v->AsBool();
Unref(v);
if ( filterresult == false ) {
// keep it.
Unref(idxval);
success = true;
}
}
// only if filter = true -> no filtering
if ( filterresult && filter->event ) {
Ref(idxval);
assert(val != 0);
Ref(val);
EnumVal *ev = new EnumVal(BifEnum::Input::EVENT_REMOVED, BifType::Enum::Input::Event);
SendEvent(filter->event, 3, ev, idxval, val);
}
}
// only if filter = true -> no filtering
if ( filterresult ) {
success = ( filter->tab->Delete(idxval) != 0 );
if ( !success ) {
reporter->Error("Internal error while deleting values from input table");
}
}
} else if ( i->filters[id]->filter_type == EVENT_FILTER ) { } else if ( i->filters[id]->filter_type == EVENT_FILTER ) {
EnumVal *type = new EnumVal(BifEnum::Input::EVENT_REMOVED, BifType::Enum::Input::Event); EnumVal *type = new EnumVal(BifEnum::Input::EVENT_REMOVED, BifType::Enum::Input::Event);
readVals = SendEventFilterEvent(reader, type, id, vals); readVals = SendEventFilterEvent(reader, type, id, vals);

View file

@ -91,16 +91,22 @@ bool Ascii::DoInit(string path, int arg_mode)
mode = arg_mode; mode = arg_mode;
mtime = 0; mtime = 0;
if ( ( mode != MANUAL ) && (mode != REREAD) && ( mode != STREAM ) ) {
Error(Fmt("Unsupported read mode %d for source %s", mode, path.c_str()));
return false;
}
file = new ifstream(path.c_str()); file = new ifstream(path.c_str());
if ( !file->is_open() ) { if ( !file->is_open() ) {
Error(Fmt("Init: cannot open %s", fname.c_str())); Error(Fmt("Init: cannot open %s", fname.c_str()));
return false; return false;
} }
if ( ( mode != MANUAL ) && (mode != REREAD) && ( mode != STREAM ) ) { if ( ReadHeader(false) == false ) {
Error(Fmt("Unsupported read mode %d for source %s", mode, path.c_str())); Error(Fmt("Init: cannot open %s; headers are incorrect", fname.c_str()));
file->close();
return false; return false;
} }
return true; return true;
} }
@ -114,6 +120,8 @@ bool Ascii::DoStartReading() {
started = true; started = true;
switch ( mode ) { switch ( mode ) {
case MANUAL: case MANUAL:
case REREAD:
case STREAM:
DoUpdate(); DoUpdate();
break; break;
default: default:
@ -157,16 +165,25 @@ bool Ascii::HasFilter(int id) {
} }
bool Ascii::ReadHeader() { bool Ascii::ReadHeader(bool useCached) {
// try to read the header line... // try to read the header line...
string line; string line;
if ( !GetLine(line) ) {
Error("could not read first line");
return false;
}
map<string, uint32_t> fields; map<string, uint32_t> fields;
if ( !useCached ) {
if ( !GetLine(line) ) {
Error("could not read first line");
return false;
}
headerline = line;
} else {
line = headerline;
}
// construct list of field names. // construct list of field names.
istringstream splitstream(line); istringstream splitstream(line);
int pos=0; int pos=0;
@ -179,7 +196,7 @@ bool Ascii::ReadHeader() {
pos++; pos++;
} }
//printf("Updating fields from description %s\n", line.c_str());
for ( map<int, Filter>::iterator it = filters.begin(); it != filters.end(); it++ ) { for ( map<int, Filter>::iterator it = filters.begin(); it != filters.end(); it++ ) {
(*it).second.columnMap.clear(); (*it).second.columnMap.clear();
@ -433,6 +450,7 @@ bool Ascii::DoUpdate() {
if ( file && file->is_open() ) { if ( file && file->is_open() ) {
if ( mode == STREAM ) { if ( mode == STREAM ) {
file->clear(); // remove end of file evil bits file->clear(); // remove end of file evil bits
ReadHeader(true); // in case filters changed
break; break;
} }
file->close(); file->close();
@ -444,7 +462,7 @@ bool Ascii::DoUpdate() {
} }
if ( ReadHeader() == false ) { if ( ReadHeader(false) == false ) {
return false; return false;
} }
@ -512,9 +530,14 @@ bool Ascii::DoUpdate() {
fpos++; fpos++;
} }
//printf("fpos: %d, second.num_fields: %d\n", fpos, (*it).second.num_fields);
assert ( (unsigned int) fpos == (*it).second.num_fields ); assert ( (unsigned int) fpos == (*it).second.num_fields );
SendEntry((*it).first, fields); if ( mode == STREAM ) {
Put((*it).first, fields);
} else {
SendEntry((*it).first, fields);
}
/* Do not do this, ownership changes to other thread /* Do not do this, ownership changes to other thread
* for ( unsigned int i = 0; i < (*it).second.num_fields; i++ ) { * for ( unsigned int i = 0; i < (*it).second.num_fields; i++ ) {
@ -530,9 +553,12 @@ bool Ascii::DoUpdate() {
//file->clear(); // remove end of file evil bits //file->clear(); // remove end of file evil bits
//file->seekg(0, ios::beg); // and seek to start. //file->seekg(0, ios::beg); // and seek to start.
for ( map<int, Filter>::iterator it = filters.begin(); it != filters.end(); it++ ) { if ( mode != STREAM ) {
EndCurrentSend((*it).first); for ( map<int, Filter>::iterator it = filters.begin(); it != filters.end(); it++ ) {
EndCurrentSend((*it).first);
}
} }
return true; return true;
} }

View file

@ -69,7 +69,7 @@ private:
TransportProto StringToProto(const string &proto); TransportProto StringToProto(const string &proto);
bool ReadHeader(); bool ReadHeader(bool useCached);
threading::Value* EntryToVal(string s, FieldMapping type); threading::Value* EntryToVal(string s, FieldMapping type);
bool GetLine(string& str); bool GetLine(string& str);
@ -87,6 +87,9 @@ private:
string empty_field; string empty_field;
string unset_field; string unset_field;
// keep a copy of the headerline to determine field locations when filters change
string headerline;
int mode; int mode;