cluster/zeromq: Support local XPUB/XSUB hwm and buf configurability

This commit is contained in:
Arne Welzel 2025-06-30 16:38:53 +02:00
parent d79d4b1b2a
commit 5dc4586b70
3 changed files with 50 additions and 0 deletions

View file

@ -97,6 +97,39 @@ export {
## for more details.
const linger_ms: int = 500 &redef;
## Send high water mark value for the XPUB socket.
##
## If reached, Zeek nodes will block or drop messages.
##
## See ZeroMQ's `ZMQ_SNDHWM documentation <http://api.zeromq.org/4-2:zmq-setsockopt#toc46>`_
## for more details.
const xpub_sndhwm: int = 1000 &redef;
## Kernel transmit buffer size for the XPUB socket.
##
## Using -1 will use the kernel's default.
##
## See ZeroMQ's `ZMQ_SNDBUF documentation <http://api.zeromq.org/4-2:zmq-setsockopt#toc45>`_
## for more details.
const xpub_sndbuf: int = -1 &redef;
## Receive high water mark value for the XSUB socket.
##
## If reached, the Zeek node will start reporting back pressure
## to the central XPUB socket.
##
## See ZeroMQ's `ZMQ_RCVHWM documentation <http://api.zeromq.org/4-2:zmq-setsockopt#toc35>`_
## for more details.
const xsub_rcvhwm: int = 1000 &redef;
## Kernel receive buffer size for the XSUB socket.
##
## Using -1 will use the kernel's default.
##
## See ZeroMQ's `ZMQ_RCVBUF documentation <http://api.zeromq.org/4-2:zmq-setsockopt#toc34>`_
## for more details.
const xsub_rcvbuf: int = -1 &redef;
## Configure ZeroMQ's immediate setting on PUSH sockets
##
## Setting this to ``T`` will queue log writes only to completed

View file

@ -118,6 +118,12 @@ void ZeroMQBackend::DoInitPostScript() {
zeek::telemetry_mgr
->CounterInstance("zeek", "cluster_zeromq_xpub_stalls", {},
"Counter for how many times sending on the XPUB socket stalled due to EAGAIN.");
// xpub/xsub hwm configuration
xpub_sndhwm = static_cast<int>(zeek::id::find_val<zeek::IntVal>("Cluster::Backend::ZeroMQ::xpub_sndhwm")->AsInt());
xpub_sndbuf = static_cast<int>(zeek::id::find_val<zeek::IntVal>("Cluster::Backend::ZeroMQ::xpub_sndbuf")->AsInt());
xsub_rcvhwm = static_cast<int>(zeek::id::find_val<zeek::IntVal>("Cluster::Backend::ZeroMQ::xsub_rcvhwm")->AsInt());
xsub_rcvbuf = static_cast<int>(zeek::id::find_val<zeek::IntVal>("Cluster::Backend::ZeroMQ::xsub_rcvbuf")->AsInt());
}
void ZeroMQBackend::DoTerminate() {
@ -179,6 +185,11 @@ bool ZeroMQBackend::DoInit() {
xpub.set(zmq::sockopt::xpub_nodrop, connect_xpub_nodrop);
xpub.set(zmq::sockopt::xpub_verbose, 1);
xpub.set(zmq::sockopt::sndhwm, xpub_sndhwm);
xpub.set(zmq::sockopt::sndbuf, xpub_sndbuf);
xsub.set(zmq::sockopt::rcvhwm, xsub_rcvhwm);
xsub.set(zmq::sockopt::rcvbuf, xsub_rcvbuf);
try {
xsub.connect(connect_xsub_endpoint);
} catch ( zmq::error_t& err ) {

View file

@ -92,6 +92,12 @@ private:
EventHandlerPtr event_subscription;
EventHandlerPtr event_unsubscription;
// xpub/xsub configuration
int xpub_sndhwm = 1000; // libzmq default
int xpub_sndbuf = -1; // OS defaults
int xsub_rcvhwm = 1000; // libzmq default
int xsub_rcvbuf = -1; // OS defaults
zmq::context_t ctx;
zmq::socket_t xsub;
zmq::socket_t xpub;