diff --git a/src/cluster/websocket/WebSocket.cc b/src/cluster/websocket/WebSocket.cc index 1f2610a99b..8ca19a91de 100644 --- a/src/cluster/websocket/WebSocket.cc +++ b/src/cluster/websocket/WebSocket.cc @@ -372,6 +372,30 @@ void WebSocketEventDispatcher::Process(const WebSocketSubscribeFinished& fin) { return; } + if ( ! entry.ready_to_publish ) { + // Still waiting for the backend to be ready. + return; + } + + HandleSubscriptionsActive(entry); +} + +void WebSocketEventDispatcher::Process(const WebSocketBackendReadyToPublish& ready) { + const auto& it = clients.find(ready.id); + if ( it == clients.end() ) { + reporter->Error("Backend ready from non-existing WebSocket client with id %s!", ready.id.c_str()); + return; + } + + auto& entry = it->second; + + entry.ready_to_publish = true; + + if ( ! entry.wsc->AllSubscriptionsActive() ) { + // More subscriptions to come! + return; + } + HandleSubscriptionsActive(entry); } @@ -397,14 +421,6 @@ void WebSocketEventDispatcher::HandleSubscriptions(WebSocketClientEntry& entry, entry.wsc->SetSubscriptions(subscriptions); - // Short-circuit setting up subscriptions and directly reply with - // an ack if the client didn't request any topic subscriptions. - if ( subscriptions.empty() ) { - assert(entry.wsc->AllSubscriptionsActive()); - HandleSubscriptionsActive(entry); - return; - } - auto cb = [this, id = entry.id, wsc = entry.wsc](const std::string& topic, const Backend::SubscriptionCallbackInfo& info) { if ( info.status == Backend::CallbackStatus::Error ) { @@ -424,6 +440,13 @@ void WebSocketEventDispatcher::HandleSubscriptions(WebSocketClientEntry& entry, QueueReply(WebSocketCloseReply{entry.wsc, 1011, "Could not subscribe. Something bad happened!"}); } } + + // Register a callback to be invoked when the backend is ready for publishing. + entry.backend->ReadyToPublishCallback([this, id = entry.id](const auto& info) { + // Ready callbacks are supposed to run on the main thread, + // so we can just start processing a WebSocketBackendReady. + Process(WebSocketBackendReadyToPublish{id}); + }); } void WebSocketEventDispatcher::HandleSubscriptionsActive(const WebSocketClientEntry& entry) { diff --git a/src/cluster/websocket/WebSocket.h b/src/cluster/websocket/WebSocket.h index 4dd0965372..296c9a731d 100644 --- a/src/cluster/websocket/WebSocket.h +++ b/src/cluster/websocket/WebSocket.h @@ -145,7 +145,13 @@ struct WebSocketSubscribeFinished { std::string topic_prefix; }; -using WebSocketEvent = std::variant; +// Internally created when the backend of a Websocket client is ready. +struct WebSocketBackendReadyToPublish { + std::string id; +}; + +using WebSocketEvent = std::variant; struct WebSocketSendReply { std::shared_ptr wsc; @@ -211,6 +217,7 @@ private: void Process(const WebSocketOpen& open); void Process(const WebSocketSubscribeFinished& fin); + void Process(const WebSocketBackendReadyToPublish& ready); void Process(const WebSocketMessage& msg); void Process(const WebSocketClose& close); @@ -222,6 +229,7 @@ private: std::string id; std::shared_ptr wsc; std::shared_ptr backend; + bool ready_to_publish = false; uint64_t msg_count = 0; std::list queue; };