From e7a876da3569bb6c1df3d97214f72735c230db95 Mon Sep 17 00:00:00 2001 From: Arne Welzel Date: Thu, 24 Apr 2025 16:17:09 +0200 Subject: [PATCH 1/4] cluster/Backend: Add ReadyToPublishCallback() API Provide a mechanism to allow a cluster backend report when it is ready for publish operations. This is primarily useful for ZeroMQ which has sender-side filtering and is only really ready for publishing when it has learned about subscriptions from other nodes. --- src/cluster/Backend.cc | 5 +++++ src/cluster/Backend.h | 32 ++++++++++++++++++++++++++++++++ 2 files changed, 37 insertions(+) diff --git a/src/cluster/Backend.cc b/src/cluster/Backend.cc index 575d0aeaf7..cbadf97950 100644 --- a/src/cluster/Backend.cc +++ b/src/cluster/Backend.cc @@ -111,6 +111,11 @@ std::optional Backend::MakeClusterEvent(FuncValPtr handler, ArgsS return zeek::cluster::detail::Event{eh, std::move(*checked_args), timestamp}; } +void Backend::DoReadyToPublishCallback(Backend::ReadyCallback cb) { + Backend::ReadyCallbackInfo info{Backend::CallbackStatus::Success}; + cb(info); +} + // Default implementation doing the serialization. bool Backend::DoPublishEvent(const std::string& topic, cluster::detail::Event& event) { byte_buffer buf; diff --git a/src/cluster/Backend.h b/src/cluster/Backend.h index 1b09fb4c88..b8a5a11143 100644 --- a/src/cluster/Backend.h +++ b/src/cluster/Backend.h @@ -269,6 +269,31 @@ public: */ bool Unsubscribe(const std::string& topic_prefix) { return DoUnsubscribe(topic_prefix); } + /** + * Information passed to a ready callback. + */ + using ReadyCallbackInfo = SubscriptionCallbackInfo; + + using ReadyCallback = std::function; + + /** + * Register a "ready to publish" callback. + * + * Some cluster backend implementations may not be immediately ready for + * publish operations. For example, ZeroMQ has sender-side subscription + * filtering and discards messages until the XPUB socket learns about + * subscriptions in a cluster. + * + * The callback mechanism allows backends to notify the caller that it + * has now determined readiness for publish operations. + * + * Callers should be prepared that \a cb is invoked immediately as that + * is the default implementation for DoReadyToPublishCallback(). + * + * @param cb The callback to invoke when the backend is ready for publish operations. + */ + void ReadyToPublishCallback(ReadyCallback cb) { DoReadyToPublishCallback(std::move(cb)); } + /** * Publish multiple log records. * @@ -450,6 +475,13 @@ private: */ virtual bool DoUnsubscribe(const std::string& topic_prefix) = 0; + /** + * Register a "ready to publish" callback. + * + * @param cb The callback to invoke when the backend is ready for publish operations. + */ + virtual void DoReadyToPublishCallback(ReadyCallback cb); + /** * Serialize a log batch, then forward it to DoPublishLogWrites() below. From 643b9266259c4fb78f662988674552c5fd17dd05 Mon Sep 17 00:00:00 2001 From: Arne Welzel Date: Thu, 24 Apr 2025 16:20:07 +0200 Subject: [PATCH 2/4] cluster/zeromq: Implement DoReadyToPublishCallback() The ZeroMQ heuristic for "ready to publish" is to create an unique and ephemeral subscription using the XSUB socket and observe it arrive on the XPUB socket. At this point, visibility into other node's subscriptions is provided. --- .../cluster/backend/zeromq/main.zeek | 12 +++++++ src/cluster/backend/zeromq/ZeroMQ.cc | 31 +++++++++++++++++++ src/cluster/backend/zeromq/ZeroMQ.h | 4 +++ 3 files changed, 47 insertions(+) diff --git a/scripts/policy/frameworks/cluster/backend/zeromq/main.zeek b/scripts/policy/frameworks/cluster/backend/zeromq/main.zeek index c34608d437..34c127ddf3 100644 --- a/scripts/policy/frameworks/cluster/backend/zeromq/main.zeek +++ b/scripts/policy/frameworks/cluster/backend/zeromq/main.zeek @@ -218,6 +218,18 @@ export { ## subscriptions and hello messages from other ## nodes. These expirations trigger reporter warnings. const hello_expiration: interval = 10sec &redef; + + ## The topic prefix used for internal ZeroMQ specific communication. + ## + ## This is used for the "ready to publish callback" topics. + ## + ## Zeek creates a short-lived subscription for a auto-generated + ## topic name with this prefix and waits for it to be confirmed + ## on its XPUB socket. Once this happens, the XPUB socket should've + ## also received all other active subscriptions of other nodes in a + ## cluster from the central XPUB/XSUB proxy and therefore can be + ## deemed ready for publish operations. + const internal_topic_prefix = "zeek.zeromq.internal." &redef; } redef Cluster::backend = Cluster::CLUSTER_BACKEND_ZEROMQ; diff --git a/src/cluster/backend/zeromq/ZeroMQ.cc b/src/cluster/backend/zeromq/ZeroMQ.cc index a620c8d82c..077100c000 100644 --- a/src/cluster/backend/zeromq/ZeroMQ.cc +++ b/src/cluster/backend/zeromq/ZeroMQ.cc @@ -16,6 +16,7 @@ #include "zeek/DebugLogger.h" #include "zeek/EventHandler.h" #include "zeek/EventRegistry.h" +#include "zeek/ID.h" #include "zeek/IntrusivePtr.h" #include "zeek/Reporter.h" #include "zeek/Val.h" @@ -103,6 +104,8 @@ void ZeroMQBackend::DoInitPostScript() { linger_ms = static_cast(zeek::id::find_val("Cluster::Backend::ZeroMQ::linger_ms")->AsInt()); poll_max_messages = zeek::id::find_val("Cluster::Backend::ZeroMQ::poll_max_messages")->Get(); debug_flags = zeek::id::find_val("Cluster::Backend::ZeroMQ::debug_flags")->Get(); + internal_topic_prefix = + zeek::id::find_const("Cluster::Backend::ZeroMQ::internal_topic_prefix")->ToStdString(); proxy_io_threads = static_cast(zeek::id::find_val("Cluster::Backend::ZeroMQ::proxy_io_threads")->Get()); @@ -723,6 +726,34 @@ bool ZeroMQBackend::DoProcessBackendMessage(int tag, byte_buffer_span payload) { } } +void ZeroMQBackend::DoReadyToPublishCallback(ReadyCallback cb) { + // Setup an ephemeral subscription for a topic produced with the internal + // topic prefix, this backend's node identifier and an incrementing counter. + // When the SubscribeCallback for the subscription is invoked, meaning it + // has become visible on the XPUB socket, call the provided ready callback + // and cancel the subscription by unsubscribing from the topic again. + // + // The heuristic here is that seeing a subscription created by the node itself + // also leads to the XPUB/XSUB proxy having sent all subscriptions from other + // nodes in the cluster. + // + // Without this heuristic, short-lived WebSocket clients may fail to publish + // messages as ZeroMQ implements sender-side subscription filtering and simply + // discards messages to topics for which it hasn't seen any subscriptions yet. + static int ready_topic_counter = 0; + ++ready_topic_counter; + + auto scb = [this, cb = std::move(cb)](const std::string& topic_prefix, const SubscriptionCallbackInfo& sinfo) { + Backend::ReadyCallbackInfo info{sinfo.status, sinfo.message}; + cb(info); + + // Unsubscribe again, we're not actually interested in this topic. + Unsubscribe(topic_prefix); + }; + + std::string topic = util::fmt("%s%s.%d.", internal_topic_prefix.c_str(), NodeId().c_str(), ready_topic_counter); + Subscribe(topic, std::move(scb)); +} } // namespace cluster::zeromq } // namespace zeek diff --git a/src/cluster/backend/zeromq/ZeroMQ.h b/src/cluster/backend/zeromq/ZeroMQ.h index f09bfa09db..dae822160b 100644 --- a/src/cluster/backend/zeromq/ZeroMQ.h +++ b/src/cluster/backend/zeromq/ZeroMQ.h @@ -71,6 +71,8 @@ private: bool DoProcessBackendMessage(int tag, byte_buffer_span payload) override; + void DoReadyToPublishCallback(ReadyCallback cb) override; + // Script level variables. std::string connect_xsub_endpoint; std::string connect_xpub_endpoint; @@ -84,6 +86,8 @@ private: zeek_uint_t poll_max_messages = 0; zeek_uint_t debug_flags = 0; + std::string internal_topic_prefix; + EventHandlerPtr event_subscription; EventHandlerPtr event_unsubscription; From 2cd2a2b8a6bcda2e1260cdc34e7dc5cf6318cda9 Mon Sep 17 00:00:00 2001 From: Arne Welzel Date: Thu, 24 Apr 2025 16:21:24 +0200 Subject: [PATCH 3/4] cluster/websocket: Leverage ReadyToPublishCallback() Change WebSocket client handling to return only when the ready to publish callback has been invoked. --- src/cluster/websocket/WebSocket.cc | 39 ++++++++++++++++++++++++------ src/cluster/websocket/WebSocket.h | 10 +++++++- 2 files changed, 40 insertions(+), 9 deletions(-) diff --git a/src/cluster/websocket/WebSocket.cc b/src/cluster/websocket/WebSocket.cc index 1f2610a99b..8ca19a91de 100644 --- a/src/cluster/websocket/WebSocket.cc +++ b/src/cluster/websocket/WebSocket.cc @@ -372,6 +372,30 @@ void WebSocketEventDispatcher::Process(const WebSocketSubscribeFinished& fin) { return; } + if ( ! entry.ready_to_publish ) { + // Still waiting for the backend to be ready. + return; + } + + HandleSubscriptionsActive(entry); +} + +void WebSocketEventDispatcher::Process(const WebSocketBackendReadyToPublish& ready) { + const auto& it = clients.find(ready.id); + if ( it == clients.end() ) { + reporter->Error("Backend ready from non-existing WebSocket client with id %s!", ready.id.c_str()); + return; + } + + auto& entry = it->second; + + entry.ready_to_publish = true; + + if ( ! entry.wsc->AllSubscriptionsActive() ) { + // More subscriptions to come! + return; + } + HandleSubscriptionsActive(entry); } @@ -397,14 +421,6 @@ void WebSocketEventDispatcher::HandleSubscriptions(WebSocketClientEntry& entry, entry.wsc->SetSubscriptions(subscriptions); - // Short-circuit setting up subscriptions and directly reply with - // an ack if the client didn't request any topic subscriptions. - if ( subscriptions.empty() ) { - assert(entry.wsc->AllSubscriptionsActive()); - HandleSubscriptionsActive(entry); - return; - } - auto cb = [this, id = entry.id, wsc = entry.wsc](const std::string& topic, const Backend::SubscriptionCallbackInfo& info) { if ( info.status == Backend::CallbackStatus::Error ) { @@ -424,6 +440,13 @@ void WebSocketEventDispatcher::HandleSubscriptions(WebSocketClientEntry& entry, QueueReply(WebSocketCloseReply{entry.wsc, 1011, "Could not subscribe. Something bad happened!"}); } } + + // Register a callback to be invoked when the backend is ready for publishing. + entry.backend->ReadyToPublishCallback([this, id = entry.id](const auto& info) { + // Ready callbacks are supposed to run on the main thread, + // so we can just start processing a WebSocketBackendReady. + Process(WebSocketBackendReadyToPublish{id}); + }); } void WebSocketEventDispatcher::HandleSubscriptionsActive(const WebSocketClientEntry& entry) { diff --git a/src/cluster/websocket/WebSocket.h b/src/cluster/websocket/WebSocket.h index 4dd0965372..296c9a731d 100644 --- a/src/cluster/websocket/WebSocket.h +++ b/src/cluster/websocket/WebSocket.h @@ -145,7 +145,13 @@ struct WebSocketSubscribeFinished { std::string topic_prefix; }; -using WebSocketEvent = std::variant; +// Internally created when the backend of a Websocket client is ready. +struct WebSocketBackendReadyToPublish { + std::string id; +}; + +using WebSocketEvent = std::variant; struct WebSocketSendReply { std::shared_ptr wsc; @@ -211,6 +217,7 @@ private: void Process(const WebSocketOpen& open); void Process(const WebSocketSubscribeFinished& fin); + void Process(const WebSocketBackendReadyToPublish& ready); void Process(const WebSocketMessage& msg); void Process(const WebSocketClose& close); @@ -222,6 +229,7 @@ private: std::string id; std::shared_ptr wsc; std::shared_ptr backend; + bool ready_to_publish = false; uint64_t msg_count = 0; std::list queue; }; From 43a1bab960ed53e59d47c6e7f4beca9e48672cf3 Mon Sep 17 00:00:00 2001 From: Arne Welzel Date: Thu, 24 Apr 2025 16:22:40 +0200 Subject: [PATCH 4/4] btest/cluster/websocket: Move no-subscriptions test ...and also add one for broker. --- .../..client..stderr | 0 .../..client..stdout | 0 .../..manager..stderr | 0 .../..manager..stdout | 33 ++++++++++ .../..client..stderr} | 2 - .../..client..stdout | 1 + .../..manager..stderr | 2 + .../..manager..stdout | 33 ++++++++++ .../websocket/broker/no-subscriptions.zeek | 66 +++++++++++++++++++ .../{ => zeromq}/no-subscriptions.zeek | 26 ++++---- 10 files changed, 148 insertions(+), 15 deletions(-) rename testing/btest/Baseline/{cluster.websocket.no-subscriptions => cluster.websocket.broker.no-subscriptions}/..client..stderr (100%) rename testing/btest/Baseline/{cluster.websocket.no-subscriptions => cluster.websocket.broker.no-subscriptions}/..client..stdout (100%) rename testing/btest/Baseline/{cluster.websocket.no-subscriptions => cluster.websocket.broker.no-subscriptions}/..manager..stderr (100%) create mode 100644 testing/btest/Baseline/cluster.websocket.broker.no-subscriptions/..manager..stdout rename testing/btest/Baseline/{cluster.websocket.no-subscriptions/..manager..stdout => cluster.websocket.zeromq.no-subscriptions/..client..stderr} (70%) create mode 100644 testing/btest/Baseline/cluster.websocket.zeromq.no-subscriptions/..client..stdout create mode 100644 testing/btest/Baseline/cluster.websocket.zeromq.no-subscriptions/..manager..stderr create mode 100644 testing/btest/Baseline/cluster.websocket.zeromq.no-subscriptions/..manager..stdout create mode 100644 testing/btest/cluster/websocket/broker/no-subscriptions.zeek rename testing/btest/cluster/websocket/{ => zeromq}/no-subscriptions.zeek (76%) diff --git a/testing/btest/Baseline/cluster.websocket.no-subscriptions/..client..stderr b/testing/btest/Baseline/cluster.websocket.broker.no-subscriptions/..client..stderr similarity index 100% rename from testing/btest/Baseline/cluster.websocket.no-subscriptions/..client..stderr rename to testing/btest/Baseline/cluster.websocket.broker.no-subscriptions/..client..stderr diff --git a/testing/btest/Baseline/cluster.websocket.no-subscriptions/..client..stdout b/testing/btest/Baseline/cluster.websocket.broker.no-subscriptions/..client..stdout similarity index 100% rename from testing/btest/Baseline/cluster.websocket.no-subscriptions/..client..stdout rename to testing/btest/Baseline/cluster.websocket.broker.no-subscriptions/..client..stdout diff --git a/testing/btest/Baseline/cluster.websocket.no-subscriptions/..manager..stderr b/testing/btest/Baseline/cluster.websocket.broker.no-subscriptions/..manager..stderr similarity index 100% rename from testing/btest/Baseline/cluster.websocket.no-subscriptions/..manager..stderr rename to testing/btest/Baseline/cluster.websocket.broker.no-subscriptions/..manager..stderr diff --git a/testing/btest/Baseline/cluster.websocket.broker.no-subscriptions/..manager..stdout b/testing/btest/Baseline/cluster.websocket.broker.no-subscriptions/..manager..stdout new file mode 100644 index 0000000000..944dde8125 --- /dev/null +++ b/testing/btest/Baseline/cluster.websocket.broker.no-subscriptions/..manager..stdout @@ -0,0 +1,33 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +got ping, 1, 1 +got ping, 2, 2 +got ping, 3, 3 +got ping, 4, 4 +got ping, 5, 5 +got ping, 6, 6 +got ping, 7, 7 +got ping, 8, 8 +got ping, 9, 9 +got ping, 10, 10 +got ping, 11, 11 +got ping, 12, 12 +got ping, 13, 13 +got ping, 14, 14 +got ping, 15, 15 +got ping, 16, 16 +got ping, 17, 17 +got ping, 18, 18 +got ping, 19, 19 +got ping, 20, 20 +got ping, 21, 21 +got ping, 22, 22 +got ping, 23, 23 +got ping, 24, 24 +got ping, 25, 25 +got ping, 26, 26 +got ping, 27, 27 +got ping, 28, 28 +got ping, 29, 29 +got ping, 30, 30 +got ping, 31, 31 +got ping, 32, 32 diff --git a/testing/btest/Baseline/cluster.websocket.no-subscriptions/..manager..stdout b/testing/btest/Baseline/cluster.websocket.zeromq.no-subscriptions/..client..stderr similarity index 70% rename from testing/btest/Baseline/cluster.websocket.no-subscriptions/..manager..stdout rename to testing/btest/Baseline/cluster.websocket.zeromq.no-subscriptions/..client..stderr index bb622d5ce2..49d861c74c 100644 --- a/testing/btest/Baseline/cluster.websocket.no-subscriptions/..manager..stdout +++ b/testing/btest/Baseline/cluster.websocket.zeromq.no-subscriptions/..client..stderr @@ -1,3 +1 @@ ### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. -Cluster::websocket_client_added, [] -got ping, 42 diff --git a/testing/btest/Baseline/cluster.websocket.zeromq.no-subscriptions/..client..stdout b/testing/btest/Baseline/cluster.websocket.zeromq.no-subscriptions/..client..stdout new file mode 100644 index 0000000000..49d861c74c --- /dev/null +++ b/testing/btest/Baseline/cluster.websocket.zeromq.no-subscriptions/..client..stdout @@ -0,0 +1 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. diff --git a/testing/btest/Baseline/cluster.websocket.zeromq.no-subscriptions/..manager..stderr b/testing/btest/Baseline/cluster.websocket.zeromq.no-subscriptions/..manager..stderr new file mode 100644 index 0000000000..e3f6131b1d --- /dev/null +++ b/testing/btest/Baseline/cluster.websocket.zeromq.no-subscriptions/..manager..stderr @@ -0,0 +1,2 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +received termination signal diff --git a/testing/btest/Baseline/cluster.websocket.zeromq.no-subscriptions/..manager..stdout b/testing/btest/Baseline/cluster.websocket.zeromq.no-subscriptions/..manager..stdout new file mode 100644 index 0000000000..944dde8125 --- /dev/null +++ b/testing/btest/Baseline/cluster.websocket.zeromq.no-subscriptions/..manager..stdout @@ -0,0 +1,33 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +got ping, 1, 1 +got ping, 2, 2 +got ping, 3, 3 +got ping, 4, 4 +got ping, 5, 5 +got ping, 6, 6 +got ping, 7, 7 +got ping, 8, 8 +got ping, 9, 9 +got ping, 10, 10 +got ping, 11, 11 +got ping, 12, 12 +got ping, 13, 13 +got ping, 14, 14 +got ping, 15, 15 +got ping, 16, 16 +got ping, 17, 17 +got ping, 18, 18 +got ping, 19, 19 +got ping, 20, 20 +got ping, 21, 21 +got ping, 22, 22 +got ping, 23, 23 +got ping, 24, 24 +got ping, 25, 25 +got ping, 26, 26 +got ping, 27, 27 +got ping, 28, 28 +got ping, 29, 29 +got ping, 30, 30 +got ping, 31, 31 +got ping, 32, 32 diff --git a/testing/btest/cluster/websocket/broker/no-subscriptions.zeek b/testing/btest/cluster/websocket/broker/no-subscriptions.zeek new file mode 100644 index 0000000000..968e5b40bc --- /dev/null +++ b/testing/btest/cluster/websocket/broker/no-subscriptions.zeek @@ -0,0 +1,66 @@ +# @TEST-DOC: Test that publishing events to a WebSocket client's auto topic works. +# +# @TEST-REQUIRES: python3 -c 'import websockets.sync' +# +# @TEST-PORT: BROKER_PORT1 +# @TEST-PORT: WEBSOCKET_PORT +# +# @TEST-EXEC: cp $FILES/ws/wstest.py . +# +# @TEST-EXEC: zeek -b --parse-only manager.zeek +# @TEST-EXEC: python3 -m py_compile client.py +# +# @TEST-EXEC: btest-bg-run manager "ZEEKPATH=$ZEEKPATH:.. && CLUSTER_NODE=manager zeek -b ../manager.zeek" +# @TEST-EXEC: btest-bg-run client "python3 ../client.py" +# +# @TEST-EXEC: btest-bg-wait 30 +# @TEST-EXEC: btest-diff ./manager/.stdout +# @TEST-EXEC: btest-diff ./manager/.stderr +# @TEST-EXEC: btest-diff ./client/.stdout +# @TEST-EXEC: btest-diff ./client/.stderr + +# @TEST-START-FILE cluster-layout.zeek +redef Cluster::nodes = { + ["manager"] = [$node_type=Cluster::MANAGER, $ip=127.0.0.1, $p=to_port(getenv("BROKER_PORT1"))], +}; +# @TEST-END-FILE +# +# @TEST-START-FILE manager.zeek +redef exit_only_after_terminate = T; + +redef Log::enable_local_logging = T; +redef Log::default_rotation_interval = 0sec; +redef Broker::disable_ssl = T; + +event zeek_init() + { + Cluster::subscribe("/test/pings"); + Cluster::listen_websocket([$listen_host="127.0.0.1", $listen_port=to_port(getenv("WEBSOCKET_PORT"))]); + } + +global ping_count = 0; +const ping_count_expected = 32; + +event ping(c: count) &is_used + { + ++ping_count; + print "got ping", c, ping_count; + if ( ping_count == ping_count_expected ) + terminate(); + } +# @TEST-END-FILE + +# @TEST-START-FILE client.py +import wstest + +def run(ws_url): + for i in range(32): + with wstest.connect("ws1", ws_url) as tc: + tc.send_json([]) # Send no subscriptions + ack = tc.recv_json() + assert ack.get("type") == "ack", f"{ack}" + tc.send_json(wstest.build_event_v1("/test/pings/", "ping", [i + 1])) + +if __name__ == "__main__": + wstest.main(run, wstest.WS4_URL_V1) +# @TEST-END-FILE diff --git a/testing/btest/cluster/websocket/no-subscriptions.zeek b/testing/btest/cluster/websocket/zeromq/no-subscriptions.zeek similarity index 76% rename from testing/btest/cluster/websocket/no-subscriptions.zeek rename to testing/btest/cluster/websocket/zeromq/no-subscriptions.zeek index 09701e5fc9..305b0919eb 100644 --- a/testing/btest/cluster/websocket/no-subscriptions.zeek +++ b/testing/btest/cluster/websocket/zeromq/no-subscriptions.zeek @@ -36,28 +36,28 @@ event zeek_init() Cluster::listen_websocket([$listen_host="127.0.0.1", $listen_port=to_port(getenv("WEBSOCKET_PORT"))]); } +global ping_count = 0; +const ping_count_expected = 32; + event ping(c: count) &is_used { - print "got ping", c; - terminate(); - } - -event Cluster::websocket_client_added(info: Cluster::EndpointInfo, subscriptions: string_vec) - { - print "Cluster::websocket_client_added", subscriptions; + ++ping_count; + print "got ping", c, ping_count; + if ( ping_count == ping_count_expected ) + terminate(); } # @TEST-END-FILE - # @TEST-START-FILE client.py import wstest def run(ws_url): - with wstest.connect("ws1", ws_url) as tc: - tc.send_json([]) # Send no subscriptions - ack = tc.recv_json() - assert ack.get("type") == "ack", f"{ack}" - tc.send_json(wstest.build_event_v1("/test/pings/", "ping", [42])) + for i in range(32): + with wstest.connect("ws1", ws_url) as tc: + tc.send_json([]) # Send no subscriptions + ack = tc.recv_json() + assert ack.get("type") == "ack", f"{ack}" + tc.send_json(wstest.build_event_v1("/test/pings/", "ping", [i + 1])) if __name__ == "__main__": wstest.main(run, wstest.WS4_URL_V1)