From a159d075cfab3b9e84e61c53c8fca176ba14c0f2 Mon Sep 17 00:00:00 2001 From: Tim Wojtulewicz Date: Tue, 26 Nov 2019 12:47:33 -0700 Subject: [PATCH] Add Trigger manager for managing triggers created by things like 'when' statements - Adds new trigger namespace - Adds trigger::Manager class as a new IOSource for keeping track of triggers and integrating them into the loop. Previously the loop relied on the event manager Drain() method to process all triggers on every loop, but now that the loop actively waits for events to occur, triggers would not fire when they needed to. Adding them as part of the loop ensures they're checked. --- src/Event.cc | 7 ++- src/Expr.cc | 2 +- src/Frame.cc | 2 +- src/Frame.h | 8 +-- src/Stats.cc | 4 +- src/Stmt.cc | 3 +- src/Trigger.cc | 125 ++++++++++++++++++++++++++------------------- src/Trigger.h | 56 ++++++++++++-------- src/broker/Store.h | 4 +- src/main.cc | 3 ++ src/zeek.bif | 10 ++-- 11 files changed, 131 insertions(+), 93 deletions(-) diff --git a/src/Event.cc b/src/Event.cc index d461d4e670..8e8db10ed9 100644 --- a/src/Event.cc +++ b/src/Event.cc @@ -162,10 +162,9 @@ void EventMgr::Drain() // do after draining events. draining = false; - // We evaluate Triggers here. While this is somewhat unrelated to event - // processing, we ensure that it's done at a regular basis by checking - // them here. - Trigger::EvaluatePending(); + // Make sure all of the triggers get processed every time the events + // drain. + trigger_mgr->Process(); } void EventMgr::Describe(ODesc* d) const diff --git a/src/Expr.cc b/src/Expr.cc index f5db8ff0fb..1049f12929 100644 --- a/src/Expr.cc +++ b/src/Expr.cc @@ -4258,7 +4258,7 @@ Val* CallExpr::Eval(Frame* f) const // Check for that. if ( f ) { - Trigger* trigger = f->GetTrigger(); + trigger::Trigger* trigger = f->GetTrigger(); if ( trigger ) { diff --git a/src/Frame.cc b/src/Frame.cc index 249d988b7b..369e28db47 100644 --- a/src/Frame.cc +++ b/src/Frame.cc @@ -515,7 +515,7 @@ void Frame::CaptureClosure(Frame* c, id_list arg_outer_ids) // if (c) closure = c->SelectiveClone(outer_ids); } -void Frame::SetTrigger(Trigger* arg_trigger) +void Frame::SetTrigger(trigger::Trigger* arg_trigger) { ClearTrigger(); diff --git a/src/Frame.h b/src/Frame.h index 133ab45978..21f411ba1f 100644 --- a/src/Frame.h +++ b/src/Frame.h @@ -11,7 +11,7 @@ #include "Val.h" -class Trigger; +namespace trigger { class Trigger; } class CallExpr; class Frame : public BroObj { @@ -207,9 +207,9 @@ public: // If the frame is run in the context of a trigger condition evaluation, // the trigger needs to be registered. - void SetTrigger(Trigger* arg_trigger); + void SetTrigger(trigger::Trigger* arg_trigger); void ClearTrigger(); - Trigger* GetTrigger() const { return trigger; } + trigger::Trigger* GetTrigger() const { return trigger; } void SetCall(const CallExpr* arg_call) { call = arg_call; } void ClearCall() { call = 0; } @@ -293,7 +293,7 @@ private: bool break_before_next_stmt; bool break_on_return; - Trigger* trigger; + trigger::Trigger* trigger; const CallExpr* call; bool delayed; diff --git a/src/Stats.cc b/src/Stats.cc index d352d19793..fed8a5259a 100644 --- a/src/Stats.cc +++ b/src/Stats.cc @@ -188,8 +188,8 @@ void ProfileLogger::Log() dstats.requests, dstats.successful, dstats.failed, dstats.pending, dstats.cached_hosts, dstats.cached_addresses)); - Trigger::Stats tstats; - Trigger::GetStats(&tstats); + trigger::Manager::Stats tstats; + trigger_mgr->GetStats(&tstats); file->Write(fmt("%.06f Triggers: total=%lu pending=%lu\n", network_time, tstats.total, tstats.pending)); diff --git a/src/Stmt.cc b/src/Stmt.cc index d777e61b97..945bd80faf 100644 --- a/src/Stmt.cc +++ b/src/Stmt.cc @@ -1790,7 +1790,7 @@ Val* WhenStmt::Exec(Frame* f, stmt_flow_type& flow) const ::Ref(timeout); // The new trigger object will take care of its own deletion. - new Trigger(cond, s1, s2, timeout, f, is_return, location); + new trigger::Trigger(cond, s1, s2, timeout, f, is_return, location); return 0; } @@ -1857,4 +1857,3 @@ TraversalCode WhenStmt::Traverse(TraversalCallback* cb) const tc = cb->PostStmt(this); HANDLE_TC_STMT_POST(tc); } - diff --git a/src/Trigger.cc b/src/Trigger.cc index 3df2e9e43e..271edf24aa 100644 --- a/src/Trigger.cc +++ b/src/Trigger.cc @@ -2,10 +2,15 @@ #include "Trigger.h" #include "Traverse.h" +#include "iosource/Manager.h" + +using namespace trigger; // Callback class to traverse an expression, registering all relevant IDs and // Vals for change notifications. +namespace trigger { + class TriggerTraversalCallback : public TraversalCallback { public: TriggerTraversalCallback(Trigger *arg_trigger) @@ -20,6 +25,8 @@ private: Trigger* trigger; }; +} + TraversalCode TriggerTraversalCallback::PreExpr(const Expr* expr) { // We catch all expressions here which in some way reference global @@ -67,6 +74,8 @@ TraversalCode TriggerTraversalCallback::PreExpr(const Expr* expr) return TC_CONTINUE; } +namespace trigger { + class TriggerTimer : public Timer { public: TriggerTimer(double arg_timeout, Trigger* arg_trigger) @@ -102,13 +111,12 @@ protected: double time; }; +} + Trigger::Trigger(Expr* arg_cond, Stmt* arg_body, Stmt* arg_timeout_stmts, Expr* arg_timeout, Frame* arg_frame, bool arg_is_return, const Location* arg_location) { - if ( ! pending ) - pending = new list; - cond = arg_cond; body = arg_body; timeout_stmts = arg_timeout_stmts; @@ -122,8 +130,6 @@ Trigger::Trigger(Expr* arg_cond, Stmt* arg_body, Stmt* arg_timeout_stmts, location = arg_location; timeout_value = -1; - ++total_triggers; - DBG_LOG(DBG_NOTIFIERS, "%s: instantiating", Name()); if ( is_return ) @@ -215,9 +221,6 @@ void Trigger::Init() cond->Traverse(&cb); } -Trigger::TriggerList* Trigger::pending = 0; -unsigned long Trigger::total_triggers = 0; - bool Trigger::Eval() { if ( disabled ) @@ -330,47 +333,6 @@ bool Trigger::Eval() return true; } -void Trigger::QueueTrigger(Trigger* trigger) - { - assert(! trigger->disabled); - assert(pending); - if ( std::find(pending->begin(), pending->end(), trigger) == pending->end() ) - { - Ref(trigger); - pending->push_back(trigger); - } - } - -void Trigger::EvaluatePending() - { - DBG_LOG(DBG_NOTIFIERS, "evaluating all pending triggers"); - - if ( ! pending ) - return; - - // While we iterate over the list, executing statements, we may - // in fact trigger new triggers and thereby modify the list. - // Therefore, we create a new temporary list which will receive - // triggers triggered during this time. - TriggerList* orig = pending; - TriggerList tmp; - pending = &tmp; - - for ( TriggerList::iterator i = orig->begin(); i != orig->end(); ++i ) - { - Trigger* t = *i; - (*i)->Eval(); - Unref(t); - } - - pending = orig; - orig->clear(); - - // Sigh... Is this really better than a for-loop? - std::copy(tmp.begin(), tmp.end(), - insert_iterator(*pending, pending->begin())); - } - void Trigger::Timeout() { if ( disabled ) @@ -484,7 +446,7 @@ void Trigger::Cache(const CallExpr* expr, Val* v) Ref(v); - QueueTrigger(this); + trigger_mgr->Queue(this); } @@ -502,6 +464,11 @@ void Trigger::Disable() disabled = true; } +void Trigger::Modified(notifier::Modifiable* m) + { + trigger_mgr->Queue(this); + } + const char* Trigger::Name() const { assert(location); @@ -509,8 +476,62 @@ const char* Trigger::Name() const location->first_line, location->last_line); } -void Trigger::GetStats(Stats* stats) + + +Manager::Manager() : IOSource() + { + pending = new TriggerList(); + iosource_mgr->Register(this, true); + } + +Manager::~Manager() + { + delete pending; + } + +double Manager::GetNextTimeout() + { + return pending->empty() ? -1 : network_time + 0.100; + } + +void Manager::Process() + { + DBG_LOG(DBG_NOTIFIERS, "evaluating all pending triggers"); + + // While we iterate over the list, executing statements, we may + // in fact trigger new triggers and thereby modify the list. + // Therefore, we create a new temporary list which will receive + // triggers triggered during this time. + TriggerList* orig = pending; + TriggerList tmp; + pending = &tmp; + + for ( TriggerList::iterator i = orig->begin(); i != orig->end(); ++i ) + { + Trigger* t = *i; + (*i)->Eval(); + Unref(t); + } + + pending = orig; + orig->clear(); + + std::swap(tmp, *pending); + } + +void Manager::Queue(Trigger* trigger) + { + if ( std::find(pending->begin(), pending->end(), trigger) == pending->end() ) + { + Ref(trigger); + pending->push_back(trigger); + total_triggers++; + iosource_mgr->Wakeup(Tag()); + } + } + +void Manager::GetStats(Stats* stats) { stats->total = total_triggers; - stats->pending = pending ? pending->size() : 0; + stats->pending = pending->size(); } diff --git a/src/Trigger.h b/src/Trigger.h index e9fcc087a2..c3efd62138 100644 --- a/src/Trigger.h +++ b/src/Trigger.h @@ -5,7 +5,9 @@ #include "Notifier.h" #include "Traverse.h" +#include "iosource/IOSource.h" +namespace trigger { // Triggers are the heart of "when" statements: expressions that when // they become true execute a body of statements. @@ -58,10 +60,11 @@ public: void Describe(ODesc* d) const override { d->Add(""); } + // Overidden from Notifier. We queue the trigger and evaluate it // later to avoid race conditions. - void Modified(notifier::Modifiable* m) override - { QueueTrigger(this); } + void Modified(notifier::Modifiable* m) override; + // Overridden from notifer::Receiver. If we're still waiting // on an ID/Val to be modified at termination time, we can't hope // for any further progress to be made, so just Unref ourselves. @@ -69,18 +72,6 @@ public: const char* Name() const; - static void QueueTrigger(Trigger* trigger); - - // Evaluates all queued Triggers. - static void EvaluatePending(); - - struct Stats { - unsigned long total; - unsigned long pending; - }; - - static void GetStats(Stats* stats); - private: friend class TriggerTraversalCallback; friend class TriggerTimer; @@ -107,11 +98,36 @@ private: std::vector> objs; - typedef map ValCache; + using ValCache = map; ValCache cache; - - typedef list TriggerList; - static TriggerList* pending; - - static unsigned long total_triggers; }; + +class Manager : public iosource::IOSource { +public: + + Manager(); + ~Manager(); + + double GetNextTimeout() override; + void Process() override; + const char* Tag() override { return "TriggerMgr"; } + + void Queue(Trigger* trigger); + + struct Stats { + unsigned long total; + unsigned long pending; + }; + + void GetStats(Stats* stats); + +private: + + using TriggerList = std::list; + TriggerList* pending; + unsigned long total_triggers = 0; + }; + +} + +extern trigger::Manager* trigger_mgr; diff --git a/src/broker/Store.h b/src/broker/Store.h index e1a20775f0..0ba3d2ad5d 100644 --- a/src/broker/Store.h +++ b/src/broker/Store.h @@ -66,7 +66,7 @@ inline RecordVal* query_result(RecordVal* data) */ class StoreQueryCallback { public: - StoreQueryCallback(Trigger* arg_trigger, const CallExpr* arg_call, + StoreQueryCallback(trigger::Trigger* arg_trigger, const CallExpr* arg_call, broker::store store) : trigger(arg_trigger), call(arg_call), store(move(store)) { @@ -101,7 +101,7 @@ public: private: - Trigger* trigger; + trigger::Trigger* trigger; const CallExpr* call; broker::store store; }; diff --git a/src/main.cc b/src/main.cc index 6e9ad81d05..00ad77a2e6 100644 --- a/src/main.cc +++ b/src/main.cc @@ -41,6 +41,7 @@ extern "C" { #include "Stats.h" #include "Brofiler.h" #include "Traverse.h" +#include "Trigger.h" #include "supervisor/Supervisor.h" #include "threading/Manager.h" @@ -93,6 +94,7 @@ zeekygen::Manager* zeekygen_mgr = 0; iosource::Manager* iosource_mgr = 0; bro_broker::Manager* broker_mgr = 0; zeek::Supervisor* zeek::supervisor_mgr = 0; +trigger::Manager* trigger_mgr = 0; std::vector zeek_script_prefixes; Stmt* stmts; @@ -594,6 +596,7 @@ int main(int argc, char** argv) input_mgr = new input::Manager(); file_mgr = new file_analysis::Manager(); broker_mgr = new bro_broker::Manager(! options.pcap_file.empty()); + trigger_mgr = new trigger::Manager(); plugin_mgr->InitPreScript(); analyzer_mgr->InitPreScript(); diff --git a/src/zeek.bif b/src/zeek.bif index 89f181601f..a42491daee 100644 --- a/src/zeek.bif +++ b/src/zeek.bif @@ -3503,7 +3503,7 @@ function dump_packet%(pkt: pcap_packet, file_name: string%) : bool class LookupHostCallback : public DNS_Mgr::LookupCallback { public: - LookupHostCallback(Trigger* arg_trigger, const CallExpr* arg_call, + LookupHostCallback(trigger::Trigger* arg_trigger, const CallExpr* arg_call, bool arg_lookup_name) { Ref(arg_trigger); @@ -3556,7 +3556,7 @@ public: } private: - Trigger* trigger; + trigger::Trigger* trigger; const CallExpr* call; bool lookup_name; }; @@ -3575,7 +3575,7 @@ function lookup_addr%(host: addr%) : string %{ // FIXME: It should be easy to adapt the function to synchronous // lookups if we're reading a trace. - Trigger* trigger = frame->GetTrigger(); + trigger::Trigger* trigger = frame->GetTrigger(); if ( ! trigger) { @@ -3604,7 +3604,7 @@ function lookup_hostname_txt%(host: string%) : string %{ // FIXME: Is should be easy to adapt the function to synchronous // lookups if we're reading a trace. - Trigger* trigger = frame->GetTrigger(); + trigger::Trigger* trigger = frame->GetTrigger(); if ( ! trigger) { @@ -3633,7 +3633,7 @@ function lookup_hostname%(host: string%) : addr_set %{ // FIXME: Is should be easy to adapt the function to synchronous // lookups if we're reading a trace. - Trigger* trigger = frame->GetTrigger(); + trigger::Trigger* trigger = frame->GetTrigger(); if ( ! trigger) {