telemetry: Invoke Telemetry::sync() only at scrape/collection time

This stops invoking Telemetry::sync() via a scheduled event and instead
only invokes it on-demand. This makes metric collection network time
independent and lazier, too.

With Prometheus scrape requests being processed on Zeek's main thread
now, we can safely invoke the script layer Telemetry::sync() hook.

Closes #3947
This commit is contained in:
Arne Welzel 2024-10-02 16:53:11 +02:00
parent e118887771
commit 70872673a1
16 changed files with 260 additions and 17 deletions

4
NEWS
View file

@ -93,6 +93,10 @@ Changed Functionality
* The ASCII input reader now suppresses warnings for consecutive invalid lines,
producing a summary of total suppressions once a valid line is encountered.
* The `Telemetry::sync()` hook is now invoked on demand. Either when the metrics
of a node are scraped via the Prometheus HTTP endpoint, or one of the collect
methods is invoked from Zeek script.
Removed Functionality
---------------------

View file

@ -263,22 +263,15 @@ export {
label_values: labels_vector,
measurement: double): bool;
## Telemetry sync hook.
##
## This hook is invoked every :zeek:see:`Telemetry::sync_interval`
## for script writers to synchronize or mirror metrics with the
## telemetry subsystem. For example, when tracking table or value
## footprints with gauges, the value in question can be set on an actual
## :zeek:see:`Telemetry::Gauge` instance during execution of this hook.
##
## Implementations should be lightweight, this hook may be called
## multiple times per minute. The interval can increased by changing
## :zeek:see:`Telemetry::sync_interval` at the cost of delaying
## metric updates and thereby reducing granularity.
global sync: hook();
## Interval at which the :zeek:see:`Telemetry::sync` hook is invoked.
option sync_interval = 10sec;
##
## By default, the hook is invoked on demand, setting this option to
## a positive interval allows to invoke it regularly, too. Regular
## invocations are relative to Zeek's network time.
##
## Note that on-demand hook invocation will happen even if this
## is set.
option sync_interval = 0sec &deprecated="Remove in 8.1. If you require regular sync invocation, do so explicitly in a scheduled event.";
## Collect all counter and gauge metrics matching the given *name* and *prefix*.
##
@ -493,7 +486,9 @@ function collect_histogram_metrics(prefix: string, name: string): vector of Hist
event run_sync_hook()
{
hook Telemetry::sync();
@pragma push ignore-deprecations
schedule sync_interval { run_sync_hook() };
@pragma pop ignore-deprecations
}
# Expose the Zeek version as Prometheus style info metric
@ -508,7 +503,10 @@ global version_gauge_family = Telemetry::register_gauge_family([
event zeek_init()
{
@pragma push ignore-deprecations
if ( sync_interval > 0sec )
schedule sync_interval { run_sync_hook() };
@pragma pop ignore-deprecations
local v = Version::info;
local labels = vector(cat(v$version_number),

View file

@ -5933,6 +5933,21 @@ export {
sum: double;
};
## Telemetry sync hook.
##
## This hook is invoked when metrics are requested via functions
## :zeek:see:`Telemetry::collect_metrics` and :zeek:see:`Telemetry::collect_histogram_metrics`,
## or just before Zeek collects metrics when being scraped through
## its Prometheus endpoint.
## Script writers can use it to synchronize (or mirror) metrics with the
## telemetry subsystem. For example, when tracking table or value
## footprints with gauges, the value in question can be set on an actual
## :zeek:see:`Telemetry::Gauge` instance during execution of this hook.
##
## Implementations should be lightweight, this hook may be called
## multiple times per minute.
global sync: hook();
type MetricVector : vector of Metric;
type HistogramMetricVector : vector of HistogramMetric;

View file

@ -16,6 +16,7 @@
#include <variant>
#include "zeek/3rdparty/doctest.h"
#include "zeek/Func.h"
#include "zeek/ID.h"
#include "zeek/RunState.h"
#include "zeek/ZeekString.h"
@ -256,6 +257,28 @@ static bool compare_histograms(const std::optional<ZVal>& a, const std::optional
return comparer(a, b, metric_record_type);
}
void Manager::InvokeTelemetrySyncHook() {
static const auto sync_hook = zeek::id::find_func("Telemetry::sync");
if ( sync_hook->Flavor() != FUNC_FLAVOR_HOOK )
reporter->InternalError("Telemetry::sync not a hook?");
if ( in_sync_hook ) {
reporter->Warning("Telemetry::sync() hook invoked recursively");
return;
}
in_sync_hook = true;
zeek::Args empty;
auto result = sync_hook->Invoke(&empty);
if ( ! result->IsOne() )
reporter->Warning("Telemetry::sync() implementations skipped due to 'break' usage");
in_sync_hook = false;
}
ValPtr Manager::CollectMetrics(std::string_view prefix_pattern, std::string_view name_pattern) {
static auto metrics_vector_type = zeek::id::find_type<VectorType>("Telemetry::MetricVector");
static auto string_vec_type = zeek::id::find_type<zeek::VectorType>("string_vec");
@ -268,6 +291,8 @@ ValPtr Manager::CollectMetrics(std::string_view prefix_pattern, std::string_view
static auto metric_opts_type = zeek::id::find_type<zeek::RecordType>("Telemetry::MetricOpts");
static auto metric_type_idx = metric_opts_type->FieldOffset("metric_type");
InvokeTelemetrySyncHook();
VectorValPtr ret_val = make_intrusive<VectorVal>(metrics_vector_type);
// Due to the name containing the full information about a metric including a potential unit add an
@ -345,6 +370,8 @@ ValPtr Manager::CollectHistogramMetrics(std::string_view prefix_pattern, std::st
static auto metric_opts_type = zeek::id::find_type<zeek::RecordType>("Telemetry::MetricOpts");
static auto metric_type_idx = metric_opts_type->FieldOffset("metric_type");
InvokeTelemetrySyncHook();
VectorValPtr ret_val = make_intrusive<VectorVal>(metrics_vector_type);
// Due to the name containing the full information about a metric including a potential unit add an
@ -574,6 +601,8 @@ void Manager::ProcessFd(int fd, int flags) {
for ( const auto& [name, f] : families )
f->RunCallbacks();
InvokeTelemetrySyncHook();
collector_response_idx = collector_request_idx;
lk.unlock();

View file

@ -259,6 +259,13 @@ private:
RecordValPtr GetMetricOptsRecord(const prometheus::MetricFamily& metric_family);
void BuildClusterJson();
/**
* Runs the Telemetry::sync() hook in Zeek script land.
*/
void InvokeTelemetrySyncHook();
bool in_sync_hook = false;
std::map<std::string, std::shared_ptr<MetricFamily>> families;
std::map<std::string, RecordValPtr> opts_records;

View file

@ -1,2 +1,2 @@
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
[[name=Broker::log_flush, times_called=2], [name=ChecksumOffloading::check, times_called=2], [name=NetControl::init, times_called=1], [name=analyzer_confirmation_info, times_called=1], [name=connection_established, times_called=1], [name=connection_state_remove, times_called=1], [name=file_new, times_called=1], [name=file_over_new_connection, times_called=1], [name=file_sniff, times_called=1], [name=file_state_remove, times_called=1], [name=filter_change_tracking, times_called=3], [name=get_file_handle, times_called=4], [name=http_begin_entity, times_called=2], [name=http_end_entity, times_called=2], [name=http_header, times_called=13], [name=http_message_done, times_called=2], [name=http_reply, times_called=1], [name=http_request, times_called=1], [name=net_done, times_called=1], [name=new_connection, times_called=1], [name=run_sync_hook, times_called=2], [name=zeek_done, times_called=1], [name=zeek_init, times_called=1]]
[[name=Broker::log_flush, times_called=2], [name=ChecksumOffloading::check, times_called=2], [name=NetControl::init, times_called=1], [name=analyzer_confirmation_info, times_called=1], [name=connection_established, times_called=1], [name=connection_state_remove, times_called=1], [name=file_new, times_called=1], [name=file_over_new_connection, times_called=1], [name=file_sniff, times_called=1], [name=file_state_remove, times_called=1], [name=filter_change_tracking, times_called=3], [name=get_file_handle, times_called=4], [name=http_begin_entity, times_called=2], [name=http_end_entity, times_called=2], [name=http_header, times_called=13], [name=http_message_done, times_called=2], [name=http_reply, times_called=1], [name=http_request, times_called=1], [name=net_done, times_called=1], [name=new_connection, times_called=1], [name=zeek_done, times_called=1], [name=zeek_init, times_called=1]]

View file

@ -0,0 +1,13 @@
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
node up
sync, 1, tcp
btest_connections_total, [tcp], 1.0
sync, 2, udp
btest_connections_total, [udp], 1.0
btest_connections_total, [tcp], 1.0
sync, 3, udp
btest_connections_total, [udp], 2.0
btest_connections_total, [tcp], 1.0
sync, 4, udp
btest_connections_total, [udp], 2.0
btest_connections_total, [tcp], 1.0

View file

@ -0,0 +1,3 @@
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
warning: Telemetry::sync() hook invoked recursively
warning: Telemetry::sync() implementations skipped due to 'break' usage

View file

@ -0,0 +1,2 @@
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
node up

View file

@ -0,0 +1,2 @@
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
btest_connections_total{proto="tcp"} 1

View file

@ -0,0 +1,3 @@
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
btest_connections_total{proto="udp"} 1
btest_connections_total{proto="tcp"} 1

View file

@ -0,0 +1,3 @@
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
btest_connections_total{proto="udp"} 2
btest_connections_total{proto="tcp"} 1

View file

@ -0,0 +1,5 @@
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
node up
sync, 1, tcp
sync, 2, udp
sync, 3, udp

View file

@ -0,0 +1,48 @@
# @TEST-DOC: Calling collect_metrics() invokes Telemetry::sync.
# Note compilable to C++ due to globals being initialized to a record that
# has an opaque type as a field.
# @TEST-REQUIRES: test "${ZEEK_USE_CPP}" != "1"
#
# @TEST-EXEC: zeek -b %INPUT >out
# @TEST-EXEC: btest-diff out
@load base/frameworks/telemetry
global connections_by_proto_cf = Telemetry::register_counter_family([
$prefix="btest",
$name="connections",
$unit="",
$help_text="Total number of monitored connections",
$label_names=vector("proto")
]);
function print_metrics(ms: vector of Telemetry::Metric) {
for (_, m in ms) {
print m$opts$name, m$label_values, m$value;
}
}
event zeek_init()
{
print "node up";
local ms = Telemetry::collect_metrics("btest");
print_metrics(ms);
ms = Telemetry::collect_metrics("btest");
print_metrics(ms);
ms = Telemetry::collect_metrics("btest");
print_metrics(ms);
local hm = Telemetry::collect_histogram_metrics("btest");
print_metrics(ms);
}
global sync_calls = 0;
hook Telemetry::sync()
{
++sync_calls;
local proto = sync_calls == 1 ? "tcp" : "udp";
print "sync", sync_calls, proto;
Telemetry::counter_family_inc(connections_by_proto_cf, vector(proto));
}

View file

@ -0,0 +1,43 @@
# @TEST-DOC: Breaking and recursive Telemetry::sync() warning
# Note compilable to C++ due to globals being initialized to a record that
# has an opaque type as a field.
# @TEST-REQUIRES: test "${ZEEK_USE_CPP}" != "1"
#
# @TEST-EXEC: zeek -b %INPUT >out
# @TEST-EXEC: btest-diff out
# @TEST-EXEC: btest-diff .stderr
@load base/frameworks/telemetry
global connections_by_proto_cf = Telemetry::register_counter_family([
$prefix="btest",
$name="connections",
$unit="",
$help_text="Total number of monitored connections",
$label_names=vector("proto")
]);
event zeek_init()
{
print "node up";
Telemetry::counter_family_inc(connections_by_proto_cf, vector("tcp"));
local ms = Telemetry::collect_metrics("btest");
}
hook Telemetry::sync()
{
# Calling collect_metrics() in Telemetry::sync() is not good as
# it would invoke Telemetry::sync() recursively. The manager will
# emit a warning and not run the second Telemetry::sync() invocation.
local ms = Telemetry::collect_metrics("btest");
}
hook Telemetry::sync() &priority=-100
{
# break is not good as it prevents other Telemetry::sync() hooks
# from running. This will produce a warning.
# We could find this via script validation if we wanted to.
break;
}

View file

@ -0,0 +1,68 @@
# @TEST-DOC: Verify Telemetry::sync() is invoked for metric scraping via the Prometheus HTTP endpoint.
# Note compilable to C++ due to globals being initialized to a record that
# has an opaque type as a field.
# @TEST-REQUIRES: test "${ZEEK_USE_CPP}" != "1"
# @TEST-REQUIRES: which jq
# @TEST-REQUIRES: which curl
#
# @TEST-PORT: METRICS_PORT
#
# @TEST-EXEC: chmod +x fetch-metrics.sh
# @TEST-EXEC: zeek --parse-only %INPUT
# @TEST-EXEC: btest-bg-run zeek ZEEKPATH=$ZEEKPATH:.. zeek -b %INPUT
# @TEST-EXEC: $SCRIPTS/wait-for-file zeek/up 5 || (btest-bg-wait -k 1 && false)
# @TEST-EXEC: ./fetch-metrics.sh 1.trace metrics1.txt
# @TEST-EXEC: ./fetch-metrics.sh 2.trace metrics2.txt
# @TEST-EXEC: ./fetch-metrics.sh 3.trace metrics3.txt
# @TEST-EXEC: btest-bg-wait 10
#
# @TEST-EXEC: btest-diff zeek/.stdout
# @TEST-EXEC: btest-diff metrics1.txt
# @TEST-EXEC: btest-diff metrics2.txt
# @TEST-EXEC: btest-diff metrics3.txt
@TEST-START-FILE fetch-metrics.sh
#! /usr/bin/env bash
set -ux
trace_file=$1
output_file=$2
PORT=$(echo ${METRICS_PORT} | cut -d '/' -f 1)
URL=http://localhost:${PORT}/metrics
curl -m 5 --trace $trace_file $URL | grep ^btest > $output_file
exit 0
@TEST-END-FILE
@load base/frameworks/telemetry
redef exit_only_after_terminate = T;
redef Telemetry::metrics_port = to_port(getenv("METRICS_PORT"));
event zeek_init()
{
print "node up";
system("touch up");
}
global connections_by_proto_cf = Telemetry::register_counter_family([
$prefix="btest",
$name="connections",
$unit="",
$help_text="Total number of monitored connections",
$label_names=vector("proto")
]);
global sync_calls = 0;
hook Telemetry::sync()
{
++sync_calls;
local proto = sync_calls == 1 ? "tcp" : "udp";
print "sync", sync_calls, proto;
Telemetry::counter_family_inc(connections_by_proto_cf, vector(proto));
if ( sync_calls == 3 )
terminate();
}