diff --git a/src/cluster/OnLoop.h b/src/cluster/OnLoop.h index d0aae42d45..0c488d24da 100644 --- a/src/cluster/OnLoop.h +++ b/src/cluster/OnLoop.h @@ -13,6 +13,7 @@ #include "zeek/Reporter.h" #include "zeek/iosource/IOSource.h" #include "zeek/iosource/Manager.h" +#include "zeek/telemetry/Manager.h" namespace zeek::detail { @@ -52,7 +53,13 @@ public: max_queue_size(max_queue_size), proc(proc), tag(std::move(tag)), - main_thread_id(main_thread_id) {} + main_thread_id(main_thread_id), + total_queue_stalls_metric( + zeek::telemetry_mgr + ->CounterFamily( + "zeek", "cluster_onloop_queue_stalls", {"tag"}, + "Increased whenever a cluster backend thread is stalled due to the OnLoop queue being full.") + ->GetOrAdd({{"tag", this->tag}})) {} /** * Register this instance with the IO loop. @@ -139,9 +146,9 @@ public: /** * Queue the given Work item to be processed on Zeek's main thread. * - * If there's too many items in the queue, this method sleeps using - * std::this_thread::sleep() for the *block_duration* passed to the - * constructor. + * If there's too many items in the queue, this method blocks until + * there's more room available. The zeek_cluster_onloop_queue_stalls_total + * metric will be increased once for every cond_timeout being blocked. * * Calling this method from the main thread will result in an abort(). */ @@ -157,16 +164,13 @@ public: bool fire = false; size_t qs = 0; - int timeouts = 0; - { std::unique_lock lock(mtx); // Wait for room in the queue. while ( IsOpen() && queue.size() >= max_queue_size ) { - auto status = cond.wait_for(lock, cond_timeout); - if ( status == std::cv_status::timeout && IsOpen() ) - ++timeouts; + total_queue_stalls_metric->Inc(); + cond.wait_for(lock, cond_timeout); } if ( IsOpen() ) { @@ -185,15 +189,6 @@ public: if ( fire ) flare.Fire(); - if ( timeouts > 0 ) { - // XXX: Should this invoke some callback or change the return value - // so users can react on this? - // - // We could also do suicidal snail pattern here. If the event - // loop is unable to process, we may as well knock ourselves out. - std::fprintf(stderr, "timeouts %d!\n", timeouts); - } - --queuers; } @@ -213,6 +208,9 @@ private: std::string tag; std::atomic queuers = 0; std::thread::id main_thread_id; + + // Track queue stalling. + telemetry::CounterPtr total_queue_stalls_metric; };