From c2ca92d77297ee40e5bcc24a37ba00b6661ebce9 Mon Sep 17 00:00:00 2001 From: Arne Welzel Date: Mon, 8 Aug 2022 17:15:02 +0200 Subject: [PATCH] Try adding Broker::metrics_import_topics, stuck --- scripts/base/frameworks/broker/main.zeek | 18 ++++++++++++++++++ src/broker/Manager.cc | 21 +++++++++++++++++++++ src/broker/Manager.h | 7 +++++++ src/broker/comm.bif | 14 ++++++++++++++ 4 files changed, 60 insertions(+) diff --git a/scripts/base/frameworks/broker/main.zeek b/scripts/base/frameworks/broker/main.zeek index 7cf8d6991c..a5b1405eff 100644 --- a/scripts/base/frameworks/broker/main.zeek +++ b/scripts/base/frameworks/broker/main.zeek @@ -152,6 +152,13 @@ export { ## BROKER_METRICS_EXPORT_TOPIC is defined. option metrics_export_topic = ""; + ## Topics imported for metrics. This should be set on the node that + ## is importing metrics from other nodes. + ## Zeek overrides any value provided in zeek_init or earlier at + ## startup if the environment variable + ## BROKER_METRICS_IMPORT_TOPICS is defined. + option metrics_import_topics: vector of string = vector(); + ## ID for the metrics exporter. When setting a target topic for the ## exporter, Broker sets this option to the suffix of the new topic *unless* ## the ID is a non-empty string. Since setting a topic starts the periodic @@ -462,6 +469,12 @@ function update_metrics_export_topic(id: string, val: string): string return val; } +function update_metrics_import_topics(id: string, topics: vector of string): vector of string + { + Broker::__set_metrics_import_topics(topics); + return topics; + } + function update_metrics_export_endpoint_name(id: string, val: string): string { Broker::__set_metrics_export_endpoint_name(val); @@ -487,6 +500,11 @@ event zeek_init() Broker::metrics_export_topic); Option::set_change_handler("Broker::metrics_export_topic", update_metrics_export_topic); + # import topics + update_metrics_import_topics("Broker::metrics_import_topics", + Broker::metrics_import_topics); + Option::set_change_handler("Broker::metrics_import_topics", + update_metrics_import_topics); # endpoint name update_metrics_export_endpoint_name("Broker::metrics_export_endpoint_name", Broker::metrics_export_endpoint_name); diff --git a/src/broker/Manager.cc b/src/broker/Manager.cc index f3e330056b..3f727d2eee 100644 --- a/src/broker/Manager.cc +++ b/src/broker/Manager.cc @@ -401,6 +401,21 @@ void Manager::InitPostScript() opt.broker_write(std::move(str_ls)); } } + WITH_OPT_MAPPING("broker.metrics.import.topics", "Broker::metrics_import_topics") + { + if ( auto str = opt.broker_read>() ) + { + opt.zeek_write(*str); + } + else + { + auto ptr = opt.zeek_read()->AsVectorVal(); + std::vector str_ls; + for ( unsigned index = 0; index < ptr->Size(); ++index ) + str_ls.emplace_back(ptr->StringValAt(index)->ToStdString()); + opt.broker_write(std::move(str_ls)); + } + } auto cqs = get_option("Broker::congestion_queue_size")->AsCount(); bstate = std::make_shared(std::move(config), cqs); @@ -2076,6 +2091,12 @@ void Manager::SetMetricsExportTopic(std::string value) bstate->endpoint.metrics_exporter().set_target(std::move(value)); } +void Manager::SetMetricsImportTopics(std::vector topics) + { + // XXX: Uhm... + // bstate->endpoint.metrics_exporter().set_prefixes(std::move(topics)); + } + void Manager::SetMetricsExportEndpointName(std::string value) { bstate->endpoint.metrics_exporter().set_id(std::move(value)); diff --git a/src/broker/Manager.h b/src/broker/Manager.h index afa485869b..27945d892b 100644 --- a/src/broker/Manager.h +++ b/src/broker/Manager.h @@ -394,6 +394,13 @@ public: */ void SetMetricsExportTopic(std::string value); + /** + * Sets the import topics for a node importing metrics. + * + * @param topics List of topics from which to import metrics. + */ + void SetMetricsImportTopics(std::vector topics); + /** * Sets a new ID for the metrics exporter. Passing an empty string has no * effect. diff --git a/src/broker/comm.bif b/src/broker/comm.bif index 9c8cf22150..286822b75a 100644 --- a/src/broker/comm.bif +++ b/src/broker/comm.bif @@ -199,6 +199,20 @@ function Broker::__set_metrics_export_topic%(value: string%): bool return zeek::val_mgr->True(); %} +function Broker::__set_metrics_import_topics%(filter: string_vec%): bool + %{ + zeek::Broker::Manager::ScriptScopeGuard ssg; + if ( broker_mgr ) + { + std::vector slist; + auto* vval = filter->AsVectorVal(); + for ( unsigned index = 0; index < vval->Size(); ++index ) + slist.emplace_back(vval->StringValAt(index)->ToStdString()); + broker_mgr->SetMetricsImportTopics(std::move(slist)); + } + return zeek::val_mgr->True(); + %} + function Broker::__set_metrics_export_endpoint_name%(value: string%): bool %{ zeek::Broker::Manager::ScriptScopeGuard ssg;