diff --git a/src/threading/Manager.cc b/src/threading/Manager.cc index beafba7758..8e67676643 100644 --- a/src/threading/Manager.cc +++ b/src/threading/Manager.cc @@ -56,8 +56,6 @@ void Manager::InitPostScript() { for ( const auto& t : thread_mgr->msg_threads ) { t->GetStats(&thread_stats); - thread_mgr->current_bucketed_messages.sent_in_total += thread_stats.sent_in; - thread_mgr->current_bucketed_messages.sent_out_total += thread_stats.sent_out; thread_mgr->current_bucketed_messages.pending_in_total += thread_stats.pending_in; thread_mgr->current_bucketed_messages.pending_out_total += thread_stats.pending_out; @@ -88,22 +86,10 @@ void Manager::InitPostScript() { total_threads_metric = telemetry_mgr->CounterInstance("zeek", "msgthread_threads", {}, "Total number of threads"); total_messages_in_metric = - telemetry_mgr->CounterInstance("zeek", "msgthread_in_messages", {}, "Number of inbound messages received", "", - []() -> prometheus::ClientMetric { - auto* s = get_message_thread_stats(); - prometheus::ClientMetric metric; - metric.gauge.value = static_cast(s->sent_in_total); - return metric; - }); + telemetry_mgr->CounterInstance("zeek", "msgthread_in_messages", {}, "Number of inbound messages received", ""); - total_messages_in_metric = - telemetry_mgr->CounterInstance("zeek", "msgthread_out_messages", {}, "Number of outbound messages sent", "", - []() -> prometheus::ClientMetric { - auto* s = get_message_thread_stats(); - prometheus::ClientMetric metric; - metric.gauge.value = static_cast(s->sent_out_total); - return metric; - }); + total_messages_out_metric = + telemetry_mgr->CounterInstance("zeek", "msgthread_out_messages", {}, "Number of outbound messages sent", ""); pending_messages_in_metric = telemetry_mgr->GaugeInstance("zeek", "msgthread_pending_in_messages", {}, "Pending number of inbound messages", @@ -259,6 +245,10 @@ void Manager::StartHeartbeatTimer() { new detail::HeartbeatTimer(run_state::network_time + BifConst::Threading::heartbeat_interval)); } +void Manager::MessageIn() { total_messages_in_metric->Inc(); } + +void Manager::MessageOut() { total_messages_out_metric->Inc(); } + // Raise everything in here as warnings so it is passed to scriptland without // looking "fatal". In addition to these warnings, ReaderBackend will queue // one reporter message. diff --git a/src/threading/Manager.h b/src/threading/Manager.h index 3b4763a497..897068bee5 100644 --- a/src/threading/Manager.h +++ b/src/threading/Manager.h @@ -153,6 +153,16 @@ protected: */ void StartHeartbeatTimer(); + /** + * Called by MsgThread::SendIn() to update metrics. + */ + void MessageIn(); + + /** + * Called by MsgThread::SendOut() to update metrics. + */ + void MessageOut(); + private: using all_thread_list = std::list; all_thread_list all_threads; @@ -181,8 +191,6 @@ private: std::map pending_message_out_buckets; struct BucketedMessages { - uint64_t sent_in_total; - uint64_t sent_out_total; uint64_t pending_in_total; uint64_t pending_out_total; std::map pending_in; diff --git a/src/threading/MsgThread.cc b/src/threading/MsgThread.cc index 6854f5f188..5bfb8ebeb4 100644 --- a/src/threading/MsgThread.cc +++ b/src/threading/MsgThread.cc @@ -388,6 +388,8 @@ void MsgThread::SendIn(BasicInputMessage* msg, bool force) { queue_in.Put(msg); ++cnt_sent_in; + + zeek::thread_mgr->MessageIn(); } void MsgThread::SendOut(BasicOutputMessage* msg, bool force) { @@ -400,6 +402,8 @@ void MsgThread::SendOut(BasicOutputMessage* msg, bool force) { ++cnt_sent_out; + zeek::thread_mgr->MessageOut(); + if ( io_source ) io_source->Fire(); }