Merge remote-tracking branch 'origin/topic/awelzel/websocket-empty-subscriptions'

* origin/topic/awelzel/websocket-empty-subscriptions:
  cluster/websocket: Short-circuit clients without subscriptions
  cluster/websocket: Factor out active subscription handling
This commit is contained in:
Arne Welzel 2025-04-24 08:17:08 +02:00
commit 79c4fdb237
9 changed files with 114 additions and 17 deletions

View file

@ -1,3 +1,9 @@
7.2.0-dev.617 | 2025-04-24 08:17:08 +0200
* cluster/websocket: Short-circuit clients without subscriptions (Arne Welzel, Corelight)
* cluster/websocket: Factor out active subscription handling (Arne Welzel, Corelight)
7.2.0-dev.613 | 2025-04-23 12:14:48 -0700 7.2.0-dev.613 | 2025-04-23 12:14:48 -0700
* Statically lookup field offsets for connection values in UDP and ICMP analyzers (Tim Wojtulewicz, Corelight) * Statically lookup field offsets for connection values in UDP and ICMP analyzers (Tim Wojtulewicz, Corelight)

View file

@ -1 +1 @@
7.2.0-dev.613 7.2.0-dev.617

View file

@ -361,7 +361,6 @@ void WebSocketEventDispatcher::Process(const WebSocketSubscribeFinished& fin) {
} }
auto& entry = it->second; auto& entry = it->second;
auto& wsc = entry.wsc;
entry.wsc->SetSubscriptionActive(fin.topic_prefix); entry.wsc->SetSubscriptionActive(fin.topic_prefix);
@ -370,21 +369,7 @@ void WebSocketEventDispatcher::Process(const WebSocketSubscribeFinished& fin) {
return; return;
} }
auto rec = zeek::cluster::detail::bif::make_endpoint_info(entry.backend->NodeId(), wsc->getRemoteIp(), HandleSubscriptionsActive(entry);
wsc->getRemotePort(), TRANSPORT_TCP);
auto subscriptions_vec = zeek::cluster::detail::bif::make_string_vec(wsc->GetSubscriptions());
zeek::event_mgr.Enqueue(Cluster::websocket_client_added, std::move(rec), std::move(subscriptions_vec));
entry.wsc->SendAck(entry.backend->NodeId(), zeek::zeek_version());
WS_DEBUG("Sent Ack to client %s (%s:%d) %s\n", fin.id.c_str(), wsc->getRemoteIp().c_str(), wsc->getRemotePort(),
entry.backend->NodeId().c_str());
// Process any queued messages now.
for ( auto& msg : entry.queue ) {
assert(entry.msg_count > 1);
Process(msg);
}
} }
void WebSocketEventDispatcher::HandleSubscriptions(WebSocketClientEntry& entry, std::string_view buf) { void WebSocketEventDispatcher::HandleSubscriptions(WebSocketClientEntry& entry, std::string_view buf) {
@ -409,6 +394,14 @@ void WebSocketEventDispatcher::HandleSubscriptions(WebSocketClientEntry& entry,
entry.wsc->SetSubscriptions(subscriptions); entry.wsc->SetSubscriptions(subscriptions);
// Short-circuit setting up subscriptions and directly reply with
// an ack if the client didn't request any topic subscriptions.
if ( subscriptions.empty() ) {
assert(entry.wsc->AllSubscriptionsActive());
HandleSubscriptionsActive(entry);
return;
}
auto cb = [this, id = entry.id, wsc = entry.wsc](const std::string& topic, auto cb = [this, id = entry.id, wsc = entry.wsc](const std::string& topic,
const Backend::SubscriptionCallbackInfo& info) { const Backend::SubscriptionCallbackInfo& info) {
if ( info.status == Backend::CallbackStatus::Error ) { if ( info.status == Backend::CallbackStatus::Error ) {
@ -430,6 +423,26 @@ void WebSocketEventDispatcher::HandleSubscriptions(WebSocketClientEntry& entry,
} }
} }
void WebSocketEventDispatcher::HandleSubscriptionsActive(const WebSocketClientEntry& entry) {
auto& wsc = entry.wsc;
auto rec = zeek::cluster::detail::bif::make_endpoint_info(entry.backend->NodeId(), wsc->getRemoteIp(),
wsc->getRemotePort(), TRANSPORT_TCP);
auto subscriptions_vec = zeek::cluster::detail::bif::make_string_vec(wsc->GetSubscriptions());
zeek::event_mgr.Enqueue(Cluster::websocket_client_added, std::move(rec), std::move(subscriptions_vec));
entry.wsc->SendAck(entry.backend->NodeId(), zeek::zeek_version());
WS_DEBUG("Sent Ack to client %s (%s:%d) %s\n", entry.id.c_str(), wsc->getRemoteIp().c_str(), wsc->getRemotePort(),
entry.backend->NodeId().c_str());
// Process any queued messages now.
for ( auto& msg : entry.queue ) {
assert(entry.msg_count > 1);
Process(msg);
}
}
void WebSocketEventDispatcher::HandleEvent(WebSocketClientEntry& entry, std::string_view buf) { void WebSocketEventDispatcher::HandleEvent(WebSocketClientEntry& entry, std::string_view buf) {
// Unserialize the message as an event. // Unserialize the message as an event.
broker::variant res; broker::variant res;

View file

@ -228,6 +228,10 @@ private:
void HandleSubscriptions(WebSocketClientEntry& entry, std::string_view buf); void HandleSubscriptions(WebSocketClientEntry& entry, std::string_view buf);
// Raise the websocket_client_added event and send the ack to the client contained in entry.
void HandleSubscriptionsActive(const WebSocketClientEntry& entry);
void HandleEvent(WebSocketClientEntry& entry, std::string_view buf); void HandleEvent(WebSocketClientEntry& entry, std::string_view buf);
// Allow access to Process(WebSocketEvent) // Allow access to Process(WebSocketEvent)

View file

@ -0,0 +1 @@
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.

View file

@ -0,0 +1 @@
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.

View file

@ -0,0 +1,2 @@
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
received termination signal

View file

@ -0,0 +1,3 @@
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
Cluster::websocket_client_added, []
got ping, 42

View file

@ -0,0 +1,67 @@
# @TEST-DOC: Regression test: A WebSocket client sending no subscriptions wasn't receiving back an ack.
#
# @TEST-REQUIRES: have-zeromq
# @TEST-REQUIRES: python3 -c 'import websockets.sync'
#
# @TEST-GROUP: cluster-zeromq
#
# @TEST-PORT: XPUB_PORT
# @TEST-PORT: XSUB_PORT
# @TEST-PORT: LOG_PULL_PORT
# @TEST-PORT: WEBSOCKET_PORT
#
# @TEST-EXEC: cp $FILES/zeromq/cluster-layout-simple.zeek cluster-layout.zeek
# @TEST-EXEC: cp $FILES/zeromq/test-bootstrap.zeek zeromq-test-bootstrap.zeek
# @TEST-EXEC: cp $FILES/ws/wstest.py .
#
# @TEST-EXEC: zeek -b --parse-only manager.zeek
# @TEST-EXEC: python3 -m py_compile client.py
#
# @TEST-EXEC: btest-bg-run manager "ZEEKPATH=$ZEEKPATH:.. && CLUSTER_NODE=manager zeek -b ../manager.zeek"
# @TEST-EXEC: btest-bg-run client "python3 ../client.py"
#
# @TEST-EXEC: btest-bg-wait 30
# @TEST-EXEC: btest-diff ./manager/.stdout
# @TEST-EXEC: btest-diff ./manager/.stderr
# @TEST-EXEC: btest-diff ./client/.stdout
# @TEST-EXEC: btest-diff ./client/.stderr
# @TEST-START-FILE manager.zeek
@load ./zeromq-test-bootstrap
redef exit_only_after_terminate = T;
global expected_ping_count = 100;
global ping_count = 0;
event zeek_init()
{
Cluster::subscribe("/test/pings");
Cluster::listen_websocket([$listen_host="127.0.0.1", $listen_port=to_port(getenv("WEBSOCKET_PORT"))]);
}
event ping(c: count) &is_used
{
print "got ping", c;
terminate();
}
event Cluster::websocket_client_added(info: Cluster::EndpointInfo, subscriptions: string_vec)
{
print "Cluster::websocket_client_added", subscriptions;
}
# @TEST-END-FILE
# @TEST-START-FILE client.py
import wstest
def run(ws_url):
with wstest.connect("ws1", ws_url) as tc:
tc.send_json([]) # Send no subscriptions
ack = tc.recv_json()
assert ack.get("type") == "ack", f"{ack}"
tc.send_json(wstest.build_event_v1("/test/pings/", "ping", [42]))
if __name__ == "__main__":
wstest.main(run, wstest.WS4_URL_V1)
# @TEST-END-FILE