diff --git a/CHANGES b/CHANGES index dfc7fb6e91..d069106207 100644 --- a/CHANGES +++ b/CHANGES @@ -1,3 +1,47 @@ +7.1.0-dev.105 | 2024-08-07 10:54:10 +0200 + + * telemetry/Manager: Check RegisterFd() return value (Arne Welzel, Corelight) + + Please coverity. + + * telemetry/Manager: Track sent_in and sent_out totals without callback (Arne Welzel, Corelight) + + For terminated threads, the totals would go down once the threads are + removed, which isn't great. Move tracking of sent in and sent out + messages from callback to explicit `Inc()` calls. + + Also fixes total_messages_in_metric being initialized twice rather + than total_messages_out_metric. + + * threading/Manager: Switch inf bucket from infinity() to max() (Arne Welzel, Corelight) + + For uint64_t, std::numeric_limits::has_infinity is false and infinity() + actually returns 0. Use uint64_t's max() instead. We could cast to double + and use the double infinity, but this seems reasonable, too. + + This was found while trying to provoke some pending messages and being + confused why all but the "inf" bucket increased. + + * threading/Manager: "lt" to "le" and do not break (Arne Welzel, Corelight) + + The buckets are specified as lower-equal (changed from lower-than now), + which means we shouldn't break: The larger "le" bucket contains all previous + buckets, too. The "inf" bucket represents the current number of threads. + + For example, with a total of 10 threads, 5 threads with 0 messages pending, + another 4 threads with 50 messages, and one with 2000 messages, the metrics + would end up as follows: + + pending_buckets{le=1} = 5 + pending_buckets{le=10} = 5 + pending_buckets{le=100} = 9 + pending_buckets{le=1000} = 9 + pending_buckets{le=10000} = 10 + pending_buckets{le=inf} = 10 + + This might be strange initially, but aligns with the Prometheus + histogram approach (though we're using gauges here). 
+ 7.1.0-dev.99 | 2024-08-06 20:08:37 +0200 * Bump auxil/spicy to latest development snapshot (Arne Welzel, Corelight) diff --git a/VERSION b/VERSION index 8652984ad5..7f7d6edcc2 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -7.1.0-dev.99 +7.1.0-dev.105 diff --git a/src/telemetry/Manager.cc b/src/telemetry/Manager.cc index 69b4ed485a..7b19affc82 100644 --- a/src/telemetry/Manager.cc +++ b/src/telemetry/Manager.cc @@ -162,7 +162,9 @@ void Manager::InitPostScript() { }); #endif - iosource_mgr->RegisterFd(collector_flare.FD(), this); + if ( ! iosource_mgr->RegisterFd(collector_flare.FD(), this) ) { + reporter->FatalError("Failed to register telemetry collector descriptor"); + } } void Manager::Terminate() { diff --git a/src/threading/Manager.cc b/src/threading/Manager.cc index 5499cbb8ed..8e67676643 100644 --- a/src/threading/Manager.cc +++ b/src/threading/Manager.cc @@ -2,6 +2,8 @@ #include #include +#include +#include #include "zeek/Event.h" #include "zeek/IPAddr.h" @@ -22,8 +24,7 @@ void HeartbeatTimer::Dispatch(double t, bool is_expire) { } // namespace detail -static std::vector pending_bucket_brackets = {1, 10, 100, - 1000, 10000, std::numeric_limits::infinity()}; +static std::vector pending_bucket_brackets = {1, 10, 100, 1000, 10000, std::numeric_limits::max()}; Manager::Manager() { DBG_LOG(DBG_THREADING, "Creating thread manager ..."); @@ -55,22 +56,15 @@ void Manager::InitPostScript() { for ( const auto& t : thread_mgr->msg_threads ) { t->GetStats(&thread_stats); - thread_mgr->current_bucketed_messages.sent_in_total += thread_stats.sent_in; - thread_mgr->current_bucketed_messages.sent_out_total += thread_stats.sent_out; thread_mgr->current_bucketed_messages.pending_in_total += thread_stats.pending_in; thread_mgr->current_bucketed_messages.pending_out_total += thread_stats.pending_out; for ( auto upper_limit : pending_bucket_brackets ) { - if ( thread_stats.pending_in < upper_limit ) { + if ( thread_stats.pending_in <= upper_limit ) 
thread_mgr->current_bucketed_messages.pending_in[upper_limit]++; - break; - } - } - for ( auto upper_limit : pending_bucket_brackets ) { - if ( thread_stats.pending_out < upper_limit ) { + + if ( thread_stats.pending_out <= upper_limit ) thread_mgr->current_bucketed_messages.pending_out[upper_limit]++; - break; - } } } @@ -92,22 +86,10 @@ void Manager::InitPostScript() { total_threads_metric = telemetry_mgr->CounterInstance("zeek", "msgthread_threads", {}, "Total number of threads"); total_messages_in_metric = - telemetry_mgr->CounterInstance("zeek", "msgthread_in_messages", {}, "Number of inbound messages received", "", - []() -> prometheus::ClientMetric { - auto* s = get_message_thread_stats(); - prometheus::ClientMetric metric; - metric.gauge.value = static_cast(s->sent_in_total); - return metric; - }); + telemetry_mgr->CounterInstance("zeek", "msgthread_in_messages", {}, "Number of inbound messages received", ""); - total_messages_in_metric = - telemetry_mgr->CounterInstance("zeek", "msgthread_out_messages", {}, "Number of outbound messages sent", "", - []() -> prometheus::ClientMetric { - auto* s = get_message_thread_stats(); - prometheus::ClientMetric metric; - metric.gauge.value = static_cast(s->sent_out_total); - return metric; - }); + total_messages_out_metric = + telemetry_mgr->CounterInstance("zeek", "msgthread_out_messages", {}, "Number of outbound messages sent", ""); pending_messages_in_metric = telemetry_mgr->GaugeInstance("zeek", "msgthread_pending_in_messages", {}, "Pending number of inbound messages", @@ -127,15 +109,15 @@ void Manager::InitPostScript() { }); pending_message_in_buckets_fam = - telemetry_mgr->GaugeFamily("zeek", "msgthread_pending_messages_in_buckets", {"lt"}, + telemetry_mgr->GaugeFamily("zeek", "msgthread_pending_messages_in_buckets", {"le"}, "Number of threads with pending inbound messages split into buckets"); pending_message_out_buckets_fam = - telemetry_mgr->GaugeFamily("zeek", "msgthread_pending_messages_out_buckets", 
{"lt"}, + telemetry_mgr->GaugeFamily("zeek", "msgthread_pending_messages_out_buckets", {"le"}, "Number of threads with pending outbound messages split into buckets"); for ( auto upper_limit : pending_bucket_brackets ) { std::string upper_limit_str; - if ( upper_limit == std::numeric_limits::infinity() ) + if ( upper_limit == std::numeric_limits::max() ) upper_limit_str = "inf"; else upper_limit_str = std::to_string(upper_limit); @@ -144,7 +126,7 @@ void Manager::InitPostScript() { current_bucketed_messages.pending_out[upper_limit] = 0; pending_message_in_buckets[upper_limit] = - pending_message_in_buckets_fam->GetOrAdd({{"lt", upper_limit_str}}, + pending_message_in_buckets_fam->GetOrAdd({{"le", upper_limit_str}}, [upper_limit]() -> prometheus::ClientMetric { auto* s = get_message_thread_stats(); prometheus::ClientMetric metric; @@ -153,7 +135,7 @@ void Manager::InitPostScript() { return metric; }); pending_message_out_buckets[upper_limit] = - pending_message_out_buckets_fam->GetOrAdd({{"lt", upper_limit_str}}, + pending_message_out_buckets_fam->GetOrAdd({{"le", upper_limit_str}}, [upper_limit]() -> prometheus::ClientMetric { auto* s = get_message_thread_stats(); prometheus::ClientMetric metric; @@ -263,6 +245,10 @@ void Manager::StartHeartbeatTimer() { new detail::HeartbeatTimer(run_state::network_time + BifConst::Threading::heartbeat_interval)); } +void Manager::MessageIn() { total_messages_in_metric->Inc(); } + +void Manager::MessageOut() { total_messages_out_metric->Inc(); } + // Raise everything in here as warnings so it is passed to scriptland without // looking "fatal". In addition to these warnings, ReaderBackend will queue // one reporter message. diff --git a/src/threading/Manager.h b/src/threading/Manager.h index 3b4763a497..897068bee5 100644 --- a/src/threading/Manager.h +++ b/src/threading/Manager.h @@ -153,6 +153,16 @@ protected: */ void StartHeartbeatTimer(); + /** + * Called by MsgThread::SendIn() to update metrics. 
+ */ + void MessageIn(); + + /** + * Called by MsgThread::SendOut() to update metrics. + */ + void MessageOut(); + private: using all_thread_list = std::list; all_thread_list all_threads; @@ -181,8 +191,6 @@ private: std::map pending_message_out_buckets; struct BucketedMessages { - uint64_t sent_in_total; - uint64_t sent_out_total; uint64_t pending_in_total; uint64_t pending_out_total; std::map pending_in; diff --git a/src/threading/MsgThread.cc b/src/threading/MsgThread.cc index 6854f5f188..5bfb8ebeb4 100644 --- a/src/threading/MsgThread.cc +++ b/src/threading/MsgThread.cc @@ -388,6 +388,8 @@ void MsgThread::SendIn(BasicInputMessage* msg, bool force) { queue_in.Put(msg); ++cnt_sent_in; + + zeek::thread_mgr->MessageIn(); } void MsgThread::SendOut(BasicOutputMessage* msg, bool force) { @@ -400,6 +402,8 @@ void MsgThread::SendOut(BasicOutputMessage* msg, bool force) { ++cnt_sent_out; + zeek::thread_mgr->MessageOut(); + if ( io_source ) io_source->Fire(); }