cluster/zeromq: Hook up and enable IPV6 by default

ZeroMQ's IPv6 support isn't enabled by default, resulting in
"No such device" errors when attempting to listen on an IPv6
address. This change adds an ipv6 option to the ZeroMQ module
and enables it by default. Further, adds a test configuring
everything to listen on IPv6 ::1 as well, and one test to provoke
the original error. This also regularizes some error messages.

The addr_to_uri() calls weren't actually needed, but they apparently do
not hurt and the result is easier on the eyes, so use them :-)
This commit is contained in:
Arne Welzel 2025-06-20 12:32:31 +02:00
parent cf43cf1809
commit 89c0b0faf3
14 changed files with 293 additions and 21 deletions

View file

@ -0,0 +1,3 @@
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
error: ZeroMQ: Failed to bind pull socket...
fatal error in <...>/connect.zeek, line 17: Failed initialize ZeroMQ backend

View file

@ -0,0 +1,3 @@
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
error: ZeroMQ: Failed to bind xpub socket...
fatal error in <...>/connect.zeek, line 13: Failed to spawn ZeroMQ proxy thread

View file

@ -0,0 +1,21 @@
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
logger got hello from manager (zeromq_manager_<hostname>_<pid>_NrFj3eGxkRR5)
logger got hello from proxy (zeromq_proxy_<hostname>_<pid>_NrFj3eGxkRR5)
logger got hello from worker-1 (zeromq_worker-1_<hostname>_<pid>_NrFj3eGxkRR5)
logger got hello from worker-2 (zeromq_worker-2_<hostname>_<pid>_NrFj3eGxkRR5)
manager got hello from logger (zeromq_logger_<hostname>_<pid>_NrFj3eGxkRR5)
manager got hello from proxy (zeromq_proxy_<hostname>_<pid>_NrFj3eGxkRR5)
manager got hello from worker-1 (zeromq_worker-1_<hostname>_<pid>_NrFj3eGxkRR5)
manager got hello from worker-2 (zeromq_worker-2_<hostname>_<pid>_NrFj3eGxkRR5)
proxy got hello from logger (zeromq_logger_<hostname>_<pid>_NrFj3eGxkRR5)
proxy got hello from manager (zeromq_manager_<hostname>_<pid>_NrFj3eGxkRR5)
proxy got hello from worker-1 (zeromq_worker-1_<hostname>_<pid>_NrFj3eGxkRR5)
proxy got hello from worker-2 (zeromq_worker-2_<hostname>_<pid>_NrFj3eGxkRR5)
worker-1 got hello from logger (zeromq_logger_<hostname>_<pid>_NrFj3eGxkRR5)
worker-1 got hello from manager (zeromq_manager_<hostname>_<pid>_NrFj3eGxkRR5)
worker-1 got hello from proxy (zeromq_proxy_<hostname>_<pid>_NrFj3eGxkRR5)
worker-1 got hello from worker-2 (zeromq_worker-2_<hostname>_<pid>_NrFj3eGxkRR5)
worker-2 got hello from logger (zeromq_logger_<hostname>_<pid>_NrFj3eGxkRR5)
worker-2 got hello from manager (zeromq_manager_<hostname>_<pid>_NrFj3eGxkRR5)
worker-2 got hello from proxy (zeromq_proxy_<hostname>_<pid>_NrFj3eGxkRR5)
worker-2 got hello from worker-1 (zeromq_worker-1_<hostname>_<pid>_NrFj3eGxkRR5)

View file

@ -0,0 +1,16 @@
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
A zeek_init, manager
B node_up, logger
B node_up, proxy
B node_up, worker-1
B node_up, worker-2
B nodes_up, 2
B nodes_up, 3
B nodes_up, 4
B nodes_up, 5
C send_finish
D node_down, logger
D node_down, proxy
D node_down, worker-1
D node_down, worker-2
D send_finish to logger

View file

@ -0,0 +1,21 @@
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
logger manager
logger proxy
logger worker-1
logger worker-2
manager logger
manager proxy
manager worker-1
manager worker-2
proxy logger
proxy manager
proxy worker-1
proxy worker-2
worker-1 logger
worker-1 manager
worker-1 proxy
worker-1 worker-2
worker-2 logger
worker-2 manager
worker-2 proxy
worker-2 worker-1

View file

