cluster/websocket: Special case broker backend for shim usage

When Cluster::backend is configured with CLUSTER_BACKEND_BROKER, switch
WebSocketClients to CLUSTER_BACKEND_BROKER_WEBSOCKET_SHIM instead.

Instead of the special case, we could also add something to Backend
called NewWebSocketBackend(), but if it only affects broker, I think
the special case is okay for now.
This commit is contained in:
Arne Welzel 2025-04-09 17:52:12 +02:00
parent 76c508f001
commit 193350483e

View file

@ -297,12 +297,32 @@ void WebSocketEventDispatcher::Process(const WebSocketOpen& open) {
// Generate an ID for this client. // Generate an ID for this client.
auto ws_id = cluster::backend->NodeId() + "-websocket-" + id; auto ws_id = cluster::backend->NodeId() + "-websocket-" + id;
const auto& event_serializer_val = id::find_val<zeek::EnumVal>("Cluster::event_serializer"); // If the globally configured backend is CLUSTER_BACKEND_BROKER, then switch
// the WebSocket client's backend to CLUSTER_BACKEND_BROKER_WEBSOCKET_SHIM
// so that pub/sub is using the local broker endpoint via its hub functionality
// instead of instantiating a new Broker manager.
static const auto& event_serializer_val = id::find_val<zeek::EnumVal>("Cluster::event_serializer");
auto event_serializer = cluster::manager->InstantiateEventSerializer(event_serializer_val); auto event_serializer = cluster::manager->InstantiateEventSerializer(event_serializer_val);
const auto& cluster_backend_val = id::find_val<zeek::EnumVal>("Cluster::backend"); static const auto& cluster_backend_val = id::find_val<zeek::EnumVal>("Cluster::backend");
auto effective_backend_val = cluster_backend_val;
static const auto& broker_enum_val = zeek::id::find_val<EnumVal>("Cluster::CLUSTER_BACKEND_BROKER");
static const auto& broker_ws_shim_enum_val =
zeek::id::find_val<EnumVal>("Cluster::CLUSTER_BACKEND_BROKER_WEBSOCKET_SHIM");
if ( effective_backend_val == broker_enum_val ) {
WS_DEBUG("Using broker websocket shim");
effective_backend_val = broker_ws_shim_enum_val;
}
auto event_handling_strategy = std::make_unique<WebSocketEventHandlingStrategy>(wsc, this); auto event_handling_strategy = std::make_unique<WebSocketEventHandlingStrategy>(wsc, this);
auto backend = zeek::cluster::manager->InstantiateBackend(cluster_backend_val, std::move(event_serializer), nullptr, auto backend = zeek::cluster::manager->InstantiateBackend(effective_backend_val, std::move(event_serializer),
std::move(event_handling_strategy)); nullptr, std::move(event_handling_strategy));
if ( ! backend ) {
reporter->Error("Failed to instantiate backend for client with id %s!", id.c_str());
QueueReply(WebSocketCloseReply{wsc, 1001, "Internal error"});
return;
}
WS_DEBUG("New WebSocket client %s (%s:%d) - using id %s backend=%p", id.c_str(), wsc->getRemoteIp().c_str(), WS_DEBUG("New WebSocket client %s (%s:%d) - using id %s backend=%p", id.c_str(), wsc->getRemoteIp().c_str(),
wsc->getRemotePort(), ws_id.c_str(), backend.get()); wsc->getRemotePort(), ws_id.c_str(), backend.get());