From 70872673a1ce56fe742c38f0d702b167e794ca9e Mon Sep 17 00:00:00 2001 From: Arne Welzel Date: Wed, 2 Oct 2024 16:53:11 +0200 Subject: [PATCH] telemetry: Invoke Telemetry::sync() only at scrape/collection time This stops invoking Telemetry::sync() via a scheduled event and instead only invokes it on-demand. This makes metric collection network time independent and lazier, too. With Prometheus scrape requests being processed on Zeek's main thread now, we can safely invoke the script layer Telemetry::sync() hook. Closes #3947 --- NEWS | 4 ++ scripts/base/frameworks/telemetry/main.zeek | 30 ++++---- scripts/base/init-bare.zeek | 15 ++++ src/telemetry/Manager.cc | 29 ++++++++ src/telemetry/Manager.h | 7 ++ .../Baseline/bifs.event-handler-stats/out | 2 +- .../out | 13 ++++ .../.stderr | 3 + .../out | 2 + .../metrics1.txt | 2 + .../metrics2.txt | 3 + .../metrics3.txt | 3 + .../zeek..stdout | 5 ++ .../frameworks/telemetry/sync-collect.zeek | 48 +++++++++++++ .../frameworks/telemetry/sync-warnings.zeek | 43 ++++++++++++ .../base/frameworks/telemetry/sync.zeek | 68 +++++++++++++++++++ 16 files changed, 260 insertions(+), 17 deletions(-) create mode 100644 testing/btest/Baseline/scripts.base.frameworks.telemetry.sync-collect/out create mode 100644 testing/btest/Baseline/scripts.base.frameworks.telemetry.sync-warnings/.stderr create mode 100644 testing/btest/Baseline/scripts.base.frameworks.telemetry.sync-warnings/out create mode 100644 testing/btest/Baseline/scripts.base.frameworks.telemetry.sync/metrics1.txt create mode 100644 testing/btest/Baseline/scripts.base.frameworks.telemetry.sync/metrics2.txt create mode 100644 testing/btest/Baseline/scripts.base.frameworks.telemetry.sync/metrics3.txt create mode 100644 testing/btest/Baseline/scripts.base.frameworks.telemetry.sync/zeek..stdout create mode 100644 testing/btest/scripts/base/frameworks/telemetry/sync-collect.zeek create mode 100644 testing/btest/scripts/base/frameworks/telemetry/sync-warnings.zeek create mode 100644 
testing/btest/scripts/base/frameworks/telemetry/sync.zeek diff --git a/NEWS b/NEWS index 8b46a774c7..bddb781f4e 100644 --- a/NEWS +++ b/NEWS @@ -93,6 +93,10 @@ Changed Functionality * The ASCII input reader now suppresses warnings for consecutive invalid lines, producing a summary of total suppressions once a valid line is encountered. +* The `Telemetry::sync()` hook is now invoked on demand. Either when the metrics + of a node are scraped via the Prometheus HTTP endpoint, or one of the collect + methods is invoked from Zeek script. + Removed Functionality --------------------- diff --git a/scripts/base/frameworks/telemetry/main.zeek b/scripts/base/frameworks/telemetry/main.zeek index 0e5ce1b1f5..ce67c642d1 100644 --- a/scripts/base/frameworks/telemetry/main.zeek +++ b/scripts/base/frameworks/telemetry/main.zeek @@ -263,22 +263,15 @@ export { label_values: labels_vector, measurement: double): bool; - ## Telemetry sync hook. - ## - ## This hook is invoked every :zeek:see:`Telemetry::sync_interval` - ## for script writers to synchronize or mirror metrics with the - ## telemetry subsystem. For example, when tracking table or value - ## footprints with gauges, the value in question can be set on an actual - ## :zeek:see:`Telemetry::Gauge` instance during execution of this hook. - ## - ## Implementations should be lightweight, this hook may be called - ## multiple times per minute. The interval can increased by changing - ## :zeek:see:`Telemetry::sync_interval` at the cost of delaying - ## metric updates and thereby reducing granularity. - global sync: hook(); - ## Interval at which the :zeek:see:`Telemetry::sync` hook is invoked. - option sync_interval = 10sec; + ## + ## By default, the hook is invoked on demand, setting this option to + ## a positive interval allows to invoke it regularly, too. Regular + ## invocations are relative to Zeek's network time. + ## + ## Note that on-demand hook invocation will happen even if this + ## is set. 
+ option sync_interval = 0sec &deprecated="Remove in 8.1. If you require regular sync invocation, do so explicitly in a scheduled event."; ## Collect all counter and gauge metrics matching the given *name* and *prefix*. ## @@ -493,7 +486,9 @@ function collect_histogram_metrics(prefix: string, name: string): vector of Hist event run_sync_hook() { hook Telemetry::sync(); +@pragma push ignore-deprecations schedule sync_interval { run_sync_hook() }; +@pragma pop ignore-deprecations } # Expose the Zeek version as Prometheus style info metric @@ -508,7 +503,10 @@ global version_gauge_family = Telemetry::register_gauge_family([ event zeek_init() { - schedule sync_interval { run_sync_hook() }; +@pragma push ignore-deprecations + if ( sync_interval > 0sec ) + schedule sync_interval { run_sync_hook() }; +@pragma pop ignore-deprecations local v = Version::info; local labels = vector(cat(v$version_number), diff --git a/scripts/base/init-bare.zeek b/scripts/base/init-bare.zeek index 48643239b8..a0cdd0cd39 100644 --- a/scripts/base/init-bare.zeek +++ b/scripts/base/init-bare.zeek @@ -5933,6 +5933,21 @@ export { sum: double; }; + ## Telemetry sync hook. + ## + ## This hook is invoked when metrics are requested via functions + ## :zeek:see:`Telemetry::collect_metrics` and :zeek:see:`Telemetry::collect_histogram_metrics`, + ## or just before Zeek collects metrics when being scraped through + ## its Prometheus endpoint. + ## Script writers can use it to synchronize (or mirror) metrics with the + ## telemetry subsystem. For example, when tracking table or value + ## footprints with gauges, the value in question can be set on an actual + ## :zeek:see:`Telemetry::Gauge` instance during execution of this hook. + ## + ## Implementations should be lightweight, this hook may be called + ## multiple times per minute. 
+ global sync: hook(); + type MetricVector : vector of Metric; type HistogramMetricVector : vector of HistogramMetric; diff --git a/src/telemetry/Manager.cc b/src/telemetry/Manager.cc index 3272641882..2924301dad 100644 --- a/src/telemetry/Manager.cc +++ b/src/telemetry/Manager.cc @@ -16,6 +16,7 @@ #include #include "zeek/3rdparty/doctest.h" +#include "zeek/Func.h" #include "zeek/ID.h" #include "zeek/RunState.h" #include "zeek/ZeekString.h" @@ -256,6 +257,28 @@ static bool compare_histograms(const std::optional& a, const std::optional return comparer(a, b, metric_record_type); } +void Manager::InvokeTelemetrySyncHook() { + static const auto sync_hook = zeek::id::find_func("Telemetry::sync"); + + if ( sync_hook->Flavor() != FUNC_FLAVOR_HOOK ) + reporter->InternalError("Telemetry::sync not a hook?"); + + if ( in_sync_hook ) { + reporter->Warning("Telemetry::sync() hook invoked recursively"); + return; + } + + in_sync_hook = true; + + zeek::Args empty; + auto result = sync_hook->Invoke(&empty); + + if ( ! 
result->IsOne() ) + reporter->Warning("Telemetry::sync() implementations skipped due to 'break' usage"); + + in_sync_hook = false; +} + ValPtr Manager::CollectMetrics(std::string_view prefix_pattern, std::string_view name_pattern) { static auto metrics_vector_type = zeek::id::find_type("Telemetry::MetricVector"); static auto string_vec_type = zeek::id::find_type("string_vec"); @@ -268,6 +291,8 @@ ValPtr Manager::CollectMetrics(std::string_view prefix_pattern, std::string_view static auto metric_opts_type = zeek::id::find_type("Telemetry::MetricOpts"); static auto metric_type_idx = metric_opts_type->FieldOffset("metric_type"); + InvokeTelemetrySyncHook(); + VectorValPtr ret_val = make_intrusive(metrics_vector_type); // Due to the name containing the full information about a metric including a potential unit add an @@ -345,6 +370,8 @@ ValPtr Manager::CollectHistogramMetrics(std::string_view prefix_pattern, std::st static auto metric_opts_type = zeek::id::find_type("Telemetry::MetricOpts"); static auto metric_type_idx = metric_opts_type->FieldOffset("metric_type"); + InvokeTelemetrySyncHook(); + VectorValPtr ret_val = make_intrusive(metrics_vector_type); // Due to the name containing the full information about a metric including a potential unit add an @@ -574,6 +601,8 @@ void Manager::ProcessFd(int fd, int flags) { for ( const auto& [name, f] : families ) f->RunCallbacks(); + InvokeTelemetrySyncHook(); + collector_response_idx = collector_request_idx; lk.unlock(); diff --git a/src/telemetry/Manager.h b/src/telemetry/Manager.h index 26e2403f58..9523385b77 100644 --- a/src/telemetry/Manager.h +++ b/src/telemetry/Manager.h @@ -259,6 +259,13 @@ private: RecordValPtr GetMetricOptsRecord(const prometheus::MetricFamily& metric_family); void BuildClusterJson(); + /** + * Runs the Telemetry::sync() hook in Zeek script land. 
+ */ + void InvokeTelemetrySyncHook(); + + bool in_sync_hook = false; + std::map> families; std::map opts_records; diff --git a/testing/btest/Baseline/bifs.event-handler-stats/out b/testing/btest/Baseline/bifs.event-handler-stats/out index ae43a66216..d438e254c3 100644 --- a/testing/btest/Baseline/bifs.event-handler-stats/out +++ b/testing/btest/Baseline/bifs.event-handler-stats/out @@ -1,2 +1,2 @@ ### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. -[[name=Broker::log_flush, times_called=2], [name=ChecksumOffloading::check, times_called=2], [name=NetControl::init, times_called=1], [name=analyzer_confirmation_info, times_called=1], [name=connection_established, times_called=1], [name=connection_state_remove, times_called=1], [name=file_new, times_called=1], [name=file_over_new_connection, times_called=1], [name=file_sniff, times_called=1], [name=file_state_remove, times_called=1], [name=filter_change_tracking, times_called=3], [name=get_file_handle, times_called=4], [name=http_begin_entity, times_called=2], [name=http_end_entity, times_called=2], [name=http_header, times_called=13], [name=http_message_done, times_called=2], [name=http_reply, times_called=1], [name=http_request, times_called=1], [name=net_done, times_called=1], [name=new_connection, times_called=1], [name=run_sync_hook, times_called=2], [name=zeek_done, times_called=1], [name=zeek_init, times_called=1]] +[[name=Broker::log_flush, times_called=2], [name=ChecksumOffloading::check, times_called=2], [name=NetControl::init, times_called=1], [name=analyzer_confirmation_info, times_called=1], [name=connection_established, times_called=1], [name=connection_state_remove, times_called=1], [name=file_new, times_called=1], [name=file_over_new_connection, times_called=1], [name=file_sniff, times_called=1], [name=file_state_remove, times_called=1], [name=filter_change_tracking, times_called=3], [name=get_file_handle, times_called=4], 
[name=http_begin_entity, times_called=2], [name=http_end_entity, times_called=2], [name=http_header, times_called=13], [name=http_message_done, times_called=2], [name=http_reply, times_called=1], [name=http_request, times_called=1], [name=net_done, times_called=1], [name=new_connection, times_called=1], [name=zeek_done, times_called=1], [name=zeek_init, times_called=1]] diff --git a/testing/btest/Baseline/scripts.base.frameworks.telemetry.sync-collect/out b/testing/btest/Baseline/scripts.base.frameworks.telemetry.sync-collect/out new file mode 100644 index 0000000000..393c51b20d --- /dev/null +++ b/testing/btest/Baseline/scripts.base.frameworks.telemetry.sync-collect/out @@ -0,0 +1,13 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +node up +sync, 1, tcp +btest_connections_total, [tcp], 1.0 +sync, 2, udp +btest_connections_total, [udp], 1.0 +btest_connections_total, [tcp], 1.0 +sync, 3, udp +btest_connections_total, [udp], 2.0 +btest_connections_total, [tcp], 1.0 +sync, 4, udp +btest_connections_total, [udp], 2.0 +btest_connections_total, [tcp], 1.0 diff --git a/testing/btest/Baseline/scripts.base.frameworks.telemetry.sync-warnings/.stderr b/testing/btest/Baseline/scripts.base.frameworks.telemetry.sync-warnings/.stderr new file mode 100644 index 0000000000..32c85b57ed --- /dev/null +++ b/testing/btest/Baseline/scripts.base.frameworks.telemetry.sync-warnings/.stderr @@ -0,0 +1,3 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. 
+warning: Telemetry::sync() hook invoked recursively +warning: Telemetry::sync() implementations skipped due to 'break' usage diff --git a/testing/btest/Baseline/scripts.base.frameworks.telemetry.sync-warnings/out b/testing/btest/Baseline/scripts.base.frameworks.telemetry.sync-warnings/out new file mode 100644 index 0000000000..a4801a85eb --- /dev/null +++ b/testing/btest/Baseline/scripts.base.frameworks.telemetry.sync-warnings/out @@ -0,0 +1,2 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +node up diff --git a/testing/btest/Baseline/scripts.base.frameworks.telemetry.sync/metrics1.txt b/testing/btest/Baseline/scripts.base.frameworks.telemetry.sync/metrics1.txt new file mode 100644 index 0000000000..7df20bcb5a --- /dev/null +++ b/testing/btest/Baseline/scripts.base.frameworks.telemetry.sync/metrics1.txt @@ -0,0 +1,2 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +btest_connections_total{proto="tcp"} 1 diff --git a/testing/btest/Baseline/scripts.base.frameworks.telemetry.sync/metrics2.txt b/testing/btest/Baseline/scripts.base.frameworks.telemetry.sync/metrics2.txt new file mode 100644 index 0000000000..db8cfd6872 --- /dev/null +++ b/testing/btest/Baseline/scripts.base.frameworks.telemetry.sync/metrics2.txt @@ -0,0 +1,3 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +btest_connections_total{proto="udp"} 1 +btest_connections_total{proto="tcp"} 1 diff --git a/testing/btest/Baseline/scripts.base.frameworks.telemetry.sync/metrics3.txt b/testing/btest/Baseline/scripts.base.frameworks.telemetry.sync/metrics3.txt new file mode 100644 index 0000000000..cce741488a --- /dev/null +++ b/testing/btest/Baseline/scripts.base.frameworks.telemetry.sync/metrics3.txt @@ -0,0 +1,3 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. 
Requires BTest >= 0.63. +btest_connections_total{proto="udp"} 2 +btest_connections_total{proto="tcp"} 1 diff --git a/testing/btest/Baseline/scripts.base.frameworks.telemetry.sync/zeek..stdout b/testing/btest/Baseline/scripts.base.frameworks.telemetry.sync/zeek..stdout new file mode 100644 index 0000000000..f3259c3511 --- /dev/null +++ b/testing/btest/Baseline/scripts.base.frameworks.telemetry.sync/zeek..stdout @@ -0,0 +1,5 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +node up +sync, 1, tcp +sync, 2, udp +sync, 3, udp diff --git a/testing/btest/scripts/base/frameworks/telemetry/sync-collect.zeek b/testing/btest/scripts/base/frameworks/telemetry/sync-collect.zeek new file mode 100644 index 0000000000..ca52fb77fb --- /dev/null +++ b/testing/btest/scripts/base/frameworks/telemetry/sync-collect.zeek @@ -0,0 +1,48 @@ +# @TEST-DOC: Calling collect_metrics() invokes Telemetry::sync. +# Not compilable to C++ due to globals being initialized to a record that +# has an opaque type as a field. 
+# @TEST-REQUIRES: test "${ZEEK_USE_CPP}" != "1" +# +# @TEST-EXEC: zeek -b %INPUT >out +# @TEST-EXEC: btest-diff out + + +@load base/frameworks/telemetry + +global connections_by_proto_cf = Telemetry::register_counter_family([ + $prefix="btest", + $name="connections", + $unit="", + $help_text="Total number of monitored connections", + $label_names=vector("proto") +]); + +function print_metrics(ms: vector of Telemetry::Metric) { + for (_, m in ms) { + print m$opts$name, m$label_values, m$value; + } +} + +event zeek_init() + { + print "node up"; + local ms = Telemetry::collect_metrics("btest"); + print_metrics(ms); + ms = Telemetry::collect_metrics("btest"); + print_metrics(ms); + ms = Telemetry::collect_metrics("btest"); + print_metrics(ms); + local hm = Telemetry::collect_histogram_metrics("btest"); + print_metrics(ms); + } + + +global sync_calls = 0; + +hook Telemetry::sync() + { + ++sync_calls; + local proto = sync_calls == 1 ? "tcp" : "udp"; + print "sync", sync_calls, proto; + Telemetry::counter_family_inc(connections_by_proto_cf, vector(proto)); + } diff --git a/testing/btest/scripts/base/frameworks/telemetry/sync-warnings.zeek b/testing/btest/scripts/base/frameworks/telemetry/sync-warnings.zeek new file mode 100644 index 0000000000..b542ff5ccb --- /dev/null +++ b/testing/btest/scripts/base/frameworks/telemetry/sync-warnings.zeek @@ -0,0 +1,43 @@ +# @TEST-DOC: Breaking and recursive Telemetry::sync() warning +# Not compilable to C++ due to globals being initialized to a record that +# has an opaque type as a field. 
+# @TEST-REQUIRES: test "${ZEEK_USE_CPP}" != "1" +# +# @TEST-EXEC: zeek -b %INPUT >out +# @TEST-EXEC: btest-diff out +# @TEST-EXEC: btest-diff .stderr + + +@load base/frameworks/telemetry + +global connections_by_proto_cf = Telemetry::register_counter_family([ + $prefix="btest", + $name="connections", + $unit="", + $help_text="Total number of monitored connections", + $label_names=vector("proto") +]); + +event zeek_init() + { + print "node up"; + Telemetry::counter_family_inc(connections_by_proto_cf, vector("tcp")); + local ms = Telemetry::collect_metrics("btest"); + } + + +hook Telemetry::sync() + { + # Calling collect_metrics() in Telemetry::sync() is not good as + # it would invoke Telemetry::sync() recursively. The manager will + # emit a warning and not run the second Telemetry::sync() invocation. + local ms = Telemetry::collect_metrics("btest"); + } + +hook Telemetry::sync() &priority=-100 + { + # break is not good as it prevents other Telemetry::sync() hooks + # from running. This will produce a warning. + # We could find this via script validation if we wanted to. + break; + } diff --git a/testing/btest/scripts/base/frameworks/telemetry/sync.zeek b/testing/btest/scripts/base/frameworks/telemetry/sync.zeek new file mode 100644 index 0000000000..2e20eeed14 --- /dev/null +++ b/testing/btest/scripts/base/frameworks/telemetry/sync.zeek @@ -0,0 +1,68 @@ +# @TEST-DOC: Verify Telemetry::sync() is invoked for metric scraping via the Prometheus HTTP endpoint. +# Not compilable to C++ due to globals being initialized to a record that +# has an opaque type as a field. +# @TEST-REQUIRES: test "${ZEEK_USE_CPP}" != "1" +# @TEST-REQUIRES: which jq +# @TEST-REQUIRES: which curl +# +# @TEST-PORT: METRICS_PORT +# +# @TEST-EXEC: chmod +x fetch-metrics.sh +# @TEST-EXEC: zeek --parse-only %INPUT +# @TEST-EXEC: btest-bg-run zeek ZEEKPATH=$ZEEKPATH:.. 
zeek -b %INPUT +# @TEST-EXEC: $SCRIPTS/wait-for-file zeek/up 5 || (btest-bg-wait -k 1 && false) +# @TEST-EXEC: ./fetch-metrics.sh 1.trace metrics1.txt +# @TEST-EXEC: ./fetch-metrics.sh 2.trace metrics2.txt +# @TEST-EXEC: ./fetch-metrics.sh 3.trace metrics3.txt +# @TEST-EXEC: btest-bg-wait 10 +# +# @TEST-EXEC: btest-diff zeek/.stdout +# @TEST-EXEC: btest-diff metrics1.txt +# @TEST-EXEC: btest-diff metrics2.txt +# @TEST-EXEC: btest-diff metrics3.txt + +@TEST-START-FILE fetch-metrics.sh +#! /usr/bin/env bash +set -ux +trace_file=$1 +output_file=$2 + +PORT=$(echo ${METRICS_PORT} | cut -d '/' -f 1) +URL=http://localhost:${PORT}/metrics + +curl -m 5 --trace $trace_file $URL | grep ^btest > $output_file + +exit 0 +@TEST-END-FILE + +@load base/frameworks/telemetry + +redef exit_only_after_terminate = T; +redef Telemetry::metrics_port = to_port(getenv("METRICS_PORT")); + +event zeek_init() + { + print "node up"; + system("touch up"); + } + +global connections_by_proto_cf = Telemetry::register_counter_family([ + $prefix="btest", + $name="connections", + $unit="", + $help_text="Total number of monitored connections", + $label_names=vector("proto") +]); + +global sync_calls = 0; + +hook Telemetry::sync() + { + ++sync_calls; + local proto = sync_calls == 1 ? "tcp" : "udp"; + print "sync", sync_calls, proto; + Telemetry::counter_family_inc(connections_by_proto_cf, vector(proto)); + + if ( sync_calls == 3 ) + terminate(); + }