Add a services.json endpoint for Prometheus service discovery

This commit is contained in:
Tim Wojtulewicz 2024-03-13 15:28:48 -07:00
parent abb84db6c8
commit e93e4cc26d
10 changed files with 140 additions and 69 deletions

View file

@ -196,6 +196,10 @@ export {
## A unique identifier assigned to the node by the broker framework. ## A unique identifier assigned to the node by the broker framework.
## This field is only set while a node is connected. ## This field is only set while a node is connected.
id: string &optional; id: string &optional;
## The port used to expose metrics to Prometheus. Setting this in a cluster
## configuration will override the setting for Telemetry::metrics_port for
## the node.
metrics_port: port &optional;
}; };
## Record to represent a cluster node including its name. ## Record to represent a cluster node including its name.
@ -218,6 +222,14 @@ export {
## Returns: The :zeek:type:`Cluster::NodeType` the calling node acts as. ## Returns: The :zeek:type:`Cluster::NodeType` the calling node acts as.
global local_node_type: function(): NodeType; global local_node_type: function(): NodeType;
## This function can be called at any time to determine the configured
## metrics port for Prometheus being used by the current Zeek instance. If
## :zeek:id:`Cluster::is_enabled` returns false or the node isn't found,
## ``0/unknown`` is returned.
##
## Returns: The metrics port used by the calling node.
global local_node_metrics_port: function(): port;
## The cluster layout definition. This should be placed into a filter ## The cluster layout definition. This should be placed into a filter
## named cluster-layout.zeek somewhere in the ZEEKPATH. It will be ## named cluster-layout.zeek somewhere in the ZEEKPATH. It will be
## automatically loaded if the CLUSTER_NODE environment variable is set. ## automatically loaded if the CLUSTER_NODE environment variable is set.
@ -338,6 +350,20 @@ function local_node_type(): NodeType
return nodes[node]$node_type; return nodes[node]$node_type;
} }
function local_node_metrics_port(): port
	{
	# The configured port is only meaningful when clustering is active,
	# this node appears in the layout, and the optional field was set;
	# in every other case report 0/unknown.
	if ( is_enabled() && node in nodes && nodes[node]?$metrics_port )
		return nodes[node]$metrics_port;

	return 0/unknown;
	}
