From 5dc4586b70ac848e4661461acce7a53cef76a218 Mon Sep 17 00:00:00 2001 From: Arne Welzel Date: Mon, 30 Jun 2025 16:38:53 +0200 Subject: [PATCH] cluster/zeromq: Support local XPUB/XSUB hwm and buf configurability --- .../cluster/backend/zeromq/main.zeek | 33 +++++++++++++++++++ src/cluster/backend/zeromq/ZeroMQ.cc | 11 +++++++ src/cluster/backend/zeromq/ZeroMQ.h | 6 ++++ 3 files changed, 50 insertions(+) diff --git a/scripts/policy/frameworks/cluster/backend/zeromq/main.zeek b/scripts/policy/frameworks/cluster/backend/zeromq/main.zeek index dd1e28fe86..e450a96e50 100644 --- a/scripts/policy/frameworks/cluster/backend/zeromq/main.zeek +++ b/scripts/policy/frameworks/cluster/backend/zeromq/main.zeek @@ -97,6 +97,39 @@ export { ## for more details. const linger_ms: int = 500 &redef; + ## Send high water mark value for the XPUB socket. + ## + ## If reached, Zeek nodes will block or drop messages. + ## + ## See ZeroMQ's `ZMQ_SNDHWM documentation `_ + ## for more details. + const xpub_sndhwm: int = 1000 &redef; + + ## Kernel transmit buffer size for the XPUB socket. + ## + ## Using -1 will use the kernel's default. + ## + ## See ZeroMQ's `ZMQ_SNDBUF documentation `_ + ## for more details. + const xpub_sndbuf: int = -1 &redef; + + ## Receive high water mark value for the XSUB socket. + ## + ## If reached, the Zeek node will start reporting back pressure + ## to the central XPUB socket. + ## + ## See ZeroMQ's `ZMQ_RCVHWM documentation `_ + ## for more details. + const xsub_rcvhwm: int = 1000 &redef; + + ## Kernel receive buffer size for the XSUB socket. + ## + ## Using -1 will use the kernel's default. + ## + ## See ZeroMQ's `ZMQ_RCVBUF documentation `_ + ## for more details. + const xsub_rcvbuf: int = -1 &redef; + ## Configure ZeroMQ's immediate setting on PUSH sockets ## ## Setting this to ``T`` will queue log writes only to completed diff --git a/src/cluster/backend/zeromq/ZeroMQ.cc b/src/cluster/backend/zeromq/ZeroMQ.cc index 234194d885..7f42027477 100644 --- a/src/cluster/backend/zeromq/ZeroMQ.cc +++ b/src/cluster/backend/zeromq/ZeroMQ.cc @@ -118,6 +118,12 @@ void ZeroMQBackend::DoInitPostScript() { zeek::telemetry_mgr ->CounterInstance("zeek", "cluster_zeromq_xpub_stalls", {}, "Counter for how many times sending on the XPUB socket stalled due to EAGAIN."); + + // xpub/xsub hwm configuration + xpub_sndhwm = static_cast(zeek::id::find_val("Cluster::Backend::ZeroMQ::xpub_sndhwm")->AsInt()); + xpub_sndbuf = static_cast(zeek::id::find_val("Cluster::Backend::ZeroMQ::xpub_sndbuf")->AsInt()); + xsub_rcvhwm = static_cast(zeek::id::find_val("Cluster::Backend::ZeroMQ::xsub_rcvhwm")->AsInt()); + xsub_rcvbuf = static_cast(zeek::id::find_val("Cluster::Backend::ZeroMQ::xsub_rcvbuf")->AsInt()); } void ZeroMQBackend::DoTerminate() { @@ -179,6 +185,11 @@ bool ZeroMQBackend::DoInit() { xpub.set(zmq::sockopt::xpub_nodrop, connect_xpub_nodrop); xpub.set(zmq::sockopt::xpub_verbose, 1); + xpub.set(zmq::sockopt::sndhwm, xpub_sndhwm); + xpub.set(zmq::sockopt::sndbuf, xpub_sndbuf); + xsub.set(zmq::sockopt::rcvhwm, xsub_rcvhwm); + xsub.set(zmq::sockopt::rcvbuf, xsub_rcvbuf); + try { xsub.connect(connect_xsub_endpoint); } catch ( zmq::error_t& err ) { diff --git a/src/cluster/backend/zeromq/ZeroMQ.h b/src/cluster/backend/zeromq/ZeroMQ.h index 58d6c7ff08..4d94b51142 100644 --- a/src/cluster/backend/zeromq/ZeroMQ.h +++ b/src/cluster/backend/zeromq/ZeroMQ.h @@ -92,6 +92,12 @@ private: EventHandlerPtr event_subscription; EventHandlerPtr event_unsubscription; + // xpub/xsub configuration + int xpub_sndhwm = 1000; // libzmq default + int xpub_sndbuf = -1; // OS defaults + int xsub_rcvhwm = 1000; // libzmq default + int xsub_rcvbuf = -1; // OS defaults + zmq::context_t ctx; zmq::socket_t xsub; zmq::socket_t xpub;