cluster/OnLoop: Add metric for queue stalling instead of fprintf

This commit is contained in:
Arne Welzel 2025-03-24 18:35:44 +01:00
parent 50b26fcea8
commit 52143a5712

View file

@ -13,6 +13,7 @@
#include "zeek/Reporter.h"
#include "zeek/iosource/IOSource.h"
#include "zeek/iosource/Manager.h"
#include "zeek/telemetry/Manager.h"
namespace zeek::detail {
@ -52,7 +53,13 @@ public:
max_queue_size(max_queue_size),
proc(proc),
tag(std::move(tag)),
main_thread_id(main_thread_id) {}
main_thread_id(main_thread_id),
total_queue_stalls_metric(
zeek::telemetry_mgr
->CounterFamily(
"zeek", "cluster_onloop_queue_stalls", {"tag"},
"Increased whenever a cluster backend thread is stalled due to the OnLoop queue being full.")
->GetOrAdd({{"tag", this->tag}})) {}
/**
* Register this instance with the IO loop.
@ -139,9 +146,9 @@ public:
/**
* Queue the given Work item to be processed on Zeek's main thread.
*
* If there's too many items in the queue, this method sleeps using
* std::this_thread::sleep() for the *block_duration* passed to the
* constructor.
* If there are too many items in the queue, this method blocks until
* more room becomes available. The zeek_cluster_onloop_queue_stalls_total
* metric is incremented once for each wait of up to cond_timeout while blocked.
*
* Calling this method from the main thread will result in an abort().
*/
@ -157,16 +164,13 @@ public:
bool fire = false;
size_t qs = 0;
int timeouts = 0;
{
std::unique_lock lock(mtx);
// Wait for room in the queue.
while ( IsOpen() && queue.size() >= max_queue_size ) {
auto status = cond.wait_for(lock, cond_timeout);
if ( status == std::cv_status::timeout && IsOpen() )
++timeouts;
total_queue_stalls_metric->Inc();
cond.wait_for(lock, cond_timeout);
}
if ( IsOpen() ) {
@ -185,15 +189,6 @@ public:
if ( fire )
flare.Fire();
if ( timeouts > 0 ) {
// XXX: Should this invoke some callback or change the return value
// so users can react on this?
//
// We could also do suicidal snail pattern here. If the event
// loop is unable to process, we may as well knock ourselves out.
std::fprintf(stderr, "timeouts %d!\n", timeouts);
}
--queuers;
}
@ -213,6 +208,9 @@ private:
std::string tag;
std::atomic<int> queuers = 0;
std::thread::id main_thread_id;
// Track queue stalling.
telemetry::CounterPtr total_queue_stalls_metric;
};