broker/Manager: Add MakeHub() and ReleaseHub()

These are used by WebSocket clients to create broker::hub instances
This commit is contained in:
Arne Welzel 2025-04-09 17:50:25 +02:00
parent c9d7418a23
commit 591e3400d4
2 changed files with 23 additions and 3 deletions

View file

@ -282,6 +282,7 @@ Manager::Manager(bool arg_use_real_time) : Backend("Broker", nullptr, nullptr, n
bound_port = 0;
use_real_time = arg_use_real_time;
peer_count = 0;
hub_count = 0;
log_batch_size = 0;
log_topic_func = nullptr;
log_id_type = nullptr;
@ -509,7 +510,7 @@ bool Manager::Active() {
if ( bound_port > 0 )
return true;
return peer_count > 0;
return peer_count > 0 || hub_count > 0;
}
void Manager::AdvanceTime(double seconds_since_unix_epoch) {
@ -652,7 +653,7 @@ bool Manager::PublishEvent(string topic, std::string name, broker::vector args,
if ( bstate->endpoint.is_shutdown() )
return true;
if ( peer_count == 0 )
if ( peer_count == 0 && hub_count == 0 )
return true;
broker::zeek::Event ev(name, args, broker::to_timestamp(ts));
@ -666,7 +667,7 @@ bool Manager::PublishEvent(string topic, RecordVal* args) {
if ( bstate->endpoint.is_shutdown() )
return true;
if ( peer_count == 0 )
if ( peer_count == 0 && hub_count == 0 )
return true;
if ( ! args->HasField(0) )
@ -1987,4 +1988,11 @@ void Manager::PrepareForwarding(const std::string& name) {
DBG_LOG(DBG_BROKER, "Resolved table forward for data store %s", name.c_str());
}
broker::hub Manager::MakeHub(broker::filter_type ft) {
++hub_count;
return bstate->endpoint.make_hub(std::move(ft));
}
void Manager::DestroyHub(broker::hub&& hub) { --hub_count; }
} // namespace zeek::Broker

View file

@ -6,6 +6,7 @@
#include <broker/backend_options.hh>
#include <broker/detail/hash.hh>
#include <broker/endpoint_info.hh>
#include <broker/hub.hh>
#include <broker/peer_info.hh>
#include <broker/store.hh>
#include <broker/zeek.hh>
@ -478,6 +479,16 @@ private:
const char* Tag() override { return "Broker::Manager"; }
double GetNextTimeout() override { return -1; }
// Allow WebSocketShim access to MakeHub() and DestroyHub()
friend class WebSocketState;
// Create a hub for WebSocket clients.
broker::hub MakeHub(broker::filter_type ft);
// This hub is to be destroyed.
void DestroyHub(broker::hub&& hub);
struct LogBuffer {
// Indexed by topic string.
std::unordered_map<std::string, broker::zeek::BatchBuilder> msgs;
@ -511,6 +522,7 @@ private:
uint16_t bound_port;
bool use_real_time;
int peer_count;
int hub_count;
size_t log_batch_size;
Func* log_topic_func;