Try adding Broker::metrics_import_topics, stuck

This commit is contained in:
Arne Welzel 2022-08-08 17:15:02 +02:00
parent 857b21ae73
commit c2ca92d772
4 changed files with 60 additions and 0 deletions

View file

@ -152,6 +152,13 @@ export {
## BROKER_METRICS_EXPORT_TOPIC is defined. ## BROKER_METRICS_EXPORT_TOPIC is defined.
option metrics_export_topic = ""; option metrics_export_topic = "";
## Topics imported for metrics. This should be set on the node that
## is importing metrics from other nodes.
## Zeek overrides any value provided in zeek_init or earlier at
## startup if the environment variable
## BROKER_METRICS_IMPORT_TOPICS is defined.
option metrics_import_topics: vector of string = vector();
## ID for the metrics exporter. When setting a target topic for the ## ID for the metrics exporter. When setting a target topic for the
## exporter, Broker sets this option to the suffix of the new topic *unless* ## exporter, Broker sets this option to the suffix of the new topic *unless*
## the ID is a non-empty string. Since setting a topic starts the periodic ## the ID is a non-empty string. Since setting a topic starts the periodic
@ -462,6 +469,12 @@ function update_metrics_export_topic(id: string, val: string): string
return val; return val;
} }
function update_metrics_import_topics(id: string, topics: vector of string): vector of string
{
Broker::__set_metrics_import_topics(topics);
return topics;
}
function update_metrics_export_endpoint_name(id: string, val: string): string function update_metrics_export_endpoint_name(id: string, val: string): string
{ {
Broker::__set_metrics_export_endpoint_name(val); Broker::__set_metrics_export_endpoint_name(val);
@ -487,6 +500,11 @@ event zeek_init()
Broker::metrics_export_topic); Broker::metrics_export_topic);
Option::set_change_handler("Broker::metrics_export_topic", Option::set_change_handler("Broker::metrics_export_topic",
update_metrics_export_topic); update_metrics_export_topic);
# import topics
update_metrics_import_topics("Broker::metrics_import_topics",
Broker::metrics_import_topics);
Option::set_change_handler("Broker::metrics_import_topics",
update_metrics_import_topics);
# endpoint name # endpoint name
update_metrics_export_endpoint_name("Broker::metrics_export_endpoint_name", update_metrics_export_endpoint_name("Broker::metrics_export_endpoint_name",
Broker::metrics_export_endpoint_name); Broker::metrics_export_endpoint_name);

View file

@ -401,6 +401,21 @@ void Manager::InitPostScript()
opt.broker_write(std::move(str_ls)); opt.broker_write(std::move(str_ls));
} }
} }
WITH_OPT_MAPPING("broker.metrics.import.topics", "Broker::metrics_import_topics")
{
if ( auto str = opt.broker_read<std::vector<std::string>>() )
{
opt.zeek_write(*str);
}
else
{
auto ptr = opt.zeek_read()->AsVectorVal();
std::vector<std::string> str_ls;
for ( unsigned index = 0; index < ptr->Size(); ++index )
str_ls.emplace_back(ptr->StringValAt(index)->ToStdString());
opt.broker_write(std::move(str_ls));
}
}
auto cqs = get_option("Broker::congestion_queue_size")->AsCount(); auto cqs = get_option("Broker::congestion_queue_size")->AsCount();
bstate = std::make_shared<BrokerState>(std::move(config), cqs); bstate = std::make_shared<BrokerState>(std::move(config), cqs);
@ -2076,6 +2091,12 @@ void Manager::SetMetricsExportTopic(std::string value)
bstate->endpoint.metrics_exporter().set_target(std::move(value)); bstate->endpoint.metrics_exporter().set_target(std::move(value));
} }
void Manager::SetMetricsImportTopics(std::vector<std::string> topics)
{
// XXX: Uhm...
// bstate->endpoint.metrics_exporter().set_prefixes(std::move(topics));
}
void Manager::SetMetricsExportEndpointName(std::string value) void Manager::SetMetricsExportEndpointName(std::string value)
{ {
bstate->endpoint.metrics_exporter().set_id(std::move(value)); bstate->endpoint.metrics_exporter().set_id(std::move(value));

View file

@ -394,6 +394,13 @@ public:
*/ */
void SetMetricsExportTopic(std::string value); void SetMetricsExportTopic(std::string value);
/**
* Sets the import topics for a node importing metrics.
*
* @param topics List of topics from which to import metrics.
*/
void SetMetricsImportTopics(std::vector<std::string> topics);
/** /**
* Sets a new ID for the metrics exporter. Passing an empty string has no * Sets a new ID for the metrics exporter. Passing an empty string has no
* effect. * effect.

View file

@ -199,6 +199,20 @@ function Broker::__set_metrics_export_topic%(value: string%): bool
return zeek::val_mgr->True(); return zeek::val_mgr->True();
%} %}
function Broker::__set_metrics_import_topics%(filter: string_vec%): bool
%{
zeek::Broker::Manager::ScriptScopeGuard ssg;
if ( broker_mgr )
{
std::vector<std::string> slist;
auto* vval = filter->AsVectorVal();
for ( unsigned index = 0; index < vval->Size(); ++index )
slist.emplace_back(vval->StringValAt(index)->ToStdString());
broker_mgr->SetMetricsImportTopics(std::move(slist));
}
return zeek::val_mgr->True();
%}
function Broker::__set_metrics_export_endpoint_name%(value: string%): bool function Broker::__set_metrics_export_endpoint_name%(value: string%): bool
%{ %{
zeek::Broker::Manager::ScriptScopeGuard ssg; zeek::Broker::Manager::ScriptScopeGuard ssg;