From 89c0b0faf36e1e3ce0361ce442e0df099808c313 Mon Sep 17 00:00:00 2001 From: Arne Welzel Date: Fri, 20 Jun 2025 12:32:31 +0200 Subject: [PATCH] cluster/zeromq: Hook up and enable IPV6 by default ZeroMQ's IPv6 support isn't enabled by default, resulting in "No such device" errors when attempting to listen on an IPv6 address. This change adds a ipv6 option to the ZeroMQ module and enables it by default. Further, adds a test configuring everything to listen on IPv6 ::1 as well, and one test to provoke the original error. This also regularizes some error messages. The addr_to_uri() calls weren't actually needed, but they apparently do not hurt and the result is easier on the eyes, so use them :-) --- .../cluster/backend/zeromq/main.zeek | 16 +- src/cluster/backend/zeromq/ZeroMQ-Proxy.cc | 9 +- src/cluster/backend/zeromq/ZeroMQ-Proxy.h | 4 +- src/cluster/backend/zeromq/ZeroMQ.cc | 19 ++- src/cluster/backend/zeromq/ZeroMQ.h | 1 + .../cluster.zeromq.ipv6-error/logger..stderr | 3 + .../cluster.zeromq.ipv6-error/manager..stderr | 3 + .../cluster.log.normalized | 21 +++ .../cluster.zeromq.ipv6-logging/manager.out | 16 ++ .../node_up.sorted | 21 +++ .../Files/zeromq/cluster-layout-simple.zeek | 18 ++- .../btest/Files/zeromq/test-bootstrap.zeek | 16 +- testing/btest/cluster/zeromq/ipv6-error.zeek | 27 ++++ .../btest/cluster/zeromq/ipv6-logging.zeek | 140 ++++++++++++++++++ 14 files changed, 293 insertions(+), 21 deletions(-) create mode 100644 testing/btest/Baseline/cluster.zeromq.ipv6-error/logger..stderr create mode 100644 testing/btest/Baseline/cluster.zeromq.ipv6-error/manager..stderr create mode 100644 testing/btest/Baseline/cluster.zeromq.ipv6-logging/cluster.log.normalized create mode 100644 testing/btest/Baseline/cluster.zeromq.ipv6-logging/manager.out create mode 100644 testing/btest/Baseline/cluster.zeromq.ipv6-logging/node_up.sorted create mode 100644 testing/btest/cluster/zeromq/ipv6-error.zeek create mode 100644 testing/btest/cluster/zeromq/ipv6-logging.zeek diff --git a/scripts/policy/frameworks/cluster/backend/zeromq/main.zeek b/scripts/policy/frameworks/cluster/backend/zeromq/main.zeek index 34c127ddf3..255be89ce4 100644 --- a/scripts/policy/frameworks/cluster/backend/zeromq/main.zeek +++ b/scripts/policy/frameworks/cluster/backend/zeromq/main.zeek @@ -22,6 +22,9 @@ ##! possible to run non-Zeek logger nodes. All a logger node needs to do is ##! open a ZeroMQ PULL socket and interpret the format used by Zeek nodes ##! to send their log writes. + +@load base/utils/addrs + module Cluster::Backend::ZeroMQ; export { @@ -139,6 +142,15 @@ export { ## for more details. const log_rcvbuf: int = -1 &redef; + ## Set ZMQ_IPV6 option. + ## + ## The ZeroMQ library has IPv6 support in ZeroMQ. For Zeek we enable it + ## unconditionally such that listening or connecting with IPv6 just works. + ## + ## See ZeroMQ's `ZMQ_IPV6 documentation `_ + ## for more details. + const ipv6 = T &redef; + ## Do not silently drop messages if high-water-mark is reached. ## ## Whether to configure ``ZMQ_XPUB_NODROP`` on the XPUB socket @@ -278,7 +290,7 @@ redef Cluster::worker_pool_spec = Cluster::PoolSpec( @if ( Cluster::local_node_type() == Cluster::LOGGER || (Cluster::manager_is_logger && Cluster::local_node_type() == Cluster::MANAGER) ) const my_node = Cluster::nodes[Cluster::node]; @if ( my_node?$p ) -redef listen_log_endpoint = fmt("tcp://%s:%s", my_node$ip, port_to_count(my_node$p)); +redef listen_log_endpoint = fmt("tcp://%s:%s", addr_to_uri(my_node$ip), port_to_count(my_node$p)); @endif @endif @@ -298,7 +310,7 @@ event zeek_init() &priority=100 local endp: string; if ( node$node_type == Cluster::LOGGER && node?$p ) { - endp = fmt("tcp://%s:%s", node$ip, port_to_count(node$p)); + endp = fmt("tcp://%s:%s", addr_to_uri(node$ip), port_to_count(node$p)); connect_log_endpoints += endp; } diff --git a/src/cluster/backend/zeromq/ZeroMQ-Proxy.cc b/src/cluster/backend/zeromq/ZeroMQ-Proxy.cc index d52a8df7d5..755618df11 100644 --- a/src/cluster/backend/zeromq/ZeroMQ-Proxy.cc +++ b/src/cluster/backend/zeromq/ZeroMQ-Proxy.cc @@ -38,6 +38,9 @@ void thread_fun(ProxyThread::Args* args) { bool ProxyThread::Start() { ctx.set(zmq::ctxopt::io_threads, io_threads); + // Enable IPv6 support for all subsequently created sockets, if configured. + ctx.set(zmq::ctxopt::ipv6, ipv6); + zmq::socket_t xpub(ctx, zmq::socket_type::xpub); zmq::socket_t xsub(ctx, zmq::socket_type::xsub); @@ -53,14 +56,16 @@ bool ProxyThread::Start() { try { xpub.bind(xpub_endpoint); } catch ( zmq::error_t& err ) { - zeek::reporter->Error("Failed to bind xpub socket %s: %s (%d)", xpub_endpoint.c_str(), err.what(), err.num()); + zeek::reporter->Error("ZeroMQ: Failed to bind xpub socket %s: %s (%d)", xpub_endpoint.c_str(), err.what(), + err.num()); return false; } try { xsub.bind(xsub_endpoint); } catch ( zmq::error_t& err ) { - zeek::reporter->Error("Failed to bind xsub socket %s: %s (%d)", xpub_endpoint.c_str(), err.what(), err.num()); + zeek::reporter->Error("ZeroMQ: Failed to bind xsub socket %s: %s (%d)", xsub_endpoint.c_str(), err.what(), + err.num()); return false; } diff --git a/src/cluster/backend/zeromq/ZeroMQ-Proxy.h b/src/cluster/backend/zeromq/ZeroMQ-Proxy.h index 63be24ef25..c1cf63af1e 100644 --- a/src/cluster/backend/zeromq/ZeroMQ-Proxy.h +++ b/src/cluster/backend/zeromq/ZeroMQ-Proxy.h @@ -21,9 +21,10 @@ public: * @param xsub_endpoint the XSUB socket address to listen on. * @param xpub_nodrop the xpub_nodrop option to use on the XPUB socket. */ - ProxyThread(std::string xpub_endpoint, std::string xsub_endpoint, int xpub_nodrop, int io_threads) + ProxyThread(std::string xpub_endpoint, std::string xsub_endpoint, int ipv6, int xpub_nodrop, int io_threads) : xpub_endpoint(std::move(xpub_endpoint)), xsub_endpoint(std::move(xsub_endpoint)), + ipv6(ipv6), xpub_nodrop(xpub_nodrop), io_threads(io_threads) {} @@ -54,6 +55,7 @@ private: Args args; std::string xpub_endpoint; std::string xsub_endpoint; + int ipv6 = 1; int xpub_nodrop = 1; int io_threads = 2; }; diff --git a/src/cluster/backend/zeromq/ZeroMQ.cc b/src/cluster/backend/zeromq/ZeroMQ.cc index f8d2fe8aed..4793e81ac9 100644 --- a/src/cluster/backend/zeromq/ZeroMQ.cc +++ b/src/cluster/backend/zeromq/ZeroMQ.cc @@ -90,6 +90,7 @@ void ZeroMQBackend::DoInitPostScript() { zeek::id::find_val("Cluster::Backend::ZeroMQ::listen_xpub_endpoint")->ToStdString(); listen_xsub_endpoint = zeek::id::find_val("Cluster::Backend::ZeroMQ::listen_xsub_endpoint")->ToStdString(); + ipv6 = zeek::id::find_val("Cluster::Backend::ZeroMQ::ipv6")->AsBool() ? 1 : 0; listen_xpub_nodrop = zeek::id::find_val("Cluster::Backend::ZeroMQ::listen_xpub_nodrop")->AsBool() ? 1 : 0; connect_xpub_endpoint = @@ -159,6 +160,9 @@ void ZeroMQBackend::DoTerminate() { } bool ZeroMQBackend::DoInit() { + // Enable IPv6 support for all subsequently created sockets, if configured. + ctx.set(zmq::ctxopt::ipv6, ipv6); + xsub = zmq::socket_t(ctx, zmq::socket_type::xsub); xpub = zmq::socket_t(ctx, zmq::socket_type::xpub); log_push = zmq::socket_t(ctx, zmq::socket_type::push); @@ -177,14 +181,16 @@ bool ZeroMQBackend::DoInit() { try { xsub.connect(connect_xsub_endpoint); } catch ( zmq::error_t& err ) { - zeek::reporter->Error("ZeroMQ: Failed to connect to XSUB %s: %s", connect_xsub_endpoint.c_str(), err.what()); + zeek::reporter->Error("ZeroMQ: Failed to connect xsub socket %s: %s", connect_xsub_endpoint.c_str(), + err.what()); return false; } try { xpub.connect(connect_xpub_endpoint); } catch ( zmq::error_t& err ) { - zeek::reporter->Error("ZeroMQ: Failed to connect to XPUB %s: %s", connect_xpub_endpoint.c_str(), err.what()); + zeek::reporter->Error("ZeroMQ: Failed to connect xpub socket %s: %s", connect_xpub_endpoint.c_str(), + err.what()); return false; } @@ -221,8 +227,7 @@ bool ZeroMQBackend::DoInit() { try { log_pull.bind(listen_log_endpoint); } catch ( zmq::error_t& err ) { - zeek::reporter->Error("ZeroMQ: Failed to bind to PULL socket %s: %s", listen_log_endpoint.c_str(), - err.what()); + zeek::reporter->Error("ZeroMQ: Failed to bind pull socket %s: %s", listen_log_endpoint.c_str(), err.what()); return false; } } @@ -236,7 +241,7 @@ bool ZeroMQBackend::DoInit() { try { log_push.connect(endp); } catch ( zmq::error_t& err ) { - zeek::reporter->Error("ZeroMQ: Failed to connect to PUSH socket %s: %s", endp.c_str(), err.what()); + zeek::reporter->Error("ZeroMQ: Failed to connect push socket %s: %s", endp.c_str(), err.what()); return false; } } @@ -270,8 +275,8 @@ bool ZeroMQBackend::DoInit() { } bool ZeroMQBackend::SpawnZmqProxyThread() { - proxy_thread = - std::make_unique(listen_xpub_endpoint, listen_xsub_endpoint, listen_xpub_nodrop, proxy_io_threads); + proxy_thread = std::make_unique(listen_xpub_endpoint, listen_xsub_endpoint, ipv6, listen_xpub_nodrop, + proxy_io_threads); return proxy_thread->Start(); } diff --git a/src/cluster/backend/zeromq/ZeroMQ.h b/src/cluster/backend/zeromq/ZeroMQ.h index dae822160b..20070a1ea5 100644 --- a/src/cluster/backend/zeromq/ZeroMQ.h +++ b/src/cluster/backend/zeromq/ZeroMQ.h @@ -80,6 +80,7 @@ private: std::string listen_xsub_endpoint; std::string listen_xpub_endpoint; std::string listen_log_endpoint; + int ipv6 = 1; int listen_xpub_nodrop = 1; int linger_ms = 0; diff --git a/testing/btest/Baseline/cluster.zeromq.ipv6-error/logger..stderr b/testing/btest/Baseline/cluster.zeromq.ipv6-error/logger..stderr new file mode 100644 index 0000000000..143139fc78 --- /dev/null +++ b/testing/btest/Baseline/cluster.zeromq.ipv6-error/logger..stderr @@ -0,0 +1,3 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +error: ZeroMQ: Failed to bind pull socket... +fatal error in <...>/connect.zeek, line 17: Failed initialize ZeroMQ backend diff --git a/testing/btest/Baseline/cluster.zeromq.ipv6-error/manager..stderr b/testing/btest/Baseline/cluster.zeromq.ipv6-error/manager..stderr new file mode 100644 index 0000000000..4e6f5b3e53 --- /dev/null +++ b/testing/btest/Baseline/cluster.zeromq.ipv6-error/manager..stderr @@ -0,0 +1,3 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +error: ZeroMQ: Failed to bind xpub socket... +fatal error in <...>/connect.zeek, line 13: Failed to spawn ZeroMQ proxy thread diff --git a/testing/btest/Baseline/cluster.zeromq.ipv6-logging/cluster.log.normalized b/testing/btest/Baseline/cluster.zeromq.ipv6-logging/cluster.log.normalized new file mode 100644 index 0000000000..fdebaf2784 --- /dev/null +++ b/testing/btest/Baseline/cluster.zeromq.ipv6-logging/cluster.log.normalized @@ -0,0 +1,21 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +logger got hello from manager (zeromq_manager___NrFj3eGxkRR5) +logger got hello from proxy (zeromq_proxy___NrFj3eGxkRR5) +logger got hello from worker-1 (zeromq_worker-1___NrFj3eGxkRR5) +logger got hello from worker-2 (zeromq_worker-2___NrFj3eGxkRR5) +manager got hello from logger (zeromq_logger___NrFj3eGxkRR5) +manager got hello from proxy (zeromq_proxy___NrFj3eGxkRR5) +manager got hello from worker-1 (zeromq_worker-1___NrFj3eGxkRR5) +manager got hello from worker-2 (zeromq_worker-2___NrFj3eGxkRR5) +proxy got hello from logger (zeromq_logger___NrFj3eGxkRR5) +proxy got hello from manager (zeromq_manager___NrFj3eGxkRR5) +proxy got hello from worker-1 (zeromq_worker-1___NrFj3eGxkRR5) +proxy got hello from worker-2 (zeromq_worker-2___NrFj3eGxkRR5) +worker-1 got hello from logger (zeromq_logger___NrFj3eGxkRR5) +worker-1 got hello from manager (zeromq_manager___NrFj3eGxkRR5) +worker-1 got hello from proxy (zeromq_proxy___NrFj3eGxkRR5) +worker-1 got hello from worker-2 (zeromq_worker-2___NrFj3eGxkRR5) +worker-2 got hello from logger (zeromq_logger___NrFj3eGxkRR5) +worker-2 got hello from manager (zeromq_manager___NrFj3eGxkRR5) +worker-2 got hello from proxy (zeromq_proxy___NrFj3eGxkRR5) +worker-2 got hello from worker-1 (zeromq_worker-1___NrFj3eGxkRR5) diff --git a/testing/btest/Baseline/cluster.zeromq.ipv6-logging/manager.out b/testing/btest/Baseline/cluster.zeromq.ipv6-logging/manager.out new file mode 100644 index 0000000000..9c5b6173f4 --- /dev/null +++ b/testing/btest/Baseline/cluster.zeromq.ipv6-logging/manager.out @@ -0,0 +1,16 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +A zeek_init, manager +B node_up, logger +B node_up, proxy +B node_up, worker-1 +B node_up, worker-2 +B nodes_up, 2 +B nodes_up, 3 +B nodes_up, 4 +B nodes_up, 5 +C send_finish +D node_down, logger +D node_down, proxy +D node_down, worker-1 +D node_down, worker-2 +D send_finish to logger diff --git a/testing/btest/Baseline/cluster.zeromq.ipv6-logging/node_up.sorted b/testing/btest/Baseline/cluster.zeromq.ipv6-logging/node_up.sorted new file mode 100644 index 0000000000..1351c3ed4a --- /dev/null +++ b/testing/btest/Baseline/cluster.zeromq.ipv6-logging/node_up.sorted @@ -0,0 +1,21 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +logger manager +logger proxy +logger worker-1 +logger worker-2 +manager logger +manager proxy +manager worker-1 +manager worker-2 +proxy logger +proxy manager +proxy worker-1 +proxy worker-2 +worker-1 logger +worker-1 manager +worker-1 proxy +worker-1 worker-2 +worker-2 logger +worker-2 manager +worker-2 proxy +worker-2 worker-1 diff --git a/testing/btest/Files/zeromq/cluster-layout-simple.zeek b/testing/btest/Files/zeromq/cluster-layout-simple.zeek index be99599819..9ed3c7fed7 100644 --- a/testing/btest/Files/zeromq/cluster-layout-simple.zeek +++ b/testing/btest/Files/zeromq/cluster-layout-simple.zeek @@ -1,9 +1,17 @@ redef Cluster::manager_is_logger = F; +const node_ip = 127.0.0.1 &redef; + +# If ZEEK_CLUSTER_IP is set, populate the cluster-layout's Node$ip fields with it. +const cluster_ip_env = getenv("BTEST_CLUSTER_IP"); +@if ( cluster_ip_env != "" ) +redef node_ip = to_addr(cluster_ip_env); +@endif + redef Cluster::nodes = { - ["manager"] = [$node_type=Cluster::MANAGER, $ip=127.0.0.1], - ["logger"] = [$node_type=Cluster::LOGGER, $ip=127.0.0.1, $p=to_port(getenv("LOG_PULL_PORT"))], - ["proxy"] = [$node_type=Cluster::PROXY, $ip=127.0.0.1], - ["worker-1"] = [$node_type=Cluster::WORKER, $ip=127.0.0.1], - ["worker-2"] = [$node_type=Cluster::WORKER, $ip=127.0.0.1], + ["manager"] = [$node_type=Cluster::MANAGER, $ip=node_ip], + ["logger"] = [$node_type=Cluster::LOGGER, $ip=node_ip, $p=to_port(getenv("LOG_PULL_PORT"))], + ["proxy"] = [$node_type=Cluster::PROXY, $ip=node_ip], + ["worker-1"] = [$node_type=Cluster::WORKER, $ip=node_ip], + ["worker-2"] = [$node_type=Cluster::WORKER, $ip=node_ip], }; diff --git a/testing/btest/Files/zeromq/test-bootstrap.zeek b/testing/btest/Files/zeromq/test-bootstrap.zeek index 23cd514694..882512c82f 100644 --- a/testing/btest/Files/zeromq/test-bootstrap.zeek +++ b/testing/btest/Files/zeromq/test-bootstrap.zeek @@ -1,11 +1,19 @@ # Helper scripts for test expecting XPUB/XSUB ports allocated by # btest and configuring the ZeroMQ globals. @load base/utils/numbers +@load base/utils/addrs @load frameworks/cluster/backend/zeromq @load frameworks/cluster/backend/zeromq/connect -redef Cluster::Backend::ZeroMQ::listen_xpub_endpoint = fmt("tcp://127.0.0.1:%s", port_to_count(to_port(getenv("XPUB_PORT")))); -redef Cluster::Backend::ZeroMQ::listen_xsub_endpoint = fmt("tcp://127.0.0.1:%s", port_to_count(to_port(getenv("XSUB_PORT")))); -redef Cluster::Backend::ZeroMQ::connect_xpub_endpoint = fmt("tcp://127.0.0.1:%s", port_to_count(to_port(getenv("XSUB_PORT")))); -redef Cluster::Backend::ZeroMQ::connect_xsub_endpoint = fmt("tcp://127.0.0.1:%s", port_to_count(to_port(getenv("XPUB_PORT")))); +# Use 127.0.0.1 by default for testing, unless there's a cluster-layout with +# a manager. In that case, use its IP address. +const local_addr_str = "127.0.0.1" &redef; +@if ( "manager" in Cluster::nodes ) +redef local_addr_str = addr_to_uri(Cluster::nodes["manager"]$ip); +@endif + +redef Cluster::Backend::ZeroMQ::listen_xpub_endpoint = fmt("tcp://%s:%s", local_addr_str, port_to_count(to_port(getenv("XPUB_PORT")))); +redef Cluster::Backend::ZeroMQ::listen_xsub_endpoint = fmt("tcp://%s:%s", local_addr_str, port_to_count(to_port(getenv("XSUB_PORT")))); +redef Cluster::Backend::ZeroMQ::connect_xpub_endpoint = fmt("tcp://%s:%s", local_addr_str, port_to_count(to_port(getenv("XSUB_PORT")))); +redef Cluster::Backend::ZeroMQ::connect_xsub_endpoint = fmt("tcp://%s:%s", local_addr_str, port_to_count(to_port(getenv("XPUB_PORT")))); diff --git a/testing/btest/cluster/zeromq/ipv6-error.zeek b/testing/btest/cluster/zeromq/ipv6-error.zeek new file mode 100644 index 0000000000..399f045b01 --- /dev/null +++ b/testing/btest/cluster/zeromq/ipv6-error.zeek @@ -0,0 +1,27 @@ +# @TEST-DOC: Startup a ZeroMQ cluster using ::1 as address, but disable ZeroMQ's IPv6 support. Check the error messages. Relates to #4586. +# +# @TEST-REQUIRES: have-zeromq +# @TEST-REQUIRES: can-listen-tcp 6 ::1 +# +# @TEST-GROUP: cluster-zeromq +# +# @TEST-PORT: XPUB_PORT +# @TEST-PORT: XSUB_PORT +# @TEST-PORT: LOG_PULL_PORT +# +# @TEST-EXEC: cp $FILES/zeromq/cluster-layout-simple.zeek cluster-layout.zeek +# @TEST-EXEC: cp $FILES/zeromq/test-bootstrap.zeek zeromq-test-bootstrap.zeek +# +# @TEST-EXEC: btest-bg-run manager "BTEST_CLUSTER_IP=::1 ZEEKPATH=$ZEEKPATH:.. CLUSTER_NODE=manager zeek -b ../common.zeek" +# @TEST-EXEC: btest-bg-run logger "BTEST_CLUSTER_IP=::1 ZEEKPATH=$ZEEKPATH:.. CLUSTER_NODE=logger zeek -b ../common.zeek" +# @TEST-EXEC-FAIL: btest-bg-wait -k 10 +# +# @TEST-EXEC: TEST_DIFF_CANONIFIER="sed -E 's,^error: ZeroMQ: Failed to bind ([^ ]+) socket tcp://\[::1\]:[0-9]+:.*$,error: ZeroMQ: Failed to bind \1 socket...,g' | $SCRIPTS/diff-remove-abspath" btest-diff manager/.stderr +# @TEST-EXEC: TEST_DIFF_CANONIFIER="sed -E 's,^error: ZeroMQ: Failed to bind ([^ ]+) socket tcp://\[::1\]:[0-9]+:.*$,error: ZeroMQ: Failed to bind \1 socket...,g' | $SCRIPTS/diff-remove-abspath" btest-diff logger/.stderr + +# @TEST-START-FILE common.zeek +@load frameworks/cluster/backend/zeromq +# Explicitly disable ipv6 support to provoke errors. +redef Cluster::Backend::ZeroMQ::ipv6 = F; +@load ./zeromq-test-bootstrap +# @TEST-END-FILE diff --git a/testing/btest/cluster/zeromq/ipv6-logging.zeek b/testing/btest/cluster/zeromq/ipv6-logging.zeek new file mode 100644 index 0000000000..030c879470 --- /dev/null +++ b/testing/btest/cluster/zeromq/ipv6-logging.zeek @@ -0,0 +1,140 @@ +# @TEST-DOC: Startup a ZeroMQ cluster using ::1 as address. Regression test for #4586 +# +# @TEST-REQUIRES: have-zeromq +# @TEST-REQUIRES: can-listen-tcp 6 ::1 +# +# @TEST-GROUP: cluster-zeromq +# +# @TEST-PORT: XPUB_PORT +# @TEST-PORT: XSUB_PORT +# @TEST-PORT: LOG_PULL_PORT +# +# @TEST-EXEC: chmod +x ./check-cluster-log.sh +# +# @TEST-EXEC: cp $FILES/zeromq/cluster-layout-simple.zeek cluster-layout.zeek +# @TEST-EXEC: cp $FILES/zeromq/test-bootstrap.zeek zeromq-test-bootstrap.zeek +# +# The BTEST_CLUSTER_IP is picked up by cluster-layout-simple.zeek, zeromq-test-bootstrap +# will pickup the manager's IP for rendering XPUB/XSUB listen and connect endpoints. +# @TEST-EXEC: btest-bg-run manager "BTEST_CLUSTER_IP=::1 ZEEKPATH=$ZEEKPATH:.. CLUSTER_NODE=manager zeek -b ../manager.zeek >out" +# @TEST-EXEC: btest-bg-run logger "BTEST_CLUSTER_IP=::1 ZEEKPATH=$ZEEKPATH:.. CLUSTER_NODE=logger zeek -b ../other.zeek >out" +# @TEST-EXEC: btest-bg-run proxy "BTEST_CLUSTER_IP=::1 ZEEKPATH=$ZEEKPATH:.. CLUSTER_NODE=proxy zeek -b ../other.zeek >out" +# @TEST-EXEC: btest-bg-run worker-1 "BTEST_CLUSTER_IP=::1 ZEEKPATH=$ZEEKPATH:.. CLUSTER_NODE=worker-1 zeek -b ../other.zeek >out" +# @TEST-EXEC: btest-bg-run worker-2 "BTEST_CLUSTER_IP=::1 ZEEKPATH=$ZEEKPATH:.. CLUSTER_NODE=worker-2 zeek -b ../other.zeek >out" +# +# @TEST-EXEC: btest-bg-wait 30 +# @TEST-EXEC: btest-diff cluster.log.normalized +# @TEST-EXEC: zeek-cut -F ' ' < ./logger/node_up.log | sort > node_up.sorted +# @TEST-EXEC: btest-diff node_up.sorted +# @TEST-EXEC: sort manager/out > manager.out +# @TEST-EXEC: btest-diff manager.out + +# @TEST-START-FILE common.zeek +@load ./zeromq-test-bootstrap + +redef Log::default_rotation_interval = 0sec; +redef Log::flush_interval = 0.01sec; + +type Info: record { + self: string &log &default=Cluster::node; + node: string &log; +}; + +redef enum Log::ID += { TEST_LOG }; + +global finish: event(name: string) &is_used; + +event zeek_init() { + print "A zeek_init", Cluster::node; + Log::create_stream(TEST_LOG, [$columns=Info, $path="node_up"]); +} + +event Cluster::node_up(name: string, id: string) &priority=-5 { + print "B node_up", name; + Log::write(TEST_LOG, [$node=name]); +} +# @TEST-END-FILE + +# @TEST-START-FILE manager.zeek +@load ./common.zeek + +global nodes_up: set[string] = {"manager"}; +global nodes_down: set[string] = {"manager"}; + +event send_finish() { + print "C send_finish"; + for ( n in nodes_up ) + if ( n != "logger" ) + Cluster::publish(Cluster::node_topic(n), finish, Cluster::node); +} + +event check_cluster_log() { + if ( file_size("DONE") >= 0 ) { + event send_finish(); + return; + } + + system("../check-cluster-log.sh"); + schedule 0.1sec { check_cluster_log() }; +} + +event zeek_init() { + schedule 0.1sec { check_cluster_log() }; +} + +event Cluster::node_up(name: string, id: string) &priority=-1 { + add nodes_up[name]; + print "B nodes_up", |nodes_up|; +} + +event Cluster::node_down(name: string, id: string) { + print "D node_down", name; + add nodes_down[name]; + + if ( |nodes_down| == |Cluster::nodes| - 1 ) { + print "D send_finish to logger"; + Cluster::publish(Cluster::node_topic("logger"), finish, Cluster::node); + } + if ( |nodes_down| == |Cluster::nodes| ) + terminate(); +} +# @TEST-END-FILE + +# @TEST-START-FILE other.zeek +@load ./common.zeek + +event finish(name: string) { + print fmt("finish from %s", name); + terminate(); +} +# @TEST-END-FILE + +# @TEST-START-FILE check-cluster-log.sh +#!/bin/sh +# +# This script checks logger/cluster.log until the expected number +# of log entries have been observed and puts a normalized version +# into the testing directory for baselining. +CLUSTER_LOG=../logger/cluster.log + +if [ ! -f $CLUSTER_LOG ]; then + echo "$CLUSTER_LOG not found!" >&2 + exit 1; +fi + +if [ -f DONE ]; then + exit 0 +fi + +# Remove hostname and pid from node id in message. +zeek-cut node message < $CLUSTER_LOG | sed -r 's/_[^_]+_[0-9]+_/___/g' | sort > cluster.log.tmp + +# 4 times 5 +if [ $(wc -l < cluster.log.tmp) = 20 ]; then + echo "DONE!" >&2 + mv cluster.log.tmp ../cluster.log.normalized + echo "DONE" > DONE +fi + +exit 0 +# @TEST-END-FILE