From d32e764fa9d57db49384ae1784f0da1943462ad7 Mon Sep 17 00:00:00 2001 From: Tim Wojtulewicz Date: Mon, 17 Aug 2020 16:08:17 -0700 Subject: [PATCH] Put timers with 5- and 6-second expirations into separate ordered bucket to avoid extra priority queue operations --- src/Timer.cc | 118 +++++++++++++++++++++++++++++++++++++++++++++------ src/Timer.h | 21 ++++++--- 2 files changed, 120 insertions(+), 19 deletions(-) diff --git a/src/Timer.cc b/src/Timer.cc index 70fa7f4ff7..f2afe5461e 100644 --- a/src/Timer.cc +++ b/src/Timer.cc @@ -75,6 +75,8 @@ TimerMgr::TimerMgr() { iosource_mgr->Register(this, true); } +TimerMgr::~TimerMgr() { q.reset(); } + int TimerMgr::Advance(double arg_t, int max_expire) { DBG_LOG(DBG_TM, "advancing timer mgr to %.6f", arg_t); @@ -120,11 +122,20 @@ void TimerMgr::InitPostScript() { void TimerMgr::Add(Timer* timer) { DBG_LOG(DBG_TM, "Adding timer %s (%p) at %.6f", timer_type_to_string(timer->Type()), timer, timer->Time()); - // Add the timer even if it's already expired - that way, if - // multiple already-added timers are added, they'll still - // execute in sorted order. - if ( ! q->Add(timer) ) - reporter->InternalError("out of memory"); + if ( timer->Time() - run_state::network_time == 5.0 ) + q_5s.push_back(timer); + else if ( timer->Time() - run_state::network_time == 6.0 ) + q_6s.push_back(timer); + else + // Add the timer even if it's already expired - that way, if + // multiple already-added timers are added, they'll still + // execute in sorted order. + if ( ! q->Add(timer) ) + reporter->InternalError("out of memory"); + + cumulative_num++; + if ( Size() > peak_size ) + peak_size = Size(); ++current_timers[timer->Type()]; } @@ -140,45 +151,124 @@ void TimerMgr::Expire() { } int TimerMgr::DoAdvance(double new_t, int max_expire) { - Timer* timer = Top(); + auto res = Top(); + QueueIndex index = res.first; + Timer* timer = res.second; + for ( num_expired = 0; (num_expired < max_expire || dispatch_all_expired) && timer && timer->Time() <= new_t; ++num_expired ) { last_timestamp = timer->Time(); --current_timers[timer->Type()]; - // Remove it before dispatching, since the dispatch - // can otherwise delete it, and then we won't know - // whether we should delete it too. - (void)Remove(); + Remove(index); + + if ( timer->active ) { + DBG_LOG(zeek::DBG_TM, "Dispatching timer %s (%p)", timer_type_to_string(timer->Type()), timer); + timer->Dispatch(new_t, false); + } + else { + --num_expired; + } DBG_LOG(DBG_TM, "Dispatching timer %s (%p)", timer_type_to_string(timer->Type()), timer); timer->Dispatch(new_t, false); delete timer; - timer = Top(); + res = Top(); + index = res.first; + timer = res.second; } return num_expired; } void TimerMgr::Remove(Timer* timer) { + timer->active = false; + + std::deque::iterator it; + + if ( ! q_5s.empty() ) { + it = std::find(q_5s.begin(), q_5s.end(), timer); + if ( it != q_5s.end() ) { + q_5s.erase(it); + --current_timers[timer->Type()]; + delete timer; + return; + } + } + + if ( ! q_6s.empty() ) { + it = std::find(q_6s.begin(), q_6s.end(), timer); + if ( it != q_6s.end() ) { + q_6s.erase(it); + --current_timers[timer->Type()]; + delete timer; + return; + } + } + if ( ! q->Remove(timer) ) reporter->InternalError("asked to remove a missing timer"); --current_timers[timer->Type()]; + delete timer; } double TimerMgr::GetNextTimeout() { - Timer* top = Top(); + const auto& [index, top] = Top(); if ( top ) return std::max(0.0, top->Time() - run_state::network_time); return -1; } -Timer* TimerMgr::Remove() { return (Timer*)q->Remove(); } +Timer* TimerMgr::Remove(QueueIndex index) { + Timer* top = nullptr; + if ( index == QueueIndex::NONE ) { + auto res = Top(); + index = res.first; + top = res.second; + } -Timer* TimerMgr::Top() { return (Timer*)q->Top(); } + if ( index == QueueIndex::Q5 ) + q_5s.pop_front(); + else if ( index == QueueIndex::Q6 ) + q_6s.pop_front(); + else if ( index == QueueIndex::PQ ) + q->Remove(); + + return top; +} + + +std::pair TimerMgr::Top() { + Timer* top = nullptr; + QueueIndex index = QueueIndex::NONE; + + if ( ! q_5s.empty() ) { + top = q_5s.front(); + index = QueueIndex::Q5; + } + + if ( ! q_6s.empty() ) { + Timer* t = q_6s.front(); + if ( ! top || t->Time() < top->Time() ) { + top = t; + index = QueueIndex::Q6; + } + } + + if ( q->Size() > 0 ) { + Timer* t = static_cast(q->Top()); + + if ( ! top || t->Time() < top->Time() ) { + index = QueueIndex::PQ; + top = t; + } + } + + return {index, top}; +} } // namespace zeek::detail diff --git a/src/Timer.h b/src/Timer.h index 509a6bec18..7e1f30f80f 100644 --- a/src/Timer.h +++ b/src/Timer.h @@ -3,6 +3,7 @@ #pragma once #include +#include #include #include "zeek/PriorityQueue.h" @@ -78,6 +79,8 @@ public: void Describe(ODesc* d) const; + bool active = true; + protected: TimerType type{}; }; @@ -85,6 +88,7 @@ protected: class TimerMgr final : public iosource::IOSource { public: TimerMgr(); + ~TimerMgr(); void Add(Timer* timer); @@ -122,9 +126,9 @@ public: double Time() const { return t ? t : 1; } // 1 > 0 - size_t Size() const { return q->Size(); } - size_t PeakSize() const { return q->PeakSize(); } - size_t CumulativeNum() const { return q->CumulativeNum(); } + size_t Size() const { return q->Size() + q_5s.size() + q_6s.size(); } + size_t PeakSize() const { return peak_size; } + size_t CumulativeNum() const { return cumulative_num; } double LastTimestamp() const { return last_timestamp; } @@ -147,11 +151,13 @@ public: void InitPostScript(); private: + enum class QueueIndex { NONE, Q5, Q6, PQ }; + int DoAdvance(double t, int max_expire); void Remove(Timer* timer); - Timer* Remove(); - Timer* Top(); + Timer* Remove(QueueIndex index = QueueIndex::NONE); + std::pair Top(); double t; double last_timestamp; @@ -168,7 +174,12 @@ private: telemetry::GaugePtr lag_time_metric; telemetry::GaugePtr current_timer_metrics[NUM_TIMER_TYPES]; + size_t peak_size = 0; + size_t cumulative_num = 0; + std::unique_ptr q; + std::deque q_5s; + std::deque q_6s; }; extern TimerMgr* timer_mgr;