GH-773: Make EventMgr an IOSource so that events interrupt kevent waits

This commit is contained in:
Tim Wojtulewicz 2020-02-04 12:58:55 -07:00
parent 5171f25e3a
commit 4f23c0360a
6 changed files with 59 additions and 9 deletions

View file

@ -9,6 +9,9 @@
#include "Trigger.h" #include "Trigger.h"
#include "Val.h" #include "Val.h"
#include "plugin/Manager.h" #include "plugin/Manager.h"
#include "iosource/Manager.h"
#include "iosource/PktSrc.h"
#include "Net.h"
EventMgr mgr; EventMgr mgr;
@ -124,7 +127,10 @@ void EventMgr::QueueEvent(Event* event)
return; return;
if ( ! head ) if ( ! head )
{
head = tail = event; head = tail = event;
queue_flare.Fire();
}
else else
{ {
tail->SetNext(event); tail->SetNext(event);
@ -204,3 +210,29 @@ void EventMgr::Describe(ODesc* d) const
d->NL(); d->NL();
} }
} }
void EventMgr::Process()
{
// If we don't have a source, or the source is closed, or we're
// reading live (which includes pseudo-realtime), advance the time
// here to the current time since otherwise it won't move forward.
iosource::PktSrc* pkt_src = iosource_mgr->GetPktSrc();
if ( ! pkt_src || ! pkt_src->IsOpen() || reading_live )
net_update_time(current_time());
queue_flare.Extinguish();
// While it semes like the most logical thing to do, we dont want
// to call Drain() as part of this method. It will get called at
// the end of net_run after all of the sources have been processed
// and had the opportunity to spawn new events. We could use
// iosource_mgr->Wakeup() instead of making EventMgr an IOSource,
// but then we couldn't update the time above and nothing would
// drive it forward.
}
void EventMgr::InitPostScript()
{
iosource_mgr->Register(this, true, false);
iosource_mgr->RegisterFd(queue_flare.FD(), this);
}

View file

@ -4,6 +4,8 @@
#include "BroList.h" #include "BroList.h"
#include "analyzer/Analyzer.h" #include "analyzer/Analyzer.h"
#include "iosource/IOSource.h"
#include "Flare.h"
class EventMgr; class EventMgr;
@ -49,7 +51,7 @@ protected:
extern uint64_t num_events_queued; extern uint64_t num_events_queued;
extern uint64_t num_events_dispatched; extern uint64_t num_events_dispatched;
class EventMgr : public BroObj { class EventMgr : public BroObj, public iosource::IOSource {
public: public:
EventMgr(); EventMgr();
~EventMgr() override; ~EventMgr() override;
@ -113,6 +115,11 @@ public:
void Describe(ODesc* d) const override; void Describe(ODesc* d) const override;
double GetNextTimeout() override { return -1; }
void Process() override;
const char* Tag() override { return "EventManager"; }
void InitPostScript();
protected: protected:
void QueueEvent(Event* event); void QueueEvent(Event* event);
@ -123,6 +130,7 @@ protected:
TimerMgr* current_mgr; TimerMgr* current_mgr;
RecordVal* src_val; RecordVal* src_val;
bool draining; bool draining;
bro::Flare queue_flare;
}; };
extern EventMgr mgr; extern EventMgr mgr;

View file

@ -58,9 +58,13 @@ Manager::~Manager()
for ( SourceList::iterator i = sources.begin(); i != sources.end(); ++i ) for ( SourceList::iterator i = sources.begin(); i != sources.end(); ++i )
{ {
(*i)->src->Done(); auto src = *i;
delete (*i)->src; src->src->Done();
delete *i;
if ( src->manage_lifetime )
delete src->src;
delete src;
} }
sources.clear(); sources.clear();
@ -152,6 +156,9 @@ void Manager::FindReadySources(std::vector<IOSource*>* ready)
} }
} }
DBG_LOG(DBG_MAINLOOP, "timeout: %f ready size: %ld time_to_poll: %d\n",
timeout, ready->size(), time_to_poll);
// If we didn't find any IOSources with zero timeouts or it's time to // If we didn't find any IOSources with zero timeouts or it's time to
// force a poll, do that and return. Otherwise return the set of ready // force a poll, do that and return. Otherwise return the set of ready
// sources that we have. // sources that we have.
@ -253,7 +260,7 @@ bool Manager::UnregisterFd(int fd, IOSource* src)
} }
} }
void Manager::Register(IOSource* src, bool dont_count) void Manager::Register(IOSource* src, bool dont_count, bool manage_lifetime)
{ {
// First see if we already have registered that source. If so, just // First see if we already have registered that source. If so, just
// adjust dont_count. // adjust dont_count.
@ -273,6 +280,7 @@ void Manager::Register(IOSource* src, bool dont_count)
Source* s = new Source; Source* s = new Source;
s->src = src; s->src = src;
s->dont_count = dont_count; s->dont_count = dont_count;
s->manage_lifetime = manage_lifetime;
if ( dont_count ) if ( dont_count )
++dont_counts; ++dont_counts;

View file

@ -53,7 +53,7 @@ public:
* sources except for the non-counting ones have gone dry, processing * sources except for the non-counting ones have gone dry, processing
* will shut down. * will shut down.
*/ */
void Register(IOSource* src, bool dont_count = false); void Register(IOSource* src, bool dont_count = false, bool manage_lifetime = true);
/** /**
* Returns the number of registered and still active sources, * Returns the number of registered and still active sources,
@ -182,8 +182,9 @@ private:
}; };
struct Source { struct Source {
IOSource* src; IOSource* src = nullptr;
bool dont_count; bool dont_count = false;
bool manage_lifetime = false;
}; };
using SourceList = std::vector<Source*>; using SourceList = std::vector<Source*>;

View file

@ -673,6 +673,7 @@ int main(int argc, char** argv)
zeekygen_mgr->InitPostScript(); zeekygen_mgr->InitPostScript();
broker_mgr->InitPostScript(); broker_mgr->InitPostScript();
timer_mgr->InitPostScript(); timer_mgr->InitPostScript();
mgr.InitPostScript();
if ( zeek::supervisor_mgr ) if ( zeek::supervisor_mgr )
zeek::supervisor_mgr->InitPostScript(); zeek::supervisor_mgr->InitPostScript();