Merge remote-tracking branch 'origin/topic/awelzel/4586-zeromq-ipv6'

* origin/topic/awelzel/4586-zeromq-ipv6:
  cluster/zeromq: Short-circuit DoPublishLogWrite() when not initialized
  cluster/zeromq: Hook up and enable IPV6 by default
  cluster/zeromq/connect: Make failures fatal
  cluster/zeromq: Move log_push creation to DoInit()

(cherry picked from commit cab4ebf513)
This commit is contained in:
Arne Welzel 2025-06-24 17:14:28 +02:00 committed by Tim Wojtulewicz
parent 8045612bb4
commit 7cdbbec44c
17 changed files with 352 additions and 29 deletions

View file

@ -8,7 +8,11 @@ module Cluster::Backend::ZeroMQ;
event zeek_init() &priority=10
{
if ( run_proxy_thread )
Cluster::Backend::ZeroMQ::spawn_zmq_proxy_thread();
{
if ( ! Cluster::Backend::ZeroMQ::spawn_zmq_proxy_thread() )
Reporter::fatal("Failed to spawn ZeroMQ proxy thread");
}
Cluster::init();
if ( ! Cluster::init() )
Reporter::fatal("Failed initialize ZeroMQ backend");
}

View file

@ -22,6 +22,9 @@
##! possible to run non-Zeek logger nodes. All a logger node needs to do is
##! open a ZeroMQ PULL socket and interpret the format used by Zeek nodes
##! to send their log writes.
@load base/utils/addrs
module Cluster::Backend::ZeroMQ;
export {
@ -139,6 +142,15 @@ export {
## for more details.
const log_rcvbuf: int = -1 &redef;
## Set ZMQ_IPV6 option.
##
## The ZeroMQ library has IPv6 support in ZeroMQ. For Zeek we enable it
## unconditionally such that listening or connecting with IPv6 just works.
##
## See ZeroMQ's `ZMQ_IPV6 documentation <http://api.zeromq.org/4-2:zmq-setsockopt#toc23>`_
## for more details.
const ipv6 = T &redef;
## Do not silently drop messages if high-water-mark is reached.
##
## Whether to configure ``ZMQ_XPUB_NODROP`` on the XPUB socket
@ -278,7 +290,7 @@ redef Cluster::worker_pool_spec = Cluster::PoolSpec(
@if ( Cluster::local_node_type() == Cluster::LOGGER || (Cluster::manager_is_logger && Cluster::local_node_type() == Cluster::MANAGER) )
const my_node = Cluster::nodes[Cluster::node];
@if ( my_node?$p )
redef listen_log_endpoint = fmt("tcp://%s:%s", my_node$ip, port_to_count(my_node$p));
redef listen_log_endpoint = fmt("tcp://%s:%s", addr_to_uri(my_node$ip), port_to_count(my_node$p));
@endif
@endif
@ -298,7 +310,7 @@ event zeek_init() &priority=100
local endp: string;
if ( node$node_type == Cluster::LOGGER && node?$p )
{
endp = fmt("tcp://%s:%s", node$ip, port_to_count(node$p));
endp = fmt("tcp://%s:%s", addr_to_uri(node$ip), port_to_count(node$p));
connect_log_endpoints += endp;
}