diff --git a/NEWS b/NEWS index f5ed50e96d..63fc31dc44 100644 --- a/NEWS +++ b/NEWS @@ -86,6 +86,11 @@ New Functionality that client may still be in transit and later executed, even on the node running the WebSocket server. + WebSocket clients connecting to a server started using ``Cluster::listen_websocket()`` + are automatically subscribed to a client specific topic produced by a new function + ``Cluster::websocket_client_topic()`` in the ``Cluster`` module, even if they did + not provide any subscriptions themselves. + Changed Functionality --------------------- diff --git a/scripts/base/frameworks/cluster/main.zeek b/scripts/base/frameworks/cluster/main.zeek index 30511f9b82..6f5ac9ad3b 100644 --- a/scripts/base/frameworks/cluster/main.zeek +++ b/scripts/base/frameworks/cluster/main.zeek @@ -59,6 +59,18 @@ export { ## a unique node in a cluster. Used with broker-enabled cluster communication. const nodeid_topic_prefix = "zeek/cluster/nodeid/" &redef; + ## Parts used for automatic WebSocket client topic subscription. + ## + ## Every WebSocket client is automatically subscribed to a topic + ## produced by joining and suffixing this vector and the WebSocket + ## client's identifier with :zeek:see:`Cluster::websocket_topic_sep`. + const websocket_topic_prefix_parts = vector("zeek", "cluster", "websocket", "client") &redef; + + ## Separator used for creating automatic WebSocket client topic subscriptions. + ## + ## See also :zeek:see:`Cluster::websocket_topic_prefix_parts`. + const websocket_topic_sep = "/" &redef; + ## Name of the node on which master data stores will be created if no other ## has already been specified by the user in :zeek:see:`Cluster::stores`. ## An empty value means "use whatever name corresponds to the manager @@ -298,6 +310,14 @@ export { ## a given cluster node. global nodeid_topic: function(id: string): string &redef; + ## Retrieve the topic associated with a WebSocket client in the cluster. + ## + ## id: the id for the WebSocket client (:zeek:see:`Cluster::websocket_client_added`) + ## + ## Returns: a topic string that may used to send a message exclusively to + ## a given WebSocket client. + global websocket_client_topic: function(id: string): string; + ## Retrieve the cluster-level naming of a node based on its node ID, ## a backend-specific identifier. ## @@ -473,6 +493,14 @@ function nodeid_topic(id: string): string return nodeid_topic_prefix + id + "/"; } +function websocket_client_topic(id: string): string + { + local v = copy(websocket_topic_prefix_parts); + v += id; + v += ""; + return join_string_vec(v, websocket_topic_sep); + } + function nodeid_to_node(id: string): NamedNode { for ( name, n in nodes ) diff --git a/scripts/policy/frameworks/cluster/backend/zeromq/main.zeek b/scripts/policy/frameworks/cluster/backend/zeromq/main.zeek index c34608d437..4dfc5d8bae 100644 --- a/scripts/policy/frameworks/cluster/backend/zeromq/main.zeek +++ b/scripts/policy/frameworks/cluster/backend/zeromq/main.zeek @@ -261,6 +261,7 @@ redef Cluster::worker_pool_spec = Cluster::PoolSpec( $topic = "zeek.cluster.pool.worker", $node_type = Cluster::WORKER); +redef Cluster::websocket_topic_sep = "."; # Configure listen_log_endpoint based on port in cluster-layout, if any. @if ( Cluster::local_node_type() == Cluster::LOGGER || (Cluster::manager_is_logger && Cluster::local_node_type() == Cluster::MANAGER) ) diff --git a/src/cluster/websocket/WebSocket.cc b/src/cluster/websocket/WebSocket.cc index 1f2610a99b..f0d10590b5 100644 --- a/src/cluster/websocket/WebSocket.cc +++ b/src/cluster/websocket/WebSocket.cc @@ -8,7 +8,10 @@ #include #include +#include "zeek/Func.h" +#include "zeek/ID.h" #include "zeek/Reporter.h" +#include "zeek/Type.h" #include "zeek/cluster/Backend.h" #include "zeek/cluster/BifSupport.h" #include "zeek/cluster/Manager.h" @@ -194,8 +197,10 @@ const std::vector WebSocketClient::GetSubscriptions() const { std::vector subs; subs.reserve(subscriptions_state.size()); - for ( const auto& [topic, _] : subscriptions_state ) - subs.emplace_back(topic); + for ( const auto& [topic, _] : subscriptions_state ) { + if ( topic != auto_topic ) + subs.emplace_back(topic); + } return subs; } @@ -375,6 +380,17 @@ void WebSocketEventDispatcher::Process(const WebSocketSubscribeFinished& fin) { HandleSubscriptionsActive(entry); } +// Invokes Cluster::websocket_client_topic() and returns the result as std::string, empty on error. +std::string WebSocketEventDispatcher::GetAutoTopic(const std::string& nid) const { + static const auto& func = zeek::id::find_func("Cluster::websocket_client_topic"); + auto result = func->Invoke(zeek::make_intrusive(nid)); + + if ( ! result || ! IsString(result->GetType()->Tag()) ) + return std::string{}; + + return result->AsStringVal()->ToStdString(); +} + void WebSocketEventDispatcher::HandleSubscriptions(WebSocketClientEntry& entry, std::string_view buf) { rapidjson::Document doc; doc.Parse(buf.data(), buf.size()); @@ -395,15 +411,16 @@ void WebSocketEventDispatcher::HandleSubscriptions(WebSocketClientEntry& entry, subscriptions.emplace_back(doc[i].GetString()); } - entry.wsc->SetSubscriptions(subscriptions); - - // Short-circuit setting up subscriptions and directly reply with - // an ack if the client didn't request any topic subscriptions. - if ( subscriptions.empty() ) { - assert(entry.wsc->AllSubscriptionsActive()); - HandleSubscriptionsActive(entry); - return; + // Add the auto topic to this WebSocket client. + auto auto_topic = GetAutoTopic(entry.backend->NodeId()); + if ( ! auto_topic.empty() ) { + subscriptions.emplace_back(auto_topic); + entry.wsc->SetAutoTopic(auto_topic); } + else + zeek::reporter->InternalWarning("Failed to get auto topic for WebSocket client %s!", entry.id.c_str()); + + entry.wsc->SetSubscriptions(subscriptions); auto cb = [this, id = entry.id, wsc = entry.wsc](const std::string& topic, const Backend::SubscriptionCallbackInfo& info) { diff --git a/src/cluster/websocket/WebSocket.h b/src/cluster/websocket/WebSocket.h index 4dd0965372..e5c8aadd18 100644 --- a/src/cluster/websocket/WebSocket.h +++ b/src/cluster/websocket/WebSocket.h @@ -100,7 +100,7 @@ public: void SetSubscriptions(const std::vector& topic_prefixes); /** - * @return The client's subscriptions. + * @return The client's subscriptions, excluding the auto topic. */ const std::vector GetSubscriptions() const; @@ -114,9 +114,22 @@ public: */ bool AllSubscriptionsActive() const; + /** + * Set the WebSocket client's automatically created subscription topic. + * + * @param topic The auto topic. + */ + void SetAutoTopic(const std::string& topic) { auto_topic = topic; } + + /** + * @return the WebSocket client's auto topic. + */ + const std::string& GetAutoTopic() const { return auto_topic; } + private: bool acked = false; std::map subscriptions_state; + std::string auto_topic; // Internally generated topic for this websocket client. }; // An new WebSocket client connected. Client is locally identified by `id`. @@ -226,6 +239,9 @@ private: std::list queue; }; + // Invoke Zeek script function Cluster::websocket_client_topic() + // with a WebSocket client's identifier to produce the auto topic. + std::string GetAutoTopic(const std::string& nid) const; void HandleSubscriptions(WebSocketClientEntry& entry, std::string_view buf); diff --git a/testing/btest/Baseline/cluster.websocket.broker.auto-subscription/..client..stderr b/testing/btest/Baseline/cluster.websocket.broker.auto-subscription/..client..stderr new file mode 100644 index 0000000000..49d861c74c --- /dev/null +++ b/testing/btest/Baseline/cluster.websocket.broker.auto-subscription/..client..stderr @@ -0,0 +1 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. diff --git a/testing/btest/Baseline/cluster.websocket.broker.auto-subscription/..client..stdout b/testing/btest/Baseline/cluster.websocket.broker.auto-subscription/..client..stdout new file mode 100644 index 0000000000..a211c39354 --- /dev/null +++ b/testing/btest/Baseline/cluster.websocket.broker.auto-subscription/..client..stdout @@ -0,0 +1,5 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +connected +got ack +ack[endpoint] in topic_parts True +event [{'@data-type': 'string', 'data': 'pong'}, {'@data-type': 'vector', 'data': [{'@data-type': 'count', 'data': 42}]}] diff --git a/testing/btest/Baseline/cluster.websocket.broker.auto-subscription/..manager..stderr b/testing/btest/Baseline/cluster.websocket.broker.auto-subscription/..manager..stderr new file mode 100644 index 0000000000..e3f6131b1d --- /dev/null +++ b/testing/btest/Baseline/cluster.websocket.broker.auto-subscription/..manager..stderr @@ -0,0 +1,2 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +received termination signal diff --git a/testing/btest/Baseline/cluster.websocket.broker.auto-subscription/..manager..stdout b/testing/btest/Baseline/cluster.websocket.broker.auto-subscription/..manager..stdout new file mode 100644 index 0000000000..7c3460d16c --- /dev/null +++ b/testing/btest/Baseline/cluster.websocket.broker.auto-subscription/..manager..stdout @@ -0,0 +1,4 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +Cluster::websocket_client_added, [] +ping, 42 +Cluster::websocket_client_lost diff --git a/testing/btest/Baseline/cluster.websocket.no-subscriptions/..manager..stdout b/testing/btest/Baseline/cluster.websocket.no-subscriptions/..manager..stdout index bb622d5ce2..48f97ff996 100644 --- a/testing/btest/Baseline/cluster.websocket.no-subscriptions/..manager..stdout +++ b/testing/btest/Baseline/cluster.websocket.no-subscriptions/..manager..stdout @@ -1,3 +1,65 @@ ### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. Cluster::websocket_client_added, [] -got ping, 42 +got ping, 42, 1 +Cluster::websocket_client_added, [] +got ping, 42, 2 +Cluster::websocket_client_added, [] +got ping, 42, 3 +Cluster::websocket_client_added, [] +got ping, 42, 4 +Cluster::websocket_client_added, [] +got ping, 42, 5 +Cluster::websocket_client_added, [] +got ping, 42, 6 +Cluster::websocket_client_added, [] +got ping, 42, 7 +Cluster::websocket_client_added, [] +got ping, 42, 8 +Cluster::websocket_client_added, [] +got ping, 42, 9 +Cluster::websocket_client_added, [] +got ping, 42, 10 +Cluster::websocket_client_added, [] +got ping, 42, 11 +Cluster::websocket_client_added, [] +got ping, 42, 12 +Cluster::websocket_client_added, [] +got ping, 42, 13 +Cluster::websocket_client_added, [] +got ping, 42, 14 +Cluster::websocket_client_added, [] +got ping, 42, 15 +Cluster::websocket_client_added, [] +got ping, 42, 16 +Cluster::websocket_client_added, [] +got ping, 42, 17 +Cluster::websocket_client_added, [] +got ping, 42, 18 +Cluster::websocket_client_added, [] +got ping, 42, 19 +Cluster::websocket_client_added, [] +got ping, 42, 20 +Cluster::websocket_client_added, [] +got ping, 42, 21 +Cluster::websocket_client_added, [] +got ping, 42, 22 +Cluster::websocket_client_added, [] +got ping, 42, 23 +Cluster::websocket_client_added, [] +got ping, 42, 24 +Cluster::websocket_client_added, [] +got ping, 42, 25 +Cluster::websocket_client_added, [] +got ping, 42, 26 +Cluster::websocket_client_added, [] +got ping, 42, 27 +Cluster::websocket_client_added, [] +got ping, 42, 28 +Cluster::websocket_client_added, [] +got ping, 42, 29 +Cluster::websocket_client_added, [] +got ping, 42, 30 +Cluster::websocket_client_added, [] +got ping, 42, 31 +Cluster::websocket_client_added, [] +got ping, 42, 32 diff --git a/testing/btest/Baseline/cluster.websocket.zeromq.auto-subscription/..client..stderr b/testing/btest/Baseline/cluster.websocket.zeromq.auto-subscription/..client..stderr new file mode 100644 index 0000000000..49d861c74c --- /dev/null +++ b/testing/btest/Baseline/cluster.websocket.zeromq.auto-subscription/..client..stderr @@ -0,0 +1 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. diff --git a/testing/btest/Baseline/cluster.websocket.zeromq.auto-subscription/..client..stdout b/testing/btest/Baseline/cluster.websocket.zeromq.auto-subscription/..client..stdout new file mode 100644 index 0000000000..a211c39354 --- /dev/null +++ b/testing/btest/Baseline/cluster.websocket.zeromq.auto-subscription/..client..stdout @@ -0,0 +1,5 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +connected +got ack +ack[endpoint] in topic_parts True +event [{'@data-type': 'string', 'data': 'pong'}, {'@data-type': 'vector', 'data': [{'@data-type': 'count', 'data': 42}]}] diff --git a/testing/btest/Baseline/cluster.websocket.zeromq.auto-subscription/..manager..stderr b/testing/btest/Baseline/cluster.websocket.zeromq.auto-subscription/..manager..stderr new file mode 100644 index 0000000000..e3f6131b1d --- /dev/null +++ b/testing/btest/Baseline/cluster.websocket.zeromq.auto-subscription/..manager..stderr @@ -0,0 +1,2 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +received termination signal diff --git a/testing/btest/Baseline/cluster.websocket.zeromq.auto-subscription/..manager..stdout b/testing/btest/Baseline/cluster.websocket.zeromq.auto-subscription/..manager..stdout new file mode 100644 index 0000000000..7c3460d16c --- /dev/null +++ b/testing/btest/Baseline/cluster.websocket.zeromq.auto-subscription/..manager..stdout @@ -0,0 +1,4 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +Cluster::websocket_client_added, [] +ping, 42 +Cluster::websocket_client_lost diff --git a/testing/btest/cluster/websocket/broker/auto-subscription.zeek b/testing/btest/cluster/websocket/broker/auto-subscription.zeek new file mode 100644 index 0000000000..1ba4ea7e97 --- /dev/null +++ b/testing/btest/cluster/websocket/broker/auto-subscription.zeek @@ -0,0 +1,88 @@ +# @TEST-DOC: Test that publishing events to a WebSocket client's auto topic works. +# +# @TEST-REQUIRES: python3 -c 'import websockets.sync' +# +# @TEST-PORT: BROKER_PORT1 +# @TEST-PORT: WEBSOCKET_PORT +# +# @TEST-EXEC: cp $FILES/ws/wstest.py . +# +# @TEST-EXEC: zeek -b --parse-only manager.zeek +# @TEST-EXEC: python3 -m py_compile client.py +# +# @TEST-EXEC: btest-bg-run manager "ZEEKPATH=$ZEEKPATH:.. && CLUSTER_NODE=manager zeek -b ../manager.zeek" +# @TEST-EXEC: btest-bg-run client "python3 ../client.py" +# +# @TEST-EXEC: btest-bg-wait 5 +# @TEST-EXEC: btest-diff ./manager/.stdout +# @TEST-EXEC: btest-diff ./manager/.stderr +# @TEST-EXEC: btest-diff ./client/.stdout +# @TEST-EXEC: btest-diff ./client/.stderr + +# @TEST-START-FILE cluster-layout.zeek +redef Cluster::nodes = { + ["manager"] = [$node_type=Cluster::MANAGER, $ip=127.0.0.1, $p=to_port(getenv("BROKER_PORT1"))], +}; +# @TEST-END-FILE +# +# @TEST-START-FILE manager.zeek +redef exit_only_after_terminate = T; + +redef Log::enable_local_logging = T; +redef Log::default_rotation_interval = 0sec; +redef Broker::disable_ssl = T; + +global ws_client_topic = ""; + +event zeek_init() + { + Cluster::subscribe("/test/pings"); + Cluster::listen_websocket([$listen_host="127.0.0.1", $listen_port=to_port(getenv("WEBSOCKET_PORT"))]); + } + +global pong: event(c: count) &is_used; + +event ping(c: count) &is_used + { + print "ping", c; + # Reply with a pong on the WebSocket client's auto topic. + Cluster::publish(ws_client_topic, pong, c); + } + +event Cluster::websocket_client_added(info: Cluster::EndpointInfo, subscriptions: string_vec) + { + print "Cluster::websocket_client_added", subscriptions; + ws_client_topic = Cluster::websocket_client_topic(info$id); + } + +event Cluster::websocket_client_lost(info: Cluster::EndpointInfo) + { + print "Cluster::websocket_client_lost"; + terminate(); + } +# @TEST-END-FILE + + +# @TEST-START-FILE client.py +import wstest + +def run(ws_url): + with wstest.connect("ws1", ws_url) as tc: + print("connected") + tc.send_json([]) # Send no subscriptions + ack = tc.recv_json() + print("got ack") + assert ack.get("type") == "ack", f"{ack}" + + # Send a ping to the manager. + tc.send_json(wstest.build_event_v1("/test/pings/", "ping", [42])) + pong = tc.recv_json(timeout=3) + topic, event = pong["topic"], pong["data"][2]["data"][0:2] + topic_parts = topic.split("/") + print("ack[endpoint] in topic_parts", ack["endpoint"] in topic_parts) + print("event", event) + + +if __name__ == "__main__": + wstest.main(run, wstest.WS4_URL_V1) +# @TEST-END-FILE diff --git a/testing/btest/cluster/websocket/no-subscriptions.zeek b/testing/btest/cluster/websocket/no-subscriptions.zeek index 09701e5fc9..2450ce58e6 100644 --- a/testing/btest/cluster/websocket/no-subscriptions.zeek +++ b/testing/btest/cluster/websocket/no-subscriptions.zeek @@ -36,10 +36,15 @@ event zeek_init() Cluster::listen_websocket([$listen_host="127.0.0.1", $listen_port=to_port(getenv("WEBSOCKET_PORT"))]); } +global ping_count = 0; +const ping_count_expected = 32; + event ping(c: count) &is_used { - print "got ping", c; - terminate(); + ++ping_count; + print "got ping", c, ping_count; + if ( ping_count == ping_count_expected ) + terminate(); } event Cluster::websocket_client_added(info: Cluster::EndpointInfo, subscriptions: string_vec) @@ -53,11 +58,12 @@ event Cluster::websocket_client_added(info: Cluster::EndpointInfo, subscriptions import wstest def run(ws_url): - with wstest.connect("ws1", ws_url) as tc: - tc.send_json([]) # Send no subscriptions - ack = tc.recv_json() - assert ack.get("type") == "ack", f"{ack}" - tc.send_json(wstest.build_event_v1("/test/pings/", "ping", [42])) + for i in range(32): + with wstest.connect("ws1", ws_url) as tc: + tc.send_json([]) # Send no subscriptions + ack = tc.recv_json() + assert ack.get("type") == "ack", f"{ack}" + tc.send_json(wstest.build_event_v1("/test/pings/", "ping", [42])) if __name__ == "__main__": wstest.main(run, wstest.WS4_URL_V1) diff --git a/testing/btest/cluster/websocket/zeromq/auto-subscription.zeek b/testing/btest/cluster/websocket/zeromq/auto-subscription.zeek new file mode 100644 index 0000000000..1b9b13db96 --- /dev/null +++ b/testing/btest/cluster/websocket/zeromq/auto-subscription.zeek @@ -0,0 +1,86 @@ +# @TEST-DOC: Test that publishing events to a WebSocket client's auto topic works. +# +# @TEST-REQUIRES: have-zeromq +# @TEST-REQUIRES: python3 -c 'import websockets.sync' +# +# @TEST-GROUP: cluster-zeromq +# +# @TEST-PORT: XPUB_PORT +# @TEST-PORT: XSUB_PORT +# @TEST-PORT: LOG_PULL_PORT +# @TEST-PORT: WEBSOCKET_PORT +# +# @TEST-EXEC: cp $FILES/zeromq/cluster-layout-simple.zeek cluster-layout.zeek +# @TEST-EXEC: cp $FILES/zeromq/test-bootstrap.zeek zeromq-test-bootstrap.zeek +# @TEST-EXEC: cp $FILES/ws/wstest.py . +# +# @TEST-EXEC: zeek -b --parse-only manager.zeek +# @TEST-EXEC: python3 -m py_compile client.py +# +# @TEST-EXEC: btest-bg-run manager "ZEEKPATH=$ZEEKPATH:.. && CLUSTER_NODE=manager zeek -b ../manager.zeek" +# @TEST-EXEC: btest-bg-run client "python3 ../client.py" +# +# @TEST-EXEC: btest-bg-wait 30 +# @TEST-EXEC: btest-diff ./manager/.stdout +# @TEST-EXEC: btest-diff ./manager/.stderr +# @TEST-EXEC: btest-diff ./client/.stdout +# @TEST-EXEC: btest-diff ./client/.stderr + +# @TEST-START-FILE manager.zeek +@load ./zeromq-test-bootstrap +redef exit_only_after_terminate = T; + +global ws_client_topic = ""; + +event zeek_init() + { + Cluster::subscribe("/test/pings"); + Cluster::listen_websocket([$listen_host="127.0.0.1", $listen_port=to_port(getenv("WEBSOCKET_PORT"))]); + } + +global pong: event(c: count) &is_used; + +event ping(c: count) &is_used + { + print "ping", c; + # Reply with a pong on the WebSocket client's auto topic. + Cluster::publish(ws_client_topic, pong, c); + } + +event Cluster::websocket_client_added(info: Cluster::EndpointInfo, subscriptions: string_vec) + { + print "Cluster::websocket_client_added", subscriptions; + ws_client_topic = Cluster::websocket_client_topic(info$id); + } + +event Cluster::websocket_client_lost(info: Cluster::EndpointInfo) + { + print "Cluster::websocket_client_lost"; + terminate(); + } +# @TEST-END-FILE + + +# @TEST-START-FILE client.py +import wstest + +def run(ws_url): + with wstest.connect("ws1", ws_url) as tc: + print("connected") + tc.send_json([]) # Send no subscriptions + ack = tc.recv_json() + print("got ack") + assert ack.get("type") == "ack", f"{ack}" + + # Send a ping to the manager. + tc.send_json(wstest.build_event_v1("/test/pings/", "ping", [42])) + pong = tc.recv_json(timeout=3) + topic, event = pong["topic"], pong["data"][2]["data"][0:2] + topic_parts = topic.split(".") + print("ack[endpoint] in topic_parts", ack["endpoint"] in topic_parts) + print("event", event) + + +if __name__ == "__main__": + wstest.main(run, wstest.WS4_URL_V1) +# @TEST-END-FILE