function node_topic(name: string): string function node_topic(name: string): string
{ {
return node_topic_prefix + name + "/"; return node_topic_prefix + name + "/";

View file

@ -15,8 +15,8 @@
redef Telemetry::metrics_endpoint_name = Cluster::node; redef Telemetry::metrics_endpoint_name = Cluster::node;
@if ( Cluster::local_node_type() == Cluster::MANAGER ) @if ( Cluster::local_node_metrics_port() != 0/unknown )
redef Telemetry::metrics_port = 9911/tcp; redef Telemetry::metrics_port = Cluster::local_node_metrics_port();
@endif @endif
@endif @endif

View file

@ -19,6 +19,7 @@
#include "zeek/telemetry/ProcessStats.h" #include "zeek/telemetry/ProcessStats.h"
#include "zeek/telemetry/Timer.h" #include "zeek/telemetry/Timer.h"
#include "zeek/telemetry/telemetry.bif.h" #include "zeek/telemetry/telemetry.bif.h"
#include "zeek/threading/formatters/detail/json.h"
namespace zeek::telemetry { namespace zeek::telemetry {
@ -40,34 +41,41 @@ void Manager::InitPostScript() {
} }
if ( ! prometheus_url.empty() ) { if ( ! prometheus_url.empty() ) {
printf("prometheus configured: %s\n", prometheus_url.c_str());
CivetCallbacks* callbacks = nullptr; CivetCallbacks* callbacks = nullptr;
// if ( ! request_topic.empty() ) { auto local_node_name = id::find_val("Cluster::node")->AsStringVal();
// callbacks = new CivetCallbacks(); if ( local_node_name->Len() > 0 ) {
// callbacks->begin_request = [](struct mg_connection* conn) -> int { auto cluster_nodes = id::find_val("Cluster::nodes")->AsTableVal();
// printf("begin_request\n"); auto local_node = cluster_nodes->Find(IntrusivePtr<StringVal>{NewRef{}, local_node_name});
// // We only care about requests made to the /metrics endpoint. There are other request auto local_node_type = local_node->AsRecordVal()->GetField<EnumVal>("node_type")->Get();
// // made to the server that we can ignore, such as favicon.ico.
// auto req_info = mg_get_request_info(conn);
// if ( strcmp(req_info->request_uri, "/metrics") == 0 ) {
// // send a request to a topic for data from workers
// printf("posting event\n");
// broker_mgr->PublishEvent(telemetry_mgr->RequestTopic(), "Telemetry::remote_request",
// broker::vector{});
// // wait a few seconds for workers to respond static auto node_type_type = id::find_type("Cluster::NodeType")->AsEnumType();
// // TODO: do we wait for all workers to respond or just go ahead and static auto manager_type = node_type_type->Lookup("Cluster", "MANAGER");
// // respond after a few seconds with the understanding that some workers
// // might be out of date? if ( local_node_type == manager_type ) {
// // TODO: the 4 seconds here is completely arbitrary callbacks = new CivetCallbacks();
// std::this_thread::sleep_for(std::chrono::seconds(4)); callbacks->begin_request = [](struct mg_connection* conn) -> int {
// } // Handle the services.json request ourselves by building up a response based on
// return 0; // the cluster configuration.
// }; auto req_info = mg_get_request_info(conn);
// } if ( strcmp(req_info->request_uri, "/services.json") == 0 ) {
// send a request to a topic for data from workers
auto json = telemetry_mgr->GetClusterJson();
mg_send_http_ok(conn, "application/json", static_cast<long long>(json.size()));
mg_write(conn, json.data(), json.size());
return 1;
}
return 0;
};
}
}
try {
prometheus_exposer = std::make_unique<prometheus::Exposer>(prometheus_url, 2, callbacks);
} catch ( const CivetException& exc ) {
reporter->FatalError("Failed to setup Prometheus endpoint: %s\n", exc.what());
}
prometheus_exposer = std::make_unique<prometheus::Exposer>(prometheus_url, 2, callbacks);
prometheus_exposer->RegisterCollectable(prometheus_registry); prometheus_exposer->RegisterCollectable(prometheus_registry);
} }
@ -174,6 +182,35 @@ ValPtr Manager::CollectHistogramMetrics(std::string_view prefix_pattern, std::st
return ret_val; return ret_val;
} }
std::string Manager::GetClusterJson() const {
rapidjson::StringBuffer buffer;
json::detail::NullDoubleWriter writer(buffer);
writer.StartArray();
writer.StartObject();
writer.Key("targets");
writer.StartArray();
auto cluster_nodes = id::find_val("Cluster::nodes")->AsTableVal()->ToMap();
for ( const auto& [idx, value] : cluster_nodes ) {
auto node = value->AsRecordVal();
auto ip = node->GetField<AddrVal>("ip");
auto port = node->GetField<PortVal>("metrics_port");
if ( ip && port && port->Port() != 0 )
writer.String(util::fmt("%s:%d", ip->Get().AsString().c_str(), port->Port()));
}
writer.EndArray();
writer.Key("labels");
writer.StartObject();
writer.EndObject();
writer.EndObject();
writer.EndArray();
return buffer.GetString();
}
} // namespace zeek::telemetry } // namespace zeek::telemetry
// -- unit tests --------------------------------------------------------------- // -- unit tests ---------------------------------------------------------------

View file

