diff --git a/CHANGES b/CHANGES index 7bc808db64..c5c7338b8c 100644 --- a/CHANGES +++ b/CHANGES @@ -1,3 +1,10 @@ +7.2.0-dev.431 | 2025-03-26 14:23:51 +0100 + + * cluster/zeromq: Improve XPUB stall behavior, add a metric (Arne Welzel, Corelight) + + Instead of fprintf, track the number of occurrences via a metric and + change the sleep loop to a blocking send instead. + 7.2.0-dev.429 | 2025-03-26 13:59:23 +0100 * GH-4309: telemetry: Run callbacks at collect time (Arne Welzel, Corelight) diff --git a/VERSION b/VERSION index 7eec28b38e..2916d88e30 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -7.2.0-dev.429 +7.2.0-dev.431 diff --git a/src/cluster/backend/zeromq/ZeroMQ.cc b/src/cluster/backend/zeromq/ZeroMQ.cc index e34cdc3ab8..f05ab88d48 100644 --- a/src/cluster/backend/zeromq/ZeroMQ.cc +++ b/src/cluster/backend/zeromq/ZeroMQ.cc @@ -23,6 +23,7 @@ #include "zeek/cluster/Serializer.h" #include "zeek/cluster/backend/zeromq/Plugin.h" #include "zeek/cluster/backend/zeromq/ZeroMQ-Proxy.h" +#include "zeek/telemetry/Manager.h" #include "zeek/util.h" extern int signal_val; @@ -102,6 +103,11 @@ void ZeroMQBackend::DoInitPostScript() { event_unsubscription = zeek::event_registry->Register("Cluster::Backend::ZeroMQ::unsubscription"); event_subscription = zeek::event_registry->Register("Cluster::Backend::ZeroMQ::subscription"); + + total_xpub_stalls = + zeek::telemetry_mgr + ->CounterInstance("zeek", "cluster_zeromq_xpub_stalls", {}, + "Counter for how many times sending on the XPUB socket stalled due to EAGAIN."); } void ZeroMQBackend::DoTerminate() { @@ -434,20 +440,29 @@ void ZeroMQBackend::Run() { break; } - // Empty result means xpub.send() returned EAGAIN. The - // socket reached its high water mark and we should - // relax / backoff a bit. Otherwise we'll be spinning - // unproductively very fast here. Note that this is going - // to build up backpressure and eventually inproc.send() - // will block from the main thread. + // Empty result means xpub.send() returned EAGAIN. The socket reached + // its high-water-mark and we cannot send right now. We simply attempt + // to re-send the message without the dontwait flag after increasing + // the xpub stall metric. This way, ZeroMQ will block in xpub.send() until + // there's enough room available. if ( ! result ) { - ++tries; - auto sleep_for = std::min(tries * 10, 500); - ZEROMQ_THREAD_PRINTF( - "xpub: Failed forward inproc to xpub! Overloaded? (tries=%d sleeping %d ms)\n", tries, - sleep_for); + total_xpub_stalls->Inc(); - std::this_thread::sleep_for(std::chrono::milliseconds(sleep_for)); + try { + // We sent non-blocking above so we are able to observe and report stalls + // in a metric. Now that we have done that switch to blocking send. + zmq::send_flags block_flags = + zmq::send_flags::none | (flags & zmq::send_flags::sndmore); + result = xpub.send(part, block_flags); + } catch ( zmq::error_t& err ) { + if ( err.num() == ETERM ) + return; + + // XXX: What other error can happen here? How should we react? + ZEROMQ_THREAD_PRINTF("xpub: Failed blocking publish with error %s (%d)\n", err.what(), + err.num()); + break; + } } } while ( ! result ); } diff --git a/src/cluster/backend/zeromq/ZeroMQ.h b/src/cluster/backend/zeromq/ZeroMQ.h index 9b3fc0e63b..fed3585394 100644 --- a/src/cluster/backend/zeromq/ZeroMQ.h +++ b/src/cluster/backend/zeromq/ZeroMQ.h @@ -10,7 +10,15 @@ #include "zeek/cluster/Serializer.h" #include "zeek/cluster/backend/zeromq/ZeroMQ-Proxy.h" -namespace zeek::cluster::zeromq { + +namespace zeek { + +namespace telemetry { +class Counter; +using CounterPtr = std::shared_ptr; +} // namespace telemetry + +namespace cluster::zeromq { class ZeroMQBackend : public cluster::ThreadedBackend { public: @@ -105,6 +113,9 @@ private: // Tracking the subscriptions on the local XPUB socket. std::map subscription_callbacks; std::set xpub_subscriptions; + + zeek::telemetry::CounterPtr total_xpub_stalls; }; -} // namespace zeek::cluster::zeromq +} // namespace cluster::zeromq +} // namespace zeek