Make TimerMgr an IOSource

- This allows the loop to check what the next timeout is and use that as the basis for the timeout of the poll
- This commit also removes the TimerMgr::Tag type, since it causes a name clash with other code in IOSource
This commit is contained in:
Tim Wojtulewicz 2019-11-26 12:37:22 -07:00
parent 4fa3e4b9b4
commit 92bde61b78
7 changed files with 101 additions and 39 deletions

View file

@ -115,8 +115,8 @@ Connection::Connection(NetSessions* s, const ConnIDKey& k, double t, const ConnI
++current_connections; ++current_connections;
++total_connections; ++total_connections;
TimerMgr::Tag* tag = current_iosrc->GetCurrentTag(); std::string* tag = current_iosrc->GetCurrentTag();
conn_timer_mgr = tag ? new TimerMgr::Tag(*tag) : 0; conn_timer_mgr = tag ? new std::string(*tag) : nullptr;
if ( arg_encap ) if ( arg_encap )
encapsulation = new EncapsulationStack(*arg_encap); encapsulation = new EncapsulationStack(*arg_encap);

View file

@ -321,7 +321,7 @@ protected:
bool key_valid; bool key_valid;
// Timer manager to use for this conn (or nil). // Timer manager to use for this conn (or nil).
TimerMgr::Tag* conn_timer_mgr; std::string* conn_timer_mgr;
timer_list timers; timer_list timers;
IPAddr orig_addr; IPAddr orig_addr;

View file

@ -1338,7 +1338,7 @@ bool NetSessions::WantConnection(uint16_t src_port, uint16_t dst_port,
return true; return true;
} }
TimerMgr* NetSessions::LookupTimerMgr(const TimerMgr::Tag* tag, bool create) TimerMgr* NetSessions::LookupTimerMgr(const std::string* tag, bool create)
{ {
if ( ! tag ) if ( ! tag )
{ {

View file

@ -104,7 +104,7 @@ public:
// Looks up timer manager associated with tag. If tag is unknown and // Looks up timer manager associated with tag. If tag is unknown and
// "create" is true, creates new timer manager and stores it. Returns // "create" is true, creates new timer manager and stores it. Returns
// global timer manager if tag is nil. // global timer manager if tag is nil.
TimerMgr* LookupTimerMgr(const TimerMgr::Tag* tag, bool create = true); TimerMgr* LookupTimerMgr(const std::string* tag, bool create = true);
void ExpireTimerMgrs(); void ExpireTimerMgrs();
@ -245,8 +245,8 @@ protected:
PacketProfiler* pkt_profiler; PacketProfiler* pkt_profiler;
// We may use independent timer managers for different sets of related // We may use independent timer managers for different sets of related
// activity. The managers are identified by an unique tag. // activity. The managers are identified by a unique tag.
typedef std::map<TimerMgr::Tag, TimerMgr*> TimerMgrMap; typedef std::map<std::string, TimerMgr*> TimerMgrMap;
TimerMgrMap timer_mgrs; TimerMgrMap timer_mgrs;
}; };

View file

@ -5,7 +5,11 @@
#include "util.h" #include "util.h"
#include "Timer.h" #include "Timer.h"
#include "Desc.h" #include "Desc.h"
#include "Net.h"
#include "NetVar.h"
#include "broker/Manager.h" #include "broker/Manager.h"
#include "iosource/Manager.h"
#include "iosource/PktSrc.h"
// Names of timers in same order than in TimerType. // Names of timers in same order than in TimerType.
const char* TimerNames[] = { const char* TimerNames[] = {
@ -55,6 +59,17 @@ void Timer::Describe(ODesc* d) const
unsigned int TimerMgr::current_timers[NUM_TIMER_TYPES]; unsigned int TimerMgr::current_timers[NUM_TIMER_TYPES];
TimerMgr::TimerMgr(const std::string& arg_tag)
{
t = 0.0;
num_expired = 0;
last_advance = last_timestamp = 0;
tag = arg_tag;
if ( iosource_mgr )
iosource_mgr->Register(this, true);
}
TimerMgr::~TimerMgr() TimerMgr::~TimerMgr()
{ {
DBG_LOG(DBG_TM, "deleting timer mgr %p", this); DBG_LOG(DBG_TM, "deleting timer mgr %p", this);
@ -74,8 +89,28 @@ int TimerMgr::Advance(double arg_t, int max_expire)
return DoAdvance(t, max_expire); return DoAdvance(t, max_expire);
} }
void TimerMgr::Process()
{
// If we don't have a source, or the source is closed, or we're reading live (which includes
// pseudo-realtime), advance the timer here to the current time since otherwise it won't
// move forward and the timers won't fire correctly.
iosource::PktSrc* pkt_src = iosource_mgr->GetPktSrc();
if ( ! pkt_src || ! pkt_src->IsOpen() || reading_live )
net_update_time(current_time());
PQ_TimerMgr::PQ_TimerMgr(const Tag& tag) : TimerMgr(tag) // Just advance the timer manager based on the current network time. This won't actually
// change the time, but will dispatch any timers that need dispatching.
current_dispatched += Advance(network_time, max_timer_expires - current_dispatched);
}
void TimerMgr::InitPostScript()
{
if ( iosource_mgr )
iosource_mgr->Register(this, true);
}
PQ_TimerMgr::PQ_TimerMgr(const std::string& tag) : TimerMgr(tag)
{ {
q = new PriorityQueue; q = new PriorityQueue;
} }
@ -145,3 +180,12 @@ void PQ_TimerMgr::Remove(Timer* timer)
--current_timers[timer->Type()]; --current_timers[timer->Type()];
delete timer; delete timer;
} }
double PQ_TimerMgr::GetNextTimeout()
{
Timer* top = Top();
if ( top )
return std::max(0.0, top->Time() - ::network_time);
return -1;
}

View file

@ -2,10 +2,9 @@
#pragma once #pragma once
#include <string>
#include <string> #include <string>
#include "PriorityQueue.h" #include "PriorityQueue.h"
#include "iosource/IOSource.h"
// If you add a timer here, adjust TimerNames in Timer.cc. // If you add a timer here, adjust TimerNames in Timer.cc.
enum TimerType : uint8_t { enum TimerType : uint8_t {
@ -51,7 +50,7 @@ public:
Timer(double t, TimerType arg_type) : PQ_Element(t), type(arg_type) {} Timer(double t, TimerType arg_type) : PQ_Element(t), type(arg_type) {}
~Timer() override { } ~Timer() override { }
TimerType Type() const { return (TimerType) type; } TimerType Type() const { return type; }
// t gives the dispatch time. is_expire is true if the // t gives the dispatch time. is_expire is true if the
// timer is being dispatched because we're expiring all // timer is being dispatched because we're expiring all
@ -65,53 +64,73 @@ protected:
TimerType type; TimerType type;
}; };
class TimerMgr { class TimerMgr : public iosource::IOSource {
public: public:
virtual ~TimerMgr(); virtual ~TimerMgr();
virtual void Add(Timer* timer) = 0; virtual void Add(Timer* timer) = 0;
// Advance the clock to time t, expiring at most max_expire timers. /**
// Returns number of timers expired. * Advance the clock to time t, expiring at most max_expire timers.
*
* @param t the new time.
* @param max_expire the maximum number of timers to expire.
* @return the number of timers expired.
*/
int Advance(double t, int max_expire); int Advance(double t, int max_expire);
// Returns the number of timers expired (so far) during the current /**
// or most recent advance. * Returns the number of timers expired (so far) during the current
* or most recent advance.
*/
int NumExpiredDuringCurrentAdvance() { return num_expired; } int NumExpiredDuringCurrentAdvance() { return num_expired; }
// Expire all timers. /**
* Expire all timers.
*/
virtual void Expire() = 0; virtual void Expire() = 0;
// Cancel() is a method separate from Remove because /**
// (1) Remove is protected, but, more importantly, (2) in some * Removes a timer. Cancel() is a method separate from Remove()
// timer schemes we have wound up separating timer cancelation * because (1) Remove is protected, but, more importantly, (2)
// from removing it from the manager's data structures, because * in some timer schemes we have wound up separating timer
// the manager lacked an efficient way to find it. * cancelation from removing it from the manager's data structures,
* because the manager lacked an efficient way to find it.
*
* @param timer the timer to cancel
*/
void Cancel(Timer* timer) { Remove(timer); } void Cancel(Timer* timer) { Remove(timer); }
double Time() const { return t ? t : 1; } // 1 > 0 double Time() const { return t ? t : 1; } // 1 > 0
typedef std::string Tag; const std::string& GetTag() const { return tag; }
const Tag& GetTag() const { return tag; }
virtual int Size() const = 0; virtual int Size() const = 0;
virtual int PeakSize() const = 0; virtual int PeakSize() const = 0;
virtual uint64_t CumulativeNum() const = 0; virtual uint64_t CumulativeNum() const = 0;
double LastTimestamp() const { return last_timestamp; } double LastTimestamp() const { return last_timestamp; }
// Returns time of last advance in global network time.
/**
* Returns time of last advance in global network time
*/
double LastAdvance() const { return last_advance; } double LastAdvance() const { return last_advance; }
static unsigned int* CurrentTimers() { return current_timers; } static unsigned int* CurrentTimers() { return current_timers; }
// IOSource API methods
virtual double GetNextTimeout() override { return -1; }
virtual void Process() override;
virtual const char* Tag() override { return fmt("TimerMgr %s", tag.c_str()); }
/**
* Performs some extra initialization on a timer manager. This shouldn't
* need to be called for managers other than the global one.
*/
void InitPostScript();
protected: protected:
explicit TimerMgr(const Tag& arg_tag) explicit TimerMgr(const std::string& arg_tag);
{
t = 0.0;
num_expired = 0;
last_advance = last_timestamp = 0;
tag = arg_tag;
}
virtual int DoAdvance(double t, int max_expire) = 0; virtual int DoAdvance(double t, int max_expire) = 0;
virtual void Remove(Timer* timer) = 0; virtual void Remove(Timer* timer) = 0;
@ -119,7 +138,7 @@ protected:
double t; double t;
double last_timestamp; double last_timestamp;
double last_advance; double last_advance;
Tag tag; std::string tag;
int num_expired; int num_expired;
@ -128,7 +147,7 @@ protected:
class PQ_TimerMgr : public TimerMgr { class PQ_TimerMgr : public TimerMgr {
public: public:
explicit PQ_TimerMgr(const Tag& arg_tag); explicit PQ_TimerMgr(const std::string& arg_tag);
~PQ_TimerMgr() override; ~PQ_TimerMgr() override;
void Add(Timer* timer) override; void Add(Timer* timer) override;
@ -137,6 +156,7 @@ public:
int Size() const override { return q->Size(); } int Size() const override { return q->Size(); }
int PeakSize() const override { return q->PeakSize(); } int PeakSize() const override { return q->PeakSize(); }
uint64_t CumulativeNum() const override { return q->CumulativeNum(); } uint64_t CumulativeNum() const override { return q->CumulativeNum(); }
double GetNextTimeout() override;
protected: protected:
int DoAdvance(double t, int max_expire) override; int DoAdvance(double t, int max_expire) override;

View file

@ -294,12 +294,10 @@ void terminate_bro()
plugin_mgr->FinishPlugins(); plugin_mgr->FinishPlugins();
delete zeekygen_mgr; delete zeekygen_mgr;
delete timer_mgr;
delete event_registry; delete event_registry;
delete analyzer_mgr; delete analyzer_mgr;
delete file_mgr; delete file_mgr;
// broker_mgr is deleted via iosource_mgr // broker_mgr, timer_mgr, and supervisor are deleted via iosource_mgr
// supervisor is deleted via iosource_mgr
delete iosource_mgr; delete iosource_mgr;
delete log_mgr; delete log_mgr;
delete reporter; delete reporter;
@ -550,7 +548,6 @@ int main(int argc, char** argv)
#endif #endif
timer_mgr = new PQ_TimerMgr("<GLOBAL>"); timer_mgr = new PQ_TimerMgr("<GLOBAL>");
// timer_mgr = new CQ_TimerMgr();
auto zeekygen_cfg = options.zeekygen_config_file.value_or(""); auto zeekygen_cfg = options.zeekygen_config_file.value_or("");
zeekygen_mgr = new zeekygen::Manager(zeekygen_cfg, bro_argv[0]); zeekygen_mgr = new zeekygen::Manager(zeekygen_cfg, bro_argv[0]);
@ -671,6 +668,7 @@ int main(int argc, char** argv)
plugin_mgr->InitPostScript(); plugin_mgr->InitPostScript();
zeekygen_mgr->InitPostScript(); zeekygen_mgr->InitPostScript();
broker_mgr->InitPostScript(); broker_mgr->InitPostScript();
timer_mgr->InitPostScript();
if ( options.print_plugins ) if ( options.print_plugins )
{ {