diff --git a/src/Event.cc b/src/Event.cc index 89fdd5d907..292d70b05f 100644 --- a/src/Event.cc +++ b/src/Event.cc @@ -9,6 +9,9 @@ #include "Trigger.h" #include "Val.h" #include "plugin/Manager.h" +#include "iosource/Manager.h" +#include "iosource/PktSrc.h" +#include "Net.h" EventMgr mgr; @@ -124,7 +127,10 @@ void EventMgr::QueueEvent(Event* event) return; if ( ! head ) + { head = tail = event; + queue_flare.Fire(); + } else { tail->SetNext(event); @@ -204,3 +210,29 @@ void EventMgr::Describe(ODesc* d) const d->NL(); } } + +void EventMgr::Process() + { + // If we don't have a source, or the source is closed, or we're + // reading live (which includes pseudo-realtime), advance the time + // here to the current time since otherwise it won't move forward. + iosource::PktSrc* pkt_src = iosource_mgr->GetPktSrc(); + if ( ! pkt_src || ! pkt_src->IsOpen() || reading_live ) + net_update_time(current_time()); + + queue_flare.Extinguish(); + + // While it semes like the most logical thing to do, we dont want + // to call Drain() as part of this method. It will get called at + // the end of net_run after all of the sources have been processed + // and had the opportunity to spawn new events. We could use + // iosource_mgr->Wakeup() instead of making EventMgr an IOSource, + // but then we couldn't update the time above and nothing would + // drive it forward. + } + +void EventMgr::InitPostScript() + { + iosource_mgr->Register(this, true, false); + iosource_mgr->RegisterFd(queue_flare.FD(), this); + } diff --git a/src/Event.h b/src/Event.h index 1482ecb83e..ce15cc5f52 100644 --- a/src/Event.h +++ b/src/Event.h @@ -4,6 +4,8 @@ #include "BroList.h" #include "analyzer/Analyzer.h" +#include "iosource/IOSource.h" +#include "Flare.h" class EventMgr; @@ -49,7 +51,7 @@ protected: extern uint64_t num_events_queued; extern uint64_t num_events_dispatched; -class EventMgr : public BroObj { +class EventMgr : public BroObj, public iosource::IOSource { public: EventMgr(); ~EventMgr() override; @@ -113,6 +115,11 @@ public: void Describe(ODesc* d) const override; + double GetNextTimeout() override { return -1; } + void Process() override; + const char* Tag() override { return "EventManager"; } + void InitPostScript(); + protected: void QueueEvent(Event* event); @@ -123,6 +130,7 @@ protected: TimerMgr* current_mgr; RecordVal* src_val; bool draining; + bro::Flare queue_flare; }; extern EventMgr mgr; diff --git a/src/iosource/Manager.cc b/src/iosource/Manager.cc index ded3348040..d205c007fb 100644 --- a/src/iosource/Manager.cc +++ b/src/iosource/Manager.cc @@ -58,9 +58,13 @@ Manager::~Manager() for ( SourceList::iterator i = sources.begin(); i != sources.end(); ++i ) { - (*i)->src->Done(); - delete (*i)->src; - delete *i; + auto src = *i; + src->src->Done(); + + if ( src->manage_lifetime ) + delete src->src; + + delete src; } sources.clear(); @@ -152,6 +156,9 @@ void Manager::FindReadySources(std::vector* ready) } } + DBG_LOG(DBG_MAINLOOP, "timeout: %f ready size: %ld time_to_poll: %d\n", + timeout, ready->size(), time_to_poll); + // If we didn't find any IOSources with zero timeouts or it's time to // force a poll, do that and return. Otherwise return the set of ready // sources that we have. @@ -253,7 +260,7 @@ bool Manager::UnregisterFd(int fd, IOSource* src) } } -void Manager::Register(IOSource* src, bool dont_count) +void Manager::Register(IOSource* src, bool dont_count, bool manage_lifetime) { // First see if we already have registered that source. If so, just // adjust dont_count. @@ -273,6 +280,7 @@ void Manager::Register(IOSource* src, bool dont_count) Source* s = new Source; s->src = src; s->dont_count = dont_count; + s->manage_lifetime = manage_lifetime; if ( dont_count ) ++dont_counts; diff --git a/src/iosource/Manager.h b/src/iosource/Manager.h index 2b0f4bec9a..93cdd998d7 100644 --- a/src/iosource/Manager.h +++ b/src/iosource/Manager.h @@ -53,7 +53,7 @@ public: * sources except for the non-counting ones have gone dry, processing * will shut down. */ - void Register(IOSource* src, bool dont_count = false); + void Register(IOSource* src, bool dont_count = false, bool manage_lifetime = true); /** * Returns the number of registered and still active sources, @@ -182,8 +182,9 @@ private: }; struct Source { - IOSource* src; - bool dont_count; + IOSource* src = nullptr; + bool dont_count = false; + bool manage_lifetime = false; }; using SourceList = std::vector; diff --git a/src/main.cc b/src/main.cc index 344218331a..152e3c77e9 100644 --- a/src/main.cc +++ b/src/main.cc @@ -673,6 +673,7 @@ int main(int argc, char** argv) zeekygen_mgr->InitPostScript(); broker_mgr->InitPostScript(); timer_mgr->InitPostScript(); + mgr.InitPostScript(); if ( zeek::supervisor_mgr ) zeek::supervisor_mgr->InitPostScript(); diff --git a/testing/btest/Baseline/core.recursive-event/output b/testing/btest/Baseline/core.recursive-event/output index ec635144f6..f599e28b8a 100644 --- a/testing/btest/Baseline/core.recursive-event/output +++ b/testing/btest/Baseline/core.recursive-event/output @@ -1 +1 @@ -9 +10