From 98480cf3393d3cf2b72411c41c53f5e70ff129e1 Mon Sep 17 00:00:00 2001 From: Arne Welzel Date: Tue, 6 Aug 2024 09:30:29 +0200 Subject: [PATCH 1/4] threading/Manager: "lt" to "le" and do not break The buckets are specified as lower-equal (changed from lower-than now), which means we shouldn't break: The larger "le" bucket contains all previous buckets, too. The "inf" bucket represents the current number of threads. For example, with a total of 10 threads, 5 threads with 0 messages pending, another 4 threads with 50 messages, and on with 2000 messages, the metrics would end end up as follows: pending_buckets{le=1} = 5 pending_buckets{le=10} = 5 pending_buckets{le=100} = 9 pending_buckets{le=1000} = 9 pending_buckets{le=10000} = 10 pending_buckets{le=inf} = 10 This might be strange initially, but aligns with the Prometheus histogram approach (though we're using gauges here). --- src/threading/Manager.cc | 19 +++++++------------ 1 file changed, 7 insertions(+), 12 deletions(-) diff --git a/src/threading/Manager.cc b/src/threading/Manager.cc index 5499cbb8ed..de6e7008b9 100644 --- a/src/threading/Manager.cc +++ b/src/threading/Manager.cc @@ -61,16 +61,11 @@ void Manager::InitPostScript() { thread_mgr->current_bucketed_messages.pending_out_total += thread_stats.pending_out; for ( auto upper_limit : pending_bucket_brackets ) { - if ( thread_stats.pending_in < upper_limit ) { + if ( thread_stats.pending_in <= upper_limit ) thread_mgr->current_bucketed_messages.pending_in[upper_limit]++; - break; - } - } - for ( auto upper_limit : pending_bucket_brackets ) { - if ( thread_stats.pending_out < upper_limit ) { + + if ( thread_stats.pending_out <= upper_limit ) thread_mgr->current_bucketed_messages.pending_out[upper_limit]++; - break; - } } } @@ -127,10 +122,10 @@ void Manager::InitPostScript() { }); pending_message_in_buckets_fam = - telemetry_mgr->GaugeFamily("zeek", "msgthread_pending_messages_in_buckets", {"lt"}, + telemetry_mgr->GaugeFamily("zeek", "msgthread_pending_messages_in_buckets", {"le"}, "Number of threads with pending inbound messages split into buckets"); pending_message_out_buckets_fam = - telemetry_mgr->GaugeFamily("zeek", "msgthread_pending_messages_out_buckets", {"lt"}, + telemetry_mgr->GaugeFamily("zeek", "msgthread_pending_messages_out_buckets", {"le"}, "Number of threads with pending outbound messages split into buckets"); for ( auto upper_limit : pending_bucket_brackets ) { @@ -144,7 +139,7 @@ void Manager::InitPostScript() { current_bucketed_messages.pending_out[upper_limit] = 0; pending_message_in_buckets[upper_limit] = - pending_message_in_buckets_fam->GetOrAdd({{"lt", upper_limit_str}}, + pending_message_in_buckets_fam->GetOrAdd({{"le", upper_limit_str}}, [upper_limit]() -> prometheus::ClientMetric { auto* s = get_message_thread_stats(); prometheus::ClientMetric metric; @@ -153,7 +148,7 @@ void Manager::InitPostScript() { return metric; }); pending_message_out_buckets[upper_limit] = - pending_message_out_buckets_fam->GetOrAdd({{"lt", upper_limit_str}}, + pending_message_out_buckets_fam->GetOrAdd({{"le", upper_limit_str}}, [upper_limit]() -> prometheus::ClientMetric { auto* s = get_message_thread_stats(); prometheus::ClientMetric metric; From c55b2ece8f47107de4ec88e973e63d1f667ce430 Mon Sep 17 00:00:00 2001 From: Arne Welzel Date: Tue, 6 Aug 2024 10:38:18 +0200 Subject: [PATCH 2/4] threading/Manager: Switch inf bucket from infinity() to max() For uint64_t, std::numeric_limits::has_infinity is false and infinity() actually returns 0. Use uint64_t's max() instead. We could cast to double and use the double infinity, but this seems reasonable, too. This was found while trying to provoke some pending messages and being confused why all but the "inf" bucket increased. --- src/threading/Manager.cc | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/src/threading/Manager.cc b/src/threading/Manager.cc index de6e7008b9..beafba7758 100644 --- a/src/threading/Manager.cc +++ b/src/threading/Manager.cc @@ -2,6 +2,8 @@ #include #include +#include +#include #include "zeek/Event.h" #include "zeek/IPAddr.h" @@ -22,8 +24,7 @@ void HeartbeatTimer::Dispatch(double t, bool is_expire) { } // namespace detail -static std::vector pending_bucket_brackets = {1, 10, 100, - 1000, 10000, std::numeric_limits::infinity()}; +static std::vector pending_bucket_brackets = {1, 10, 100, 1000, 10000, std::numeric_limits::max()}; Manager::Manager() { DBG_LOG(DBG_THREADING, "Creating thread manager ..."); @@ -130,7 +131,7 @@ void Manager::InitPostScript() { for ( auto upper_limit : pending_bucket_brackets ) { std::string upper_limit_str; - if ( upper_limit == std::numeric_limits::infinity() ) + if ( upper_limit == std::numeric_limits::max() ) upper_limit_str = "inf"; else upper_limit_str = std::to_string(upper_limit); From 351f16c160d8a4ae548b08fe9ccb05fa3225ca89 Mon Sep 17 00:00:00 2001 From: Arne Welzel Date: Tue, 6 Aug 2024 10:56:07 +0200 Subject: [PATCH 3/4] telemetry/Manager: Track sent_in and sent_out totals without callback For terminated threads, the totals would go down once the threads are removed, which isn't great. Move tracking of sent in and sent out messages from callback to explicit `Inc()` calls. Also fixes total_messages_in_metric being initialized twice rather than total_messages_out_metric. --- src/threading/Manager.cc | 24 +++++++----------------- src/threading/Manager.h | 12 ++++++++++-- src/threading/MsgThread.cc | 4 ++++ 3 files changed, 21 insertions(+), 19 deletions(-) diff --git a/src/threading/Manager.cc b/src/threading/Manager.cc index beafba7758..8e67676643 100644 --- a/src/threading/Manager.cc +++ b/src/threading/Manager.cc @@ -56,8 +56,6 @@ void Manager::InitPostScript() { for ( const auto& t : thread_mgr->msg_threads ) { t->GetStats(&thread_stats); - thread_mgr->current_bucketed_messages.sent_in_total += thread_stats.sent_in; - thread_mgr->current_bucketed_messages.sent_out_total += thread_stats.sent_out; thread_mgr->current_bucketed_messages.pending_in_total += thread_stats.pending_in; thread_mgr->current_bucketed_messages.pending_out_total += thread_stats.pending_out; @@ -88,22 +86,10 @@ void Manager::InitPostScript() { total_threads_metric = telemetry_mgr->CounterInstance("zeek", "msgthread_threads", {}, "Total number of threads"); total_messages_in_metric = - telemetry_mgr->CounterInstance("zeek", "msgthread_in_messages", {}, "Number of inbound messages received", "", - []() -> prometheus::ClientMetric { - auto* s = get_message_thread_stats(); - prometheus::ClientMetric metric; - metric.gauge.value = static_cast(s->sent_in_total); - return metric; - }); + telemetry_mgr->CounterInstance("zeek", "msgthread_in_messages", {}, "Number of inbound messages received", ""); - total_messages_in_metric = - telemetry_mgr->CounterInstance("zeek", "msgthread_out_messages", {}, "Number of outbound messages sent", "", - []() -> prometheus::ClientMetric { - auto* s = get_message_thread_stats(); - prometheus::ClientMetric metric; - metric.gauge.value = static_cast(s->sent_out_total); - return metric; - }); + total_messages_out_metric = + telemetry_mgr->CounterInstance("zeek", "msgthread_out_messages", {}, "Number of outbound messages sent", ""); pending_messages_in_metric = telemetry_mgr->GaugeInstance("zeek", "msgthread_pending_in_messages", {}, "Pending number of inbound messages", @@ -259,6 +245,10 @@ void Manager::StartHeartbeatTimer() { new detail::HeartbeatTimer(run_state::network_time + BifConst::Threading::heartbeat_interval)); } +void Manager::MessageIn() { total_messages_in_metric->Inc(); } + +void Manager::MessageOut() { total_messages_out_metric->Inc(); } + // Raise everything in here as warnings so it is passed to scriptland without // looking "fatal". In addition to these warnings, ReaderBackend will queue // one reporter message. diff --git a/src/threading/Manager.h b/src/threading/Manager.h index 3b4763a497..897068bee5 100644 --- a/src/threading/Manager.h +++ b/src/threading/Manager.h @@ -153,6 +153,16 @@ protected: */ void StartHeartbeatTimer(); + /** + * Called by MsgThread::SendIn() to update metrics. + */ + void MessageIn(); + + /** + * Called by MsgThread::SendOut() to update metrics. + */ + void MessageOut(); + private: using all_thread_list = std::list; all_thread_list all_threads; @@ -181,8 +191,6 @@ private: std::map pending_message_out_buckets; struct BucketedMessages { - uint64_t sent_in_total; - uint64_t sent_out_total; uint64_t pending_in_total; uint64_t pending_out_total; std::map pending_in; diff --git a/src/threading/MsgThread.cc b/src/threading/MsgThread.cc index 6854f5f188..5bfb8ebeb4 100644 --- a/src/threading/MsgThread.cc +++ b/src/threading/MsgThread.cc @@ -388,6 +388,8 @@ void MsgThread::SendIn(BasicInputMessage* msg, bool force) { queue_in.Put(msg); ++cnt_sent_in; + + zeek::thread_mgr->MessageIn(); } void MsgThread::SendOut(BasicOutputMessage* msg, bool force) { @@ -400,6 +402,8 @@ void MsgThread::SendOut(BasicOutputMessage* msg, bool force) { ++cnt_sent_out; + zeek::thread_mgr->MessageOut(); + if ( io_source ) io_source->Fire(); } From 4fe9580a7ee7665757b50f513af0d2752a2da1db Mon Sep 17 00:00:00 2001 From: Arne Welzel Date: Wed, 7 Aug 2024 09:35:37 +0200 Subject: [PATCH 4/4] telemetry/Manager: Check RegisterFd() return value Please coverity. --- src/telemetry/Manager.cc | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/telemetry/Manager.cc b/src/telemetry/Manager.cc index 69b4ed485a..7b19affc82 100644 --- a/src/telemetry/Manager.cc +++ b/src/telemetry/Manager.cc @@ -162,7 +162,9 @@ void Manager::InitPostScript() { }); #endif - iosource_mgr->RegisterFd(collector_flare.FD(), this); + if ( ! iosource_mgr->RegisterFd(collector_flare.FD(), this) ) { + reporter->FatalError("Failed to register telemetry collector descriptor"); + } } void Manager::Terminate() {