From 591e3400d4af9b0199fbd9a9bc495b83973a4e8f Mon Sep 17 00:00:00 2001 From: Arne Welzel Date: Wed, 9 Apr 2025 17:50:25 +0200 Subject: [PATCH] broker/Manager: Add MakeHub() and ReleaseHub() These are used by WebSocket clients to create broker::hub instances --- src/broker/Manager.cc | 14 +++++++++++--- src/broker/Manager.h | 12 ++++++++++++ 2 files changed, 23 insertions(+), 3 deletions(-) diff --git a/src/broker/Manager.cc b/src/broker/Manager.cc index 5cb63149c0..69d543ad79 100644 --- a/src/broker/Manager.cc +++ b/src/broker/Manager.cc @@ -282,6 +282,7 @@ Manager::Manager(bool arg_use_real_time) : Backend("Broker", nullptr, nullptr, n bound_port = 0; use_real_time = arg_use_real_time; peer_count = 0; + hub_count = 0; log_batch_size = 0; log_topic_func = nullptr; log_id_type = nullptr; @@ -509,7 +510,7 @@ bool Manager::Active() { if ( bound_port > 0 ) return true; - return peer_count > 0; + return peer_count > 0 || hub_count > 0; } void Manager::AdvanceTime(double seconds_since_unix_epoch) { @@ -652,7 +653,7 @@ bool Manager::PublishEvent(string topic, std::string name, broker::vector args, if ( bstate->endpoint.is_shutdown() ) return true; - if ( peer_count == 0 ) + if ( peer_count == 0 && hub_count == 0 ) return true; broker::zeek::Event ev(name, args, broker::to_timestamp(ts)); @@ -666,7 +667,7 @@ bool Manager::PublishEvent(string topic, RecordVal* args) { if ( bstate->endpoint.is_shutdown() ) return true; - if ( peer_count == 0 ) + if ( peer_count == 0 && hub_count == 0 ) return true; if ( ! args->HasField(0) ) @@ -1987,4 +1988,11 @@ void Manager::PrepareForwarding(const std::string& name) { DBG_LOG(DBG_BROKER, "Resolved table forward for data store %s", name.c_str()); } +broker::hub Manager::MakeHub(broker::filter_type ft) { + ++hub_count; + return bstate->endpoint.make_hub(std::move(ft)); +} + +void Manager::DestroyHub(broker::hub&& hub) { --hub_count; } + } // namespace zeek::Broker diff --git a/src/broker/Manager.h b/src/broker/Manager.h index 215e9efdf5..b1fdc8105c 100644 --- a/src/broker/Manager.h +++ b/src/broker/Manager.h @@ -6,6 +6,7 @@ #include #include #include +#include #include #include #include @@ -478,6 +479,16 @@ private: const char* Tag() override { return "Broker::Manager"; } double GetNextTimeout() override { return -1; } + + // Allow WebSocketShim access to MakeHub() and DestroyHub() + friend class WebSocketState; + + // Create a hub for WebSocket clients. + broker::hub MakeHub(broker::filter_type ft); + + // This hub is to be destroyed. + void DestroyHub(broker::hub&& hub); + struct LogBuffer { // Indexed by topic string. std::unordered_map msgs; @@ -511,6 +522,7 @@ private: uint16_t bound_port; bool use_real_time; int peer_count; + int hub_count; size_t log_batch_size; Func* log_topic_func;