@ -331,6 +331,12 @@ public:
return nullptr; return nullptr;
} }
/**
* @return A JSON description of the cluster configuration for reporting
* to Prometheus for service discovery requests.
*/
std::string GetClusterJson() const;
protected: protected:
template<class F> template<class F>
static auto WithLabelNames(Span<const LabelView> xs, F continuation) { static auto WithLabelNames(Span<const LabelView> xs, F continuation) {

View file

@ -13,15 +13,15 @@ init_key2 in state2: 1
[worker-1] = [node_type=Cluster::WORKER, ip=127.0.0.1, p=5/udp, manager=manager-1] [worker-1] = [node_type=Cluster::WORKER, ip=127.0.0.1, p=5/udp, manager=manager-1]
} }
{ {
[worker-4] = [node_type=Cluster::WORKER, ip=2.3.4.5, zone_id=, p=13/udp, interface=<uninitialized>, manager=<uninitialized>, time_machine=<uninitialized>, id=<uninitialized>] [worker-4] = [node_type=Cluster::WORKER, ip=2.3.4.5, zone_id=, p=13/udp, interface=<uninitialized>, manager=<uninitialized>, time_machine=<uninitialized>, id=<uninitialized>, metrics_port=<uninitialized>]
} }
{ {
[worker-4] = [node_type=Cluster::WORKER, ip=2.3.4.5, zone_id=, p=13/udp, interface=<uninitialized>, manager=<uninitialized>, time_machine=<uninitialized>, id=<uninitialized>], [worker-4] = [node_type=Cluster::WORKER, ip=2.3.4.5, zone_id=, p=13/udp, interface=<uninitialized>, manager=<uninitialized>, time_machine=<uninitialized>, id=<uninitialized>, metrics_port=<uninitialized>],
[worker-5] = [node_type=Cluster::WORKER, ip=3.4.5.6, zone_id=, p=15/tcp, interface=<uninitialized>, manager=<uninitialized>, time_machine=<uninitialized>, id=<uninitialized>] [worker-5] = [node_type=Cluster::WORKER, ip=3.4.5.6, zone_id=, p=15/tcp, interface=<uninitialized>, manager=<uninitialized>, time_machine=<uninitialized>, id=<uninitialized>, metrics_port=<uninitialized>]
} }
{ {
[worker-4] = [node_type=Cluster::WORKER, ip=2.3.4.5, zone_id=, p=13/udp, interface=<uninitialized>, manager=<uninitialized>, time_machine=<uninitialized>, id=<uninitialized>], [worker-4] = [node_type=Cluster::WORKER, ip=2.3.4.5, zone_id=, p=13/udp, interface=<uninitialized>, manager=<uninitialized>, time_machine=<uninitialized>, id=<uninitialized>, metrics_port=<uninitialized>],
[worker-6] = [node_type=Cluster::WORKER, ip=4.5.6.7, zone_id=, p=17/udp, interface=<uninitialized>, manager=<uninitialized>, time_machine=<uninitialized>, id=<uninitialized>] [worker-6] = [node_type=Cluster::WORKER, ip=4.5.6.7, zone_id=, p=17/udp, interface=<uninitialized>, manager=<uninitialized>, time_machine=<uninitialized>, id=<uninitialized>, metrics_port=<uninitialized>]
} }
{ {
[3.0, 4] [3.0, 4]

View file

@ -1,3 +1,2 @@
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. ### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
manager-1, original Broker::metrics_port, 9911/tcp manager-1, Telemetry::metrics_port from cluster config, 1028/tcp
[endpoint="logger-1", endpoint="manager-1", endpoint="proxy-1", endpoint="worker-1"]

View file

@ -0,0 +1,5 @@
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
endpoint="proxy-1"
endpoint="worker-1"

View file

@ -1,5 +1,5 @@
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. ### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
supervised node zeek_init() supervised node zeek_init()
1024, cluster_nodes! 1024, cluster_nodes!
[node_type=Cluster::WORKER, ip=127.0.0.1, zone_id=, p=0/tcp, interface=<uninitialized>, manager=<uninitialized>, time_machine=<uninitialized>, id=<uninitialized>] [node_type=Cluster::WORKER, ip=127.0.0.1, zone_id=, p=0/tcp, interface=<uninitialized>, manager=<uninitialized>, time_machine=<uninitialized>, id=<uninitialized>, metrics_port=<uninitialized>]
supervised node zeek_done() supervised node zeek_done()

View file

@ -1,5 +1,5 @@
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. ### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
supervised node zeek_init() supervised node zeek_init()
1024, cluster_nodes! 1024, cluster_nodes!
[node_type=Cluster::WORKER, ip=127.0.0.1, zone_id=, p=0/tcp, interface=<uninitialized>, manager=<uninitialized>, time_machine=<uninitialized>, id=<uninitialized>] [node_type=Cluster::WORKER, ip=127.0.0.1, zone_id=, p=0/tcp, interface=<uninitialized>, manager=<uninitialized>, time_machine=<uninitialized>, id=<uninitialized>, metrics_port=<uninitialized>]
supervised node zeek_done() supervised node zeek_done()

View file

@ -1,4 +1,4 @@
# @TEST-DOC: Query the Prometheus endpoint on 9911 and smoke check that zeek_version_info{...} is contained in the response for all cluster nodes. # @TEST-DOC: Query the Prometheus endpoint and smoke check that zeek_version_info{...} is contained in the response for all cluster nodes.
# Note compilable to C++ due to globals being initialized to a record that # Note compilable to C++ due to globals being initialized to a record that
# has an opaque type as a field. # has an opaque type as a field.
# @TEST-REQUIRES: test "${ZEEK_USE_CPP}" != "1" # @TEST-REQUIRES: test "${ZEEK_USE_CPP}" != "1"
@ -18,16 +18,35 @@
# @TEST-EXEC: btest-bg-run worker-1 ZEEKPATH=$ZEEKPATH:.. CLUSTER_NODE=worker-1 zeek -b %INPUT # @TEST-EXEC: btest-bg-run worker-1 ZEEKPATH=$ZEEKPATH:.. CLUSTER_NODE=worker-1 zeek -b %INPUT
# @TEST-EXEC: btest-bg-wait 10 # @TEST-EXEC: btest-bg-wait 10
# @TEST-EXEC: btest-diff manager-1/.stdout # @TEST-EXEC: btest-diff manager-1/.stdout
# @TEST-EXEC: btest-diff manager-1/services.out
@TEST-START-FILE cluster-layout.zeek @TEST-START-FILE cluster-layout.zeek
redef Cluster::nodes = { redef Cluster::nodes = {
["manager-1"] = [$node_type=Cluster::MANAGER, $ip=127.0.0.1, $p=to_port(getenv("BROKER_PORT1"))], ["manager-1"] = [$node_type=Cluster::MANAGER, $ip=127.0.0.1, $p=to_port(getenv("BROKER_PORT1")), $metrics_port=1028/tcp],
["logger-1"] = [$node_type=Cluster::LOGGER, $ip=127.0.0.1, $p=to_port(getenv("BROKER_PORT2")), $manager="manager-1"], ["logger-1"] = [$node_type=Cluster::LOGGER, $ip=127.0.0.1, $p=to_port(getenv("BROKER_PORT2")), $manager="manager-1", $metrics_port=1029/tcp],
["proxy-1"] = [$node_type=Cluster::PROXY, $ip=127.0.0.1, $p=to_port(getenv("BROKER_PORT3")), $manager="manager-1"], ["proxy-1"] = [$node_type=Cluster::PROXY, $ip=127.0.0.1, $p=to_port(getenv("BROKER_PORT3")), $manager="manager-1", $metrics_port=1030/tcp],
["worker-1"] = [$node_type=Cluster::WORKER, $ip=127.0.0.1, $p=to_port(getenv("BROKER_PORT4")), $manager="manager-1"], ["worker-1"] = [$node_type=Cluster::WORKER, $ip=127.0.0.1, $p=to_port(getenv("BROKER_PORT4")), $manager="manager-1", $metrics_port=1031/tcp],
}; };
@TEST-END-FILE @TEST-END-FILE
@TEST-START-FILE request-services.sh
#!/bin/sh
# Repeatedly curl every target advertised by the manager's services.json
# endpoint, extract the zeek_version_info{...} endpoint="..." label from each
# node's /metrics output, and append them all to a single output file.
#
# $1: URL of the services.json endpoint on the manager
# $2: file to collect the per-node endpoint labels into
services_url=$1
output_file=$2

# jq -r emits raw (unquoted) strings, and '.targets[]' is the portable path
# syntax ('.targets.[]' only works on newer jq releases).
for host in $(curl -s -m 5 ${services_url} | jq -r '.[0].targets[]'); do
	metrics=$(curl -s -m 5 http://${host}/metrics)
	version_info=$(echo ${metrics} | grep -Eo "zeek_version_info\{[^}]+\}" | grep -o 'endpoint=\"[^"]*\"')
	echo ${version_info} >> ${output_file};
done
@TEST-END-FILE
@load policy/frameworks/cluster/experimental @load policy/frameworks/cluster/experimental
@load policy/frameworks/telemetry/prometheus @load policy/frameworks/telemetry/prometheus
@load base/frameworks/telemetry @load base/frameworks/telemetry
@ -38,45 +57,24 @@ redef Cluster::nodes = {
# Query the Prometheus endpoint using ActiveHTTP for testing, oh my. # Query the Prometheus endpoint using ActiveHTTP for testing, oh my.
event run_test() event run_test()
{ {
local url = fmt("http://localhost:%s/metrics", port_to_count(Telemetry::metrics_port)); local services_url = fmt("http://localhost:%s/services.json", port_to_count(Telemetry::metrics_port));
when [url] ( local response = ActiveHTTP::request([$url=url]) ) local result = system(fmt("sh ../request-services.sh %s %s", services_url, "services.out"));
{ if ( result != 0 )
if ( response$code != 200 )
{
print fmt("ERROR: %s", response);
exit(1);
}
# Grumble grumble, ActiveHTTP actually joins away the \n characters
# from the response. Not sure how that's helpful. We simply
# grep out the zeek_version_info{...} endpoint="..." pieces and
# expect one for each node to exist as a smoke test.
local version_infos = find_all(response$body, /zeek_version_info\{[^}]+\}/, 0);
local endpoints: vector of string;
for ( info in version_infos )
for ( ep in find_all(info, /endpoint=\"[^"]+\"/))
endpoints += ep;
print sort(endpoints, strcmp);
terminate();
}
timeout 10sec
{ {
# This is bad. # This is bad.
print "ERROR: HTTP request timeout"; print "ERROR: Failed to request service information";
exit(1); exit(1);
} }
terminate();
} }
# Use a dynamic metrics port for testing to avoid colliding on 9911/tcp # Use a dynamic metrics port for testing to avoid colliding on 9911/tcp
# when running tests in parallel. # when running tests in parallel.
global orig_metrics_port = Telemetry::metrics_port;
redef Telemetry::metrics_port = to_port(getenv("BROKER_TEST_METRICS_PORT"));
event zeek_init() event zeek_init()
{ {
print Cluster::node, "original Telemetry::metrics_port", orig_metrics_port; print Cluster::node, "Telemetry::metrics_port from cluster config", Telemetry::metrics_port;
} }
event Cluster::Experimental::cluster_started() event Cluster::Experimental::cluster_started()