Put timers with 5- and 6-second expirations into separate ordered bucket to avoid extra priority queue operations

This commit is contained in:
Tim Wojtulewicz 2020-08-17 16:08:17 -07:00
parent 92d2d0c6b8
commit d32e764fa9
2 changed files with 120 additions and 19 deletions

View file

@ -75,6 +75,8 @@ TimerMgr::TimerMgr() {
iosource_mgr->Register(this, true); iosource_mgr->Register(this, true);
} }
TimerMgr::~TimerMgr() { q.reset(); }
int TimerMgr::Advance(double arg_t, int max_expire) { int TimerMgr::Advance(double arg_t, int max_expire) {
DBG_LOG(DBG_TM, "advancing timer mgr to %.6f", arg_t); DBG_LOG(DBG_TM, "advancing timer mgr to %.6f", arg_t);
@ -120,11 +122,20 @@ void TimerMgr::InitPostScript() {
void TimerMgr::Add(Timer* timer) { void TimerMgr::Add(Timer* timer) {
DBG_LOG(DBG_TM, "Adding timer %s (%p) at %.6f", timer_type_to_string(timer->Type()), timer, timer->Time()); DBG_LOG(DBG_TM, "Adding timer %s (%p) at %.6f", timer_type_to_string(timer->Type()), timer, timer->Time());
// Add the timer even if it's already expired - that way, if if ( timer->Time() - run_state::network_time == 5.0 )
// multiple already-added timers are added, they'll still q_5s.push_back(timer);
// execute in sorted order. else if ( timer->Time() - run_state::network_time == 6.0 )
if ( ! q->Add(timer) ) q_6s.push_back(timer);
reporter->InternalError("out of memory"); else
// Add the timer even if it's already expired - that way, if
// multiple already-added timers are added, they'll still
// execute in sorted order.
if ( ! q->Add(timer) )
reporter->InternalError("out of memory");
cumulative_num++;
if ( Size() > peak_size )
peak_size = Size();
++current_timers[timer->Type()]; ++current_timers[timer->Type()];
} }
@ -140,45 +151,124 @@ void TimerMgr::Expire() {
} }
int TimerMgr::DoAdvance(double new_t, int max_expire) { int TimerMgr::DoAdvance(double new_t, int max_expire) {
Timer* timer = Top(); auto res = Top();
QueueIndex index = res.first;
Timer* timer = res.second;
for ( num_expired = 0; (num_expired < max_expire || dispatch_all_expired) && timer && timer->Time() <= new_t; for ( num_expired = 0; (num_expired < max_expire || dispatch_all_expired) && timer && timer->Time() <= new_t;
++num_expired ) { ++num_expired ) {
last_timestamp = timer->Time(); last_timestamp = timer->Time();
--current_timers[timer->Type()]; --current_timers[timer->Type()];
// Remove it before dispatching, since the dispatch Remove(index);
// can otherwise delete it, and then we won't know
// whether we should delete it too. if ( timer->active ) {
(void)Remove(); DBG_LOG(zeek::DBG_TM, "Dispatching timer %s (%p)", timer_type_to_string(timer->Type()), timer);
timer->Dispatch(new_t, false);
}
else {
--num_expired;
}
DBG_LOG(DBG_TM, "Dispatching timer %s (%p)", timer_type_to_string(timer->Type()), timer); DBG_LOG(DBG_TM, "Dispatching timer %s (%p)", timer_type_to_string(timer->Type()), timer);
timer->Dispatch(new_t, false); timer->Dispatch(new_t, false);
delete timer; delete timer;
timer = Top(); res = Top();
index = res.first;
timer = res.second;
} }
return num_expired; return num_expired;
} }
void TimerMgr::Remove(Timer* timer) { void TimerMgr::Remove(Timer* timer) {
timer->active = false;
std::deque<Timer*>::iterator it;
if ( ! q_5s.empty() ) {
it = std::find(q_5s.begin(), q_5s.end(), timer);
if ( it != q_5s.end() ) {
q_5s.erase(it);
--current_timers[timer->Type()];
delete timer;
return;
}
}
if ( ! q_6s.empty() ) {
it = std::find(q_6s.begin(), q_6s.end(), timer);
if ( it != q_6s.end() ) {
q_6s.erase(it);
--current_timers[timer->Type()];
delete timer;
return;
}
}
if ( ! q->Remove(timer) ) if ( ! q->Remove(timer) )
reporter->InternalError("asked to remove a missing timer"); reporter->InternalError("asked to remove a missing timer");
--current_timers[timer->Type()]; --current_timers[timer->Type()];
delete timer; delete timer;
} }
double TimerMgr::GetNextTimeout() { double TimerMgr::GetNextTimeout() {
Timer* top = Top(); const auto& [index, top] = Top();
if ( top ) if ( top )
return std::max(0.0, top->Time() - run_state::network_time); return std::max(0.0, top->Time() - run_state::network_time);
return -1; return -1;
} }
Timer* TimerMgr::Remove() { return (Timer*)q->Remove(); } Timer* TimerMgr::Remove(QueueIndex index) {
Timer* top = nullptr;
if ( index == QueueIndex::NONE ) {
auto res = Top();
index = res.first;
top = res.second;
}
Timer* TimerMgr::Top() { return (Timer*)q->Top(); } if ( index == QueueIndex::Q5 )
q_5s.pop_front();
else if ( index == QueueIndex::Q6 )
q_6s.pop_front();
else if ( index == QueueIndex::PQ )
q->Remove();
return top;
}
std::pair<TimerMgr::QueueIndex, Timer*> TimerMgr::Top() {
Timer* top = nullptr;
QueueIndex index = QueueIndex::NONE;
if ( ! q_5s.empty() ) {
top = q_5s.front();
index = QueueIndex::Q5;
}
if ( ! q_6s.empty() ) {
Timer* t = q_6s.front();
if ( ! top || t->Time() < top->Time() ) {
top = t;
index = QueueIndex::Q6;
}
}
if ( q->Size() > 0 ) {
Timer* t = static_cast<Timer*>(q->Top());
if ( ! top || t->Time() < top->Time() ) {
index = QueueIndex::PQ;
top = t;
}
}
return {index, top};
}
} // namespace zeek::detail } // namespace zeek::detail

