diff --git a/src/cluster/websocket/WebSocket.cc b/src/cluster/websocket/WebSocket.cc index 0369e09446..0d8e5421ca 100644 --- a/src/cluster/websocket/WebSocket.cc +++ b/src/cluster/websocket/WebSocket.cc @@ -297,12 +297,32 @@ void WebSocketEventDispatcher::Process(const WebSocketOpen& open) { // Generate an ID for this client. auto ws_id = cluster::backend->NodeId() + "-websocket-" + id; - const auto& event_serializer_val = id::find_val("Cluster::event_serializer"); + // If the globally configured backend is CLUSTER_BACKEND_BROKER, then switch + // the WebSocket client's backend to CLUSTER_BACKEND_BROKER_WEBSOCKET_SHIM + // so that pub/sub is using the local broker endpoint via its hub functionality + // instead of instantiating a new Broker manager. + static const auto& event_serializer_val = id::find_val("Cluster::event_serializer"); auto event_serializer = cluster::manager->InstantiateEventSerializer(event_serializer_val); - const auto& cluster_backend_val = id::find_val("Cluster::backend"); + static const auto& cluster_backend_val = id::find_val("Cluster::backend"); + auto effective_backend_val = cluster_backend_val; + + static const auto& broker_enum_val = zeek::id::find_val("Cluster::CLUSTER_BACKEND_BROKER"); + static const auto& broker_ws_shim_enum_val = + zeek::id::find_val("Cluster::CLUSTER_BACKEND_BROKER_WEBSOCKET_SHIM"); + if ( effective_backend_val == broker_enum_val ) { + WS_DEBUG("Using broker websocket shim"); + effective_backend_val = broker_ws_shim_enum_val; + } + auto event_handling_strategy = std::make_unique(wsc, this); - auto backend = zeek::cluster::manager->InstantiateBackend(cluster_backend_val, std::move(event_serializer), nullptr, - std::move(event_handling_strategy)); + auto backend = zeek::cluster::manager->InstantiateBackend(effective_backend_val, std::move(event_serializer), + nullptr, std::move(event_handling_strategy)); + + if ( ! backend ) { + reporter->Error("Failed to instantiate backend for client with id %s!", id.c_str()); + QueueReply(WebSocketCloseReply{wsc, 1001, "Internal error"}); + return; + } WS_DEBUG("New WebSocket client %s (%s:%d) - using id %s backend=%p", id.c_str(), wsc->getRemoteIp().c_str(), wsc->getRemotePort(), ws_id.c_str(), backend.get());