From b4d2af23dd586f932d393a4d7c84e6291d723503 Mon Sep 17 00:00:00 2001 From: Arne Welzel Date: Mon, 21 Jul 2025 18:15:06 +0200 Subject: [PATCH 1/9] cluster/ThreadedBackend: Injectable OnLoopProcess instance This allows injecting a custom onloop process to configure the max_queue_size at instantiation time. Also allow access to the instance directly and deprecate the QueueForProcessing() helper --- src/cluster/Backend.cc | 14 ++++++++++---- src/cluster/Backend.h | 16 +++++++++++++++- 2 files changed, 25 insertions(+), 5 deletions(-) diff --git a/src/cluster/Backend.cc b/src/cluster/Backend.cc index ffa4872e60..9b8467bde6 100644 --- a/src/cluster/Backend.cc +++ b/src/cluster/Backend.cc @@ -264,14 +264,20 @@ bool ThreadedBackend::ProcessBackendMessage(int tag, byte_buffer_span payload) { } ThreadedBackend::ThreadedBackend(std::string_view name, std::unique_ptr es, - std::unique_ptr ls, std::unique_ptr ehs) - : Backend(name, std::move(es), std::move(ls), std::move(ehs)) { - onloop = new zeek::detail::OnLoopProcess(this, Name()); + std::unique_ptr ls, std::unique_ptr ehs, + zeek::detail::OnLoopProcess* onloop) + : Backend(name, std::move(es), std::move(ls), std::move(ehs)), onloop(onloop) { onloop->Register(true); // Register as don't count first } + +ThreadedBackend::ThreadedBackend(std::string_view name, std::unique_ptr es, + std::unique_ptr ls, std::unique_ptr ehs) + : ThreadedBackend(name, std::move(es), std::move(ls), std::move(ehs), + new zeek::detail::OnLoopProcess(this, name)) {} + bool ThreadedBackend::DoInit() { - // Have the backend count so Zeek does not terminate. + // Have the onloop instance count so Zeek does not terminate. onloop->Register(/*dont_count=*/false); return true; } diff --git a/src/cluster/Backend.h b/src/cluster/Backend.h index f401c02b9d..6ae6b9a78a 100644 --- a/src/cluster/Backend.h +++ b/src/cluster/Backend.h @@ -16,6 +16,7 @@ #include "zeek/Val.h" #include "zeek/ZeekArgs.h" #include "zeek/cluster/BifSupport.h" +#include "zeek/cluster/OnLoop.h" #include "zeek/cluster/Serializer.h" #include "zeek/cluster/Telemetry.h" #include "zeek/logging/Types.h" @@ -572,7 +573,7 @@ private: /** * A cluster backend may receive event and log messages asynchronously - * through threads. The following structs can be used with QueueForProcessing() + * through threads. The following structs can be used with OnLoop() * to enqueue these messages onto the main IO loop for processing. * * EventMessage and LogMessage are processed in a generic fashion in @@ -623,6 +624,13 @@ using QueueMessage = std::variant; */ class ThreadedBackend : public Backend { protected: + /** + * Constructor with custom onloop parameter. + */ + ThreadedBackend(std::string_view name, std::unique_ptr es, std::unique_ptr ls, + std::unique_ptr ehs, + zeek::detail::OnLoopProcess* onloop); + /** * Constructor. */ @@ -637,6 +645,7 @@ protected: * * @param messages Messages to be enqueued. */ + [[deprecated("Remove in v8.1: Use OnLoop() and QueueForProcessing() directly.")]] void QueueForProcessing(QueueMessage&& messages); /** @@ -664,6 +673,11 @@ protected: */ void DoTerminate() override; + /** + * @return this backend's OnLoopProcess instance, or nullptr after DoTerminate(). + */ + zeek::detail::OnLoopProcess* OnLoop() { return onloop; } + private: /** * Process a backend specific message queued as BackendMessage. 
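A note on what this patch enables: a derived backend can now size the on-loop queue at construction time by handing ThreadedBackend its own OnLoopProcess instead of the default one (max_queue_size 250, per the constructor default visible in the next patch). A minimal sketch, with a hypothetical backend name and queue size; the OnLoopProcess template parameters and the serializer unique_ptr element types are elided here just as they are in the extracted patch text above:

    // Hypothetical subclass passing a custom OnLoopProcess to the new
    // protected constructor; 10000 replaces the default max_queue_size of 250.
    class MyBackend : public ThreadedBackend {
    protected:
        MyBackend(std::string_view name, std::unique_ptr es, std::unique_ptr ls, std::unique_ptr ehs)
            : ThreadedBackend(name, std::move(es), std::move(ls), std::move(ehs),
                              new zeek::detail::OnLoopProcess(this, name, /*max_queue_size=*/10000)) {}
    };

Patch 6 below uses this same mechanism to make the ZeroMQ backend's queue size configurable from script land.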
From d79d4b1b2a44bc6e5dffb574edd0383629a0921c Mon Sep 17 00:00:00 2001 From: Arne Welzel Date: Mon, 21 Jul 2025 18:49:36 +0200 Subject: [PATCH 2/9] cluster/OnLoop: Support DontBlock and Force flags for queueing Also allow max_queue_size to be 0 for unlimited queueing. --- src/cluster/OnLoop.h | 67 +++++++++++++++++++++++++++++++------------- 1 file changed, 47 insertions(+), 20 deletions(-) diff --git a/src/cluster/OnLoop.h b/src/cluster/OnLoop.h index 86cd6a136e..1bb3122ce2 100644 --- a/src/cluster/OnLoop.h +++ b/src/cluster/OnLoop.h @@ -14,9 +14,22 @@ #include "zeek/iosource/IOSource.h" #include "zeek/iosource/Manager.h" #include "zeek/telemetry/Manager.h" +#include "zeek/util-types.h" namespace zeek::detail { +/** + * Flags for OnLoopProcess::QueueForProcessing(). + */ +enum class QueueFlag : uint8_t { Block = 0x0, // block until there's room DontBlock = 0x1, // fail queueing if there's no room Force = 0x2, // ignore any queue size limitations +}; + +constexpr QueueFlag operator&(QueueFlag x, QueueFlag y) { return static_cast<QueueFlag>(static_cast<uint8_t>(x) & static_cast<uint8_t>(y)); } /** * Template class allowing work items to be queued by threads and processed @@ -46,19 +59,19 @@ public: * @param cond_timeout If a producer is blocked for more than that many microseconds, report a warning. * @param main_thread_id The ID of the main thread for usage checks. */ - OnLoopProcess(Proc* proc, std::string tag, size_t max_queue_size = 250, + OnLoopProcess(Proc* proc, std::string_view tag, size_t max_queue_size = 250, std::chrono::microseconds cond_timeout = std::chrono::microseconds(100000), std::thread::id main_thread_id = std::this_thread::get_id()) : cond_timeout(cond_timeout), max_queue_size(max_queue_size), proc(proc), - tag(std::move(tag)), + tag(tag), main_thread_id(main_thread_id), - total_queue_stalls_metric( + total_queue_blocks_metric( zeek::telemetry_mgr ->CounterFamily( - "zeek", "cluster_onloop_queue_stalls", {"tag"}, - "Increased whenever a cluster backend thread is stalled due to the OnLoop queue being full.") + "zeek", "cluster_onloop_queue_blocks", {"tag"}, - "Increased whenever a cluster backend thread is blocked due to the OnLoop queue being full.") ->GetOrAdd({{"tag", this->tag}})) {} /** @@ -116,7 +129,7 @@ public: bool notify = false; { std::scoped_lock lock(mtx); - if ( queue.size() >= max_queue_size ) + if ( max_queue_size > 0 && queue.size() >= max_queue_size ) notify = true; to_process.splice(to_process.end(), queue); @@ -151,37 +164,52 @@ public: /** * Queue the given Work item to be processed on Zeek's main thread. * - * If there's too many items in the queue, this method blocks until - * there's more room available. The zeek_cluster_onloop_queue_stalls_total + * If there are too many items in the queue and flags contains neither Force nor DontBlock, + * the method blocks until there's room available. This may result in deadlocks if + * Zeek's event loop is blocked as well. The zeek_cluster_onloop_queue_blocks_total * metric will be increased once for every cond_timeout being blocked. * + * If the Force flag is used, the queue's max size is ignored and queueing + * succeeds, possibly increasing the queue size to more than max_queue_size. + * If DontBlock is used and the queue is full, queueing fails and false is + * returned. + * * Calling this method from the main thread will result in an abort(). + * + * @param work The work to enqueue. + * @param flags Modifies the behavior. + * + * @return True if the message was queued, else false.
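+ *
+ * Example (illustrative sketch, not part of the patch; `onloop` stands for a
+ * ThreadedBackend's OnLoopProcess instance and `drops` for a hypothetical
+ * caller-owned telemetry counter):
+ *
+ *     // Producer thread: prefer dropping work over blocking the backend.
+ *     if ( ! onloop->QueueForProcessing(std::move(work), QueueFlag::DontBlock) )
+ *         drops->Inc(); // queue was full, work was rejected
+ *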
*/ - void QueueForProcessing(Work&& work) { - ++queuers; - std::list to_queue{std::move(work)}; - + bool QueueForProcessing(Work&& work, QueueFlag flags = QueueFlag::Block) { if ( std::this_thread::get_id() == main_thread_id ) { fprintf(stderr, "OnLoopProcess::QueueForProcessing() called by main thread!"); abort(); } + ++queuers; + auto defer = util::Deferred([this] { --queuers; }); bool fire = false; - size_t qs = 0; { std::unique_lock lock(mtx); // Wait for room in the queue. - while ( IsOpen() && queue.size() >= max_queue_size ) { - total_queue_stalls_metric->Inc(); + while ( IsOpen() && max_queue_size > 0 && queue.size() >= max_queue_size ) { + if ( (flags & QueueFlag::Force) == QueueFlag::Force ) // enqueue no matter the limit. + break; + + if ( (flags & QueueFlag::DontBlock) == QueueFlag::DontBlock ) + return false; + + total_queue_blocks_metric->Inc(); cond.wait_for(lock, cond_timeout); } if ( IsOpen() ) { - assert(queue.size() < max_queue_size); - assert(to_queue.size() == 1); + std::list to_queue{std::move(work)}; queue.splice(queue.end(), to_queue); + assert(to_queue.empty()); fire = queue.size() == 1; // first element in queue triggers processing. } else { @@ -190,11 +218,10 @@ public: } } - if ( fire ) flare.Fire(); - --queuers; + return true; } private: @@ -215,7 +242,7 @@ private: std::thread::id main_thread_id; // Track queue stalling. - telemetry::CounterPtr total_queue_stalls_metric; + telemetry::CounterPtr total_queue_blocks_metric; }; From 5dc4586b70ac848e4661461acce7a53cef76a218 Mon Sep 17 00:00:00 2001 From: Arne Welzel Date: Mon, 30 Jun 2025 16:38:53 +0200 Subject: [PATCH 3/9] cluster/zeromq: Support local XPUB/XSUB hwm and buf configurability --- .../cluster/backend/zeromq/main.zeek | 33 +++++++++++++++++++ src/cluster/backend/zeromq/ZeroMQ.cc | 11 +++++++ src/cluster/backend/zeromq/ZeroMQ.h | 6 ++++ 3 files changed, 50 insertions(+) diff --git a/scripts/policy/frameworks/cluster/backend/zeromq/main.zeek b/scripts/policy/frameworks/cluster/backend/zeromq/main.zeek index dd1e28fe86..e450a96e50 100644 --- a/scripts/policy/frameworks/cluster/backend/zeromq/main.zeek +++ b/scripts/policy/frameworks/cluster/backend/zeromq/main.zeek @@ -97,6 +97,39 @@ export { ## for more details. const linger_ms: int = 500 &redef; + ## Send high water mark value for the XPUB socket. + ## + ## If reached, Zeek nodes will block or drop messages. + ## + ## See ZeroMQ's `ZMQ_SNDHWM documentation `_ + ## for more details. + const xpub_sndhwm: int = 1000 &redef; + + ## Kernel transmit buffer size for the XPUB socket. + ## + ## Using -1 will use the kernel's default. + ## + ## See ZeroMQ's `ZMQ_SNDBUF documentation `_ + ## for more details. + const xpub_sndbuf: int = -1 &redef; + + ## Receive high water mark value for the XSUB socket. + ## + ## If reached, the Zeek node will start reporting back pressure + ## to the central XPUB socket. + ## + ## See ZeroMQ's `ZMQ_RCVHWM documentation `_ + ## for more details. + const xsub_rcvhwm: int = 1000 &redef; + + ## Kernel receive buffer size for the XSUB socket. + ## + ## Using -1 will use the kernel's default. + ## + ## See ZeroMQ's `ZMQ_RCVBUF documentation `_ + ## for more details. 
+ const xsub_rcvbuf: int = -1 &redef; + ## Configure ZeroMQ's immediate setting on PUSH sockets ## ## Setting this to ``T`` will queue log writes only to completed diff --git a/src/cluster/backend/zeromq/ZeroMQ.cc b/src/cluster/backend/zeromq/ZeroMQ.cc index 234194d885..7f42027477 100644 --- a/src/cluster/backend/zeromq/ZeroMQ.cc +++ b/src/cluster/backend/zeromq/ZeroMQ.cc @@ -118,6 +118,12 @@ void ZeroMQBackend::DoInitPostScript() { zeek::telemetry_mgr ->CounterInstance("zeek", "cluster_zeromq_xpub_stalls", {}, "Counter for how many times sending on the XPUB socket stalled due to EAGAIN."); + + // xpub/xsub hwm configuration + xpub_sndhwm = static_cast(zeek::id::find_val("Cluster::Backend::ZeroMQ::xpub_sndhwm")->AsInt()); + xpub_sndbuf = static_cast(zeek::id::find_val("Cluster::Backend::ZeroMQ::xpub_sndbuf")->AsInt()); + xsub_rcvhwm = static_cast(zeek::id::find_val("Cluster::Backend::ZeroMQ::xsub_rcvhwm")->AsInt()); + xsub_rcvbuf = static_cast(zeek::id::find_val("Cluster::Backend::ZeroMQ::xsub_rcvbuf")->AsInt()); } void ZeroMQBackend::DoTerminate() { @@ -179,6 +185,11 @@ bool ZeroMQBackend::DoInit() { xpub.set(zmq::sockopt::xpub_nodrop, connect_xpub_nodrop); xpub.set(zmq::sockopt::xpub_verbose, 1); + xpub.set(zmq::sockopt::sndhwm, xpub_sndhwm); + xpub.set(zmq::sockopt::sndbuf, xpub_sndbuf); + xsub.set(zmq::sockopt::rcvhwm, xsub_rcvhwm); + xsub.set(zmq::sockopt::rcvbuf, xsub_rcvbuf); + try { xsub.connect(connect_xsub_endpoint); } catch ( zmq::error_t& err ) { diff --git a/src/cluster/backend/zeromq/ZeroMQ.h b/src/cluster/backend/zeromq/ZeroMQ.h index 58d6c7ff08..4d94b51142 100644 --- a/src/cluster/backend/zeromq/ZeroMQ.h +++ b/src/cluster/backend/zeromq/ZeroMQ.h @@ -92,6 +92,12 @@ private: EventHandlerPtr event_subscription; EventHandlerPtr event_unsubscription; + // xpub/xsub configuration + int xpub_sndhwm = 1000; // libzmq default + int xpub_sndbuf = -1; // OS defaults + int xsub_rcvhwm = 1000; // libzmq default + int xsub_rcvbuf = -1; // OS defaults + zmq::context_t ctx; zmq::socket_t xsub; zmq::socket_t xpub; From 85d5dda028bba10332b96eec3f1d5fce63ce9801 Mon Sep 17 00:00:00 2001 From: Arne Welzel Date: Mon, 21 Jul 2025 14:45:18 +0200 Subject: [PATCH 4/9] cluster/zeromq: Rework lambdas to member functions --- src/cluster/backend/zeromq/ZeroMQ.cc | 258 +++++++++++++-------------- src/cluster/backend/zeromq/ZeroMQ.h | 7 + 2 files changed, 134 insertions(+), 131 deletions(-) diff --git a/src/cluster/backend/zeromq/ZeroMQ.cc b/src/cluster/backend/zeromq/ZeroMQ.cc index 7f42027477..7af2fb41ac 100644 --- a/src/cluster/backend/zeromq/ZeroMQ.cc +++ b/src/cluster/backend/zeromq/ZeroMQ.cc @@ -444,155 +444,151 @@ bool ZeroMQBackend::DoPublishLogWrites(const logging::detail::LogWriteHeader& he return true; } -void ZeroMQBackend::Run() { - char name[4 + 2 + 16 + 1]{}; // zmq-0x<8byte pointer in hex> - snprintf(name, sizeof(name), "zmq-%p", this); - util::detail::set_thread_name(name); - ZEROMQ_DEBUG_THREAD_PRINTF(DebugFlag::THREAD, "Thread starting (%p)\n", this); - - using MultipartMessage = std::vector; - - auto HandleLogMessages = [this](const std::vector& msgs) { - for ( const auto& msg : msgs ) { - // sender, format, type, payload - if ( msg.size() != 4 ) { - ZEROMQ_THREAD_PRINTF("log: error: expected 4 parts, have %zu!\n", msg.size()); - continue; - } - - byte_buffer payload{msg[3].data(), msg[3].data() + msg[3].size()}; - LogMessage lm{.format = std::string(msg[2].data(), msg[2].size()), - .payload = std::move(payload)}; - - QueueForProcessing(std::move(lm)); - } - }; - - auto 
HandleInprocMessages = [this](std::vector& msgs) { - // Forward messages from the inprocess bridge. - // - // Either it's 2 parts (tag and payload) for controlling subscriptions - // or terminating the thread, or it is 4 parts in which case all the parts - // are forwarded to the XPUB socket directly for publishing. - for ( auto& msg : msgs ) { - if ( msg.size() == 2 ) { - InprocTag tag = msg[0].data()[0]; - switch ( tag ) { - case InprocTag::XsubUpdate: { - xsub.send(msg[1], zmq::send_flags::none); - break; - } - case InprocTag::Terminate: { - if ( self_thread_stop ) - ZEROMQ_THREAD_PRINTF("inproc: error: duplicate shutdown message"); - self_thread_stop = true; - } +// Forward messages from the inprocess bridge. +// +// Either it's 2 parts (tag and payload) for controlling subscriptions +// or terminating the thread, or it is 4 parts in which case all the parts +// are forwarded to the XPUB socket directly for publishing. +void ZeroMQBackend::HandleInprocMessages(std::vector& msgs) { + for ( auto& msg : msgs ) { + if ( msg.size() == 2 ) { + InprocTag tag = msg[0].data()[0]; + switch ( tag ) { + case InprocTag::XsubUpdate: { + xsub.send(msg[1], zmq::send_flags::none); + break; + } + case InprocTag::Terminate: { + if ( self_thread_stop ) + ZEROMQ_THREAD_PRINTF("inproc: error: duplicate shutdown message"); + self_thread_stop = true; } } - else if ( msg.size() == 4 ) { - for ( auto& part : msg ) { - zmq::send_flags flags = zmq::send_flags::dontwait; - if ( part.more() ) - flags = flags | zmq::send_flags::sndmore; + } + else if ( msg.size() == 4 ) { + for ( auto& part : msg ) { + zmq::send_flags flags = zmq::send_flags::dontwait; + if ( part.more() ) + flags = flags | zmq::send_flags::sndmore; + + zmq::send_result_t result; + int tries = 0; + do { + try { + result = xpub.send(part, flags); + } catch ( zmq::error_t& err ) { + if ( err.num() == ETERM ) + return; + + // XXX: What other error can happen here? How should we react? + ZEROMQ_THREAD_PRINTF("xpub: Failed to publish with error %s (%d)\n", err.what(), err.num()); + break; + } + + // Empty result means xpub.send() returned EAGAIN. The socket reached + // its high-water-mark and we cannot send right now. We simply attempt + // to re-send the message without the dontwait flag after increasing + // the xpub stall metric. This way, ZeroMQ will block in xpub.send() until + // there's enough room available. + if ( ! result ) { + total_xpub_stalls->Inc(); - zmq::send_result_t result; - int tries = 0; - do { try { - result = xpub.send(part, flags); + // We sent non-blocking above so we are able to observe and report stalls + // in a metric. Now that we have done that switch to blocking send. + zmq::send_flags block_flags = zmq::send_flags::none | (flags & zmq::send_flags::sndmore); + result = xpub.send(part, block_flags); } catch ( zmq::error_t& err ) { if ( err.num() == ETERM ) return; // XXX: What other error can happen here? How should we react? - ZEROMQ_THREAD_PRINTF("xpub: Failed to publish with error %s (%d)\n", err.what(), err.num()); + ZEROMQ_THREAD_PRINTF("xpub: Failed blocking publish with error %s (%d)\n", err.what(), + err.num()); break; } + } + } while ( ! result ); + } + } + else { + ZEROMQ_THREAD_PRINTF("inproc: error: expected 1 or 4 parts, have %zu!\n", msg.size()); + } + } +} - // Empty result means xpub.send() returned EAGAIN. The socket reached - // its high-water-mark and we cannot send right now. We simply attempt - // to re-send the message without the dontwait flag after increasing - // the xpub stall metric. 
This way, ZeroMQ will block in xpub.send() until - // there's enough room available. - if ( ! result ) { - total_xpub_stalls->Inc(); +void ZeroMQBackend::HandleLogMessages(const std::vector& msgs) { + for ( const auto& msg : msgs ) { + // sender, format, type, payload + if ( msg.size() != 4 ) { + ZEROMQ_THREAD_PRINTF("log: error: expected 4 parts, have %zu!\n", msg.size()); + continue; + } - try { - // We sent non-blocking above so we are able to observe and report stalls - // in a metric. Now that we have done that switch to blocking send. - zmq::send_flags block_flags = - zmq::send_flags::none | (flags & zmq::send_flags::sndmore); - result = xpub.send(part, block_flags); - } catch ( zmq::error_t& err ) { - if ( err.num() == ETERM ) - return; + byte_buffer payload{msg[3].data(), msg[3].data() + msg[3].size()}; + LogMessage lm{.format = std::string(msg[2].data(), msg[2].size()), .payload = std::move(payload)}; - // XXX: What other error can happen here? How should we react? - ZEROMQ_THREAD_PRINTF("xpub: Failed blocking publish with error %s (%d)\n", err.what(), - err.num()); - break; - } - } - } while ( ! result ); - } + QueueForProcessing(std::move(lm)); + } +} + +void ZeroMQBackend::HandleXPubMessages(const std::vector& msgs) { + for ( const auto& msg : msgs ) { + if ( msg.size() != 1 ) { + ZEROMQ_THREAD_PRINTF("xpub: error: expected 1 part, have %zu!\n", msg.size()); + continue; + } + + // Check if the messages starts with \x00 or \x01 to understand if it's + // a subscription or unsubscription message. + auto first = *reinterpret_cast(msg[0].data()); + if ( first == 0 || first == 1 ) { + QueueMessage qm; + auto* start = msg[0].data() + 1; + auto* end = msg[0].data() + msg[0].size(); + byte_buffer topic(start, end); + if ( first == 1 ) { + qm = BackendMessage{1, std::move(topic)}; + } + else if ( first == 0 ) { + qm = BackendMessage{0, std::move(topic)}; } else { - ZEROMQ_THREAD_PRINTF("inproc: error: expected 1 or 4 parts, have %zu!\n", msg.size()); - } - } - }; - - auto HandleXPubMessages = [this](const std::vector& msgs) { - for ( const auto& msg : msgs ) { - if ( msg.size() != 1 ) { - ZEROMQ_THREAD_PRINTF("xpub: error: expected 1 part, have %zu!\n", msg.size()); + ZEROMQ_THREAD_PRINTF("xpub: error: unexpected first char: have '0x%02x'", first); continue; } - // Check if the messages starts with \x00 or \x01 to understand if it's - // a subscription or unsubscription message. - auto first = *reinterpret_cast(msg[0].data()); - if ( first == 0 || first == 1 ) { - QueueMessage qm; - auto* start = msg[0].data() + 1; - auto* end = msg[0].data() + msg[0].size(); - byte_buffer topic(start, end); - if ( first == 1 ) { - qm = BackendMessage{1, std::move(topic)}; - } - else if ( first == 0 ) { - qm = BackendMessage{0, std::move(topic)}; - } - else { - ZEROMQ_THREAD_PRINTF("xpub: error: unexpected first char: have '0x%02x'", first); - continue; - } - - QueueForProcessing(std::move(qm)); - } + QueueForProcessing(std::move(qm)); } - }; + } +} - auto HandleXSubMessages = [this](const std::vector& msgs) { - for ( const auto& msg : msgs ) { - if ( msg.size() != 4 ) { - ZEROMQ_THREAD_PRINTF("xsub: error: expected 4 parts, have %zu!\n", msg.size()); - continue; - } - - // Filter out messages that are coming from this node. 
- std::string sender(msg[1].data(), msg[1].size()); - if ( sender == NodeId() ) - continue; - - byte_buffer payload{msg[3].data(), msg[3].data() + msg[3].size()}; - EventMessage em{.topic = std::string(msg[0].data(), msg[0].size()), - .format = std::string(msg[2].data(), msg[2].size()), - .payload = std::move(payload)}; - - QueueForProcessing(std::move(em)); +void ZeroMQBackend::HandleXSubMessages(const std::vector& msgs) { + for ( const auto& msg : msgs ) { + if ( msg.size() != 4 ) { + ZEROMQ_THREAD_PRINTF("xsub: error: expected 4 parts, have %zu!\n", msg.size()); + continue; } - }; + + // Filter out messages that are coming from this node. + std::string sender(msg[1].data(), msg[1].size()); + if ( sender == NodeId() ) + continue; + + byte_buffer payload{msg[3].data(), msg[3].data() + msg[3].size()}; + EventMessage em{.topic = std::string(msg[0].data(), msg[0].size()), + .format = std::string(msg[2].data(), msg[2].size()), + .payload = std::move(payload)}; + + QueueForProcessing(std::move(em)); + } +} + +void ZeroMQBackend::Run() { + char name[4 + 2 + 16 + 1]{}; // zmq-0x<8byte pointer in hex> + snprintf(name, sizeof(name), "zmq-%p", this); + util::detail::set_thread_name(name); + ZEROMQ_DEBUG_THREAD_PRINTF(DebugFlag::THREAD, "Thread starting (%p)\n", this); struct SocketInfo { zmq::socket_ref socket; @@ -601,10 +597,10 @@ void ZeroMQBackend::Run() { }; std::vector sockets = { - {.socket = child_inproc, .name = "inproc", .handler = HandleInprocMessages}, - {.socket = xpub, .name = "xpub", .handler = HandleXPubMessages}, - {.socket = xsub, .name = "xsub", .handler = HandleXSubMessages}, - {.socket = log_pull, .name = "log_pull", .handler = HandleLogMessages}, + {.socket = child_inproc, .name = "inproc", .handler = [this](auto& msgs) { HandleInprocMessages(msgs); }}, + {.socket = xpub, .name = "xpub", .handler = [this](const auto& msgs) { HandleXPubMessages(msgs); }}, + {.socket = xsub, .name = "xsub", .handler = [this](const auto& msgs) { HandleXSubMessages(msgs); }}, + {.socket = log_pull, .name = "log_pull", .handler = [this](const auto& msgs) { HandleLogMessages(msgs); }}, }; // Called when Run() terminates. diff --git a/src/cluster/backend/zeromq/ZeroMQ.h b/src/cluster/backend/zeromq/ZeroMQ.h index 4d94b51142..2932f0161a 100644 --- a/src/cluster/backend/zeromq/ZeroMQ.h +++ b/src/cluster/backend/zeromq/ZeroMQ.h @@ -73,6 +73,13 @@ private: void DoReadyToPublishCallback(ReadyCallback cb) override; + // Inner thread helper methods. + using MultipartMessage = std::vector; + void HandleInprocMessages(std::vector& msgs); + void HandleLogMessages(const std::vector& msgs); + void HandleXPubMessages(const std::vector& msgs); + void HandleXSubMessages(const std::vector& msgs); + // Script level variables. 
std::string connect_xsub_endpoint; std::string connect_xpub_endpoint; From 5de9296c77ad9d58ecefdeee49c34ae3aacb7083 Mon Sep 17 00:00:00 2001 From: Arne Welzel Date: Mon, 21 Jul 2025 14:14:11 +0200 Subject: [PATCH 5/9] cluster/zeromq: Comments and move lookups to InitPostScript() --- src/cluster/backend/zeromq/ZeroMQ.cc | 39 ++++++++++++++-------------- src/cluster/backend/zeromq/ZeroMQ.h | 7 +++++ 2 files changed, 26 insertions(+), 20 deletions(-) diff --git a/src/cluster/backend/zeromq/ZeroMQ.cc b/src/cluster/backend/zeromq/ZeroMQ.cc index 7af2fb41ac..acbc430018 100644 --- a/src/cluster/backend/zeromq/ZeroMQ.cc +++ b/src/cluster/backend/zeromq/ZeroMQ.cc @@ -124,6 +124,14 @@ void ZeroMQBackend::DoInitPostScript() { xpub_sndbuf = static_cast(zeek::id::find_val("Cluster::Backend::ZeroMQ::xpub_sndbuf")->AsInt()); xsub_rcvhwm = static_cast(zeek::id::find_val("Cluster::Backend::ZeroMQ::xsub_rcvhwm")->AsInt()); xsub_rcvbuf = static_cast(zeek::id::find_val("Cluster::Backend::ZeroMQ::xsub_rcvbuf")->AsInt()); + + // log push/pull socket configuration + log_immediate = + static_cast(zeek::id::find_val("Cluster::Backend::ZeroMQ::log_immediate")->AsBool()); + log_sndhwm = static_cast(zeek::id::find_val("Cluster::Backend::ZeroMQ::log_sndhwm")->AsInt()); + log_sndbuf = static_cast(zeek::id::find_val("Cluster::Backend::ZeroMQ::log_sndbuf")->AsInt()); + log_rcvhwm = static_cast(zeek::id::find_val("Cluster::Backend::ZeroMQ::log_rcvhwm")->AsInt()); + log_rcvbuf = static_cast(zeek::id::find_val("Cluster::Backend::ZeroMQ::log_rcvbuf")->AsInt()); } void ZeroMQBackend::DoTerminate() { @@ -206,22 +214,6 @@ bool ZeroMQBackend::DoInit() { return false; } - - auto log_immediate = - static_cast(zeek::id::find_val("Cluster::Backend::ZeroMQ::log_immediate")->AsBool()); - - auto log_sndhwm = - static_cast(zeek::id::find_val("Cluster::Backend::ZeroMQ::log_sndhwm")->AsInt()); - - auto log_sndbuf = - static_cast(zeek::id::find_val("Cluster::Backend::ZeroMQ::log_sndbuf")->AsInt()); - - auto log_rcvhwm = - static_cast(zeek::id::find_val("Cluster::Backend::ZeroMQ::log_rcvhwm")->AsInt()); - - auto log_rcvbuf = - static_cast(zeek::id::find_val("Cluster::Backend::ZeroMQ::log_rcvbuf")->AsInt()); - ZEROMQ_DEBUG("Setting log_sndhwm=%d log_sndbuf=%d log_rcvhwm=%d log_rcvbuf=%d linger_ms=%d", log_sndhwm, log_sndbuf, log_rcvhwm, log_rcvbuf, linger_ms); @@ -244,6 +236,11 @@ bool ZeroMQBackend::DoInit() { } } + // The connect_log_endpoints variable may be modified by zeek_init(), so + // need to look it up here rather than during DoInitPostScript(). + // + // We should've probably introduced a configuration record similar to the + // storage framework, too. Hmm. Maybe in the future. const auto& log_endpoints = zeek::id::find_val("Cluster::Backend::ZeroMQ::connect_log_endpoints"); for ( unsigned int i = 0; i < log_endpoints->Size(); i++ ) connect_log_endpoints.push_back(log_endpoints->StringValAt(i)->ToStdString()); @@ -313,9 +310,11 @@ bool ZeroMQBackend::DoPublishEvent(const std::string& topic, const std::string& if ( i < parts.size() - 1 ) flags = flags | zmq::send_flags::sndmore; - // This should never fail, it will instead block - // when HWM is reached. I guess we need to see if - // and how this can happen :-/ + // This never returns EAGAIN. A pair socket blocks whenever the hwm + // is reached, regardless of passing any dontwait flag. + // + // This can result in blocking on Cluster::publish() if the inner + // thread does not consume from child_inproc. 
try { main_inproc.send(parts[i], flags); } catch ( const zmq::error_t& err ) { @@ -512,7 +511,7 @@ void ZeroMQBackend::HandleInprocMessages(std::vector& msgs) { } } else { - ZEROMQ_THREAD_PRINTF("inproc: error: expected 1 or 4 parts, have %zu!\n", msg.size()); + ZEROMQ_THREAD_PRINTF("inproc: error: expected 2 or 4 parts, have %zu!\n", msg.size()); } } } diff --git a/src/cluster/backend/zeromq/ZeroMQ.h b/src/cluster/backend/zeromq/ZeroMQ.h index 2932f0161a..cb41a22d6a 100644 --- a/src/cluster/backend/zeromq/ZeroMQ.h +++ b/src/cluster/backend/zeromq/ZeroMQ.h @@ -105,6 +105,13 @@ private: int xsub_rcvhwm = 1000; // libzmq default int xsub_rcvbuf = -1; // OS defaults + // log socket configuration + int log_immediate = false; // libzmq default + int log_sndhwm = 1000; // libzmq default + int log_sndbuf = -1; // OS defaults + int log_rcvhwm = 1000; // libzmq defaults + int log_rcvbuf = -1; // OS defaults + zmq::context_t ctx; zmq::socket_t xsub; zmq::socket_t xpub; From 073de9f5fdc2f1bce6e25c7aebbc9d2501e96e03 Mon Sep 17 00:00:00 2001 From: Arne Welzel Date: Mon, 30 Jun 2025 19:07:19 +0200 Subject: [PATCH 6/9] cluster/zeromq: Drop events when overloaded When either the XPUB socket's hwm is reached, or the onloop queue is full, drop the events. Users can set the xpub_sndhwm and onloop_queue_hwm to 0 to avoid these drops at the risk of unbounded memory growth. --- .../cluster/backend/zeromq/main.zeek | 61 +++++++++- src/cluster/backend/zeromq/ZeroMQ.cc | 113 +++++++++++------- src/cluster/backend/zeromq/ZeroMQ.h | 13 +- 3 files changed, 134 insertions(+), 53 deletions(-) diff --git a/scripts/policy/frameworks/cluster/backend/zeromq/main.zeek b/scripts/policy/frameworks/cluster/backend/zeromq/main.zeek index e450a96e50..947ee17533 100644 --- a/scripts/policy/frameworks/cluster/backend/zeromq/main.zeek +++ b/scripts/policy/frameworks/cluster/backend/zeromq/main.zeek @@ -1,5 +1,7 @@ ##! ZeroMQ cluster backend support. ##! +##! Overview +##! ##! For publish-subscribe functionality, one node in the Zeek cluster spawns a ##! thread running a central broker listening on a XPUB and XSUB socket. ##! These sockets are connected via `zmq_proxy() `_. @@ -22,7 +24,43 @@ ##! possible to run non-Zeek logger nodes. All a logger node needs to do is ##! open a ZeroMQ PULL socket and interpret the format used by Zeek nodes ##! to send their log writes. - +##! +##! Overload Behavior +##! +##! The ZeroMQ cluster backend by default drops outgoing and incoming events +##! when the Zeek cluster is overloaded. Dropping of outgoing events is governed +##! by the :zeek:see:`Cluster::Backend::ZeroMQ::xpub_sndhwm` setting. This +##! is the High Water Mark (HWM) for the local XPUB socket's queue. Once reached, +##! any outgoing events are dropped until there's room in the socket's queue again. +##! The metric ``zeek_cluster_zeromq_xpub_drops_total`` is incremented for every +##! dropped event. +##! +##! For incoming events, the :zeek:see:`Cluster::Backend::ZeroMQ::onloop_queue_hwm` +##! setting is used. Remote events received via the local XSUB socket are first +##! enqueued as raw event messages for processing on Zeek's main event loop. +##! When this queue is full due to more remote events arriving than Zeek +##! can process in an event loop iteration, incoming events are dropped +##! and the ``zeek_cluster_zeromq_onloop_drops_total`` metric is incremented. +##! +##! Incoming log batches or subscription and unsubscription events are passed +##! through the onloop queue, but the HWM currently does not apply to them.
The +##! assumption is that 1) these are not frequent and 2) they are more important than +##! arbitrary publish-subscribe events. +##! +##! To avoid dropping any events (e.g. for performance testing or offline PCAP +##! processing), the recommended strategy is to set both +##! :zeek:see:`Cluster::Backend::ZeroMQ::xpub_sndhwm` and +##! :zeek:see:`Cluster::Backend::ZeroMQ::onloop_queue_hwm` to ``0``, +##! disabling the HWM and dropping logic. It is up to the user to monitor CPU +##! and memory usage of individual nodes to avoid overloading and running into +##! out-of-memory situations. +##! +##! As a Zeek operator, you should monitor ``zeek_cluster_zeromq_xpub_drops_total`` +##! and ``zeek_cluster_zeromq_onloop_drops_total``. Any non-zero values for these +##! metrics indicate an overloaded Zeek cluster. See the cluster telemetry +##! options :zeek:see:`Cluster::Telemetry::core_metrics` and +##! :zeek:see:`Cluster::Telemetry::websocket_metrics` for ways to get a better +##! understanding of the events published and received. @load base/utils/addrs module Cluster::Backend::ZeroMQ; @@ -99,7 +137,8 @@ export { ## Send high water mark value for the XPUB socket. ## - ## If reached, Zeek nodes will block or drop messages. + ## Events published when the XPUB queue is full will be dropped and the + ## ``zeek_cluster_zeromq_xpub_drops_total`` metric incremented. ## ## See ZeroMQ's `ZMQ_SNDHWM documentation `_ ## for more details. @@ -130,6 +169,19 @@ export { ## for more details. const xsub_rcvbuf: int = -1 &redef; + ## Maximum number of incoming events queued for Zeek's event loop. + ## + ## This constant defines the maximum number of remote events queued + ## by the ZeroMQ cluster backend for Zeek's event loop to drain in + ## one go. If you set this value to 0 (unlimited), closely monitor + ## CPU and memory usage of cluster nodes, as high remote event rates + ## may starve packet processing. + ## + ## If more events are received than can fit in the queue, new events will be + ## dropped and the ``zeek_cluster_zeromq_onloop_drops_total`` metric + ## incremented. + const onloop_queue_hwm = 10000 &redef; + ## Configure ZeroMQ's immediate setting on PUSH sockets ## ## Setting this to ``T`` will queue log writes only to completed @@ -188,7 +240,10 @@ export { ## ## Whether to configure ``ZMQ_XPUB_NODROP`` on the XPUB socket ## connecting to the proxy to detect when sending a message fails - ## due to reaching the high-water-mark. + ## due to reaching the high-water-mark. If you set this to **F**, + ## then the XPUB drops metric will stop working as sending on the + ## XPUB socket will always succeed. Unless you're developing on the + ## ZeroMQ cluster backend, keep this set to **T**. ## ## See ZeroMQ's `ZMQ_XPUB_NODROP documentation `_ ## for more details.
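Distilled, the send-or-drop policy that the implementation below adopts looks like this sketch (assumptions: plain cppzmq outside of Zeek; the function name and the bare drop counter are hypothetical stand-ins for the patch's telemetry counter):

    #include <cstdint>
    #include <zmq.hpp>

    // Attempt a non-blocking send on an XPUB socket. With ZMQ_XPUB_NODROP set
    // (see xpub_nodrop above), hitting the high-water-mark surfaces as EAGAIN,
    // i.e. an empty send_result_t, instead of a silent drop inside libzmq.
    static bool send_or_drop(zmq::socket_t& xpub, zmq::message_t& part, uint64_t& drops) {
        zmq::send_result_t result;
        try {
            result = xpub.send(part, zmq::send_flags::dontwait);
        } catch ( const zmq::error_t& err ) {
            if ( err.num() == ETERM )
                return false; // context terminated; caller shuts down
            throw;
        }

        if ( ! result ) { // EAGAIN: HWM reached, so drop instead of blocking
            ++drops;      // the patch's zeek_cluster_zeromq_xpub_drops_total
            return false;
        }

        return true;
    }

In the actual handler below, this check runs per message part; when a part fails, the code counts one drop and skips the remainder of that message.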
diff --git a/src/cluster/backend/zeromq/ZeroMQ.cc b/src/cluster/backend/zeromq/ZeroMQ.cc index acbc430018..45763fab54 100644 --- a/src/cluster/backend/zeromq/ZeroMQ.cc +++ b/src/cluster/backend/zeromq/ZeroMQ.cc @@ -22,10 +22,12 @@ #include "zeek/Reporter.h" #include "zeek/Val.h" #include "zeek/cluster/Backend.h" +#include "zeek/cluster/OnLoop.h" #include "zeek/cluster/Serializer.h" #include "zeek/cluster/backend/zeromq/Plugin.h" #include "zeek/cluster/backend/zeromq/ZeroMQ-Proxy.h" #include "zeek/telemetry/Manager.h" +#include "zeek/util-types.h" #include "zeek/util.h" extern int signal_val; @@ -71,10 +73,25 @@ constexpr DebugFlag operator&(uint8_t x, DebugFlag y) { return static_cast ZeroMQBackend::Instantiate(std::unique_ptr es, + std::unique_ptr ls, + std::unique_ptr ehs) { + auto onloop_queue_hwm = zeek::id::find_val("Cluster::Backend::ZeroMQ::onloop_queue_hwm")->AsCount(); + return std::make_unique(std::move(es), std::move(ls), std::move(ehs), onloop_queue_hwm); +} + ZeroMQBackend::ZeroMQBackend(std::unique_ptr es, std::unique_ptr ls, - std::unique_ptr ehs) - : ThreadedBackend("ZeroMQ", std::move(es), std::move(ls), std::move(ehs)), - main_inproc(zmq::socket_t(ctx, zmq::socket_type::pair)) {} + std::unique_ptr ehs, zeek_uint_t onloop_queue_hwm) + : ThreadedBackend("ZeroMQ", std::move(es), std::move(ls), std::move(ehs), + new zeek::detail::OnLoopProcess(this, "ZeroMQ", onloop_queue_hwm)), + main_inproc(zmq::socket_t(ctx, zmq::socket_type::pair)), + // Counters for block and drop metrics. + total_xpub_drops( + zeek::telemetry_mgr->CounterInstance("zeek", "cluster_zeromq_xpub_drops", {}, + "Number of published events dropped due to XPUB socket HWM.")), + total_onloop_drops( + zeek::telemetry_mgr->CounterInstance("zeek", "cluster_zeromq_onloop_drops", {}, + "Number of received events dropped due to OnLoop queue full.")) {} ZeroMQBackend::~ZeroMQBackend() { try { @@ -114,11 +131,6 @@ void ZeroMQBackend::DoInitPostScript() { event_unsubscription = zeek::event_registry->Register("Cluster::Backend::ZeroMQ::unsubscription"); event_subscription = zeek::event_registry->Register("Cluster::Backend::ZeroMQ::subscription"); - total_xpub_stalls = - zeek::telemetry_mgr - ->CounterInstance("zeek", "cluster_zeromq_xpub_stalls", {}, - "Counter for how many times sending on the XPUB socket stalled due to EAGAIN."); - // xpub/xsub hwm configuration xpub_sndhwm = static_cast(zeek::id::find_val("Cluster::Backend::ZeroMQ::xpub_sndhwm")->AsInt()); xpub_sndbuf = static_cast(zeek::id::find_val("Cluster::Backend::ZeroMQ::xpub_sndbuf")->AsInt()); @@ -471,43 +483,32 @@ void ZeroMQBackend::HandleInprocMessages(std::vector& msgs) { flags = flags | zmq::send_flags::sndmore; zmq::send_result_t result; - int tries = 0; - do { - try { - result = xpub.send(part, flags); - } catch ( zmq::error_t& err ) { - if ( err.num() == ETERM ) - return; + try { + result = xpub.send(part, flags); + } catch ( zmq::error_t& err ) { + if ( err.num() == ETERM ) + return; - // XXX: What other error can happen here? How should we react? - ZEROMQ_THREAD_PRINTF("xpub: Failed to publish with error %s (%d)\n", err.what(), err.num()); - break; + // XXX: What other error can happen here? How should we react? + ZEROMQ_THREAD_PRINTF("xpub: Failed to publish with error %s (%d)\n", err.what(), err.num()); + break; + } + + // Empty result means xpub.send() returned EAGAIN. The socket reached + // its high-water-mark and we drop this message. + if ( ! result ) { + total_xpub_drops->Inc(); + + // Warn once about a dropped message. 
+ // + // TODO: warn every n seconds? + if ( xpub_drop_last_warn_at == 0.0 ) { + ZEROMQ_THREAD_PRINTF("xpub: warn: dropped a message due to hwm\n"); + xpub_drop_last_warn_at = util::current_time(true); } - // Empty result means xpub.send() returned EAGAIN. The socket reached - // its high-water-mark and we cannot send right now. We simply attempt - // to re-send the message without the dontwait flag after increasing - // the xpub stall metric. This way, ZeroMQ will block in xpub.send() until - // there's enough room available. - if ( ! result ) { - total_xpub_stalls->Inc(); - - try { - // We sent non-blocking above so we are able to observe and report stalls - // in a metric. Now that we have done that switch to blocking send. - zmq::send_flags block_flags = zmq::send_flags::none | (flags & zmq::send_flags::sndmore); - result = xpub.send(part, block_flags); - } catch ( zmq::error_t& err ) { - if ( err.num() == ETERM ) - return; - - // XXX: What other error can happen here? How should we react? - ZEROMQ_THREAD_PRINTF("xpub: Failed blocking publish with error %s (%d)\n", err.what(), - err.num()); - break; - } - } - } while ( ! result ); + break; // Skip the whole message. + } } } else { @@ -527,7 +528,11 @@ void ZeroMQBackend::HandleLogMessages(const std::vector& msgs) byte_buffer payload{msg[3].data(), msg[3].data() + msg[3].size()}; LogMessage lm{.format = std::string(msg[2].data(), msg[2].size()), .payload = std::move(payload)}; - QueueForProcessing(std::move(lm)); + // Always enqueue log messages for processing, they are important. + // + // Hmm, we could also consider bypassing Zeek's event loop completely and + // just go to the log_mgr directly in the future. + OnLoop()->QueueForProcessing(std::move(lm), zeek::detail::QueueFlag::Force); } } @@ -557,7 +562,12 @@ void ZeroMQBackend::HandleXPubMessages(const std::vector& msgs continue; } - QueueForProcessing(std::move(qm)); + // Always enqueue subscription messages from other nodes as events. + // + // There shouldn't be all that many, unless some script calls Cluster::subscribe() and + // Cluster::unsubscribe() a lot, so assume we can afford the extra memory rather than + // missing these low-frequency events. + OnLoop()->QueueForProcessing(std::move(qm), zeek::detail::QueueFlag::Force); } } } @@ -579,7 +589,20 @@ void ZeroMQBackend::HandleXSubMessages(const std::vector& msgs .format = std::string(msg[2].data(), msg[2].size()), .payload = std::move(payload)}; - QueueForProcessing(std::move(em)); + + // If queueing the event message for Zeek's main loop doesn't work due to reaching the onloop hwm, + // drop the message. + // + // This is sort of a suicidal snail pattern but without exiting the node. + if ( ! OnLoop()->QueueForProcessing(std::move(em), zeek::detail::QueueFlag::DontBlock) ) { + total_onloop_drops->Inc(); + + // Warn once about a dropped message. + if ( onloop_drop_last_warn_at == 0.0 ) { + ZEROMQ_THREAD_PRINTF("warn: dropped a message due to onloop queue full\n"); + onloop_drop_last_warn_at = util::current_time(true); + } + } } } diff --git a/src/cluster/backend/zeromq/ZeroMQ.h b/src/cluster/backend/zeromq/ZeroMQ.h index cb41a22d6a..11e905d2d7 100644 --- a/src/cluster/backend/zeromq/ZeroMQ.h +++ b/src/cluster/backend/zeromq/ZeroMQ.h @@ -26,7 +26,7 @@ public: * Constructor. */ ZeroMQBackend(std::unique_ptr es, std::unique_ptr ls, - std::unique_ptr ehs); + std::unique_ptr ehs, zeek_uint_t onloop_max_queue_size); /** * Destructor. 
@@ -49,9 +49,7 @@ public: */ static std::unique_ptr Instantiate(std::unique_ptr event_serializer, std::unique_ptr log_serializer, - std::unique_ptr ehs) { - return std::make_unique(std::move(event_serializer), std::move(log_serializer), std::move(ehs)); - } + std::unique_ptr ehs); private: void DoInitPostScript() override; @@ -140,7 +138,12 @@ private: std::map subscription_callbacks; std::set xpub_subscriptions; - zeek::telemetry::CounterPtr total_xpub_stalls; + zeek::telemetry::CounterPtr total_xpub_drops; // events dropped due to XPUB socket hwm reached + zeek::telemetry::CounterPtr total_onloop_drops; // events dropped due to onloop queue full + + // Could rework to log-once-every X seconds if needed. + double xpub_drop_last_warn_at = 0.0; + double onloop_drop_last_warn_at = 0.0; }; } // namespace cluster::zeromq From d2bb86f8b482b84d30987992937db081e59c7586 Mon Sep 17 00:00:00 2001 From: Arne Welzel Date: Tue, 22 Jul 2025 12:44:34 +0200 Subject: [PATCH 7/9] cluster/zeromq: Metric for msg errors --- src/cluster/backend/zeromq/ZeroMQ.cc | 9 ++++++++- src/cluster/backend/zeromq/ZeroMQ.h | 1 + 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/src/cluster/backend/zeromq/ZeroMQ.cc b/src/cluster/backend/zeromq/ZeroMQ.cc index 45763fab54..76373b3c64 100644 --- a/src/cluster/backend/zeromq/ZeroMQ.cc +++ b/src/cluster/backend/zeromq/ZeroMQ.cc @@ -91,7 +91,10 @@ ZeroMQBackend::ZeroMQBackend(std::unique_ptr es, std::unique_pt "Number of published events dropped due to XPUB socket HWM.")), total_onloop_drops( zeek::telemetry_mgr->CounterInstance("zeek", "cluster_zeromq_onloop_drops", {}, - "Number of received events dropped due to OnLoop queue full.")) {} + "Number of received events dropped due to OnLoop queue full.")), + total_msg_errors( + zeek::telemetry_mgr->CounterInstance("zeek", "cluster_zeromq_msg_errors", {}, + "Number of events with the wrong number of message parts.")) {} ZeroMQBackend::~ZeroMQBackend() { try { @@ -513,6 +516,7 @@ void ZeroMQBackend::HandleInprocMessages(std::vector& msgs) { } else { ZEROMQ_THREAD_PRINTF("inproc: error: expected 2 or 4 parts, have %zu!\n", msg.size()); + total_msg_errors->Inc(); } } } @@ -522,6 +526,7 @@ void ZeroMQBackend::HandleLogMessages(const std::vector& msgs) // sender, format, type, payload if ( msg.size() != 4 ) { ZEROMQ_THREAD_PRINTF("log: error: expected 4 parts, have %zu!\n", msg.size()); + total_msg_errors->Inc(); continue; } @@ -540,6 +545,7 @@ void ZeroMQBackend::HandleXPubMessages(const std::vector& msgs for ( const auto& msg : msgs ) { if ( msg.size() != 1 ) { ZEROMQ_THREAD_PRINTF("xpub: error: expected 1 part, have %zu!\n", msg.size()); + total_msg_errors->Inc(); continue; } @@ -576,6 +582,7 @@ void ZeroMQBackend::HandleXSubMessages(const std::vector& msgs for ( const auto& msg : msgs ) { if ( msg.size() != 4 ) { ZEROMQ_THREAD_PRINTF("xsub: error: expected 4 parts, have %zu!\n", msg.size()); + total_msg_errors->Inc(); continue; } diff --git a/src/cluster/backend/zeromq/ZeroMQ.h b/src/cluster/backend/zeromq/ZeroMQ.h index 11e905d2d7..952c978dd3 100644 --- a/src/cluster/backend/zeromq/ZeroMQ.h +++ b/src/cluster/backend/zeromq/ZeroMQ.h @@ -140,6 +140,7 @@ private: zeek::telemetry::CounterPtr total_xpub_drops; // events dropped due to XPUB socket hwm reached zeek::telemetry::CounterPtr total_onloop_drops; // events dropped due to onloop queue full + zeek::telemetry::CounterPtr total_msg_errors; // messages with the wrong number of parts // Could rework to log-once-every X seconds if needed. 
double xpub_drop_last_warn_at = 0.0; From c8307487d16ea6cc00f4d3468b7c904d65cb5aed Mon Sep 17 00:00:00 2001 From: Arne Welzel Date: Tue, 22 Jul 2025 13:15:23 +0200 Subject: [PATCH 8/9] btest/cluster/zeromq: Add tests for overload behavior The overload-drop.zeek and overload-no-drop.zeek tests have proxy, worker-1 and worker-2 publish to the manager topic. For the drop case, we verify that not only the senders but also the manager drops events. For the no-drop test, the HWMs are set such that all events are buffered. The overload-worker-proxy-topic*.zeek tests are similar, but instead of publishing to the manager topic, proxy, worker-1 and worker-2 publish to the proxy and worker topics to overload each other. This had previously resulted in lockups and these tests verify that this doesn't happen anymore. --- .../cluster.zeromq.overload-drop/manager.out | 13 ++ .../cluster.zeromq.overload-drop/proxy.out | 3 + .../cluster.zeromq.overload-drop/worker-1.out | 3 + .../cluster.zeromq.overload-drop/worker-2.out | 3 + .../manager.out | 13 ++ .../cluster.zeromq.overload-no-drop/proxy.out | 3 + .../worker-1.out | 3 + .../worker-2.out | 3 + .../manager.out | 13 ++ .../proxy.out | 5 + .../worker-1.out | 5 + .../worker-2.out | 5 + .../manager.out | 13 ++ .../proxy.out | 5 + .../worker-1.out | 5 + .../worker-2.out | 5 + testing/btest/Files/zeromq/metrics.zeek | 20 ++ .../btest/cluster/zeromq/overload-drop.zeek | 166 ++++++++++++++++ .../cluster/zeromq/overload-no-drop.zeek | 160 +++++++++++++++ .../overload-worker-proxy-topic-drop.zeek | 183 ++++++++++++++++++ .../overload-worker-proxy-topic-no-drop.zeek | 181 +++++++++++++++++ 21 files changed, 810 insertions(+) create mode 100644 testing/btest/Baseline/cluster.zeromq.overload-drop/manager.out create mode 100644 testing/btest/Baseline/cluster.zeromq.overload-drop/proxy.out create mode 100644 testing/btest/Baseline/cluster.zeromq.overload-drop/worker-1.out create mode 100644 testing/btest/Baseline/cluster.zeromq.overload-drop/worker-2.out create mode 100644 testing/btest/Baseline/cluster.zeromq.overload-no-drop/manager.out create mode 100644 testing/btest/Baseline/cluster.zeromq.overload-no-drop/proxy.out create mode 100644 testing/btest/Baseline/cluster.zeromq.overload-no-drop/worker-1.out create mode 100644 testing/btest/Baseline/cluster.zeromq.overload-no-drop/worker-2.out create mode 100644 testing/btest/Baseline/cluster.zeromq.overload-worker-proxy-topic-drop/manager.out create mode 100644 testing/btest/Baseline/cluster.zeromq.overload-worker-proxy-topic-drop/proxy.out create mode 100644 testing/btest/Baseline/cluster.zeromq.overload-worker-proxy-topic-drop/worker-1.out create mode 100644 testing/btest/Baseline/cluster.zeromq.overload-worker-proxy-topic-drop/worker-2.out create mode 100644 testing/btest/Baseline/cluster.zeromq.overload-worker-proxy-topic-no-drop/manager.out create mode 100644 testing/btest/Baseline/cluster.zeromq.overload-worker-proxy-topic-no-drop/proxy.out create mode 100644 testing/btest/Baseline/cluster.zeromq.overload-worker-proxy-topic-no-drop/worker-1.out create mode 100644 testing/btest/Baseline/cluster.zeromq.overload-worker-proxy-topic-no-drop/worker-2.out create mode 100644 testing/btest/Files/zeromq/metrics.zeek create mode 100644 testing/btest/cluster/zeromq/overload-drop.zeek create mode 100644 testing/btest/cluster/zeromq/overload-no-drop.zeek create mode 100644 testing/btest/cluster/zeromq/overload-worker-proxy-topic-drop.zeek create mode 100644 testing/btest/cluster/zeromq/overload-worker-proxy-topic-no-drop.zeek
diff --git a/testing/btest/Baseline/cluster.zeromq.overload-drop/manager.out b/testing/btest/Baseline/cluster.zeromq.overload-drop/manager.out new file mode 100644 index 0000000000..80114a677d --- /dev/null +++ b/testing/btest/Baseline/cluster.zeromq.overload-drop/manager.out @@ -0,0 +1,13 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +nodes_up, 1 +nodes_up, 2 +nodes_up, 3 +sending finish +nodes_down, 1 +nodes_down, 2 +nodes_down, 3 +had xpub_drops?, F +had onloop_drops?, T +node proxy dropped=T +node worker-1 dropped=T +node worker-2 dropped=T diff --git a/testing/btest/Baseline/cluster.zeromq.overload-drop/proxy.out b/testing/btest/Baseline/cluster.zeromq.overload-drop/proxy.out new file mode 100644 index 0000000000..0f07291450 --- /dev/null +++ b/testing/btest/Baseline/cluster.zeromq.overload-drop/proxy.out @@ -0,0 +1,3 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +had xpub_drops?, T +had onloop_drops?, F diff --git a/testing/btest/Baseline/cluster.zeromq.overload-drop/worker-1.out b/testing/btest/Baseline/cluster.zeromq.overload-drop/worker-1.out new file mode 100644 index 0000000000..0f07291450 --- /dev/null +++ b/testing/btest/Baseline/cluster.zeromq.overload-drop/worker-1.out @@ -0,0 +1,3 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +had xpub_drops?, T +had onloop_drops?, F diff --git a/testing/btest/Baseline/cluster.zeromq.overload-drop/worker-2.out b/testing/btest/Baseline/cluster.zeromq.overload-drop/worker-2.out new file mode 100644 index 0000000000..0f07291450 --- /dev/null +++ b/testing/btest/Baseline/cluster.zeromq.overload-drop/worker-2.out @@ -0,0 +1,3 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +had xpub_drops?, T +had onloop_drops?, F diff --git a/testing/btest/Baseline/cluster.zeromq.overload-no-drop/manager.out b/testing/btest/Baseline/cluster.zeromq.overload-no-drop/manager.out new file mode 100644 index 0000000000..c3e9ba5d8b --- /dev/null +++ b/testing/btest/Baseline/cluster.zeromq.overload-no-drop/manager.out @@ -0,0 +1,13 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +nodes_up, 1 +nodes_up, 2 +nodes_up, 3 +sending finish +nodes_down, 1 +nodes_down, 2 +nodes_down, 3 +had xpub_drops?, F +had onloop_drops?, F +node proxy dropped=0 count=100000 +node worker-1 dropped=0 count=100000 +node worker-2 dropped=0 count=100000 diff --git a/testing/btest/Baseline/cluster.zeromq.overload-no-drop/proxy.out b/testing/btest/Baseline/cluster.zeromq.overload-no-drop/proxy.out new file mode 100644 index 0000000000..3a256a356f --- /dev/null +++ b/testing/btest/Baseline/cluster.zeromq.overload-no-drop/proxy.out @@ -0,0 +1,3 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +had xpub_drops?, F +had onloop_drops?, F diff --git a/testing/btest/Baseline/cluster.zeromq.overload-no-drop/worker-1.out b/testing/btest/Baseline/cluster.zeromq.overload-no-drop/worker-1.out new file mode 100644 index 0000000000..3a256a356f --- /dev/null +++ b/testing/btest/Baseline/cluster.zeromq.overload-no-drop/worker-1.out @@ -0,0 +1,3 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. 
+had xpub_drops?, F +had onloop_drops?, F diff --git a/testing/btest/Baseline/cluster.zeromq.overload-no-drop/worker-2.out b/testing/btest/Baseline/cluster.zeromq.overload-no-drop/worker-2.out new file mode 100644 index 0000000000..3a256a356f --- /dev/null +++ b/testing/btest/Baseline/cluster.zeromq.overload-no-drop/worker-2.out @@ -0,0 +1,3 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +had xpub_drops?, F +had onloop_drops?, F diff --git a/testing/btest/Baseline/cluster.zeromq.overload-worker-proxy-topic-drop/manager.out b/testing/btest/Baseline/cluster.zeromq.overload-worker-proxy-topic-drop/manager.out new file mode 100644 index 0000000000..25817823c9 --- /dev/null +++ b/testing/btest/Baseline/cluster.zeromq.overload-worker-proxy-topic-drop/manager.out @@ -0,0 +1,13 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +nodes_up, 1 +nodes_up, 2 +nodes_up, 3 +nodes_done, 1 +nodes_done, 2 +nodes_done, 3 +sending finish +nodes_down, 1 +nodes_down, 2 +nodes_down, 3 +had xpub_drops?, F +had onloop_drops?, F diff --git a/testing/btest/Baseline/cluster.zeromq.overload-worker-proxy-topic-drop/proxy.out b/testing/btest/Baseline/cluster.zeromq.overload-worker-proxy-topic-drop/proxy.out new file mode 100644 index 0000000000..777154a514 --- /dev/null +++ b/testing/btest/Baseline/cluster.zeromq.overload-worker-proxy-topic-drop/proxy.out @@ -0,0 +1,5 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +had xpub_drops?, T +had onloop_drops?, T +node worker-1 dropped=T +node worker-2 dropped=T diff --git a/testing/btest/Baseline/cluster.zeromq.overload-worker-proxy-topic-drop/worker-1.out b/testing/btest/Baseline/cluster.zeromq.overload-worker-proxy-topic-drop/worker-1.out new file mode 100644 index 0000000000..7c514b130b --- /dev/null +++ b/testing/btest/Baseline/cluster.zeromq.overload-worker-proxy-topic-drop/worker-1.out @@ -0,0 +1,5 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +had xpub_drops?, T +had onloop_drops?, T +node proxy dropped=T +node worker-2 dropped=T diff --git a/testing/btest/Baseline/cluster.zeromq.overload-worker-proxy-topic-drop/worker-2.out b/testing/btest/Baseline/cluster.zeromq.overload-worker-proxy-topic-drop/worker-2.out new file mode 100644 index 0000000000..cb945972c4 --- /dev/null +++ b/testing/btest/Baseline/cluster.zeromq.overload-worker-proxy-topic-drop/worker-2.out @@ -0,0 +1,5 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +had xpub_drops?, T +had onloop_drops?, T +node proxy dropped=T +node worker-1 dropped=T diff --git a/testing/btest/Baseline/cluster.zeromq.overload-worker-proxy-topic-no-drop/manager.out b/testing/btest/Baseline/cluster.zeromq.overload-worker-proxy-topic-no-drop/manager.out new file mode 100644 index 0000000000..25817823c9 --- /dev/null +++ b/testing/btest/Baseline/cluster.zeromq.overload-worker-proxy-topic-no-drop/manager.out @@ -0,0 +1,13 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. 
+nodes_up, 1 +nodes_up, 2 +nodes_up, 3 +nodes_done, 1 +nodes_done, 2 +nodes_done, 3 +sending finish +nodes_down, 1 +nodes_down, 2 +nodes_down, 3 +had xpub_drops?, F +had onloop_drops?, F diff --git a/testing/btest/Baseline/cluster.zeromq.overload-worker-proxy-topic-no-drop/proxy.out b/testing/btest/Baseline/cluster.zeromq.overload-worker-proxy-topic-no-drop/proxy.out new file mode 100644 index 0000000000..8d0ddb542b --- /dev/null +++ b/testing/btest/Baseline/cluster.zeromq.overload-worker-proxy-topic-no-drop/proxy.out @@ -0,0 +1,5 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +had xpub_drops?, F +had onloop_drops?, F +node worker-1 dropped=0 count=100000 +node worker-2 dropped=0 count=100000 diff --git a/testing/btest/Baseline/cluster.zeromq.overload-worker-proxy-topic-no-drop/worker-1.out b/testing/btest/Baseline/cluster.zeromq.overload-worker-proxy-topic-no-drop/worker-1.out new file mode 100644 index 0000000000..2ff9717005 --- /dev/null +++ b/testing/btest/Baseline/cluster.zeromq.overload-worker-proxy-topic-no-drop/worker-1.out @@ -0,0 +1,5 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +had xpub_drops?, F +had onloop_drops?, F +node proxy dropped=0 count=100000 +node worker-2 dropped=0 count=100000 diff --git a/testing/btest/Baseline/cluster.zeromq.overload-worker-proxy-topic-no-drop/worker-2.out b/testing/btest/Baseline/cluster.zeromq.overload-worker-proxy-topic-no-drop/worker-2.out new file mode 100644 index 0000000000..2720e190f8 --- /dev/null +++ b/testing/btest/Baseline/cluster.zeromq.overload-worker-proxy-topic-no-drop/worker-2.out @@ -0,0 +1,5 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +had xpub_drops?, F +had onloop_drops?, F +node proxy dropped=0 count=100000 +node worker-1 dropped=0 count=100000 diff --git a/testing/btest/Files/zeromq/metrics.zeek b/testing/btest/Files/zeromq/metrics.zeek new file mode 100644 index 0000000000..0812e4659e --- /dev/null +++ b/testing/btest/Files/zeromq/metrics.zeek @@ -0,0 +1,20 @@ +module Cluster::Backend::ZeroMQ; + +export { + global xpub_drops: function(): count; + global onloop_drops: function(): count; +} + +function xpub_drops(): count + { + local ms = Telemetry::collect_metrics("zeek", "cluster_zeromq_xpub_drops_total"); + assert |ms| == 1, fmt("%s", |ms|); + return double_to_count(ms[0]$value); + } + +function onloop_drops(): count + { + local ms = Telemetry::collect_metrics("zeek", "cluster_zeromq_onloop_drops_total"); + assert |ms| == 1, fmt("%s", |ms|); + return double_to_count(ms[0]$value); + } diff --git a/testing/btest/cluster/zeromq/overload-drop.zeek b/testing/btest/cluster/zeromq/overload-drop.zeek new file mode 100644 index 0000000000..3aeec17d84 --- /dev/null +++ b/testing/btest/cluster/zeromq/overload-drop.zeek @@ -0,0 +1,166 @@ +# @TEST-DOC: Workers and proxy publish to the manager topic. They publish so fast that messages are dropped a) on their end and b) on the manager as well. The test checks that metrics are incremented and the manager also verifies that not all messages arrived. 
+#
+# @TEST-REQUIRES: have-zeromq
+#
+# @TEST-GROUP: cluster-zeromq
+#
+# @TEST-PORT: XPUB_PORT
+# @TEST-PORT: XSUB_PORT
+# @TEST-PORT: LOG_PULL_PORT
+#
+# @TEST-EXEC: cp $FILES/zeromq/cluster-layout-no-logger.zeek cluster-layout.zeek
+# @TEST-EXEC: cp $FILES/zeromq/test-bootstrap.zeek zeromq-test-bootstrap.zeek
+# @TEST-EXEC: cp $FILES/zeromq/metrics.zeek zeromq-metrics.zeek
+#
+# @TEST-EXEC: zeek --parse-only manager.zeek
+# @TEST-EXEC: zeek --parse-only other.zeek
+#
+# @TEST-EXEC: btest-bg-run manager "ZEEKPATH=$ZEEKPATH:.. && CLUSTER_NODE=manager zeek -b ../manager.zeek >out"
+# @TEST-EXEC: btest-bg-run proxy "ZEEKPATH=$ZEEKPATH:.. && CLUSTER_NODE=proxy zeek -b ../other.zeek >out"
+# @TEST-EXEC: btest-bg-run worker-1 "ZEEKPATH=$ZEEKPATH:.. && CLUSTER_NODE=worker-1 zeek -b ../other.zeek >out"
+# @TEST-EXEC: btest-bg-run worker-2 "ZEEKPATH=$ZEEKPATH:.. && CLUSTER_NODE=worker-2 zeek -b ../other.zeek >out"
+#
+# @TEST-EXEC: btest-bg-wait 30
+# @TEST-EXEC: btest-diff manager/out
+# @TEST-EXEC: btest-diff proxy/out
+# @TEST-EXEC: btest-diff worker-1/out
+# @TEST-EXEC: btest-diff worker-2/out
+
+# @TEST-START-FILE common.zeek
+@load ./zeromq-test-bootstrap
+@load ./zeromq-metrics
+
+global tick: event() &is_used;
+global finish: event(name: string) &is_used;
+global ping: event(sender: string, c: count) &is_used;
+
+# How many messages each node publishes in total.
+const total_publishes = 100000;
+# How many events to publish per tick()
+const batch = 100;
+
+# Lower HWMs to provoke drops
+redef Cluster::Backend::ZeroMQ::xpub_sndhwm = batch / 5;
+redef Cluster::Backend::ZeroMQ::onloop_queue_hwm = batch / 5;
+
+global test_nodes = set( "proxy", "worker-1", "worker-2" ) &ordered;
+# @TEST-END-FILE
+
+# @TEST-START-FILE manager.zeek
+@load ./common.zeek
+
+global nodes_up: set[string] = set();
+global nodes_down: set[string] = set();
+
+global sent_finish = F;
+
+event send_finish()
+	{
+	if ( sent_finish )
+		return;
+
+	print "sending finish";
+	for ( n in test_nodes )
+		Cluster::publish(Cluster::node_topic(n), finish, Cluster::node);
+
+	sent_finish = T;
+	}
+
+event Cluster::node_up(name: string, id: string)
+	{
+	add nodes_up[name];
+	print "nodes_up", |nodes_up|;
+
+	# Get the ball rolling once all nodes are available, sending the
+	# first tick() to proxy and workers.
+	if ( |nodes_up| == |test_nodes| )
+		{
+		Cluster::publish(Cluster::worker_topic, tick);
+		Cluster::publish(Cluster::proxy_topic, tick);
+		}
+	}
+
+event Cluster::node_down(name: string, id: string)
+	{
+	add nodes_down[name];
+	print "nodes_down", |nodes_down|;
+	if ( |nodes_down| == |test_nodes| )
+		terminate();
+	}
+
+global last_c: table[string] of count &default=0;
+global drop_c: table[string] of count &default=0;
+
+event ping(sender: string, c: count)
+	{
+	local dropped = c - last_c[sender] - 1;
+	if ( dropped > 0 )
+		drop_c[sender] += dropped;
+
+	last_c[sender] = c;
+
+	# Check if all senders sent enough messages. If not,
+	# wait for the next ping to arrive.
+	for ( _, lc in last_c )
+		if ( lc < total_publishes )
+			return;
+
+	# Send finish just once.
+	event send_finish();
+	}
+
+event zeek_done()
+	{
+	local xpub_drops = Cluster::Backend::ZeroMQ::xpub_drops();
+	local onloop_drops = Cluster::Backend::ZeroMQ::onloop_drops();
+	print "had xpub_drops?", xpub_drops > 0;
+	print "had onloop_drops?", onloop_drops > 0;
+
+	for ( n in test_nodes )
+		print fmt("node %s dropped=%s", n, drop_c[n] > 0);
+	}
+# @TEST-END-FILE
+
+
+# @TEST-START-FILE other.zeek
+@load ./common.zeek
+
+global publishes = 0;
+
+event tick()
+	{
+	local i = batch;
+	while ( i > 0 )
+		{
+		--i;
+		++publishes;
+		Cluster::publish(Cluster::manager_topic, ping, Cluster::node, publishes);
+
+		# Continue sending a single publish for every tick() even
+		# if we've published enough in order for the manager to
+		# detect we're done. We need to continue here because this
+		# node, but also the manager node, may have dropped events.
+		if ( publishes >= total_publishes )
+			break;
+		}
+
+	# Relax publishing if we published enough so the manager
+	# isn't totally overloaded.
+	local s = publishes < total_publishes ? 0sec : 0.05sec;
+	schedule s { tick() };
+	}
+
+event finish(name: string)
+	{
+	terminate();
+	}
+
+event zeek_done()
+	{
+	local xpub_drops = Cluster::Backend::ZeroMQ::xpub_drops();
+	local onloop_drops = Cluster::Backend::ZeroMQ::onloop_drops();
+	print "had xpub_drops?", xpub_drops > 0;
+	print "had onloop_drops?", onloop_drops > 0;
+	}
+# @TEST-END-FILE
diff --git a/testing/btest/cluster/zeromq/overload-no-drop.zeek b/testing/btest/cluster/zeromq/overload-no-drop.zeek
new file mode 100644
index 0000000000..6d19dc6acb
--- /dev/null
+++ b/testing/btest/cluster/zeromq/overload-no-drop.zeek
@@ -0,0 +1,160 @@
+# @TEST-DOC: Workers and proxy publish to the manager topic. They publish so fast that messages would be dropped by sender and receiver, but the HWM settings are 0 so nothing is dropped at the expense of using more memory. This is verified via metrics and checking the received pings on the manager.
+#
+# @TEST-REQUIRES: have-zeromq
+#
+# @TEST-GROUP: cluster-zeromq
+#
+# @TEST-PORT: XPUB_PORT
+# @TEST-PORT: XSUB_PORT
+# @TEST-PORT: LOG_PULL_PORT
+#
+# @TEST-EXEC: cp $FILES/zeromq/cluster-layout-no-logger.zeek cluster-layout.zeek
+# @TEST-EXEC: cp $FILES/zeromq/test-bootstrap.zeek zeromq-test-bootstrap.zeek
+# @TEST-EXEC: cp $FILES/zeromq/metrics.zeek zeromq-metrics.zeek
+#
+# @TEST-EXEC: zeek --parse-only manager.zeek
+# @TEST-EXEC: zeek --parse-only other.zeek
+#
+# @TEST-EXEC: btest-bg-run manager "ZEEKPATH=$ZEEKPATH:.. && CLUSTER_NODE=manager zeek -b ../manager.zeek >out"
+# @TEST-EXEC: btest-bg-run proxy "ZEEKPATH=$ZEEKPATH:.. && CLUSTER_NODE=proxy zeek -b ../other.zeek >out"
+# @TEST-EXEC: btest-bg-run worker-1 "ZEEKPATH=$ZEEKPATH:.. && CLUSTER_NODE=worker-1 zeek -b ../other.zeek >out"
+# @TEST-EXEC: btest-bg-run worker-2 "ZEEKPATH=$ZEEKPATH:.. && CLUSTER_NODE=worker-2 zeek -b ../other.zeek >out"
+#
+# @TEST-EXEC: btest-bg-wait 30
+# @TEST-EXEC: btest-diff manager/out
+# @TEST-EXEC: btest-diff proxy/out
+# @TEST-EXEC: btest-diff worker-1/out
+# @TEST-EXEC: btest-diff worker-2/out
+
+# @TEST-START-FILE common.zeek
+@load ./zeromq-test-bootstrap
+@load ./zeromq-metrics
+
+global tick: event() &is_used;
+global finish: event(name: string) &is_used;
+global ping: event(sender: string, c: count) &is_used;
+
+# How many messages each node publishes in total.
+const total_publishes = 100000;
+# How many events to publish per tick()
+const batch = 100;
+
+# Unlimited buffering.
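+# A send HWM of 0 on the XPUB socket and an onloop queue HWM of 0 both mean
+# "no limit": messages are buffered instead of dropped, at the cost of memory.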
+redef Cluster::Backend::ZeroMQ::xpub_sndhwm = 0;
+redef Cluster::Backend::ZeroMQ::onloop_queue_hwm = 0;
+
+global test_nodes = set( "proxy", "worker-1", "worker-2" ) &ordered;
+# @TEST-END-FILE
+
+# @TEST-START-FILE manager.zeek
+@load ./common.zeek
+
+global nodes_up: set[string] = set();
+global nodes_down: set[string] = set();
+
+global sent_finish = F;
+
+event send_finish()
+	{
+	if ( sent_finish )
+		return;
+
+	print "sending finish";
+	for ( n in test_nodes )
+		Cluster::publish(Cluster::node_topic(n), finish, Cluster::node);
+
+	sent_finish = T;
+	}
+
+event Cluster::node_up(name: string, id: string)
+	{
+	add nodes_up[name];
+	print "nodes_up", |nodes_up|;
+
+	# Get the ball rolling once all nodes are available, sending the
+	# first tick() to proxy and workers.
+	if ( |nodes_up| == |test_nodes| )
+		{
+		Cluster::publish(Cluster::worker_topic, tick);
+		Cluster::publish(Cluster::proxy_topic, tick);
+		}
+	}
+
+event Cluster::node_down(name: string, id: string)
+	{
+	add nodes_down[name];
+	print "nodes_down", |nodes_down|;
+	if ( |nodes_down| == |test_nodes| )
+		terminate();
+	}
+
+global last_c: table[string] of count &default=0;
+global drop_c: table[string] of count &default=0;
+
+event ping(sender: string, c: count)
+	{
+	local dropped = c - last_c[sender] - 1;
+	if ( dropped > 0 )
+		drop_c[sender] += dropped;
+
+	last_c[sender] = c;
+
+	# Check if all senders sent enough messages. If not,
+	# wait for the next ping to arrive.
+	for ( _, lc in last_c )
+		if ( lc < total_publishes )
+			return;
+
+	# Send finish just once.
+	event send_finish();
+	}
+
+event zeek_done()
+	{
+	local xpub_drops = Cluster::Backend::ZeroMQ::xpub_drops();
+	local onloop_drops = Cluster::Backend::ZeroMQ::onloop_drops();
+	print "had xpub_drops?", xpub_drops > 0;
+	print "had onloop_drops?", onloop_drops > 0;
+
+	for ( n in test_nodes )
+		print fmt("node %s dropped=%s count=%s", n, drop_c[n], last_c[n]);
+	}
+# @TEST-END-FILE
+
+
+# @TEST-START-FILE other.zeek
+@load ./common.zeek
+
+global publishes = 0;
+
+event tick()
+	{
+	local i = batch;
+	while ( i > 0 )
+		{
+		--i;
+		++publishes;
+		Cluster::publish(Cluster::manager_topic, ping, Cluster::node, publishes);
+
+		# Return once all messages were published. Nothing's supposed
+		# to be dropped, so that should be fine.
+		if ( publishes >= total_publishes )
+			return;
+		}
+
+	schedule 0sec { tick() };
+	}
+
+event finish(name: string)
+	{
+	terminate();
+	}
+
+event zeek_done()
+	{
+	local xpub_drops = Cluster::Backend::ZeroMQ::xpub_drops();
+	local onloop_drops = Cluster::Backend::ZeroMQ::onloop_drops();
+	print "had xpub_drops?", xpub_drops > 0;
+	print "had onloop_drops?", onloop_drops > 0;
+	}
+# @TEST-END-FILE
diff --git a/testing/btest/cluster/zeromq/overload-worker-proxy-topic-drop.zeek b/testing/btest/cluster/zeromq/overload-worker-proxy-topic-drop.zeek
new file mode 100644
index 0000000000..fe1f73f9f0
--- /dev/null
+++ b/testing/btest/cluster/zeromq/overload-worker-proxy-topic-drop.zeek
@@ -0,0 +1,183 @@
+# @TEST-DOC: Workers and proxy publish to the worker and proxy topics. They publish so fast that messages are dropped a) on their end and b) in their own onloop queues as well. The test checks that metrics are incremented and there's no lockup. The manager only coordinates startup and shutdown.
+#
+# @TEST-REQUIRES: have-zeromq
+#
+# @TEST-GROUP: cluster-zeromq
+#
+# @TEST-PORT: XPUB_PORT
+# @TEST-PORT: XSUB_PORT
+# @TEST-PORT: LOG_PULL_PORT
+#
+# @TEST-EXEC: cp $FILES/zeromq/cluster-layout-no-logger.zeek cluster-layout.zeek
+# @TEST-EXEC: cp $FILES/zeromq/test-bootstrap.zeek zeromq-test-bootstrap.zeek
+# @TEST-EXEC: cp $FILES/zeromq/metrics.zeek zeromq-metrics.zeek
+#
+# @TEST-EXEC: zeek --parse-only manager.zeek
+# @TEST-EXEC: zeek --parse-only other.zeek
+#
+# @TEST-EXEC: btest-bg-run manager "ZEEKPATH=$ZEEKPATH:.. && CLUSTER_NODE=manager zeek -b ../manager.zeek >out"
+# @TEST-EXEC: btest-bg-run proxy "ZEEKPATH=$ZEEKPATH:.. && CLUSTER_NODE=proxy zeek -b ../other.zeek >out"
+# @TEST-EXEC: btest-bg-run worker-1 "ZEEKPATH=$ZEEKPATH:.. && CLUSTER_NODE=worker-1 zeek -b ../other.zeek >out"
+# @TEST-EXEC: btest-bg-run worker-2 "ZEEKPATH=$ZEEKPATH:.. && CLUSTER_NODE=worker-2 zeek -b ../other.zeek >out"
+#
+# @TEST-EXEC: btest-bg-wait 30
+# @TEST-EXEC: btest-diff manager/out
+# @TEST-EXEC: btest-diff proxy/out
+# @TEST-EXEC: btest-diff worker-1/out
+# @TEST-EXEC: btest-diff worker-2/out
+
+# @TEST-START-FILE common.zeek
+@load ./zeromq-test-bootstrap
+@load ./zeromq-metrics
+
+global tick: event() &is_used;
+global done: event(name: string) &is_used;
+global finish: event(name: string) &is_used;
+global ping: event(sender: string, c: count) &is_used;
+
+# How many messages each node publishes in total.
+const total_publishes = 100000;
+# How many events to publish per tick()
+const batch = 100;
+
+# Lower HWMs to provoke drops
+redef Cluster::Backend::ZeroMQ::xpub_sndhwm = batch / 5;
+redef Cluster::Backend::ZeroMQ::onloop_queue_hwm = batch / 5;
+
+global test_nodes = set( "proxy", "worker-1", "worker-2" ) &ordered;
+# @TEST-END-FILE
+
+# @TEST-START-FILE manager.zeek
+@load ./common.zeek
+
+global nodes_up: set[string] = set();
+global nodes_done: set[string] = set();
+global nodes_down: set[string] = set();
+
+global sent_finish = F;
+
+event send_finish()
+	{
+	if ( sent_finish )
+		return;
+
+	print "sending finish";
+	for ( n in test_nodes )
+		Cluster::publish(Cluster::node_topic(n), finish, Cluster::node);
+
+	sent_finish = T;
+	}
+
+event Cluster::node_up(name: string, id: string)
+	{
+	add nodes_up[name];
+	print "nodes_up", |nodes_up|;
+
+	# Get the ball rolling once all nodes are available, sending the
+	# first tick() to proxy and workers.
+	if ( |nodes_up| == |test_nodes| )
+		{
+		Cluster::publish(Cluster::worker_topic, tick);
+		Cluster::publish(Cluster::proxy_topic, tick);
+		}
+	}
+
+event Cluster::node_down(name: string, id: string)
+	{
+	add nodes_down[name];
+	print "nodes_down", |nodes_down|;
+	if ( |nodes_down| == |test_nodes| )
+		terminate();
+	}
+
+event done(sender: string)
+	{
+	local prev = |nodes_done|;
+	add nodes_done[sender];
+	if ( prev < |nodes_done| )
+		print "nodes_done", |nodes_done|;
+
+	if ( |nodes_done| == |test_nodes| )
+		event send_finish();
+	}
+
+event zeek_done()
+	{
+	local xpub_drops = Cluster::Backend::ZeroMQ::xpub_drops();
+	local onloop_drops = Cluster::Backend::ZeroMQ::onloop_drops();
+	print "had xpub_drops?", xpub_drops > 0;
+	print "had onloop_drops?", onloop_drops > 0;
+	}
+# @TEST-END-FILE
+
+
+# @TEST-START-FILE other.zeek
+@load ./common.zeek
+global last_c: table[string] of count &default=0;
+global drop_c: table[string] of count &default=0;
+
+event ping(sender: string, c: count)
+	{
+	local dropped = c - last_c[sender] - 1;
+	if ( dropped > 0 )
+		drop_c[sender] += dropped;
+
+	last_c[sender] = c;
+
+	# Check if all senders sent enough messages. If not,
+	# wait for the next ping to arrive.
+	if ( |last_c| < |test_nodes| - 1 )
+		return;
+
+	for ( _, lc in last_c )
+		if ( lc < total_publishes )
+			return;
+
+	# If all nodes sent enough pings, send "done" to the manager.
+	Cluster::publish(Cluster::manager_topic, done, Cluster::node);
+	}
+
+global publishes = 0;
+
+event tick()
+	{
+	local i = batch;
+	while ( i > 0 )
+		{
+		--i;
+		++publishes;
+		Cluster::publish(Cluster::worker_topic, ping, Cluster::node, publishes);
+		Cluster::publish(Cluster::proxy_topic, ping, Cluster::node, publishes);
+
+		# Continue sending a single publish for every tick() even
+		# if we've published enough in order for the manager to
+		# detect we're done. We need to continue here because this
+		# node, but also the manager node, may have dropped events.
+		if ( publishes >= total_publishes )
+			break;
+		}
+
+	# Relax publishing once we have published enough, so as not to
+	# keep overloading the cluster and to give termination events
+	# a better chance of going through.
+	local s = publishes < total_publishes ? 0sec : 0.05sec;
+	schedule s { tick() };
+	}
+
+event finish(name: string)
+	{
+	terminate();
+	}
+
+event zeek_done()
+	{
+	local xpub_drops = Cluster::Backend::ZeroMQ::xpub_drops();
+	local onloop_drops = Cluster::Backend::ZeroMQ::onloop_drops();
+	print "had xpub_drops?", xpub_drops > 0;
+	print "had onloop_drops?", onloop_drops > 0;
+
+	for ( n in test_nodes )
+		if ( n != Cluster::node )
+			print fmt("node %s dropped=%s", n, drop_c[n] > 0);
+	}
+# @TEST-END-FILE
diff --git a/testing/btest/cluster/zeromq/overload-worker-proxy-topic-no-drop.zeek b/testing/btest/cluster/zeromq/overload-worker-proxy-topic-no-drop.zeek
new file mode 100644
index 0000000000..69a96ace67
--- /dev/null
+++ b/testing/btest/cluster/zeromq/overload-worker-proxy-topic-no-drop.zeek
@@ -0,0 +1,181 @@
+# @TEST-DOC: Workers and proxy publish to the worker and proxy topics. They publish so fast that messages would be dropped by sender and receiver, but the HWM settings are 0 so nothing is dropped at the expense of using more memory. This is verified via metrics and the received pings on each node. The manager only coordinates startup and shutdown.
+#
+# @TEST-REQUIRES: have-zeromq
+#
+# @TEST-GROUP: cluster-zeromq
+#
+# @TEST-PORT: XPUB_PORT
+# @TEST-PORT: XSUB_PORT
+# @TEST-PORT: LOG_PULL_PORT
+#
+# @TEST-EXEC: cp $FILES/zeromq/cluster-layout-no-logger.zeek cluster-layout.zeek
+# @TEST-EXEC: cp $FILES/zeromq/test-bootstrap.zeek zeromq-test-bootstrap.zeek
+# @TEST-EXEC: cp $FILES/zeromq/metrics.zeek zeromq-metrics.zeek
+#
+# @TEST-EXEC: zeek --parse-only manager.zeek
+# @TEST-EXEC: zeek --parse-only other.zeek
+#
+# @TEST-EXEC: btest-bg-run manager "ZEEKPATH=$ZEEKPATH:.. && CLUSTER_NODE=manager zeek -b ../manager.zeek >out"
+# @TEST-EXEC: btest-bg-run proxy "ZEEKPATH=$ZEEKPATH:.. && CLUSTER_NODE=proxy zeek -b ../other.zeek >out"
+# @TEST-EXEC: btest-bg-run worker-1 "ZEEKPATH=$ZEEKPATH:.. && CLUSTER_NODE=worker-1 zeek -b ../other.zeek >out"
+# @TEST-EXEC: btest-bg-run worker-2 "ZEEKPATH=$ZEEKPATH:.. && CLUSTER_NODE=worker-2 zeek -b ../other.zeek >out"
+#
+# @TEST-EXEC: btest-bg-wait 30
+# @TEST-EXEC: btest-diff manager/out
+# @TEST-EXEC: btest-diff proxy/out
+# @TEST-EXEC: btest-diff worker-1/out
+# @TEST-EXEC: btest-diff worker-2/out
+
+# @TEST-START-FILE common.zeek
+@load ./zeromq-test-bootstrap
+@load ./zeromq-metrics
+
+global tick: event() &is_used;
+global done: event(name: string) &is_used;
+global finish: event(name: string) &is_used;
+global ping: event(sender: string, c: count) &is_used;
+
+# How many messages each node publishes in total.
+const total_publishes = 100000;
+# How many events to publish per tick()
+const batch = 100;
+
+# Unlimited buffering.
+redef Cluster::Backend::ZeroMQ::xpub_sndhwm = 0;
+redef Cluster::Backend::ZeroMQ::onloop_queue_hwm = 0;
+
+global test_nodes = set( "proxy", "worker-1", "worker-2" ) &ordered;
+# @TEST-END-FILE
+
+# @TEST-START-FILE manager.zeek
+@load ./common.zeek
+
+global nodes_up: set[string] = set();
+global nodes_done: set[string] = set();
+global nodes_down: set[string] = set();
+
+global sent_finish = F;
+
+event send_finish()
+	{
+	if ( sent_finish )
+		return;
+
+	print "sending finish";
+	for ( n in test_nodes )
+		Cluster::publish(Cluster::node_topic(n), finish, Cluster::node);
+
+	sent_finish = T;
+	}
+
+event Cluster::node_up(name: string, id: string)
+	{
+	add nodes_up[name];
+	print "nodes_up", |nodes_up|;
+
+	# Get the ball rolling once all nodes are available, sending the
+	# first tick() to proxy and workers.
+	if ( |nodes_up| == |test_nodes| )
+		{
+		Cluster::publish(Cluster::worker_topic, tick);
+		Cluster::publish(Cluster::proxy_topic, tick);
+		}
+	}
+
+event Cluster::node_down(name: string, id: string)
+	{
+	add nodes_down[name];
+	print "nodes_down", |nodes_down|;
+	if ( |nodes_down| == |test_nodes| )
+		terminate();
+	}
+
+event done(sender: string)
+	{
+	add nodes_done[sender];
+	print "nodes_done", |nodes_done|;
+	if ( |nodes_done| == |test_nodes| )
+		event send_finish();
+	}
+
+event zeek_done()
+	{
+	local xpub_drops = Cluster::Backend::ZeroMQ::xpub_drops();
+	local onloop_drops = Cluster::Backend::ZeroMQ::onloop_drops();
+	print "had xpub_drops?", xpub_drops > 0;
+	print "had onloop_drops?", onloop_drops > 0;
+	}
+# @TEST-END-FILE
+
+
+# @TEST-START-FILE other.zeek
+@load ./common.zeek
+global last_c: table[string] of count &default=0;
+global drop_c: table[string] of count &default=0;
+
+global sent_done = F;
+
+event ping(sender: string, c: count)
+	{
+	local dropped = c - last_c[sender] - 1;
+	if ( dropped > 0 )
+		drop_c[sender] += dropped;
+
+	last_c[sender] = c;
+
+	# Check if all senders sent enough messages. If not,
+	# wait for the next ping to arrive.
+	if ( |last_c| < |test_nodes| - 1 )
+		return;
+
+	for ( _, lc in last_c )
+		if ( lc < total_publishes )
+			return;
+
+	# If all nodes sent enough pings, send "done" to the manager.
+	if ( ! sent_done )
+		{
+		Cluster::publish(Cluster::manager_topic, done, Cluster::node);
+		sent_done = T;
+		}
+	}
+
+global publishes = 0;
+
+event tick()
+	{
+	local i = batch;
+	while ( i > 0 )
+		{
+		--i;
+		++publishes;
+		Cluster::publish(Cluster::worker_topic, ping, Cluster::node, publishes);
+		Cluster::publish(Cluster::proxy_topic, ping, Cluster::node, publishes);
+
+		# Return once all messages were published. Nothing's supposed
+		# to be dropped, so that should be fine.
+		if ( publishes >= total_publishes )
+			return;
+		}
+
+	schedule 0sec { tick() };
+	}
+
+event finish(name: string)
+	{
+	terminate();
+	}
+
+event zeek_done()
+	{
+	local xpub_drops = Cluster::Backend::ZeroMQ::xpub_drops();
+	local onloop_drops = Cluster::Backend::ZeroMQ::onloop_drops();
+	print "had xpub_drops?", xpub_drops > 0;
+	print "had onloop_drops?", onloop_drops > 0;
+
+	for ( n in test_nodes )
+		if ( n != Cluster::node )
+			print fmt("node %s dropped=%s count=%s", n, drop_c[n], last_c[n]);
+	}
+# @TEST-END-FILE

From 55ecd909285a7c82454ea2717470703e39923ca9 Mon Sep 17 00:00:00 2001
From: Arne Welzel
Date: Tue, 22 Jul 2025 15:13:26 +0200
Subject: [PATCH 9/9] cluster.bif: Improve Cluster::publish() docstring

---
 src/cluster/cluster.bif | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/src/cluster/cluster.bif b/src/cluster/cluster.bif
index 927f129bc0..562cace461 100644
--- a/src/cluster/cluster.bif
+++ b/src/cluster/cluster.bif
@@ -25,7 +25,10 @@ type Cluster::WebSocketTLSOptions: record;
 ## :zeek:see:`Cluster::make_event` or the argument list to pass along
 ## to it.
 ##
-## Returns: true if the message is sent.
+## Returns: T if the event was accepted for sending. Depending on
+## the selected cluster backend, an event may be dropped
+## when a Zeek cluster is overloaded. This can happen on
+## the sending or receiving node.
 function Cluster::publish%(topic: string, ...%): bool
 	%{
 	ScriptLocationScope scope{frame};
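
For illustration, a minimal caller-side sketch in Zeek script of acting on the
return value documented above. Cluster::publish() and Cluster::manager_topic
are existing cluster API; my_event and publish_failures are hypothetical names
introduced only for this sketch.

	global my_event: event(c: count) &is_used;
	global publish_failures = 0;

	function publish_counted(c: count)
		{
		# F means the backend did not accept the event for sending,
		# e.g. because this node is overloaded. Per the docstring above,
		# an accepted event may still be dropped on the receiving node.
		if ( ! Cluster::publish(Cluster::manager_topic, my_event, c) )
			++publish_failures;
		}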