telemetry: Move callbacks to Zeek

Now that we run callbacks on the main loop, we can move callback support
for Counter and Gauge instances directly into Zeek and don't need to patch
prometheus-cpp anymore.
This commit is contained in:
Arne Welzel 2024-09-11 17:49:06 +02:00
parent c7fcdc4050
commit 48dd89ef33
15 changed files with 143 additions and 186 deletions

View file

@ -557,42 +557,38 @@ void DNS_Mgr::InitPostScript() {
cached_hosts_metric = cached_hosts_metric =
telemetry_mgr->GaugeInstance("zeek", "dnsmgr_cache_entries", {{"type", "host"}}, telemetry_mgr->GaugeInstance("zeek", "dnsmgr_cache_entries", {{"type", "host"}},
"Number of cached hosts in DNS_Mgr", "", []() -> prometheus::ClientMetric { "Number of cached hosts in DNS_Mgr", "", []() {
prometheus::ClientMetric metric; double value = 0;
metric.gauge.value = 0;
if ( dns_mgr ) { if ( dns_mgr ) {
dns_mgr->UpdateCachedStats(false); dns_mgr->UpdateCachedStats(false);
metric.gauge.value = static_cast<double>(dns_mgr->last_cached_stats.hosts); value = static_cast<double>(dns_mgr->last_cached_stats.hosts);
} }
return metric; return value;
}); });
cached_addresses_metric = cached_addresses_metric =
telemetry_mgr->GaugeInstance("zeek", "dnsmgr_cache_entries", {{"type", "address"}}, telemetry_mgr->GaugeInstance("zeek", "dnsmgr_cache_entries", {{"type", "address"}},
"Number of cached addresses in DNS_Mgr", "", []() -> prometheus::ClientMetric { "Number of cached addresses in DNS_Mgr", "", []() {
prometheus::ClientMetric metric; double value = 0;
metric.gauge.value = 0;
if ( dns_mgr ) { if ( dns_mgr ) {
dns_mgr->UpdateCachedStats(false); dns_mgr->UpdateCachedStats(false);
metric.gauge.value = value = static_cast<double>(dns_mgr->last_cached_stats.addresses);
static_cast<double>(dns_mgr->last_cached_stats.addresses);
} }
return metric; return value;
}); });
cached_texts_metric = cached_texts_metric =
telemetry_mgr->GaugeInstance("zeek", "dnsmgr_cache_entries", {{"type", "text"}}, telemetry_mgr->GaugeInstance("zeek", "dnsmgr_cache_entries", {{"type", "text"}},
"Number of cached texts in DNS_Mgr", "", []() -> prometheus::ClientMetric { "Number of cached texts in DNS_Mgr", "", []() {
prometheus::ClientMetric metric; double value = 0;
metric.gauge.value = 0;
if ( dns_mgr ) { if ( dns_mgr ) {
dns_mgr->UpdateCachedStats(false); dns_mgr->UpdateCachedStats(false);
metric.gauge.value = static_cast<double>(dns_mgr->last_cached_stats.texts); value = static_cast<double>(dns_mgr->last_cached_stats.texts);
} }
return metric; return value;
}); });
if ( ! doctest::is_running_in_test ) { if ( ! doctest::is_running_in_test ) {

View file

@ -98,32 +98,20 @@ void TimerMgr::InitPostScript() {
dispatch_all_expired = zeek::detail::max_timer_expires == 0; dispatch_all_expired = zeek::detail::max_timer_expires == 0;
cumulative_num_metric = telemetry_mgr->CounterInstance("zeek", "timers", {}, "Cumulative number of timers", "", cumulative_num_metric =
[]() -> prometheus::ClientMetric { telemetry_mgr->CounterInstance("zeek", "timers", {}, "Cumulative number of timers", "",
prometheus::ClientMetric metric; []() { return static_cast<double>(timer_mgr->CumulativeNum()); });
metric.counter.value =
static_cast<double>(timer_mgr->CumulativeNum());
return metric;
});
lag_time_metric = telemetry_mgr->GaugeInstance("zeek", "timers_lag_time", {}, lag_time_metric =
"Lag between current network time and last expired timer", "seconds", telemetry_mgr->GaugeInstance("zeek", "timers_lag_time", {},
[]() -> prometheus::ClientMetric { "Lag between current network time and last expired timer", "seconds",
prometheus::ClientMetric metric; []() { return run_state::network_time - timer_mgr->last_timestamp; });
metric.gauge.value =
run_state::network_time - timer_mgr->last_timestamp;
return metric;
});
std::shared_ptr<telemetry::GaugeFamily> family = std::shared_ptr<telemetry::GaugeFamily> family =
telemetry_mgr->GaugeFamily("zeek", "timers_pending", {"type"}, "Number of timers for a certain type"); telemetry_mgr->GaugeFamily("zeek", "timers_pending", {"type"}, "Number of timers for a certain type");
for ( int i = 0; i < NUM_TIMER_TYPES; i++ ) { for ( int i = 0; i < NUM_TIMER_TYPES; i++ ) {
current_timer_metrics[i] = family->GetOrAdd({{"type", timer_type_to_string(static_cast<TimerType>(i))}}, current_timer_metrics[i] = family->GetOrAdd({{"type", timer_type_to_string(static_cast<TimerType>(i))}},
[i]() -> prometheus::ClientMetric { [i]() { return TimerMgr::CurrentTimers()[i]; });
prometheus::ClientMetric metric;
metric.gauge.value = TimerMgr::CurrentTimers()[i];
return metric;
});
} }
} }

View file

@ -435,13 +435,9 @@ Manager::~Manager() { delete pending; }
void Manager::InitPostScript() { void Manager::InitPostScript() {
trigger_count = telemetry_mgr->CounterInstance("zeek", "triggers", {}, "Total number of triggers scheduled"); trigger_count = telemetry_mgr->CounterInstance("zeek", "triggers", {}, "Total number of triggers scheduled");
trigger_pending = trigger_pending =
telemetry_mgr->GaugeInstance("zeek", "pending_triggers", {}, "Pending number of triggers", "", telemetry_mgr->GaugeInstance("zeek", "pending_triggers", {}, "Pending number of triggers", "", []() {
[]() -> prometheus::ClientMetric { return trigger_mgr ? static_cast<double>(trigger_mgr->pending->size()) : 0.0;
prometheus::ClientMetric metric; });
metric.gauge.value =
trigger_mgr ? static_cast<double>(trigger_mgr->pending->size()) : 0.0;
return metric;
});
iosource_mgr->Register(this, true); iosource_mgr->Register(this, true);
} }

View file

@ -343,27 +343,15 @@ void Manager::InitPostScript() {
num_peers_metric = num_peers_metric =
telemetry_mgr->GaugeInstance("zeek", "broker_peers", {}, "Current number of peers connected via broker", "", telemetry_mgr->GaugeInstance("zeek", "broker_peers", {}, "Current number of peers connected via broker", "",
[]() -> prometheus::ClientMetric { []() { return static_cast<double>(broker_mgr->peer_count); });
prometheus::ClientMetric metric;
metric.gauge.value = static_cast<double>(broker_mgr->peer_count);
return metric;
});
num_stores_metric = num_stores_metric =
telemetry_mgr->GaugeInstance("zeek", "broker_stores", {}, "Current number of stores connected via broker", "", telemetry_mgr->GaugeInstance("zeek", "broker_stores", {}, "Current number of stores connected via broker", "",
[]() -> prometheus::ClientMetric { []() { return static_cast<double>(broker_mgr->data_stores.size()); });
prometheus::ClientMetric metric;
metric.gauge.value = static_cast<double>(broker_mgr->data_stores.size());
return metric;
});
num_pending_queries_metric = num_pending_queries_metric =
telemetry_mgr->GaugeInstance("zeek", "broker_pending_queries", {}, "Current number of pending broker queries", telemetry_mgr->GaugeInstance("zeek", "broker_pending_queries", {}, "Current number of pending broker queries",
"", []() -> prometheus::ClientMetric { "", []() { return static_cast<double>(broker_mgr->pending_queries.size()); });
prometheus::ClientMetric metric;
metric.gauge.value = static_cast<double>(broker_mgr->pending_queries.size());
return metric;
});
num_events_incoming_metric = telemetry_mgr->CounterInstance("zeek", "broker_incoming_events", {}, num_events_incoming_metric = telemetry_mgr->CounterInstance("zeek", "broker_incoming_events", {},
"Total number of incoming events via broker"); "Total number of incoming events via broker");

View file

@ -79,12 +79,9 @@ Manager::Manager() {
stats = new detail::ProtocolStats(); stats = new detail::ProtocolStats();
ended_sessions_metric_family = telemetry_mgr->CounterFamily("zeek", "ended_sessions", {"reason"}, ended_sessions_metric_family = telemetry_mgr->CounterFamily("zeek", "ended_sessions", {"reason"},
"Number of sessions ended for specific reasons"); "Number of sessions ended for specific reasons");
ended_by_inactivity_metric = ended_by_inactivity_metric = ended_sessions_metric_family->GetOrAdd({{"reason", "inactivity"}}, []() {
ended_sessions_metric_family->GetOrAdd({{"reason", "inactivity"}}, []() -> prometheus::ClientMetric { return static_cast<double>(zeek::detail::killed_by_inactivity);
prometheus::ClientMetric metric; });
metric.counter.value = static_cast<double>(zeek::detail::killed_by_inactivity);
return metric;
});
} }
Manager::~Manager() { Manager::~Manager() {

View file

@ -2,27 +2,17 @@
using namespace zeek::telemetry; using namespace zeek::telemetry;
Counter::Counter(FamilyType* family, const prometheus::Labels& labels, prometheus::CollectCallbackPtr callback) noexcept Counter::Counter(FamilyType* family, const prometheus::Labels& labels, detail::CollectCallbackPtr callback) noexcept
: family(family), handle(family->Add(labels)), labels(labels) { : family(family), handle(family->Add(labels)), labels(labels), callback(std::move(callback)) {}
if ( callback ) {
handle.AddCollectCallback(std::move(callback));
has_callback = true;
}
}
double Counter::Value() const noexcept { double Counter::Value() const noexcept {
if ( has_callback ) { if ( callback )
// Use Collect() here instead of Value() to correctly handle metrics with return callback();
// callbacks.
auto metric = handle.Collect();
return metric.counter.value;
}
return handle.Value(); return handle.Value();
} }
std::shared_ptr<Counter> CounterFamily::GetOrAdd(Span<const LabelView> labels, std::shared_ptr<Counter> CounterFamily::GetOrAdd(Span<const LabelView> labels, detail::CollectCallbackPtr callback) {
prometheus::CollectCallbackPtr callback) {
prometheus::Labels p_labels = detail::BuildPrometheusLabels(labels); prometheus::Labels p_labels = detail::BuildPrometheusLabels(labels);
auto check = [&](const std::shared_ptr<Counter>& counter) { return counter->CompareLabels(p_labels); }; auto check = [&](const std::shared_ptr<Counter>& counter) { return counter->CompareLabels(p_labels); };
@ -36,6 +26,15 @@ std::shared_ptr<Counter> CounterFamily::GetOrAdd(Span<const LabelView> labels,
} }
std::shared_ptr<Counter> CounterFamily::GetOrAdd(std::initializer_list<LabelView> labels, std::shared_ptr<Counter> CounterFamily::GetOrAdd(std::initializer_list<LabelView> labels,
prometheus::CollectCallbackPtr callback) { detail::CollectCallbackPtr callback) {
return GetOrAdd(Span{labels.begin(), labels.size()}, std::move(callback)); return GetOrAdd(Span{labels.begin(), labels.size()}, std::move(callback));
} }
void CounterFamily::RunCallbacks() {
for ( auto& c : counters ) {
if ( c->HasCallback() ) {
double val = c->RunCallback();
c->Set(val);
}
}
}

View file

@ -4,7 +4,6 @@
#include <prometheus/counter.h> #include <prometheus/counter.h>
#include <prometheus/family.h> #include <prometheus/family.h>
#include <cstdint>
#include <initializer_list> #include <initializer_list>
#include <memory> #include <memory>
@ -15,6 +14,12 @@
namespace zeek::telemetry { namespace zeek::telemetry {
namespace detail {
using CollectCallbackPtr = std::function<double()>;
}
class CounterFamily;
/** /**
* A handle to a metric that can only go up. * A handle to a metric that can only go up.
*/ */
@ -26,7 +31,7 @@ public:
using FamilyType = prometheus::Family<Handle>; using FamilyType = prometheus::Family<Handle>;
explicit Counter(FamilyType* family, const prometheus::Labels& labels, explicit Counter(FamilyType* family, const prometheus::Labels& labels,
prometheus::CollectCallbackPtr callback = nullptr) noexcept; detail::CollectCallbackPtr callback = nullptr) noexcept;
/** /**
* Increments the value by 1. * Increments the value by 1.
@ -55,11 +60,21 @@ public:
bool CompareLabels(const prometheus::Labels& lbls) const { return labels == lbls; } bool CompareLabels(const prometheus::Labels& lbls) const { return labels == lbls; }
bool HasCallback() const noexcept { return callback != nullptr; }
double RunCallback() const { return callback(); }
private: private:
friend class CounterFamily;
void Set(double val) {
// Counter has no Set(), but we can fake it.
handle.Reset();
handle.Increment(val);
}
FamilyType* family = nullptr; FamilyType* family = nullptr;
Handle& handle; Handle& handle;
prometheus::Labels labels; prometheus::Labels labels;
bool has_callback = false; detail::CollectCallbackPtr callback;
}; };
using CounterPtr = std::shared_ptr<Counter>; using CounterPtr = std::shared_ptr<Counter>;
@ -75,15 +90,17 @@ public:
* Returns the metrics handle for given labels, creating a new instance * Returns the metrics handle for given labels, creating a new instance
* lazily if necessary. * lazily if necessary.
*/ */
CounterPtr GetOrAdd(Span<const LabelView> labels, prometheus::CollectCallbackPtr callback = nullptr); CounterPtr GetOrAdd(Span<const LabelView> labels, detail::CollectCallbackPtr callback = nullptr);
/** /**
* @copydoc GetOrAdd * @copydoc GetOrAdd
*/ */
CounterPtr GetOrAdd(std::initializer_list<LabelView> labels, prometheus::CollectCallbackPtr callback = nullptr); CounterPtr GetOrAdd(std::initializer_list<LabelView> labels, detail::CollectCallbackPtr callback = nullptr);
zeek_int_t MetricType() const noexcept override { return BifEnum::Telemetry::MetricType::COUNTER; } zeek_int_t MetricType() const noexcept override { return BifEnum::Telemetry::MetricType::COUNTER; }
void RunCallbacks() override;
private: private:
prometheus::Family<prometheus::Counter>* family; prometheus::Family<prometheus::Counter>* family;
std::vector<CounterPtr> counters; std::vector<CounterPtr> counters;

View file

@ -3,26 +3,17 @@
using namespace zeek::telemetry; using namespace zeek::telemetry;
double Gauge::Value() const noexcept { double Gauge::Value() const noexcept {
if ( has_callback ) { if ( callback )
// Use Collect() here instead of Value() to correctly handle metrics return callback();
// with callbacks.
auto metric = handle.Collect();
return metric.gauge.value;
}
return handle.Value(); return handle.Value();
} }
Gauge::Gauge(FamilyType* family, const prometheus::Labels& labels, prometheus::CollectCallbackPtr callback) noexcept Gauge::Gauge(FamilyType* family, const prometheus::Labels& labels, detail::CollectCallbackPtr callback) noexcept
: family(family), handle(family->Add(labels)), labels(labels) { : family(family), handle(family->Add(labels)), labels(labels), callback(std::move(callback)) {}
if ( callback ) {
handle.AddCollectCallback(std::move(callback));
has_callback = true;
}
}
std::shared_ptr<Gauge> GaugeFamily::GetOrAdd(Span<const LabelView> labels, prometheus::CollectCallbackPtr callback) { std::shared_ptr<Gauge> GaugeFamily::GetOrAdd(Span<const LabelView> labels, detail::CollectCallbackPtr callback) {
prometheus::Labels p_labels = detail::BuildPrometheusLabels(labels); prometheus::Labels p_labels = detail::BuildPrometheusLabels(labels);
auto check = [&](const std::shared_ptr<Gauge>& gauge) { return gauge->CompareLabels(p_labels); }; auto check = [&](const std::shared_ptr<Gauge>& gauge) { return gauge->CompareLabels(p_labels); };
@ -36,6 +27,13 @@ std::shared_ptr<Gauge> GaugeFamily::GetOrAdd(Span<const LabelView> labels, prome
} }
std::shared_ptr<Gauge> GaugeFamily::GetOrAdd(std::initializer_list<LabelView> labels, std::shared_ptr<Gauge> GaugeFamily::GetOrAdd(std::initializer_list<LabelView> labels,
prometheus::CollectCallbackPtr callback) { detail::CollectCallbackPtr callback) {
return GetOrAdd(Span{labels.begin(), labels.size()}, std::move(callback)); return GetOrAdd(Span{labels.begin(), labels.size()}, std::move(callback));
} }
void GaugeFamily::RunCallbacks() {
for ( const auto& g : gauges ) {
if ( g->HasCallback() )
g->Set(g->RunCallback());
}
}

View file

@ -4,7 +4,7 @@
#include <prometheus/family.h> #include <prometheus/family.h>
#include <prometheus/gauge.h> #include <prometheus/gauge.h>
#include <cstdint> #include <unistd.h>
#include <initializer_list> #include <initializer_list>
#include <memory> #include <memory>
@ -15,6 +15,10 @@
namespace zeek::telemetry { namespace zeek::telemetry {
namespace detail {
using CollectCallbackPtr = std::function<double()>;
}
/** /**
* A handle to a metric that can count up and down. * A handle to a metric that can count up and down.
*/ */
@ -26,7 +30,7 @@ public:
using FamilyType = prometheus::Family<Handle>; using FamilyType = prometheus::Family<Handle>;
explicit Gauge(FamilyType* family, const prometheus::Labels& labels, explicit Gauge(FamilyType* family, const prometheus::Labels& labels,
prometheus::CollectCallbackPtr callback = nullptr) noexcept; detail::CollectCallbackPtr callback = nullptr) noexcept;
/** /**
* Increments the value by 1. * Increments the value by 1.
@ -57,6 +61,11 @@ public:
*/ */
void Dec(double amount) noexcept { handle.Decrement(amount); } void Dec(double amount) noexcept { handle.Decrement(amount); }
/**
* Set the value by @p val.
*/
void Set(double val) noexcept { handle.Set(val); }
/** /**
* Decrements the value by 1. * Decrements the value by 1.
* @return The new value. * @return The new value.
@ -73,11 +82,14 @@ public:
bool CompareLabels(const prometheus::Labels& lbls) const { return labels == lbls; } bool CompareLabels(const prometheus::Labels& lbls) const { return labels == lbls; }
bool HasCallback() const noexcept { return callback != nullptr; }
double RunCallback() const { return callback(); }
private: private:
FamilyType* family = nullptr; FamilyType* family = nullptr;
Handle& handle; Handle& handle;
prometheus::Labels labels; prometheus::Labels labels;
bool has_callback = false; detail::CollectCallbackPtr callback;
}; };
using GaugePtr = std::shared_ptr<Gauge>; using GaugePtr = std::shared_ptr<Gauge>;
@ -90,18 +102,20 @@ public:
* Returns the metrics handle for given labels, creating a new instance * Returns the metrics handle for given labels, creating a new instance
* lazily if necessary. * lazily if necessary.
*/ */
GaugePtr GetOrAdd(Span<const LabelView> labels, prometheus::CollectCallbackPtr callback = nullptr); GaugePtr GetOrAdd(Span<const LabelView> labels, detail::CollectCallbackPtr callback = nullptr);
/** /**
* @copydoc GetOrAdd * @copydoc GetOrAdd
*/ */
GaugePtr GetOrAdd(std::initializer_list<LabelView> labels, prometheus::CollectCallbackPtr callback = nullptr); GaugePtr GetOrAdd(std::initializer_list<LabelView> labels, detail::CollectCallbackPtr callback = nullptr);
zeek_int_t MetricType() const noexcept override { return BifEnum::Telemetry::MetricType::GAUGE; } zeek_int_t MetricType() const noexcept override { return BifEnum::Telemetry::MetricType::GAUGE; }
GaugeFamily(prometheus::Family<prometheus::Gauge>* family, Span<const std::string_view> labels) GaugeFamily(prometheus::Family<prometheus::Gauge>* family, Span<const std::string_view> labels)
: MetricFamily(labels), family(family) {} : MetricFamily(labels), family(family) {}
void RunCallbacks() override;
private: private:
prometheus::Family<prometheus::Gauge>* family; prometheus::Family<prometheus::Gauge>* family;
std::vector<GaugePtr> gauges; std::vector<GaugePtr> gauges;

View file

@ -66,6 +66,8 @@ public:
zeek_int_t MetricType() const noexcept override { return BifEnum::Telemetry::MetricType::HISTOGRAM; } zeek_int_t MetricType() const noexcept override { return BifEnum::Telemetry::MetricType::HISTOGRAM; }
void RunCallbacks() override {}
private: private:
prometheus::Family<prometheus::Histogram>* family; prometheus::Family<prometheus::Histogram>* family;
prometheus::Histogram::BucketBoundaries boundaries; prometheus::Histogram::BucketBoundaries boundaries;

View file

@ -122,44 +122,19 @@ void Manager::InitPostScript() {
return &telemetry_mgr->current_process_stats; return &telemetry_mgr->current_process_stats;
}; };
rss_gauge = GaugeInstance("process", "resident_memory", {}, "Resident memory size", "bytes", rss_gauge = GaugeInstance("process", "resident_memory", {}, "Resident memory size", "bytes",
[]() -> prometheus::ClientMetric { []() { return static_cast<double>(get_stats()->rss); });
auto* s = get_stats();
prometheus::ClientMetric metric;
metric.gauge.value = static_cast<double>(s->rss);
return metric;
});
vms_gauge = GaugeInstance("process", "virtual_memory", {}, "Virtual memory size", "bytes", vms_gauge = GaugeInstance("process", "virtual_memory", {}, "Virtual memory size", "bytes",
[]() -> prometheus::ClientMetric { []() { return static_cast<double>(get_stats()->vms); });
auto* s = get_stats();
prometheus::ClientMetric metric;
metric.gauge.value = static_cast<double>(s->vms);
return metric;
});
cpu_user_counter = CounterInstance("process", "cpu_user", {}, "Total user CPU time spent", "seconds", cpu_user_counter = CounterInstance("process", "cpu_user", {}, "Total user CPU time spent", "seconds",
[]() -> prometheus::ClientMetric { []() { return get_stats()->cpu_user; });
auto* s = get_stats();
prometheus::ClientMetric metric;
metric.gauge.value = s->cpu_user;
return metric;
});
cpu_system_counter = CounterInstance("process", "cpu_system", {}, "Total system CPU time spent", "seconds", cpu_system_counter = CounterInstance("process", "cpu_system", {}, "Total system CPU time spent", "seconds",
[]() -> prometheus::ClientMetric { []() { return get_stats()->cpu_system; });
auto* s = get_stats();
prometheus::ClientMetric metric;
metric.gauge.value = s->cpu_system;
return metric;
});
fds_gauge = GaugeInstance("process", "open_fds", {}, "Number of open file descriptors", "", fds_gauge = GaugeInstance("process", "open_fds", {}, "Number of open file descriptors", "",
[]() -> prometheus::ClientMetric { []() { return static_cast<double>(get_stats()->fds); });
auto* s = get_stats();
prometheus::ClientMetric metric;
metric.gauge.value = static_cast<double>(s->fds);
return metric;
});
#endif #endif
if ( ! iosource_mgr->RegisterFd(collector_flare.FD(), this) ) { if ( ! iosource_mgr->RegisterFd(collector_flare.FD(), this) ) {
@ -501,7 +476,7 @@ CounterFamilyPtr Manager::CounterFamily(std::string_view prefix, std::string_vie
CounterPtr Manager::CounterInstance(std::string_view prefix, std::string_view name, Span<const LabelView> labels, CounterPtr Manager::CounterInstance(std::string_view prefix, std::string_view name, Span<const LabelView> labels,
std::string_view helptext, std::string_view unit, std::string_view helptext, std::string_view unit,
prometheus::CollectCallbackPtr callback) { detail::CollectCallbackPtr callback) {
return WithLabelNames(labels, [&, this](auto labelNames) { return WithLabelNames(labels, [&, this](auto labelNames) {
auto family = CounterFamily(prefix, name, labelNames, helptext, unit); auto family = CounterFamily(prefix, name, labelNames, helptext, unit);
return family->GetOrAdd(labels, callback); return family->GetOrAdd(labels, callback);
@ -510,7 +485,7 @@ CounterPtr Manager::CounterInstance(std::string_view prefix, std::string_view na
CounterPtr Manager::CounterInstance(std::string_view prefix, std::string_view name, CounterPtr Manager::CounterInstance(std::string_view prefix, std::string_view name,
std::initializer_list<LabelView> labels, std::string_view helptext, std::initializer_list<LabelView> labels, std::string_view helptext,
std::string_view unit, prometheus::CollectCallbackPtr callback) { std::string_view unit, detail::CollectCallbackPtr callback) {
auto lbl_span = Span{labels.begin(), labels.size()}; auto lbl_span = Span{labels.begin(), labels.size()};
return CounterInstance(prefix, name, lbl_span, helptext, unit, std::move(callback)); return CounterInstance(prefix, name, lbl_span, helptext, unit, std::move(callback));
} }
@ -539,8 +514,7 @@ GaugeFamilyPtr Manager::GaugeFamily(std::string_view prefix, std::string_view na
} }
GaugePtr Manager::GaugeInstance(std::string_view prefix, std::string_view name, Span<const LabelView> labels, GaugePtr Manager::GaugeInstance(std::string_view prefix, std::string_view name, Span<const LabelView> labels,
std::string_view helptext, std::string_view unit, std::string_view helptext, std::string_view unit, detail::CollectCallbackPtr callback) {
prometheus::CollectCallbackPtr callback) {
return WithLabelNames(labels, [&, this](auto labelNames) { return WithLabelNames(labels, [&, this](auto labelNames) {
auto family = GaugeFamily(prefix, name, labelNames, helptext, unit); auto family = GaugeFamily(prefix, name, labelNames, helptext, unit);
return family->GetOrAdd(labels, callback); return family->GetOrAdd(labels, callback);
@ -548,8 +522,7 @@ GaugePtr Manager::GaugeInstance(std::string_view prefix, std::string_view name,
} }
GaugePtr Manager::GaugeInstance(std::string_view prefix, std::string_view name, std::initializer_list<LabelView> labels, GaugePtr Manager::GaugeInstance(std::string_view prefix, std::string_view name, std::initializer_list<LabelView> labels,
std::string_view helptext, std::string_view unit, std::string_view helptext, std::string_view unit, detail::CollectCallbackPtr callback) {
prometheus::CollectCallbackPtr callback) {
auto lbl_span = Span{labels.begin(), labels.size()}; auto lbl_span = Span{labels.begin(), labels.size()};
return GaugeInstance(prefix, name, lbl_span, helptext, unit, std::move(callback)); return GaugeInstance(prefix, name, lbl_span, helptext, unit, std::move(callback));
} }
@ -598,7 +571,9 @@ void Manager::ProcessFd(int fd, int flags) {
collector_flare.Extinguish(); collector_flare.Extinguish();
prometheus_registry->UpdateViaCallbacks(); for ( const auto& [name, f] : families )
f->RunCallbacks();
collector_response_idx = collector_request_idx; collector_response_idx = collector_request_idx;
lk.unlock(); lk.unlock();

View file

@ -31,6 +31,10 @@ class Registry;
namespace zeek::telemetry { namespace zeek::telemetry {
namespace detail {
using CollectCallbackPtr = std::function<double()>;
}
class ZeekCollectable; class ZeekCollectable;
/** /**
@ -98,12 +102,12 @@ public:
*/ */
CounterPtr CounterInstance(std::string_view prefix, std::string_view name, Span<const LabelView> labels, CounterPtr CounterInstance(std::string_view prefix, std::string_view name, Span<const LabelView> labels,
std::string_view helptext, std::string_view unit = "", std::string_view helptext, std::string_view unit = "",
prometheus::CollectCallbackPtr callback = nullptr); detail::CollectCallbackPtr callback = nullptr);
/// @copydoc counterInstance /// @copydoc counterInstance
CounterPtr CounterInstance(std::string_view prefix, std::string_view name, std::initializer_list<LabelView> labels, CounterPtr CounterInstance(std::string_view prefix, std::string_view name, std::initializer_list<LabelView> labels,
std::string_view helptext, std::string_view unit = "", std::string_view helptext, std::string_view unit = "",
prometheus::CollectCallbackPtr callback = nullptr); detail::CollectCallbackPtr callback = nullptr);
/** /**
* @return A gauge metric family. Creates the family lazily if necessary. * @return A gauge metric family. Creates the family lazily if necessary.
@ -134,12 +138,12 @@ public:
*/ */
GaugePtr GaugeInstance(std::string_view prefix, std::string_view name, Span<const LabelView> labels, GaugePtr GaugeInstance(std::string_view prefix, std::string_view name, Span<const LabelView> labels,
std::string_view helptext, std::string_view unit = "", std::string_view helptext, std::string_view unit = "",
prometheus::CollectCallbackPtr callback = nullptr); detail::CollectCallbackPtr callback = nullptr);
/// @copydoc GaugeInstance /// @copydoc GaugeInstance
GaugePtr GaugeInstance(std::string_view prefix, std::string_view name, std::initializer_list<LabelView> labels, GaugePtr GaugeInstance(std::string_view prefix, std::string_view name, std::initializer_list<LabelView> labels,
std::string_view helptext, std::string_view unit = "", std::string_view helptext, std::string_view unit = "",
prometheus::CollectCallbackPtr callback = nullptr); detail::CollectCallbackPtr callback = nullptr);
// Forces the compiler to use the type `Span<const T>` instead of trying to // Forces the compiler to use the type `Span<const T>` instead of trying to
// match parameters to a `span`. // match parameters to a `span`.

View file

@ -22,6 +22,8 @@ public:
std::vector<std::string> LabelNames() const { return label_names; } std::vector<std::string> LabelNames() const { return label_names; }
virtual void RunCallbacks() = 0;
protected: protected:
MetricFamily(Span<const std::string_view> labels) { MetricFamily(Span<const std::string_view> labels) {
for ( const auto& lbl : labels ) for ( const auto& lbl : labels )

View file

@ -5,7 +5,6 @@
#include <string_view> #include <string_view>
#include "zeek/Span.h" #include "zeek/Span.h"
#include "zeek/Val.h"
namespace zeek::telemetry { namespace zeek::telemetry {

View file

@ -76,13 +76,9 @@ void Manager::InitPostScript() {
}; };
num_threads_metric = num_threads_metric =
telemetry_mgr->GaugeInstance("zeek", "msgthread_active_threads", {}, "Number of active threads", "", telemetry_mgr->GaugeInstance("zeek", "msgthread_active_threads", {}, "Number of active threads", "", []() {
[]() -> prometheus::ClientMetric { return thread_mgr ? static_cast<double>(thread_mgr->all_threads.size()) : 0.0;
prometheus::ClientMetric metric; });
metric.gauge.value =
thread_mgr ? static_cast<double>(thread_mgr->all_threads.size()) : 0.0;
return metric;
});
total_threads_metric = telemetry_mgr->CounterInstance("zeek", "msgthread_threads", {}, "Total number of threads"); total_threads_metric = telemetry_mgr->CounterInstance("zeek", "msgthread_threads", {}, "Total number of threads");
total_messages_in_metric = total_messages_in_metric =
@ -91,22 +87,16 @@ void Manager::InitPostScript() {
total_messages_out_metric = total_messages_out_metric =
telemetry_mgr->CounterInstance("zeek", "msgthread_out_messages", {}, "Number of outbound messages sent", ""); telemetry_mgr->CounterInstance("zeek", "msgthread_out_messages", {}, "Number of outbound messages sent", "");
pending_messages_in_metric = pending_messages_in_metric = telemetry_mgr->GaugeInstance("zeek", "msgthread_pending_in_messages", {},
telemetry_mgr->GaugeInstance("zeek", "msgthread_pending_in_messages", {}, "Pending number of inbound messages", "Pending number of inbound messages", "", []() {
"", []() -> prometheus::ClientMetric { auto* s = get_message_thread_stats();
auto* s = get_message_thread_stats(); return static_cast<double>(s->pending_in_total);
prometheus::ClientMetric metric; });
metric.gauge.value = static_cast<double>(s->pending_in_total); pending_messages_out_metric = telemetry_mgr->GaugeInstance("zeek", "msgthread_pending_out_messages", {},
return metric; "Pending number of outbound messages", "", []() {
}); auto* s = get_message_thread_stats();
pending_messages_out_metric = return static_cast<double>(s->pending_out_total);
telemetry_mgr->GaugeInstance("zeek", "msgthread_pending_out_messages", {}, });
"Pending number of outbound messages", "", []() -> prometheus::ClientMetric {
auto* s = get_message_thread_stats();
prometheus::ClientMetric metric;
metric.gauge.value = static_cast<double>(s->pending_out_total);
return metric;
});
pending_message_in_buckets_fam = pending_message_in_buckets_fam =
telemetry_mgr->GaugeFamily("zeek", "msgthread_pending_messages_in_buckets", {"le"}, telemetry_mgr->GaugeFamily("zeek", "msgthread_pending_messages_in_buckets", {"le"},
@ -126,23 +116,15 @@ void Manager::InitPostScript() {
current_bucketed_messages.pending_out[upper_limit] = 0; current_bucketed_messages.pending_out[upper_limit] = 0;
pending_message_in_buckets[upper_limit] = pending_message_in_buckets[upper_limit] =
pending_message_in_buckets_fam->GetOrAdd({{"le", upper_limit_str}}, pending_message_in_buckets_fam->GetOrAdd({{"le", upper_limit_str}}, [upper_limit]() {
[upper_limit]() -> prometheus::ClientMetric { auto* s = get_message_thread_stats();
auto* s = get_message_thread_stats(); return static_cast<double>(s->pending_in.at(upper_limit));
prometheus::ClientMetric metric; });
metric.gauge.value =
static_cast<double>(s->pending_in.at(upper_limit));
return metric;
});
pending_message_out_buckets[upper_limit] = pending_message_out_buckets[upper_limit] =
pending_message_out_buckets_fam->GetOrAdd({{"le", upper_limit_str}}, pending_message_out_buckets_fam->GetOrAdd({{"le", upper_limit_str}}, [upper_limit]() {
[upper_limit]() -> prometheus::ClientMetric { auto* s = get_message_thread_stats();
auto* s = get_message_thread_stats(); return static_cast<double>(s->pending_out.at(upper_limit));
prometheus::ClientMetric metric; });
metric.gauge.value =
static_cast<double>(s->pending_out.at(upper_limit));
return metric;
});
} }
} }