From a318463c1c2c582e0f6f32c713a511a19892749b Mon Sep 17 00:00:00 2001 From: Arne Welzel Date: Thu, 25 Sep 2025 10:38:07 +0200 Subject: [PATCH] cluster/zeromq: Improve EINTR handling When using ZeroMQ also within the Supervisor process, zmq::poll() and recv() were observed to return EINTR, handle these. --- src/cluster/backend/zeromq/ZeroMQ-Proxy.cc | 24 ++++-- src/cluster/backend/zeromq/ZeroMQ.cc | 27 +++++- .../supervisor.cluster.log | 21 +++++ .../zeromq/supervisor-runs-zmq-proxy.zeek | 86 +++++++++++++++++++ 4 files changed, 146 insertions(+), 12 deletions(-) create mode 100644 testing/btest/Baseline/cluster.zeromq.supervisor-runs-zmq-proxy/supervisor.cluster.log create mode 100644 testing/btest/cluster/zeromq/supervisor-runs-zmq-proxy.zeek diff --git a/src/cluster/backend/zeromq/ZeroMQ-Proxy.cc b/src/cluster/backend/zeromq/ZeroMQ-Proxy.cc index 755618df11..92aa946cea 100644 --- a/src/cluster/backend/zeromq/ZeroMQ-Proxy.cc +++ b/src/cluster/backend/zeromq/ZeroMQ-Proxy.cc @@ -20,15 +20,23 @@ namespace { void thread_fun(ProxyThread::Args* args) { zeek::util::detail::set_thread_name("zmq-proxy-thread"); - try { - zmq::proxy(args->xsub, args->xpub, zmq::socket_ref{} /*capture*/); - } catch ( zmq::error_t& err ) { - args->xsub.close(); - args->xpub.close(); + bool done = false; - if ( err.num() != ETERM ) { - std::fprintf(stderr, "[zeromq] unexpected zmq_proxy() error: %s (%d)", err.what(), err.num()); - throw; + while ( ! done ) { + try { + zmq::proxy(args->xsub, args->xpub, zmq::socket_ref{} /*capture*/); + } catch ( zmq::error_t& err ) { + if ( err.num() == EINTR ) + continue; + + done = true; + args->xsub.close(); + args->xpub.close(); + + if ( err.num() != ETERM ) { + std::fprintf(stderr, "[zeromq] unexpected zmq_proxy() error: %s (%d)", err.what(), err.num()); + throw; + } } } } diff --git a/src/cluster/backend/zeromq/ZeroMQ.cc b/src/cluster/backend/zeromq/ZeroMQ.cc index 0a4e9720b0..4085179b8b 100644 --- a/src/cluster/backend/zeromq/ZeroMQ.cc +++ b/src/cluster/backend/zeromq/ZeroMQ.cc @@ -650,8 +650,17 @@ void ZeroMQBackend::Run() { // Awkward. std::vector> rcv_messages(sockets.size()); try { - int r = zmq::poll(poll_items, std::chrono::seconds(-1)); - ZEROMQ_DEBUG_THREAD_PRINTF(DebugFlag::POLL, "poll: r=%d", r); + try { + int r = zmq::poll(poll_items, std::chrono::seconds(-1)); + ZEROMQ_DEBUG_THREAD_PRINTF(DebugFlag::POLL, "poll: r=%d", r); + } catch ( const zmq::error_t& err ) { + ZEROMQ_DEBUG_THREAD_PRINTF(DebugFlag::POLL, "poll exception: what=%s num=%d", err.what(), err.num()); + // Retry interrupted zmq::poll() calls. + if ( err.num() == EINTR ) + continue; + + throw; + } for ( size_t i = 0; i < poll_items.size(); i++ ) { const auto& item = poll_items[i]; @@ -691,7 +700,17 @@ void ZeroMQBackend::Run() { // Read a multi-part message. do { - auto recv_result = sockets[i].socket.recv(msg, zmq::recv_flags::dontwait); + zmq::recv_result_t recv_result; + try { + recv_result = sockets[i].socket.recv(msg, zmq::recv_flags::dontwait); + } catch ( const zmq::error_t& err ) { + // Retry interrupted recv() calls. + if ( err.num() == EINTR ) + continue; + + throw; + } + if ( recv_result ) { consumed_one = true; more = msg.more(); @@ -708,7 +727,7 @@ void ZeroMQBackend::Run() { assert(rcv_messages[i].back().size() == 0); rcv_messages[i].pop_back(); } - } catch ( zmq::error_t& err ) { + } catch ( const zmq::error_t& err ) { if ( err.num() != ETERM ) throw; diff --git a/testing/btest/Baseline/cluster.zeromq.supervisor-runs-zmq-proxy/supervisor.cluster.log b/testing/btest/Baseline/cluster.zeromq.supervisor-runs-zmq-proxy/supervisor.cluster.log new file mode 100644 index 0000000000..fdebaf2784 --- /dev/null +++ b/testing/btest/Baseline/cluster.zeromq.supervisor-runs-zmq-proxy/supervisor.cluster.log @@ -0,0 +1,21 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +logger got hello from manager (zeromq_manager___NrFj3eGxkRR5) +logger got hello from proxy (zeromq_proxy___NrFj3eGxkRR5) +logger got hello from worker-1 (zeromq_worker-1___NrFj3eGxkRR5) +logger got hello from worker-2 (zeromq_worker-2___NrFj3eGxkRR5) +manager got hello from logger (zeromq_logger___NrFj3eGxkRR5) +manager got hello from proxy (zeromq_proxy___NrFj3eGxkRR5) +manager got hello from worker-1 (zeromq_worker-1___NrFj3eGxkRR5) +manager got hello from worker-2 (zeromq_worker-2___NrFj3eGxkRR5) +proxy got hello from logger (zeromq_logger___NrFj3eGxkRR5) +proxy got hello from manager (zeromq_manager___NrFj3eGxkRR5) +proxy got hello from worker-1 (zeromq_worker-1___NrFj3eGxkRR5) +proxy got hello from worker-2 (zeromq_worker-2___NrFj3eGxkRR5) +worker-1 got hello from logger (zeromq_logger___NrFj3eGxkRR5) +worker-1 got hello from manager (zeromq_manager___NrFj3eGxkRR5) +worker-1 got hello from proxy (zeromq_proxy___NrFj3eGxkRR5) +worker-1 got hello from worker-2 (zeromq_worker-2___NrFj3eGxkRR5) +worker-2 got hello from logger (zeromq_logger___NrFj3eGxkRR5) +worker-2 got hello from manager (zeromq_manager___NrFj3eGxkRR5) +worker-2 got hello from proxy (zeromq_proxy___NrFj3eGxkRR5) +worker-2 got hello from worker-1 (zeromq_worker-1___NrFj3eGxkRR5) diff --git a/testing/btest/cluster/zeromq/supervisor-runs-zmq-proxy.zeek b/testing/btest/cluster/zeromq/supervisor-runs-zmq-proxy.zeek new file mode 100644 index 0000000000..74a239ae91 --- /dev/null +++ b/testing/btest/cluster/zeromq/supervisor-runs-zmq-proxy.zeek @@ -0,0 +1,86 @@ +# @TEST-DOC: Test a Zeek cluster where the ZeroMQ proxy thread is spawned by the supervisor instead of the manager. +# +# @TEST-REQUIRES: have-zeromq +# +# @TEST-GROUP: cluster-zeromq +# +# @TEST-PORT: XPUB_PORT +# @TEST-PORT: XSUB_PORT +# @TEST-PORT: LOG_PULL_PORT + +# @TEST-EXEC: chmod +x ./check-cluster-log.sh +# +# @TEST-EXEC: cp $FILES/zeromq/test-bootstrap.zeek zeromq-test-bootstrap.zeek +# +# @TEST-EXEC: btest-bg-run supervisor "ZEEKPATH=$ZEEKPATH:.. && zeek -j ../supervisor-runs-zmq-proxy.zeek >out" +# @TEST-EXEC: btest-bg-wait 30 +# @TEST-EXEC: btest-diff supervisor/cluster.log + +redef Log::default_rotation_interval = 0secs; +redef Log::flush_interval = 0.01sec; + +@load ./zeromq-test-bootstrap + +redef Cluster::Backend::ZeroMQ::run_proxy_thread = F; + +# The supervisor peeks into logger/cluster.log to initate a shutdown when +# all nodes have said hello to each other. See the check-cluster.log.sh +# script below. +event check_cluster_log() { + system_env("../check-cluster-log.sh", table(["SUPERVISOR_PID"] = cat(getpid()))); + + schedule 0.1sec { check_cluster_log() }; +} + +event zeek_init() + { + if ( ! Supervisor::is_supervisor() ) + return; + + if ( ! Cluster::Backend::ZeroMQ::spawn_zmq_proxy_thread() ) + Reporter::fatal("Failed to spawn proxy thread"); + + local cluster: table[string] of Supervisor::ClusterEndpoint; + cluster["manager"] = [$role=Supervisor::MANAGER, $host=127.0.0.1, $p=0/unknown]; + cluster["logger"] = [$role=Supervisor::LOGGER, $host=127.0.0.1, $p=to_port(getenv("LOG_PULL_PORT"))]; + cluster["proxy"] = [$role=Supervisor::PROXY, $host=127.0.0.1, $p=0/unknown]; + cluster["worker-1"] = [$role=Supervisor::WORKER, $host=127.0.0.1, $p=0/unknown]; + cluster["worker-2"] = [$role=Supervisor::WORKER, $host=127.0.0.1, $p=0/unknown]; + + for ( n, ep in cluster ) + { + local sn = Supervisor::NodeConfig($name=n, $bare_mode=T, $cluster=cluster, $directory=n); + local res = Supervisor::create(sn); + + if ( res != "" ) + print fmt("supervisor failed to create node '%s': %s", n, res); + } + + # Start polling the cluster.log + event check_cluster_log(); + } + +# @TEST-START-FILE check-cluster-log.sh +#!/bin/sh +# +# This script checks logger/cluster.log until the expected number +# of log entries have been observed and puts a normalized version +# into the current directory. This runs from the supervisor. +if [ ! -f logger/cluster.log ]; then + exit 1; +fi + +if [ -f DONE ]; then + exit 0 +fi + +# Remove hostname and pid from node id in message. +zeek-cut node message < logger/cluster.log | sed -r 's/_[^_]+_[0-9]+_/___/g' | sort > cluster.log + +if [ $(wc -l < cluster.log) = 20 ]; then + echo "DONE!" >&2 + # Trigger shutdown through supervisor. + kill ${ZEEK_ARG_SUPERVISOR_PID}; + echo "DONE" > DONE +fi +# @TEST-END-FILE