From e93e4cc26d0da433b3cc4f35effd7b0e29307442 Mon Sep 17 00:00:00 2001 From: Tim Wojtulewicz Date: Wed, 13 Mar 2024 15:28:48 -0700 Subject: [PATCH] Add a services.json endpoint for Prometheus service discovery --- scripts/base/frameworks/cluster/main.zeek | 26 ++++++ .../frameworks/telemetry/prometheus.zeek | 4 +- src/telemetry/Manager.cc | 87 +++++++++++++------ src/telemetry/Manager.h | 6 ++ .../Baseline/language.init-integration/out | 10 +-- .../manager-1..stdout | 3 +- .../manager-1.services.out | 5 ++ .../zeek.bare-1.node.out | 2 +- .../zeek.bare-32.node.out | 2 +- .../frameworks/telemetry/prometheus.zeek | 64 +++++++------- 10 files changed, 140 insertions(+), 69 deletions(-) create mode 100644 testing/btest/Baseline/scripts.policy.frameworks.telemetry.prometheus/manager-1.services.out diff --git a/scripts/base/frameworks/cluster/main.zeek b/scripts/base/frameworks/cluster/main.zeek index 8f2a790db8..e3e45a0cbc 100644 --- a/scripts/base/frameworks/cluster/main.zeek +++ b/scripts/base/frameworks/cluster/main.zeek @@ -196,6 +196,10 @@ export { ## A unique identifier assigned to the node by the broker framework. ## This field is only set while a node is connected. id: string &optional; + ## The port used to expose metrics to Prometheus. Setting this in a cluster + ## configuration will override the setting for Telemetry::metrics_port for + ## the node. + metrics_port: port &optional; }; ## Record to represent a cluster node including its name. @@ -218,6 +222,14 @@ export { ## Returns: The :zeek:type:`Cluster::NodeType` the calling node acts as. global local_node_type: function(): NodeType; + ## This function can be called at any time to determine the configured + ## metrics port for Prometheus being used by current Zeek instance. If + ## :zeek:id:`Cluster::is_enabled` returns false or the node isn't found, + ## ``0/unknown`` is returned. + ## + ## Returns: The metrics port used by the calling node. + global local_node_metrics_port: function(): port; + ## The cluster layout definition. This should be placed into a filter ## named cluster-layout.zeek somewhere in the ZEEKPATH. It will be ## automatically loaded if the CLUSTER_NODE environment variable is set. @@ -338,6 +350,20 @@ function local_node_type(): NodeType return nodes[node]$node_type; } +function local_node_metrics_port(): port + { + if ( ! is_enabled() ) + return 0/unknown; + + if ( node !in nodes ) + return 0/unknown; + + if ( ! nodes[node]?$metrics_port ) + return 0/unknown; + + return nodes[node]$metrics_port; + } + function node_topic(name: string): string { return node_topic_prefix + name + "/"; diff --git a/scripts/policy/frameworks/telemetry/prometheus.zeek b/scripts/policy/frameworks/telemetry/prometheus.zeek index 990ab937b8..8110e9a672 100644 --- a/scripts/policy/frameworks/telemetry/prometheus.zeek +++ b/scripts/policy/frameworks/telemetry/prometheus.zeek @@ -15,8 +15,8 @@ redef Telemetry::metrics_endpoint_name = Cluster::node; -@if ( Cluster::local_node_type() == Cluster::MANAGER ) -redef Telemetry::metrics_port = 9911/tcp; +@if ( Cluster::local_node_metrics_port() != 0/unknown ) +redef Telemetry::metrics_port = Cluster::local_node_metrics_port(); @endif @endif diff --git a/src/telemetry/Manager.cc b/src/telemetry/Manager.cc index 19783db6a2..bf4364ead1 100644 --- a/src/telemetry/Manager.cc +++ b/src/telemetry/Manager.cc @@ -19,6 +19,7 @@ #include "zeek/telemetry/ProcessStats.h" #include "zeek/telemetry/Timer.h" #include "zeek/telemetry/telemetry.bif.h" +#include "zeek/threading/formatters/detail/json.h" namespace zeek::telemetry { @@ -40,34 +41,41 @@ void Manager::InitPostScript() { } if ( ! prometheus_url.empty() ) { - printf("prometheus configured: %s\n", prometheus_url.c_str()); - CivetCallbacks* callbacks = nullptr; - // if ( ! request_topic.empty() ) { - // callbacks = new CivetCallbacks(); - // callbacks->begin_request = [](struct mg_connection* conn) -> int { - // printf("begin_request\n"); - // // We only care about requests made to the /metrics endpoint. There are other request - // // made to the server that we can ignore, such as favicon.ico. - // auto req_info = mg_get_request_info(conn); - // if ( strcmp(req_info->request_uri, "/metrics") == 0 ) { - // // send a request to a topic for data from workers - // printf("posting event\n"); - // broker_mgr->PublishEvent(telemetry_mgr->RequestTopic(), "Telemetry::remote_request", - // broker::vector{}); + auto local_node_name = id::find_val("Cluster::node")->AsStringVal(); + if ( local_node_name->Len() > 0 ) { + auto cluster_nodes = id::find_val("Cluster::nodes")->AsTableVal(); + auto local_node = cluster_nodes->Find(IntrusivePtr{NewRef{}, local_node_name}); + auto local_node_type = local_node->AsRecordVal()->GetField("node_type")->Get(); - // // wait a few seconds for workers to respond - // // TODO: do we wait for all workers to respond or just go ahead and - // // respond after a few seconds with the understanding that some workers - // // might be out of date? - // // TODO: the 4 seconds here is completely arbitrary - // std::this_thread::sleep_for(std::chrono::seconds(4)); - // } - // return 0; - // }; - // } + static auto node_type_type = id::find_type("Cluster::NodeType")->AsEnumType(); + static auto manager_type = node_type_type->Lookup("Cluster", "MANAGER"); + + if ( local_node_type == manager_type ) { + callbacks = new CivetCallbacks(); + callbacks->begin_request = [](struct mg_connection* conn) -> int { + // Handle the services.json request ourselves by building up a response based on + // the cluster configuration. + auto req_info = mg_get_request_info(conn); + if ( strcmp(req_info->request_uri, "/services.json") == 0 ) { + // send a request to a topic for data from workers + auto json = telemetry_mgr->GetClusterJson(); + mg_send_http_ok(conn, "application/json", static_cast(json.size())); + mg_write(conn, json.data(), json.size()); + return 1; + } + + return 0; + }; + } + } + + try { + prometheus_exposer = std::make_unique(prometheus_url, 2, callbacks); + } catch ( const CivetException& exc ) { + reporter->FatalError("Failed to setup Prometheus endpoint: %s\n", exc.what()); + } - prometheus_exposer = std::make_unique(prometheus_url, 2, callbacks); prometheus_exposer->RegisterCollectable(prometheus_registry); } @@ -174,6 +182,35 @@ ValPtr Manager::CollectHistogramMetrics(std::string_view prefix_pattern, std::st return ret_val; } +std::string Manager::GetClusterJson() const { + rapidjson::StringBuffer buffer; + json::detail::NullDoubleWriter writer(buffer); + + writer.StartArray(); + writer.StartObject(); + + writer.Key("targets"); + writer.StartArray(); + auto cluster_nodes = id::find_val("Cluster::nodes")->AsTableVal()->ToMap(); + for ( const auto& [idx, value] : cluster_nodes ) { + auto node = value->AsRecordVal(); + auto ip = node->GetField("ip"); + auto port = node->GetField("metrics_port"); + if ( ip && port && port->Port() != 0 ) + writer.String(util::fmt("%s:%d", ip->Get().AsString().c_str(), port->Port())); + } + writer.EndArray(); + + writer.Key("labels"); + writer.StartObject(); + writer.EndObject(); + + writer.EndObject(); + writer.EndArray(); + + return buffer.GetString(); +} + } // namespace zeek::telemetry // -- unit tests --------------------------------------------------------------- diff --git a/src/telemetry/Manager.h b/src/telemetry/Manager.h index ebcfdc5fb3..0567def827 100644 --- a/src/telemetry/Manager.h +++ b/src/telemetry/Manager.h @@ -331,6 +331,12 @@ public: return nullptr; } + /** + * @return A JSON description of the cluster configuration for reporting + * to Prometheus for service discovery requests. + */ + std::string GetClusterJson() const; + protected: template static auto WithLabelNames(Span xs, F continuation) { diff --git a/testing/btest/Baseline/language.init-integration/out b/testing/btest/Baseline/language.init-integration/out index bf0bf88bb3..12ca8b84ce 100644 --- a/testing/btest/Baseline/language.init-integration/out +++ b/testing/btest/Baseline/language.init-integration/out @@ -13,15 +13,15 @@ init_key2 in state2: 1 [worker-1] = [node_type=Cluster::WORKER, ip=127.0.0.1, p=5/udp, manager=manager-1] } { -[worker-4] = [node_type=Cluster::WORKER, ip=2.3.4.5, zone_id=, p=13/udp, interface=, manager=, time_machine=, id=] +[worker-4] = [node_type=Cluster::WORKER, ip=2.3.4.5, zone_id=, p=13/udp, interface=, manager=, time_machine=, id=, metrics_port=] } { -[worker-4] = [node_type=Cluster::WORKER, ip=2.3.4.5, zone_id=, p=13/udp, interface=, manager=, time_machine=, id=], -[worker-5] = [node_type=Cluster::WORKER, ip=3.4.5.6, zone_id=, p=15/tcp, interface=, manager=, time_machine=, id=] +[worker-4] = [node_type=Cluster::WORKER, ip=2.3.4.5, zone_id=, p=13/udp, interface=, manager=, time_machine=, id=, metrics_port=], +[worker-5] = [node_type=Cluster::WORKER, ip=3.4.5.6, zone_id=, p=15/tcp, interface=, manager=, time_machine=, id=, metrics_port=] } { -[worker-4] = [node_type=Cluster::WORKER, ip=2.3.4.5, zone_id=, p=13/udp, interface=, manager=, time_machine=, id=], -[worker-6] = [node_type=Cluster::WORKER, ip=4.5.6.7, zone_id=, p=17/udp, interface=, manager=, time_machine=, id=] +[worker-4] = [node_type=Cluster::WORKER, ip=2.3.4.5, zone_id=, p=13/udp, interface=, manager=, time_machine=, id=, metrics_port=], +[worker-6] = [node_type=Cluster::WORKER, ip=4.5.6.7, zone_id=, p=17/udp, interface=, manager=, time_machine=, id=, metrics_port=] } { [3.0, 4] diff --git a/testing/btest/Baseline/scripts.policy.frameworks.telemetry.prometheus/manager-1..stdout b/testing/btest/Baseline/scripts.policy.frameworks.telemetry.prometheus/manager-1..stdout index f0734f656c..4fdec13f30 100644 --- a/testing/btest/Baseline/scripts.policy.frameworks.telemetry.prometheus/manager-1..stdout +++ b/testing/btest/Baseline/scripts.policy.frameworks.telemetry.prometheus/manager-1..stdout @@ -1,3 +1,2 @@ ### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. -manager-1, original Broker::metrics_port, 9911/tcp -[endpoint="logger-1", endpoint="manager-1", endpoint="proxy-1", endpoint="worker-1"] +manager-1, Telemetry::metrics_port from cluster config, 1028/tcp diff --git a/testing/btest/Baseline/scripts.policy.frameworks.telemetry.prometheus/manager-1.services.out b/testing/btest/Baseline/scripts.policy.frameworks.telemetry.prometheus/manager-1.services.out new file mode 100644 index 0000000000..29931892ad --- /dev/null +++ b/testing/btest/Baseline/scripts.policy.frameworks.telemetry.prometheus/manager-1.services.out @@ -0,0 +1,5 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +endpoint="proxy-1" + +endpoint="worker-1" + diff --git a/testing/btest/Baseline/supervisor.large-cluster/zeek.bare-1.node.out b/testing/btest/Baseline/supervisor.large-cluster/zeek.bare-1.node.out index 4c79e06437..8837c1f15f 100644 --- a/testing/btest/Baseline/supervisor.large-cluster/zeek.bare-1.node.out +++ b/testing/btest/Baseline/supervisor.large-cluster/zeek.bare-1.node.out @@ -1,5 +1,5 @@ ### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. supervised node zeek_init() 1024, cluster_nodes! -[node_type=Cluster::WORKER, ip=127.0.0.1, zone_id=, p=0/tcp, interface=, manager=, time_machine=, id=] +[node_type=Cluster::WORKER, ip=127.0.0.1, zone_id=, p=0/tcp, interface=, manager=, time_machine=, id=, metrics_port=] supervised node zeek_done() diff --git a/testing/btest/Baseline/supervisor.large-cluster/zeek.bare-32.node.out b/testing/btest/Baseline/supervisor.large-cluster/zeek.bare-32.node.out index 4c79e06437..8837c1f15f 100644 --- a/testing/btest/Baseline/supervisor.large-cluster/zeek.bare-32.node.out +++ b/testing/btest/Baseline/supervisor.large-cluster/zeek.bare-32.node.out @@ -1,5 +1,5 @@ ### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. supervised node zeek_init() 1024, cluster_nodes! -[node_type=Cluster::WORKER, ip=127.0.0.1, zone_id=, p=0/tcp, interface=, manager=, time_machine=, id=] +[node_type=Cluster::WORKER, ip=127.0.0.1, zone_id=, p=0/tcp, interface=, manager=, time_machine=, id=, metrics_port=] supervised node zeek_done() diff --git a/testing/btest/scripts/policy/frameworks/telemetry/prometheus.zeek b/testing/btest/scripts/policy/frameworks/telemetry/prometheus.zeek index 1dc32a0279..605593a7ab 100644 --- a/testing/btest/scripts/policy/frameworks/telemetry/prometheus.zeek +++ b/testing/btest/scripts/policy/frameworks/telemetry/prometheus.zeek @@ -1,4 +1,4 @@ -# @TEST-DOC: Query the Prometheus endpoint on 9911 and smoke check that zeek_version_info{...} is contained in the response for all cluster nodes. +# @TEST-DOC: Query the Prometheus endpoint and smoke check that zeek_version_info{...} is contained in the response for all cluster nodes. # Note compilable to C++ due to globals being initialized to a record that # has an opaque type as a field. # @TEST-REQUIRES: test "${ZEEK_USE_CPP}" != "1" @@ -18,16 +18,35 @@ # @TEST-EXEC: btest-bg-run worker-1 ZEEKPATH=$ZEEKPATH:.. CLUSTER_NODE=worker-1 zeek -b %INPUT # @TEST-EXEC: btest-bg-wait 10 # @TEST-EXEC: btest-diff manager-1/.stdout +# @TEST-EXEC: btest-diff manager-1/services.out @TEST-START-FILE cluster-layout.zeek redef Cluster::nodes = { - ["manager-1"] = [$node_type=Cluster::MANAGER, $ip=127.0.0.1, $p=to_port(getenv("BROKER_PORT1"))], - ["logger-1"] = [$node_type=Cluster::LOGGER, $ip=127.0.0.1, $p=to_port(getenv("BROKER_PORT2")), $manager="manager-1"], - ["proxy-1"] = [$node_type=Cluster::PROXY, $ip=127.0.0.1, $p=to_port(getenv("BROKER_PORT3")), $manager="manager-1"], - ["worker-1"] = [$node_type=Cluster::WORKER, $ip=127.0.0.1, $p=to_port(getenv("BROKER_PORT4")), $manager="manager-1"], + ["manager-1"] = [$node_type=Cluster::MANAGER, $ip=127.0.0.1, $p=to_port(getenv("BROKER_PORT1")), $metrics_port=1028/tcp], + ["logger-1"] = [$node_type=Cluster::LOGGER, $ip=127.0.0.1, $p=to_port(getenv("BROKER_PORT2")), $manager="manager-1", $metrics_port=1029/tcp], + ["proxy-1"] = [$node_type=Cluster::PROXY, $ip=127.0.0.1, $p=to_port(getenv("BROKER_PORT3")), $manager="manager-1", $metrics_port=1030/tcp], + ["worker-1"] = [$node_type=Cluster::WORKER, $ip=127.0.0.1, $p=to_port(getenv("BROKER_PORT4")), $manager="manager-1", $metrics_port=1031/tcp], }; @TEST-END-FILE +@TEST-START-FILE request-services.sh +#!/bin/sh + +# This script makes repeat curl requests to find all of the metrics data from the +# hosts listed in the services output from the manager, and outputs it all into a +# single file. + +services_url=$1 +output_file=$2 + +for host in $(curl -s -m 5 ${services_url} | jq '.[0].targets.[]'); do + host=$(echo ${host} | sed 's/"//g') + metrics=$(curl -s -m 5 http://${host}/metrics) + version_info=$(echo ${metrics} | grep -Eo "zeek_version_info\{[^}]+\}" | grep -o 'endpoint=\"[^"]*\"') + echo ${version_info} >> ${output_file}; +done +@TEST-END-FILE + @load policy/frameworks/cluster/experimental @load policy/frameworks/telemetry/prometheus @load base/frameworks/telemetry @@ -38,45 +57,24 @@ redef Cluster::nodes = { # Query the Prometheus endpoint using ActiveHTTP for testing, oh my. event run_test() { - local url = fmt("http://localhost:%s/metrics", port_to_count(Telemetry::metrics_port)); - when [url] ( local response = ActiveHTTP::request([$url=url]) ) - { - if ( response$code != 200 ) - { - print fmt("ERROR: %s", response); - exit(1); - } - - # Grumble grumble, ActiveHTTP actually joins away the \n characters - # from the response. Not sure how that's helpful. We simply - # grep out the zeek_version_info{...} endpoint="..." pieces and - # expect one for each node to exist as a smoke test. - local version_infos = find_all(response$body, /zeek_version_info\{[^}]+\}/, 0); - local endpoints: vector of string; - for ( info in version_infos ) - for ( ep in find_all(info, /endpoint=\"[^"]+\"/)) - endpoints += ep; - - print sort(endpoints, strcmp); - - terminate(); - } - timeout 10sec + local services_url = fmt("http://localhost:%s/services.json", port_to_count(Telemetry::metrics_port)); + local result = system(fmt("sh ../request-services.sh %s %s", services_url, "services.out")); + if ( result != 0 ) { # This is bad. - print "ERROR: HTTP request timeout"; + print "ERROR: Failed to request service information"; exit(1); } + + terminate(); } # Use a dynamic metrics port for testing to avoid colliding on 9911/tcp # when running tests in parallel. -global orig_metrics_port = Telemetry::metrics_port; -redef Telemetry::metrics_port = to_port(getenv("BROKER_TEST_METRICS_PORT")); event zeek_init() { - print Cluster::node, "original Telemetry::metrics_port", orig_metrics_port; + print Cluster::node, "Telemetry::metrics_port from cluster config", Telemetry::metrics_port; } event Cluster::Experimental::cluster_started()