Remove CQ_TimerMgr in favor of PQ_TimerMgr

This commit is contained in:
Tim Wojtulewicz 2019-10-28 13:50:24 -07:00
parent 70b45d1aba
commit 2b2121be60
6 changed files with 16 additions and 125 deletions

View file

@ -27,7 +27,7 @@ PriorityQueue::~PriorityQueue()
PQ_Element* PriorityQueue::Remove() PQ_Element* PriorityQueue::Remove()
{ {
if ( heap_size == 0 ) if ( heap_size == 0 )
return 0; return nullptr;
PQ_Element* top = heap[0]; PQ_Element* top = heap[0];
@ -43,7 +43,7 @@ PQ_Element* PriorityQueue::Remove(PQ_Element* e)
{ {
if ( e->Offset() < 0 || e->Offset() >= heap_size || if ( e->Offset() < 0 || e->Offset() >= heap_size ||
heap[e->Offset()] != e ) heap[e->Offset()] != e )
return 0; // not in heap return nullptr; // not in heap
e->MinimizeTime(); e->MinimizeTime();
BubbleUp(e->Offset()); BubbleUp(e->Offset());
@ -56,7 +56,7 @@ PQ_Element* PriorityQueue::Remove(PQ_Element* e)
return e2; return e2;
} }
int PriorityQueue::Add(PQ_Element* e) bool PriorityQueue::Add(PQ_Element* e)
{ {
SetElement(heap_size, e); SetElement(heap_size, e);
@ -70,10 +70,10 @@ int PriorityQueue::Add(PQ_Element* e)
if ( heap_size >= max_heap_size ) if ( heap_size >= max_heap_size )
return Resize(max_heap_size * 2); return Resize(max_heap_size * 2);
else else
return 1; return true;
} }
int PriorityQueue::Resize(int new_size) bool PriorityQueue::Resize(int new_size)
{ {
PQ_Element** tmp = new PQ_Element*[new_size]; PQ_Element** tmp = new PQ_Element*[new_size];
for ( int i = 0; i < max_heap_size; ++i ) for ( int i = 0; i < max_heap_size; ++i )
@ -84,7 +84,7 @@ int PriorityQueue::Resize(int new_size)
max_heap_size = new_size; max_heap_size = new_size;
return heap != 0; return heap != nullptr;
} }
void PriorityQueue::BubbleUp(int bin) void PriorityQueue::BubbleUp(int bin)

View file

@ -43,20 +43,20 @@ public:
// is empty. // is empty.
PQ_Element* Remove(); PQ_Element* Remove();
// Removes element e. Returns e, or nil if e wasn't in the queue. // Removes element e. Returns e, or nullptr if e wasn't in the queue.
// Note that e will be modified via MinimizeTime(). // Note that e will be modified via MinimizeTime().
PQ_Element* Remove(PQ_Element* e); PQ_Element* Remove(PQ_Element* e);
// Add a new element to the queue. Returns 0 on failure (not enough // Add a new element to the queue. Returns false on failure (not enough
// memory to add the element), 1 on success. // memory to add the element), true on success.
int Add(PQ_Element* e); bool Add(PQ_Element* e);
int Size() const { return heap_size; } int Size() const { return heap_size; }
int PeakSize() const { return peak_heap_size; } int PeakSize() const { return peak_heap_size; }
uint64_t CumulativeNum() const { return cumulative_num; } uint64_t CumulativeNum() const { return cumulative_num; }
protected: protected:
int Resize(int new_size); bool Resize(int new_size);
void BubbleUp(int bin); void BubbleUp(int bin);
void BubbleDown(int bin); void BubbleDown(int bin);

View file

@ -1356,7 +1356,7 @@ TimerMgr* NetSessions::LookupTimerMgr(const TimerMgr::Tag* tag, bool create)
return 0; return 0;
// Create new queue for tag. // Create new queue for tag.
TimerMgr* mgr = new CQ_TimerMgr(*tag); TimerMgr* mgr = new PQ_TimerMgr(*tag);
DBG_LOG(DBG_TM, "tag %s, creating new non-global timer mgr %p", tag->c_str(), mgr); DBG_LOG(DBG_TM, "tag %s, creating new non-global timer mgr %p", tag->c_str(), mgr);
timer_mgrs.insert(TimerMgrMap::value_type(*tag, mgr)); timer_mgrs.insert(TimerMgrMap::value_type(*tag, mgr));
double t = timer_mgr->Time() + timer_mgr_inactivity_timeout; double t = timer_mgr->Time() + timer_mgr_inactivity_timeout;

View file

@ -175,11 +175,9 @@ void ProfileLogger::Log()
stats.nfa_states, stats.dfa_states, stats.computed, stats.mem / 1024)); stats.nfa_states, stats.dfa_states, stats.computed, stats.mem / 1024));
} }
file->Write(fmt("%.06f Timers: current=%d max=%d mem=%dK lag=%.2fs\n", file->Write(fmt("%.06f Timers: current=%d max=%d lag=%.2fs\n",
network_time, network_time,
timer_mgr->Size(), timer_mgr->PeakSize(), timer_mgr->Size(), timer_mgr->PeakSize(),
int(cq_memory_allocation() +
(timer_mgr->Size() * padded_sizeof(ConnectionTimer))) / 1024,
network_time - timer_mgr->LastTimestamp())); network_time - timer_mgr->LastTimestamp()));
DNS_Mgr::Stats dstats; DNS_Mgr::Stats dstats;

