diff --git a/CHANGES b/CHANGES index e5077fe1e9..c4a6264c40 100644 --- a/CHANGES +++ b/CHANGES @@ -1,3 +1,32 @@ +7.2.0-dev.427 | 2025-03-24 19:45:24 +0100 + + * cluster/OnLoop: Add metric for queue stalling instead of fprintf (Arne Welzel, Corelight) + + * btest/cluster/websocket: ZeroMQ backend test (Arne Welzel, Corelight) + + * cluster/OnLoop: Switch to condition variable (Arne Welzel, Corelight) + + The busy polling wasn't clever and usually resulted in delays. For now, + switch to mutex/condition variable and log an error if the timeouts are + immense. + + * cluster/zeromq: Fix node_topic() and nodeid_topic() (Arne Welzel, Corelight) + + Due to prefix matching, worker-1's node_topic() also matched worker-10, + worker-11, etc. Suffix the node topic with a `.`. The original implementation + came from NATS, where subjects are separated by `.`. + + * cluster/websocket: Fix null deref at WebSocket server shutdown (Arne Welzel, Corelight) + + * btest/cluster/websocket: Harden multi-client tests (Arne Welzel, Corelight) + + * cluster/zeromq: Fix unsubscription visibility (Arne Welzel, Corelight) + + When two workers connect to zeek.cluster.worker, the central ZeroMQ + proxy would not propagate unsubscription information to other nodes + once they both left. Set ZMQ_XPUB_VERBOSER on the proxies XPUB socket + for visibility. + 7.2.0-dev.418 | 2025-03-21 11:56:40 -0700 * Redis: Handle other errors from requests, fix KEY_EXISTS for put operations (Tim Wojtulewicz, Corelight) diff --git a/VERSION b/VERSION index 305268305b..36cac5402c 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -7.2.0-dev.418 +7.2.0-dev.427 diff --git a/scripts/policy/frameworks/cluster/backend/zeromq/main.zeek b/scripts/policy/frameworks/cluster/backend/zeromq/main.zeek index 1befbd105d..c34608d437 100644 --- a/scripts/policy/frameworks/cluster/backend/zeromq/main.zeek +++ b/scripts/policy/frameworks/cluster/backend/zeromq/main.zeek @@ -226,11 +226,11 @@ redef Cluster::backend = Cluster::CLUSTER_BACKEND_ZEROMQ; redef run_proxy_thread = Cluster::local_node_type() == Cluster::MANAGER; function zeromq_node_topic(name: string): string { - return node_topic_prefix + "." + name; + return node_topic_prefix + "." + name + "."; } function zeromq_nodeid_topic(id: string): string { - return nodeid_topic_prefix + "." + id; + return nodeid_topic_prefix + "." + id + "."; } # Unique identifier for this node with some debug information. @@ -345,7 +345,7 @@ event Cluster::Backend::ZeroMQ::subscription(topic: string) if ( ! starts_with(topic, prefix) ) return; - local nodeid = topic[|prefix|:]; + local nodeid = topic[|prefix|:][:-1]; # Do not say hello to ourselves - we won't see it anyhow. if ( nodeid == Cluster::node_id() ) @@ -417,7 +417,7 @@ event Cluster::Backend::ZeroMQ::unsubscription(topic: string) if ( ! starts_with(topic, prefix) ) return; - local gone_node_id = topic[|prefix|:]; + local gone_node_id = topic[|prefix|:][:-1]; local name = ""; for ( node_name, n in Cluster::nodes ) { if ( n?$id && n$id == gone_node_id ) { diff --git a/src/cluster/OnLoop.h b/src/cluster/OnLoop.h index 88edfa21bc..0c488d24da 100644 --- a/src/cluster/OnLoop.h +++ b/src/cluster/OnLoop.h @@ -4,6 +4,7 @@ #include #include +#include #include #include #include @@ -12,8 +13,11 @@ #include "zeek/Reporter.h" #include "zeek/iosource/IOSource.h" #include "zeek/iosource/Manager.h" +#include "zeek/telemetry/Manager.h" namespace zeek::detail { + + /** * Template class allowing work items to be queued by threads and processed * in Zeek's main thread. @@ -38,15 +42,24 @@ public: * * @param proc The instance processing. * @param tag The tag to use as the IOSource's tag. + * @param max_queue_size How many messages to queue before blocking the producing thread. + * @param cond_timeout If a producer is blocked for more than that many microseconds, report a warning. + * @param main_thread_id The ID of the main thread for usage checks. */ - OnLoopProcess(Proc* proc, std::string tag, size_t max_queue_size = 10, - std::chrono::microseconds block_duration = std::chrono::microseconds(100), + OnLoopProcess(Proc* proc, std::string tag, size_t max_queue_size = 250, + std::chrono::microseconds cond_timeout = std::chrono::microseconds(100000), std::thread::id main_thread_id = std::this_thread::get_id()) - : max_queue_size(max_queue_size), - block_duration(block_duration), + : cond_timeout(cond_timeout), + max_queue_size(max_queue_size), proc(proc), tag(std::move(tag)), - main_thread_id(main_thread_id) {} + main_thread_id(main_thread_id), + total_queue_stalls_metric( + zeek::telemetry_mgr + ->CounterFamily( + "zeek", "cluster_onloop_queue_stalls", {"tag"}, + "Increased whenever a cluster backend thread is stalled due to the OnLoop queue being full.") + ->GetOrAdd({{"tag", this->tag}})) {} /** * Register this instance with the IO loop. @@ -76,6 +89,9 @@ public: std::scoped_lock lock(mtx); SetClosed(true); + // Wake a process stuck in queueing. + cond.notify_one(); + // Don't attempt to Process anymore. proc = nullptr; } @@ -92,12 +108,21 @@ public: */ void Process() override { std::list to_process; + bool notify = false; { std::scoped_lock lock(mtx); + if ( queue.size() >= max_queue_size ) + notify = true; + to_process.splice(to_process.end(), queue); flare.Extinguish(); } + // The queue was full before and is now empty, + // wake up any pending thread. + if ( notify ) + cond.notify_one(); + // We've been closed, so proc will most likely // be invalid at this point and we'll discard // whatever was left to do. @@ -121,9 +146,9 @@ public: /** * Queue the given Work item to be processed on Zeek's main thread. * - * If there's too many items in the queue, this method sleeps using - * std::this_thread::sleep() for the *block_duration* passed to the - * constructor. + * If there's too many items in the queue, this method blocks until + * there's more room available. The zeek_cluster_onloop_queue_stalls_total + * metric will be increased once for every cond_timeout being blocked. * * Calling this method from the main thread will result in an abort(). */ @@ -139,31 +164,28 @@ public: bool fire = false; size_t qs = 0; - while ( ! to_queue.empty() ) { - { - std::scoped_lock lock(mtx); + { + std::unique_lock lock(mtx); - if ( ! IsOpen() ) { - // IO Source is being removed. - fire = false; - break; - } - - qs = queue.size(); - if ( qs < max_queue_size ) { - queue.splice(queue.end(), to_queue); - fire = fire || qs == 0; - assert(to_queue.empty()); - assert(! queue.empty()); - } + // Wait for room in the queue. + while ( IsOpen() && queue.size() >= max_queue_size ) { + total_queue_stalls_metric->Inc(); + cond.wait_for(lock, cond_timeout); } - if ( ! to_queue.empty() ) { - std::this_thread::sleep_for(block_duration); - fire = true; + if ( IsOpen() ) { + assert(queue.size() < max_queue_size); + assert(to_queue.size() == 1); + queue.splice(queue.end(), to_queue); + fire = queue.size() == 1; // first element in queue triggers processing. + } + else { + // IO Source is being or was removed. + fire = false; } } + if ( fire ) flare.Fire(); @@ -171,15 +193,24 @@ public: } private: + // Flare to notify Zeek's IO loop. zeek::detail::Flare flare; + + // Mutex, condition and timeout protecting access to queue. std::mutex mtx; + std::condition_variable cond; + std::chrono::microseconds cond_timeout; + std::list queue; size_t max_queue_size; - std::chrono::microseconds block_duration; + Proc* proc; std::string tag; std::atomic queuers = 0; std::thread::id main_thread_id; + + // Track queue stalling. + telemetry::CounterPtr total_queue_stalls_metric; }; diff --git a/src/cluster/backend/zeromq/ZeroMQ-Proxy.cc b/src/cluster/backend/zeromq/ZeroMQ-Proxy.cc index 5cf36fbefc..d52a8df7d5 100644 --- a/src/cluster/backend/zeromq/ZeroMQ-Proxy.cc +++ b/src/cluster/backend/zeromq/ZeroMQ-Proxy.cc @@ -41,11 +41,12 @@ bool ProxyThread::Start() { zmq::socket_t xpub(ctx, zmq::socket_type::xpub); zmq::socket_t xsub(ctx, zmq::socket_type::xsub); - // Enable XPUB_VERBOSE unconditional to enforce nodes receiving - // notifications about any new subscriptions, even if they have - // seen them before. This is needed to for the subscribe callback - // functionality to work reliably. - xpub.set(zmq::sockopt::xpub_verbose, 1); + // Enable XPUB_VERBOSER unconditional to enforce nodes receiving + // notifications about any new and removed subscriptions, even if + // they have seen them before. This is needed for the subscribe + // callback and shared subscription removal notification to work + // reliably. + xpub.set(zmq::sockopt::xpub_verboser, 1); xpub.set(zmq::sockopt::xpub_nodrop, xpub_nodrop); diff --git a/src/cluster/websocket/WebSocket.cc b/src/cluster/websocket/WebSocket.cc index 115db9aec8..fa2efb51ae 100644 --- a/src/cluster/websocket/WebSocket.cc +++ b/src/cluster/websocket/WebSocket.cc @@ -231,7 +231,9 @@ void WebSocketEventDispatcher::Terminate() { wsc->getRemotePort()); QueueReply(WebSocketCloseReply{wsc, 1001, "Terminating"}); - backend->Terminate(); + + if ( backend ) + backend->Terminate(); } clients.clear(); diff --git a/testing/btest/Baseline/cluster.websocket.three/..manager.out b/testing/btest/Baseline/cluster.websocket.three/..manager.out deleted file mode 100644 index 30dca05f31..0000000000 --- a/testing/btest/Baseline/cluster.websocket.three/..manager.out +++ /dev/null @@ -1,23 +0,0 @@ -### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. -Cluster::websocket_client_added, 1, [/test/clients] -Cluster::websocket_client_added, 2, [/test/clients] -Cluster::websocket_client_added, 3, [/test/clients] -got ping: ws1, 0 -got ping: ws2, 1 -got ping: ws3, 2 -got ping: ws1, 3 -got ping: ws2, 4 -got ping: ws3, 5 -got ping: ws1, 6 -got ping: ws2, 7 -got ping: ws3, 8 -got ping: ws1, 9 -got ping: ws2, 10 -got ping: ws3, 11 -got ping: ws1, 12 -got ping: ws2, 13 -got ping: ws3, 14 -got ping: ws1, 15 -Cluster::websocket_client_lost, 1 -Cluster::websocket_client_lost, 2 -Cluster::websocket_client_lost, 3 diff --git a/testing/btest/Baseline/cluster.websocket.three/..manager.out.sorted b/testing/btest/Baseline/cluster.websocket.three/..manager.out.sorted new file mode 100644 index 0000000000..69d725b682 --- /dev/null +++ b/testing/btest/Baseline/cluster.websocket.three/..manager.out.sorted @@ -0,0 +1,28 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +A subscription, /test/client-0 +A subscription, /test/client-1 +A subscription, /test/client-2 +A subscription, /test/clients +A subscription, /test/manager +B Cluster::websocket_client_added, [/test/client-0, /test/clients] +B Cluster::websocket_client_added, [/test/client-1, /test/clients] +B Cluster::websocket_client_added, [/test/client-2, /test/clients] +C got ping: ws1, 0 +C got ping: ws1, 12 +C got ping: ws1, 15 +C got ping: ws1, 3 +C got ping: ws1, 6 +C got ping: ws1, 9 +C got ping: ws2, 1 +C got ping: ws2, 10 +C got ping: ws2, 13 +C got ping: ws2, 4 +C got ping: ws2, 7 +C got ping: ws3, 11 +C got ping: ws3, 14 +C got ping: ws3, 2 +C got ping: ws3, 5 +C got ping: ws3, 8 +D Cluster::websocket_client_lost, 1 +D Cluster::websocket_client_lost, 2 +D Cluster::websocket_client_lost, 3 diff --git a/testing/btest/Baseline/cluster.websocket.two-pipelining/..client.out b/testing/btest/Baseline/cluster.websocket.two-pipelining/..client.out index 616a268fed..79efa77003 100644 --- a/testing/btest/Baseline/cluster.websocket.two-pipelining/..client.out +++ b/testing/btest/Baseline/cluster.websocket.two-pipelining/..client.out @@ -12,43 +12,43 @@ Sending ping 4 - ws1 Sending ping 4 - ws2 Receiving ack - ws1 Receiving ack - ws2 -Receiving pong 0 - ws0 -topic /zeek/event/to_client event name pong args [{'@data-type': 'string', 'data': 'my-message'}, {'@data-type': 'count', 'data': 1}] Receiving pong 0 - ws1 -topic /zeek/event/to_client event name pong args [{'@data-type': 'string', 'data': 'my-message'}, {'@data-type': 'count', 'data': 1}] -Receiving pong 1 - ws0 -topic /zeek/event/to_client event name pong args [{'@data-type': 'string', 'data': 'my-message'}, {'@data-type': 'count', 'data': 2}] +topic /test/clients event name pong args [{'@data-type': 'string', 'data': 'my-message'}, {'@data-type': 'count', 'data': 1}] +Receiving pong 0 - ws2 +topic /test/clients event name pong args [{'@data-type': 'string', 'data': 'my-message'}, {'@data-type': 'count', 'data': 1}] Receiving pong 1 - ws1 -topic /zeek/event/to_client event name pong args [{'@data-type': 'string', 'data': 'my-message'}, {'@data-type': 'count', 'data': 2}] -Receiving pong 2 - ws0 -topic /zeek/event/to_client event name pong args [{'@data-type': 'string', 'data': 'my-message'}, {'@data-type': 'count', 'data': 3}] +topic /test/clients event name pong args [{'@data-type': 'string', 'data': 'my-message'}, {'@data-type': 'count', 'data': 2}] +Receiving pong 1 - ws2 +topic /test/clients event name pong args [{'@data-type': 'string', 'data': 'my-message'}, {'@data-type': 'count', 'data': 2}] Receiving pong 2 - ws1 -topic /zeek/event/to_client event name pong args [{'@data-type': 'string', 'data': 'my-message'}, {'@data-type': 'count', 'data': 3}] -Receiving pong 3 - ws0 -topic /zeek/event/to_client event name pong args [{'@data-type': 'string', 'data': 'my-message'}, {'@data-type': 'count', 'data': 4}] +topic /test/clients event name pong args [{'@data-type': 'string', 'data': 'my-message'}, {'@data-type': 'count', 'data': 3}] +Receiving pong 2 - ws2 +topic /test/clients event name pong args [{'@data-type': 'string', 'data': 'my-message'}, {'@data-type': 'count', 'data': 3}] Receiving pong 3 - ws1 -topic /zeek/event/to_client event name pong args [{'@data-type': 'string', 'data': 'my-message'}, {'@data-type': 'count', 'data': 4}] -Receiving pong 4 - ws0 -topic /zeek/event/to_client event name pong args [{'@data-type': 'string', 'data': 'my-message'}, {'@data-type': 'count', 'data': 5}] +topic /test/clients event name pong args [{'@data-type': 'string', 'data': 'my-message'}, {'@data-type': 'count', 'data': 4}] +Receiving pong 3 - ws2 +topic /test/clients event name pong args [{'@data-type': 'string', 'data': 'my-message'}, {'@data-type': 'count', 'data': 4}] Receiving pong 4 - ws1 -topic /zeek/event/to_client event name pong args [{'@data-type': 'string', 'data': 'my-message'}, {'@data-type': 'count', 'data': 5}] -Receiving pong 5 - ws0 -topic /zeek/event/to_client event name pong args [{'@data-type': 'string', 'data': 'my-message'}, {'@data-type': 'count', 'data': 6}] +topic /test/clients event name pong args [{'@data-type': 'string', 'data': 'my-message'}, {'@data-type': 'count', 'data': 5}] +Receiving pong 4 - ws2 +topic /test/clients event name pong args [{'@data-type': 'string', 'data': 'my-message'}, {'@data-type': 'count', 'data': 5}] Receiving pong 5 - ws1 -topic /zeek/event/to_client event name pong args [{'@data-type': 'string', 'data': 'my-message'}, {'@data-type': 'count', 'data': 6}] -Receiving pong 6 - ws0 -topic /zeek/event/to_client event name pong args [{'@data-type': 'string', 'data': 'my-message'}, {'@data-type': 'count', 'data': 7}] +topic /test/clients event name pong args [{'@data-type': 'string', 'data': 'my-message'}, {'@data-type': 'count', 'data': 6}] +Receiving pong 5 - ws2 +topic /test/clients event name pong args [{'@data-type': 'string', 'data': 'my-message'}, {'@data-type': 'count', 'data': 6}] Receiving pong 6 - ws1 -topic /zeek/event/to_client event name pong args [{'@data-type': 'string', 'data': 'my-message'}, {'@data-type': 'count', 'data': 7}] -Receiving pong 7 - ws0 -topic /zeek/event/to_client event name pong args [{'@data-type': 'string', 'data': 'my-message'}, {'@data-type': 'count', 'data': 8}] +topic /test/clients event name pong args [{'@data-type': 'string', 'data': 'my-message'}, {'@data-type': 'count', 'data': 7}] +Receiving pong 6 - ws2 +topic /test/clients event name pong args [{'@data-type': 'string', 'data': 'my-message'}, {'@data-type': 'count', 'data': 7}] Receiving pong 7 - ws1 -topic /zeek/event/to_client event name pong args [{'@data-type': 'string', 'data': 'my-message'}, {'@data-type': 'count', 'data': 8}] -Receiving pong 8 - ws0 -topic /zeek/event/to_client event name pong args [{'@data-type': 'string', 'data': 'my-message'}, {'@data-type': 'count', 'data': 9}] +topic /test/clients event name pong args [{'@data-type': 'string', 'data': 'my-message'}, {'@data-type': 'count', 'data': 8}] +Receiving pong 7 - ws2 +topic /test/clients event name pong args [{'@data-type': 'string', 'data': 'my-message'}, {'@data-type': 'count', 'data': 8}] Receiving pong 8 - ws1 -topic /zeek/event/to_client event name pong args [{'@data-type': 'string', 'data': 'my-message'}, {'@data-type': 'count', 'data': 9}] -Receiving pong 9 - ws0 -topic /zeek/event/to_client event name pong args [{'@data-type': 'string', 'data': 'my-message'}, {'@data-type': 'count', 'data': 10}] +topic /test/clients event name pong args [{'@data-type': 'string', 'data': 'my-message'}, {'@data-type': 'count', 'data': 9}] +Receiving pong 8 - ws2 +topic /test/clients event name pong args [{'@data-type': 'string', 'data': 'my-message'}, {'@data-type': 'count', 'data': 9}] Receiving pong 9 - ws1 -topic /zeek/event/to_client event name pong args [{'@data-type': 'string', 'data': 'my-message'}, {'@data-type': 'count', 'data': 10}] +topic /test/clients event name pong args [{'@data-type': 'string', 'data': 'my-message'}, {'@data-type': 'count', 'data': 10}] +Receiving pong 9 - ws2 +topic /test/clients event name pong args [{'@data-type': 'string', 'data': 'my-message'}, {'@data-type': 'count', 'data': 10}] diff --git a/testing/btest/Baseline/cluster.websocket.two-pipelining/..manager.out.sorted b/testing/btest/Baseline/cluster.websocket.two-pipelining/..manager.out.sorted index ae8f79c02d..8b8657ed17 100644 --- a/testing/btest/Baseline/cluster.websocket.two-pipelining/..manager.out.sorted +++ b/testing/btest/Baseline/cluster.websocket.two-pipelining/..manager.out.sorted @@ -1,6 +1,10 @@ ### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. -A Cluster::websocket_client_added, 1, [/zeek/event/to_client] -A Cluster::websocket_client_added, 2, [/zeek/event/to_client] +A Cluster::websocket_client_added, [/test/client-1, /test/clients] +A Cluster::websocket_client_added, [/test/client-2, /test/clients] +A subscription, /test/client-1 +A subscription, /test/client-2 +A subscription, /test/clients +A subscription, /test/manager B got ping: ws1, 0 B got ping: ws1, 1 B got ping: ws1, 2 diff --git a/testing/btest/Baseline/cluster.websocket.zeromq/..client..stderr b/testing/btest/Baseline/cluster.websocket.zeromq/..client..stderr new file mode 100644 index 0000000000..49d861c74c --- /dev/null +++ b/testing/btest/Baseline/cluster.websocket.zeromq/..client..stderr @@ -0,0 +1 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. diff --git a/testing/btest/Baseline/cluster.websocket.zeromq/..client.out b/testing/btest/Baseline/cluster.websocket.zeromq/..client.out new file mode 100644 index 0000000000..d2723da11c --- /dev/null +++ b/testing/btest/Baseline/cluster.websocket.zeromq/..client.out @@ -0,0 +1,6 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +Connected! +unique ids 3 +ws1 timeout +ws2 ev: topic /test/pings/42 event name ping args [{'@data-type': 'string', 'data': 'python-websocket-client'}, {'@data-type': 'count', 'data': 42}] +ws3 ev: topic /test/pings/42 event name ping args [{'@data-type': 'string', 'data': 'python-websocket-client'}, {'@data-type': 'count', 'data': 42}] diff --git a/testing/btest/Baseline/cluster.websocket.zeromq/..manager..stderr b/testing/btest/Baseline/cluster.websocket.zeromq/..manager..stderr new file mode 100644 index 0000000000..e3f6131b1d --- /dev/null +++ b/testing/btest/Baseline/cluster.websocket.zeromq/..manager..stderr @@ -0,0 +1,2 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +received termination signal diff --git a/testing/btest/Baseline/cluster.websocket.zeromq/..manager.out b/testing/btest/Baseline/cluster.websocket.zeromq/..manager.out new file mode 100644 index 0000000000..6cd1db5faa --- /dev/null +++ b/testing/btest/Baseline/cluster.websocket.zeromq/..manager.out @@ -0,0 +1,9 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +Cluster::node_up, worker-1 +Cluster::websocket_client_added, 1, [/test/pings/, /test/ws/1] +Cluster::websocket_client_added, 2, [/test/pings/, /test/ws/2] +Cluster::websocket_client_added, 3, [/test/pings/, /test/ws/3] +got ping: python-websocket-client, 42 +Cluster::websocket_client_lost, 1 +Cluster::websocket_client_lost, 2 +Cluster::websocket_client_lost, 3 diff --git a/testing/btest/Baseline/cluster.websocket.zeromq/..worker-1..stderr b/testing/btest/Baseline/cluster.websocket.zeromq/..worker-1..stderr new file mode 100644 index 0000000000..e3f6131b1d --- /dev/null +++ b/testing/btest/Baseline/cluster.websocket.zeromq/..worker-1..stderr @@ -0,0 +1,2 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +received termination signal diff --git a/testing/btest/Baseline/cluster.websocket.zeromq/..worker-1.out b/testing/btest/Baseline/cluster.websocket.zeromq/..worker-1.out new file mode 100644 index 0000000000..7dcd0fc9ff --- /dev/null +++ b/testing/btest/Baseline/cluster.websocket.zeromq/..worker-1.out @@ -0,0 +1,4 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +Cluster::node_up, manager +got ping: python-websocket-client, 42 +Cluster::node_up, manager diff --git a/testing/btest/Baseline/cluster.zeromq.node-topic-prefix/..manager.out b/testing/btest/Baseline/cluster.zeromq.node-topic-prefix/..manager.out new file mode 100644 index 0000000000..80062fd5aa --- /dev/null +++ b/testing/btest/Baseline/cluster.zeromq.node-topic-prefix/..manager.out @@ -0,0 +1,15 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +### NOTE: This file has been sorted with diff-sort. +A, manager +B node_up - sending ping to 'worker-1' +B node_up - sending ping to 'worker-10' +B node_up - sending ping to 'worker-2' +B node_up - sending ping to 'worker-20' +C pong from 'worker-1' to 'manager' +C pong from 'worker-10' to 'manager' +C pong from 'worker-2' to 'manager' +C pong from 'worker-20' to 'manager' +D node_down from 'worker-1' +D node_down from 'worker-10' +D node_down from 'worker-2' +D node_down from 'worker-20' diff --git a/testing/btest/Baseline/cluster.zeromq.node-topic-prefix/..worker-1.out b/testing/btest/Baseline/cluster.zeromq.node-topic-prefix/..worker-1.out new file mode 100644 index 0000000000..61b4b1d727 --- /dev/null +++ b/testing/btest/Baseline/cluster.zeromq.node-topic-prefix/..worker-1.out @@ -0,0 +1,4 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +A, worker-1 +B ping from 'manager' to 'worker-1' +C finish from 'manager' to 'worker-1' diff --git a/testing/btest/Baseline/cluster.zeromq.node-topic-prefix/..worker-10.out b/testing/btest/Baseline/cluster.zeromq.node-topic-prefix/..worker-10.out new file mode 100644 index 0000000000..411269df99 --- /dev/null +++ b/testing/btest/Baseline/cluster.zeromq.node-topic-prefix/..worker-10.out @@ -0,0 +1,4 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +A, worker-2 +B ping from 'manager' to 'worker-2' +C finish from 'manager' to 'worker-2' diff --git a/testing/btest/Baseline/cluster.zeromq.node-topic-prefix/..worker-2.out b/testing/btest/Baseline/cluster.zeromq.node-topic-prefix/..worker-2.out new file mode 100644 index 0000000000..83f0373f05 --- /dev/null +++ b/testing/btest/Baseline/cluster.zeromq.node-topic-prefix/..worker-2.out @@ -0,0 +1,4 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +A, worker-10 +B ping from 'manager' to 'worker-10' +C finish from 'manager' to 'worker-10' diff --git a/testing/btest/Baseline/cluster.zeromq.node-topic-prefix/..worker-20.out b/testing/btest/Baseline/cluster.zeromq.node-topic-prefix/..worker-20.out new file mode 100644 index 0000000000..340404aa95 --- /dev/null +++ b/testing/btest/Baseline/cluster.zeromq.node-topic-prefix/..worker-20.out @@ -0,0 +1,4 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +A, worker-20 +B ping from 'manager' to 'worker-20' +C finish from 'manager' to 'worker-20' diff --git a/testing/btest/Baseline/cluster.zeromq.unsubscribe-two-workers/..manager.out b/testing/btest/Baseline/cluster.zeromq.unsubscribe-two-workers/..manager.out new file mode 100644 index 0000000000..8d2c4d989f --- /dev/null +++ b/testing/btest/Baseline/cluster.zeromq.unsubscribe-two-workers/..manager.out @@ -0,0 +1,14 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +### NOTE: This file has been sorted with diff-sort. +A node_up, worker-1 +A node_up, worker-2 +B subscription, /test/worker-1 +B subscription, /test/worker-2 +B subscription, /test/worker-common +C subscribing to /test/manager-common +D unsubscription, /test/worker-1 +D unsubscription, /test/worker-2 +D unsubscription, /test/worker-common +E unsubscribing from /test/manager-common +Z node_down, worker-1 +Z node_down, worker-2 diff --git a/testing/btest/Baseline/cluster.zeromq.unsubscribe-two-workers/..worker-1.out b/testing/btest/Baseline/cluster.zeromq.unsubscribe-two-workers/..worker-1.out new file mode 100644 index 0000000000..5dde5fa1e0 --- /dev/null +++ b/testing/btest/Baseline/cluster.zeromq.unsubscribe-two-workers/..worker-1.out @@ -0,0 +1,7 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +### NOTE: This file has been sorted with diff-sort. +A start_test +B subscription, /test/manager-common +C unsubscribe from /test/worker-common +C unsubscription, /test/manager-common +D /test/manager-common unsubscribed, terminate() diff --git a/testing/btest/Baseline/cluster.zeromq.unsubscribe-two-workers/..worker-2.out b/testing/btest/Baseline/cluster.zeromq.unsubscribe-two-workers/..worker-2.out new file mode 100644 index 0000000000..5dde5fa1e0 --- /dev/null +++ b/testing/btest/Baseline/cluster.zeromq.unsubscribe-two-workers/..worker-2.out @@ -0,0 +1,7 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +### NOTE: This file has been sorted with diff-sort. +A start_test +B subscription, /test/manager-common +C unsubscribe from /test/worker-common +C unsubscription, /test/manager-common +D /test/manager-common unsubscribed, terminate() diff --git a/testing/btest/Baseline/save-load-seeds.zeek b/testing/btest/Baseline/save-load-seeds.zeek new file mode 100644 index 0000000000..f51f16bb2e --- /dev/null +++ b/testing/btest/Baseline/save-load-seeds.zeek @@ -0,0 +1,16 @@ +# @TEST-DOC: Save seeds and read and assure the UIDs are the same. Regression test for #4209 +# +# @TEST-EXEC: zeek --save-seeds myseeds -r $TRACES/http/get.trace %INPUT +# @TEST-EXEC: mkdir save && mv *log save +# @TEST-EXEC: zeek-cut -m uid history service < save/conn.log >save/conn.log.cut +# +# @TEST-EXEC: zeek --load-seeds myseeds -r $TRACES/http/get.trace %INPUT +# @TEST-EXEC: mkdir load && mv *log load +# @TEST-EXEC: zeek-cut -m uid history service < load/conn.log >load/conn.log.cut +# +# @TEST-EXEC: btest-diff load/conn.log.cut +# @TEST-EXEC: btest-diff save/conn.log.cut +# @TEST-EXEC: diff load/conn.log.cut save/conn.log.cut + +@load base/protocols/conn +@load base/protocols/http diff --git a/testing/btest/cluster/websocket/three.zeek b/testing/btest/cluster/websocket/three.zeek index 3810c04e3c..ca6f998be9 100644 --- a/testing/btest/cluster/websocket/three.zeek +++ b/testing/btest/cluster/websocket/three.zeek @@ -20,7 +20,8 @@ # @TEST-EXEC: btest-bg-run client "python3 ../client.py >out" # # @TEST-EXEC: btest-bg-wait 30 -# @TEST-EXEC: btest-diff ./manager/out +# @TEST-EXEC: sort ./manager/out > ./manager/out.sorted +# @TEST-EXEC: btest-diff ./manager/out.sorted # @TEST-EXEC: btest-diff ./manager/.stderr # @TEST-EXEC: btest-diff ./client/out # @TEST-EXEC: btest-diff ./client/.stderr @@ -34,35 +35,44 @@ global ping_count = 0; global ping: event(msg: string, c: count) &is_used; global pong: event(msg: string, c: count) &is_used; -event zeek_init() - { - Cluster::subscribe("/test/manager"); - Cluster::listen_websocket([$listen_host="127.0.0.1", $listen_port=to_port(getenv("WEBSOCKET_PORT"))]); - } +global added = 0; +global lost = 0; event ping(msg: string, n: count) &is_used { ++ping_count; - print fmt("got ping: %s, %s", msg, n); + print fmt("C got ping: %s, %s", msg, n); local e = Cluster::make_event(pong, fmt("orig_msg=%s", msg), ping_count); Cluster::publish("/test/clients", e); } -global added = 0; -global lost = 0; - event Cluster::websocket_client_added(info: Cluster::EndpointInfo, subscriptions: string_vec) { ++added; - print "Cluster::websocket_client_added", added, subscriptions; + print "B Cluster::websocket_client_added", subscriptions; } event Cluster::websocket_client_lost(info: Cluster::EndpointInfo) { ++lost; - print "Cluster::websocket_client_lost", lost; + print "D Cluster::websocket_client_lost", lost; if ( lost == 3 ) terminate(); +} + +# Extra testing output. +event Cluster::Backend::ZeroMQ::subscription(topic: string) + { + if ( ! starts_with(topic, "/test") ) + return; + + print "A subscription", topic; + } + +event zeek_init() + { + Cluster::listen_websocket([$listen_host="127.0.0.1", $listen_port=to_port(getenv("WEBSOCKET_PORT"))]); + Cluster::subscribe("/test/manager"); } # @TEST-END-FILE @@ -100,8 +110,10 @@ def run(ws_url): clients = [ws1, ws2, ws3] print("Connected!") ids = set() - for c in clients: - c.send(json.dumps([topic])) + for i, c in enumerate(clients): + client_topic = f"/test/client-{i}" + c.send(json.dumps([topic, client_topic])) + for c in clients: ack = json.loads(c.recv()) assert "type" in ack, repr(ack) diff --git a/testing/btest/cluster/websocket/two-pipelining.zeek b/testing/btest/cluster/websocket/two-pipelining.zeek index cfef86522f..92ec628c7b 100644 --- a/testing/btest/cluster/websocket/two-pipelining.zeek +++ b/testing/btest/cluster/websocket/two-pipelining.zeek @@ -35,12 +35,6 @@ global ping_count = 0; global ping: event(msg: string, c: count) &is_used; global pong: event(msg: string, c: count) &is_used; -event zeek_init() - { - Cluster::subscribe("/zeek/event/to_manager"); - Cluster::listen_websocket([$listen_host="127.0.0.1", $listen_port=to_port(getenv("WEBSOCKET_PORT"))]); - } - global added = 0; global lost = 0; @@ -51,10 +45,26 @@ type Item: record { global queue: vector of Item; +function is_ready(): bool + { + return added == 2; + } + +function drain_if_ready() + { + if ( is_ready() && |queue| > 0 ) + { + for ( _, item in queue ) + event ping(item$msg, item$n); + + delete queue; + } + } + event ping(msg: string, n: count) &is_used { # Queue the pings if we haven't seen both clients yet. - if ( added < 2 ) + if ( ! is_ready() ) { queue += Item($msg=msg, $n=n); return; @@ -63,20 +73,15 @@ event ping(msg: string, n: count) &is_used ++ping_count; print fmt("B got ping: %s, %s", msg, n); local e = Cluster::make_event(pong, "my-message", ping_count); - Cluster::publish("/zeek/event/to_client", e); + Cluster::publish("/test/clients", e); } event Cluster::websocket_client_added(info: Cluster::EndpointInfo, subscriptions: string_vec) { ++added; - print "A Cluster::websocket_client_added", added, subscriptions; + print "A Cluster::websocket_client_added", subscriptions; - if ( added == 2 ) - { - # Anything in the queue? - for ( _, item in queue ) - event ping(item$msg, item$n); - } + drain_if_ready(); } event Cluster::websocket_client_lost(info: Cluster::EndpointInfo) @@ -86,6 +91,21 @@ event Cluster::websocket_client_lost(info: Cluster::EndpointInfo) if ( lost == 2 ) terminate(); } + +# Extra testing output. +event Cluster::Backend::ZeroMQ::subscription(topic: string) + { + if ( ! starts_with(topic, "/test") ) + return; + + print "A subscription", topic; + } + +event zeek_init() + { + Cluster::listen_websocket([$listen_host="127.0.0.1", $listen_port=to_port(getenv("WEBSOCKET_PORT"))]); + Cluster::subscribe("/test/manager"); + } # @TEST-END-FILE @@ -95,12 +115,12 @@ from websockets.sync.client import connect ws_port = os.environ['WEBSOCKET_PORT'].split('/')[0] ws_url = f'ws://127.0.0.1:{ws_port}/v1/messages/json' -topic = '/zeek/event/to_client' +topic = '/test/clients' def make_ping(c, who): return { "type": "data-message", - "topic": "/zeek/event/to_manager", + "topic": "/test/manager", "@data-type": "vector", "data": [ {"@data-type": "count", "data": 1}, # Format @@ -121,8 +141,9 @@ def run(ws_url): clients = [ws1, ws2] print("Connected!") # Send subscriptions - for ws in clients: - ws.send(json.dumps([topic])) + for c, ws in enumerate(clients, 1): + client_topic = f"/test/client-{c}" + ws.send(json.dumps([topic, client_topic])) for i in range(5): for c, ws in enumerate(clients, 1): @@ -138,7 +159,7 @@ def run(ws_url): assert "version" in ack for i in range(10): - for c, ws in enumerate(clients): + for c, ws in enumerate(clients, 1): print(f"Receiving pong {i} - ws{c}") pong = json.loads(ws.recv()) assert pong["@data-type"] == "vector" diff --git a/testing/btest/cluster/websocket/zeromq.zeek b/testing/btest/cluster/websocket/zeromq.zeek new file mode 100644 index 0000000000..f49942703f --- /dev/null +++ b/testing/btest/cluster/websocket/zeromq.zeek @@ -0,0 +1,178 @@ +# @TEST-DOC: Test WebSockets clients when the ZeroMQ cluster backend is enabled. +# +# @TEST-REQUIRES: python3 -c 'import websockets.sync' +# +# @TEST-PORT: XPUB_PORT +# @TEST-PORT: XSUB_PORT +# @TEST-PORT: LOG_PULL_PORT +# @TEST-PORT: WEBSOCKET_PORT +# +# @TEST-EXEC: cp $FILES/zeromq/cluster-layout-simple.zeek cluster-layout.zeek +# @TEST-EXEC: cp $FILES/zeromq/test-bootstrap.zeek zeromq-test-bootstrap.zeek +# +# @TEST-EXEC: zeek -b --parse-only manager.zeek worker.zeek +# @TEST-EXEC: python3 -m py_compile client.py +# +# @TEST-EXEC: btest-bg-run manager "ZEEKPATH=$ZEEKPATH:.. && CLUSTER_NODE=manager zeek -b ../manager.zeek >out" +# @TEST-EXEC: btest-bg-run worker-1 "ZEEKPATH=$ZEEKPATH:.. && CLUSTER_NODE=worker-1 zeek -b ../worker.zeek >out" +# @TEST-EXEC: btest-bg-run client "python3 ../client.py >out" +# +# @TEST-EXEC: btest-bg-wait 30 +# @TEST-EXEC: btest-diff ./manager/out +# @TEST-EXEC: btest-diff ./manager/.stderr +# @TEST-EXEC: btest-diff ./worker-1/out +# @TEST-EXEC: btest-diff ./worker-1/.stderr +# @TEST-EXEC: btest-diff ./client/out +# @TEST-EXEC: btest-diff ./client/.stderr + +@TEST-START-FILE common.zeek +@load ./zeromq-test-bootstrap + +global ping: event(msg: string, c: count) &is_used; +global pong: event(msg: string, c: count) &is_used; + +@TEST-END-FILE + +# @TEST-START-FILE manager.zeek +@load ./common.zeek + +redef Log::enable_local_logging = T; +redef Log::default_rotation_interval = 0sec; + +redef Broker::disable_ssl = T; + +global ping_count = 0; + +event zeek_init() + { + Cluster::subscribe("/test/pings/"); + } + +event Cluster::node_up(name: string, id: string) + { + print "Cluster::node_up", name; + + # Delay listening on WebSocket clients until worker-1 is around. + if ( name == "worker-1" ) + Cluster::listen_websocket([ + $listen_host="127.0.0.1", + $listen_port=to_port(getenv("WEBSOCKET_PORT")) + ]); + } + +event ping(msg: string, n: count) &is_used + { + ++ping_count; + print fmt("got ping: %s, %s", msg, n); + local e = Cluster::make_event(pong, fmt("orig_msg=%s", msg), ping_count); + Cluster::publish("/test/clients", e); + } + +global added = 0; +global lost = 0; + +event Cluster::websocket_client_added(info: Cluster::EndpointInfo, subscriptions: string_vec) + { + ++added; + print "Cluster::websocket_client_added", added, subscriptions; + } + +event Cluster::websocket_client_lost(info: Cluster::EndpointInfo) + { + ++lost; + print "Cluster::websocket_client_lost", lost; + if ( lost == 3 ) + terminate(); + } +# @TEST-END-FILE + +# @TEST-START-FILE worker.zeek +@load ./common.zeek + +event zeek_init() + { + Cluster::subscribe("/test/pings/"); + } + +event Cluster::node_up(name: string, id: string) + { + print "Cluster::node_up", name; + } + +event Cluster::node_down(name: string, id: string) + { + print "Cluster::node_up", name; + terminate(); + } + +event ping(msg: string, n: count) + { + print fmt("got ping: %s, %s", msg, n); + } +# @TEST-END-FILE + + +@TEST-START-FILE client.py +import json, os, time +from websockets.sync.client import connect + +ws_port = os.environ['WEBSOCKET_PORT'].split('/')[0] +ws_url = f'ws://127.0.0.1:{ws_port}/v1/messages/json' + +def make_ping(topic, c): + return { + "type": "data-message", + "topic": topic + str(c), + "@data-type": "vector", + "data": [ + {"@data-type": "count", "data": 1}, # Format + {"@data-type": "count", "data": 1}, # Type + {"@data-type": "vector", "data": [ + { "@data-type": "string", "data": "ping"}, # Event name + { "@data-type": "vector", "data": [ # event args + {"@data-type": "string", "data": f"python-websocket-client"}, + {"@data-type": "count", "data": c}, + ], }, + ], }, + ], + } + +def run(ws_url): + with connect(ws_url) as ws1: + with connect(ws_url) as ws2: + with connect(ws_url) as ws3: + clients = [ws1, ws2, ws3] + print("Connected!") + ids = set() + for i, c in enumerate(clients, 1): + c.send(json.dumps([f"/test/ws/{i}", "/test/pings/"])) + ack = json.loads(c.recv()) + assert "type" in ack, repr(ack) + assert ack["type"] == "ack" + assert "endpoint" in ack, repr(ack) + assert "version" in ack + ids.add(ack["endpoint"]) + + print("unique ids", len(ids)) + ws1.send(json.dumps(make_ping("/test/pings/", 42))) + + # Client 2 and client 3 receive the ping from client 1, client 1 gets a timeout. + for name, ws in [("ws1", ws1), ("ws2", ws2), ("ws3", ws3)]: + try: + data = json.loads(ws.recv(timeout=0.1)) + ev = data["data"][2]["data"] + print(name, "ev: topic", data["topic"], "event name", ev[0]["data"], "args", ev[1]["data"]) + except TimeoutError: + print(name, "timeout") + +def main(): + for _ in range(100): + try: + run(ws_url) + break + except ConnectionRefusedError: + time.sleep(0.1) + +if __name__ == "__main__": + main() +@TEST-END-FILE diff --git a/testing/btest/cluster/zeromq/node-topic-prefix.zeek b/testing/btest/cluster/zeromq/node-topic-prefix.zeek new file mode 100644 index 0000000000..c489993f04 --- /dev/null +++ b/testing/btest/cluster/zeromq/node-topic-prefix.zeek @@ -0,0 +1,117 @@ +# @TEST-DOC: Ensure that worker-1 does not observe messages to worker-20 on its Cluster::node_topic() +# +# @TEST-REQUIRES: have-zeromq +# +# @TEST-GROUP: cluster-zeromq +# +# @TEST-PORT: XPUB_PORT +# @TEST-PORT: XSUB_PORT +# @TEST-PORT: LOG_PULL_PORT +# +# @TEST-EXEC: cp $FILES/zeromq/test-bootstrap.zeek zeromq-test-bootstrap.zeek +# +# @TEST-EXEC: zeek --parse-only manager.zeek worker.zeek +# +# @TEST-EXEC: btest-bg-run manager "ZEEKPATH=$ZEEKPATH:.. && CLUSTER_NODE=manager zeek -b ../manager.zeek >out" +# @TEST-EXEC: btest-bg-run worker-1 "ZEEKPATH=$ZEEKPATH:.. && CLUSTER_NODE=worker-1 zeek -b ../worker.zeek >out" +# @TEST-EXEC: btest-bg-run worker-2 "ZEEKPATH=$ZEEKPATH:.. && CLUSTER_NODE=worker-10 zeek -b ../worker.zeek >out" +# @TEST-EXEC: btest-bg-run worker-10 "ZEEKPATH=$ZEEKPATH:.. && CLUSTER_NODE=worker-2 zeek -b ../worker.zeek >out" +# @TEST-EXEC: btest-bg-run worker-20 "ZEEKPATH=$ZEEKPATH:.. && CLUSTER_NODE=worker-20 zeek -b ../worker.zeek >out" +# +# @TEST-EXEC: btest-bg-wait 30 +# @TEST-EXEC: TEST_DIFF_CANONIFIER=$SCRIPTS/diff-sort btest-diff ./manager/out +# @TEST-EXEC: btest-diff ./worker-1/out +# @TEST-EXEC: btest-diff ./worker-2/out +# @TEST-EXEC: btest-diff ./worker-10/out +# @TEST-EXEC: btest-diff ./worker-20/out + +# @TEST-START-FILE common.zeek +@load ./zeromq-test-bootstrap + +global ping: event(from: string, to: string); +global pong: event(from: string, to: string); +global finish: event(from: string, to: string); + +event zeek_init() + { + print "A", Cluster::node; + } +# @TEST-END-FILE + +# @TEST-START-FILE manager.zeek +@load ./common.zeek + +global nodes_down = 0; +global nodes_up = 0; + +event send_pings() + { + for ( name, n in Cluster::nodes ) + if ( n$node_type == Cluster::WORKER ) + Cluster::publish(Cluster::node_topic(name), ping, Cluster::node, name); + } + +# If a node comes up, send it a ping +event Cluster::node_up(name: string, id: string) + { + print fmt("B node_up - sending ping to '%s'", name); + ++nodes_up; + + if ( nodes_up == 4 ) + event send_pings(); + } + +event ping(from: string, to: string) + { + # manager node should never see ping events. + print "XXX FAIL ping", from, to; + } + +event pong(from: string, to: string) + { + print fmt("C pong from '%s' to '%s'", from ,to); + Cluster::publish(Cluster::node_topic(from), finish, Cluster::node, from); + } + +# If the worker vanishes, finish the test. +event Cluster::node_down(name: string, id: string) + { + print fmt("D node_down from '%s'", name); + ++nodes_down; + if ( nodes_down == 4 ) + terminate(); + } +# @TEST-END-FILE + +# @TEST-START-FILE worker.zeek +@load ./common.zeek + +# Reply to a ping with a pong. +event ping(from: string, to: string) + { + if ( to != Cluster::node ) + print fmt("FAIL: got ping destined to '%s'", to); + + print fmt("B ping from '%s' to '%s'", from, to); + Cluster::publish(Cluster::node_topic(from), pong, Cluster::node, from); + } + +event finish(from: string, to: string) &is_used + { + print fmt("C finish from '%s' to '%s'", from, to); + terminate(); + } +# @TEST-END-FILE + +# @TEST-START-FILE cluster-layout.zeek +redef Cluster::manager_is_logger = T; +redef Log::default_rotation_interval = 0.0sec; + +redef Cluster::nodes = { + ["manager"] = [$node_type=Cluster::MANAGER, $ip=127.0.0.1], + ["worker-1"] = [$node_type=Cluster::WORKER, $ip=127.0.0.1], + ["worker-2"] = [$node_type=Cluster::WORKER, $ip=127.0.0.1], + ["worker-10"] = [$node_type=Cluster::WORKER, $ip=127.0.0.1], + ["worker-20"] = [$node_type=Cluster::WORKER, $ip=127.0.0.1], +}; +# @TEST-END-FILE diff --git a/testing/btest/cluster/zeromq/unsubscribe-two-workers.zeek b/testing/btest/cluster/zeromq/unsubscribe-two-workers.zeek new file mode 100644 index 0000000000..4548a2a98a --- /dev/null +++ b/testing/btest/cluster/zeromq/unsubscribe-two-workers.zeek @@ -0,0 +1,139 @@ +# @TEST-DOC: Regression test for shared unsubscriptions not happening. +# +# Scenario: +# * manager waits for two workers and sends start_test() event +# * workers create subscriptions for /test/worker-1, /test/worker-2 and /test/worker-common +# * manager: Seeing all these subscriptions, subscribe to /test/manager-common +# * workers: Seeing /test/manager-common subscription, unsubscribe /test/worker-common +# * manager: Observes unsubscription for /test/worker-common, unsubscribes from /test/manager-common +# * workers: terminate() when seeing the unsubscription for /test/manager-common +# +# @TEST-REQUIRES: have-zeromq +# +# @TEST-GROUP: cluster-zeromq +# +# @TEST-PORT: XPUB_PORT +# @TEST-PORT: XSUB_PORT +# @TEST-PORT: LOG_PULL_PORT +# +# @TEST-EXEC: cp $FILES/zeromq/cluster-layout-simple.zeek cluster-layout.zeek +# @TEST-EXEC: cp $FILES/zeromq/test-bootstrap.zeek zeromq-test-bootstrap.zeek +# +# @TEST-EXEC: zeek --parse-only ./manager.zeek ./worker.zeek +# +# @TEST-EXEC: btest-bg-run manager "ZEEKPATH=$ZEEKPATH:.. && CLUSTER_NODE=manager zeek -b ../manager.zeek >out" +# @TEST-EXEC: btest-bg-run worker-1 "ZEEKPATH=$ZEEKPATH:.. && CLUSTER_NODE=worker-1 zeek -b ../worker.zeek >out" +# @TEST-EXEC: btest-bg-run worker-2 "ZEEKPATH=$ZEEKPATH:.. && CLUSTER_NODE=worker-2 zeek -b ../worker.zeek >out" +# @TEST-EXEC: btest-bg-wait 30 +# +# @TEST-EXEC: TEST_DIFF_CANONIFIER=$SCRIPTS/diff-sort btest-diff ./manager/out +# @TEST-EXEC: TEST_DIFF_CANONIFIER=$SCRIPTS/diff-sort btest-diff ./worker-1/out +# @TEST-EXEC: TEST_DIFF_CANONIFIER=$SCRIPTS/diff-sort btest-diff ./worker-2/out + + +# @TEST-START-FILE common.zeek +@load ./zeromq-test-bootstrap + +global start_test: event() &is_used; +# @TEST-END-FILE + +# @TEST-START-FILE manager.zeek +@load ./common.zeek + +global worker_subs_seen = 0; +global worker_common_seen = F; +global subscribe_done = F; +global nodes_up = 0; +global nodes_down = 0; + +event Cluster::Backend::ZeroMQ::subscription(topic: string) + { + if ( ! starts_with(topic, "/test/worker") ) + return; + + print "B subscription", topic; + + if ( topic == "/test/worker-1" || topic == "/test/worker-2" ) + ++worker_subs_seen; + + if ( topic == "/test/worker-common" ) + worker_common_seen = T; + + if ( ! subscribe_done && worker_common_seen && worker_subs_seen == 2 ) + { + print "C subscribing to /test/manager-common"; + Cluster::subscribe("/test/manager-common"); + subscribe_done = T; + } + } + +event Cluster::Backend::ZeroMQ::unsubscription(topic: string) + { + if ( ! starts_with(topic, "/test/worker") ) + return; + + print "D unsubscription", topic; + + if ( topic == "/test/worker-common" ) + { + print "E unsubscribing from /test/manager-common"; + Cluster::unsubscribe("/test/manager-common"); + } + } + +event Cluster::node_up(name: string, id: string) + { + print "A node_up", name; + ++nodes_up; + + if ( nodes_up == 2 ) + Cluster::publish(Cluster::worker_topic, start_test); + } + +event Cluster::node_down(name: string, id: string) + { + print "Z node_down", name; + ++nodes_down; + if ( nodes_down == 2 ) + terminate(); + } +# @TEST-END-FILE + +# @TEST-START-FILE worker.zeek +@load ./common.zeek + +event start_test() + { + print "A start_test"; + Cluster::subscribe("/test/worker-common"); + Cluster::subscribe("/test/" + Cluster::node); + } + +event Cluster::Backend::ZeroMQ::subscription(topic: string) + { + if ( ! starts_with(topic, "/test/manager") ) + return; + + print "B subscription", topic; + + if ( topic == "/test/manager-common" ) + { + print "C unsubscribe from /test/worker-common"; + Cluster::unsubscribe("/test/worker-common"); + } + } + +event Cluster::Backend::ZeroMQ::unsubscription(topic: string) + { + if ( ! starts_with(topic, "/test/manager") ) + return; + + print "C unsubscription", topic; + + if ( topic == "/test/manager-common" ) + { + print "D /test/manager-common unsubscribed, terminate()"; + terminate(); + } + } +# @TEST-END-FILE