From 8edec9885a496ae80843e873923bd75b4917af01 Mon Sep 17 00:00:00 2001 From: Arne Welzel Date: Thu, 24 Apr 2025 13:53:16 +0200 Subject: [PATCH] cluster/websocket: Automatic WebSocket client topic subscription Subscribe every WebSocket client to a unique topic, by default under zeek/cluster/websocket/client// Add tests that verify that WebSocket clients receive messages on these topics even if they didn't explicitly pass them in their handshake message. This is somewhere between feature and bug fix. It aids the ZeroMQ backend implementation: A WebSocket client that doesn't provide any subscriptions and immediately starts publishing would discard events until receiving other nodes subscriptions from the central XPUB/XSUB proxy. ZeroMQ does sender side topic filtering. When using subscriptions, the client waits until its own subscriptions are returned from the central XPUB/XSUB proxy, thereby also learning about other node's subscriptions. Also, make the no-subscriptions.zeek test use 32 clients sequentially to trigger potential issues more quickly. --- NEWS | 5 ++ scripts/base/frameworks/cluster/main.zeek | 28 ++++++ .../cluster/backend/zeromq/main.zeek | 1 + src/cluster/websocket/WebSocket.cc | 37 +++++--- src/cluster/websocket/WebSocket.h | 18 +++- .../..client..stderr | 1 + .../..client..stdout | 5 ++ .../..manager..stderr | 2 + .../..manager..stdout | 4 + .../..manager..stdout | 64 +++++++++++++- .../..client..stderr | 1 + .../..client..stdout | 5 ++ .../..manager..stderr | 2 + .../..manager..stdout | 4 + .../websocket/broker/auto-subscription.zeek | 88 +++++++++++++++++++ .../cluster/websocket/no-subscriptions.zeek | 20 +++-- .../websocket/zeromq/auto-subscription.zeek | 86 ++++++++++++++++++ 17 files changed, 352 insertions(+), 19 deletions(-) create mode 100644 testing/btest/Baseline/cluster.websocket.broker.auto-subscription/..client..stderr create mode 100644 testing/btest/Baseline/cluster.websocket.broker.auto-subscription/..client..stdout create mode 100644 testing/btest/Baseline/cluster.websocket.broker.auto-subscription/..manager..stderr create mode 100644 testing/btest/Baseline/cluster.websocket.broker.auto-subscription/..manager..stdout create mode 100644 testing/btest/Baseline/cluster.websocket.zeromq.auto-subscription/..client..stderr create mode 100644 testing/btest/Baseline/cluster.websocket.zeromq.auto-subscription/..client..stdout create mode 100644 testing/btest/Baseline/cluster.websocket.zeromq.auto-subscription/..manager..stderr create mode 100644 testing/btest/Baseline/cluster.websocket.zeromq.auto-subscription/..manager..stdout create mode 100644 testing/btest/cluster/websocket/broker/auto-subscription.zeek create mode 100644 testing/btest/cluster/websocket/zeromq/auto-subscription.zeek diff --git a/NEWS b/NEWS index f5ed50e96d..63fc31dc44 100644 --- a/NEWS +++ b/NEWS @@ -86,6 +86,11 @@ New Functionality that client may still be in transit and later executed, even on the node running the WebSocket server. + WebSocket clients connecting to a server started using ``Cluster::listen_websocket()`` + are automatically subscribed to a client specific topic produced by a new function + ``Cluster::websocket_client_topic()`` in the ``Cluster`` module, even if they did + not provide any subscriptions themselves. + Changed Functionality --------------------- diff --git a/scripts/base/frameworks/cluster/main.zeek b/scripts/base/frameworks/cluster/main.zeek index 30511f9b82..6f5ac9ad3b 100644 --- a/scripts/base/frameworks/cluster/main.zeek +++ b/scripts/base/frameworks/cluster/main.zeek @@ -59,6 +59,18 @@ export { ## a unique node in a cluster. Used with broker-enabled cluster communication. const nodeid_topic_prefix = "zeek/cluster/nodeid/" &redef; + ## Parts used for automatic WebSocket client topic subscription. + ## + ## Every WebSocket client is automatically subscribed to a topic + ## produced by joining and suffixing this vector and the WebSocket + ## client's identifier with :zeek:see:`Cluster::websocket_topic_sep`. + const websocket_topic_prefix_parts = vector("zeek", "cluster", "websocket", "client") &redef; + + ## Separator used for creating automatic WebSocket client topic subscriptions. + ## + ## See also :zeek:see:`Cluster::websocket_topic_prefix_parts`. + const websocket_topic_sep = "/" &redef; + ## Name of the node on which master data stores will be created if no other ## has already been specified by the user in :zeek:see:`Cluster::stores`. ## An empty value means "use whatever name corresponds to the manager @@ -298,6 +310,14 @@ export { ## a given cluster node. global nodeid_topic: function(id: string): string &redef; + ## Retrieve the topic associated with a WebSocket client in the cluster. + ## + ## id: the id for the WebSocket client (:zeek:see:`Cluster::websocket_client_added`) + ## + ## Returns: a topic string that may used to send a message exclusively to + ## a given WebSocket client. + global websocket_client_topic: function(id: string): string; + ## Retrieve the cluster-level naming of a node based on its node ID, ## a backend-specific identifier. ## @@ -473,6 +493,14 @@ function nodeid_topic(id: string): string return nodeid_topic_prefix + id + "/"; } +function websocket_client_topic(id: string): string + { + local v = copy(websocket_topic_prefix_parts); + v += id; + v += ""; + return join_string_vec(v, websocket_topic_sep); + } + function nodeid_to_node(id: string): NamedNode { for ( name, n in nodes ) diff --git a/scripts/policy/frameworks/cluster/backend/zeromq/main.zeek b/scripts/policy/frameworks/cluster/backend/zeromq/main.zeek index c34608d437..4dfc5d8bae 100644 --- a/scripts/policy/frameworks/cluster/backend/zeromq/main.zeek +++ b/scripts/policy/frameworks/cluster/backend/zeromq/main.zeek @@ -261,6 +261,7 @@ redef Cluster::worker_pool_spec = Cluster::PoolSpec( $topic = "zeek.cluster.pool.worker", $node_type = Cluster::WORKER); +redef Cluster::websocket_topic_sep = "."; # Configure listen_log_endpoint based on port in cluster-layout, if any. @if ( Cluster::local_node_type() == Cluster::LOGGER || (Cluster::manager_is_logger && Cluster::local_node_type() == Cluster::MANAGER) ) diff --git a/src/cluster/websocket/WebSocket.cc b/src/cluster/websocket/WebSocket.cc index 1f2610a99b..f0d10590b5 100644 --- a/src/cluster/websocket/WebSocket.cc +++ b/src/cluster/websocket/WebSocket.cc @@ -8,7 +8,10 @@ #include #include +#include "zeek/Func.h" +#include "zeek/ID.h" #include "zeek/Reporter.h" +#include "zeek/Type.h" #include "zeek/cluster/Backend.h" #include "zeek/cluster/BifSupport.h" #include "zeek/cluster/Manager.h" @@ -194,8 +197,10 @@ const std::vector WebSocketClient::GetSubscriptions() const { std::vector subs; subs.reserve(subscriptions_state.size()); - for ( const auto& [topic, _] : subscriptions_state ) - subs.emplace_back(topic); + for ( const auto& [topic, _] : subscriptions_state ) { + if ( topic != auto_topic ) + subs.emplace_back(topic); + } return subs; } @@ -375,6 +380,17 @@ void WebSocketEventDispatcher::Process(const WebSocketSubscribeFinished& fin) { HandleSubscriptionsActive(entry); } +// Invokes Cluster::websocket_client_topic() and returns the result as std::string, empty on error. +std::string WebSocketEventDispatcher::GetAutoTopic(const std::string& nid) const { + static const auto& func = zeek::id::find_func("Cluster::websocket_client_topic"); + auto result = func->Invoke(zeek::make_intrusive(nid)); + + if ( ! result || ! IsString(result->GetType()->Tag()) ) + return std::string{}; + + return result->AsStringVal()->ToStdString(); +} + void WebSocketEventDispatcher::HandleSubscriptions(WebSocketClientEntry& entry, std::string_view buf) { rapidjson::Document doc; doc.Parse(buf.data(), buf.size()); @@ -395,15 +411,16 @@ void WebSocketEventDispatcher::HandleSubscriptions(WebSocketClientEntry& entry, subscriptions.emplace_back(doc[i].GetString()); } - entry.wsc->SetSubscriptions(subscriptions); - - // Short-circuit setting up subscriptions and directly reply with - // an ack if the client didn't request any topic subscriptions. - if ( subscriptions.empty() ) { - assert(entry.wsc->AllSubscriptionsActive()); - HandleSubscriptionsActive(entry); - return; + // Add the auto topic to this WebSocket client. + auto auto_topic = GetAutoTopic(entry.backend->NodeId()); + if ( ! auto_topic.empty() ) { + subscriptions.emplace_back(auto_topic); + entry.wsc->SetAutoTopic(auto_topic); } + else + zeek::reporter->InternalWarning("Failed to get auto topic for WebSocket client %s!", entry.id.c_str()); + + entry.wsc->SetSubscriptions(subscriptions); auto cb = [this, id = entry.id, wsc = entry.wsc](const std::string& topic, const Backend::SubscriptionCallbackInfo& info) { diff --git a/src/cluster/websocket/WebSocket.h b/src/cluster/websocket/WebSocket.h index 4dd0965372..e5c8aadd18 100644 --- a/src/cluster/websocket/WebSocket.h +++ b/src/cluster/websocket/WebSocket.h @@ -100,7 +100,7 @@ public: void SetSubscriptions(const std::vector& topic_prefixes); /** - * @return The client's subscriptions. + * @return The client's subscriptions, excluding the auto topic. */ const std::vector GetSubscriptions() const; @@ -114,9 +114,22 @@ public: */ bool AllSubscriptionsActive() const; + /** + * Set the WebSocket client's automatically created subscription topic. + * + * @param topic The auto topic. + */ + void SetAutoTopic(const std::string& topic) { auto_topic = topic; } + + /** + * @return the WebSocket client's auto topic. + */ + const std::string& GetAutoTopic() const { return auto_topic; } + private: bool acked = false; std::map subscriptions_state; + std::string auto_topic; // Internally generated topic for this websocket client. }; // An new WebSocket client connected. Client is locally identified by `id`. @@ -226,6 +239,9 @@ private: std::list queue; }; + // Invoke Zeek script function Cluster::websocket_client_topic() + // with a WebSocket client's identifier to produce the auto topic. + std::string GetAutoTopic(const std::string& nid) const; void HandleSubscriptions(WebSocketClientEntry& entry, std::string_view buf); diff --git a/testing/btest/Baseline/cluster.websocket.broker.auto-subscription/..client..stderr b/testing/btest/Baseline/cluster.websocket.broker.auto-subscription/..client..stderr new file mode 100644 index 0000000000..49d861c74c --- /dev/null +++ b/testing/btest/Baseline/cluster.websocket.broker.auto-subscription/..client..stderr @@ -0,0 +1 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. diff --git a/testing/btest/Baseline/cluster.websocket.broker.auto-subscription/..client..stdout b/testing/btest/Baseline/cluster.websocket.broker.auto-subscription/..client..stdout new file mode 100644 index 0000000000..a211c39354 --- /dev/null +++ b/testing/btest/Baseline/cluster.websocket.broker.auto-subscription/..client..stdout @@ -0,0 +1,5 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +connected +got ack +ack[endpoint] in topic_parts True +event [{'@data-type': 'string', 'data': 'pong'}, {'@data-type': 'vector', 'data': [{'@data-type': 'count', 'data': 42}]}] diff --git a/testing/btest/Baseline/cluster.websocket.broker.auto-subscription/..manager..stderr b/testing/btest/Baseline/cluster.websocket.broker.auto-subscription/..manager..stderr new file mode 100644 index 0000000000..e3f6131b1d --- /dev/null +++ b/testing/btest/Baseline/cluster.websocket.broker.auto-subscription/..manager..stderr @@ -0,0 +1,2 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +received termination signal diff --git a/testing/btest/Baseline/cluster.websocket.broker.auto-subscription/..manager..stdout b/testing/btest/Baseline/cluster.websocket.broker.auto-subscription/..manager..stdout new file mode 100644 index 0000000000..7c3460d16c --- /dev/null +++ b/testing/btest/Baseline/cluster.websocket.broker.auto-subscription/..manager..stdout @@ -0,0 +1,4 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +Cluster::websocket_client_added, [] +ping, 42 +Cluster::websocket_client_lost diff --git a/testing/btest/Baseline/cluster.websocket.no-subscriptions/..manager..stdout b/testing/btest/Baseline/cluster.websocket.no-subscriptions/..manager..stdout index bb622d5ce2..48f97ff996 100644 --- a/testing/btest/Baseline/cluster.websocket.no-subscriptions/..manager..stdout +++ b/testing/btest/Baseline/cluster.websocket.no-subscriptions/..manager..stdout @@ -1,3 +1,65 @@ ### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. Cluster::websocket_client_added, [] -got ping, 42 +got ping, 42, 1 +Cluster::websocket_client_added, [] +got ping, 42, 2 +Cluster::websocket_client_added, [] +got ping, 42, 3 +Cluster::websocket_client_added, [] +got ping, 42, 4 +Cluster::websocket_client_added, [] +got ping, 42, 5 +Cluster::websocket_client_added, [] +got ping, 42, 6 +Cluster::websocket_client_added, [] +got ping, 42, 7 +Cluster::websocket_client_added, [] +got ping, 42, 8 +Cluster::websocket_client_added, [] +got ping, 42, 9 +Cluster::websocket_client_added, [] +got ping, 42, 10 +Cluster::websocket_client_added, [] +got ping, 42, 11 +Cluster::websocket_client_added, [] +got ping, 42, 12 +Cluster::websocket_client_added, [] +got ping, 42, 13 +Cluster::websocket_client_added, [] +got ping, 42, 14 +Cluster::websocket_client_added, [] +got ping, 42, 15 +Cluster::websocket_client_added, [] +got ping, 42, 16 +Cluster::websocket_client_added, [] +got ping, 42, 17 +Cluster::websocket_client_added, [] +got ping, 42, 18 +Cluster::websocket_client_added, [] +got ping, 42, 19 +Cluster::websocket_client_added, [] +got ping, 42, 20 +Cluster::websocket_client_added, [] +got ping, 42, 21 +Cluster::websocket_client_added, [] +got ping, 42, 22 +Cluster::websocket_client_added, [] +got ping, 42, 23 +Cluster::websocket_client_added, [] +got ping, 42, 24 +Cluster::websocket_client_added, [] +got ping, 42, 25 +Cluster::websocket_client_added, [] +got ping, 42, 26 +Cluster::websocket_client_added, [] +got ping, 42, 27 +Cluster::websocket_client_added, [] +got ping, 42, 28 +Cluster::websocket_client_added, [] +got ping, 42, 29 +Cluster::websocket_client_added, [] +got ping, 42, 30 +Cluster::websocket_client_added, [] +got ping, 42, 31 +Cluster::websocket_client_added, [] +got ping, 42, 32 diff --git a/testing/btest/Baseline/cluster.websocket.zeromq.auto-subscription/..client..stderr b/testing/btest/Baseline/cluster.websocket.zeromq.auto-subscription/..client..stderr new file mode 100644 index 0000000000..49d861c74c --- /dev/null +++ b/testing/btest/Baseline/cluster.websocket.zeromq.auto-subscription/..client..stderr @@ -0,0 +1 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. diff --git a/testing/btest/Baseline/cluster.websocket.zeromq.auto-subscription/..client..stdout b/testing/btest/Baseline/cluster.websocket.zeromq.auto-subscription/..client..stdout new file mode 100644 index 0000000000..a211c39354 --- /dev/null +++ b/testing/btest/Baseline/cluster.websocket.zeromq.auto-subscription/..client..stdout @@ -0,0 +1,5 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +connected +got ack +ack[endpoint] in topic_parts True +event [{'@data-type': 'string', 'data': 'pong'}, {'@data-type': 'vector', 'data': [{'@data-type': 'count', 'data': 42}]}] diff --git a/testing/btest/Baseline/cluster.websocket.zeromq.auto-subscription/..manager..stderr b/testing/btest/Baseline/cluster.websocket.zeromq.auto-subscription/..manager..stderr new file mode 100644 index 0000000000..e3f6131b1d --- /dev/null +++ b/testing/btest/Baseline/cluster.websocket.zeromq.auto-subscription/..manager..stderr @@ -0,0 +1,2 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +received termination signal diff --git a/testing/btest/Baseline/cluster.websocket.zeromq.auto-subscription/..manager..stdout b/testing/btest/Baseline/cluster.websocket.zeromq.auto-subscription/..manager..stdout new file mode 100644 index 0000000000..7c3460d16c --- /dev/null +++ b/testing/btest/Baseline/cluster.websocket.zeromq.auto-subscription/..manager..stdout @@ -0,0 +1,4 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +Cluster::websocket_client_added, [] +ping, 42 +Cluster::websocket_client_lost diff --git a/testing/btest/cluster/websocket/broker/auto-subscription.zeek b/testing/btest/cluster/websocket/broker/auto-subscription.zeek new file mode 100644 index 0000000000..1ba4ea7e97 --- /dev/null +++ b/testing/btest/cluster/websocket/broker/auto-subscription.zeek @@ -0,0 +1,88 @@ +# @TEST-DOC: Test that publishing events to a WebSocket client's auto topic works. +# +# @TEST-REQUIRES: python3 -c 'import websockets.sync' +# +# @TEST-PORT: BROKER_PORT1 +# @TEST-PORT: WEBSOCKET_PORT +# +# @TEST-EXEC: cp $FILES/ws/wstest.py . +# +# @TEST-EXEC: zeek -b --parse-only manager.zeek +# @TEST-EXEC: python3 -m py_compile client.py +# +# @TEST-EXEC: btest-bg-run manager "ZEEKPATH=$ZEEKPATH:.. && CLUSTER_NODE=manager zeek -b ../manager.zeek" +# @TEST-EXEC: btest-bg-run client "python3 ../client.py" +# +# @TEST-EXEC: btest-bg-wait 5 +# @TEST-EXEC: btest-diff ./manager/.stdout +# @TEST-EXEC: btest-diff ./manager/.stderr +# @TEST-EXEC: btest-diff ./client/.stdout +# @TEST-EXEC: btest-diff ./client/.stderr + +# @TEST-START-FILE cluster-layout.zeek +redef Cluster::nodes = { + ["manager"] = [$node_type=Cluster::MANAGER, $ip=127.0.0.1, $p=to_port(getenv("BROKER_PORT1"))], +}; +# @TEST-END-FILE +# +# @TEST-START-FILE manager.zeek +redef exit_only_after_terminate = T; + +redef Log::enable_local_logging = T; +redef Log::default_rotation_interval = 0sec; +redef Broker::disable_ssl = T; + +global ws_client_topic = ""; + +event zeek_init() + { + Cluster::subscribe("/test/pings"); + Cluster::listen_websocket([$listen_host="127.0.0.1", $listen_port=to_port(getenv("WEBSOCKET_PORT"))]); + } + +global pong: event(c: count) &is_used; + +event ping(c: count) &is_used + { + print "ping", c; + # Reply with a pong on the WebSocket client's auto topic. + Cluster::publish(ws_client_topic, pong, c); + } + +event Cluster::websocket_client_added(info: Cluster::EndpointInfo, subscriptions: string_vec) + { + print "Cluster::websocket_client_added", subscriptions; + ws_client_topic = Cluster::websocket_client_topic(info$id); + } + +event Cluster::websocket_client_lost(info: Cluster::EndpointInfo) + { + print "Cluster::websocket_client_lost"; + terminate(); + } +# @TEST-END-FILE + + +# @TEST-START-FILE client.py +import wstest + +def run(ws_url): + with wstest.connect("ws1", ws_url) as tc: + print("connected") + tc.send_json([]) # Send no subscriptions + ack = tc.recv_json() + print("got ack") + assert ack.get("type") == "ack", f"{ack}" + + # Send a ping to the manager. + tc.send_json(wstest.build_event_v1("/test/pings/", "ping", [42])) + pong = tc.recv_json(timeout=3) + topic, event = pong["topic"], pong["data"][2]["data"][0:2] + topic_parts = topic.split("/") + print("ack[endpoint] in topic_parts", ack["endpoint"] in topic_parts) + print("event", event) + + +if __name__ == "__main__": + wstest.main(run, wstest.WS4_URL_V1) +# @TEST-END-FILE diff --git a/testing/btest/cluster/websocket/no-subscriptions.zeek b/testing/btest/cluster/websocket/no-subscriptions.zeek index 09701e5fc9..2450ce58e6 100644 --- a/testing/btest/cluster/websocket/no-subscriptions.zeek +++ b/testing/btest/cluster/websocket/no-subscriptions.zeek @@ -36,10 +36,15 @@ event zeek_init() Cluster::listen_websocket([$listen_host="127.0.0.1", $listen_port=to_port(getenv("WEBSOCKET_PORT"))]); } +global ping_count = 0; +const ping_count_expected = 32; + event ping(c: count) &is_used { - print "got ping", c; - terminate(); + ++ping_count; + print "got ping", c, ping_count; + if ( ping_count == ping_count_expected ) + terminate(); } event Cluster::websocket_client_added(info: Cluster::EndpointInfo, subscriptions: string_vec) @@ -53,11 +58,12 @@ event Cluster::websocket_client_added(info: Cluster::EndpointInfo, subscriptions import wstest def run(ws_url): - with wstest.connect("ws1", ws_url) as tc: - tc.send_json([]) # Send no subscriptions - ack = tc.recv_json() - assert ack.get("type") == "ack", f"{ack}" - tc.send_json(wstest.build_event_v1("/test/pings/", "ping", [42])) + for i in range(32): + with wstest.connect("ws1", ws_url) as tc: + tc.send_json([]) # Send no subscriptions + ack = tc.recv_json() + assert ack.get("type") == "ack", f"{ack}" + tc.send_json(wstest.build_event_v1("/test/pings/", "ping", [42])) if __name__ == "__main__": wstest.main(run, wstest.WS4_URL_V1) diff --git a/testing/btest/cluster/websocket/zeromq/auto-subscription.zeek b/testing/btest/cluster/websocket/zeromq/auto-subscription.zeek new file mode 100644 index 0000000000..1b9b13db96 --- /dev/null +++ b/testing/btest/cluster/websocket/zeromq/auto-subscription.zeek @@ -0,0 +1,86 @@ +# @TEST-DOC: Test that publishing events to a WebSocket client's auto topic works. +# +# @TEST-REQUIRES: have-zeromq +# @TEST-REQUIRES: python3 -c 'import websockets.sync' +# +# @TEST-GROUP: cluster-zeromq +# +# @TEST-PORT: XPUB_PORT +# @TEST-PORT: XSUB_PORT +# @TEST-PORT: LOG_PULL_PORT +# @TEST-PORT: WEBSOCKET_PORT +# +# @TEST-EXEC: cp $FILES/zeromq/cluster-layout-simple.zeek cluster-layout.zeek +# @TEST-EXEC: cp $FILES/zeromq/test-bootstrap.zeek zeromq-test-bootstrap.zeek +# @TEST-EXEC: cp $FILES/ws/wstest.py . +# +# @TEST-EXEC: zeek -b --parse-only manager.zeek +# @TEST-EXEC: python3 -m py_compile client.py +# +# @TEST-EXEC: btest-bg-run manager "ZEEKPATH=$ZEEKPATH:.. && CLUSTER_NODE=manager zeek -b ../manager.zeek" +# @TEST-EXEC: btest-bg-run client "python3 ../client.py" +# +# @TEST-EXEC: btest-bg-wait 30 +# @TEST-EXEC: btest-diff ./manager/.stdout +# @TEST-EXEC: btest-diff ./manager/.stderr +# @TEST-EXEC: btest-diff ./client/.stdout +# @TEST-EXEC: btest-diff ./client/.stderr + +# @TEST-START-FILE manager.zeek +@load ./zeromq-test-bootstrap +redef exit_only_after_terminate = T; + +global ws_client_topic = ""; + +event zeek_init() + { + Cluster::subscribe("/test/pings"); + Cluster::listen_websocket([$listen_host="127.0.0.1", $listen_port=to_port(getenv("WEBSOCKET_PORT"))]); + } + +global pong: event(c: count) &is_used; + +event ping(c: count) &is_used + { + print "ping", c; + # Reply with a pong on the WebSocket client's auto topic. + Cluster::publish(ws_client_topic, pong, c); + } + +event Cluster::websocket_client_added(info: Cluster::EndpointInfo, subscriptions: string_vec) + { + print "Cluster::websocket_client_added", subscriptions; + ws_client_topic = Cluster::websocket_client_topic(info$id); + } + +event Cluster::websocket_client_lost(info: Cluster::EndpointInfo) + { + print "Cluster::websocket_client_lost"; + terminate(); + } +# @TEST-END-FILE + + +# @TEST-START-FILE client.py +import wstest + +def run(ws_url): + with wstest.connect("ws1", ws_url) as tc: + print("connected") + tc.send_json([]) # Send no subscriptions + ack = tc.recv_json() + print("got ack") + assert ack.get("type") == "ack", f"{ack}" + + # Send a ping to the manager. + tc.send_json(wstest.build_event_v1("/test/pings/", "ping", [42])) + pong = tc.recv_json(timeout=3) + topic, event = pong["topic"], pong["data"][2]["data"][0:2] + topic_parts = topic.split(".") + print("ack[endpoint] in topic_parts", ack["endpoint"] in topic_parts) + print("event", event) + + +if __name__ == "__main__": + wstest.main(run, wstest.WS4_URL_V1) +# @TEST-END-FILE