diff --git a/scripts/base/frameworks/input/main.bro b/scripts/base/frameworks/input/main.bro index c76eba80b9..cac1aca54a 100644 --- a/scripts/base/frameworks/input/main.bro +++ b/scripts/base/frameworks/input/main.bro @@ -114,6 +114,8 @@ export { ## filter: the `TableFilter` record describing the filter. global read_table: function(description: Input::StreamDescription, filter: Input::TableFilter) : bool; + global update_finished: event(id: Input::ID); + } @load base/input.bif diff --git a/src/input/Manager.cc b/src/input/Manager.cc index 189a034b0f..79d42fe71f 100644 --- a/src/input/Manager.cc +++ b/src/input/Manager.cc @@ -215,7 +215,7 @@ ReaderFrontend* Manager::CreateStream(EnumVal* id, RecordVal* description) EnumVal* reader = description->LookupWithDefault(rtype->FieldOffset("reader"))->AsEnumVal(); - ReaderFrontend* reader_obj = new ReaderFrontend(id->AsEnum()); + ReaderFrontend* reader_obj = new ReaderFrontend(reader->InternalInt()); assert(reader_obj); // get the source... @@ -680,7 +680,7 @@ Val* Manager::ValueToIndexVal(int num_fields, const RecordType *type, const Valu } -void Manager::SendEntry(const ReaderFrontend* reader, const int id, const Value* const *vals) { +void Manager::SendEntry(const ReaderFrontend* reader, const int id, Value* *vals) { ReaderInfo *i = FindReader(reader); if ( i == 0 ) { reporter->InternalError("Unknown reader"); @@ -692,18 +692,25 @@ void Manager::SendEntry(const ReaderFrontend* reader, const int id, const Value* return; } + int readFields; if ( i->filters[id]->filter_type == TABLE_FILTER ) { - SendEntryTable(reader, id, vals); + readFields = SendEntryTable(reader, id, vals); } else if ( i->filters[id]->filter_type == EVENT_FILTER ) { EnumVal *type = new EnumVal(BifEnum::Input::EVENT_NEW, BifType::Enum::Input::Event); - SendEventFilterEvent(reader, type, id, vals); + readFields = SendEventFilterEvent(reader, type, id, vals); } else { assert(false); } + for ( int i = 0; i < readFields; i++ ) { + delete vals[i]; + } + delete [] vals; + + } -void Manager::SendEntryTable(const ReaderFrontend* reader, const int id, const Value* const *vals) { +int Manager::SendEntryTable(const ReaderFrontend* reader, const int id, const Value* const *vals) { ReaderInfo *i = FindReader(reader); bool updated = false; @@ -733,7 +740,7 @@ void Manager::SendEntryTable(const ReaderFrontend* reader, const int id, const V // ok, exact duplicate filter->lastDict->Remove(idxhash); filter->currDict->Insert(idxhash, h); - return; + return filter->num_val_fields + filter->num_idx_fields; } else { assert( filter->num_val_fields > 0 ); // updated @@ -794,11 +801,11 @@ void Manager::SendEntryTable(const ReaderFrontend* reader, const int id, const V if ( !updated ) { // throw away. Hence - we quit. And remove the entry from the current dictionary... delete(filter->currDict->RemoveEntry(idxhash)); - return; + return filter->num_val_fields + filter->num_idx_fields; } else { // keep old one filter->currDict->Insert(idxhash, h); - return; + return filter->num_val_fields + filter->num_idx_fields; } } @@ -809,7 +816,7 @@ void Manager::SendEntryTable(const ReaderFrontend* reader, const int id, const V HashKey* k = filter->tab->ComputeHash(idxval); if ( !k ) { reporter->InternalError("could not hash"); - return; + return filter->num_val_fields + filter->num_idx_fields; } filter->tab->Assign(idxval, k, valval); @@ -842,6 +849,9 @@ void Manager::SendEntryTable(const ReaderFrontend* reader, const int id, const V } } } + + + return filter->num_val_fields + filter->num_idx_fields; } @@ -926,9 +936,21 @@ void Manager::EndCurrentSend(const ReaderFrontend* reader, int id) { filter->lastDict = filter->currDict; filter->currDict = new PDict(InputHash); + + // Send event that the current update is indeed finished. + + + EventHandler* handler = event_registry->Lookup("Input::update_finished"); + if ( handler == 0 ) { + reporter->InternalError("Input::update_finished not found!"); + } + + + Ref(i->id); + SendEvent(handler, 1, i->id); } -void Manager::Put(const ReaderFrontend* reader, int id, const Value* const *vals) { +void Manager::Put(const ReaderFrontend* reader, int id, Value* *vals) { ReaderInfo *i = FindReader(reader); if ( i == 0 ) { reporter->InternalError("Unknown reader"); @@ -951,7 +973,7 @@ void Manager::Put(const ReaderFrontend* reader, int id, const Value* const *vals } -void Manager::SendEventFilterEvent(const ReaderFrontend* reader, EnumVal* type, int id, const Value* const *vals) { +int Manager::SendEventFilterEvent(const ReaderFrontend* reader, EnumVal* type, int id, const Value* const *vals) { ReaderInfo *i = FindReader(reader); bool updated = false; @@ -985,11 +1007,13 @@ void Manager::SendEventFilterEvent(const ReaderFrontend* reader, EnumVal* type, } } - SendEvent(filter->event, out_vals); + SendEvent(filter->event, out_vals); + + return filter->fields->NumFields(); } -void Manager::PutTable(const ReaderFrontend* reader, int id, const Value* const *vals) { +int Manager::PutTable(const ReaderFrontend* reader, int id, const Value* const *vals) { ReaderInfo *i = FindReader(reader); assert(i); @@ -1011,6 +1035,8 @@ void Manager::PutTable(const ReaderFrontend* reader, int id, const Value* const } filter->tab->Assign(idxval, valval); + + return filter->num_idx_fields + filter->num_val_fields; } void Manager::Clear(const ReaderFrontend* reader, int id) { @@ -1028,7 +1054,7 @@ void Manager::Clear(const ReaderFrontend* reader, int id) { filter->tab->RemoveAll(); } -bool Manager::Delete(const ReaderFrontend* reader, int id, const Value* const *vals) { +bool Manager::Delete(const ReaderFrontend* reader, int id, Value* *vals) { ReaderInfo *i = FindReader(reader); if ( i == 0 ) { reporter->InternalError("Unknown reader"); @@ -1037,18 +1063,29 @@ bool Manager::Delete(const ReaderFrontend* reader, int id, const Value* const *v assert(i->HasFilter(id)); + bool success = false; + int readVals = 0; + if ( i->filters[id]->filter_type == TABLE_FILTER ) { TableFilter* filter = (TableFilter*) i->filters[id]; Val* idxval = ValueToIndexVal(filter->num_idx_fields, filter->itype, vals); - return( filter->tab->Delete(idxval) != 0 ); + readVals = filter->num_idx_fields + filter->num_val_fields; + success = ( filter->tab->Delete(idxval) != 0 ); } else if ( i->filters[id]->filter_type == EVENT_FILTER ) { EnumVal *type = new EnumVal(BifEnum::Input::EVENT_REMOVED, BifType::Enum::Input::Event); - SendEventFilterEvent(reader, type, id, vals); - return true; + readVals = SendEventFilterEvent(reader, type, id, vals); + success = true; } else { assert(false); return false; } + + for ( int i = 0; i < readVals; i++ ) { + delete vals[i]; + } + delete [] vals; + + return success; } void Manager::Error(ReaderFrontend* reader, const char* msg) @@ -1056,7 +1093,7 @@ void Manager::Error(ReaderFrontend* reader, const char* msg) reporter->Error("error with input reader for %s: %s", reader->Source().c_str(), msg); } -bool Manager::SendEvent(const string& name, const int num_vals, const Value* const *vals) +bool Manager::SendEvent(const string& name, const int num_vals, Value* *vals) { EventHandler* handler = event_registry->Lookup(name.c_str()); if ( handler == 0 ) { @@ -1078,6 +1115,11 @@ bool Manager::SendEvent(const string& name, const int num_vals, const Value* con mgr.Dispatch(new Event(handler, vl)); + for ( int i = 0; i < num_vals; i++ ) { + delete vals[i]; + } + delete [] vals; + return true; } diff --git a/src/input/Manager.h b/src/input/Manager.h index a0b98294ca..45c07895f2 100644 --- a/src/input/Manager.h +++ b/src/input/Manager.h @@ -44,15 +44,15 @@ protected: void Error(ReaderFrontend* reader, const char* msg); // for readers to write to input stream in direct mode (reporting new/deleted values directly) - void Put(const ReaderFrontend* reader, int id, const threading::Value* const *vals); + void Put(const ReaderFrontend* reader, int id, threading::Value* *vals); void Clear(const ReaderFrontend* reader, int id); - bool Delete(const ReaderFrontend* reader, int id, const threading::Value* const *vals); + bool Delete(const ReaderFrontend* reader, int id, threading::Value* *vals); // for readers to write to input stream in indirect mode (manager is monitoring new/deleted values) - void SendEntry(const ReaderFrontend* reader, const int id, const threading::Value* const *vals); + void SendEntry(const ReaderFrontend* reader, const int id, threading::Value* *vals); void EndCurrentSend(const ReaderFrontend* reader, const int id); - bool SendEvent(const string& name, const int num_vals, const threading::Value* const *vals); + bool SendEvent(const string& name, const int num_vals, threading::Value* *vals); ReaderBackend* CreateBackend(ReaderFrontend* frontend, bro_int_t type); @@ -60,9 +60,9 @@ protected: private: struct ReaderInfo; - void SendEntryTable(const ReaderFrontend* reader, int id, const threading::Value* const *vals); - void PutTable(const ReaderFrontend* reader, int id, const threading::Value* const *vals); - void SendEventFilterEvent(const ReaderFrontend* reader, EnumVal* type, int id, const threading::Value* const *vals); + int SendEntryTable(const ReaderFrontend* reader, int id, const threading::Value* const *vals); + int PutTable(const ReaderFrontend* reader, int id, const threading::Value* const *vals); + int SendEventFilterEvent(const ReaderFrontend* reader, EnumVal* type, int id, const threading::Value* const *vals); bool IsCompatibleType(BroType* t, bool atomic_only=false); diff --git a/src/input/ReaderBackend.cc b/src/input/ReaderBackend.cc index 72c8f95d8e..f9992f5f0e 100644 --- a/src/input/ReaderBackend.cc +++ b/src/input/ReaderBackend.cc @@ -26,7 +26,7 @@ private: class PutMessage : public threading::OutputMessage { public: - PutMessage(ReaderFrontend* reader, int id, const Value* const *val) + PutMessage(ReaderFrontend* reader, int id, Value* *val) : threading::OutputMessage("Put", reader), id(id), val(val) {} @@ -37,12 +37,12 @@ public: private: int id; - const Value* const *val; + Value* *val; }; class DeleteMessage : public threading::OutputMessage { public: - DeleteMessage(ReaderFrontend* reader, int id, const Value* const *val) + DeleteMessage(ReaderFrontend* reader, int id, Value* *val) : threading::OutputMessage("Delete", reader), id(id), val(val) {} @@ -52,7 +52,7 @@ public: private: int id; - const Value* const *val; + Value* *val; }; class ClearMessage : public threading::OutputMessage { @@ -72,7 +72,7 @@ private: class SendEventMessage : public threading::OutputMessage { public: - SendEventMessage(ReaderFrontend* reader, const string& name, const int num_vals, const Value* const *val) + SendEventMessage(ReaderFrontend* reader, const string& name, const int num_vals, Value* *val) : threading::OutputMessage("SendEvent", reader), name(name), num_vals(num_vals), val(val) {} @@ -83,14 +83,14 @@ public: private: const string name; const int num_vals; - const Value* const *val; + Value* *val; }; class SendEntryMessage : public threading::OutputMessage { public: - SendEntryMessage(ReaderFrontend* reader, const int id, const Value* const *val) + SendEntryMessage(ReaderFrontend* reader, const int id, Value* *val) : threading::OutputMessage("SendEntry", reader), - id(id), val(val) {} + id(id), val(val) { } virtual bool Process() { input_mgr->SendEntry(Object(), id, val); @@ -99,13 +99,13 @@ public: private: const int id; - const Value* const *val; + Value* *val; }; class EndCurrentSendMessage : public threading::OutputMessage { public: EndCurrentSendMessage(ReaderFrontend* reader, int id) - : threading::OutputMessage("SendEntry", reader), + : threading::OutputMessage("EndCurrentSend", reader), id(id) {} virtual bool Process() { @@ -145,12 +145,12 @@ void ReaderBackend::Error(const char *msg) } */ -void ReaderBackend::Put(int id, const Value* const *val) +void ReaderBackend::Put(int id, Value* *val) { SendOut(new PutMessage(frontend, id, val)); } -void ReaderBackend::Delete(int id, const Value* const *val) +void ReaderBackend::Delete(int id, Value* *val) { SendOut(new DeleteMessage(frontend, id, val)); } @@ -160,7 +160,7 @@ void ReaderBackend::Clear(int id) SendOut(new ClearMessage(frontend, id)); } -void ReaderBackend::SendEvent(const string& name, const int num_vals, const Value* const *vals) +void ReaderBackend::SendEvent(const string& name, const int num_vals, Value* *vals) { SendOut(new SendEventMessage(frontend, name, num_vals, vals)); } @@ -170,7 +170,7 @@ void ReaderBackend::EndCurrentSend(int id) SendOut(new EndCurrentSendMessage(frontend, id)); } -void ReaderBackend::SendEntry(int id, const Value* const *vals) +void ReaderBackend::SendEntry(int id, Value* *vals) { SendOut(new SendEntryMessage(frontend, id, vals)); } diff --git a/src/input/ReaderBackend.h b/src/input/ReaderBackend.h index a37daaf4b6..c12d187545 100644 --- a/src/input/ReaderBackend.h +++ b/src/input/ReaderBackend.h @@ -50,15 +50,15 @@ protected: // A thread-safe version of fmt(). (stolen from logwriter) const char* Fmt(const char* format, ...); - void SendEvent(const string& name, const int num_vals, const threading::Value* const *vals); + void SendEvent(const string& name, const int num_vals, threading::Value* *vals); // Content-sendinf-functions (simple mode). Including table-specific stuff that simply is not used if we have no table - void Put(int id, const threading::Value* const *val); - void Delete(int id, const threading::Value* const *val); + void Put(int id, threading::Value* *val); + void Delete(int id, threading::Value* *val); void Clear(int id); // Table-functions (tracking mode): Only changed lines are propagated. - void SendEntry(int id, const threading::Value* const *vals); + void SendEntry(int id, threading::Value* *vals); void EndCurrentSend(int id); diff --git a/src/input/ReaderFrontend.cc b/src/input/ReaderFrontend.cc index a7f9a4d2f6..0dac33d5e8 100644 --- a/src/input/ReaderFrontend.cc +++ b/src/input/ReaderFrontend.cc @@ -56,6 +56,7 @@ private: const threading::Field* const* fields; }; + ReaderFrontend::ReaderFrontend(bro_int_t type) { disabled = initialized = false; ty_name = ""; diff --git a/src/input/readers/Ascii.cc b/src/input/readers/Ascii.cc index e798f69a36..095d74bf11 100644 --- a/src/input/readers/Ascii.cc +++ b/src/input/readers/Ascii.cc @@ -445,10 +445,12 @@ bool Ascii::DoUpdate() { SendEntry((*it).first, fields); - for ( unsigned int i = 0; i < (*it).second.num_fields; i++ ) { + /* Do not do this, ownership changes to other thread + * for ( unsigned int i = 0; i < (*it).second.num_fields; i++ ) { delete fields[i]; } delete [] fields; + */ } } diff --git a/src/main.cc b/src/main.cc index 9df06aa0a0..8205b6de0b 100644 --- a/src/main.cc +++ b/src/main.cc @@ -295,6 +295,7 @@ void terminate_bro() log_mgr->Terminate(); thread_mgr->Terminate(); + mgr.Drain(); delete timer_mgr; delete dns_mgr; diff --git a/src/threading/Manager.cc b/src/threading/Manager.cc index d008d2e5e8..7b571e753c 100644 --- a/src/threading/Manager.cc +++ b/src/threading/Manager.cc @@ -111,7 +111,7 @@ void Manager::Process() else { - string s = msg->Name() + " failed, terminating thread"; + string s = msg->Name() + " failed, terminating thread " + t->Name() + " (in ThreadManager)"; reporter->Error("%s", s.c_str()); t->Stop(); } diff --git a/src/threading/MsgThread.cc b/src/threading/MsgThread.cc index b7782b9a05..5f77a1c9f8 100644 --- a/src/threading/MsgThread.cc +++ b/src/threading/MsgThread.cc @@ -267,7 +267,7 @@ void MsgThread::Run() if ( ! result ) { - string s = msg->Name() + " failed, terminating thread"; + string s = msg->Name() + " failed, terminating thread (MsgThread)"; Error(s.c_str()); Stop(); break; diff --git a/testing/btest/scripts/base/frameworks/input/basic.bro b/testing/btest/scripts/base/frameworks/input/basic.bro index d1b6659eb6..3b75220625 100644 --- a/testing/btest/scripts/base/frameworks/input/basic.bro +++ b/testing/btest/scripts/base/frameworks/input/basic.bro @@ -48,7 +48,10 @@ event bro_init() Input::create_stream(A::INPUT, [$source="input.log"]); Input::add_tablefilter(A::INPUT, [$name="ssh", $idx=Idx, $val=Val, $destination=servers]); Input::force_update(A::INPUT); - print servers; Input::remove_tablefilter(A::INPUT, "ssh"); Input::remove_stream(A::INPUT); } + +event Input::update_finished(id: Input::ID) { + print servers; +} diff --git a/testing/btest/scripts/base/frameworks/input/onecolumn-norecord.bro b/testing/btest/scripts/base/frameworks/input/onecolumn-norecord.bro index 88838cc8d6..712a877960 100644 --- a/testing/btest/scripts/base/frameworks/input/onecolumn-norecord.bro +++ b/testing/btest/scripts/base/frameworks/input/onecolumn-norecord.bro @@ -34,5 +34,9 @@ event bro_init() Input::create_stream(A::INPUT, [$source="input.log"]); Input::add_tablefilter(A::INPUT, [$name="input", $idx=Idx, $val=Val, $destination=servers, $want_record=F]); Input::force_update(A::INPUT); +} + +event Input::update_finished(id: Input::ID) { print servers; } + diff --git a/testing/btest/scripts/base/frameworks/input/onecolumn-record.bro b/testing/btest/scripts/base/frameworks/input/onecolumn-record.bro index fc4d862cd3..7b62ddcddd 100644 --- a/testing/btest/scripts/base/frameworks/input/onecolumn-record.bro +++ b/testing/btest/scripts/base/frameworks/input/onecolumn-record.bro @@ -34,5 +34,9 @@ event bro_init() Input::create_stream(A::INPUT, [$source="input.log"]); Input::add_tablefilter(A::INPUT, [$name="input", $idx=Idx, $val=Val, $destination=servers]); Input::force_update(A::INPUT); +} + +event Input::update_finished(id: Input::ID) { print servers; } + diff --git a/testing/btest/scripts/base/frameworks/input/port.bro b/testing/btest/scripts/base/frameworks/input/port.bro index c14892ae36..65d73c54f7 100644 --- a/testing/btest/scripts/base/frameworks/input/port.bro +++ b/testing/btest/scripts/base/frameworks/input/port.bro @@ -39,3 +39,10 @@ event bro_init() Input::remove_tablefilter(A::INPUT, "input"); Input::remove_stream(A::INPUT); } + +event Input::update_finished(id: Input::ID) { + print servers[1.2.3.4]; + print servers[1.2.3.5]; + print servers[1.2.3.6]; +} + diff --git a/testing/btest/scripts/base/frameworks/input/predicate.bro b/testing/btest/scripts/base/frameworks/input/predicate.bro index 5e6bae7b62..bc1ab89bb2 100644 --- a/testing/btest/scripts/base/frameworks/input/predicate.bro +++ b/testing/btest/scripts/base/frameworks/input/predicate.bro @@ -42,6 +42,9 @@ event bro_init() $pred(typ: Input::Event, left: Idx, right: bool) = { return right; } ]); Input::force_update(A::INPUT); +} + +event Input::update_finished(id: Input::ID) { if ( 1 in servers ) { print "VALID"; } diff --git a/testing/btest/scripts/base/frameworks/input/twofilters.bro b/testing/btest/scripts/base/frameworks/input/twofilters.bro index 5af664e0e9..d5bff0c5bb 100644 --- a/testing/btest/scripts/base/frameworks/input/twofilters.bro +++ b/testing/btest/scripts/base/frameworks/input/twofilters.bro @@ -35,6 +35,8 @@ type Val: record { global destination1: table[int] of Val = table(); global destination2: table[int] of Val = table(); +global done: bool = F; + event bro_init() { # first read in the old stuff into the table... @@ -45,6 +47,15 @@ event bro_init() Input::add_tablefilter(A::INPUT, [$name="input2",$idx=Idx, $val=Val, $destination=destination2]); Input::force_update(A::INPUT); +} + +event Input::update_finished(id: Input::ID) { + if ( done == T ) { + return; + } + + done = T; + if ( 1 in destination1 ) { print "VALID"; } @@ -90,6 +101,4 @@ event bro_init() if ( 7 in destination2 ) { print "VALID"; } - - }