diff --git a/auxil/broker b/auxil/broker index 6ba5a134b0..c8aa2715d0 160000 --- a/auxil/broker +++ b/auxil/broker @@ -1 +1 @@ -Subproject commit 6ba5a134b0a6c29997bda652ca3a8b5b4da7cd9a +Subproject commit c8aa2715d0b29f15a73f7f77599e8f99fdf37a2a diff --git a/scripts/base/frameworks/broker/main.zeek b/scripts/base/frameworks/broker/main.zeek index b058a8a9bf..f3b620b708 100644 --- a/scripts/base/frameworks/broker/main.zeek +++ b/scripts/base/frameworks/broker/main.zeek @@ -31,7 +31,7 @@ export { ## authenticated. const disable_ssl = F &redef; - ## Path to a file containing concatenated trusted certificates + ## Path to a file containing concatenated trusted certificates ## in PEM format. If set, Zeek will require valid certificates for ## all peers. const ssl_cafile = "" &redef; @@ -122,6 +122,25 @@ export { ## done reading the pcap. option peer_counts_as_iosource = T; + ## Frequency for publishing scraped metrics to the target topic. + option metrics_exporter_interval = 1 sec; + + ## Target topic for the metrics. Setting a non-empty string starts the + #periodic publishing of local metrics. + option metrics_exporter_target = ""; + + ## ID for the metrics exporter. When setting a target topic for the + # exporter, Broker sets this option to the suffix of the new topic *unless* + # the ID is a non-empty string. Since setting a topic starts the periodic + # publishing of events, we recommend setting the ID always first (or avoid + # setting it at all if the topic suffix serves as a good ID). + option metrics_exporter_id = ""; + + ## Selects prefixes from the local metrics. Only metrics with prefixes + # listed in this variable are included when publishing local metrics. + # Setting an empty vector selects *all* metrics. + option metrics_exporter_prefixes: vector of string = vector(); + ## The default topic prefix where logs will be published. The log's stream ## id is appended when writing to a particular stream. const default_log_topic_prefix = "zeek/logs/" &redef; @@ -385,9 +404,41 @@ event Broker::log_flush() &priority=10 schedule Broker::log_batch_interval { Broker::log_flush() }; } +function update_metrics_exporter_interval(id: string, val: interval): interval + { + Broker::__set_metrics_exporter_interval(val); + return val; + } + +function update_metrics_exporter_target(id: string, val: string): string + { + Broker::__set_metrics_exporter_target(val); + return val; + } + +function update_metrics_exporter_id(id: string, val: string): string + { + Broker::__set_metrics_exporter_id(val); + return val; + } + +function update_metrics_exporter_prefixes(id: string, filter: vector of string): vector of string + { + Broker::__set_metrics_exporter_prefixes(filter); + return filter; + } + event zeek_init() { schedule Broker::log_batch_interval { Broker::log_flush() }; + Option::set_change_handler("Broker::metrics_exporter_interval", + update_metrics_exporter_interval); + Option::set_change_handler("Broker::metrics_exporter_target", + update_metrics_exporter_target); + Option::set_change_handler("Broker::metrics_exporter_id", + update_metrics_exporter_id); + Option::set_change_handler("Broker::metrics_exporter_prefixes", + update_metrics_exporter_prefixes); } event retry_listen(a: string, p: port, retry: interval) diff --git a/src/broker/Manager.cc b/src/broker/Manager.cc index 8128dcf8b7..a08959615d 100644 --- a/src/broker/Manager.cc +++ b/src/broker/Manager.cc @@ -1807,6 +1807,28 @@ void Manager::PrepareForwarding(const std::string &name) DBG_LOG(DBG_BROKER, "Resolved table forward for data store %s", name.c_str()); } +void Manager::SetMetricsExporterInterval(double value) + { + broker::timespan ts; + if ( broker::convert(value, ts) ) + bstate->endpoint.metrics_exporter().set_interval(ts); + } + +void Manager::SetMetricsExporterTarget(std::string value) + { + bstate->endpoint.metrics_exporter().set_target(std::move(value)); + } + +void Manager::SetMetricsExporterId(std::string value) + { + bstate->endpoint.metrics_exporter().set_id(std::move(value)); + } + +void Manager::SetMetricsExporterPrefixes(std::vector filter) + { + bstate->endpoint.metrics_exporter().set_prefixes(std::move(filter)); + } + std::unique_ptr Manager::NewTelemetryManager() { // The telemetry Manager actually only has a dependency on the actor system, diff --git a/src/broker/Manager.h b/src/broker/Manager.h index 670a9bcd79..6b8d4791b4 100644 --- a/src/broker/Manager.h +++ b/src/broker/Manager.h @@ -359,6 +359,35 @@ public: ~ScriptScopeGuard() { --script_scope; } }; + /** + * Changes the frequency for publishing scraped metrics to the target topic. + * Passing a zero-length interval has no effect. + * @param value Interval between two scrapes in seconds. + */ + void SetMetricsExporterInterval(double value); + + /** + * Sets a new target topic for the metrics. Passing an empty string has no + * effect. + * @param value The new topic for publishing local metrics to. + */ + void SetMetricsExporterTarget(std::string value); + + /** + * Sets a new ID for the metrics exporter. Passing an empty string has no + * effect. + * @param value The new ID of the exporter in published metrics. + */ + void SetMetricsExporterId(std::string value); + + /** + * Sets a prefix selection for the metrics exporter. An empty vector selects + * *all* metrics. + * @param filter List of selected metric prefixes or an empty vector for + * selecting all metrics. + */ + void SetMetricsExporterPrefixes(std::vector filter); + /** * Allocates a new manager for telemetry data. */ diff --git a/src/broker/comm.bif b/src/broker/comm.bif index 35cb69df87..ee29cd5226 100644 --- a/src/broker/comm.bif +++ b/src/broker/comm.bif @@ -145,3 +145,35 @@ function Broker::__node_id%(%): string zeek::Broker::Manager::ScriptScopeGuard ssg; return zeek::make_intrusive(broker_mgr->NodeID()); %} + +function Broker::__set_metrics_exporter_interval%(value: interval%): bool + %{ + zeek::Broker::Manager::ScriptScopeGuard ssg; + broker_mgr->SetMetricsExporterInterval(value); + return zeek::val_mgr->True(); + %} + +function Broker::__set_metrics_exporter_target%(value: string%): bool + %{ + zeek::Broker::Manager::ScriptScopeGuard ssg; + broker_mgr->SetMetricsExporterTarget(value->ToStdString()); + return zeek::val_mgr->True(); + %} + +function Broker::__set_metrics_exporter_id%(value: string%): bool + %{ + zeek::Broker::Manager::ScriptScopeGuard ssg; + broker_mgr->SetMetricsExporterId(value->ToStdString()); + return zeek::val_mgr->True(); + %} + +function Broker::__set_metrics_exporter_prefixes%(filter: string_vec%): bool + %{ + zeek::Broker::Manager::ScriptScopeGuard ssg; + std::vector slist; + auto* vval = filter->AsVectorVal(); + for ( unsigned index = 0; index < vval->Size(); ++index ) + slist.emplace_back(vval->StringValAt(index)->ToStdString()); + broker_mgr->SetMetricsExporterPrefixes(std::move(slist)); + return zeek::val_mgr->True(); + %} diff --git a/testing/btest/Baseline/plugins.hooks/output b/testing/btest/Baseline/plugins.hooks/output index 35ef76d80b..078f82b0fc 100644 --- a/testing/btest/Baseline/plugins.hooks/output +++ b/testing/btest/Baseline/plugins.hooks/output @@ -474,6 +474,14 @@ 0.000000 MetaHookPost CallFunction(Notice::want_pp, , ()) -> 0.000000 MetaHookPost CallFunction(Option::set_change_handler, , (ActiveHTTP::default_max_time, Config::config_option_changed{ Config::log = (coerce [$ts=network_time(), $id=Config::ID, $old_value=Config::format_value(lookup_ID(Config::ID)), $new_value=Config::format_value(Config::new_value)] to Config::Info)if ( != Config::location) Config::log$location = Config::locationLog::write(Config::LOG, Config::log)return (Config::new_value)}, -100)) -> 0.000000 MetaHookPost CallFunction(Option::set_change_handler, , (ActiveHTTP::default_method, Config::config_option_changed{ Config::log = (coerce [$ts=network_time(), $id=Config::ID, $old_value=Config::format_value(lookup_ID(Config::ID)), $new_value=Config::format_value(Config::new_value)] to Config::Info)if ( != Config::location) Config::log$location = Config::locationLog::write(Config::LOG, Config::log)return (Config::new_value)}, -100)) -> +0.000000 MetaHookPost CallFunction(Option::set_change_handler, , (Broker::metrics_exporter_id, Broker::update_metrics_exporter_id{ Broker::__set_metrics_exporter_id(Broker::val)return (Broker::val)}, 0)) -> +0.000000 MetaHookPost CallFunction(Option::set_change_handler, , (Broker::metrics_exporter_id, Config::config_option_changed{ Config::log = (coerce [$ts=network_time(), $id=Config::ID, $old_value=Config::format_value(lookup_ID(Config::ID)), $new_value=Config::format_value(Config::new_value)] to Config::Info)if ( != Config::location) Config::log$location = Config::locationLog::write(Config::LOG, Config::log)return (Config::new_value)}, -100)) -> +0.000000 MetaHookPost CallFunction(Option::set_change_handler, , (Broker::metrics_exporter_interval, Broker::update_metrics_exporter_interval{ Broker::__set_metrics_exporter_interval(Broker::val)return (Broker::val)}, 0)) -> +0.000000 MetaHookPost CallFunction(Option::set_change_handler, , (Broker::metrics_exporter_interval, Config::config_option_changed{ Config::log = (coerce [$ts=network_time(), $id=Config::ID, $old_value=Config::format_value(lookup_ID(Config::ID)), $new_value=Config::format_value(Config::new_value)] to Config::Info)if ( != Config::location) Config::log$location = Config::locationLog::write(Config::LOG, Config::log)return (Config::new_value)}, -100)) -> +0.000000 MetaHookPost CallFunction(Option::set_change_handler, , (Broker::metrics_exporter_prefixes, Broker::update_metrics_exporter_prefixes{ Broker::__set_metrics_exporter_prefixes(Broker::filter)return (Broker::filter)}, 0)) -> +0.000000 MetaHookPost CallFunction(Option::set_change_handler, , (Broker::metrics_exporter_prefixes, Config::config_option_changed{ Config::log = (coerce [$ts=network_time(), $id=Config::ID, $old_value=Config::format_value(lookup_ID(Config::ID)), $new_value=Config::format_value(Config::new_value)] to Config::Info)if ( != Config::location) Config::log$location = Config::locationLog::write(Config::LOG, Config::log)return (Config::new_value)}, -100)) -> +0.000000 MetaHookPost CallFunction(Option::set_change_handler, , (Broker::metrics_exporter_target, Broker::update_metrics_exporter_target{ Broker::__set_metrics_exporter_target(Broker::val)return (Broker::val)}, 0)) -> +0.000000 MetaHookPost CallFunction(Option::set_change_handler, , (Broker::metrics_exporter_target, Config::config_option_changed{ Config::log = (coerce [$ts=network_time(), $id=Config::ID, $old_value=Config::format_value(lookup_ID(Config::ID)), $new_value=Config::format_value(Config::new_value)] to Config::Info)if ( != Config::location) Config::log$location = Config::locationLog::write(Config::LOG, Config::log)return (Config::new_value)}, -100)) -> 0.000000 MetaHookPost CallFunction(Option::set_change_handler, , (Broker::peer_counts_as_iosource, Config::config_option_changed{ Config::log = (coerce [$ts=network_time(), $id=Config::ID, $old_value=Config::format_value(lookup_ID(Config::ID)), $new_value=Config::format_value(Config::new_value)] to Config::Info)if ( != Config::location) Config::log$location = Config::locationLog::write(Config::LOG, Config::log)return (Config::new_value)}, -100)) -> 0.000000 MetaHookPost CallFunction(Option::set_change_handler, , (Conn::analyzer_inactivity_timeouts, Config::config_option_changed{ Config::log = (coerce [$ts=network_time(), $id=Config::ID, $old_value=Config::format_value(lookup_ID(Config::ID)), $new_value=Config::format_value(Config::new_value)] to Config::Info)if ( != Config::location) Config::log$location = Config::locationLog::write(Config::LOG, Config::log)return (Config::new_value)}, -100)) -> 0.000000 MetaHookPost CallFunction(Option::set_change_handler, , (Conn::default_extract, Config::config_option_changed{ Config::log = (coerce [$ts=network_time(), $id=Config::ID, $old_value=Config::format_value(lookup_ID(Config::ID)), $new_value=Config::format_value(Config::new_value)] to Config::Info)if ( != Config::location) Config::log$location = Config::locationLog::write(Config::LOG, Config::log)return (Config::new_value)}, -100)) -> @@ -1484,6 +1492,14 @@ 0.000000 MetaHookPre CallFunction(Notice::want_pp, , ()) 0.000000 MetaHookPre CallFunction(Option::set_change_handler, , (ActiveHTTP::default_max_time, Config::config_option_changed{ Config::log = (coerce [$ts=network_time(), $id=Config::ID, $old_value=Config::format_value(lookup_ID(Config::ID)), $new_value=Config::format_value(Config::new_value)] to Config::Info)if ( != Config::location) Config::log$location = Config::locationLog::write(Config::LOG, Config::log)return (Config::new_value)}, -100)) 0.000000 MetaHookPre CallFunction(Option::set_change_handler, , (ActiveHTTP::default_method, Config::config_option_changed{ Config::log = (coerce [$ts=network_time(), $id=Config::ID, $old_value=Config::format_value(lookup_ID(Config::ID)), $new_value=Config::format_value(Config::new_value)] to Config::Info)if ( != Config::location) Config::log$location = Config::locationLog::write(Config::LOG, Config::log)return (Config::new_value)}, -100)) +0.000000 MetaHookPre CallFunction(Option::set_change_handler, , (Broker::metrics_exporter_id, Broker::update_metrics_exporter_id{ Broker::__set_metrics_exporter_id(Broker::val)return (Broker::val)}, 0)) +0.000000 MetaHookPre CallFunction(Option::set_change_handler, , (Broker::metrics_exporter_id, Config::config_option_changed{ Config::log = (coerce [$ts=network_time(), $id=Config::ID, $old_value=Config::format_value(lookup_ID(Config::ID)), $new_value=Config::format_value(Config::new_value)] to Config::Info)if ( != Config::location) Config::log$location = Config::locationLog::write(Config::LOG, Config::log)return (Config::new_value)}, -100)) +0.000000 MetaHookPre CallFunction(Option::set_change_handler, , (Broker::metrics_exporter_interval, Broker::update_metrics_exporter_interval{ Broker::__set_metrics_exporter_interval(Broker::val)return (Broker::val)}, 0)) +0.000000 MetaHookPre CallFunction(Option::set_change_handler, , (Broker::metrics_exporter_interval, Config::config_option_changed{ Config::log = (coerce [$ts=network_time(), $id=Config::ID, $old_value=Config::format_value(lookup_ID(Config::ID)), $new_value=Config::format_value(Config::new_value)] to Config::Info)if ( != Config::location) Config::log$location = Config::locationLog::write(Config::LOG, Config::log)return (Config::new_value)}, -100)) +0.000000 MetaHookPre CallFunction(Option::set_change_handler, , (Broker::metrics_exporter_prefixes, Broker::update_metrics_exporter_prefixes{ Broker::__set_metrics_exporter_prefixes(Broker::filter)return (Broker::filter)}, 0)) +0.000000 MetaHookPre CallFunction(Option::set_change_handler, , (Broker::metrics_exporter_prefixes, Config::config_option_changed{ Config::log = (coerce [$ts=network_time(), $id=Config::ID, $old_value=Config::format_value(lookup_ID(Config::ID)), $new_value=Config::format_value(Config::new_value)] to Config::Info)if ( != Config::location) Config::log$location = Config::locationLog::write(Config::LOG, Config::log)return (Config::new_value)}, -100)) +0.000000 MetaHookPre CallFunction(Option::set_change_handler, , (Broker::metrics_exporter_target, Broker::update_metrics_exporter_target{ Broker::__set_metrics_exporter_target(Broker::val)return (Broker::val)}, 0)) +0.000000 MetaHookPre CallFunction(Option::set_change_handler, , (Broker::metrics_exporter_target, Config::config_option_changed{ Config::log = (coerce [$ts=network_time(), $id=Config::ID, $old_value=Config::format_value(lookup_ID(Config::ID)), $new_value=Config::format_value(Config::new_value)] to Config::Info)if ( != Config::location) Config::log$location = Config::locationLog::write(Config::LOG, Config::log)return (Config::new_value)}, -100)) 0.000000 MetaHookPre CallFunction(Option::set_change_handler, , (Broker::peer_counts_as_iosource, Config::config_option_changed{ Config::log = (coerce [$ts=network_time(), $id=Config::ID, $old_value=Config::format_value(lookup_ID(Config::ID)), $new_value=Config::format_value(Config::new_value)] to Config::Info)if ( != Config::location) Config::log$location = Config::locationLog::write(Config::LOG, Config::log)return (Config::new_value)}, -100)) 0.000000 MetaHookPre CallFunction(Option::set_change_handler, , (Conn::analyzer_inactivity_timeouts, Config::config_option_changed{ Config::log = (coerce [$ts=network_time(), $id=Config::ID, $old_value=Config::format_value(lookup_ID(Config::ID)), $new_value=Config::format_value(Config::new_value)] to Config::Info)if ( != Config::location) Config::log$location = Config::locationLog::write(Config::LOG, Config::log)return (Config::new_value)}, -100)) 0.000000 MetaHookPre CallFunction(Option::set_change_handler, , (Conn::default_extract, Config::config_option_changed{ Config::log = (coerce [$ts=network_time(), $id=Config::ID, $old_value=Config::format_value(lookup_ID(Config::ID)), $new_value=Config::format_value(Config::new_value)] to Config::Info)if ( != Config::location) Config::log$location = Config::locationLog::write(Config::LOG, Config::log)return (Config::new_value)}, -100)) @@ -2493,6 +2509,14 @@ 0.000000 | HookCallFunction Notice::want_pp() 0.000000 | HookCallFunction Option::set_change_handler(ActiveHTTP::default_max_time, Config::config_option_changed{ Config::log = (coerce [$ts=network_time(), $id=Config::ID, $old_value=Config::format_value(lookup_ID(Config::ID)), $new_value=Config::format_value(Config::new_value)] to Config::Info)if ( != Config::location) Config::log$location = Config::locationLog::write(Config::LOG, Config::log)return (Config::new_value)}, -100) 0.000000 | HookCallFunction Option::set_change_handler(ActiveHTTP::default_method, Config::config_option_changed{ Config::log = (coerce [$ts=network_time(), $id=Config::ID, $old_value=Config::format_value(lookup_ID(Config::ID)), $new_value=Config::format_value(Config::new_value)] to Config::Info)if ( != Config::location) Config::log$location = Config::locationLog::write(Config::LOG, Config::log)return (Config::new_value)}, -100) +0.000000 | HookCallFunction Option::set_change_handler(Broker::metrics_exporter_id, Broker::update_metrics_exporter_id{ Broker::__set_metrics_exporter_id(Broker::val)return (Broker::val)}, 0) +0.000000 | HookCallFunction Option::set_change_handler(Broker::metrics_exporter_id, Config::config_option_changed{ Config::log = (coerce [$ts=network_time(), $id=Config::ID, $old_value=Config::format_value(lookup_ID(Config::ID)), $new_value=Config::format_value(Config::new_value)] to Config::Info)if ( != Config::location) Config::log$location = Config::locationLog::write(Config::LOG, Config::log)return (Config::new_value)}, -100) +0.000000 | HookCallFunction Option::set_change_handler(Broker::metrics_exporter_interval, Broker::update_metrics_exporter_interval{ Broker::__set_metrics_exporter_interval(Broker::val)return (Broker::val)}, 0) +0.000000 | HookCallFunction Option::set_change_handler(Broker::metrics_exporter_interval, Config::config_option_changed{ Config::log = (coerce [$ts=network_time(), $id=Config::ID, $old_value=Config::format_value(lookup_ID(Config::ID)), $new_value=Config::format_value(Config::new_value)] to Config::Info)if ( != Config::location) Config::log$location = Config::locationLog::write(Config::LOG, Config::log)return (Config::new_value)}, -100) +0.000000 | HookCallFunction Option::set_change_handler(Broker::metrics_exporter_prefixes, Broker::update_metrics_exporter_prefixes{ Broker::__set_metrics_exporter_prefixes(Broker::filter)return (Broker::filter)}, 0) +0.000000 | HookCallFunction Option::set_change_handler(Broker::metrics_exporter_prefixes, Config::config_option_changed{ Config::log = (coerce [$ts=network_time(), $id=Config::ID, $old_value=Config::format_value(lookup_ID(Config::ID)), $new_value=Config::format_value(Config::new_value)] to Config::Info)if ( != Config::location) Config::log$location = Config::locationLog::write(Config::LOG, Config::log)return (Config::new_value)}, -100) +0.000000 | HookCallFunction Option::set_change_handler(Broker::metrics_exporter_target, Broker::update_metrics_exporter_target{ Broker::__set_metrics_exporter_target(Broker::val)return (Broker::val)}, 0) +0.000000 | HookCallFunction Option::set_change_handler(Broker::metrics_exporter_target, Config::config_option_changed{ Config::log = (coerce [$ts=network_time(), $id=Config::ID, $old_value=Config::format_value(lookup_ID(Config::ID)), $new_value=Config::format_value(Config::new_value)] to Config::Info)if ( != Config::location) Config::log$location = Config::locationLog::write(Config::LOG, Config::log)return (Config::new_value)}, -100) 0.000000 | HookCallFunction Option::set_change_handler(Broker::peer_counts_as_iosource, Config::config_option_changed{ Config::log = (coerce [$ts=network_time(), $id=Config::ID, $old_value=Config::format_value(lookup_ID(Config::ID)), $new_value=Config::format_value(Config::new_value)] to Config::Info)if ( != Config::location) Config::log$location = Config::locationLog::write(Config::LOG, Config::log)return (Config::new_value)}, -100) 0.000000 | HookCallFunction Option::set_change_handler(Conn::analyzer_inactivity_timeouts, Config::config_option_changed{ Config::log = (coerce [$ts=network_time(), $id=Config::ID, $old_value=Config::format_value(lookup_ID(Config::ID)), $new_value=Config::format_value(Config::new_value)] to Config::Info)if ( != Config::location) Config::log$location = Config::locationLog::write(Config::LOG, Config::log)return (Config::new_value)}, -100) 0.000000 | HookCallFunction Option::set_change_handler(Conn::default_extract, Config::config_option_changed{ Config::log = (coerce [$ts=network_time(), $id=Config::ID, $old_value=Config::format_value(lookup_ID(Config::ID)), $new_value=Config::format_value(Config::new_value)] to Config::Info)if ( != Config::location) Config::log$location = Config::locationLog::write(Config::LOG, Config::log)return (Config::new_value)}, -100)