From 3885871e7dbb47a358ee5da19c6e0aa1a84bc0b2 Mon Sep 17 00:00:00 2001 From: Arne Welzel Date: Thu, 13 Mar 2025 16:14:45 +0100 Subject: [PATCH 1/7] cluster/zeromq: Fix unsubscription visibility When two workers connect to zeek.cluster.worker, the central ZeroMQ proxy would not propagate unsubscription information to other nodes once they both left. Set ZMQ_XPUB_VERBOSER on the proxies XPUB socket for visibility. --- src/cluster/backend/zeromq/ZeroMQ-Proxy.cc | 11 +- .../..manager.out | 14 ++ .../..worker-1.out | 7 + .../..worker-2.out | 7 + .../zeromq/unsubscribe-two-workers.zeek | 139 ++++++++++++++++++ 5 files changed, 173 insertions(+), 5 deletions(-) create mode 100644 testing/btest/Baseline/cluster.zeromq.unsubscribe-two-workers/..manager.out create mode 100644 testing/btest/Baseline/cluster.zeromq.unsubscribe-two-workers/..worker-1.out create mode 100644 testing/btest/Baseline/cluster.zeromq.unsubscribe-two-workers/..worker-2.out create mode 100644 testing/btest/cluster/zeromq/unsubscribe-two-workers.zeek diff --git a/src/cluster/backend/zeromq/ZeroMQ-Proxy.cc b/src/cluster/backend/zeromq/ZeroMQ-Proxy.cc index 5cf36fbefc..d52a8df7d5 100644 --- a/src/cluster/backend/zeromq/ZeroMQ-Proxy.cc +++ b/src/cluster/backend/zeromq/ZeroMQ-Proxy.cc @@ -41,11 +41,12 @@ bool ProxyThread::Start() { zmq::socket_t xpub(ctx, zmq::socket_type::xpub); zmq::socket_t xsub(ctx, zmq::socket_type::xsub); - // Enable XPUB_VERBOSE unconditional to enforce nodes receiving - // notifications about any new subscriptions, even if they have - // seen them before. This is needed to for the subscribe callback - // functionality to work reliably. - xpub.set(zmq::sockopt::xpub_verbose, 1); + // Enable XPUB_VERBOSER unconditional to enforce nodes receiving + // notifications about any new and removed subscriptions, even if + // they have seen them before. This is needed for the subscribe + // callback and shared subscription removal notification to work + // reliably. + xpub.set(zmq::sockopt::xpub_verboser, 1); xpub.set(zmq::sockopt::xpub_nodrop, xpub_nodrop); diff --git a/testing/btest/Baseline/cluster.zeromq.unsubscribe-two-workers/..manager.out b/testing/btest/Baseline/cluster.zeromq.unsubscribe-two-workers/..manager.out new file mode 100644 index 0000000000..8d2c4d989f --- /dev/null +++ b/testing/btest/Baseline/cluster.zeromq.unsubscribe-two-workers/..manager.out @@ -0,0 +1,14 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +### NOTE: This file has been sorted with diff-sort. +A node_up, worker-1 +A node_up, worker-2 +B subscription, /test/worker-1 +B subscription, /test/worker-2 +B subscription, /test/worker-common +C subscribing to /test/manager-common +D unsubscription, /test/worker-1 +D unsubscription, /test/worker-2 +D unsubscription, /test/worker-common +E unsubscribing from /test/manager-common +Z node_down, worker-1 +Z node_down, worker-2 diff --git a/testing/btest/Baseline/cluster.zeromq.unsubscribe-two-workers/..worker-1.out b/testing/btest/Baseline/cluster.zeromq.unsubscribe-two-workers/..worker-1.out new file mode 100644 index 0000000000..5dde5fa1e0 --- /dev/null +++ b/testing/btest/Baseline/cluster.zeromq.unsubscribe-two-workers/..worker-1.out @@ -0,0 +1,7 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +### NOTE: This file has been sorted with diff-sort. +A start_test +B subscription, /test/manager-common +C unsubscribe from /test/worker-common +C unsubscription, /test/manager-common +D /test/manager-common unsubscribed, terminate() diff --git a/testing/btest/Baseline/cluster.zeromq.unsubscribe-two-workers/..worker-2.out b/testing/btest/Baseline/cluster.zeromq.unsubscribe-two-workers/..worker-2.out new file mode 100644 index 0000000000..5dde5fa1e0 --- /dev/null +++ b/testing/btest/Baseline/cluster.zeromq.unsubscribe-two-workers/..worker-2.out @@ -0,0 +1,7 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +### NOTE: This file has been sorted with diff-sort. +A start_test +B subscription, /test/manager-common +C unsubscribe from /test/worker-common +C unsubscription, /test/manager-common +D /test/manager-common unsubscribed, terminate() diff --git a/testing/btest/cluster/zeromq/unsubscribe-two-workers.zeek b/testing/btest/cluster/zeromq/unsubscribe-two-workers.zeek new file mode 100644 index 0000000000..4548a2a98a --- /dev/null +++ b/testing/btest/cluster/zeromq/unsubscribe-two-workers.zeek @@ -0,0 +1,139 @@ +# @TEST-DOC: Regression test for shared unsubscriptions not happening. +# +# Scenario: +# * manager waits for two workers and sends start_test() event +# * workers create subscriptions for /test/worker-1, /test/worker-2 and /test/worker-common +# * manager: Seeing all these subscriptions, subscribe to /test/manager-common +# * workers: Seeing /test/manager-common subscription, unsubscribe /test/worker-common +# * manager: Observes unsubscription for /test/worker-common, unsubscribes from /test/manager-common +# * workers: terminate() when seeing the unsubscription for /test/manager-common +# +# @TEST-REQUIRES: have-zeromq +# +# @TEST-GROUP: cluster-zeromq +# +# @TEST-PORT: XPUB_PORT +# @TEST-PORT: XSUB_PORT +# @TEST-PORT: LOG_PULL_PORT +# +# @TEST-EXEC: cp $FILES/zeromq/cluster-layout-simple.zeek cluster-layout.zeek +# @TEST-EXEC: cp $FILES/zeromq/test-bootstrap.zeek zeromq-test-bootstrap.zeek +# +# @TEST-EXEC: zeek --parse-only ./manager.zeek ./worker.zeek +# +# @TEST-EXEC: btest-bg-run manager "ZEEKPATH=$ZEEKPATH:.. && CLUSTER_NODE=manager zeek -b ../manager.zeek >out" +# @TEST-EXEC: btest-bg-run worker-1 "ZEEKPATH=$ZEEKPATH:.. && CLUSTER_NODE=worker-1 zeek -b ../worker.zeek >out" +# @TEST-EXEC: btest-bg-run worker-2 "ZEEKPATH=$ZEEKPATH:.. && CLUSTER_NODE=worker-2 zeek -b ../worker.zeek >out" +# @TEST-EXEC: btest-bg-wait 30 +# +# @TEST-EXEC: TEST_DIFF_CANONIFIER=$SCRIPTS/diff-sort btest-diff ./manager/out +# @TEST-EXEC: TEST_DIFF_CANONIFIER=$SCRIPTS/diff-sort btest-diff ./worker-1/out +# @TEST-EXEC: TEST_DIFF_CANONIFIER=$SCRIPTS/diff-sort btest-diff ./worker-2/out + + +# @TEST-START-FILE common.zeek +@load ./zeromq-test-bootstrap + +global start_test: event() &is_used; +# @TEST-END-FILE + +# @TEST-START-FILE manager.zeek +@load ./common.zeek + +global worker_subs_seen = 0; +global worker_common_seen = F; +global subscribe_done = F; +global nodes_up = 0; +global nodes_down = 0; + +event Cluster::Backend::ZeroMQ::subscription(topic: string) + { + if ( ! starts_with(topic, "/test/worker") ) + return; + + print "B subscription", topic; + + if ( topic == "/test/worker-1" || topic == "/test/worker-2" ) + ++worker_subs_seen; + + if ( topic == "/test/worker-common" ) + worker_common_seen = T; + + if ( ! subscribe_done && worker_common_seen && worker_subs_seen == 2 ) + { + print "C subscribing to /test/manager-common"; + Cluster::subscribe("/test/manager-common"); + subscribe_done = T; + } + } + +event Cluster::Backend::ZeroMQ::unsubscription(topic: string) + { + if ( ! starts_with(topic, "/test/worker") ) + return; + + print "D unsubscription", topic; + + if ( topic == "/test/worker-common" ) + { + print "E unsubscribing from /test/manager-common"; + Cluster::unsubscribe("/test/manager-common"); + } + } + +event Cluster::node_up(name: string, id: string) + { + print "A node_up", name; + ++nodes_up; + + if ( nodes_up == 2 ) + Cluster::publish(Cluster::worker_topic, start_test); + } + +event Cluster::node_down(name: string, id: string) + { + print "Z node_down", name; + ++nodes_down; + if ( nodes_down == 2 ) + terminate(); + } +# @TEST-END-FILE + +# @TEST-START-FILE worker.zeek +@load ./common.zeek + +event start_test() + { + print "A start_test"; + Cluster::subscribe("/test/worker-common"); + Cluster::subscribe("/test/" + Cluster::node); + } + +event Cluster::Backend::ZeroMQ::subscription(topic: string) + { + if ( ! starts_with(topic, "/test/manager") ) + return; + + print "B subscription", topic; + + if ( topic == "/test/manager-common" ) + { + print "C unsubscribe from /test/worker-common"; + Cluster::unsubscribe("/test/worker-common"); + } + } + +event Cluster::Backend::ZeroMQ::unsubscription(topic: string) + { + if ( ! starts_with(topic, "/test/manager") ) + return; + + print "C unsubscription", topic; + + if ( topic == "/test/manager-common" ) + { + print "D /test/manager-common unsubscribed, terminate()"; + terminate(); + } + } +# @TEST-END-FILE From 888af244b27cc6ac3e90bf0872bb2d73d3f15cd1 Mon Sep 17 00:00:00 2001 From: Arne Welzel Date: Fri, 14 Mar 2025 11:43:12 +0100 Subject: [PATCH 2/7] btest/cluster/websocket: Harden multi-client tests These test were very sensible to the speed at which ZeroMQ distributes subscriptions in the cluster and showed to be unreliably when testing with zeek/btest#113. The main fix here is to have individual WebSocket clients subscribe to unique topics, e.g /test/client-0 and /test/client-1, instead of just a shared topic. This ensures the WebSocket handshake completes only when they observed their own subscriptions and not prematurely when observing the shared topic. This seems mainly relevant for tests: In the real world one shouldn't rely on subscription visibility - you miss messages if you're too late to the party. --- .../cluster.websocket.three/..manager.out | 23 ------- .../..manager.out.sorted | 28 +++++++++ .../..client.out | 60 +++++++++--------- .../..manager.out.sorted | 8 ++- testing/btest/cluster/websocket/three.zeek | 40 +++++++----- .../cluster/websocket/two-pipelining.zeek | 61 +++++++++++++------ 6 files changed, 131 insertions(+), 89 deletions(-) delete mode 100644 testing/btest/Baseline/cluster.websocket.three/..manager.out create mode 100644 testing/btest/Baseline/cluster.websocket.three/..manager.out.sorted diff --git a/testing/btest/Baseline/cluster.websocket.three/..manager.out b/testing/btest/Baseline/cluster.websocket.three/..manager.out deleted file mode 100644 index 30dca05f31..0000000000 --- a/testing/btest/Baseline/cluster.websocket.three/..manager.out +++ /dev/null @@ -1,23 +0,0 @@ -### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. -Cluster::websocket_client_added, 1, [/test/clients] -Cluster::websocket_client_added, 2, [/test/clients] -Cluster::websocket_client_added, 3, [/test/clients] -got ping: ws1, 0 -got ping: ws2, 1 -got ping: ws3, 2 -got ping: ws1, 3 -got ping: ws2, 4 -got ping: ws3, 5 -got ping: ws1, 6 -got ping: ws2, 7 -got ping: ws3, 8 -got ping: ws1, 9 -got ping: ws2, 10 -got ping: ws3, 11 -got ping: ws1, 12 -got ping: ws2, 13 -got ping: ws3, 14 -got ping: ws1, 15 -Cluster::websocket_client_lost, 1 -Cluster::websocket_client_lost, 2 -Cluster::websocket_client_lost, 3 diff --git a/testing/btest/Baseline/cluster.websocket.three/..manager.out.sorted b/testing/btest/Baseline/cluster.websocket.three/..manager.out.sorted new file mode 100644 index 0000000000..69d725b682 --- /dev/null +++ b/testing/btest/Baseline/cluster.websocket.three/..manager.out.sorted @@ -0,0 +1,28 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +A subscription, /test/client-0 +A subscription, /test/client-1 +A subscription, /test/client-2 +A subscription, /test/clients +A subscription, /test/manager +B Cluster::websocket_client_added, [/test/client-0, /test/clients] +B Cluster::websocket_client_added, [/test/client-1, /test/clients] +B Cluster::websocket_client_added, [/test/client-2, /test/clients] +C got ping: ws1, 0 +C got ping: ws1, 12 +C got ping: ws1, 15 +C got ping: ws1, 3 +C got ping: ws1, 6 +C got ping: ws1, 9 +C got ping: ws2, 1 +C got ping: ws2, 10 +C got ping: ws2, 13 +C got ping: ws2, 4 +C got ping: ws2, 7 +C got ping: ws3, 11 +C got ping: ws3, 14 +C got ping: ws3, 2 +C got ping: ws3, 5 +C got ping: ws3, 8 +D Cluster::websocket_client_lost, 1 +D Cluster::websocket_client_lost, 2 +D Cluster::websocket_client_lost, 3 diff --git a/testing/btest/Baseline/cluster.websocket.two-pipelining/..client.out b/testing/btest/Baseline/cluster.websocket.two-pipelining/..client.out index 616a268fed..79efa77003 100644 --- a/testing/btest/Baseline/cluster.websocket.two-pipelining/..client.out +++ b/testing/btest/Baseline/cluster.websocket.two-pipelining/..client.out @@ -12,43 +12,43 @@ Sending ping 4 - ws1 Sending ping 4 - ws2 Receiving ack - ws1 Receiving ack - ws2 -Receiving pong 0 - ws0 -topic /zeek/event/to_client event name pong args [{'@data-type': 'string', 'data': 'my-message'}, {'@data-type': 'count', 'data': 1}] Receiving pong 0 - ws1 -topic /zeek/event/to_client event name pong args [{'@data-type': 'string', 'data': 'my-message'}, {'@data-type': 'count', 'data': 1}] -Receiving pong 1 - ws0 -topic /zeek/event/to_client event name pong args [{'@data-type': 'string', 'data': 'my-message'}, {'@data-type': 'count', 'data': 2}] +topic /test/clients event name pong args [{'@data-type': 'string', 'data': 'my-message'}, {'@data-type': 'count', 'data': 1}] +Receiving pong 0 - ws2 +topic /test/clients event name pong args [{'@data-type': 'string', 'data': 'my-message'}, {'@data-type': 'count', 'data': 1}] Receiving pong 1 - ws1 -topic /zeek/event/to_client event name pong args [{'@data-type': 'string', 'data': 'my-message'}, {'@data-type': 'count', 'data': 2}] -Receiving pong 2 - ws0 -topic /zeek/event/to_client event name pong args [{'@data-type': 'string', 'data': 'my-message'}, {'@data-type': 'count', 'data': 3}] +topic /test/clients event name pong args [{'@data-type': 'string', 'data': 'my-message'}, {'@data-type': 'count', 'data': 2}] +Receiving pong 1 - ws2 +topic /test/clients event name pong args [{'@data-type': 'string', 'data': 'my-message'}, {'@data-type': 'count', 'data': 2}] Receiving pong 2 - ws1 -topic /zeek/event/to_client event name pong args [{'@data-type': 'string', 'data': 'my-message'}, {'@data-type': 'count', 'data': 3}] -Receiving pong 3 - ws0 -topic /zeek/event/to_client event name pong args [{'@data-type': 'string', 'data': 'my-message'}, {'@data-type': 'count', 'data': 4}] +topic /test/clients event name pong args [{'@data-type': 'string', 'data': 'my-message'}, {'@data-type': 'count', 'data': 3}] +Receiving pong 2 - ws2 +topic /test/clients event name pong args [{'@data-type': 'string', 'data': 'my-message'}, {'@data-type': 'count', 'data': 3}] Receiving pong 3 - ws1 -topic /zeek/event/to_client event name pong args [{'@data-type': 'string', 'data': 'my-message'}, {'@data-type': 'count', 'data': 4}] -Receiving pong 4 - ws0 -topic /zeek/event/to_client event name pong args [{'@data-type': 'string', 'data': 'my-message'}, {'@data-type': 'count', 'data': 5}] +topic /test/clients event name pong args [{'@data-type': 'string', 'data': 'my-message'}, {'@data-type': 'count', 'data': 4}] +Receiving pong 3 - ws2 +topic /test/clients event name pong args [{'@data-type': 'string', 'data': 'my-message'}, {'@data-type': 'count', 'data': 4}] Receiving pong 4 - ws1 -topic /zeek/event/to_client event name pong args [{'@data-type': 'string', 'data': 'my-message'}, {'@data-type': 'count', 'data': 5}] -Receiving pong 5 - ws0 -topic /zeek/event/to_client event name pong args [{'@data-type': 'string', 'data': 'my-message'}, {'@data-type': 'count', 'data': 6}] +topic /test/clients event name pong args [{'@data-type': 'string', 'data': 'my-message'}, {'@data-type': 'count', 'data': 5}] +Receiving pong 4 - ws2 +topic /test/clients event name pong args [{'@data-type': 'string', 'data': 'my-message'}, {'@data-type': 'count', 'data': 5}] Receiving pong 5 - ws1 -topic /zeek/event/to_client event name pong args [{'@data-type': 'string', 'data': 'my-message'}, {'@data-type': 'count', 'data': 6}] -Receiving pong 6 - ws0 -topic /zeek/event/to_client event name pong args [{'@data-type': 'string', 'data': 'my-message'}, {'@data-type': 'count', 'data': 7}] +topic /test/clients event name pong args [{'@data-type': 'string', 'data': 'my-message'}, {'@data-type': 'count', 'data': 6}] +Receiving pong 5 - ws2 +topic /test/clients event name pong args [{'@data-type': 'string', 'data': 'my-message'}, {'@data-type': 'count', 'data': 6}] Receiving pong 6 - ws1 -topic /zeek/event/to_client event name pong args [{'@data-type': 'string', 'data': 'my-message'}, {'@data-type': 'count', 'data': 7}] -Receiving pong 7 - ws0 -topic /zeek/event/to_client event name pong args [{'@data-type': 'string', 'data': 'my-message'}, {'@data-type': 'count', 'data': 8}] +topic /test/clients event name pong args [{'@data-type': 'string', 'data': 'my-message'}, {'@data-type': 'count', 'data': 7}] +Receiving pong 6 - ws2 +topic /test/clients event name pong args [{'@data-type': 'string', 'data': 'my-message'}, {'@data-type': 'count', 'data': 7}] Receiving pong 7 - ws1 -topic /zeek/event/to_client event name pong args [{'@data-type': 'string', 'data': 'my-message'}, {'@data-type': 'count', 'data': 8}] -Receiving pong 8 - ws0 -topic /zeek/event/to_client event name pong args [{'@data-type': 'string', 'data': 'my-message'}, {'@data-type': 'count', 'data': 9}] +topic /test/clients event name pong args [{'@data-type': 'string', 'data': 'my-message'}, {'@data-type': 'count', 'data': 8}] +Receiving pong 7 - ws2 +topic /test/clients event name pong args [{'@data-type': 'string', 'data': 'my-message'}, {'@data-type': 'count', 'data': 8}] Receiving pong 8 - ws1 -topic /zeek/event/to_client event name pong args [{'@data-type': 'string', 'data': 'my-message'}, {'@data-type': 'count', 'data': 9}] -Receiving pong 9 - ws0 -topic /zeek/event/to_client event name pong args [{'@data-type': 'string', 'data': 'my-message'}, {'@data-type': 'count', 'data': 10}] +topic /test/clients event name pong args [{'@data-type': 'string', 'data': 'my-message'}, {'@data-type': 'count', 'data': 9}] +Receiving pong 8 - ws2 +topic /test/clients event name pong args [{'@data-type': 'string', 'data': 'my-message'}, {'@data-type': 'count', 'data': 9}] Receiving pong 9 - ws1 -topic /zeek/event/to_client event name pong args [{'@data-type': 'string', 'data': 'my-message'}, {'@data-type': 'count', 'data': 10}] +topic /test/clients event name pong args [{'@data-type': 'string', 'data': 'my-message'}, {'@data-type': 'count', 'data': 10}] +Receiving pong 9 - ws2 +topic /test/clients event name pong args [{'@data-type': 'string', 'data': 'my-message'}, {'@data-type': 'count', 'data': 10}] diff --git a/testing/btest/Baseline/cluster.websocket.two-pipelining/..manager.out.sorted b/testing/btest/Baseline/cluster.websocket.two-pipelining/..manager.out.sorted index ae8f79c02d..8b8657ed17 100644 --- a/testing/btest/Baseline/cluster.websocket.two-pipelining/..manager.out.sorted +++ b/testing/btest/Baseline/cluster.websocket.two-pipelining/..manager.out.sorted @@ -1,6 +1,10 @@ ### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. -A Cluster::websocket_client_added, 1, [/zeek/event/to_client] -A Cluster::websocket_client_added, 2, [/zeek/event/to_client] +A Cluster::websocket_client_added, [/test/client-1, /test/clients] +A Cluster::websocket_client_added, [/test/client-2, /test/clients] +A subscription, /test/client-1 +A subscription, /test/client-2 +A subscription, /test/clients +A subscription, /test/manager B got ping: ws1, 0 B got ping: ws1, 1 B got ping: ws1, 2 diff --git a/testing/btest/cluster/websocket/three.zeek b/testing/btest/cluster/websocket/three.zeek index 3810c04e3c..ca6f998be9 100644 --- a/testing/btest/cluster/websocket/three.zeek +++ b/testing/btest/cluster/websocket/three.zeek @@ -20,7 +20,8 @@ # @TEST-EXEC: btest-bg-run client "python3 ../client.py >out" # # @TEST-EXEC: btest-bg-wait 30 -# @TEST-EXEC: btest-diff ./manager/out +# @TEST-EXEC: sort ./manager/out > ./manager/out.sorted +# @TEST-EXEC: btest-diff ./manager/out.sorted # @TEST-EXEC: btest-diff ./manager/.stderr # @TEST-EXEC: btest-diff ./client/out # @TEST-EXEC: btest-diff ./client/.stderr @@ -34,35 +35,44 @@ global ping_count = 0; global ping: event(msg: string, c: count) &is_used; global pong: event(msg: string, c: count) &is_used; -event zeek_init() - { - Cluster::subscribe("/test/manager"); - Cluster::listen_websocket([$listen_host="127.0.0.1", $listen_port=to_port(getenv("WEBSOCKET_PORT"))]); - } +global added = 0; +global lost = 0; event ping(msg: string, n: count) &is_used { ++ping_count; - print fmt("got ping: %s, %s", msg, n); + print fmt("C got ping: %s, %s", msg, n); local e = Cluster::make_event(pong, fmt("orig_msg=%s", msg), ping_count); Cluster::publish("/test/clients", e); } -global added = 0; -global lost = 0; - event Cluster::websocket_client_added(info: Cluster::EndpointInfo, subscriptions: string_vec) { ++added; - print "Cluster::websocket_client_added", added, subscriptions; + print "B Cluster::websocket_client_added", subscriptions; } event Cluster::websocket_client_lost(info: Cluster::EndpointInfo) { ++lost; - print "Cluster::websocket_client_lost", lost; + print "D Cluster::websocket_client_lost", lost; if ( lost == 3 ) terminate(); +} + +# Extra testing output. +event Cluster::Backend::ZeroMQ::subscription(topic: string) + { + if ( ! starts_with(topic, "/test") ) + return; + + print "A subscription", topic; + } + +event zeek_init() + { + Cluster::listen_websocket([$listen_host="127.0.0.1", $listen_port=to_port(getenv("WEBSOCKET_PORT"))]); + Cluster::subscribe("/test/manager"); } # @TEST-END-FILE @@ -100,8 +110,10 @@ def run(ws_url): clients = [ws1, ws2, ws3] print("Connected!") ids = set() - for c in clients: - c.send(json.dumps([topic])) + for i, c in enumerate(clients): + client_topic = f"/test/client-{i}" + c.send(json.dumps([topic, client_topic])) + for c in clients: ack = json.loads(c.recv()) assert "type" in ack, repr(ack) diff --git a/testing/btest/cluster/websocket/two-pipelining.zeek b/testing/btest/cluster/websocket/two-pipelining.zeek index cfef86522f..92ec628c7b 100644 --- a/testing/btest/cluster/websocket/two-pipelining.zeek +++ b/testing/btest/cluster/websocket/two-pipelining.zeek @@ -35,12 +35,6 @@ global ping_count = 0; global ping: event(msg: string, c: count) &is_used; global pong: event(msg: string, c: count) &is_used; -event zeek_init() - { - Cluster::subscribe("/zeek/event/to_manager"); - Cluster::listen_websocket([$listen_host="127.0.0.1", $listen_port=to_port(getenv("WEBSOCKET_PORT"))]); - } - global added = 0; global lost = 0; @@ -51,10 +45,26 @@ type Item: record { global queue: vector of Item; +function is_ready(): bool + { + return added == 2; + } + +function drain_if_ready() + { + if ( is_ready() && |queue| > 0 ) + { + for ( _, item in queue ) + event ping(item$msg, item$n); + + delete queue; + } + } + event ping(msg: string, n: count) &is_used { # Queue the pings if we haven't seen both clients yet. - if ( added < 2 ) + if ( ! is_ready() ) { queue += Item($msg=msg, $n=n); return; @@ -63,20 +73,15 @@ event ping(msg: string, n: count) &is_used ++ping_count; print fmt("B got ping: %s, %s", msg, n); local e = Cluster::make_event(pong, "my-message", ping_count); - Cluster::publish("/zeek/event/to_client", e); + Cluster::publish("/test/clients", e); } event Cluster::websocket_client_added(info: Cluster::EndpointInfo, subscriptions: string_vec) { ++added; - print "A Cluster::websocket_client_added", added, subscriptions; + print "A Cluster::websocket_client_added", subscriptions; - if ( added == 2 ) - { - # Anything in the queue? - for ( _, item in queue ) - event ping(item$msg, item$n); - } + drain_if_ready(); } event Cluster::websocket_client_lost(info: Cluster::EndpointInfo) @@ -86,6 +91,21 @@ event Cluster::websocket_client_lost(info: Cluster::EndpointInfo) if ( lost == 2 ) terminate(); } + +# Extra testing output. +event Cluster::Backend::ZeroMQ::subscription(topic: string) + { + if ( ! starts_with(topic, "/test") ) + return; + + print "A subscription", topic; + } + +event zeek_init() + { + Cluster::listen_websocket([$listen_host="127.0.0.1", $listen_port=to_port(getenv("WEBSOCKET_PORT"))]); + Cluster::subscribe("/test/manager"); + } # @TEST-END-FILE @@ -95,12 +115,12 @@ from websockets.sync.client import connect ws_port = os.environ['WEBSOCKET_PORT'].split('/')[0] ws_url = f'ws://127.0.0.1:{ws_port}/v1/messages/json' -topic = '/zeek/event/to_client' +topic = '/test/clients' def make_ping(c, who): return { "type": "data-message", - "topic": "/zeek/event/to_manager", + "topic": "/test/manager", "@data-type": "vector", "data": [ {"@data-type": "count", "data": 1}, # Format @@ -121,8 +141,9 @@ def run(ws_url): clients = [ws1, ws2] print("Connected!") # Send subscriptions - for ws in clients: - ws.send(json.dumps([topic])) + for c, ws in enumerate(clients, 1): + client_topic = f"/test/client-{c}" + ws.send(json.dumps([topic, client_topic])) for i in range(5): for c, ws in enumerate(clients, 1): @@ -138,7 +159,7 @@ def run(ws_url): assert "version" in ack for i in range(10): - for c, ws in enumerate(clients): + for c, ws in enumerate(clients, 1): print(f"Receiving pong {i} - ws{c}") pong = json.loads(ws.recv()) assert pong["@data-type"] == "vector" From 26441e0c24040712f59dfb8581e068f847153a9a Mon Sep 17 00:00:00 2001 From: Arne Welzel Date: Fri, 14 Mar 2025 15:37:22 +0100 Subject: [PATCH 3/7] cluster/websocket: Fix null deref at WebSocket server shutdown WebSocket clients that connected with the wrong URL do not have a backend attached. If a dispatcher is terminated while these clients are still connected, a null deref would happen. This was found while running all cluster/websocket tests in a loop for a long time, tickling a segfault during the bad-url test. --- src/cluster/websocket/WebSocket.cc | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/cluster/websocket/WebSocket.cc b/src/cluster/websocket/WebSocket.cc index 115db9aec8..fa2efb51ae 100644 --- a/src/cluster/websocket/WebSocket.cc +++ b/src/cluster/websocket/WebSocket.cc @@ -231,7 +231,9 @@ void WebSocketEventDispatcher::Terminate() { wsc->getRemotePort()); QueueReply(WebSocketCloseReply{wsc, 1001, "Terminating"}); - backend->Terminate(); + + if ( backend ) + backend->Terminate(); } clients.clear(); From 2963c49f27cb7d746e06a64fdb40c2d40054c316 Mon Sep 17 00:00:00 2001 From: Arne Welzel Date: Sun, 16 Mar 2025 14:56:27 +0100 Subject: [PATCH 4/7] cluster/zeromq: Fix node_topic() and nodeid_topic() Due to prefix matching, worker-1's node_topic() also matched worker-10, worker-11, etc. Suffix the node topic with a `.`. The original implementation came from NATS, where subjects are separated by `.`. Adapt nodeid_topic() for consistency. --- .../cluster/backend/zeromq/main.zeek | 8 +- .../..manager.out | 15 +++ .../..worker-1.out | 4 + .../..worker-10.out | 4 + .../..worker-2.out | 4 + .../..worker-20.out | 4 + testing/btest/Baseline/save-load-seeds.zeek | 16 +++ .../cluster/zeromq/node-topic-prefix.zeek | 117 ++++++++++++++++++ 8 files changed, 168 insertions(+), 4 deletions(-) create mode 100644 testing/btest/Baseline/cluster.zeromq.node-topic-prefix/..manager.out create mode 100644 testing/btest/Baseline/cluster.zeromq.node-topic-prefix/..worker-1.out create mode 100644 testing/btest/Baseline/cluster.zeromq.node-topic-prefix/..worker-10.out create mode 100644 testing/btest/Baseline/cluster.zeromq.node-topic-prefix/..worker-2.out create mode 100644 testing/btest/Baseline/cluster.zeromq.node-topic-prefix/..worker-20.out create mode 100644 testing/btest/Baseline/save-load-seeds.zeek create mode 100644 testing/btest/cluster/zeromq/node-topic-prefix.zeek diff --git a/scripts/policy/frameworks/cluster/backend/zeromq/main.zeek b/scripts/policy/frameworks/cluster/backend/zeromq/main.zeek index 1befbd105d..c34608d437 100644 --- a/scripts/policy/frameworks/cluster/backend/zeromq/main.zeek +++ b/scripts/policy/frameworks/cluster/backend/zeromq/main.zeek @@ -226,11 +226,11 @@ redef Cluster::backend = Cluster::CLUSTER_BACKEND_ZEROMQ; redef run_proxy_thread = Cluster::local_node_type() == Cluster::MANAGER; function zeromq_node_topic(name: string): string { - return node_topic_prefix + "." + name; + return node_topic_prefix + "." + name + "."; } function zeromq_nodeid_topic(id: string): string { - return nodeid_topic_prefix + "." + id; + return nodeid_topic_prefix + "." + id + "."; } # Unique identifier for this node with some debug information. @@ -345,7 +345,7 @@ event Cluster::Backend::ZeroMQ::subscription(topic: string) if ( ! starts_with(topic, prefix) ) return; - local nodeid = topic[|prefix|:]; + local nodeid = topic[|prefix|:][:-1]; # Do not say hello to ourselves - we won't see it anyhow. if ( nodeid == Cluster::node_id() ) @@ -417,7 +417,7 @@ event Cluster::Backend::ZeroMQ::unsubscription(topic: string) if ( ! starts_with(topic, prefix) ) return; - local gone_node_id = topic[|prefix|:]; + local gone_node_id = topic[|prefix|:][:-1]; local name = ""; for ( node_name, n in Cluster::nodes ) { if ( n?$id && n$id == gone_node_id ) { diff --git a/testing/btest/Baseline/cluster.zeromq.node-topic-prefix/..manager.out b/testing/btest/Baseline/cluster.zeromq.node-topic-prefix/..manager.out new file mode 100644 index 0000000000..80062fd5aa --- /dev/null +++ b/testing/btest/Baseline/cluster.zeromq.node-topic-prefix/..manager.out @@ -0,0 +1,15 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +### NOTE: This file has been sorted with diff-sort. +A, manager +B node_up - sending ping to 'worker-1' +B node_up - sending ping to 'worker-10' +B node_up - sending ping to 'worker-2' +B node_up - sending ping to 'worker-20' +C pong from 'worker-1' to 'manager' +C pong from 'worker-10' to 'manager' +C pong from 'worker-2' to 'manager' +C pong from 'worker-20' to 'manager' +D node_down from 'worker-1' +D node_down from 'worker-10' +D node_down from 'worker-2' +D node_down from 'worker-20' diff --git a/testing/btest/Baseline/cluster.zeromq.node-topic-prefix/..worker-1.out b/testing/btest/Baseline/cluster.zeromq.node-topic-prefix/..worker-1.out new file mode 100644 index 0000000000..61b4b1d727 --- /dev/null +++ b/testing/btest/Baseline/cluster.zeromq.node-topic-prefix/..worker-1.out @@ -0,0 +1,4 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +A, worker-1 +B ping from 'manager' to 'worker-1' +C finish from 'manager' to 'worker-1' diff --git a/testing/btest/Baseline/cluster.zeromq.node-topic-prefix/..worker-10.out b/testing/btest/Baseline/cluster.zeromq.node-topic-prefix/..worker-10.out new file mode 100644 index 0000000000..411269df99 --- /dev/null +++ b/testing/btest/Baseline/cluster.zeromq.node-topic-prefix/..worker-10.out @@ -0,0 +1,4 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +A, worker-2 +B ping from 'manager' to 'worker-2' +C finish from 'manager' to 'worker-2' diff --git a/testing/btest/Baseline/cluster.zeromq.node-topic-prefix/..worker-2.out b/testing/btest/Baseline/cluster.zeromq.node-topic-prefix/..worker-2.out new file mode 100644 index 0000000000..83f0373f05 --- /dev/null +++ b/testing/btest/Baseline/cluster.zeromq.node-topic-prefix/..worker-2.out @@ -0,0 +1,4 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +A, worker-10 +B ping from 'manager' to 'worker-10' +C finish from 'manager' to 'worker-10' diff --git a/testing/btest/Baseline/cluster.zeromq.node-topic-prefix/..worker-20.out b/testing/btest/Baseline/cluster.zeromq.node-topic-prefix/..worker-20.out new file mode 100644 index 0000000000..340404aa95 --- /dev/null +++ b/testing/btest/Baseline/cluster.zeromq.node-topic-prefix/..worker-20.out @@ -0,0 +1,4 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +A, worker-20 +B ping from 'manager' to 'worker-20' +C finish from 'manager' to 'worker-20' diff --git a/testing/btest/Baseline/save-load-seeds.zeek b/testing/btest/Baseline/save-load-seeds.zeek new file mode 100644 index 0000000000..f51f16bb2e --- /dev/null +++ b/testing/btest/Baseline/save-load-seeds.zeek @@ -0,0 +1,16 @@ +# @TEST-DOC: Save seeds and read and assure the UIDs are the same. Regression test for #4209 +# +# @TEST-EXEC: zeek --save-seeds myseeds -r $TRACES/http/get.trace %INPUT +# @TEST-EXEC: mkdir save && mv *log save +# @TEST-EXEC: zeek-cut -m uid history service < save/conn.log >save/conn.log.cut +# +# @TEST-EXEC: zeek --load-seeds myseeds -r $TRACES/http/get.trace %INPUT +# @TEST-EXEC: mkdir load && mv *log load +# @TEST-EXEC: zeek-cut -m uid history service < load/conn.log >load/conn.log.cut +# +# @TEST-EXEC: btest-diff load/conn.log.cut +# @TEST-EXEC: btest-diff save/conn.log.cut +# @TEST-EXEC: diff load/conn.log.cut save/conn.log.cut + +@load base/protocols/conn +@load base/protocols/http diff --git a/testing/btest/cluster/zeromq/node-topic-prefix.zeek b/testing/btest/cluster/zeromq/node-topic-prefix.zeek new file mode 100644 index 0000000000..c489993f04 --- /dev/null +++ b/testing/btest/cluster/zeromq/node-topic-prefix.zeek @@ -0,0 +1,117 @@ +# @TEST-DOC: Ensure that worker-1 does not observe messages to worker-20 on its Cluster::node_topic() +# +# @TEST-REQUIRES: have-zeromq +# +# @TEST-GROUP: cluster-zeromq +# +# @TEST-PORT: XPUB_PORT +# @TEST-PORT: XSUB_PORT +# @TEST-PORT: LOG_PULL_PORT +# +# @TEST-EXEC: cp $FILES/zeromq/test-bootstrap.zeek zeromq-test-bootstrap.zeek +# +# @TEST-EXEC: zeek --parse-only manager.zeek worker.zeek +# +# @TEST-EXEC: btest-bg-run manager "ZEEKPATH=$ZEEKPATH:.. && CLUSTER_NODE=manager zeek -b ../manager.zeek >out" +# @TEST-EXEC: btest-bg-run worker-1 "ZEEKPATH=$ZEEKPATH:.. && CLUSTER_NODE=worker-1 zeek -b ../worker.zeek >out" +# @TEST-EXEC: btest-bg-run worker-2 "ZEEKPATH=$ZEEKPATH:.. && CLUSTER_NODE=worker-10 zeek -b ../worker.zeek >out" +# @TEST-EXEC: btest-bg-run worker-10 "ZEEKPATH=$ZEEKPATH:.. && CLUSTER_NODE=worker-2 zeek -b ../worker.zeek >out" +# @TEST-EXEC: btest-bg-run worker-20 "ZEEKPATH=$ZEEKPATH:.. && CLUSTER_NODE=worker-20 zeek -b ../worker.zeek >out" +# +# @TEST-EXEC: btest-bg-wait 30 +# @TEST-EXEC: TEST_DIFF_CANONIFIER=$SCRIPTS/diff-sort btest-diff ./manager/out +# @TEST-EXEC: btest-diff ./worker-1/out +# @TEST-EXEC: btest-diff ./worker-2/out +# @TEST-EXEC: btest-diff ./worker-10/out +# @TEST-EXEC: btest-diff ./worker-20/out + +# @TEST-START-FILE common.zeek +@load ./zeromq-test-bootstrap + +global ping: event(from: string, to: string); +global pong: event(from: string, to: string); +global finish: event(from: string, to: string); + +event zeek_init() + { + print "A", Cluster::node; + } +# @TEST-END-FILE + +# @TEST-START-FILE manager.zeek +@load ./common.zeek + +global nodes_down = 0; +global nodes_up = 0; + +event send_pings() + { + for ( name, n in Cluster::nodes ) + if ( n$node_type == Cluster::WORKER ) + Cluster::publish(Cluster::node_topic(name), ping, Cluster::node, name); + } + +# If a node comes up, send it a ping +event Cluster::node_up(name: string, id: string) + { + print fmt("B node_up - sending ping to '%s'", name); + ++nodes_up; + + if ( nodes_up == 4 ) + event send_pings(); + } + +event ping(from: string, to: string) + { + # manager node should never see ping events. + print "XXX FAIL ping", from, to; + } + +event pong(from: string, to: string) + { + print fmt("C pong from '%s' to '%s'", from ,to); + Cluster::publish(Cluster::node_topic(from), finish, Cluster::node, from); + } + +# If the worker vanishes, finish the test. +event Cluster::node_down(name: string, id: string) + { + print fmt("D node_down from '%s'", name); + ++nodes_down; + if ( nodes_down == 4 ) + terminate(); + } +# @TEST-END-FILE + +# @TEST-START-FILE worker.zeek +@load ./common.zeek + +# Reply to a ping with a pong. +event ping(from: string, to: string) + { + if ( to != Cluster::node ) + print fmt("FAIL: got ping destined to '%s'", to); + + print fmt("B ping from '%s' to '%s'", from, to); + Cluster::publish(Cluster::node_topic(from), pong, Cluster::node, from); + } + +event finish(from: string, to: string) &is_used + { + print fmt("C finish from '%s' to '%s'", from, to); + terminate(); + } +# @TEST-END-FILE + +# @TEST-START-FILE cluster-layout.zeek +redef Cluster::manager_is_logger = T; +redef Log::default_rotation_interval = 0.0sec; + +redef Cluster::nodes = { + ["manager"] = [$node_type=Cluster::MANAGER, $ip=127.0.0.1], + ["worker-1"] = [$node_type=Cluster::WORKER, $ip=127.0.0.1], + ["worker-2"] = [$node_type=Cluster::WORKER, $ip=127.0.0.1], + ["worker-10"] = [$node_type=Cluster::WORKER, $ip=127.0.0.1], + ["worker-20"] = [$node_type=Cluster::WORKER, $ip=127.0.0.1], +}; +# @TEST-END-FILE From 387237e9c255a5ac32b22c4b5b26cba37802d441 Mon Sep 17 00:00:00 2001 From: Arne Welzel Date: Thu, 20 Mar 2025 11:56:02 +0100 Subject: [PATCH 5/7] cluster/OnLoop: Switch to condition variable The busy polling wasn't clever and usually resulted in delays. For now, switch to mutex/condition variable and log an error if the timeouts are immense. --- src/cluster/OnLoop.h | 79 +++++++++++++++++++++++++++++++------------- 1 file changed, 56 insertions(+), 23 deletions(-) diff --git a/src/cluster/OnLoop.h b/src/cluster/OnLoop.h index 88edfa21bc..d0aae42d45 100644 --- a/src/cluster/OnLoop.h +++ b/src/cluster/OnLoop.h @@ -4,6 +4,7 @@ #include #include +#include #include #include #include @@ -14,6 +15,8 @@ #include "zeek/iosource/Manager.h" namespace zeek::detail { + + /** * Template class allowing work items to be queued by threads and processed * in Zeek's main thread. @@ -38,12 +41,15 @@ public: * * @param proc The instance processing. * @param tag The tag to use as the IOSource's tag. + * @param max_queue_size How many messages to queue before blocking the producing thread. + * @param cond_timeout If a producer is blocked for more than that many microseconds, report a warning. + * @param main_thread_id The ID of the main thread for usage checks. */ - OnLoopProcess(Proc* proc, std::string tag, size_t max_queue_size = 10, - std::chrono::microseconds block_duration = std::chrono::microseconds(100), + OnLoopProcess(Proc* proc, std::string tag, size_t max_queue_size = 250, + std::chrono::microseconds cond_timeout = std::chrono::microseconds(100000), std::thread::id main_thread_id = std::this_thread::get_id()) - : max_queue_size(max_queue_size), - block_duration(block_duration), + : cond_timeout(cond_timeout), + max_queue_size(max_queue_size), proc(proc), tag(std::move(tag)), main_thread_id(main_thread_id) {} @@ -76,6 +82,9 @@ public: std::scoped_lock lock(mtx); SetClosed(true); + // Wake a process stuck in queueing. + cond.notify_one(); + // Don't attempt to Process anymore. proc = nullptr; } @@ -92,12 +101,21 @@ public: */ void Process() override { std::list to_process; + bool notify = false; { std::scoped_lock lock(mtx); + if ( queue.size() >= max_queue_size ) + notify = true; + to_process.splice(to_process.end(), queue); flare.Extinguish(); } + // The queue was full before and is now empty, + // wake up any pending thread. + if ( notify ) + cond.notify_one(); + // We've been closed, so proc will most likely // be invalid at this point and we'll discard // whatever was left to do. @@ -139,43 +157,58 @@ public: bool fire = false; size_t qs = 0; - while ( ! to_queue.empty() ) { - { - std::scoped_lock lock(mtx); + int timeouts = 0; - if ( ! IsOpen() ) { - // IO Source is being removed. - fire = false; - break; - } + { + std::unique_lock lock(mtx); - qs = queue.size(); - if ( qs < max_queue_size ) { - queue.splice(queue.end(), to_queue); - fire = fire || qs == 0; - assert(to_queue.empty()); - assert(! queue.empty()); - } + // Wait for room in the queue. + while ( IsOpen() && queue.size() >= max_queue_size ) { + auto status = cond.wait_for(lock, cond_timeout); + if ( status == std::cv_status::timeout && IsOpen() ) + ++timeouts; } - if ( ! to_queue.empty() ) { - std::this_thread::sleep_for(block_duration); - fire = true; + if ( IsOpen() ) { + assert(queue.size() < max_queue_size); + assert(to_queue.size() == 1); + queue.splice(queue.end(), to_queue); + fire = queue.size() == 1; // first element in queue triggers processing. + } + else { + // IO Source is being or was removed. + fire = false; } } + if ( fire ) flare.Fire(); + if ( timeouts > 0 ) { + // XXX: Should this invoke some callback or change the return value + // so users can react on this? + // + // We could also do suicidal snail pattern here. If the event + // loop is unable to process, we may as well knock ourselves out. + std::fprintf(stderr, "timeouts %d!\n", timeouts); + } + --queuers; } private: + // Flare to notify Zeek's IO loop. zeek::detail::Flare flare; + + // Mutex, condition and timeout protecting access to queue. std::mutex mtx; + std::condition_variable cond; + std::chrono::microseconds cond_timeout; + std::list queue; size_t max_queue_size; - std::chrono::microseconds block_duration; + Proc* proc; std::string tag; std::atomic queuers = 0; From 50b26fcea853752489e4cde3a02515cebd92ead8 Mon Sep 17 00:00:00 2001 From: Arne Welzel Date: Sun, 23 Mar 2025 15:35:08 +0100 Subject: [PATCH 6/7] btest/cluster/websocket: ZeroMQ backend test This test ensures that WebSocket clients connected to the same node see each other's messages. --- .../cluster.websocket.zeromq/..client..stderr | 1 + .../cluster.websocket.zeromq/..client.out | 6 + .../..manager..stderr | 2 + .../cluster.websocket.zeromq/..manager.out | 9 + .../..worker-1..stderr | 2 + .../cluster.websocket.zeromq/..worker-1.out | 4 + testing/btest/cluster/websocket/zeromq.zeek | 178 ++++++++++++++++++ 7 files changed, 202 insertions(+) create mode 100644 testing/btest/Baseline/cluster.websocket.zeromq/..client..stderr create mode 100644 testing/btest/Baseline/cluster.websocket.zeromq/..client.out create mode 100644 testing/btest/Baseline/cluster.websocket.zeromq/..manager..stderr create mode 100644 testing/btest/Baseline/cluster.websocket.zeromq/..manager.out create mode 100644 testing/btest/Baseline/cluster.websocket.zeromq/..worker-1..stderr create mode 100644 testing/btest/Baseline/cluster.websocket.zeromq/..worker-1.out create mode 100644 testing/btest/cluster/websocket/zeromq.zeek diff --git a/testing/btest/Baseline/cluster.websocket.zeromq/..client..stderr b/testing/btest/Baseline/cluster.websocket.zeromq/..client..stderr new file mode 100644 index 0000000000..49d861c74c --- /dev/null +++ b/testing/btest/Baseline/cluster.websocket.zeromq/..client..stderr @@ -0,0 +1 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. diff --git a/testing/btest/Baseline/cluster.websocket.zeromq/..client.out b/testing/btest/Baseline/cluster.websocket.zeromq/..client.out new file mode 100644 index 0000000000..d2723da11c --- /dev/null +++ b/testing/btest/Baseline/cluster.websocket.zeromq/..client.out @@ -0,0 +1,6 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +Connected! +unique ids 3 +ws1 timeout +ws2 ev: topic /test/pings/42 event name ping args [{'@data-type': 'string', 'data': 'python-websocket-client'}, {'@data-type': 'count', 'data': 42}] +ws3 ev: topic /test/pings/42 event name ping args [{'@data-type': 'string', 'data': 'python-websocket-client'}, {'@data-type': 'count', 'data': 42}] diff --git a/testing/btest/Baseline/cluster.websocket.zeromq/..manager..stderr b/testing/btest/Baseline/cluster.websocket.zeromq/..manager..stderr new file mode 100644 index 0000000000..e3f6131b1d --- /dev/null +++ b/testing/btest/Baseline/cluster.websocket.zeromq/..manager..stderr @@ -0,0 +1,2 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +received termination signal diff --git a/testing/btest/Baseline/cluster.websocket.zeromq/..manager.out b/testing/btest/Baseline/cluster.websocket.zeromq/..manager.out new file mode 100644 index 0000000000..6cd1db5faa --- /dev/null +++ b/testing/btest/Baseline/cluster.websocket.zeromq/..manager.out @@ -0,0 +1,9 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +Cluster::node_up, worker-1 +Cluster::websocket_client_added, 1, [/test/pings/, /test/ws/1] +Cluster::websocket_client_added, 2, [/test/pings/, /test/ws/2] +Cluster::websocket_client_added, 3, [/test/pings/, /test/ws/3] +got ping: python-websocket-client, 42 +Cluster::websocket_client_lost, 1 +Cluster::websocket_client_lost, 2 +Cluster::websocket_client_lost, 3 diff --git a/testing/btest/Baseline/cluster.websocket.zeromq/..worker-1..stderr b/testing/btest/Baseline/cluster.websocket.zeromq/..worker-1..stderr new file mode 100644 index 0000000000..e3f6131b1d --- /dev/null +++ b/testing/btest/Baseline/cluster.websocket.zeromq/..worker-1..stderr @@ -0,0 +1,2 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +received termination signal diff --git a/testing/btest/Baseline/cluster.websocket.zeromq/..worker-1.out b/testing/btest/Baseline/cluster.websocket.zeromq/..worker-1.out new file mode 100644 index 0000000000..7dcd0fc9ff --- /dev/null +++ b/testing/btest/Baseline/cluster.websocket.zeromq/..worker-1.out @@ -0,0 +1,4 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +Cluster::node_up, manager +got ping: python-websocket-client, 42 +Cluster::node_up, manager diff --git a/testing/btest/cluster/websocket/zeromq.zeek b/testing/btest/cluster/websocket/zeromq.zeek new file mode 100644 index 0000000000..f49942703f --- /dev/null +++ b/testing/btest/cluster/websocket/zeromq.zeek @@ -0,0 +1,178 @@ +# @TEST-DOC: Test WebSockets clients when the ZeroMQ cluster backend is enabled. +# +# @TEST-REQUIRES: python3 -c 'import websockets.sync' +# +# @TEST-PORT: XPUB_PORT +# @TEST-PORT: XSUB_PORT +# @TEST-PORT: LOG_PULL_PORT +# @TEST-PORT: WEBSOCKET_PORT +# +# @TEST-EXEC: cp $FILES/zeromq/cluster-layout-simple.zeek cluster-layout.zeek +# @TEST-EXEC: cp $FILES/zeromq/test-bootstrap.zeek zeromq-test-bootstrap.zeek +# +# @TEST-EXEC: zeek -b --parse-only manager.zeek worker.zeek +# @TEST-EXEC: python3 -m py_compile client.py +# +# @TEST-EXEC: btest-bg-run manager "ZEEKPATH=$ZEEKPATH:.. && CLUSTER_NODE=manager zeek -b ../manager.zeek >out" +# @TEST-EXEC: btest-bg-run worker-1 "ZEEKPATH=$ZEEKPATH:.. && CLUSTER_NODE=worker-1 zeek -b ../worker.zeek >out" +# @TEST-EXEC: btest-bg-run client "python3 ../client.py >out" +# +# @TEST-EXEC: btest-bg-wait 30 +# @TEST-EXEC: btest-diff ./manager/out +# @TEST-EXEC: btest-diff ./manager/.stderr +# @TEST-EXEC: btest-diff ./worker-1/out +# @TEST-EXEC: btest-diff ./worker-1/.stderr +# @TEST-EXEC: btest-diff ./client/out +# @TEST-EXEC: btest-diff ./client/.stderr + +@TEST-START-FILE common.zeek +@load ./zeromq-test-bootstrap + +global ping: event(msg: string, c: count) &is_used; +global pong: event(msg: string, c: count) &is_used; + +@TEST-END-FILE + +# @TEST-START-FILE manager.zeek +@load ./common.zeek + +redef Log::enable_local_logging = T; +redef Log::default_rotation_interval = 0sec; + +redef Broker::disable_ssl = T; + +global ping_count = 0; + +event zeek_init() + { + Cluster::subscribe("/test/pings/"); + } + +event Cluster::node_up(name: string, id: string) + { + print "Cluster::node_up", name; + + # Delay listening on WebSocket clients until worker-1 is around. + if ( name == "worker-1" ) + Cluster::listen_websocket([ + $listen_host="127.0.0.1", + $listen_port=to_port(getenv("WEBSOCKET_PORT")) + ]); + } + +event ping(msg: string, n: count) &is_used + { + ++ping_count; + print fmt("got ping: %s, %s", msg, n); + local e = Cluster::make_event(pong, fmt("orig_msg=%s", msg), ping_count); + Cluster::publish("/test/clients", e); + } + +global added = 0; +global lost = 0; + +event Cluster::websocket_client_added(info: Cluster::EndpointInfo, subscriptions: string_vec) + { + ++added; + print "Cluster::websocket_client_added", added, subscriptions; + } + +event Cluster::websocket_client_lost(info: Cluster::EndpointInfo) + { + ++lost; + print "Cluster::websocket_client_lost", lost; + if ( lost == 3 ) + terminate(); + } +# @TEST-END-FILE + +# @TEST-START-FILE worker.zeek +@load ./common.zeek + +event zeek_init() + { + Cluster::subscribe("/test/pings/"); + } + +event Cluster::node_up(name: string, id: string) + { + print "Cluster::node_up", name; + } + +event Cluster::node_down(name: string, id: string) + { + print "Cluster::node_up", name; + terminate(); + } + +event ping(msg: string, n: count) + { + print fmt("got ping: %s, %s", msg, n); + } +# @TEST-END-FILE + + +@TEST-START-FILE client.py +import json, os, time +from websockets.sync.client import connect + +ws_port = os.environ['WEBSOCKET_PORT'].split('/')[0] +ws_url = f'ws://127.0.0.1:{ws_port}/v1/messages/json' + +def make_ping(topic, c): + return { + "type": "data-message", + "topic": topic + str(c), + "@data-type": "vector", + "data": [ + {"@data-type": "count", "data": 1}, # Format + {"@data-type": "count", "data": 1}, # Type + {"@data-type": "vector", "data": [ + { "@data-type": "string", "data": "ping"}, # Event name + { "@data-type": "vector", "data": [ # event args + {"@data-type": "string", "data": f"python-websocket-client"}, + {"@data-type": "count", "data": c}, + ], }, + ], }, + ], + } + +def run(ws_url): + with connect(ws_url) as ws1: + with connect(ws_url) as ws2: + with connect(ws_url) as ws3: + clients = [ws1, ws2, ws3] + print("Connected!") + ids = set() + for i, c in enumerate(clients, 1): + c.send(json.dumps([f"/test/ws/{i}", "/test/pings/"])) + ack = json.loads(c.recv()) + assert "type" in ack, repr(ack) + assert ack["type"] == "ack" + assert "endpoint" in ack, repr(ack) + assert "version" in ack + ids.add(ack["endpoint"]) + + print("unique ids", len(ids)) + ws1.send(json.dumps(make_ping("/test/pings/", 42))) + + # Client 2 and client 3 receive the ping from client 1, client 1 gets a timeout. + for name, ws in [("ws1", ws1), ("ws2", ws2), ("ws3", ws3)]: + try: + data = json.loads(ws.recv(timeout=0.1)) + ev = data["data"][2]["data"] + print(name, "ev: topic", data["topic"], "event name", ev[0]["data"], "args", ev[1]["data"]) + except TimeoutError: + print(name, "timeout") + +def main(): + for _ in range(100): + try: + run(ws_url) + break + except ConnectionRefusedError: + time.sleep(0.1) + +if __name__ == "__main__": + main() +@TEST-END-FILE From 52143a5712f2e745907dcb1808e916125fa49546 Mon Sep 17 00:00:00 2001 From: Arne Welzel Date: Mon, 24 Mar 2025 18:35:44 +0100 Subject: [PATCH 7/7] cluster/OnLoop: Add metric for queue stalling instead of fprintf --- src/cluster/OnLoop.h | 34 ++++++++++++++++------------------ 1 file changed, 16 insertions(+), 18 deletions(-) diff --git a/src/cluster/OnLoop.h b/src/cluster/OnLoop.h index d0aae42d45..0c488d24da 100644 --- a/src/cluster/OnLoop.h +++ b/src/cluster/OnLoop.h @@ -13,6 +13,7 @@ #include "zeek/Reporter.h" #include "zeek/iosource/IOSource.h" #include "zeek/iosource/Manager.h" +#include "zeek/telemetry/Manager.h" namespace zeek::detail { @@ -52,7 +53,13 @@ public: max_queue_size(max_queue_size), proc(proc), tag(std::move(tag)), - main_thread_id(main_thread_id) {} + main_thread_id(main_thread_id), + total_queue_stalls_metric( + zeek::telemetry_mgr + ->CounterFamily( + "zeek", "cluster_onloop_queue_stalls", {"tag"}, + "Increased whenever a cluster backend thread is stalled due to the OnLoop queue being full.") + ->GetOrAdd({{"tag", this->tag}})) {} /** * Register this instance with the IO loop. @@ -139,9 +146,9 @@ public: /** * Queue the given Work item to be processed on Zeek's main thread. * - * If there's too many items in the queue, this method sleeps using - * std::this_thread::sleep() for the *block_duration* passed to the - * constructor. + * If there's too many items in the queue, this method blocks until + * there's more room available. The zeek_cluster_onloop_queue_stalls_total + * metric will be increased once for every cond_timeout being blocked. * * Calling this method from the main thread will result in an abort(). */ @@ -157,16 +164,13 @@ public: bool fire = false; size_t qs = 0; - int timeouts = 0; - { std::unique_lock lock(mtx); // Wait for room in the queue. while ( IsOpen() && queue.size() >= max_queue_size ) { - auto status = cond.wait_for(lock, cond_timeout); - if ( status == std::cv_status::timeout && IsOpen() ) - ++timeouts; + total_queue_stalls_metric->Inc(); + cond.wait_for(lock, cond_timeout); } if ( IsOpen() ) { @@ -185,15 +189,6 @@ public: if ( fire ) flare.Fire(); - if ( timeouts > 0 ) { - // XXX: Should this invoke some callback or change the return value - // so users can react on this? - // - // We could also do suicidal snail pattern here. If the event - // loop is unable to process, we may as well knock ourselves out. - std::fprintf(stderr, "timeouts %d!\n", timeouts); - } - --queuers; } @@ -213,6 +208,9 @@ private: std::string tag; std::atomic queuers = 0; std::thread::id main_thread_id; + + // Track queue stalling. + telemetry::CounterPtr total_queue_stalls_metric; };