diff --git a/src/Event.cc b/src/Event.cc index d461d4e670..8e8db10ed9 100644 --- a/src/Event.cc +++ b/src/Event.cc @@ -162,10 +162,9 @@ void EventMgr::Drain() // do after draining events. draining = false; - // We evaluate Triggers here. While this is somewhat unrelated to event - // processing, we ensure that it's done at a regular basis by checking - // them here. - Trigger::EvaluatePending(); + // Make sure all of the triggers get processed every time the events + // drain. + trigger_mgr->Process(); } void EventMgr::Describe(ODesc* d) const diff --git a/src/Expr.cc b/src/Expr.cc index f5db8ff0fb..1049f12929 100644 --- a/src/Expr.cc +++ b/src/Expr.cc @@ -4258,7 +4258,7 @@ Val* CallExpr::Eval(Frame* f) const // Check for that. if ( f ) { - Trigger* trigger = f->GetTrigger(); + trigger::Trigger* trigger = f->GetTrigger(); if ( trigger ) { diff --git a/src/Frame.cc b/src/Frame.cc index 249d988b7b..369e28db47 100644 --- a/src/Frame.cc +++ b/src/Frame.cc @@ -515,7 +515,7 @@ void Frame::CaptureClosure(Frame* c, id_list arg_outer_ids) // if (c) closure = c->SelectiveClone(outer_ids); } -void Frame::SetTrigger(Trigger* arg_trigger) +void Frame::SetTrigger(trigger::Trigger* arg_trigger) { ClearTrigger(); diff --git a/src/Frame.h b/src/Frame.h index 133ab45978..21f411ba1f 100644 --- a/src/Frame.h +++ b/src/Frame.h @@ -11,7 +11,7 @@ #include "Val.h" -class Trigger; +namespace trigger { class Trigger; } class CallExpr; class Frame : public BroObj { @@ -207,9 +207,9 @@ public: // If the frame is run in the context of a trigger condition evaluation, // the trigger needs to be registered. - void SetTrigger(Trigger* arg_trigger); + void SetTrigger(trigger::Trigger* arg_trigger); void ClearTrigger(); - Trigger* GetTrigger() const { return trigger; } + trigger::Trigger* GetTrigger() const { return trigger; } void SetCall(const CallExpr* arg_call) { call = arg_call; } void ClearCall() { call = 0; } @@ -293,7 +293,7 @@ private: bool break_before_next_stmt; bool break_on_return; - Trigger* trigger; + trigger::Trigger* trigger; const CallExpr* call; bool delayed; diff --git a/src/Stats.cc b/src/Stats.cc index d352d19793..fed8a5259a 100644 --- a/src/Stats.cc +++ b/src/Stats.cc @@ -188,8 +188,8 @@ void ProfileLogger::Log() dstats.requests, dstats.successful, dstats.failed, dstats.pending, dstats.cached_hosts, dstats.cached_addresses)); - Trigger::Stats tstats; - Trigger::GetStats(&tstats); + trigger::Manager::Stats tstats; + trigger_mgr->GetStats(&tstats); file->Write(fmt("%.06f Triggers: total=%lu pending=%lu\n", network_time, tstats.total, tstats.pending)); diff --git a/src/Stmt.cc b/src/Stmt.cc index d777e61b97..945bd80faf 100644 --- a/src/Stmt.cc +++ b/src/Stmt.cc @@ -1790,7 +1790,7 @@ Val* WhenStmt::Exec(Frame* f, stmt_flow_type& flow) const ::Ref(timeout); // The new trigger object will take care of its own deletion. - new Trigger(cond, s1, s2, timeout, f, is_return, location); + new trigger::Trigger(cond, s1, s2, timeout, f, is_return, location); return 0; } @@ -1857,4 +1857,3 @@ TraversalCode WhenStmt::Traverse(TraversalCallback* cb) const tc = cb->PostStmt(this); HANDLE_TC_STMT_POST(tc); } - diff --git a/src/Trigger.cc b/src/Trigger.cc index 3df2e9e43e..271edf24aa 100644 --- a/src/Trigger.cc +++ b/src/Trigger.cc @@ -2,10 +2,15 @@ #include "Trigger.h" #include "Traverse.h" +#include "iosource/Manager.h" + +using namespace trigger; // Callback class to traverse an expression, registering all relevant IDs and // Vals for change notifications. +namespace trigger { + class TriggerTraversalCallback : public TraversalCallback { public: TriggerTraversalCallback(Trigger *arg_trigger) @@ -20,6 +25,8 @@ private: Trigger* trigger; }; +} + TraversalCode TriggerTraversalCallback::PreExpr(const Expr* expr) { // We catch all expressions here which in some way reference global @@ -67,6 +74,8 @@ TraversalCode TriggerTraversalCallback::PreExpr(const Expr* expr) return TC_CONTINUE; } +namespace trigger { + class TriggerTimer : public Timer { public: TriggerTimer(double arg_timeout, Trigger* arg_trigger) @@ -102,13 +111,12 @@ protected: double time; }; +} + Trigger::Trigger(Expr* arg_cond, Stmt* arg_body, Stmt* arg_timeout_stmts, Expr* arg_timeout, Frame* arg_frame, bool arg_is_return, const Location* arg_location) { - if ( ! pending ) - pending = new list; - cond = arg_cond; body = arg_body; timeout_stmts = arg_timeout_stmts; @@ -122,8 +130,6 @@ Trigger::Trigger(Expr* arg_cond, Stmt* arg_body, Stmt* arg_timeout_stmts, location = arg_location; timeout_value = -1; - ++total_triggers; - DBG_LOG(DBG_NOTIFIERS, "%s: instantiating", Name()); if ( is_return ) @@ -215,9 +221,6 @@ void Trigger::Init() cond->Traverse(&cb); } -Trigger::TriggerList* Trigger::pending = 0; -unsigned long Trigger::total_triggers = 0; - bool Trigger::Eval() { if ( disabled ) @@ -330,47 +333,6 @@ bool Trigger::Eval() return true; } -void Trigger::QueueTrigger(Trigger* trigger) - { - assert(! trigger->disabled); - assert(pending); - if ( std::find(pending->begin(), pending->end(), trigger) == pending->end() ) - { - Ref(trigger); - pending->push_back(trigger); - } - } - -void Trigger::EvaluatePending() - { - DBG_LOG(DBG_NOTIFIERS, "evaluating all pending triggers"); - - if ( ! pending ) - return; - - // While we iterate over the list, executing statements, we may - // in fact trigger new triggers and thereby modify the list. - // Therefore, we create a new temporary list which will receive - // triggers triggered during this time. - TriggerList* orig = pending; - TriggerList tmp; - pending = &tmp; - - for ( TriggerList::iterator i = orig->begin(); i != orig->end(); ++i ) - { - Trigger* t = *i; - (*i)->Eval(); - Unref(t); - } - - pending = orig; - orig->clear(); - - // Sigh... Is this really better than a for-loop? - std::copy(tmp.begin(), tmp.end(), - insert_iterator(*pending, pending->begin())); - } - void Trigger::Timeout() { if ( disabled ) @@ -484,7 +446,7 @@ void Trigger::Cache(const CallExpr* expr, Val* v) Ref(v); - QueueTrigger(this); + trigger_mgr->Queue(this); } @@ -502,6 +464,11 @@ void Trigger::Disable() disabled = true; } +void Trigger::Modified(notifier::Modifiable* m) + { + trigger_mgr->Queue(this); + } + const char* Trigger::Name() const { assert(location); @@ -509,8 +476,62 @@ const char* Trigger::Name() const location->first_line, location->last_line); } -void Trigger::GetStats(Stats* stats) + + +Manager::Manager() : IOSource() + { + pending = new TriggerList(); + iosource_mgr->Register(this, true); + } + +Manager::~Manager() + { + delete pending; + } + +double Manager::GetNextTimeout() + { + return pending->empty() ? -1 : network_time + 0.100; + } + +void Manager::Process() + { + DBG_LOG(DBG_NOTIFIERS, "evaluating all pending triggers"); + + // While we iterate over the list, executing statements, we may + // in fact trigger new triggers and thereby modify the list. + // Therefore, we create a new temporary list which will receive + // triggers triggered during this time. + TriggerList* orig = pending; + TriggerList tmp; + pending = &tmp; + + for ( TriggerList::iterator i = orig->begin(); i != orig->end(); ++i ) + { + Trigger* t = *i; + (*i)->Eval(); + Unref(t); + } + + pending = orig; + orig->clear(); + + std::swap(tmp, *pending); + } + +void Manager::Queue(Trigger* trigger) + { + if ( std::find(pending->begin(), pending->end(), trigger) == pending->end() ) + { + Ref(trigger); + pending->push_back(trigger); + total_triggers++; + iosource_mgr->Wakeup(Tag()); + } + } + +void Manager::GetStats(Stats* stats) { stats->total = total_triggers; - stats->pending = pending ? pending->size() : 0; + stats->pending = pending->size(); } diff --git a/src/Trigger.h b/src/Trigger.h index e9fcc087a2..c3efd62138 100644 --- a/src/Trigger.h +++ b/src/Trigger.h @@ -5,7 +5,9 @@ #include "Notifier.h" #include "Traverse.h" +#include "iosource/IOSource.h" +namespace trigger { // Triggers are the heart of "when" statements: expressions that when // they become true execute a body of statements. @@ -58,10 +60,11 @@ public: void Describe(ODesc* d) const override { d->Add(""); } + // Overidden from Notifier. We queue the trigger and evaluate it // later to avoid race conditions. - void Modified(notifier::Modifiable* m) override - { QueueTrigger(this); } + void Modified(notifier::Modifiable* m) override; + // Overridden from notifer::Receiver. If we're still waiting // on an ID/Val to be modified at termination time, we can't hope // for any further progress to be made, so just Unref ourselves. @@ -69,18 +72,6 @@ public: const char* Name() const; - static void QueueTrigger(Trigger* trigger); - - // Evaluates all queued Triggers. - static void EvaluatePending(); - - struct Stats { - unsigned long total; - unsigned long pending; - }; - - static void GetStats(Stats* stats); - private: friend class TriggerTraversalCallback; friend class TriggerTimer; @@ -107,11 +98,36 @@ private: std::vector> objs; - typedef map ValCache; + using ValCache = map; ValCache cache; - - typedef list TriggerList; - static TriggerList* pending; - - static unsigned long total_triggers; }; + +class Manager : public iosource::IOSource { +public: + + Manager(); + ~Manager(); + + double GetNextTimeout() override; + void Process() override; + const char* Tag() override { return "TriggerMgr"; } + + void Queue(Trigger* trigger); + + struct Stats { + unsigned long total; + unsigned long pending; + }; + + void GetStats(Stats* stats); + +private: + + using TriggerList = std::list; + TriggerList* pending; + unsigned long total_triggers = 0; + }; + +} + +extern trigger::Manager* trigger_mgr; diff --git a/src/broker/Store.h b/src/broker/Store.h index e1a20775f0..0ba3d2ad5d 100644 --- a/src/broker/Store.h +++ b/src/broker/Store.h @@ -66,7 +66,7 @@ inline RecordVal* query_result(RecordVal* data) */ class StoreQueryCallback { public: - StoreQueryCallback(Trigger* arg_trigger, const CallExpr* arg_call, + StoreQueryCallback(trigger::Trigger* arg_trigger, const CallExpr* arg_call, broker::store store) : trigger(arg_trigger), call(arg_call), store(move(store)) { @@ -101,7 +101,7 @@ public: private: - Trigger* trigger; + trigger::Trigger* trigger; const CallExpr* call; broker::store store; }; diff --git a/src/main.cc b/src/main.cc index 6e9ad81d05..00ad77a2e6 100644 --- a/src/main.cc +++ b/src/main.cc @@ -41,6 +41,7 @@ extern "C" { #include "Stats.h" #include "Brofiler.h" #include "Traverse.h" +#include "Trigger.h" #include "supervisor/Supervisor.h" #include "threading/Manager.h" @@ -93,6 +94,7 @@ zeekygen::Manager* zeekygen_mgr = 0; iosource::Manager* iosource_mgr = 0; bro_broker::Manager* broker_mgr = 0; zeek::Supervisor* zeek::supervisor_mgr = 0; +trigger::Manager* trigger_mgr = 0; std::vector zeek_script_prefixes; Stmt* stmts; @@ -594,6 +596,7 @@ int main(int argc, char** argv) input_mgr = new input::Manager(); file_mgr = new file_analysis::Manager(); broker_mgr = new bro_broker::Manager(! options.pcap_file.empty()); + trigger_mgr = new trigger::Manager(); plugin_mgr->InitPreScript(); analyzer_mgr->InitPreScript(); diff --git a/src/zeek.bif b/src/zeek.bif index 89f181601f..a42491daee 100644 --- a/src/zeek.bif +++ b/src/zeek.bif @@ -3503,7 +3503,7 @@ function dump_packet%(pkt: pcap_packet, file_name: string%) : bool class LookupHostCallback : public DNS_Mgr::LookupCallback { public: - LookupHostCallback(Trigger* arg_trigger, const CallExpr* arg_call, + LookupHostCallback(trigger::Trigger* arg_trigger, const CallExpr* arg_call, bool arg_lookup_name) { Ref(arg_trigger); @@ -3556,7 +3556,7 @@ public: } private: - Trigger* trigger; + trigger::Trigger* trigger; const CallExpr* call; bool lookup_name; }; @@ -3575,7 +3575,7 @@ function lookup_addr%(host: addr%) : string %{ // FIXME: It should be easy to adapt the function to synchronous // lookups if we're reading a trace. - Trigger* trigger = frame->GetTrigger(); + trigger::Trigger* trigger = frame->GetTrigger(); if ( ! trigger) { @@ -3604,7 +3604,7 @@ function lookup_hostname_txt%(host: string%) : string %{ // FIXME: Is should be easy to adapt the function to synchronous // lookups if we're reading a trace. - Trigger* trigger = frame->GetTrigger(); + trigger::Trigger* trigger = frame->GetTrigger(); if ( ! trigger) { @@ -3633,7 +3633,7 @@ function lookup_hostname%(host: string%) : addr_set %{ // FIXME: Is should be easy to adapt the function to synchronous // lookups if we're reading a trace. - Trigger* trigger = frame->GetTrigger(); + trigger::Trigger* trigger = frame->GetTrigger(); if ( ! trigger) {