diff --git a/CHANGES b/CHANGES index fce9f46e76..b82d1d67f2 100644 --- a/CHANGES +++ b/CHANGES @@ -1,3 +1,36 @@ +8.0.0-dev.503 | 2025-06-24 17:14:28 +0200 + + * cluster/zeromq: Short-circuit DoPublishLogWrite() when not initialized (Arne Welzel, Corelight) + + After moving the log_push initialization from the constructor to the + DoInit() method, it's now possible that DoPublishLogWrites() is invoked + even if DoInit() was never called. Handle this by short-circuiting. This + is sort of an error, but can happen during tests if scripts are loaded + somewhat arbitrarily. + + * cluster/zeromq: Hook up and enable IPV6 by default (Arne Welzel, Corelight) + + ZeroMQ's IPv6 support isn't enabled by default, resulting in + "No such device" errors when attempting to listen on an IPv6 + address. This change adds a ipv6 option to the ZeroMQ module + and enables it by default. Further, adds a test configuring + everything to listen on IPv6 ::1 as well, and one test to provoke + the original error. This also regularizes some error messages. + + The addr_to_uri() calls weren't actually needed, but they apparently do + not hurt and the result is easier on the eyes, so use them :-) + + * cluster/zeromq/connect: Make failures fatal (Arne Welzel, Corelight) + + The cluster is borked if the initialization fails, so may as well just + completely abort Zeek at that point with a fatal error. There's no real + point in continuing to run. + + * cluster/zeromq: Move log_push creation to DoInit() (Arne Welzel, Corelight) + + The log_push socket should be affected by the IPV6 option, so need to + delay its creation a bit. + 8.0.0-dev.498 | 2025-06-24 16:30:52 +0200 * GH-4587: Build builtin Spicy analyzers in debug mode if debug mode is enabled (Benjamin Bannier, Corelight) diff --git a/VERSION b/VERSION index 0e3be39f37..a5bd06a820 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -8.0.0-dev.498 +8.0.0-dev.503 diff --git a/scripts/policy/frameworks/cluster/backend/zeromq/connect.zeek b/scripts/policy/frameworks/cluster/backend/zeromq/connect.zeek index 94aee459ae..44f1a4bf20 100644 --- a/scripts/policy/frameworks/cluster/backend/zeromq/connect.zeek +++ b/scripts/policy/frameworks/cluster/backend/zeromq/connect.zeek @@ -8,7 +8,11 @@ module Cluster::Backend::ZeroMQ; event zeek_init() &priority=10 { if ( run_proxy_thread ) - Cluster::Backend::ZeroMQ::spawn_zmq_proxy_thread(); + { + if ( ! Cluster::Backend::ZeroMQ::spawn_zmq_proxy_thread() ) + Reporter::fatal("Failed to spawn ZeroMQ proxy thread"); + } - Cluster::init(); + if ( ! Cluster::init() ) + Reporter::fatal("Failed initialize ZeroMQ backend"); } diff --git a/scripts/policy/frameworks/cluster/backend/zeromq/main.zeek b/scripts/policy/frameworks/cluster/backend/zeromq/main.zeek index 34c127ddf3..255be89ce4 100644 --- a/scripts/policy/frameworks/cluster/backend/zeromq/main.zeek +++ b/scripts/policy/frameworks/cluster/backend/zeromq/main.zeek @@ -22,6 +22,9 @@ ##! possible to run non-Zeek logger nodes. All a logger node needs to do is ##! open a ZeroMQ PULL socket and interpret the format used by Zeek nodes ##! to send their log writes. + +@load base/utils/addrs + module Cluster::Backend::ZeroMQ; export { @@ -139,6 +142,15 @@ export { ## for more details. const log_rcvbuf: int = -1 &redef; + ## Set ZMQ_IPV6 option. + ## + ## The ZeroMQ library has IPv6 support in ZeroMQ. For Zeek we enable it + ## unconditionally such that listening or connecting with IPv6 just works. + ## + ## See ZeroMQ's `ZMQ_IPV6 documentation `_ + ## for more details. + const ipv6 = T &redef; + ## Do not silently drop messages if high-water-mark is reached. ## ## Whether to configure ``ZMQ_XPUB_NODROP`` on the XPUB socket @@ -278,7 +290,7 @@ redef Cluster::worker_pool_spec = Cluster::PoolSpec( @if ( Cluster::local_node_type() == Cluster::LOGGER || (Cluster::manager_is_logger && Cluster::local_node_type() == Cluster::MANAGER) ) const my_node = Cluster::nodes[Cluster::node]; @if ( my_node?$p ) -redef listen_log_endpoint = fmt("tcp://%s:%s", my_node$ip, port_to_count(my_node$p)); +redef listen_log_endpoint = fmt("tcp://%s:%s", addr_to_uri(my_node$ip), port_to_count(my_node$p)); @endif @endif @@ -298,7 +310,7 @@ event zeek_init() &priority=100 local endp: string; if ( node$node_type == Cluster::LOGGER && node?$p ) { - endp = fmt("tcp://%s:%s", node$ip, port_to_count(node$p)); + endp = fmt("tcp://%s:%s", addr_to_uri(node$ip), port_to_count(node$p)); connect_log_endpoints += endp; } diff --git a/src/cluster/backend/zeromq/ZeroMQ-Proxy.cc b/src/cluster/backend/zeromq/ZeroMQ-Proxy.cc index d52a8df7d5..755618df11 100644 --- a/src/cluster/backend/zeromq/ZeroMQ-Proxy.cc +++ b/src/cluster/backend/zeromq/ZeroMQ-Proxy.cc @@ -38,6 +38,9 @@ void thread_fun(ProxyThread::Args* args) { bool ProxyThread::Start() { ctx.set(zmq::ctxopt::io_threads, io_threads); + // Enable IPv6 support for all subsequently created sockets, if configured. + ctx.set(zmq::ctxopt::ipv6, ipv6); + zmq::socket_t xpub(ctx, zmq::socket_type::xpub); zmq::socket_t xsub(ctx, zmq::socket_type::xsub); @@ -53,14 +56,16 @@ bool ProxyThread::Start() { try { xpub.bind(xpub_endpoint); } catch ( zmq::error_t& err ) { - zeek::reporter->Error("Failed to bind xpub socket %s: %s (%d)", xpub_endpoint.c_str(), err.what(), err.num()); + zeek::reporter->Error("ZeroMQ: Failed to bind xpub socket %s: %s (%d)", xpub_endpoint.c_str(), err.what(), + err.num()); return false; } try { xsub.bind(xsub_endpoint); } catch ( zmq::error_t& err ) { - zeek::reporter->Error("Failed to bind xsub socket %s: %s (%d)", xpub_endpoint.c_str(), err.what(), err.num()); + zeek::reporter->Error("ZeroMQ: Failed to bind xsub socket %s: %s (%d)", xsub_endpoint.c_str(), err.what(), + err.num()); return false; } diff --git a/src/cluster/backend/zeromq/ZeroMQ-Proxy.h b/src/cluster/backend/zeromq/ZeroMQ-Proxy.h index 63be24ef25..c1cf63af1e 100644 --- a/src/cluster/backend/zeromq/ZeroMQ-Proxy.h +++ b/src/cluster/backend/zeromq/ZeroMQ-Proxy.h @@ -21,9 +21,10 @@ public: * @param xsub_endpoint the XSUB socket address to listen on. * @param xpub_nodrop the xpub_nodrop option to use on the XPUB socket. */ - ProxyThread(std::string xpub_endpoint, std::string xsub_endpoint, int xpub_nodrop, int io_threads) + ProxyThread(std::string xpub_endpoint, std::string xsub_endpoint, int ipv6, int xpub_nodrop, int io_threads) : xpub_endpoint(std::move(xpub_endpoint)), xsub_endpoint(std::move(xsub_endpoint)), + ipv6(ipv6), xpub_nodrop(xpub_nodrop), io_threads(io_threads) {} @@ -54,6 +55,7 @@ private: Args args; std::string xpub_endpoint; std::string xsub_endpoint; + int ipv6 = 1; int xpub_nodrop = 1; int io_threads = 2; }; diff --git a/src/cluster/backend/zeromq/ZeroMQ.cc b/src/cluster/backend/zeromq/ZeroMQ.cc index c2c5ac52a9..e167e97aae 100644 --- a/src/cluster/backend/zeromq/ZeroMQ.cc +++ b/src/cluster/backend/zeromq/ZeroMQ.cc @@ -72,10 +72,8 @@ constexpr DebugFlag operator&(uint8_t x, DebugFlag y) { return static_cast es, std::unique_ptr ls, std::unique_ptr ehs) - : ThreadedBackend("ZeroMQ", std::move(es), std::move(ls), std::move(ehs)) { - log_push = zmq::socket_t(ctx, zmq::socket_type::push); - main_inproc = zmq::socket_t(ctx, zmq::socket_type::pair); -} + : ThreadedBackend("ZeroMQ", std::move(es), std::move(ls), std::move(ehs)), + main_inproc(zmq::socket_t(ctx, zmq::socket_type::pair)) {} ZeroMQBackend::~ZeroMQBackend() { try { @@ -92,6 +90,7 @@ void ZeroMQBackend::DoInitPostScript() { zeek::id::find_val("Cluster::Backend::ZeroMQ::listen_xpub_endpoint")->ToStdString(); listen_xsub_endpoint = zeek::id::find_val("Cluster::Backend::ZeroMQ::listen_xsub_endpoint")->ToStdString(); + ipv6 = zeek::id::find_val("Cluster::Backend::ZeroMQ::ipv6")->AsBool() ? 1 : 0; listen_xpub_nodrop = zeek::id::find_val("Cluster::Backend::ZeroMQ::listen_xpub_nodrop")->AsBool() ? 1 : 0; connect_xpub_endpoint = @@ -161,8 +160,12 @@ void ZeroMQBackend::DoTerminate() { } bool ZeroMQBackend::DoInit() { + // Enable IPv6 support for all subsequently created sockets, if configured. + ctx.set(zmq::ctxopt::ipv6, ipv6); + xsub = zmq::socket_t(ctx, zmq::socket_type::xsub); xpub = zmq::socket_t(ctx, zmq::socket_type::xpub); + log_push = zmq::socket_t(ctx, zmq::socket_type::push); log_pull = zmq::socket_t(ctx, zmq::socket_type::pull); child_inproc = zmq::socket_t(ctx, zmq::socket_type::pair); @@ -178,14 +181,16 @@ bool ZeroMQBackend::DoInit() { try { xsub.connect(connect_xsub_endpoint); } catch ( zmq::error_t& err ) { - zeek::reporter->Error("ZeroMQ: Failed to connect to XSUB %s: %s", connect_xsub_endpoint.c_str(), err.what()); + zeek::reporter->Error("ZeroMQ: Failed to connect xsub socket %s: %s", connect_xsub_endpoint.c_str(), + err.what()); return false; } try { xpub.connect(connect_xpub_endpoint); } catch ( zmq::error_t& err ) { - zeek::reporter->Error("ZeroMQ: Failed to connect to XPUB %s: %s", connect_xpub_endpoint.c_str(), err.what()); + zeek::reporter->Error("ZeroMQ: Failed to connect xpub socket %s: %s", connect_xpub_endpoint.c_str(), + err.what()); return false; } @@ -222,8 +227,7 @@ bool ZeroMQBackend::DoInit() { try { log_pull.bind(listen_log_endpoint); } catch ( zmq::error_t& err ) { - zeek::reporter->Error("ZeroMQ: Failed to bind to PULL socket %s: %s", listen_log_endpoint.c_str(), - err.what()); + zeek::reporter->Error("ZeroMQ: Failed to bind pull socket %s: %s", listen_log_endpoint.c_str(), err.what()); return false; } } @@ -237,7 +241,7 @@ bool ZeroMQBackend::DoInit() { try { log_push.connect(endp); } catch ( zmq::error_t& err ) { - zeek::reporter->Error("ZeroMQ: Failed to connect to PUSH socket %s: %s", endp.c_str(), err.what()); + zeek::reporter->Error("ZeroMQ: Failed to connect push socket %s: %s", endp.c_str(), err.what()); return false; } } @@ -271,8 +275,8 @@ bool ZeroMQBackend::DoInit() { } bool ZeroMQBackend::SpawnZmqProxyThread() { - proxy_thread = - std::make_unique(listen_xpub_endpoint, listen_xsub_endpoint, listen_xpub_nodrop, proxy_io_threads); + proxy_thread = std::make_unique(listen_xpub_endpoint, listen_xsub_endpoint, ipv6, listen_xpub_nodrop, + proxy_io_threads); return proxy_thread->Start(); } @@ -369,7 +373,6 @@ bool ZeroMQBackend::DoUnsubscribe(const std::string& topic_prefix) { bool ZeroMQBackend::DoPublishLogWrites(const logging::detail::LogWriteHeader& header, const std::string& format, byte_buffer& buf) { - ZEROMQ_DEBUG("Publishing %zu bytes of log writes (path %s)", buf.size(), header.path.c_str()); static std::string message_type = "log-write"; // Publishing a log write is done using 4 parts @@ -385,6 +388,14 @@ bool ZeroMQBackend::DoPublishLogWrites(const logging::detail::LogWriteHeader& he zmq::const_buffer{buf.data(), buf.size()}, }; + // If the log_push socket isn't yet initialized or has been closed, just return. + if ( ! log_push ) { + ZEROMQ_DEBUG("Skipping log write - log_push socket not open"); + return false; + } + + ZEROMQ_DEBUG("Publishing %zu bytes of log writes (path %s)", buf.size(), header.path.c_str()); + for ( size_t i = 0; i < parts.size(); i++ ) { zmq::send_flags flags = zmq::send_flags::dontwait; if ( i < parts.size() - 1 ) diff --git a/src/cluster/backend/zeromq/ZeroMQ.h b/src/cluster/backend/zeromq/ZeroMQ.h index 0af5429a18..58d6c7ff08 100644 --- a/src/cluster/backend/zeromq/ZeroMQ.h +++ b/src/cluster/backend/zeromq/ZeroMQ.h @@ -80,6 +80,7 @@ private: std::string listen_xsub_endpoint; std::string listen_xpub_endpoint; std::string listen_log_endpoint; + int ipv6 = 1; int listen_xpub_nodrop = 1; int linger_ms = 0; diff --git a/testing/btest/Baseline/cluster.zeromq.ipv6-error/logger..stderr b/testing/btest/Baseline/cluster.zeromq.ipv6-error/logger..stderr new file mode 100644 index 0000000000..143139fc78 --- /dev/null +++ b/testing/btest/Baseline/cluster.zeromq.ipv6-error/logger..stderr @@ -0,0 +1,3 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +error: ZeroMQ: Failed to bind pull socket... +fatal error in <...>/connect.zeek, line 17: Failed initialize ZeroMQ backend diff --git a/testing/btest/Baseline/cluster.zeromq.ipv6-error/manager..stderr b/testing/btest/Baseline/cluster.zeromq.ipv6-error/manager..stderr new file mode 100644 index 0000000000..4e6f5b3e53 --- /dev/null +++ b/testing/btest/Baseline/cluster.zeromq.ipv6-error/manager..stderr @@ -0,0 +1,3 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +error: ZeroMQ: Failed to bind xpub socket... +fatal error in <...>/connect.zeek, line 13: Failed to spawn ZeroMQ proxy thread diff --git a/testing/btest/Baseline/cluster.zeromq.ipv6-logging/cluster.log.normalized b/testing/btest/Baseline/cluster.zeromq.ipv6-logging/cluster.log.normalized new file mode 100644 index 0000000000..fdebaf2784 --- /dev/null +++ b/testing/btest/Baseline/cluster.zeromq.ipv6-logging/cluster.log.normalized @@ -0,0 +1,21 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +logger got hello from manager (zeromq_manager___NrFj3eGxkRR5) +logger got hello from proxy (zeromq_proxy___NrFj3eGxkRR5) +logger got hello from worker-1 (zeromq_worker-1___NrFj3eGxkRR5) +logger got hello from worker-2 (zeromq_worker-2___NrFj3eGxkRR5) +manager got hello from logger (zeromq_logger___NrFj3eGxkRR5) +manager got hello from proxy (zeromq_proxy___NrFj3eGxkRR5) +manager got hello from worker-1 (zeromq_worker-1___NrFj3eGxkRR5) +manager got hello from worker-2 (zeromq_worker-2___NrFj3eGxkRR5) +proxy got hello from logger (zeromq_logger___NrFj3eGxkRR5) +proxy got hello from manager (zeromq_manager___NrFj3eGxkRR5) +proxy got hello from worker-1 (zeromq_worker-1___NrFj3eGxkRR5) +proxy got hello from worker-2 (zeromq_worker-2___NrFj3eGxkRR5) +worker-1 got hello from logger (zeromq_logger___NrFj3eGxkRR5) +worker-1 got hello from manager (zeromq_manager___NrFj3eGxkRR5) +worker-1 got hello from proxy (zeromq_proxy___NrFj3eGxkRR5) +worker-1 got hello from worker-2 (zeromq_worker-2___NrFj3eGxkRR5) +worker-2 got hello from logger (zeromq_logger___NrFj3eGxkRR5) +worker-2 got hello from manager (zeromq_manager___NrFj3eGxkRR5) +worker-2 got hello from proxy (zeromq_proxy___NrFj3eGxkRR5) +worker-2 got hello from worker-1 (zeromq_worker-1___NrFj3eGxkRR5) diff --git a/testing/btest/Baseline/cluster.zeromq.ipv6-logging/manager.out b/testing/btest/Baseline/cluster.zeromq.ipv6-logging/manager.out new file mode 100644 index 0000000000..9c5b6173f4 --- /dev/null +++ b/testing/btest/Baseline/cluster.zeromq.ipv6-logging/manager.out @@ -0,0 +1,16 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +A zeek_init, manager +B node_up, logger +B node_up, proxy +B node_up, worker-1 +B node_up, worker-2 +B nodes_up, 2 +B nodes_up, 3 +B nodes_up, 4 +B nodes_up, 5 +C send_finish +D node_down, logger +D node_down, proxy +D node_down, worker-1 +D node_down, worker-2 +D send_finish to logger diff --git a/testing/btest/Baseline/cluster.zeromq.ipv6-logging/node_up.sorted b/testing/btest/Baseline/cluster.zeromq.ipv6-logging/node_up.sorted new file mode 100644 index 0000000000..1351c3ed4a --- /dev/null +++ b/testing/btest/Baseline/cluster.zeromq.ipv6-logging/node_up.sorted @@ -0,0 +1,21 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +logger manager +logger proxy +logger worker-1 +logger worker-2 +manager logger +manager proxy +manager worker-1 +manager worker-2 +proxy logger +proxy manager +proxy worker-1 +proxy worker-2 +worker-1 logger +worker-1 manager +worker-1 proxy +worker-1 worker-2 +worker-2 logger +worker-2 manager +worker-2 proxy +worker-2 worker-1 diff --git a/testing/btest/Files/zeromq/cluster-layout-simple.zeek b/testing/btest/Files/zeromq/cluster-layout-simple.zeek index be99599819..9ed3c7fed7 100644 --- a/testing/btest/Files/zeromq/cluster-layout-simple.zeek +++ b/testing/btest/Files/zeromq/cluster-layout-simple.zeek @@ -1,9 +1,17 @@ redef Cluster::manager_is_logger = F; +const node_ip = 127.0.0.1 &redef; + +# If ZEEK_CLUSTER_IP is set, populate the cluster-layout's Node$ip fields with it. +const cluster_ip_env = getenv("BTEST_CLUSTER_IP"); +@if ( cluster_ip_env != "" ) +redef node_ip = to_addr(cluster_ip_env); +@endif + redef Cluster::nodes = { - ["manager"] = [$node_type=Cluster::MANAGER, $ip=127.0.0.1], - ["logger"] = [$node_type=Cluster::LOGGER, $ip=127.0.0.1, $p=to_port(getenv("LOG_PULL_PORT"))], - ["proxy"] = [$node_type=Cluster::PROXY, $ip=127.0.0.1], - ["worker-1"] = [$node_type=Cluster::WORKER, $ip=127.0.0.1], - ["worker-2"] = [$node_type=Cluster::WORKER, $ip=127.0.0.1], + ["manager"] = [$node_type=Cluster::MANAGER, $ip=node_ip], + ["logger"] = [$node_type=Cluster::LOGGER, $ip=node_ip, $p=to_port(getenv("LOG_PULL_PORT"))], + ["proxy"] = [$node_type=Cluster::PROXY, $ip=node_ip], + ["worker-1"] = [$node_type=Cluster::WORKER, $ip=node_ip], + ["worker-2"] = [$node_type=Cluster::WORKER, $ip=node_ip], }; diff --git a/testing/btest/Files/zeromq/test-bootstrap.zeek b/testing/btest/Files/zeromq/test-bootstrap.zeek index 23cd514694..882512c82f 100644 --- a/testing/btest/Files/zeromq/test-bootstrap.zeek +++ b/testing/btest/Files/zeromq/test-bootstrap.zeek @@ -1,11 +1,19 @@ # Helper scripts for test expecting XPUB/XSUB ports allocated by # btest and configuring the ZeroMQ globals. @load base/utils/numbers +@load base/utils/addrs @load frameworks/cluster/backend/zeromq @load frameworks/cluster/backend/zeromq/connect -redef Cluster::Backend::ZeroMQ::listen_xpub_endpoint = fmt("tcp://127.0.0.1:%s", port_to_count(to_port(getenv("XPUB_PORT")))); -redef Cluster::Backend::ZeroMQ::listen_xsub_endpoint = fmt("tcp://127.0.0.1:%s", port_to_count(to_port(getenv("XSUB_PORT")))); -redef Cluster::Backend::ZeroMQ::connect_xpub_endpoint = fmt("tcp://127.0.0.1:%s", port_to_count(to_port(getenv("XSUB_PORT")))); -redef Cluster::Backend::ZeroMQ::connect_xsub_endpoint = fmt("tcp://127.0.0.1:%s", port_to_count(to_port(getenv("XPUB_PORT")))); +# Use 127.0.0.1 by default for testing, unless there's a cluster-layout with +# a manager. In that case, use its IP address. +const local_addr_str = "127.0.0.1" &redef; +@if ( "manager" in Cluster::nodes ) +redef local_addr_str = addr_to_uri(Cluster::nodes["manager"]$ip); +@endif + +redef Cluster::Backend::ZeroMQ::listen_xpub_endpoint = fmt("tcp://%s:%s", local_addr_str, port_to_count(to_port(getenv("XPUB_PORT")))); +redef Cluster::Backend::ZeroMQ::listen_xsub_endpoint = fmt("tcp://%s:%s", local_addr_str, port_to_count(to_port(getenv("XSUB_PORT")))); +redef Cluster::Backend::ZeroMQ::connect_xpub_endpoint = fmt("tcp://%s:%s", local_addr_str, port_to_count(to_port(getenv("XSUB_PORT")))); +redef Cluster::Backend::ZeroMQ::connect_xsub_endpoint = fmt("tcp://%s:%s", local_addr_str, port_to_count(to_port(getenv("XPUB_PORT")))); diff --git a/testing/btest/cluster/zeromq/ipv6-error.zeek b/testing/btest/cluster/zeromq/ipv6-error.zeek new file mode 100644 index 0000000000..399f045b01 --- /dev/null +++ b/testing/btest/cluster/zeromq/ipv6-error.zeek @@ -0,0 +1,27 @@ +# @TEST-DOC: Startup a ZeroMQ cluster using ::1 as address, but disable ZeroMQ's IPv6 support. Check the error messages. Relates to #4586. +# +# @TEST-REQUIRES: have-zeromq +# @TEST-REQUIRES: can-listen-tcp 6 ::1 +# +# @TEST-GROUP: cluster-zeromq +# +# @TEST-PORT: XPUB_PORT +# @TEST-PORT: XSUB_PORT +# @TEST-PORT: LOG_PULL_PORT +# +# @TEST-EXEC: cp $FILES/zeromq/cluster-layout-simple.zeek cluster-layout.zeek +# @TEST-EXEC: cp $FILES/zeromq/test-bootstrap.zeek zeromq-test-bootstrap.zeek +# +# @TEST-EXEC: btest-bg-run manager "BTEST_CLUSTER_IP=::1 ZEEKPATH=$ZEEKPATH:.. CLUSTER_NODE=manager zeek -b ../common.zeek" +# @TEST-EXEC: btest-bg-run logger "BTEST_CLUSTER_IP=::1 ZEEKPATH=$ZEEKPATH:.. CLUSTER_NODE=logger zeek -b ../common.zeek" +# @TEST-EXEC-FAIL: btest-bg-wait -k 10 +# +# @TEST-EXEC: TEST_DIFF_CANONIFIER="sed -E 's,^error: ZeroMQ: Failed to bind ([^ ]+) socket tcp://\[::1\]:[0-9]+:.*$,error: ZeroMQ: Failed to bind \1 socket...,g' | $SCRIPTS/diff-remove-abspath" btest-diff manager/.stderr +# @TEST-EXEC: TEST_DIFF_CANONIFIER="sed -E 's,^error: ZeroMQ: Failed to bind ([^ ]+) socket tcp://\[::1\]:[0-9]+:.*$,error: ZeroMQ: Failed to bind \1 socket...,g' | $SCRIPTS/diff-remove-abspath" btest-diff logger/.stderr + +# @TEST-START-FILE common.zeek +@load frameworks/cluster/backend/zeromq +# Explicitly disable ipv6 support to provoke errors. +redef Cluster::Backend::ZeroMQ::ipv6 = F; +@load ./zeromq-test-bootstrap +# @TEST-END-FILE diff --git a/testing/btest/cluster/zeromq/ipv6-logging.zeek b/testing/btest/cluster/zeromq/ipv6-logging.zeek new file mode 100644 index 0000000000..030c879470 --- /dev/null +++ b/testing/btest/cluster/zeromq/ipv6-logging.zeek @@ -0,0 +1,140 @@ +# @TEST-DOC: Startup a ZeroMQ cluster using ::1 as address. Regression test for #4586 +# +# @TEST-REQUIRES: have-zeromq +# @TEST-REQUIRES: can-listen-tcp 6 ::1 +# +# @TEST-GROUP: cluster-zeromq +# +# @TEST-PORT: XPUB_PORT +# @TEST-PORT: XSUB_PORT +# @TEST-PORT: LOG_PULL_PORT +# +# @TEST-EXEC: chmod +x ./check-cluster-log.sh +# +# @TEST-EXEC: cp $FILES/zeromq/cluster-layout-simple.zeek cluster-layout.zeek +# @TEST-EXEC: cp $FILES/zeromq/test-bootstrap.zeek zeromq-test-bootstrap.zeek +# +# The BTEST_CLUSTER_IP is picked up by cluster-layout-simple.zeek, zeromq-test-bootstrap +# will pickup the manager's IP for rendering XPUB/XSUB listen and connect endpoints. +# @TEST-EXEC: btest-bg-run manager "BTEST_CLUSTER_IP=::1 ZEEKPATH=$ZEEKPATH:.. CLUSTER_NODE=manager zeek -b ../manager.zeek >out" +# @TEST-EXEC: btest-bg-run logger "BTEST_CLUSTER_IP=::1 ZEEKPATH=$ZEEKPATH:.. CLUSTER_NODE=logger zeek -b ../other.zeek >out" +# @TEST-EXEC: btest-bg-run proxy "BTEST_CLUSTER_IP=::1 ZEEKPATH=$ZEEKPATH:.. CLUSTER_NODE=proxy zeek -b ../other.zeek >out" +# @TEST-EXEC: btest-bg-run worker-1 "BTEST_CLUSTER_IP=::1 ZEEKPATH=$ZEEKPATH:.. CLUSTER_NODE=worker-1 zeek -b ../other.zeek >out" +# @TEST-EXEC: btest-bg-run worker-2 "BTEST_CLUSTER_IP=::1 ZEEKPATH=$ZEEKPATH:.. CLUSTER_NODE=worker-2 zeek -b ../other.zeek >out" +# +# @TEST-EXEC: btest-bg-wait 30 +# @TEST-EXEC: btest-diff cluster.log.normalized +# @TEST-EXEC: zeek-cut -F ' ' < ./logger/node_up.log | sort > node_up.sorted +# @TEST-EXEC: btest-diff node_up.sorted +# @TEST-EXEC: sort manager/out > manager.out +# @TEST-EXEC: btest-diff manager.out + +# @TEST-START-FILE common.zeek +@load ./zeromq-test-bootstrap + +redef Log::default_rotation_interval = 0sec; +redef Log::flush_interval = 0.01sec; + +type Info: record { + self: string &log &default=Cluster::node; + node: string &log; +}; + +redef enum Log::ID += { TEST_LOG }; + +global finish: event(name: string) &is_used; + +event zeek_init() { + print "A zeek_init", Cluster::node; + Log::create_stream(TEST_LOG, [$columns=Info, $path="node_up"]); +} + +event Cluster::node_up(name: string, id: string) &priority=-5 { + print "B node_up", name; + Log::write(TEST_LOG, [$node=name]); +} +# @TEST-END-FILE + +# @TEST-START-FILE manager.zeek +@load ./common.zeek + +global nodes_up: set[string] = {"manager"}; +global nodes_down: set[string] = {"manager"}; + +event send_finish() { + print "C send_finish"; + for ( n in nodes_up ) + if ( n != "logger" ) + Cluster::publish(Cluster::node_topic(n), finish, Cluster::node); +} + +event check_cluster_log() { + if ( file_size("DONE") >= 0 ) { + event send_finish(); + return; + } + + system("../check-cluster-log.sh"); + schedule 0.1sec { check_cluster_log() }; +} + +event zeek_init() { + schedule 0.1sec { check_cluster_log() }; +} + +event Cluster::node_up(name: string, id: string) &priority=-1 { + add nodes_up[name]; + print "B nodes_up", |nodes_up|; +} + +event Cluster::node_down(name: string, id: string) { + print "D node_down", name; + add nodes_down[name]; + + if ( |nodes_down| == |Cluster::nodes| - 1 ) { + print "D send_finish to logger"; + Cluster::publish(Cluster::node_topic("logger"), finish, Cluster::node); + } + if ( |nodes_down| == |Cluster::nodes| ) + terminate(); +} +# @TEST-END-FILE + +# @TEST-START-FILE other.zeek +@load ./common.zeek + +event finish(name: string) { + print fmt("finish from %s", name); + terminate(); +} +# @TEST-END-FILE + +# @TEST-START-FILE check-cluster-log.sh +#!/bin/sh +# +# This script checks logger/cluster.log until the expected number +# of log entries have been observed and puts a normalized version +# into the testing directory for baselining. +CLUSTER_LOG=../logger/cluster.log + +if [ ! -f $CLUSTER_LOG ]; then + echo "$CLUSTER_LOG not found!" >&2 + exit 1; +fi + +if [ -f DONE ]; then + exit 0 +fi + +# Remove hostname and pid from node id in message. +zeek-cut node message < $CLUSTER_LOG | sed -r 's/_[^_]+_[0-9]+_/___/g' | sort > cluster.log.tmp + +# 4 times 5 +if [ $(wc -l < cluster.log.tmp) = 20 ]; then + echo "DONE!" >&2 + mv cluster.log.tmp ../cluster.log.normalized + echo "DONE" > DONE +fi + +exit 0 +# @TEST-END-FILE