From 5c20d81a6aaed5ed20ec25dc99d2bd994a14d4ed Mon Sep 17 00:00:00 2001 From: Tim Wojtulewicz Date: Tue, 12 Mar 2024 09:56:56 -0700 Subject: [PATCH] Remove everything related to aggregation --- .../base/frameworks/telemetry/options.zeek | 23 +-- .../frameworks/telemetry/prometheus.zeek | 8 +- src/telemetry/CMakeLists.txt | 6 + src/telemetry/Counter.h | 3 +- src/telemetry/Gauge.h | 3 +- src/telemetry/Histogram.h | 3 +- src/telemetry/Manager.cc | 141 ++++-------------- src/telemetry/Manager.h | 5 +- src/telemetry/MetricFamily.cc | 6 +- .../frameworks/telemetry/prometheus.zeek | 8 +- 10 files changed, 51 insertions(+), 155 deletions(-) diff --git a/scripts/base/frameworks/telemetry/options.zeek b/scripts/base/frameworks/telemetry/options.zeek index 99f4a2d194..fd3f7f91ed 100644 --- a/scripts/base/frameworks/telemetry/options.zeek +++ b/scripts/base/frameworks/telemetry/options.zeek @@ -7,29 +7,10 @@ module Telemetry; export { ## Port used to make metric data available to Prometheus scrapers via - ## HTTP. Zeek overrides any value provided in zeek_init or earlier at + ## HTTP. Zeek overrides any value provided in zeek_init or earlier at ## startup if the environment variable ZEEK_METRICS_PORT is defined. const metrics_port = 0/unknown &redef; - ## Frequency for publishing scraped metrics to the target topic. Zeek - ## overrides any value provided in zeek_init or earlier at startup if - ## the environment variable ZEEK_METRICS_EXPORT_INTERVAL is defined. - const metrics_export_interval = 1 sec &redef; - - ## Target topic for the metrics. Setting a non-empty string starts the - ## periodic publishing of local metrics. Zeek overrides any value - ## provided in zeek_init or earlier at startup if the environment - ## variable ZEEK_METRICS_EXPORT_TOPIC is defined. - const metrics_export_topic = "" &redef; - - ## Topics for the telmeetry framework for collecting metrics from other - ## peers in the network and including them in the output. Has no effect - ## when not exporting the metrics to Prometheus. - ## - ## Zeek overrides any value provided in zeek_init or earlier at startup - ## if the environment variable ZEEK_METRICS_IMPORT_TOPICS is defined. - const metrics_import_topics: vector of string = vector() &redef; - ## ID for the metrics exporter. When setting a target topic for the ## exporter, Broker sets this option to the suffix of the new topic ## *unless* the ID is a non-empty string. Since setting a topic starts @@ -38,7 +19,7 @@ export { ## good-enough ID. Zeek overrides any value provided in zeek_init or ## earlier at startup if the environment variable ## ZEEK_METRICS_ENDPOINT_NAME is defined. - const metrics_export_endpoint_name = "" &redef; + const metrics_endpoint_name = "" &redef; ## Selects prefixes from the local metrics. Only metrics with prefixes ## listed in this variable are included when publishing local metrics. diff --git a/scripts/policy/frameworks/telemetry/prometheus.zeek b/scripts/policy/frameworks/telemetry/prometheus.zeek index 65ba8470bd..990ab937b8 100644 --- a/scripts/policy/frameworks/telemetry/prometheus.zeek +++ b/scripts/policy/frameworks/telemetry/prometheus.zeek @@ -13,16 +13,10 @@ @if ( Cluster::is_enabled() ) -# Use Cluster::node as "endpoint" label -redef Telemetry::metrics_export_endpoint_name = Cluster::node; +redef Telemetry::metrics_endpoint_name = Cluster::node; -# The manager opens port 9911 and imports metrics from all nodes by default. @if ( Cluster::local_node_type() == Cluster::MANAGER ) redef Telemetry::metrics_port = 9911/tcp; -redef Telemetry::metrics_import_topics = vector("zeek/cluster/metrics/"); - -@else -redef Telemetry::metrics_export_topic = "zeek/cluster/metrics/"; @endif @endif diff --git a/src/telemetry/CMakeLists.txt b/src/telemetry/CMakeLists.txt index ed7e5284ee..76f3da9e38 100644 --- a/src/telemetry/CMakeLists.txt +++ b/src/telemetry/CMakeLists.txt @@ -6,3 +6,9 @@ zeek_add_subdir_library( ProcessStats.cc BIFS telemetry.bif) + +# We don't need to include the civetweb headers across the whole project, only +# here in the telemetry framework. +target_include_directories( + zeek_telemetry_obj BEFORE + PUBLIC ${PROJECT_SOURCE_DIR}/auxil/prometheus-cpp/3rdparty/civetweb/include) diff --git a/src/telemetry/Counter.h b/src/telemetry/Counter.h index 4db62f94ff..812fe8ad7f 100644 --- a/src/telemetry/Counter.h +++ b/src/telemetry/Counter.h @@ -144,8 +144,7 @@ public: for ( const auto& ctr : counters ) { auto label_values_vec = make_intrusive(string_vec_type); for ( const auto& [label_key, label] : ctr->Labels() ) - if ( label_key != "endpoint" ) - label_values_vec->Append(make_intrusive(label)); + label_values_vec->Append(make_intrusive(label)); auto r = make_intrusive(metric_record_type); r->Assign(labels_idx, label_values_vec); diff --git a/src/telemetry/Gauge.h b/src/telemetry/Gauge.h index 8766628d9f..083df5230f 100644 --- a/src/telemetry/Gauge.h +++ b/src/telemetry/Gauge.h @@ -168,8 +168,7 @@ public: for ( const auto& g : gauges ) { auto label_values_vec = make_intrusive(string_vec_type); for ( const auto& [label_key, label] : g->Labels() ) - if ( label_key != "endpoint" ) - label_values_vec->Append(make_intrusive(label)); + label_values_vec->Append(make_intrusive(label)); auto r = make_intrusive(metric_record_type); r->Assign(labels_idx, label_values_vec); diff --git a/src/telemetry/Histogram.h b/src/telemetry/Histogram.h index a9bc302709..846a871fcc 100644 --- a/src/telemetry/Histogram.h +++ b/src/telemetry/Histogram.h @@ -145,8 +145,7 @@ public: for ( const auto& h : histograms ) { auto label_values_vec = make_intrusive(string_vec_type); for ( const auto& [label_key, label] : h->Labels() ) - if ( label_key != "endpoint" ) - label_values_vec->Append(make_intrusive(label)); + label_values_vec->Append(make_intrusive(label)); auto r = make_intrusive(histogram_metric_type); r->Assign(labels_idx, label_values_vec); diff --git a/src/telemetry/Manager.cc b/src/telemetry/Manager.cc index 6ffd1a7dc7..d714fb0b29 100644 --- a/src/telemetry/Manager.cc +++ b/src/telemetry/Manager.cc @@ -14,6 +14,8 @@ #include "zeek/telemetry/Timer.h" #include "zeek/telemetry/telemetry.bif.h" +#include "CivetServer.h" + namespace zeek::telemetry { Manager::Manager() { prometheus_registry = std::make_shared(); } @@ -21,14 +23,10 @@ Manager::Manager() { prometheus_registry = std::make_sharedWarning("BROKER_METRICS_PORT is deprecated, use ZEEK_METRICS_PORT."); - prometheus_url = util::fmt("localhost:%s", env); - } else { auto metrics_port = id::find_val("Telemetry::metrics_port")->AsPortVal(); if ( metrics_port->Port() == 0 ) @@ -40,114 +38,35 @@ void Manager::InitPostScript() { } if ( ! prometheus_url.empty() ) { - printf("prometheus configured\n"); + printf("prometheus configured: %s\n", prometheus_url.c_str()); - prometheus_exposer = std::make_unique(prometheus_url); + CivetCallbacks* callbacks = nullptr; + // if ( ! request_topic.empty() ) { + // callbacks = new CivetCallbacks(); + // callbacks->begin_request = [](struct mg_connection* conn) -> int { + // printf("begin_request\n"); + // // We only care about requests made to the /metrics endpoint. There are other request + // // made to the server that we can ignore, such as favicon.ico. + // auto req_info = mg_get_request_info(conn); + // if ( strcmp(req_info->request_uri, "/metrics") == 0 ) { + // // send a request to a topic for data from workers + // printf("posting event\n"); + // broker_mgr->PublishEvent(telemetry_mgr->RequestTopic(), "Telemetry::remote_request", + // broker::vector{}); + + // // wait a few seconds for workers to respond + // // TODO: do we wait for all workers to respond or just go ahead and + // // respond after a few seconds with the understanding that some workers + // // might be out of date? + // // TODO: the 4 seconds here is completely arbitrary + // std::this_thread::sleep_for(std::chrono::seconds(4)); + // } + // return 0; + // }; + // } + + prometheus_exposer = std::make_unique(prometheus_url, 2, callbacks); prometheus_exposer->RegisterCollectable(prometheus_registry); - - // Import topics are only enabled if Prometheus is enabled, because we don't care - // to get imported metrics if we're just going to drop them on the floor. - auto topics = import_topics; - if ( auto env = getenv("ZEEK_METRICS_IMPORT_TOPICS") ) { - topics = util::split(std::string{env}, ":"); - } - else if ( auto env = getenv("BROKER_METRICS_IMPORT_TOPICS") ) { - // Remove this in v7.1 when the Broker variables are removed - reporter->Warning("BROKER_METRICS_IMPORT_TOPICS is deprecated, use ZEEK_METRICS_IMPORT_TOPICS."); - topics = util::split(std::string{env}, ":"); - } - else { - auto script_topics = id::find_val("Telemetry::metrics_import_topics")->AsVectorVal(); - if ( script_topics->Size() == 0 ) - // Remove this in v7.1 when the Broker variables are removed - script_topics = id::find_val("Broker::metrics_import_topics")->AsVectorVal(); - - for ( int i = 0; i < script_topics->Size(); i++ ) - topics.push_back(script_topics->StringValAt(i)->ToStdString()); - } - - for ( const auto& topic : topics ) { - broker_mgr->Subscribe(topic); - } - } - - if ( export_topic.empty() ) { - if ( auto env = getenv("ZEEK_METRICS_EXPORT_TOPIC") ) - export_topic = env; - else if ( auto env = getenv("BROKER_METRICS_EXPORT_TOPIC") ) { - // Remove this in v7.1 when the Broker variables are removed - reporter->Warning("BROKER_METRICS_EXPORT_TOPIC is deprecated, use ZEEK_METRICS_EXPORT_TOPIC."); - export_topic = env; - } - else { - auto script_topic = id::find_val("Telemetry::metrics_export_topic")->AsStringVal(); - if ( script_topic->Len() == 0 ) - // Remove this in v7.1 when the Broker variables are removed - script_topic = id::find_val("Broker::metrics_export_topic")->AsStringVal(); - - export_topic = script_topic->ToStdString(); - } - } - - if ( export_endpoint.empty() ) { - if ( auto env = getenv("ZEEK_METRICS_ENDPOINT_NAME") ) - export_endpoint = env; - else if ( auto env = getenv("BROKER_METRICS_ENDPOINT_NAME") ) { - // Remove this in v7.1 when the Broker variables are removed - reporter->Warning("BROKER_METRICS_ENDPOINT_NAME is deprecated, use ZEEK_METRICS_ENDPOINT_NAME."); - export_endpoint = env; - } - else { - auto script_endpoint = id::find_val("Telemetry::metrics_export_endpoint_name")->AsStringVal(); - if ( script_endpoint->Len() == 0 ) - // Remove this in v7.1 when the Broker variables are removed - script_endpoint = id::find_val("Broker::metrics_export_endpoint_name")->AsStringVal(); - - export_endpoint = script_endpoint->ToStdString(); - } - } - - if ( export_interval == 0 ) { - if ( auto env = getenv("ZEEK_METRICS_EXPORT_INTERVAL") ) - export_interval = std::strtod(env, nullptr); - else if ( auto env = getenv("BROKER_METRICS_EXPORT_INTERVAL") ) { - reporter->Warning("BROKER_METRICS_EXPORT_INTERVAL is deprecated, use ZEEK_METRICS_EXPORT_INTERVAL."); - export_interval = std::strtod(env, nullptr); - } - else { - export_interval = id::find_val("Telemetry::metrics_export_interval")->AsInterval(); - if ( export_interval == 0 ) - // Remove this in v7.1 when the Broker variables are removed - export_interval = id::find_val("Broker::metrics_export_interval")->AsInterval(); - } - } - - if ( export_prefixes.empty() ) { - if ( auto env = getenv("ZEEK_METRICS_EXPORT_PREFIXES") ) { - export_prefixes = util::split(std::string{env}, ":"); - } - else if ( auto env = getenv("BROKER_METRICS_EXPORT_PREFIXES") ) { - reporter->Warning("BROKER_METRICS_EXPORT_PREFIXES is deprecated, use ZEEK_METRICS_EXPORT_PREFIXES."); - export_prefixes = util::split(std::string{env}, ":"); - } - else { - auto script_topics = id::find_val("Telemetry::metrics_export_prefixes")->AsVectorVal(); - if ( script_topics->Size() == 0 ) - // Remove this in v7.1 when the Broker variables are removed - script_topics = id::find_val("Broker::metrics_export_prefixes")->AsVectorVal(); - - for ( int i = 0; i < script_topics->Size(); i++ ) - export_prefixes.push_back(script_topics->StringValAt(i)->ToStdString()); - } - } - - // printf("topic: %s\n", export_topic.c_str()); - // printf("endpoint: %s\n", export_endpoint.c_str()); - // printf("interval: %f\n", export_interval); - // printf("prefixes: %zu\n", export_prefixes.size()); - - if ( ! export_topic.empty() && ! export_endpoint.empty() && export_interval > 0 ) { - printf("topic exporter configured\n"); } #ifdef HAVE_PROCESS_STAT_METRICS diff --git a/src/telemetry/Manager.h b/src/telemetry/Manager.h index d2f09f0309..9971210787 100644 --- a/src/telemetry/Manager.h +++ b/src/telemetry/Manager.h @@ -364,11 +364,8 @@ private: std::shared_ptr cpu_gauge; std::shared_ptr fds_gauge; - std::string export_topic; - std::vector import_topics; - std::string export_endpoint; + std::string endpoint_name; std::vector export_prefixes; - double export_interval = 0.0; std::shared_ptr prometheus_registry; std::unique_ptr prometheus_exposer; diff --git a/src/telemetry/MetricFamily.cc b/src/telemetry/MetricFamily.cc index 4ddb234fc0..107053ece2 100644 --- a/src/telemetry/MetricFamily.cc +++ b/src/telemetry/MetricFamily.cc @@ -42,6 +42,7 @@ RecordValPtr MetricFamily::GetMetricOptsRecord() const { record_val->Assign(is_total_idx, val_mgr->Bool(is_sum)); auto label_names_vec = make_intrusive(string_vec_type); + label_names_vec->Append(make_intrusive("endpoint")); for ( const auto& lbl : labels ) label_names_vec->Append(make_intrusive(lbl)); @@ -68,8 +69,9 @@ prometheus::Labels MetricFamily::BuildPrometheusLabels(Span lab } if ( ! found_endpoint ) { - auto endpoint = id::find_val("Telemetry::metrics_export_endpoint_name")->AsStringVal(); - p_labels.emplace("endpoint", endpoint->ToStdString()); + auto endpoint = id::find_val("Telemetry::metrics_endpoint_name")->AsStringVal(); + if ( endpoint && endpoint->Len() > 0 ) + p_labels.emplace("endpoint", endpoint->ToStdString()); } return p_labels; diff --git a/testing/btest/scripts/policy/frameworks/telemetry/prometheus.zeek b/testing/btest/scripts/policy/frameworks/telemetry/prometheus.zeek index 2494358ec2..1dc32a0279 100644 --- a/testing/btest/scripts/policy/frameworks/telemetry/prometheus.zeek +++ b/testing/btest/scripts/policy/frameworks/telemetry/prometheus.zeek @@ -33,6 +33,8 @@ redef Cluster::nodes = { @load base/frameworks/telemetry @load base/utils/active-http +@if ( Cluster::node == "manager-1" ) + # Query the Prometheus endpoint using ActiveHTTP for testing, oh my. event run_test() { @@ -67,7 +69,6 @@ event run_test() } } -@if ( Cluster::node == "manager-1" ) # Use a dynamic metrics port for testing to avoid colliding on 9911/tcp # when running tests in parallel. global orig_metrics_port = Telemetry::metrics_port; @@ -80,9 +81,8 @@ event zeek_init() event Cluster::Experimental::cluster_started() { - # Run the test once all nodes are up and metrics_export_interval - # has passed at least once. - schedule 2 * Telemetry::metrics_export_interval { run_test() }; + # Run the test once all nodes are up + schedule 2 secs { run_test() }; } @endif