View file

@ -3,6 +3,7 @@
#pragma once #pragma once
#include <cstdint> #include <cstdint>
#include <deque>
#include <memory> #include <memory>
#include "zeek/PriorityQueue.h" #include "zeek/PriorityQueue.h"
@ -78,6 +79,8 @@ public:
void Describe(ODesc* d) const; void Describe(ODesc* d) const;
bool active = true;
protected: protected:
TimerType type{}; TimerType type{};
}; };
@ -85,6 +88,7 @@ protected:
class TimerMgr final : public iosource::IOSource { class TimerMgr final : public iosource::IOSource {
public: public:
TimerMgr(); TimerMgr();
~TimerMgr();
void Add(Timer* timer); void Add(Timer* timer);
@ -122,9 +126,9 @@ public:
double Time() const { return t ? t : 1; } // 1 > 0 double Time() const { return t ? t : 1; } // 1 > 0
size_t Size() const { return q->Size(); } size_t Size() const { return q->Size() + q_5s.size() + q_6s.size(); }
size_t PeakSize() const { return q->PeakSize(); } size_t PeakSize() const { return peak_size; }
size_t CumulativeNum() const { return q->CumulativeNum(); } size_t CumulativeNum() const { return cumulative_num; }
double LastTimestamp() const { return last_timestamp; } double LastTimestamp() const { return last_timestamp; }
@ -147,11 +151,13 @@ public:
void InitPostScript(); void InitPostScript();
private: private:
enum class QueueIndex { NONE, Q5, Q6, PQ };
int DoAdvance(double t, int max_expire); int DoAdvance(double t, int max_expire);
void Remove(Timer* timer); void Remove(Timer* timer);
Timer* Remove(); Timer* Remove(QueueIndex index = QueueIndex::NONE);
Timer* Top(); std::pair<QueueIndex, Timer*> Top();
double t; double t;
double last_timestamp; double last_timestamp;
@ -168,7 +174,12 @@ private:
telemetry::GaugePtr lag_time_metric; telemetry::GaugePtr lag_time_metric;
telemetry::GaugePtr current_timer_metrics[NUM_TIMER_TYPES]; telemetry::GaugePtr current_timer_metrics[NUM_TIMER_TYPES];
size_t peak_size = 0;
size_t cumulative_num = 0;
std::unique_ptr<PriorityQueue> q; std::unique_ptr<PriorityQueue> q;
std::deque<Timer*> q_5s;
std::deque<Timer*> q_6s;
}; };
extern TimerMgr* timer_mgr; extern TimerMgr* timer_mgr;