Add Trigger manager for managing triggers created by things like 'when' statements

- Adds new trigger namespace
- Adds trigger::Manager class as a new IOSource for keeping track of triggers and integrating them into the loop. Previously the loop relied on the event manager Drain() method to process all triggers on every loop, but now that the loop actively waits for events to occur, triggers would not fire when they needed to. Adding them as part of the loop ensures they're checked.
This commit is contained in:
Tim Wojtulewicz 2019-11-26 12:47:33 -07:00
parent 92bde61b78
commit a159d075cf
11 changed files with 131 additions and 93 deletions

View file

@ -162,10 +162,9 @@ void EventMgr::Drain()
// do after draining events.
draining = false;
// We evaluate Triggers here. While this is somewhat unrelated to event
// processing, we ensure that it's done at a regular basis by checking
// them here.
Trigger::EvaluatePending();
// Make sure all of the triggers get processed every time the events
// drain.
trigger_mgr->Process();
}
void EventMgr::Describe(ODesc* d) const

View file

@ -4258,7 +4258,7 @@ Val* CallExpr::Eval(Frame* f) const
// Check for that.
if ( f )
{
Trigger* trigger = f->GetTrigger();
trigger::Trigger* trigger = f->GetTrigger();
if ( trigger )
{

View file

@ -515,7 +515,7 @@ void Frame::CaptureClosure(Frame* c, id_list arg_outer_ids)
// if (c) closure = c->SelectiveClone(outer_ids);
}
void Frame::SetTrigger(Trigger* arg_trigger)
void Frame::SetTrigger(trigger::Trigger* arg_trigger)
{
ClearTrigger();

View file

@ -11,7 +11,7 @@
#include "Val.h"
class Trigger;
namespace trigger { class Trigger; }
class CallExpr;
class Frame : public BroObj {
@ -207,9 +207,9 @@ public:
// If the frame is run in the context of a trigger condition evaluation,
// the trigger needs to be registered.
void SetTrigger(Trigger* arg_trigger);
void SetTrigger(trigger::Trigger* arg_trigger);
void ClearTrigger();
Trigger* GetTrigger() const { return trigger; }
trigger::Trigger* GetTrigger() const { return trigger; }
void SetCall(const CallExpr* arg_call) { call = arg_call; }
void ClearCall() { call = 0; }
@ -293,7 +293,7 @@ private:
bool break_before_next_stmt;
bool break_on_return;
Trigger* trigger;
trigger::Trigger* trigger;
const CallExpr* call;
bool delayed;

View file

@ -188,8 +188,8 @@ void ProfileLogger::Log()
dstats.requests, dstats.successful, dstats.failed, dstats.pending,
dstats.cached_hosts, dstats.cached_addresses));
Trigger::Stats tstats;
Trigger::GetStats(&tstats);
trigger::Manager::Stats tstats;
trigger_mgr->GetStats(&tstats);
file->Write(fmt("%.06f Triggers: total=%lu pending=%lu\n", network_time, tstats.total, tstats.pending));

View file

@ -1790,7 +1790,7 @@ Val* WhenStmt::Exec(Frame* f, stmt_flow_type& flow) const
::Ref(timeout);
// The new trigger object will take care of its own deletion.
new Trigger(cond, s1, s2, timeout, f, is_return, location);
new trigger::Trigger(cond, s1, s2, timeout, f, is_return, location);
return 0;
}
@ -1857,4 +1857,3 @@ TraversalCode WhenStmt::Traverse(TraversalCallback* cb) const
tc = cb->PostStmt(this);
HANDLE_TC_STMT_POST(tc);
}

View file