@ -1,9 +1,17 @@
redef Cluster::manager_is_logger = F;
const node_ip = 127.0.0.1 &redef;
# If BTEST_CLUSTER_IP is set, populate the cluster-layout's Node$ip fields with it.
const cluster_ip_env = getenv("BTEST_CLUSTER_IP");
@if ( cluster_ip_env != "" )
redef node_ip = to_addr(cluster_ip_env);
@endif
redef Cluster::nodes = {
["manager"] = [$node_type=Cluster::MANAGER, $ip=127.0.0.1],
["logger"] = [$node_type=Cluster::LOGGER, $ip=127.0.0.1, $p=to_port(getenv("LOG_PULL_PORT"))],
["proxy"] = [$node_type=Cluster::PROXY, $ip=127.0.0.1],
["worker-1"] = [$node_type=Cluster::WORKER, $ip=127.0.0.1],
["worker-2"] = [$node_type=Cluster::WORKER, $ip=127.0.0.1],
["manager"] = [$node_type=Cluster::MANAGER, $ip=node_ip],
["logger"] = [$node_type=Cluster::LOGGER, $ip=node_ip, $p=to_port(getenv("LOG_PULL_PORT"))],
["proxy"] = [$node_type=Cluster::PROXY, $ip=node_ip],
["worker-1"] = [$node_type=Cluster::WORKER, $ip=node_ip],
["worker-2"] = [$node_type=Cluster::WORKER, $ip=node_ip],
};

View file

@ -1,11 +1,19 @@
# Helper script for tests expecting XPUB/XSUB ports allocated by
# btest and configuring the ZeroMQ globals.
@load base/utils/numbers
@load base/utils/addrs
@load frameworks/cluster/backend/zeromq
@load frameworks/cluster/backend/zeromq/connect
redef Cluster::Backend::ZeroMQ::listen_xpub_endpoint = fmt("tcp://127.0.0.1:%s", port_to_count(to_port(getenv("XPUB_PORT"))));
redef Cluster::Backend::ZeroMQ::listen_xsub_endpoint = fmt("tcp://127.0.0.1:%s", port_to_count(to_port(getenv("XSUB_PORT"))));
redef Cluster::Backend::ZeroMQ::connect_xpub_endpoint = fmt("tcp://127.0.0.1:%s", port_to_count(to_port(getenv("XSUB_PORT"))));
redef Cluster::Backend::ZeroMQ::connect_xsub_endpoint = fmt("tcp://127.0.0.1:%s", port_to_count(to_port(getenv("XPUB_PORT"))));
# Use 127.0.0.1 by default for testing, unless there's a cluster-layout with
# a manager. In that case, use its IP address.
const local_addr_str = "127.0.0.1" &redef;
@if ( "manager" in Cluster::nodes )
redef local_addr_str = addr_to_uri(Cluster::nodes["manager"]$ip);
@endif
redef Cluster::Backend::ZeroMQ::listen_xpub_endpoint = fmt("tcp://%s:%s", local_addr_str, port_to_count(to_port(getenv("XPUB_PORT"))));
redef Cluster::Backend::ZeroMQ::listen_xsub_endpoint = fmt("tcp://%s:%s", local_addr_str, port_to_count(to_port(getenv("XSUB_PORT"))));
redef Cluster::Backend::ZeroMQ::connect_xpub_endpoint = fmt("tcp://%s:%s", local_addr_str, port_to_count(to_port(getenv("XSUB_PORT"))));
redef Cluster::Backend::ZeroMQ::connect_xsub_endpoint = fmt("tcp://%s:%s", local_addr_str, port_to_count(to_port(getenv("XPUB_PORT"))));

View file

