cluster/OnLoop: Support DontBlock and Force flags for queueing

Also allow max_queue_size to be 0 for unlimited queueing.
This commit is contained in:
Arne Welzel 2025-07-21 18:49:36 +02:00
parent b4d2af23dd
commit d79d4b1b2a

View file

@ -14,9 +14,22 @@
#include "zeek/iosource/IOSource.h" #include "zeek/iosource/IOSource.h"
#include "zeek/iosource/Manager.h" #include "zeek/iosource/Manager.h"
#include "zeek/telemetry/Manager.h" #include "zeek/telemetry/Manager.h"
#include "zeek/util-types.h"
namespace zeek::detail { namespace zeek::detail {
/**
* Flags for OnLoopProcess::QueueForProcessing().
*/
enum class QueueFlag : uint8_t {
Block = 0x0, // block until there's room
DontBlock = 0x1, // fail queueing if there's no room
Force = 0x2, // ignore any queue size limitations
};
constexpr QueueFlag operator&(QueueFlag x, QueueFlag y) {
return static_cast<QueueFlag>(static_cast<uint8_t>(x) & static_cast<uint8_t>(y));
};
/** /**
* Template class allowing work items to be queued by threads and processed * Template class allowing work items to be queued by threads and processed
@ -46,19 +59,19 @@ public:
* @param cond_timeout If a producer is blocked for more than that many microseconds, report a warning. * @param cond_timeout If a producer is blocked for more than that many microseconds, report a warning.
* @param main_thread_id The ID of the main thread for usage checks. * @param main_thread_id The ID of the main thread for usage checks.
*/ */
OnLoopProcess(Proc* proc, std::string tag, size_t max_queue_size = 250, OnLoopProcess(Proc* proc, std::string_view tag, size_t max_queue_size = 250,
std::chrono::microseconds cond_timeout = std::chrono::microseconds(100000), std::chrono::microseconds cond_timeout = std::chrono::microseconds(100000),
std::thread::id main_thread_id = std::this_thread::get_id()) std::thread::id main_thread_id = std::this_thread::get_id())
: cond_timeout(cond_timeout), : cond_timeout(cond_timeout),
max_queue_size(max_queue_size), max_queue_size(max_queue_size),
proc(proc), proc(proc),
tag(std::move(tag)), tag(tag),
main_thread_id(main_thread_id), main_thread_id(main_thread_id),
total_queue_stalls_metric( total_queue_blocks_metric(
zeek::telemetry_mgr zeek::telemetry_mgr
->CounterFamily( ->CounterFamily(
"zeek", "cluster_onloop_queue_stalls", {"tag"}, "zeek", "cluster_onloop_queue_blocks", {"tag"},
"Increased whenever a cluster backend thread is stalled due to the OnLoop queue being full.") "Increased whenever a cluster backend thread is blocked due to the OnLoop queue being full.")
->GetOrAdd({{"tag", this->tag}})) {} ->GetOrAdd({{"tag", this->tag}})) {}
/** /**
@ -116,7 +129,7 @@ public:
bool notify = false; bool notify = false;
{ {
std::scoped_lock lock(mtx); std::scoped_lock lock(mtx);
if ( queue.size() >= max_queue_size ) if ( max_queue_size > 0 && queue.size() >= max_queue_size )
notify = true; notify = true;
to_process.splice(to_process.end(), queue); to_process.splice(to_process.end(), queue);
@ -151,37 +164,52 @@ public:
/** /**
* Queue the given Work item to be processed on Zeek's main thread. * Queue the given Work item to be processed on Zeek's main thread.
* *
* If there's too many items in the queue, this method blocks until * If there's too many items in the queue and flags does not contains Force or DontBlock,
* there's more room available. The zeek_cluster_onloop_queue_stalls_total * the method blocks until there's room available. This may result in deadlocks if
* Zeek's event loop is blocked as well. The zeek_cluster_onloop_queue_blocks_total
* metric will be increased once for every cond_timeout being blocked. * metric will be increased once for every cond_timeout being blocked.
* *
* If the Force flag is used, the queue's max size is ignored and queueing
* succeeds possibly increasing the queue size to more than max_queue_size.
* If DontBlock is used and the queue is full, queueing fails and false is
* returned.
*
* Calling this method from the main thread will result in an abort(). * Calling this method from the main thread will result in an abort().
*
* @param work The work to enqueue.
* @param flags Modifies the behavior.
*
* @return True if the message was queued, else false.
*/ */
void QueueForProcessing(Work&& work) { bool QueueForProcessing(Work&& work, QueueFlag flags = QueueFlag::Block) {
++queuers;
std::list<Work> to_queue{std::move(work)};
if ( std::this_thread::get_id() == main_thread_id ) { if ( std::this_thread::get_id() == main_thread_id ) {
fprintf(stderr, "OnLoopProcess::QueueForProcessing() called by main thread!"); fprintf(stderr, "OnLoopProcess::QueueForProcessing() called by main thread!");
abort(); abort();
} }
++queuers;
auto defer = util::Deferred([this] { --queuers; });
bool fire = false; bool fire = false;
size_t qs = 0;
{ {
std::unique_lock lock(mtx); std::unique_lock lock(mtx);
// Wait for room in the queue. // Wait for room in the queue.
while ( IsOpen() && queue.size() >= max_queue_size ) { while ( IsOpen() && max_queue_size > 0 && queue.size() >= max_queue_size ) {
total_queue_stalls_metric->Inc(); if ( (flags & QueueFlag::Force) == QueueFlag::Force ) // enqueue no matter the limit.
break;
if ( (flags & QueueFlag::DontBlock) == QueueFlag::DontBlock )
return false;
total_queue_blocks_metric->Inc();
cond.wait_for(lock, cond_timeout); cond.wait_for(lock, cond_timeout);
} }
if ( IsOpen() ) { if ( IsOpen() ) {
assert(queue.size() < max_queue_size); std::list<Work> to_queue{std::move(work)};
assert(to_queue.size() == 1);
queue.splice(queue.end(), to_queue); queue.splice(queue.end(), to_queue);
assert(to_queue.empty());
fire = queue.size() == 1; // first element in queue triggers processing. fire = queue.size() == 1; // first element in queue triggers processing.
} }
else { else {
@ -190,11 +218,10 @@ public:
} }
} }
if ( fire ) if ( fire )
flare.Fire(); flare.Fire();
--queuers; return true;
} }
private: private:
@ -215,7 +242,7 @@ private:
std::thread::id main_thread_id; std::thread::id main_thread_id;
// Track queue stalling. // Track queue stalling.
telemetry::CounterPtr total_queue_stalls_metric; telemetry::CounterPtr total_queue_blocks_metric;
}; };