@ -2,10 +2,15 @@
#include "Trigger.h"
#include "Traverse.h"
#include "iosource/Manager.h"
using namespace trigger;
// Callback class to traverse an expression, registering all relevant IDs and
// Vals for change notifications.
namespace trigger {
class TriggerTraversalCallback : public TraversalCallback {
public:
TriggerTraversalCallback(Trigger *arg_trigger)
@ -20,6 +25,8 @@ private:
Trigger* trigger;
};
}
TraversalCode TriggerTraversalCallback::PreExpr(const Expr* expr)
{
// We catch all expressions here which in some way reference global
@ -67,6 +74,8 @@ TraversalCode TriggerTraversalCallback::PreExpr(const Expr* expr)
return TC_CONTINUE;
}
namespace trigger {
class TriggerTimer : public Timer {
public:
TriggerTimer(double arg_timeout, Trigger* arg_trigger)
@ -102,13 +111,12 @@ protected:
double time;
};
}
Trigger::Trigger(Expr* arg_cond, Stmt* arg_body, Stmt* arg_timeout_stmts,
Expr* arg_timeout, Frame* arg_frame,
bool arg_is_return, const Location* arg_location)
{
if ( ! pending )
pending = new list<Trigger*>;
cond = arg_cond;
body = arg_body;
timeout_stmts = arg_timeout_stmts;
@ -122,8 +130,6 @@ Trigger::Trigger(Expr* arg_cond, Stmt* arg_body, Stmt* arg_timeout_stmts,
location = arg_location;
timeout_value = -1;
++total_triggers;
DBG_LOG(DBG_NOTIFIERS, "%s: instantiating", Name());
if ( is_return )
@ -215,9 +221,6 @@ void Trigger::Init()
cond->Traverse(&cb);
}
Trigger::TriggerList* Trigger::pending = 0;
unsigned long Trigger::total_triggers = 0;
bool Trigger::Eval()
{
if ( disabled )
@ -330,47 +333,6 @@ bool Trigger::Eval()
return true;
}
void Trigger::QueueTrigger(Trigger* trigger)
{
assert(! trigger->disabled);
assert(pending);
if ( std::find(pending->begin(), pending->end(), trigger) == pending->end() )
{
Ref(trigger);
pending->push_back(trigger);
}
}
void Trigger::EvaluatePending()
{
DBG_LOG(DBG_NOTIFIERS, "evaluating all pending triggers");
if ( ! pending )
return;
// While we iterate over the list, executing statements, we may
// in fact trigger new triggers and thereby modify the list.
// Therefore, we create a new temporary list which will receive
// triggers triggered during this time.
TriggerList* orig = pending;
TriggerList tmp;
pending = &tmp;
for ( TriggerList::iterator i = orig->begin(); i != orig->end(); ++i )
{
Trigger* t = *i;
(*i)->Eval();
Unref(t);
}
pending = orig;
orig->clear();
// Sigh... Is this really better than a for-loop?
std::copy(tmp.begin(), tmp.end(),
insert_iterator<TriggerList>(*pending, pending->begin()));
}
void Trigger::Timeout()
{
if ( disabled )
@ -484,7 +446,7 @@ void Trigger::Cache(const CallExpr* expr, Val* v)
Ref(v);
QueueTrigger(this);
trigger_mgr->Queue(this);
}
@ -502,6 +464,11 @@ void Trigger::Disable()
disabled = true;
}
void Trigger::Modified(notifier::Modifiable* m)
{
trigger_mgr->Queue(this);
}
const char* Trigger::Name() const
{
assert(location);
@ -509,8 +476,62 @@ const char* Trigger::Name() const
location->first_line, location->last_line);
}
void Trigger::GetStats(Stats* stats)
Manager::Manager() : IOSource()
{
pending = new TriggerList();
iosource_mgr->Register(this, true);
}
Manager::~Manager()
{
delete pending;
}
double Manager::GetNextTimeout()
{
return pending->empty() ? -1 : network_time + 0.100;
}
void Manager::Process()
{
DBG_LOG(DBG_NOTIFIERS, "evaluating all pending triggers");
// While we iterate over the list, executing statements, we may
// in fact trigger new triggers and thereby modify the list.
// Therefore, we create a new temporary list which will receive
// triggers triggered during this time.
TriggerList* orig = pending;
TriggerList tmp;
pending = &tmp;
for ( TriggerList::iterator i = orig->begin(); i != orig->end(); ++i )
{
Trigger* t = *i;
(*i)->Eval();
Unref(t);
}
pending = orig;
orig->clear();
std::swap(tmp, *pending);
}
void Manager::Queue(Trigger* trigger)
{
if ( std::find(pending->begin(), pending->end(), trigger) == pending->end() )
{
Ref(trigger);
pending->push_back(trigger);
total_triggers++;
iosource_mgr->Wakeup(Tag());
}
}
void Manager::GetStats(Stats* stats)
{
stats->total = total_triggers;
stats->pending = pending ? pending->size() : 0;
stats->pending = pending->size();
}

View file

