From 73f71e652d0da37f016652983225a789c1c81e69 Mon Sep 17 00:00:00 2001 From: Tim Wojtulewicz Date: Mon, 5 Aug 2024 13:06:55 -0700 Subject: [PATCH] Make telemetry metrics out of MsgThread statistics --- src/telemetry/Counter.cc | 2 +- src/telemetry/Counter.h | 1 + src/telemetry/Gauge.cc | 2 +- src/telemetry/Gauge.h | 1 + src/threading/Manager.cc | 121 ++++++++++++++++++++++++++++++++++++- src/threading/Manager.h | 32 +++++++++- src/threading/MsgThread.cc | 78 +----------------------- 7 files changed, 155 insertions(+), 82 deletions(-) diff --git a/src/telemetry/Counter.cc b/src/telemetry/Counter.cc index 8b34624254..ef3679329e 100644 --- a/src/telemetry/Counter.cc +++ b/src/telemetry/Counter.cc @@ -3,7 +3,7 @@ using namespace zeek::telemetry; Counter::Counter(FamilyType* family, const prometheus::Labels& labels, prometheus::CollectCallbackPtr callback) noexcept - : handle(family->Add(labels)), labels(labels) { + : family(family), handle(family->Add(labels)), labels(labels) { if ( callback ) { handle.AddCollectCallback(std::move(callback)); has_callback = true; diff --git a/src/telemetry/Counter.h b/src/telemetry/Counter.h index f6c49315b7..a0186ad219 100644 --- a/src/telemetry/Counter.h +++ b/src/telemetry/Counter.h @@ -56,6 +56,7 @@ public: bool CompareLabels(const prometheus::Labels& lbls) const { return labels == lbls; } private: + FamilyType* family = nullptr; Handle& handle; prometheus::Labels labels; bool has_callback = false; diff --git a/src/telemetry/Gauge.cc b/src/telemetry/Gauge.cc index 273c9a57bf..114ada3811 100644 --- a/src/telemetry/Gauge.cc +++ b/src/telemetry/Gauge.cc @@ -15,7 +15,7 @@ double Gauge::Value() const noexcept { Gauge::Gauge(FamilyType* family, const prometheus::Labels& labels, prometheus::CollectCallbackPtr callback) noexcept - : handle(family->Add(labels)), labels(labels) { + : family(family), handle(family->Add(labels)), labels(labels) { if ( callback ) { handle.AddCollectCallback(std::move(callback)); has_callback = true; diff --git a/src/telemetry/Gauge.h b/src/telemetry/Gauge.h index 900cb7b784..652ff72667 100644 --- a/src/telemetry/Gauge.h +++ b/src/telemetry/Gauge.h @@ -74,6 +74,7 @@ public: bool CompareLabels(const prometheus::Labels& lbls) const { return labels == lbls; } private: + FamilyType* family = nullptr; Handle& handle; prometheus::Labels labels; bool has_callback = false; diff --git a/src/threading/Manager.cc b/src/threading/Manager.cc index 36a4ced939..f97427b08c 100644 --- a/src/threading/Manager.cc +++ b/src/threading/Manager.cc @@ -23,6 +23,9 @@ void HeartbeatTimer::Dispatch(double t, bool is_expire) { } // namespace detail +static std::vector pending_bucket_brackets = {1, 10, 100, + 1000, 10000, std::numeric_limits::infinity()}; + Manager::Manager() { DBG_LOG(DBG_THREADING, "Creating thread manager ..."); @@ -38,14 +41,128 @@ Manager::~Manager() { } void Manager::InitPostScript() { + static auto get_message_thread_stats = []() -> const BucketedMessages* { + if ( ! thread_mgr->terminating ) { + double now = util::current_time(); + if ( thread_mgr->bucketed_messages_last_updated < now - 1 ) { + thread_mgr->current_bucketed_messages.pending_in_total = 0; + thread_mgr->current_bucketed_messages.pending_out_total = 0; + for ( auto& m : thread_mgr->current_bucketed_messages.pending_in ) + m.second = 0; + for ( auto& m : thread_mgr->current_bucketed_messages.pending_out ) + m.second = 0; + + MsgThread::Stats thread_stats; + for ( const auto& t : thread_mgr->msg_threads ) { + t->GetStats(&thread_stats); + + thread_mgr->current_bucketed_messages.sent_in_total += thread_stats.sent_in; + thread_mgr->current_bucketed_messages.sent_out_total += thread_stats.sent_out; + thread_mgr->current_bucketed_messages.pending_in_total += thread_stats.pending_in; + thread_mgr->current_bucketed_messages.pending_out_total += thread_stats.pending_out; + + for ( auto upper_limit : pending_bucket_brackets ) { + if ( thread_stats.pending_in < upper_limit ) { + thread_mgr->current_bucketed_messages.pending_in[upper_limit]++; + break; + } + } + for ( auto upper_limit : pending_bucket_brackets ) { + if ( thread_stats.pending_out < upper_limit ) { + thread_mgr->current_bucketed_messages.pending_out[upper_limit]++; + break; + } + } + } + + thread_mgr->bucketed_messages_last_updated = now; + } + } + + return &thread_mgr->current_bucketed_messages; + }; + num_threads_metric = - telemetry_mgr->GaugeInstance("zeek", "active_threads", {}, "Number of active threads", "", + telemetry_mgr->GaugeInstance("zeek", "msgthread_active_threads", {}, "Number of active threads", "", []() -> prometheus::ClientMetric { prometheus::ClientMetric metric; metric.gauge.value = thread_mgr ? static_cast(thread_mgr->all_threads.size()) : 0.0; return metric; }); + + total_threads_metric = telemetry_mgr->CounterInstance("zeek", "msgthread_threads", {}, "Total number of threads"); + total_messages_in_metric = + telemetry_mgr->CounterInstance("zeek", "msgthread_in_messages", {}, "Number of inbound messages received", "", + []() -> prometheus::ClientMetric { + auto* s = get_message_thread_stats(); + prometheus::ClientMetric metric; + metric.gauge.value = static_cast(s->sent_in_total); + return metric; + }); + + total_messages_in_metric = + telemetry_mgr->CounterInstance("zeek", "msgthread_out_messages", {}, "Number of outbound messages sent", "", + []() -> prometheus::ClientMetric { + auto* s = get_message_thread_stats(); + prometheus::ClientMetric metric; + metric.gauge.value = static_cast(s->sent_out_total); + return metric; + }); + + pending_messages_in_metric = + telemetry_mgr->GaugeInstance("zeek", "msgthread_pending_in_messages", {}, "Pending number of inbound messages", + "", []() -> prometheus::ClientMetric { + auto* s = get_message_thread_stats(); + prometheus::ClientMetric metric; + metric.gauge.value = static_cast(s->pending_in_total); + return metric; + }); + pending_messages_out_metric = + telemetry_mgr->GaugeInstance("zeek", "msgthread_pending_out_messages", {}, + "Pending number of outbound messages", "", []() -> prometheus::ClientMetric { + auto* s = get_message_thread_stats(); + prometheus::ClientMetric metric; + metric.gauge.value = static_cast(s->pending_out_total); + return metric; + }); + + pending_message_in_buckets_fam = + telemetry_mgr->GaugeFamily("zeek", "msgthread_pending_messages_in_buckets", {"lt"}, + "Number of threads with pending inbound messages split into buckets"); + pending_message_out_buckets_fam = + telemetry_mgr->GaugeFamily("zeek", "msgthread_pending_messages_out_buckets", {"lt"}, + "Number of threads with pending outbound messages split into buckets"); + + for ( auto upper_limit : pending_bucket_brackets ) { + std::string upper_limit_str; + if ( upper_limit == std::numeric_limits::infinity() ) + upper_limit_str = "inf"; + else + upper_limit_str = std::to_string(upper_limit); + + current_bucketed_messages.pending_in[upper_limit] = 0; + current_bucketed_messages.pending_out[upper_limit] = 0; + + pending_message_in_buckets[upper_limit] = + pending_message_in_buckets_fam->GetOrAdd({{"lt", upper_limit_str}}, + [upper_limit]() -> prometheus::ClientMetric { + auto* s = get_message_thread_stats(); + prometheus::ClientMetric metric; + metric.gauge.value = + static_cast(s->pending_in.at(upper_limit)); + return metric; + }); + pending_message_out_buckets[upper_limit] = + pending_message_out_buckets_fam->GetOrAdd({{"lt", upper_limit_str}}, + [upper_limit]() -> prometheus::ClientMetric { + auto* s = get_message_thread_stats(); + prometheus::ClientMetric metric; + metric.gauge.value = + static_cast(s->pending_out.at(upper_limit)); + return metric; + }); + } } void Manager::Terminate() { @@ -90,6 +207,8 @@ void Manager::AddThread(BasicThread* thread) { if ( ! heartbeat_timer_running ) StartHeartbeatTimer(); + + total_threads_metric->Inc(); } void Manager::AddMsgThread(MsgThread* thread) { diff --git a/src/threading/Manager.h b/src/threading/Manager.h index 306063081f..3b4763a497 100644 --- a/src/threading/Manager.h +++ b/src/threading/Manager.h @@ -1,6 +1,7 @@ #pragma once #include +#include #include #include "zeek/Timer.h" @@ -10,7 +11,12 @@ namespace zeek { namespace telemetry { class Gauge; -} +using GaugePtr = std::shared_ptr; +class GaugeFamily; +using GaugeFamilyPtr = std::shared_ptr; +class Counter; +using CounterPtr = std::shared_ptr; +} // namespace telemetry namespace threading { namespace detail { @@ -162,7 +168,29 @@ private: msg_stats_list stats; bool heartbeat_timer_running = false; - std::shared_ptr num_threads_metric; + telemetry::GaugePtr num_threads_metric; + telemetry::CounterPtr total_threads_metric; + telemetry::CounterPtr total_messages_in_metric; + telemetry::CounterPtr total_messages_out_metric; + telemetry::GaugePtr pending_messages_in_metric; + telemetry::GaugePtr pending_messages_out_metric; + + telemetry::GaugeFamilyPtr pending_message_in_buckets_fam; + telemetry::GaugeFamilyPtr pending_message_out_buckets_fam; + std::map pending_message_in_buckets; + std::map pending_message_out_buckets; + + struct BucketedMessages { + uint64_t sent_in_total; + uint64_t sent_out_total; + uint64_t pending_in_total; + uint64_t pending_out_total; + std::map pending_in; + std::map pending_out; + }; + + BucketedMessages current_bucketed_messages; + double bucketed_messages_last_updated = 0.0; }; } // namespace threading diff --git a/src/threading/MsgThread.cc b/src/threading/MsgThread.cc index c33c9d5bd7..6854f5f188 100644 --- a/src/threading/MsgThread.cc +++ b/src/threading/MsgThread.cc @@ -9,6 +9,7 @@ #include "zeek/Obj.h" #include "zeek/RunState.h" #include "zeek/iosource/Manager.h" +#include "zeek/telemetry/Manager.h" #include "zeek/threading/Manager.h" // Set by Zeek's main signal handler. @@ -229,83 +230,6 @@ MsgThread::MsgThread() : BasicThread(), queue_in(this, nullptr), queue_out(nullp // Register IOSource as non-counting lifetime managed IO source. iosource_mgr->Register(io_source, true); - - cnt_sent_in_metric = telemetry_mgr->CounterInstance("zeek", "msg_thread_msgs_sent_in", {{"thread_name", Name()}}, - "Number of messages sent into thread"); - cnt_sent_out_metric = telemetry_mgr->CounterInstance("zeek", "msg_thread_msgs_sent_out", {{"thread_name", Name()}}, - "Number of messages sent from thread"); - pending_in_metric = telemetry_mgr->GaugeInstance("zeek", "msg_thread_msgs_pending_in", {{"thread_name", Name()}}, - "Number of pending messages sent into thread", "", - [this]() -> prometheus::ClientMetric { - prometheus::ClientMetric metric; - metric.gauge.value = static_cast(queue_in.Size()); - return metric; - }); - pending_out_metric = telemetry_mgr->GaugeInstance("zeek", "msg_thread_msgs_pending_in", {{"thread_name", Name()}}, - "Number of pending messages sent from thread", "", - [this]() -> prometheus::ClientMetric { - prometheus::ClientMetric metric; - metric.gauge.value = static_cast(queue_out.Size()); - return metric; - }); - - static auto get_queue_in_stats = [this]() -> const Queue::Stats { - double now = util::current_time(); - if ( this->queue_in_stats_last_updated < now - 0.01 ) { - queue_in.GetStats(&queue_in_last_stats); - this->queue_in_stats_last_updated = now; - } - - return queue_in_last_stats; - }; - - queue_in_num_reads_metric = - telemetry_mgr->CounterInstance("zeek", "msg_thread_queue_in_reads", {{"thread_name", Name()}}, - "Number of reads from msg thread input queue", "", - []() -> prometheus::ClientMetric { - prometheus::ClientMetric metric; - auto stats = get_queue_in_stats(); - metric.gauge.value = static_cast(stats.num_reads); - return metric; - }); - queue_in_num_writes_metric = - telemetry_mgr->CounterInstance("zeek", "msg_thread_queue_in_writes", {{"thread_name", Name()}}, - "Number of writes from msg thread input queue", "", - []() -> prometheus::ClientMetric { - prometheus::ClientMetric metric; - auto stats = get_queue_in_stats(); - metric.gauge.value = static_cast(stats.num_writes); - return metric; - }); - - static auto get_queue_out_stats = [this]() -> const Queue::Stats { - double now = util::current_time(); - if ( this->queue_out_stats_last_updated < now - 0.01 ) { - queue_out.GetStats(&queue_out_last_stats); - this->queue_out_stats_last_updated = now; - } - - return queue_out_last_stats; - }; - - queue_out_num_reads_metric = - telemetry_mgr->CounterInstance("zeek", "msg_thread_queue_out_reads", {{"thread_name", Name()}}, - "Number of reads from msg thread input queue", "", - []() -> prometheus::ClientMetric { - prometheus::ClientMetric metric; - auto stats = get_queue_out_stats(); - metric.gauge.value = static_cast(stats.num_reads); - return metric; - }); - queue_out_num_writes_metric = - telemetry_mgr->CounterInstance("zeek", "msg_thread_queue_out_writes", {{"thread_name", Name()}}, - "Number of writes from msg thread input queue", "", - []() -> prometheus::ClientMetric { - prometheus::ClientMetric metric; - auto stats = get_queue_out_stats(); - metric.gauge.value = static_cast(stats.num_writes); - return metric; - }); } MsgThread::~MsgThread() {