@ -0,0 +1,27 @@
# @TEST-DOC: Startup a ZeroMQ cluster using ::1 as address, but disable ZeroMQ's IPv6 support. Check the error messages. Relates to #4586.
#
# @TEST-REQUIRES: have-zeromq
# @TEST-REQUIRES: can-listen-tcp 6 ::1
#
# @TEST-GROUP: cluster-zeromq
#
# @TEST-PORT: XPUB_PORT
# @TEST-PORT: XSUB_PORT
# @TEST-PORT: LOG_PULL_PORT
#
# @TEST-EXEC: cp $FILES/zeromq/cluster-layout-simple.zeek cluster-layout.zeek
# @TEST-EXEC: cp $FILES/zeromq/test-bootstrap.zeek zeromq-test-bootstrap.zeek
#
# @TEST-EXEC: btest-bg-run manager "BTEST_CLUSTER_IP=::1 ZEEKPATH=$ZEEKPATH:.. CLUSTER_NODE=manager zeek -b ../common.zeek"
# @TEST-EXEC: btest-bg-run logger "BTEST_CLUSTER_IP=::1 ZEEKPATH=$ZEEKPATH:.. CLUSTER_NODE=logger zeek -b ../common.zeek"
# @TEST-EXEC-FAIL: btest-bg-wait -k 10
#
# @TEST-EXEC: TEST_DIFF_CANONIFIER="sed -E 's,^error: ZeroMQ: Failed to bind ([^ ]+) socket tcp://\[::1\]:[0-9]+:.*$,error: ZeroMQ: Failed to bind \1 socket...,g' | $SCRIPTS/diff-remove-abspath" btest-diff manager/.stderr
# @TEST-EXEC: TEST_DIFF_CANONIFIER="sed -E 's,^error: ZeroMQ: Failed to bind ([^ ]+) socket tcp://\[::1\]:[0-9]+:.*$,error: ZeroMQ: Failed to bind \1 socket...,g' | $SCRIPTS/diff-remove-abspath" btest-diff logger/.stderr
# @TEST-START-FILE common.zeek
@load frameworks/cluster/backend/zeromq
# Explicitly disable ipv6 support to provoke errors.
redef Cluster::Backend::ZeroMQ::ipv6 = F;
@load ./zeromq-test-bootstrap
# @TEST-END-FILE

View file

