diff --git a/src/input/Manager.cc b/src/input/Manager.cc index ea4c5643fa..66dadfdb2d 100644 --- a/src/input/Manager.cc +++ b/src/input/Manager.cc @@ -818,13 +818,6 @@ int Manager::SendEntryTable(const ReaderFrontend* reader, const int id, const Va } - Val* oldval = 0; - if ( updated == true ) { - assert(filter->num_val_fields > 0); - // in that case, we need the old value to send the event (if we send an event). - oldval = filter->tab->Lookup(idxval); - } - // call filter first to determine if we really add / change the entry if ( filter->pred ) { @@ -865,6 +858,13 @@ int Manager::SendEntryTable(const ReaderFrontend* reader, const int id, const Va } + Val* oldval = 0; + if ( updated == true ) { + assert(filter->num_val_fields > 0); + // in that case, we need the old value to send the event (if we send an event). + oldval = filter->tab->Lookup(idxval); + } + //i->tab->Assign(idxval, valval); HashKey* k = filter->tab->ComputeHash(idxval); if ( !k ) { @@ -884,21 +884,22 @@ int Manager::SendEntryTable(const ReaderFrontend* reader, const int id, const Va if ( filter->event ) { EnumVal* ev; - Ref(idxval); + int startpos = 0; + Val* predidx = ValueToRecordVal(vals, filter->itype, &startpos); if ( updated ) { // in case of update send back the old value. assert ( filter->num_val_fields > 0 ); ev = new EnumVal(BifEnum::Input::EVENT_CHANGED, BifType::Enum::Input::Event); assert ( oldval != 0 ); Ref(oldval); - SendEvent(filter->event, 3, ev, idxval, oldval); + SendEvent(filter->event, 3, ev, predidx, oldval); } else { ev = new EnumVal(BifEnum::Input::EVENT_NEW, BifType::Enum::Input::Event); Ref(valval); if ( filter->num_val_fields == 0 ) { - SendEvent(filter->event, 3, ev, idxval); + SendEvent(filter->event, 3, ev, predidx); } else { - SendEvent(filter->event, 3, ev, idxval, valval); + SendEvent(filter->event, 3, ev, predidx, valval); } } } @@ -973,10 +974,11 @@ void Manager::EndCurrentSend(const ReaderFrontend* reader, int id) { } if ( filter->event ) { - Ref(idx); + int startpos = 0; + Val* predidx = ListValToRecordVal(idx, filter->itype, &startpos); Ref(val); EnumVal *ev = new EnumVal(BifEnum::Input::EVENT_REMOVED, BifType::Enum::Input::Event); - SendEvent(filter->event, 3, ev, idx, val); + SendEvent(filter->event, 3, ev, predidx, val); } filter->tab->Delete(ih->idxkey); @@ -991,8 +993,6 @@ void Manager::EndCurrentSend(const ReaderFrontend* reader, int id) { filter->currDict = new PDict(InputHash); // Send event that the current update is indeed finished. - - EventHandler* handler = event_registry->Lookup("Input::update_finished"); if ( handler == 0 ) { reporter->InternalError("Input::update_finished not found!"); @@ -1077,6 +1077,7 @@ int Manager::PutTable(const ReaderFrontend* reader, int id, const Value* const * Val* idxval = ValueToIndexVal(filter->num_idx_fields, filter->itype, vals); Val* valval; + int position = filter->num_idx_fields; if ( filter->num_val_fields == 0 ) { @@ -1087,7 +1088,91 @@ int Manager::PutTable(const ReaderFrontend* reader, int id, const Value* const * valval = ValueToRecordVal(vals, filter->rtype, &position); } - filter->tab->Assign(idxval, valval); + // if we have a subscribed event, we need to figure out, if this is an update or not + // same for predicates + if ( filter->pred || filter->event ) { + bool updated = false; + Val* oldval = 0; + + if ( filter->num_val_fields > 0 ) { + // in that case, we need the old value to send the event (if we send an event). + oldval = filter->tab->Lookup(idxval, false); + } + + if ( oldval != 0 ) { + // it is an update + updated = true; + Ref(oldval); // have to do that, otherwise it may disappear in assign + } + + + // predicate if we want the update or not + if ( filter->pred ) { + EnumVal* ev; + int startpos = 0; + Val* predidx = ValueToRecordVal(vals, filter->itype, &startpos); + Ref(valval); + + if ( updated ) { + ev = new EnumVal(BifEnum::Input::EVENT_CHANGED, BifType::Enum::Input::Event); + } else { + ev = new EnumVal(BifEnum::Input::EVENT_NEW, BifType::Enum::Input::Event); + } + + val_list vl( 2 + (filter->num_val_fields > 0) ); // 2 if we don't have values, 3 otherwise. + vl.append(ev); + vl.append(predidx); + if ( filter->num_val_fields > 0 ) + vl.append(valval); + + Val* v = filter->pred->Call(&vl); + bool result = v->AsBool(); + Unref(v); + + if ( result == false ) { + // do nothing + Unref(idxval); + Unref(valval); + Unref(oldval); + return filter->num_val_fields + filter->num_idx_fields; + } + + } + + + filter->tab->Assign(idxval, valval); + + if ( filter->event ) { + EnumVal* ev; + int startpos = 0; + Val* predidx = ValueToRecordVal(vals, filter->itype, &startpos); + + if ( updated ) { // in case of update send back the old value. + assert ( filter->num_val_fields > 0 ); + ev = new EnumVal(BifEnum::Input::EVENT_CHANGED, BifType::Enum::Input::Event); + assert ( oldval != 0 ); + SendEvent(filter->event, 3, ev, predidx, oldval); + } else { + ev = new EnumVal(BifEnum::Input::EVENT_NEW, BifType::Enum::Input::Event); + Ref(valval); + if ( filter->num_val_fields == 0 ) { + SendEvent(filter->event, 3, ev, predidx); + } else { + SendEvent(filter->event, 3, ev, predidx, valval); + } + } + + } + + + + + + } else { + // no predicates or other stuff + + filter->tab->Assign(idxval, valval); + } return filter->num_idx_fields + filter->num_val_fields; } @@ -1122,8 +1207,52 @@ bool Manager::Delete(const ReaderFrontend* reader, int id, Value* *vals) { if ( i->filters[id]->filter_type == TABLE_FILTER ) { TableFilter* filter = (TableFilter*) i->filters[id]; Val* idxval = ValueToIndexVal(filter->num_idx_fields, filter->itype, vals); + assert(idxval != 0); readVals = filter->num_idx_fields + filter->num_val_fields; - success = ( filter->tab->Delete(idxval) != 0 ); + bool filterresult = true; + + if ( filter->pred || filter->event ) { + Val *val = filter->tab->Lookup(idxval); + + if ( filter->pred ) { + Ref(val); + EnumVal *ev = new EnumVal(BifEnum::Input::EVENT_REMOVED, BifType::Enum::Input::Event); + int startpos = 0; + Val* predidx = ValueToRecordVal(vals, filter->itype, &startpos); + + val_list vl(3); + vl.append(ev); + vl.append(predidx); + vl.append(val); + Val* v = filter->pred->Call(&vl); + filterresult = v->AsBool(); + Unref(v); + + if ( filterresult == false ) { + // keep it. + Unref(idxval); + success = true; + } + + } + + // only if filter = true -> no filtering + if ( filterresult && filter->event ) { + Ref(idxval); + assert(val != 0); + Ref(val); + EnumVal *ev = new EnumVal(BifEnum::Input::EVENT_REMOVED, BifType::Enum::Input::Event); + SendEvent(filter->event, 3, ev, idxval, val); + } + } + + // only if filter = true -> no filtering + if ( filterresult ) { + success = ( filter->tab->Delete(idxval) != 0 ); + if ( !success ) { + reporter->Error("Internal error while deleting values from input table"); + } + } } else if ( i->filters[id]->filter_type == EVENT_FILTER ) { EnumVal *type = new EnumVal(BifEnum::Input::EVENT_REMOVED, BifType::Enum::Input::Event); readVals = SendEventFilterEvent(reader, type, id, vals); diff --git a/src/input/readers/Ascii.cc b/src/input/readers/Ascii.cc index 5a3569a95f..b0b046b75b 100644 --- a/src/input/readers/Ascii.cc +++ b/src/input/readers/Ascii.cc @@ -91,16 +91,22 @@ bool Ascii::DoInit(string path, int arg_mode) mode = arg_mode; mtime = 0; + if ( ( mode != MANUAL ) && (mode != REREAD) && ( mode != STREAM ) ) { + Error(Fmt("Unsupported read mode %d for source %s", mode, path.c_str())); + return false; + } + file = new ifstream(path.c_str()); if ( !file->is_open() ) { Error(Fmt("Init: cannot open %s", fname.c_str())); return false; } - - if ( ( mode != MANUAL ) && (mode != REREAD) && ( mode != STREAM ) ) { - Error(Fmt("Unsupported read mode %d for source %s", mode, path.c_str())); + + if ( ReadHeader(false) == false ) { + Error(Fmt("Init: cannot open %s; headers are incorrect", fname.c_str())); + file->close(); return false; - } + } return true; } @@ -114,6 +120,8 @@ bool Ascii::DoStartReading() { started = true; switch ( mode ) { case MANUAL: + case REREAD: + case STREAM: DoUpdate(); break; default: @@ -157,16 +165,25 @@ bool Ascii::HasFilter(int id) { } -bool Ascii::ReadHeader() { +bool Ascii::ReadHeader(bool useCached) { // try to read the header line... string line; - if ( !GetLine(line) ) { - Error("could not read first line"); - return false; - } - map fields; + if ( !useCached ) { + if ( !GetLine(line) ) { + Error("could not read first line"); + return false; + } + + + + headerline = line; + + } else { + line = headerline; + } + // construct list of field names. istringstream splitstream(line); int pos=0; @@ -179,7 +196,7 @@ bool Ascii::ReadHeader() { pos++; } - + //printf("Updating fields from description %s\n", line.c_str()); for ( map::iterator it = filters.begin(); it != filters.end(); it++ ) { (*it).second.columnMap.clear(); @@ -433,6 +450,7 @@ bool Ascii::DoUpdate() { if ( file && file->is_open() ) { if ( mode == STREAM ) { file->clear(); // remove end of file evil bits + ReadHeader(true); // in case filters changed break; } file->close(); @@ -444,7 +462,7 @@ bool Ascii::DoUpdate() { } - if ( ReadHeader() == false ) { + if ( ReadHeader(false) == false ) { return false; } @@ -512,9 +530,14 @@ bool Ascii::DoUpdate() { fpos++; } + //printf("fpos: %d, second.num_fields: %d\n", fpos, (*it).second.num_fields); assert ( (unsigned int) fpos == (*it).second.num_fields ); - SendEntry((*it).first, fields); + if ( mode == STREAM ) { + Put((*it).first, fields); + } else { + SendEntry((*it).first, fields); + } /* Do not do this, ownership changes to other thread * for ( unsigned int i = 0; i < (*it).second.num_fields; i++ ) { @@ -530,9 +553,12 @@ bool Ascii::DoUpdate() { //file->clear(); // remove end of file evil bits //file->seekg(0, ios::beg); // and seek to start. - for ( map::iterator it = filters.begin(); it != filters.end(); it++ ) { - EndCurrentSend((*it).first); + if ( mode != STREAM ) { + for ( map::iterator it = filters.begin(); it != filters.end(); it++ ) { + EndCurrentSend((*it).first); + } } + return true; } diff --git a/src/input/readers/Ascii.h b/src/input/readers/Ascii.h index 017e5630d4..d2376e4fe1 100644 --- a/src/input/readers/Ascii.h +++ b/src/input/readers/Ascii.h @@ -69,7 +69,7 @@ private: TransportProto StringToProto(const string &proto); - bool ReadHeader(); + bool ReadHeader(bool useCached); threading::Value* EntryToVal(string s, FieldMapping type); bool GetLine(string& str); @@ -87,6 +87,9 @@ private: string empty_field; string unset_field; + + // keep a copy of the headerline to determine field locations when filters change + string headerline; int mode;