telemetry/Manager: Track sent_in and sent_out totals without callback

The callback computed the totals by summing over the currently existing
threads, so the totals would go down once terminated threads were removed,
which isn't great. Move tracking of sent-in and sent-out messages from the
callback to explicit `Inc()` calls.

Also fixes total_messages_in_metric being assigned twice, which left
total_messages_out_metric uninitialized.
This commit is contained in:
Arne Welzel 2024-08-06 10:56:07 +02:00
parent c55b2ece8f
commit 351f16c160
3 changed files with 21 additions and 19 deletions

View file

@ -56,8 +56,6 @@ void Manager::InitPostScript() {
for ( const auto& t : thread_mgr->msg_threads ) { for ( const auto& t : thread_mgr->msg_threads ) {
t->GetStats(&thread_stats); t->GetStats(&thread_stats);
thread_mgr->current_bucketed_messages.sent_in_total += thread_stats.sent_in;
thread_mgr->current_bucketed_messages.sent_out_total += thread_stats.sent_out;
thread_mgr->current_bucketed_messages.pending_in_total += thread_stats.pending_in; thread_mgr->current_bucketed_messages.pending_in_total += thread_stats.pending_in;
thread_mgr->current_bucketed_messages.pending_out_total += thread_stats.pending_out; thread_mgr->current_bucketed_messages.pending_out_total += thread_stats.pending_out;
@ -88,22 +86,10 @@ void Manager::InitPostScript() {
total_threads_metric = telemetry_mgr->CounterInstance("zeek", "msgthread_threads", {}, "Total number of threads"); total_threads_metric = telemetry_mgr->CounterInstance("zeek", "msgthread_threads", {}, "Total number of threads");
total_messages_in_metric = total_messages_in_metric =
telemetry_mgr->CounterInstance("zeek", "msgthread_in_messages", {}, "Number of inbound messages received", "", telemetry_mgr->CounterInstance("zeek", "msgthread_in_messages", {}, "Number of inbound messages received", "");
[]() -> prometheus::ClientMetric {
auto* s = get_message_thread_stats();
prometheus::ClientMetric metric;
metric.gauge.value = static_cast<double>(s->sent_in_total);
return metric;
});
total_messages_in_metric = total_messages_out_metric =
telemetry_mgr->CounterInstance("zeek", "msgthread_out_messages", {}, "Number of outbound messages sent", "", telemetry_mgr->CounterInstance("zeek", "msgthread_out_messages", {}, "Number of outbound messages sent", "");
[]() -> prometheus::ClientMetric {
auto* s = get_message_thread_stats();
prometheus::ClientMetric metric;
metric.gauge.value = static_cast<double>(s->sent_out_total);
return metric;
});
pending_messages_in_metric = pending_messages_in_metric =
telemetry_mgr->GaugeInstance("zeek", "msgthread_pending_in_messages", {}, "Pending number of inbound messages", telemetry_mgr->GaugeInstance("zeek", "msgthread_pending_in_messages", {}, "Pending number of inbound messages",
@ -259,6 +245,10 @@ void Manager::StartHeartbeatTimer() {
new detail::HeartbeatTimer(run_state::network_time + BifConst::Threading::heartbeat_interval)); new detail::HeartbeatTimer(run_state::network_time + BifConst::Threading::heartbeat_interval));
} }
// Adds one to the inbound-message counter metric.
void Manager::MessageIn() {
    total_messages_in_metric->Inc();
}
// Adds one to the outbound-message counter metric.
void Manager::MessageOut() {
    total_messages_out_metric->Inc();
}
// Raise everything in here as warnings so it is passed to scriptland without // Raise everything in here as warnings so it is passed to scriptland without
// looking "fatal". In addition to these warnings, ReaderBackend will queue // looking "fatal". In addition to these warnings, ReaderBackend will queue
// one reporter message. // one reporter message.

View file

@ -153,6 +153,16 @@ protected:
*/ */
void StartHeartbeatTimer(); void StartHeartbeatTimer();
/**
 * Increments the total inbound-messages counter by one.
 *
 * Called by MsgThread::SendIn() after it has queued a message, so the
 * metric is updated explicitly rather than derived from per-thread stats.
 */
void MessageIn();
/**
 * Increments the total outbound-messages counter by one.
 *
 * Called by MsgThread::SendOut() for each message it sends, so the
 * metric is updated explicitly rather than derived from per-thread stats.
 */
void MessageOut();
private: private:
using all_thread_list = std::list<BasicThread*>; using all_thread_list = std::list<BasicThread*>;
all_thread_list all_threads; all_thread_list all_threads;
@ -181,8 +191,6 @@ private:
std::map<uint64_t, telemetry::GaugePtr> pending_message_out_buckets; std::map<uint64_t, telemetry::GaugePtr> pending_message_out_buckets;
struct BucketedMessages { struct BucketedMessages {
uint64_t sent_in_total;
uint64_t sent_out_total;
uint64_t pending_in_total; uint64_t pending_in_total;
uint64_t pending_out_total; uint64_t pending_out_total;
std::map<uint64_t, uint64_t> pending_in; std::map<uint64_t, uint64_t> pending_in;

View file

@ -388,6 +388,8 @@ void MsgThread::SendIn(BasicInputMessage* msg, bool force) {
queue_in.Put(msg); queue_in.Put(msg);
++cnt_sent_in; ++cnt_sent_in;
zeek::thread_mgr->MessageIn();
} }
void MsgThread::SendOut(BasicOutputMessage* msg, bool force) { void MsgThread::SendOut(BasicOutputMessage* msg, bool force) {
@ -400,6 +402,8 @@ void MsgThread::SendOut(BasicOutputMessage* msg, bool force) {
++cnt_sent_out; ++cnt_sent_out;
zeek::thread_mgr->MessageOut();
if ( io_source ) if ( io_source )
io_source->Fire(); io_source->Fire();
} }