@ -0,0 +1,140 @@
# @TEST-DOC: Startup a ZeroMQ cluster using ::1 as address. Regression test for #4586
#
# @TEST-REQUIRES: have-zeromq
# @TEST-REQUIRES: can-listen-tcp 6 ::1
#
# @TEST-GROUP: cluster-zeromq
#
# @TEST-PORT: XPUB_PORT
# @TEST-PORT: XSUB_PORT
# @TEST-PORT: LOG_PULL_PORT
#
# @TEST-EXEC: chmod +x ./check-cluster-log.sh
#
# @TEST-EXEC: cp $FILES/zeromq/cluster-layout-simple.zeek cluster-layout.zeek
# @TEST-EXEC: cp $FILES/zeromq/test-bootstrap.zeek zeromq-test-bootstrap.zeek
#
# BTEST_CLUSTER_IP is picked up by cluster-layout-simple.zeek; zeromq-test-bootstrap
# then picks up the manager's IP for rendering XPUB/XSUB listen and connect endpoints.
# @TEST-EXEC: btest-bg-run manager "BTEST_CLUSTER_IP=::1 ZEEKPATH=$ZEEKPATH:.. CLUSTER_NODE=manager zeek -b ../manager.zeek >out"
# @TEST-EXEC: btest-bg-run logger "BTEST_CLUSTER_IP=::1 ZEEKPATH=$ZEEKPATH:.. CLUSTER_NODE=logger zeek -b ../other.zeek >out"
# @TEST-EXEC: btest-bg-run proxy "BTEST_CLUSTER_IP=::1 ZEEKPATH=$ZEEKPATH:.. CLUSTER_NODE=proxy zeek -b ../other.zeek >out"
# @TEST-EXEC: btest-bg-run worker-1 "BTEST_CLUSTER_IP=::1 ZEEKPATH=$ZEEKPATH:.. CLUSTER_NODE=worker-1 zeek -b ../other.zeek >out"
# @TEST-EXEC: btest-bg-run worker-2 "BTEST_CLUSTER_IP=::1 ZEEKPATH=$ZEEKPATH:.. CLUSTER_NODE=worker-2 zeek -b ../other.zeek >out"
#
# @TEST-EXEC: btest-bg-wait 30
# @TEST-EXEC: btest-diff cluster.log.normalized
# @TEST-EXEC: zeek-cut -F ' ' < ./logger/node_up.log | sort > node_up.sorted
# @TEST-EXEC: btest-diff node_up.sorted
# @TEST-EXEC: sort manager/out > manager.out
# @TEST-EXEC: btest-diff manager.out
# @TEST-START-FILE common.zeek
# Shared by manager.zeek and other.zeek: loads the ZeroMQ test bootstrap,
# declares the finish event and records every Cluster::node_up event in a
# dedicated "node_up" log stream.
@load ./zeromq-test-bootstrap
# NOTE(review): presumably chosen so that log entries show up promptly and
# in unrotated files for the checks done by the test - confirm.
redef Log::default_rotation_interval = 0sec;
redef Log::flush_interval = 0.01sec;
# Columns of the node_up log: the node writing the entry (defaults to
# Cluster::node) and the name of the node that came up.
type Info: record {
self: string &log &default=Cluster::node;
node: string &log;
};
redef enum Log::ID += { TEST_LOG };
# Published by manager.zeek and handled in other.zeek to shut nodes down.
# No handler is defined in this file itself.
global finish: event(name: string) &is_used;
event zeek_init() {
print "A zeek_init", Cluster::node;
Log::create_stream(TEST_LOG, [$columns=Info, $path="node_up"]);
}
# Print and log every node that comes up; only the $node field is set
# explicitly, $self is filled in from its default.
event Cluster::node_up(name: string, id: string) &priority=-5 {
print "B node_up", name;
Log::write(TEST_LOG, [$node=name]);
}
# @TEST-END-FILE
# @TEST-START-FILE manager.zeek
# Script run by the manager. It polls check-cluster-log.sh until the DONE
# marker file exists, then shuts the cluster down in two steps: first all
# nodes except the logger, then the logger, then itself.
@load ./common.zeek
# Names of nodes seen coming up / going down. Both sets start out with the
# manager itself so their sizes can be compared against |Cluster::nodes|.
global nodes_up: set[string] = {"manager"};
global nodes_down: set[string] = {"manager"};
# Publish finish to the node topic of every node in nodes_up except the
# logger. The logger is told to finish later, from the node_down handler.
event send_finish() {
print "C send_finish";
for ( n in nodes_up )
if ( n != "logger" )
Cluster::publish(Cluster::node_topic(n), finish, Cluster::node);
}
# Runs every 0.1sec: once the DONE file exists, raise send_finish and stop
# polling; otherwise run the check script and reschedule.
event check_cluster_log() {
# NOTE(review): relies on file_size() reporting a negative value for as
# long as DONE does not exist - confirm against the BIF documentation.
if ( file_size("DONE") >= 0 ) {
event send_finish();
return;
}
system("../check-cluster-log.sh");
schedule 0.1sec { check_cluster_log() };
}
event zeek_init() {
schedule 0.1sec { check_cluster_log() };
}
event Cluster::node_up(name: string, id: string) &priority=-1 {
add nodes_up[name];
print "B nodes_up", |nodes_up|;
}
event Cluster::node_down(name: string, id: string) {
print "D node_down", name;
add nodes_down[name];
# Exactly one node is still missing from nodes_down. Since send_finish
# skips the logger, that node is presumably the logger: finish it now.
if ( |nodes_down| == |Cluster::nodes| - 1 ) {
print "D send_finish to logger";
Cluster::publish(Cluster::node_topic("logger"), finish, Cluster::node);
}
# Every node of the cluster layout is accounted for: stop the manager.
if ( |nodes_down| == |Cluster::nodes| )
terminate();
}
# @TEST-END-FILE
# @TEST-START-FILE other.zeek
# Script run by the logger, proxy and worker nodes.
@load ./common.zeek
# Handler for the finish event that manager.zeek publishes to node topics:
# print the name carried by the event and terminate this process.
event finish(name: string) {
print fmt("finish from %s", name);
terminate();
}
# @TEST-END-FILE
# @TEST-START-FILE check-cluster-log.sh
#!/bin/sh
#
# Performs a single check of logger/cluster.log. manager.zeek invokes this
# script repeatedly until the expected number of log entries has been
# observed. Once that is the case, a normalized version of the log is put
# into the testing directory for baselining and a DONE marker file is
# created in the current directory.
#
# Exit status: 1 while logger/cluster.log does not exist, 0 otherwise.
CLUSTER_LOG=../logger/cluster.log

if [ ! -f "$CLUSTER_LOG" ]; then
    echo "$CLUSTER_LOG not found!" >&2
    exit 1
fi

# An earlier invocation already produced the normalized log.
if [ -f DONE ]; then
    exit 0
fi

# Remove hostname and pid from node id in message. Use -E instead of the
# GNU-specific -r spelling so non-GNU sed implementations work as well.
zeek-cut node message < "$CLUSTER_LOG" | sed -E 's/_[^_]+_[0-9]+_/_<hostname>_<pid>_/g' | sort > cluster.log.tmp

# 4 times 5: each of the 5 nodes logs a hello from each of its 4 peers.
# The command substitution is left unquoted on purpose: word splitting
# drops any padding that some wc implementations put around the count.
if [ $(wc -l < cluster.log.tmp) = 20 ]; then
    echo "DONE!" >&2
    mv cluster.log.tmp ../cluster.log.normalized
    echo "DONE" > DONE
fi

exit 0
# @TEST-END-FILE