From 47206d6a8af539fe2209cba6556e2ea262f56849 Mon Sep 17 00:00:00 2001 From: Arne Welzel Date: Wed, 23 Apr 2025 18:01:57 +0200 Subject: [PATCH 1/2] cluster/websocket: Factor out active subscription handling --- src/cluster/websocket/WebSocket.cc | 37 +++++++++++++++++------------- src/cluster/websocket/WebSocket.h | 4 ++++ 2 files changed, 25 insertions(+), 16 deletions(-) diff --git a/src/cluster/websocket/WebSocket.cc b/src/cluster/websocket/WebSocket.cc index 2ceff5b9d6..428101fedf 100644 --- a/src/cluster/websocket/WebSocket.cc +++ b/src/cluster/websocket/WebSocket.cc @@ -361,7 +361,6 @@ void WebSocketEventDispatcher::Process(const WebSocketSubscribeFinished& fin) { } auto& entry = it->second; - auto& wsc = entry.wsc; entry.wsc->SetSubscriptionActive(fin.topic_prefix); @@ -370,21 +369,7 @@ void WebSocketEventDispatcher::Process(const WebSocketSubscribeFinished& fin) { return; } - auto rec = zeek::cluster::detail::bif::make_endpoint_info(entry.backend->NodeId(), wsc->getRemoteIp(), - wsc->getRemotePort(), TRANSPORT_TCP); - auto subscriptions_vec = zeek::cluster::detail::bif::make_string_vec(wsc->GetSubscriptions()); - zeek::event_mgr.Enqueue(Cluster::websocket_client_added, std::move(rec), std::move(subscriptions_vec)); - - entry.wsc->SendAck(entry.backend->NodeId(), zeek::zeek_version()); - - WS_DEBUG("Sent Ack to client %s (%s:%d) %s\n", fin.id.c_str(), wsc->getRemoteIp().c_str(), wsc->getRemotePort(), - entry.backend->NodeId().c_str()); - - // Process any queued messages now. - for ( auto& msg : entry.queue ) { - assert(entry.msg_count > 1); - Process(msg); - } + HandleSubscriptionsActive(entry); } void WebSocketEventDispatcher::HandleSubscriptions(WebSocketClientEntry& entry, std::string_view buf) { @@ -430,6 +415,26 @@ void WebSocketEventDispatcher::HandleSubscriptions(WebSocketClientEntry& entry, } } +void WebSocketEventDispatcher::HandleSubscriptionsActive(const WebSocketClientEntry& entry) { + auto& wsc = entry.wsc; + + auto rec = zeek::cluster::detail::bif::make_endpoint_info(entry.backend->NodeId(), wsc->getRemoteIp(), + wsc->getRemotePort(), TRANSPORT_TCP); + auto subscriptions_vec = zeek::cluster::detail::bif::make_string_vec(wsc->GetSubscriptions()); + zeek::event_mgr.Enqueue(Cluster::websocket_client_added, std::move(rec), std::move(subscriptions_vec)); + + entry.wsc->SendAck(entry.backend->NodeId(), zeek::zeek_version()); + + WS_DEBUG("Sent Ack to client %s (%s:%d) %s\n", entry.id.c_str(), wsc->getRemoteIp().c_str(), wsc->getRemotePort(), + entry.backend->NodeId().c_str()); + + // Process any queued messages now. + for ( auto& msg : entry.queue ) { + assert(entry.msg_count > 1); + Process(msg); + } +} + void WebSocketEventDispatcher::HandleEvent(WebSocketClientEntry& entry, std::string_view buf) { // Unserialize the message as an event. broker::variant res; diff --git a/src/cluster/websocket/WebSocket.h b/src/cluster/websocket/WebSocket.h index a635c7dbd8..4dd0965372 100644 --- a/src/cluster/websocket/WebSocket.h +++ b/src/cluster/websocket/WebSocket.h @@ -228,6 +228,10 @@ private: void HandleSubscriptions(WebSocketClientEntry& entry, std::string_view buf); + + // Raise the websocket_client_added event and send the ack to the client contained in entry. + void HandleSubscriptionsActive(const WebSocketClientEntry& entry); + void HandleEvent(WebSocketClientEntry& entry, std::string_view buf); // Allow access to Process(WebSocketEvent) From 23f0370e915c4aa846810b5230d687ac28c19125 Mon Sep 17 00:00:00 2001 From: Arne Welzel Date: Wed, 23 Apr 2025 18:12:51 +0200 Subject: [PATCH 2/2] cluster/websocket: Short-circuit clients without subscriptions --- src/cluster/websocket/WebSocket.cc | 8 +++ .../..client..stderr | 1 + .../..client..stdout | 1 + .../..manager..stderr | 2 + .../..manager..stdout | 3 + .../cluster/websocket/no-subscriptions.zeek | 67 +++++++++++++++++++ 6 files changed, 82 insertions(+) create mode 100644 testing/btest/Baseline/cluster.websocket.no-subscriptions/..client..stderr create mode 100644 testing/btest/Baseline/cluster.websocket.no-subscriptions/..client..stdout create mode 100644 testing/btest/Baseline/cluster.websocket.no-subscriptions/..manager..stderr create mode 100644 testing/btest/Baseline/cluster.websocket.no-subscriptions/..manager..stdout create mode 100644 testing/btest/cluster/websocket/no-subscriptions.zeek diff --git a/src/cluster/websocket/WebSocket.cc b/src/cluster/websocket/WebSocket.cc index 428101fedf..c70fd1c728 100644 --- a/src/cluster/websocket/WebSocket.cc +++ b/src/cluster/websocket/WebSocket.cc @@ -394,6 +394,14 @@ void WebSocketEventDispatcher::HandleSubscriptions(WebSocketClientEntry& entry, entry.wsc->SetSubscriptions(subscriptions); + // Short-circuit setting up subscriptions and directly reply with + // an ack if the client didn't request any topic subscriptions. + if ( subscriptions.empty() ) { + assert(entry.wsc->AllSubscriptionsActive()); + HandleSubscriptionsActive(entry); + return; + } + auto cb = [this, id = entry.id, wsc = entry.wsc](const std::string& topic, const Backend::SubscriptionCallbackInfo& info) { if ( info.status == Backend::CallbackStatus::Error ) { diff --git a/testing/btest/Baseline/cluster.websocket.no-subscriptions/..client..stderr b/testing/btest/Baseline/cluster.websocket.no-subscriptions/..client..stderr new file mode 100644 index 0000000000..49d861c74c --- /dev/null +++ b/testing/btest/Baseline/cluster.websocket.no-subscriptions/..client..stderr @@ -0,0 +1 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. diff --git a/testing/btest/Baseline/cluster.websocket.no-subscriptions/..client..stdout b/testing/btest/Baseline/cluster.websocket.no-subscriptions/..client..stdout new file mode 100644 index 0000000000..49d861c74c --- /dev/null +++ b/testing/btest/Baseline/cluster.websocket.no-subscriptions/..client..stdout @@ -0,0 +1 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. diff --git a/testing/btest/Baseline/cluster.websocket.no-subscriptions/..manager..stderr b/testing/btest/Baseline/cluster.websocket.no-subscriptions/..manager..stderr new file mode 100644 index 0000000000..e3f6131b1d --- /dev/null +++ b/testing/btest/Baseline/cluster.websocket.no-subscriptions/..manager..stderr @@ -0,0 +1,2 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +received termination signal diff --git a/testing/btest/Baseline/cluster.websocket.no-subscriptions/..manager..stdout b/testing/btest/Baseline/cluster.websocket.no-subscriptions/..manager..stdout new file mode 100644 index 0000000000..bb622d5ce2 --- /dev/null +++ b/testing/btest/Baseline/cluster.websocket.no-subscriptions/..manager..stdout @@ -0,0 +1,3 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +Cluster::websocket_client_added, [] +got ping, 42 diff --git a/testing/btest/cluster/websocket/no-subscriptions.zeek b/testing/btest/cluster/websocket/no-subscriptions.zeek new file mode 100644 index 0000000000..50405ea74b --- /dev/null +++ b/testing/btest/cluster/websocket/no-subscriptions.zeek @@ -0,0 +1,67 @@ +# @TEST-DOC: Regression test: A WebSocket client sending no subscriptions wasn't receiving back an ack. +# +# @TEST-REQUIRES: have-zeromq +# @TEST-REQUIRES: python3 -c 'import websockets.sync' +# +# @TEST-GROUP: cluster-zeromq +# +# @TEST-PORT: XPUB_PORT +# @TEST-PORT: XSUB_PORT +# @TEST-PORT: LOG_PULL_PORT +# @TEST-PORT: WEBSOCKET_PORT +# +# @TEST-EXEC: cp $FILES/zeromq/cluster-layout-simple.zeek cluster-layout.zeek +# @TEST-EXEC: cp $FILES/zeromq/test-bootstrap.zeek zeromq-test-bootstrap.zeek +# @TEST-EXEC: cp $FILES/ws/wstest.py . +# +# @TEST-EXEC: zeek -b --parse-only manager.zeek +# @TEST-EXEC: python3 -m py_compile client.py +# +# @TEST-EXEC: btest-bg-run manager "ZEEKPATH=$ZEEKPATH:.. && CLUSTER_NODE=manager zeek -b ../manager.zeek" +# @TEST-EXEC: btest-bg-run client "python3 ../client.py" +# +# @TEST-EXEC: btest-bg-wait 30 +# @TEST-EXEC: btest-diff ./manager/.stdout +# @TEST-EXEC: btest-diff ./manager/.stderr +# @TEST-EXEC: btest-diff ./client/.stdout +# @TEST-EXEC: btest-diff ./client/.stderr + +# @TEST-START-FILE manager.zeek +@load ./zeromq-test-bootstrap +redef exit_only_after_terminate = T; + +global expected_ping_count = 100; +global ping_count = 0; + +event zeek_init() + { + Cluster::subscribe("/test/pings"); + Cluster::listen_websocket([$listen_host="127.0.0.1", $listen_port=to_port(getenv("WEBSOCKET_PORT"))]); + } + +event ping(c: count) &is_used + { + print "got ping", c; + terminate(); + } + +event Cluster::websocket_client_added(info: Cluster::EndpointInfo, subscriptions: string_vec) + { + print "Cluster::websocket_client_added", subscriptions; + } +# @TEST-END-FILE + + +# @TEST-START-FILE client.py +import wstest + +def run(ws_url): + with wstest.connect("ws1", ws_url) as tc: + tc.send_json([]) # Send no subscriptions + ack = tc.recv_json() + assert ack.get("type") == "ack", f"{ack}" + tc.send_json(wstest.build_event_v1("/test/pings/", "ping", [42])) + +if __name__ == "__main__": + wstest.main(run, wstest.WS4_URL_V1) +# @TEST-END-FILE