@ -5,7 +5,9 @@
#include "Notifier.h"
#include "Traverse.h"
#include "iosource/IOSource.h"
namespace trigger {
// Triggers are the heart of "when" statements: expressions that when
// they become true execute a body of statements.
@ -58,10 +60,11 @@ public:
void Describe(ODesc* d) const override
{ d->Add("<trigger>"); }
// Overidden from Notifier. We queue the trigger and evaluate it
// later to avoid race conditions.
void Modified(notifier::Modifiable* m) override
{ QueueTrigger(this); }
void Modified(notifier::Modifiable* m) override;
// Overridden from notifer::Receiver. If we're still waiting
// on an ID/Val to be modified at termination time, we can't hope
// for any further progress to be made, so just Unref ourselves.
@ -69,18 +72,6 @@ public:
const char* Name() const;
static void QueueTrigger(Trigger* trigger);
// Evaluates all queued Triggers.
static void EvaluatePending();
struct Stats {
unsigned long total;
unsigned long pending;
};
static void GetStats(Stats* stats);
private:
friend class TriggerTraversalCallback;
friend class TriggerTimer;
@ -107,11 +98,36 @@ private:
std::vector<std::pair<BroObj *, notifier::Modifiable*>> objs;
typedef map<const CallExpr*, Val*> ValCache;
using ValCache = map<const CallExpr*, Val*>;
ValCache cache;
typedef list<Trigger*> TriggerList;
static TriggerList* pending;
static unsigned long total_triggers;
};
class Manager : public iosource::IOSource {
public:
Manager();
~Manager();
double GetNextTimeout() override;
void Process() override;
const char* Tag() override { return "TriggerMgr"; }
void Queue(Trigger* trigger);
struct Stats {
unsigned long total;
unsigned long pending;
};
void GetStats(Stats* stats);
private:
using TriggerList = std::list<Trigger*>;
TriggerList* pending;
unsigned long total_triggers = 0;
};
}
extern trigger::Manager* trigger_mgr;

View file

@ -66,7 +66,7 @@ inline RecordVal* query_result(RecordVal* data)
*/
class StoreQueryCallback {
public:
StoreQueryCallback(Trigger* arg_trigger, const CallExpr* arg_call,
StoreQueryCallback(trigger::Trigger* arg_trigger, const CallExpr* arg_call,
broker::store store)
: trigger(arg_trigger), call(arg_call), store(move(store))
{
@ -101,7 +101,7 @@ public:
private:
Trigger* trigger;
trigger::Trigger* trigger;
const CallExpr* call;
broker::store store;
};

View file

@ -41,6 +41,7 @@ extern "C" {
#include "Stats.h"
#include "Brofiler.h"
#include "Traverse.h"
#include "Trigger.h"
#include "supervisor/Supervisor.h"
#include "threading/Manager.h"
@ -93,6 +94,7 @@ zeekygen::Manager* zeekygen_mgr = 0;
iosource::Manager* iosource_mgr = 0;
bro_broker::Manager* broker_mgr = 0;
zeek::Supervisor* zeek::supervisor_mgr = 0;
trigger::Manager* trigger_mgr = 0;
std::vector<std::string> zeek_script_prefixes;
Stmt* stmts;
@ -594,6 +596,7 @@ int main(int argc, char** argv)
input_mgr = new input::Manager();
file_mgr = new file_analysis::Manager();
broker_mgr = new bro_broker::Manager(! options.pcap_file.empty());
trigger_mgr = new trigger::Manager();
plugin_mgr->InitPreScript();
analyzer_mgr->InitPreScript();

View file

@ -3503,7 +3503,7 @@ function dump_packet%(pkt: pcap_packet, file_name: string%) : bool
class LookupHostCallback : public DNS_Mgr::LookupCallback {
public:
LookupHostCallback(Trigger* arg_trigger, const CallExpr* arg_call,
LookupHostCallback(trigger::Trigger* arg_trigger, const CallExpr* arg_call,
bool arg_lookup_name)
{
Ref(arg_trigger);
@ -3556,7 +3556,7 @@ public:
}
private:
Trigger* trigger;
trigger::Trigger* trigger;
const CallExpr* call;
bool lookup_name;
};
@ -3575,7 +3575,7 @@ function lookup_addr%(host: addr%) : string
%{
// FIXME: It should be easy to adapt the function to synchronous
// lookups if we're reading a trace.
Trigger* trigger = frame->GetTrigger();
trigger::Trigger* trigger = frame->GetTrigger();
if ( ! trigger)
{
@ -3604,7 +3604,7 @@ function lookup_hostname_txt%(host: string%) : string
%{
// FIXME: Is should be easy to adapt the function to synchronous
// lookups if we're reading a trace.
Trigger* trigger = frame->GetTrigger();
trigger::Trigger* trigger = frame->GetTrigger();
if ( ! trigger)
{
@ -3633,7 +3633,7 @@ function lookup_hostname%(host: string%) : addr_set
%{
// FIXME: Is should be easy to adapt the function to synchronous
// lookups if we're reading a trace.
Trigger* trigger = frame->GetTrigger();
trigger::Trigger* trigger = frame->GetTrigger();
if ( ! trigger)
{