From d79d4b1b2a44bc6e5dffb574edd0383629a0921c Mon Sep 17 00:00:00 2001 From: Arne Welzel Date: Mon, 21 Jul 2025 18:49:36 +0200 Subject: [PATCH] cluster/OnLoop: Support DontBlock and Force flags for queueing Also allow max_queue_size to be 0 for unlimited queueing. --- src/cluster/OnLoop.h | 67 +++++++++++++++++++++++++++++++------------- 1 file changed, 47 insertions(+), 20 deletions(-) diff --git a/src/cluster/OnLoop.h b/src/cluster/OnLoop.h index 86cd6a136e..1bb3122ce2 100644 --- a/src/cluster/OnLoop.h +++ b/src/cluster/OnLoop.h @@ -14,9 +14,22 @@ #include "zeek/iosource/IOSource.h" #include "zeek/iosource/Manager.h" #include "zeek/telemetry/Manager.h" +#include "zeek/util-types.h" namespace zeek::detail { +/** + * Flags for OnLoopProcess::QueueForProcessing(). + */ +enum class QueueFlag : uint8_t { + Block = 0x0, // block until there's room + DontBlock = 0x1, // fail queueing if there's no room + Force = 0x2, // ignore any queue size limitations +}; + +constexpr QueueFlag operator&(QueueFlag x, QueueFlag y) { + return static_cast(static_cast(x) & static_cast(y)); +}; /** * Template class allowing work items to be queued by threads and processed @@ -46,19 +59,19 @@ public: * @param cond_timeout If a producer is blocked for more than that many microseconds, report a warning. * @param main_thread_id The ID of the main thread for usage checks. */ - OnLoopProcess(Proc* proc, std::string tag, size_t max_queue_size = 250, + OnLoopProcess(Proc* proc, std::string_view tag, size_t max_queue_size = 250, std::chrono::microseconds cond_timeout = std::chrono::microseconds(100000), std::thread::id main_thread_id = std::this_thread::get_id()) : cond_timeout(cond_timeout), max_queue_size(max_queue_size), proc(proc), - tag(std::move(tag)), + tag(tag), main_thread_id(main_thread_id), - total_queue_stalls_metric( + total_queue_blocks_metric( zeek::telemetry_mgr ->CounterFamily( - "zeek", "cluster_onloop_queue_stalls", {"tag"}, - "Increased whenever a cluster backend thread is stalled due to the OnLoop queue being full.") + "zeek", "cluster_onloop_queue_blocks", {"tag"}, + "Increased whenever a cluster backend thread is blocked due to the OnLoop queue being full.") ->GetOrAdd({{"tag", this->tag}})) {} /** @@ -116,7 +129,7 @@ public: bool notify = false; { std::scoped_lock lock(mtx); - if ( queue.size() >= max_queue_size ) + if ( max_queue_size > 0 && queue.size() >= max_queue_size ) notify = true; to_process.splice(to_process.end(), queue); @@ -151,37 +164,52 @@ public: /** * Queue the given Work item to be processed on Zeek's main thread. * - * If there's too many items in the queue, this method blocks until - * there's more room available. The zeek_cluster_onloop_queue_stalls_total + * If there's too many items in the queue and flags does not contains Force or DontBlock, + * the method blocks until there's room available. This may result in deadlocks if + * Zeek's event loop is blocked as well. The zeek_cluster_onloop_queue_blocks_total * metric will be increased once for every cond_timeout being blocked. * + * If the Force flag is used, the queue's max size is ignored and queueing + * succeeds possibly increasing the queue size to more than max_queue_size. + * If DontBlock is used and the queue is full, queueing fails and false is + * returned. + * * Calling this method from the main thread will result in an abort(). + * + * @param work The work to enqueue. + * @param flags Modifies the behavior. + * + * @return True if the message was queued, else false. */ - void QueueForProcessing(Work&& work) { - ++queuers; - std::list to_queue{std::move(work)}; - + bool QueueForProcessing(Work&& work, QueueFlag flags = QueueFlag::Block) { if ( std::this_thread::get_id() == main_thread_id ) { fprintf(stderr, "OnLoopProcess::QueueForProcessing() called by main thread!"); abort(); } + ++queuers; + auto defer = util::Deferred([this] { --queuers; }); bool fire = false; - size_t qs = 0; { std::unique_lock lock(mtx); // Wait for room in the queue. - while ( IsOpen() && queue.size() >= max_queue_size ) { - total_queue_stalls_metric->Inc(); + while ( IsOpen() && max_queue_size > 0 && queue.size() >= max_queue_size ) { + if ( (flags & QueueFlag::Force) == QueueFlag::Force ) // enqueue no matter the limit. + break; + + if ( (flags & QueueFlag::DontBlock) == QueueFlag::DontBlock ) + return false; + + total_queue_blocks_metric->Inc(); cond.wait_for(lock, cond_timeout); } if ( IsOpen() ) { - assert(queue.size() < max_queue_size); - assert(to_queue.size() == 1); + std::list to_queue{std::move(work)}; queue.splice(queue.end(), to_queue); + assert(to_queue.empty()); fire = queue.size() == 1; // first element in queue triggers processing. } else { @@ -190,11 +218,10 @@ public: } } - if ( fire ) flare.Fire(); - --queuers; + return true; } private: @@ -215,7 +242,7 @@ private: std::thread::id main_thread_id; // Track queue stalling. - telemetry::CounterPtr total_queue_stalls_metric; + telemetry::CounterPtr total_queue_blocks_metric; };