From 5d9282fd965b8d7edc5b776712cd474321f1cf81 Mon Sep 17 00:00:00 2001 From: Arne Welzel Date: Thu, 13 Mar 2025 16:14:45 +0100 Subject: [PATCH] cluster/zeromq: Fix unsubscription visibility When two workers connect to zeek.cluster.worker, the central ZeroMQ proxy would not propagate unsubscription information to other nodes once they both left. Set ZMQ_XPUB_VERBOSER on the proxy's XPUB socket for visibility. --- src/cluster/backend/zeromq/ZeroMQ-Proxy.cc | 11 +- .../..manager.out | 14 ++ .../..worker-1.out | 7 + .../..worker-2.out | 7 + .../zeromq/unsubscribe-two-workers.zeek | 131 ++++++++++++++++++ 5 files changed, 165 insertions(+), 5 deletions(-) create mode 100644 testing/btest/Baseline/cluster.zeromq.unsubscribe-two-workers/..manager.out create mode 100644 testing/btest/Baseline/cluster.zeromq.unsubscribe-two-workers/..worker-1.out create mode 100644 testing/btest/Baseline/cluster.zeromq.unsubscribe-two-workers/..worker-2.out create mode 100644 testing/btest/cluster/zeromq/unsubscribe-two-workers.zeek diff --git a/src/cluster/backend/zeromq/ZeroMQ-Proxy.cc b/src/cluster/backend/zeromq/ZeroMQ-Proxy.cc index 5cf36fbefc..d52a8df7d5 100644 --- a/src/cluster/backend/zeromq/ZeroMQ-Proxy.cc +++ b/src/cluster/backend/zeromq/ZeroMQ-Proxy.cc @@ -41,11 +41,12 @@ bool ProxyThread::Start() { zmq::socket_t xpub(ctx, zmq::socket_type::xpub); zmq::socket_t xsub(ctx, zmq::socket_type::xsub); - // Enable XPUB_VERBOSE unconditional to enforce nodes receiving - // notifications about any new subscriptions, even if they have - // seen them before. This is needed to for the subscribe callback - // functionality to work reliably. - xpub.set(zmq::sockopt::xpub_verbose, 1); + // Enable XPUB_VERBOSER unconditionally to enforce nodes receiving + // notifications about any new and removed subscriptions, even if + // they have seen them before. This is needed for the subscribe + // callback and shared subscription removal notification to work + // reliably. 
+ xpub.set(zmq::sockopt::xpub_verboser, 1); xpub.set(zmq::sockopt::xpub_nodrop, xpub_nodrop); diff --git a/testing/btest/Baseline/cluster.zeromq.unsubscribe-two-workers/..manager.out b/testing/btest/Baseline/cluster.zeromq.unsubscribe-two-workers/..manager.out new file mode 100644 index 0000000000..8d2c4d989f --- /dev/null +++ b/testing/btest/Baseline/cluster.zeromq.unsubscribe-two-workers/..manager.out @@ -0,0 +1,14 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +### NOTE: This file has been sorted with diff-sort. +A node_up, worker-1 +A node_up, worker-2 +B subscription, /test/worker-1 +B subscription, /test/worker-2 +B subscription, /test/worker-common +C subscribing to /test/manager-common +D unsubscription, /test/worker-1 +D unsubscription, /test/worker-2 +D unsubscription, /test/worker-common +E unsubscribing from /test/manager-common +Z node_down, worker-1 +Z node_down, worker-2 diff --git a/testing/btest/Baseline/cluster.zeromq.unsubscribe-two-workers/..worker-1.out b/testing/btest/Baseline/cluster.zeromq.unsubscribe-two-workers/..worker-1.out new file mode 100644 index 0000000000..5dde5fa1e0 --- /dev/null +++ b/testing/btest/Baseline/cluster.zeromq.unsubscribe-two-workers/..worker-1.out @@ -0,0 +1,7 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +### NOTE: This file has been sorted with diff-sort. 
+A start_test +B subscription, /test/manager-common +C unsubscribe from /test/worker-common +C unsubscription, /test/manager-common +D /test/manager-common unsubscribed, terminate() diff --git a/testing/btest/Baseline/cluster.zeromq.unsubscribe-two-workers/..worker-2.out b/testing/btest/Baseline/cluster.zeromq.unsubscribe-two-workers/..worker-2.out new file mode 100644 index 0000000000..5dde5fa1e0 --- /dev/null +++ b/testing/btest/Baseline/cluster.zeromq.unsubscribe-two-workers/..worker-2.out @@ -0,0 +1,7 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +### NOTE: This file has been sorted with diff-sort. +A start_test +B subscription, /test/manager-common +C unsubscribe from /test/worker-common +C unsubscription, /test/manager-common +D /test/manager-common unsubscribed, terminate() diff --git a/testing/btest/cluster/zeromq/unsubscribe-two-workers.zeek b/testing/btest/cluster/zeromq/unsubscribe-two-workers.zeek new file mode 100644 index 0000000000..e1fb5de4dc --- /dev/null +++ b/testing/btest/cluster/zeromq/unsubscribe-two-workers.zeek @@ -0,0 +1,131 @@ +# @TEST-DOC: Regression test for shared unsubscriptions not happening. 
+# +# Scenario: +# * manager waits for two workers and sends start_test() event +# * workers create subscriptions for /test/worker-1, /test/worker-2 and /test/worker-common +# * manager: Seeing all these subscriptions, subscribe to /test/manager-common +# * workers: Seeing /test/manager-common subscription, unsubscribe /test/worker-common +# * manager: Observes unsubscription for /test/worker-common, unsubscribes from /test/manager-common +# * workers: terminate() when seeing the unsubscription for /test/manager-common +# +# @TEST-REQUIRES: have-zeromq +# +# @TEST-GROUP: cluster-zeromq +# +# @TEST-PORT: XPUB_PORT +# @TEST-PORT: XSUB_PORT +# @TEST-PORT: LOG_PULL_PORT +# +# @TEST-EXEC: cp $FILES/zeromq/cluster-layout-simple.zeek cluster-layout.zeek +# @TEST-EXEC: cp $FILES/zeromq/test-bootstrap.zeek zeromq-test-bootstrap.zeek +# +# @TEST-EXEC: zeek --parse-only ./manager.zeek ./worker.zeek +# +# @TEST-EXEC: btest-bg-run manager "ZEEKPATH=$ZEEKPATH:.. && CLUSTER_NODE=manager zeek -b ../manager.zeek >out" +# @TEST-EXEC: btest-bg-run worker-1 "ZEEKPATH=$ZEEKPATH:.. && CLUSTER_NODE=worker-1 zeek -b ../worker.zeek >out" +# @TEST-EXEC: btest-bg-run worker-2 "ZEEKPATH=$ZEEKPATH:.. && CLUSTER_NODE=worker-2 zeek -b ../worker.zeek >out" +# @TEST-EXEC: btest-bg-wait 30 +# +# @TEST-EXEC: TEST_DIFF_CANONIFIER=$SCRIPTS/diff-sort btest-diff ./manager/out +# @TEST-EXEC: TEST_DIFF_CANONIFIER=$SCRIPTS/diff-sort btest-diff ./worker-1/out +# @TEST-EXEC: TEST_DIFF_CANONIFIER=$SCRIPTS/diff-sort btest-diff ./worker-2/out + + +# @TEST-START-FILE common.zeek +@load ./zeromq-test-bootstrap + +global start_test: event() &is_used; +# @TEST-END-FILE + +# @TEST-START-FILE manager.zeek +@load ./common.zeek + +global worker_subs_seen = 0; +global worker_common_seen = F; +global subscribe_done = F; +global nodes_up = 0; +global nodes_down = 0; + +event Cluster::Backend::ZeroMQ::subscription(topic: string) { + + if ( ! 
starts_with(topic, "/test/worker") ) + return; + + print "B subscription", topic; + + if ( topic == "/test/worker-1" || topic == "/test/worker-2" ) + ++worker_subs_seen; + + if ( topic == "/test/worker-common" ) + worker_common_seen = T; + + if ( ! subscribe_done && worker_common_seen && worker_subs_seen == 2 ) { + print "C subscribing to /test/manager-common"; + Cluster::subscribe("/test/manager-common"); + subscribe_done = T; + } +} + +event Cluster::Backend::ZeroMQ::unsubscription(topic: string) { + + if ( ! starts_with(topic, "/test/worker") ) + return; + + print "D unsubscription", topic; + + if ( topic == "/test/worker-common" ) { + print "E unsubscribing from /test/manager-common"; + Cluster::unsubscribe("/test/manager-common"); + } +} + +event Cluster::node_up(name: string, id: string) { + print "A node_up", name; + ++nodes_up; + if ( nodes_up == 2 ) + Cluster::publish(Cluster::worker_topic, start_test); +} + +event Cluster::node_down(name: string, id: string) { + print "Z node_down", name; + ++nodes_down; + if ( nodes_down == 2 ) + terminate(); +} +# @TEST-END-FILE + +# @TEST-START-FILE worker.zeek +@load ./common.zeek + +event start_test() { + print "A start_test"; + Cluster::subscribe("/test/worker-common"); + Cluster::subscribe("/test/" + Cluster::node); +} + +event Cluster::Backend::ZeroMQ::subscription(topic: string) { + + if ( ! starts_with(topic, "/test/manager") ) + return; + + print "B subscription", topic; + + if ( topic == "/test/manager-common" ) { + print "C unsubscribe from /test/worker-common"; + Cluster::unsubscribe("/test/worker-common"); + } +} + +event Cluster::Backend::ZeroMQ::unsubscription(topic: string) { + + if ( ! starts_with(topic, "/test/manager") ) + return; + + print "C unsubscription", topic; + + if ( topic == "/test/manager-common" ) { + print "D /test/manager-common unsubscribed, terminate()"; + terminate(); + } +} +# @TEST-END-FILE