cluster/websocket: Factor out active subscription handling

This commit is contained in:
Arne Welzel 2025-04-23 18:01:57 +02:00
parent c27d74b2f9
commit 47206d6a8a
2 changed files with 25 additions and 16 deletions

View file

@ -361,7 +361,6 @@ void WebSocketEventDispatcher::Process(const WebSocketSubscribeFinished& fin) {
}
auto& entry = it->second;
auto& wsc = entry.wsc;
entry.wsc->SetSubscriptionActive(fin.topic_prefix);
@ -370,21 +369,7 @@ void WebSocketEventDispatcher::Process(const WebSocketSubscribeFinished& fin) {
return;
}
auto rec = zeek::cluster::detail::bif::make_endpoint_info(entry.backend->NodeId(), wsc->getRemoteIp(),
wsc->getRemotePort(), TRANSPORT_TCP);
auto subscriptions_vec = zeek::cluster::detail::bif::make_string_vec(wsc->GetSubscriptions());
zeek::event_mgr.Enqueue(Cluster::websocket_client_added, std::move(rec), std::move(subscriptions_vec));
entry.wsc->SendAck(entry.backend->NodeId(), zeek::zeek_version());
WS_DEBUG("Sent Ack to client %s (%s:%d) %s\n", fin.id.c_str(), wsc->getRemoteIp().c_str(), wsc->getRemotePort(),
entry.backend->NodeId().c_str());
// Process any queued messages now.
for ( auto& msg : entry.queue ) {
assert(entry.msg_count > 1);
Process(msg);
}
HandleSubscriptionsActive(entry);
}
void WebSocketEventDispatcher::HandleSubscriptions(WebSocketClientEntry& entry, std::string_view buf) {
@ -430,6 +415,26 @@ void WebSocketEventDispatcher::HandleSubscriptions(WebSocketClientEntry& entry,
}
}
void WebSocketEventDispatcher::HandleSubscriptionsActive(const WebSocketClientEntry& entry) {
auto& wsc = entry.wsc;
auto rec = zeek::cluster::detail::bif::make_endpoint_info(entry.backend->NodeId(), wsc->getRemoteIp(),
wsc->getRemotePort(), TRANSPORT_TCP);
auto subscriptions_vec = zeek::cluster::detail::bif::make_string_vec(wsc->GetSubscriptions());
zeek::event_mgr.Enqueue(Cluster::websocket_client_added, std::move(rec), std::move(subscriptions_vec));
entry.wsc->SendAck(entry.backend->NodeId(), zeek::zeek_version());
WS_DEBUG("Sent Ack to client %s (%s:%d) %s\n", entry.id.c_str(), wsc->getRemoteIp().c_str(), wsc->getRemotePort(),
entry.backend->NodeId().c_str());
// Process any queued messages now.
for ( auto& msg : entry.queue ) {
assert(entry.msg_count > 1);
Process(msg);
}
}
void WebSocketEventDispatcher::HandleEvent(WebSocketClientEntry& entry, std::string_view buf) {
// Unserialize the message as an event.
broker::variant res;

View file

@ -228,6 +228,10 @@ private:
void HandleSubscriptions(WebSocketClientEntry& entry, std::string_view buf);
// Raise the websocket_client_added event and send the ack to the client contained in entry.
void HandleSubscriptionsActive(const WebSocketClientEntry& entry);
void HandleEvent(WebSocketClientEntry& entry, std::string_view buf);
// Allow access to Process(WebSocketEvent)