View file

@ -87,8 +87,8 @@ PQ_TimerMgr::~PQ_TimerMgr()
void PQ_TimerMgr::Add(Timer* timer) void PQ_TimerMgr::Add(Timer* timer)
{ {
DBG_LOG(DBG_TM, "Adding timer %s to TimeMgr %p", DBG_LOG(DBG_TM, "Adding timer %s to TimeMgr %p at %.6f",
timer_type_to_string(timer->Type()), this); timer_type_to_string(timer->Type()), this, timer->Time());
// Add the timer even if it's already expired - that way, if // Add the timer even if it's already expired - that way, if
// multiple already-added timers are added, they'll still // multiple already-added timers are added, they'll still
@ -145,86 +145,3 @@ void PQ_TimerMgr::Remove(Timer* timer)
--current_timers[timer->Type()]; --current_timers[timer->Type()];
delete timer; delete timer;
} }
CQ_TimerMgr::CQ_TimerMgr(const Tag& tag) : TimerMgr(tag)
{
cq = cq_init(60.0, 1.0);
if ( ! cq )
reporter->InternalError("could not initialize calendar queue");
}
CQ_TimerMgr::~CQ_TimerMgr()
{
cq_destroy(cq);
}
void CQ_TimerMgr::Add(Timer* timer)
{
DBG_LOG(DBG_TM, "Adding timer %s to TimeMgr %p",
timer_type_to_string(timer->Type()), this);
// Add the timer even if it's already expired - that way, if
// multiple already-added timers are added, they'll still
// execute in sorted order.
double t = timer->Time();
if ( t <= 0.0 )
// Illegal time, which cq_enqueue won't like. For our
// purposes, just treat it as an old time that's already
// expired.
t = network_time;
if ( cq_enqueue(cq, t, timer) < 0 )
reporter->InternalError("problem queueing timer");
++current_timers[timer->Type()];
}
void CQ_TimerMgr::Expire()
{
double huge_t = 1e20; // larger than any Unix timestamp
for ( Timer* timer = (Timer*) cq_dequeue(cq, huge_t);
timer; timer = (Timer*) cq_dequeue(cq, huge_t) )
{
DBG_LOG(DBG_TM, "Dispatching timer %s in TimeMgr %p",
timer_type_to_string(timer->Type()), this);
timer->Dispatch(huge_t, 1);
--current_timers[timer->Type()];
delete timer;
}
}
int CQ_TimerMgr::DoAdvance(double new_t, int max_expire)
{
Timer* timer;
while ( (num_expired < max_expire || max_expire == 0) &&
(timer = (Timer*) cq_dequeue(cq, new_t)) )
{
last_timestamp = timer->Time();
DBG_LOG(DBG_TM, "Dispatching timer %s in TimeMgr %p",
timer_type_to_string(timer->Type()), this);
timer->Dispatch(new_t, 0);
--current_timers[timer->Type()];
delete timer;
++num_expired;
}
return num_expired;
}
unsigned int CQ_TimerMgr::MemoryUsage() const
{
// FIXME.
return 0;
}
void CQ_TimerMgr::Remove(Timer* timer)
{
// This may fail if we cancel a timer which has already been removed.
// That's ok, but then we mustn't delete the timer.
if ( cq_remove(cq, timer->Time(), timer) )
{
--current_timers[timer->Type()];
delete timer;
}
}

View file

@ -7,10 +7,6 @@
#include <string> #include <string>
#include "PriorityQueue.h" #include "PriorityQueue.h"
extern "C" {
#include "cq.h"
}
// If you add a timer here, adjust TimerNames in Timer.cc. // If you add a timer here, adjust TimerNames in Timer.cc.
enum TimerType : uint8_t { enum TimerType : uint8_t {
TIMER_BACKDOOR, TIMER_BACKDOOR,
@ -152,24 +148,4 @@ protected:
PriorityQueue* q; PriorityQueue* q;
}; };
class CQ_TimerMgr : public TimerMgr {
public:
explicit CQ_TimerMgr(const Tag& arg_tag);
~CQ_TimerMgr() override;
void Add(Timer* timer) override;
void Expire() override;
int Size() const override { return cq_size(cq); }
int PeakSize() const override { return cq_max_size(cq); }
uint64_t CumulativeNum() const override { return cq_cumulative_num(cq); }
unsigned int MemoryUsage() const;
protected:
int DoAdvance(double t, int max_expire) override;
void Remove(Timer* timer) override;
struct cq_handle *cq;
};
extern TimerMgr* timer_mgr; extern TimerMgr* timer_mgr;