diff --git a/CHANGES b/CHANGES index f984e795b4..e7842cbd4c 100644 --- a/CHANGES +++ b/CHANGES @@ -1,3 +1,9 @@ +7.2.0-dev.617 | 2025-04-24 08:17:08 +0200 + + * cluster/websocket: Short-circuit clients without subscriptions (Arne Welzel, Corelight) + + * cluster/websocket: Factor out active subscription handling (Arne Welzel, Corelight) + 7.2.0-dev.613 | 2025-04-23 12:14:48 -0700 * Statically lookup field offsets for connection values in UDP and ICMP analyzers (Tim Wojtulewicz, Corelight) diff --git a/VERSION b/VERSION index f99f0890ae..09c3c6cd07 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -7.2.0-dev.613 +7.2.0-dev.617 diff --git a/src/cluster/websocket/WebSocket.cc b/src/cluster/websocket/WebSocket.cc index 2ceff5b9d6..c70fd1c728 100644 --- a/src/cluster/websocket/WebSocket.cc +++ b/src/cluster/websocket/WebSocket.cc @@ -361,7 +361,6 @@ void WebSocketEventDispatcher::Process(const WebSocketSubscribeFinished& fin) { } auto& entry = it->second; - auto& wsc = entry.wsc; entry.wsc->SetSubscriptionActive(fin.topic_prefix); @@ -370,21 +369,7 @@ void WebSocketEventDispatcher::Process(const WebSocketSubscribeFinished& fin) { return; } - auto rec = zeek::cluster::detail::bif::make_endpoint_info(entry.backend->NodeId(), wsc->getRemoteIp(), - wsc->getRemotePort(), TRANSPORT_TCP); - auto subscriptions_vec = zeek::cluster::detail::bif::make_string_vec(wsc->GetSubscriptions()); - zeek::event_mgr.Enqueue(Cluster::websocket_client_added, std::move(rec), std::move(subscriptions_vec)); - - entry.wsc->SendAck(entry.backend->NodeId(), zeek::zeek_version()); - - WS_DEBUG("Sent Ack to client %s (%s:%d) %s\n", fin.id.c_str(), wsc->getRemoteIp().c_str(), wsc->getRemotePort(), - entry.backend->NodeId().c_str()); - - // Process any queued messages now. - for ( auto& msg : entry.queue ) { - assert(entry.msg_count > 1); - Process(msg); - } + HandleSubscriptionsActive(entry); } void WebSocketEventDispatcher::HandleSubscriptions(WebSocketClientEntry& entry, std::string_view buf) { @@ -409,6 +394,14 @@ void WebSocketEventDispatcher::HandleSubscriptions(WebSocketClientEntry& entry, entry.wsc->SetSubscriptions(subscriptions); + // Short-circuit setting up subscriptions and directly reply with + // an ack if the client didn't request any topic subscriptions. + if ( subscriptions.empty() ) { + assert(entry.wsc->AllSubscriptionsActive()); + HandleSubscriptionsActive(entry); + return; + } + auto cb = [this, id = entry.id, wsc = entry.wsc](const std::string& topic, const Backend::SubscriptionCallbackInfo& info) { if ( info.status == Backend::CallbackStatus::Error ) { @@ -430,6 +423,26 @@ void WebSocketEventDispatcher::HandleSubscriptions(WebSocketClientEntry& entry, } } +void WebSocketEventDispatcher::HandleSubscriptionsActive(const WebSocketClientEntry& entry) { + auto& wsc = entry.wsc; + + auto rec = zeek::cluster::detail::bif::make_endpoint_info(entry.backend->NodeId(), wsc->getRemoteIp(), + wsc->getRemotePort(), TRANSPORT_TCP); + auto subscriptions_vec = zeek::cluster::detail::bif::make_string_vec(wsc->GetSubscriptions()); + zeek::event_mgr.Enqueue(Cluster::websocket_client_added, std::move(rec), std::move(subscriptions_vec)); + + entry.wsc->SendAck(entry.backend->NodeId(), zeek::zeek_version()); + + WS_DEBUG("Sent Ack to client %s (%s:%d) %s\n", entry.id.c_str(), wsc->getRemoteIp().c_str(), wsc->getRemotePort(), + entry.backend->NodeId().c_str()); + + // Process any queued messages now. + for ( auto& msg : entry.queue ) { + assert(entry.msg_count > 1); + Process(msg); + } +} + void WebSocketEventDispatcher::HandleEvent(WebSocketClientEntry& entry, std::string_view buf) { // Unserialize the message as an event. broker::variant res; diff --git a/src/cluster/websocket/WebSocket.h b/src/cluster/websocket/WebSocket.h index a635c7dbd8..4dd0965372 100644 --- a/src/cluster/websocket/WebSocket.h +++ b/src/cluster/websocket/WebSocket.h @@ -228,6 +228,10 @@ private: void HandleSubscriptions(WebSocketClientEntry& entry, std::string_view buf); + + // Raise the websocket_client_added event and send the ack to the client contained in entry. + void HandleSubscriptionsActive(const WebSocketClientEntry& entry); + void HandleEvent(WebSocketClientEntry& entry, std::string_view buf); // Allow access to Process(WebSocketEvent) diff --git a/testing/btest/Baseline/cluster.websocket.no-subscriptions/..client..stderr b/testing/btest/Baseline/cluster.websocket.no-subscriptions/..client..stderr new file mode 100644 index 0000000000..49d861c74c --- /dev/null +++ b/testing/btest/Baseline/cluster.websocket.no-subscriptions/..client..stderr @@ -0,0 +1 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. diff --git a/testing/btest/Baseline/cluster.websocket.no-subscriptions/..client..stdout b/testing/btest/Baseline/cluster.websocket.no-subscriptions/..client..stdout new file mode 100644 index 0000000000..49d861c74c --- /dev/null +++ b/testing/btest/Baseline/cluster.websocket.no-subscriptions/..client..stdout @@ -0,0 +1 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. diff --git a/testing/btest/Baseline/cluster.websocket.no-subscriptions/..manager..stderr b/testing/btest/Baseline/cluster.websocket.no-subscriptions/..manager..stderr new file mode 100644 index 0000000000..e3f6131b1d --- /dev/null +++ b/testing/btest/Baseline/cluster.websocket.no-subscriptions/..manager..stderr @@ -0,0 +1,2 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +received termination signal diff --git a/testing/btest/Baseline/cluster.websocket.no-subscriptions/..manager..stdout b/testing/btest/Baseline/cluster.websocket.no-subscriptions/..manager..stdout new file mode 100644 index 0000000000..bb622d5ce2 --- /dev/null +++ b/testing/btest/Baseline/cluster.websocket.no-subscriptions/..manager..stdout @@ -0,0 +1,3 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +Cluster::websocket_client_added, [] +got ping, 42 diff --git a/testing/btest/cluster/websocket/no-subscriptions.zeek b/testing/btest/cluster/websocket/no-subscriptions.zeek new file mode 100644 index 0000000000..50405ea74b --- /dev/null +++ b/testing/btest/cluster/websocket/no-subscriptions.zeek @@ -0,0 +1,67 @@ +# @TEST-DOC: Regression test: A WebSocket client sending no subscriptions wasn't receiving back an ack. +# +# @TEST-REQUIRES: have-zeromq +# @TEST-REQUIRES: python3 -c 'import websockets.sync' +# +# @TEST-GROUP: cluster-zeromq +# +# @TEST-PORT: XPUB_PORT +# @TEST-PORT: XSUB_PORT +# @TEST-PORT: LOG_PULL_PORT +# @TEST-PORT: WEBSOCKET_PORT +# +# @TEST-EXEC: cp $FILES/zeromq/cluster-layout-simple.zeek cluster-layout.zeek +# @TEST-EXEC: cp $FILES/zeromq/test-bootstrap.zeek zeromq-test-bootstrap.zeek +# @TEST-EXEC: cp $FILES/ws/wstest.py . +# +# @TEST-EXEC: zeek -b --parse-only manager.zeek +# @TEST-EXEC: python3 -m py_compile client.py +# +# @TEST-EXEC: btest-bg-run manager "ZEEKPATH=$ZEEKPATH:.. && CLUSTER_NODE=manager zeek -b ../manager.zeek" +# @TEST-EXEC: btest-bg-run client "python3 ../client.py" +# +# @TEST-EXEC: btest-bg-wait 30 +# @TEST-EXEC: btest-diff ./manager/.stdout +# @TEST-EXEC: btest-diff ./manager/.stderr +# @TEST-EXEC: btest-diff ./client/.stdout +# @TEST-EXEC: btest-diff ./client/.stderr + +# @TEST-START-FILE manager.zeek +@load ./zeromq-test-bootstrap +redef exit_only_after_terminate = T; + +global expected_ping_count = 100; +global ping_count = 0; + +event zeek_init() + { + Cluster::subscribe("/test/pings"); + Cluster::listen_websocket([$listen_host="127.0.0.1", $listen_port=to_port(getenv("WEBSOCKET_PORT"))]); + } + +event ping(c: count) &is_used + { + print "got ping", c; + terminate(); + } + +event Cluster::websocket_client_added(info: Cluster::EndpointInfo, subscriptions: string_vec) + { + print "Cluster::websocket_client_added", subscriptions; + } +# @TEST-END-FILE + + +# @TEST-START-FILE client.py +import wstest + +def run(ws_url): + with wstest.connect("ws1", ws_url) as tc: + tc.send_json([]) # Send no subscriptions + ack = tc.recv_json() + assert ack.get("type") == "ack", f"{ack}" + tc.send_json(wstest.build_event_v1("/test/pings/", "ping", [42])) + +if __name__ == "__main__": + wstest.main(run, wstest.WS4_URL_V1) +# @TEST-END-FILE