From 351f16c160d8a4ae548b08fe9ccb05fa3225ca89 Mon Sep 17 00:00:00 2001 From: Arne Welzel Date: Tue, 6 Aug 2024 10:56:07 +0200 Subject: [PATCH] telemetry/Manager: Track sent_in and sent_out totals without callback For terminated threads, the totals would go down once the threads are removed, which isn't great. Move tracking of sent in and sent out messages from callback to explicit `Inc()` calls. Also fixes total_messages_in_metric being initialized twice rather than total_messages_out_metric. --- src/threading/Manager.cc | 24 +++++++----------------- src/threading/Manager.h | 12 ++++++++++-- src/threading/MsgThread.cc | 4 ++++ 3 files changed, 21 insertions(+), 19 deletions(-) diff --git a/src/threading/Manager.cc b/src/threading/Manager.cc index beafba7758..8e67676643 100644 --- a/src/threading/Manager.cc +++ b/src/threading/Manager.cc @@ -56,8 +56,6 @@ void Manager::InitPostScript() { for ( const auto& t : thread_mgr->msg_threads ) { t->GetStats(&thread_stats); - thread_mgr->current_bucketed_messages.sent_in_total += thread_stats.sent_in; - thread_mgr->current_bucketed_messages.sent_out_total += thread_stats.sent_out; thread_mgr->current_bucketed_messages.pending_in_total += thread_stats.pending_in; thread_mgr->current_bucketed_messages.pending_out_total += thread_stats.pending_out; @@ -88,22 +86,10 @@ void Manager::InitPostScript() { total_threads_metric = telemetry_mgr->CounterInstance("zeek", "msgthread_threads", {}, "Total number of threads"); total_messages_in_metric = - telemetry_mgr->CounterInstance("zeek", "msgthread_in_messages", {}, "Number of inbound messages received", "", - []() -> prometheus::ClientMetric { - auto* s = get_message_thread_stats(); - prometheus::ClientMetric metric; - metric.gauge.value = static_cast(s->sent_in_total); - return metric; - }); + telemetry_mgr->CounterInstance("zeek", "msgthread_in_messages", {}, "Number of inbound messages received", ""); - total_messages_in_metric = - telemetry_mgr->CounterInstance("zeek", "msgthread_out_messages", {}, "Number of outbound messages sent", "", - []() -> prometheus::ClientMetric { - auto* s = get_message_thread_stats(); - prometheus::ClientMetric metric; - metric.gauge.value = static_cast(s->sent_out_total); - return metric; - }); + total_messages_out_metric = + telemetry_mgr->CounterInstance("zeek", "msgthread_out_messages", {}, "Number of outbound messages sent", ""); pending_messages_in_metric = telemetry_mgr->GaugeInstance("zeek", "msgthread_pending_in_messages", {}, "Pending number of inbound messages", @@ -259,6 +245,10 @@ void Manager::StartHeartbeatTimer() { new detail::HeartbeatTimer(run_state::network_time + BifConst::Threading::heartbeat_interval)); } +void Manager::MessageIn() { total_messages_in_metric->Inc(); } + +void Manager::MessageOut() { total_messages_out_metric->Inc(); } + // Raise everything in here as warnings so it is passed to scriptland without // looking "fatal". In addition to these warnings, ReaderBackend will queue // one reporter message. diff --git a/src/threading/Manager.h b/src/threading/Manager.h index 3b4763a497..897068bee5 100644 --- a/src/threading/Manager.h +++ b/src/threading/Manager.h @@ -153,6 +153,16 @@ protected: */ void StartHeartbeatTimer(); + /** + * Called by MsgThread::SendIn() to update metrics. + */ + void MessageIn(); + + /** + * Called by MsgThread::SendOut() to update metrics. + */ + void MessageOut(); + private: using all_thread_list = std::list; all_thread_list all_threads; @@ -181,8 +191,6 @@ private: std::map pending_message_out_buckets; struct BucketedMessages { - uint64_t sent_in_total; - uint64_t sent_out_total; uint64_t pending_in_total; uint64_t pending_out_total; std::map pending_in; diff --git a/src/threading/MsgThread.cc b/src/threading/MsgThread.cc index 6854f5f188..5bfb8ebeb4 100644 --- a/src/threading/MsgThread.cc +++ b/src/threading/MsgThread.cc @@ -388,6 +388,8 @@ void MsgThread::SendIn(BasicInputMessage* msg, bool force) { queue_in.Put(msg); ++cnt_sent_in; + + zeek::thread_mgr->MessageIn(); } void MsgThread::SendOut(BasicOutputMessage* msg, bool force) { @@ -400,6 +402,8 @@ void MsgThread::SendOut(BasicOutputMessage* msg, bool force) { ++cnt_sent_out; + zeek::thread_mgr->MessageOut(); + if ( io_source ) io_source->Fire(); }