Remove everything related to aggregation

This commit is contained in:
Tim Wojtulewicz 2024-03-12 09:56:56 -07:00
parent 643bb38419
commit 4718e5cf00
11 changed files with 60 additions and 166 deletions

View file

@ -7,38 +7,16 @@ module Telemetry;
export { export {
## Port used to make metric data available to Prometheus scrapers via ## Port used to make metric data available to Prometheus scrapers via
## HTTP. Zeek overrides any value provided in zeek_init or earlier at ## HTTP. Zeek overrides any value provided in zeek_init or earlier at
## startup if the environment variable ZEEK_METRICS_PORT is defined. ## startup if the environment variable ZEEK_METRICS_PORT is defined.
const metrics_port = 0/unknown &redef; const metrics_port = 0/unknown &redef;
## Frequency for publishing scraped metrics to the target topic. Zeek ## ID for the metrics exporter. This is used as the 'endpoint' label
## value when exporting data to Prometheus. In a cluster setup, this
## defaults to the name of the node in the cluster configuration. Zeek
## overrides any value provided in zeek_init or earlier at startup if ## overrides any value provided in zeek_init or earlier at startup if
## the environment variable ZEEK_METRICS_EXPORT_INTERVAL is defined. ## the environment variable ZEEK_METRICS_ENDPOINT_NAME is defined.
const metrics_export_interval = 1 sec &redef; const metrics_endpoint_name = "" &redef;
## Target topic for the metrics. Setting a non-empty string starts the
## periodic publishing of local metrics. Zeek overrides any value
## provided in zeek_init or earlier at startup if the environment
## variable ZEEK_METRICS_EXPORT_TOPIC is defined.
const metrics_export_topic = "" &redef;
## Topics for the telmeetry framework for collecting metrics from other
## peers in the network and including them in the output. Has no effect
## when not exporting the metrics to Prometheus.
##
## Zeek overrides any value provided in zeek_init or earlier at startup
## if the environment variable ZEEK_METRICS_IMPORT_TOPICS is defined.
const metrics_import_topics: vector of string = vector() &redef;
## ID for the metrics exporter. When setting a target topic for the
## exporter, Broker sets this option to the suffix of the new topic
## *unless* the ID is a non-empty string. Since setting a topic starts
## the periodic publishing of events, we recommend setting the ID always
## first or avoid setting it at all if the topic suffix serves as a
## good-enough ID. Zeek overrides any value provided in zeek_init or
## earlier at startup if the environment variable
## ZEEK_METRICS_ENDPOINT_NAME is defined.
const metrics_export_endpoint_name = "" &redef;
## Selects prefixes from the local metrics. Only metrics with prefixes ## Selects prefixes from the local metrics. Only metrics with prefixes
## listed in this variable are included when publishing local metrics. ## listed in this variable are included when publishing local metrics.

View file

@ -13,16 +13,10 @@
@if ( Cluster::is_enabled() ) @if ( Cluster::is_enabled() )
# Use Cluster::node as "endpoint" label redef Telemetry::metrics_endpoint_name = Cluster::node;
redef Telemetry::metrics_export_endpoint_name = Cluster::node;
# The manager opens port 9911 and imports metrics from all nodes by default.
@if ( Cluster::local_node_type() == Cluster::MANAGER ) @if ( Cluster::local_node_type() == Cluster::MANAGER )
redef Telemetry::metrics_port = 9911/tcp; redef Telemetry::metrics_port = 9911/tcp;
redef Telemetry::metrics_import_topics = vector("zeek/cluster/metrics/");
@else
redef Telemetry::metrics_export_topic = "zeek/cluster/metrics/";
@endif @endif
@endif @endif

View file

@ -6,3 +6,9 @@ zeek_add_subdir_library(
ProcessStats.cc ProcessStats.cc
BIFS BIFS
telemetry.bif) telemetry.bif)
# We don't need to include the civetweb headers across the whole project, only
# here in the telemetry framework.
target_include_directories(
zeek_telemetry_obj BEFORE
PUBLIC ${PROJECT_SOURCE_DIR}/auxil/prometheus-cpp/3rdparty/civetweb/include)

View file

@ -125,8 +125,7 @@ public:
for ( const auto& ctr : counters ) { for ( const auto& ctr : counters ) {
auto label_values_vec = make_intrusive<VectorVal>(string_vec_type); auto label_values_vec = make_intrusive<VectorVal>(string_vec_type);
for ( const auto& [label_key, label] : ctr->Labels() ) for ( const auto& [label_key, label] : ctr->Labels() )
if ( label_key != "endpoint" ) label_values_vec->Append(make_intrusive<StringVal>(label));
label_values_vec->Append(make_intrusive<StringVal>(label));
auto r = make_intrusive<zeek::RecordVal>(metric_record_type); auto r = make_intrusive<zeek::RecordVal>(metric_record_type);
r->Assign(labels_idx, label_values_vec); r->Assign(labels_idx, label_values_vec);

View file

@ -157,8 +157,7 @@ public:
for ( const auto& g : gauges ) { for ( const auto& g : gauges ) {
auto label_values_vec = make_intrusive<VectorVal>(string_vec_type); auto label_values_vec = make_intrusive<VectorVal>(string_vec_type);
for ( const auto& [label_key, label] : g->Labels() ) for ( const auto& [label_key, label] : g->Labels() )
if ( label_key != "endpoint" ) label_values_vec->Append(make_intrusive<StringVal>(label));
label_values_vec->Append(make_intrusive<StringVal>(label));
auto r = make_intrusive<zeek::RecordVal>(metric_record_type); auto r = make_intrusive<zeek::RecordVal>(metric_record_type);
r->Assign(labels_idx, label_values_vec); r->Assign(labels_idx, label_values_vec);

View file

@ -138,8 +138,7 @@ public:
for ( const auto& h : histograms ) { for ( const auto& h : histograms ) {
auto label_values_vec = make_intrusive<VectorVal>(string_vec_type); auto label_values_vec = make_intrusive<VectorVal>(string_vec_type);
for ( const auto& [label_key, label] : h->Labels() ) for ( const auto& [label_key, label] : h->Labels() )
if ( label_key != "endpoint" ) label_values_vec->Append(make_intrusive<StringVal>(label));
label_values_vec->Append(make_intrusive<StringVal>(label));
auto r = make_intrusive<zeek::RecordVal>(histogram_metric_type); auto r = make_intrusive<zeek::RecordVal>(histogram_metric_type);
r->Assign(labels_idx, label_values_vec); r->Assign(labels_idx, label_values_vec);

View file

@ -2,6 +2,12 @@
#include "zeek/telemetry/Manager.h" #include "zeek/telemetry/Manager.h"
#define RAPIDJSON_HAS_STDSTRING 1
// CivetServer is from the civetweb submodule in prometheus-cpp
#include <CivetServer.h>
#include <rapidjson/document.h>
#include <rapidjson/writer.h>
#include <algorithm> #include <algorithm>
#include <thread> #include <thread>
#include <variant> #include <variant>
@ -19,14 +25,10 @@ namespace zeek::telemetry {
Manager::Manager() { prometheus_registry = std::make_shared<prometheus::Registry>(); } Manager::Manager() { prometheus_registry = std::make_shared<prometheus::Registry>(); }
void Manager::InitPostScript() { void Manager::InitPostScript() {
// Metrics port setting is used to calculate a URL for prometheus scraping
std::string prometheus_url; std::string prometheus_url;
if ( auto env = getenv("ZEEK_METRICS_PORT") ) if ( auto env = getenv("ZEEK_METRICS_PORT") )
prometheus_url = util::fmt("localhost:%s", env); prometheus_url = util::fmt("localhost:%s", env);
else if ( auto env = getenv("BROKER_METRICS_PORT") ) {
// Remove this in v7.1 when the Broker variables are removed
reporter->Warning("BROKER_METRICS_PORT is deprecated, use ZEEK_METRICS_PORT.");
prometheus_url = util::fmt("localhost:%s", env);
}
else { else {
auto metrics_port = id::find_val("Telemetry::metrics_port")->AsPortVal(); auto metrics_port = id::find_val("Telemetry::metrics_port")->AsPortVal();
if ( metrics_port->Port() == 0 ) if ( metrics_port->Port() == 0 )
@ -38,114 +40,35 @@ void Manager::InitPostScript() {
} }
if ( ! prometheus_url.empty() ) { if ( ! prometheus_url.empty() ) {
printf("prometheus configured\n"); printf("prometheus configured: %s\n", prometheus_url.c_str());
prometheus_exposer = std::make_unique<prometheus::Exposer>(prometheus_url); CivetCallbacks* callbacks = nullptr;
// if ( ! request_topic.empty() ) {
// callbacks = new CivetCallbacks();
// callbacks->begin_request = [](struct mg_connection* conn) -> int {
// printf("begin_request\n");
// // We only care about requests made to the /metrics endpoint. There are other request
// // made to the server that we can ignore, such as favicon.ico.
// auto req_info = mg_get_request_info(conn);
// if ( strcmp(req_info->request_uri, "/metrics") == 0 ) {
// // send a request to a topic for data from workers
// printf("posting event\n");
// broker_mgr->PublishEvent(telemetry_mgr->RequestTopic(), "Telemetry::remote_request",
// broker::vector{});
// // wait a few seconds for workers to respond
// // TODO: do we wait for all workers to respond or just go ahead and
// // respond after a few seconds with the understanding that some workers
// // might be out of date?
// // TODO: the 4 seconds here is completely arbitrary
// std::this_thread::sleep_for(std::chrono::seconds(4));
// }
// return 0;
// };
// }
prometheus_exposer = std::make_unique<prometheus::Exposer>(prometheus_url, 2, callbacks);
prometheus_exposer->RegisterCollectable(prometheus_registry); prometheus_exposer->RegisterCollectable(prometheus_registry);
// Import topics are only enabled if Prometheus is enabled, because we don't care
// to get imported metrics if we're just going to drop them on the floor.
auto topics = import_topics;
if ( auto env = getenv("ZEEK_METRICS_IMPORT_TOPICS") ) {
topics = util::split(std::string{env}, ":");
}
else if ( auto env = getenv("BROKER_METRICS_IMPORT_TOPICS") ) {
// Remove this in v7.1 when the Broker variables are removed
reporter->Warning("BROKER_METRICS_IMPORT_TOPICS is deprecated, use ZEEK_METRICS_IMPORT_TOPICS.");
topics = util::split(std::string{env}, ":");
}
else {
auto script_topics = id::find_val("Telemetry::metrics_import_topics")->AsVectorVal();
if ( script_topics->Size() == 0 )
// Remove this in v7.1 when the Broker variables are removed
script_topics = id::find_val("Broker::metrics_import_topics")->AsVectorVal();
for ( int i = 0; i < script_topics->Size(); i++ )
topics.push_back(script_topics->StringValAt(i)->ToStdString());
}
for ( const auto& topic : topics ) {
broker_mgr->Subscribe(topic);
}
}
if ( export_topic.empty() ) {
if ( auto env = getenv("ZEEK_METRICS_EXPORT_TOPIC") )
export_topic = env;
else if ( auto env = getenv("BROKER_METRICS_EXPORT_TOPIC") ) {
// Remove this in v7.1 when the Broker variables are removed
reporter->Warning("BROKER_METRICS_EXPORT_TOPIC is deprecated, use ZEEK_METRICS_EXPORT_TOPIC.");
export_topic = env;
}
else {
auto script_topic = id::find_val("Telemetry::metrics_export_topic")->AsStringVal();
if ( script_topic->Len() == 0 )
// Remove this in v7.1 when the Broker variables are removed
script_topic = id::find_val("Broker::metrics_export_topic")->AsStringVal();
export_topic = script_topic->ToStdString();
}
}
if ( export_endpoint.empty() ) {
if ( auto env = getenv("ZEEK_METRICS_ENDPOINT_NAME") )
export_endpoint = env;
else if ( auto env = getenv("BROKER_METRICS_ENDPOINT_NAME") ) {
// Remove this in v7.1 when the Broker variables are removed
reporter->Warning("BROKER_METRICS_ENDPOINT_NAME is deprecated, use ZEEK_METRICS_ENDPOINT_NAME.");
export_endpoint = env;
}
else {
auto script_endpoint = id::find_val("Telemetry::metrics_export_endpoint_name")->AsStringVal();
if ( script_endpoint->Len() == 0 )
// Remove this in v7.1 when the Broker variables are removed
script_endpoint = id::find_val("Broker::metrics_export_endpoint_name")->AsStringVal();
export_endpoint = script_endpoint->ToStdString();
}
}
if ( export_interval == 0 ) {
if ( auto env = getenv("ZEEK_METRICS_EXPORT_INTERVAL") )
export_interval = std::strtod(env, nullptr);
else if ( auto env = getenv("BROKER_METRICS_EXPORT_INTERVAL") ) {
reporter->Warning("BROKER_METRICS_EXPORT_INTERVAL is deprecated, use ZEEK_METRICS_EXPORT_INTERVAL.");
export_interval = std::strtod(env, nullptr);
}
else {
export_interval = id::find_val("Telemetry::metrics_export_interval")->AsInterval();
if ( export_interval == 0 )
// Remove this in v7.1 when the Broker variables are removed
export_interval = id::find_val("Broker::metrics_export_interval")->AsInterval();
}
}
if ( export_prefixes.empty() ) {
if ( auto env = getenv("ZEEK_METRICS_EXPORT_PREFIXES") ) {
export_prefixes = util::split(std::string{env}, ":");
}
else if ( auto env = getenv("BROKER_METRICS_EXPORT_PREFIXES") ) {
reporter->Warning("BROKER_METRICS_EXPORT_PREFIXES is deprecated, use ZEEK_METRICS_EXPORT_PREFIXES.");
export_prefixes = util::split(std::string{env}, ":");
}
else {
auto script_topics = id::find_val("Telemetry::metrics_export_prefixes")->AsVectorVal();
if ( script_topics->Size() == 0 )
// Remove this in v7.1 when the Broker variables are removed
script_topics = id::find_val("Broker::metrics_export_prefixes")->AsVectorVal();
for ( int i = 0; i < script_topics->Size(); i++ )
export_prefixes.push_back(script_topics->StringValAt(i)->ToStdString());
}
}
// printf("topic: %s\n", export_topic.c_str());
// printf("endpoint: %s\n", export_endpoint.c_str());
// printf("interval: %f\n", export_interval);
// printf("prefixes: %zu\n", export_prefixes.size());
if ( ! export_topic.empty() && ! export_endpoint.empty() && export_interval > 0 ) {
printf("topic exporter configured\n");
} }
#ifdef HAVE_PROCESS_STAT_METRICS #ifdef HAVE_PROCESS_STAT_METRICS

View file

@ -364,11 +364,8 @@ private:
std::shared_ptr<DblGauge> cpu_gauge; std::shared_ptr<DblGauge> cpu_gauge;
std::shared_ptr<IntGauge> fds_gauge; std::shared_ptr<IntGauge> fds_gauge;
std::string export_topic; std::string endpoint_name;
std::vector<std::string> import_topics;
std::string export_endpoint;
std::vector<std::string> export_prefixes; std::vector<std::string> export_prefixes;
double export_interval = 0.0;
std::shared_ptr<prometheus::Registry> prometheus_registry; std::shared_ptr<prometheus::Registry> prometheus_registry;
std::unique_ptr<prometheus::Exposer> prometheus_exposer; std::unique_ptr<prometheus::Exposer> prometheus_exposer;

View file

@ -42,6 +42,7 @@ RecordValPtr MetricFamily::GetMetricOptsRecord() const {
record_val->Assign(is_total_idx, val_mgr->Bool(is_sum)); record_val->Assign(is_total_idx, val_mgr->Bool(is_sum));
auto label_names_vec = make_intrusive<zeek::VectorVal>(string_vec_type); auto label_names_vec = make_intrusive<zeek::VectorVal>(string_vec_type);
label_names_vec->Append(make_intrusive<StringVal>("endpoint"));
for ( const auto& lbl : labels ) for ( const auto& lbl : labels )
label_names_vec->Append(make_intrusive<StringVal>(lbl)); label_names_vec->Append(make_intrusive<StringVal>(lbl));
@ -68,8 +69,9 @@ prometheus::Labels MetricFamily::BuildPrometheusLabels(Span<const LabelView> lab
} }
if ( ! found_endpoint ) { if ( ! found_endpoint ) {
auto endpoint = id::find_val("Telemetry::metrics_export_endpoint_name")->AsStringVal(); auto endpoint = id::find_val("Telemetry::metrics_endpoint_name")->AsStringVal();
p_labels.emplace("endpoint", endpoint->ToStdString()); if ( endpoint && endpoint->Len() > 0 )
p_labels.emplace("endpoint", endpoint->ToStdString());
} }
return p_labels; return p_labels;

View file

@ -811,6 +811,7 @@ SetupResult setup(int argc, char** argv, Options* zopts) {
RecordType::InitPostScript(); RecordType::InitPostScript();
telemetry_mgr->InitPostScript();
iosource_mgr->InitPostScript(); iosource_mgr->InitPostScript();
log_mgr->InitPostScript(); log_mgr->InitPostScript();
plugin_mgr->InitPostScript(); plugin_mgr->InitPostScript();
@ -819,10 +820,6 @@ SetupResult setup(int argc, char** argv, Options* zopts) {
timer_mgr->InitPostScript(); timer_mgr->InitPostScript();
event_mgr.InitPostScript(); event_mgr.InitPostScript();
// telemetry_mgr has be initialized after broker manager since it might
// register for a topic and would fail to do so otherwise.
telemetry_mgr->InitPostScript();
if ( supervisor_mgr ) if ( supervisor_mgr )
supervisor_mgr->InitPostScript(); supervisor_mgr->InitPostScript();

View file

@ -33,6 +33,8 @@ redef Cluster::nodes = {
@load base/frameworks/telemetry @load base/frameworks/telemetry
@load base/utils/active-http @load base/utils/active-http
@if ( Cluster::node == "manager-1" )
# Query the Prometheus endpoint using ActiveHTTP for testing, oh my. # Query the Prometheus endpoint using ActiveHTTP for testing, oh my.
event run_test() event run_test()
{ {
@ -67,7 +69,6 @@ event run_test()
} }
} }
@if ( Cluster::node == "manager-1" )
# Use a dynamic metrics port for testing to avoid colliding on 9911/tcp # Use a dynamic metrics port for testing to avoid colliding on 9911/tcp
# when running tests in parallel. # when running tests in parallel.
global orig_metrics_port = Telemetry::metrics_port; global orig_metrics_port = Telemetry::metrics_port;
@ -80,9 +81,8 @@ event zeek_init()
event Cluster::Experimental::cluster_started() event Cluster::Experimental::cluster_started()
{ {
# Run the test once all nodes are up and metrics_export_interval # Run the test once all nodes are up
# has passed at least once. schedule 2 secs { run_test() };
schedule 2 * Telemetry::metrics_export_interval { run_test() };
} }
@endif @endif