Merge remote-tracking branch 'origin/topic/awelzel/threading-manager-metrics-follow-up'

* origin/topic/awelzel/threading-manager-metrics-follow-up:
  telemetry/Manager: Check RegisterFd() return value
  telemetry/Manager: Track sent_in and sent_out totals without callback
  threading/Manager: Switch inf bucket from infinity() to max()
  threading/Manager: "lt" to "le" and do not break
This commit is contained in:
Arne Welzel 2024-08-07 10:54:10 +02:00
commit b0df736ba7
6 changed files with 80 additions and 36 deletions

44
CHANGES
View file

@ -1,3 +1,47 @@
7.1.0-dev.105 | 2024-08-07 10:54:10 +0200
* telemetry/Manager: Check RegisterFd() return value (Arne Welzel, Corelight)
Pleases Coverity.
* telemetry/Manager: Track sent_in and sent_out totals without callback (Arne Welzel, Corelight)
For terminated threads, the totals would go down once the threads are
removed, which isn't great. Move tracking of sent in and sent out
messages from callback to explicit `Inc()` calls.
Also fixes total_messages_in_metric being initialized twice rather
than total_messages_out_metric.
* threading/Manager: Switch inf bucket from infinity() to max() (Arne Welzel, Corelight)
For uint64_t, std::numeric_limits<T>::has_infinity is false and infinity()
actually returns 0. Use uint64_t's max() instead. We could cast to double
and use the double infinity, but this seems reasonable, too.
This was found while trying to provoke some pending messages and being
confused why all but the "inf" bucket increased.
* threading/Manager: "lt" to "le" and do not break (Arne Welzel, Corelight)
The buckets are specified as lower-equal (changed from lower-than now),
which means we shouldn't break: The larger "le" bucket contains all previous
buckets, too. The "inf" bucket represents the current number of threads.
For example, with a total of 10 threads, 5 threads with 0 messages pending,
another 4 threads with 50 messages, and one with 2000 messages, the metrics
would end up as follows:
pending_buckets{le=1} = 5
pending_buckets{le=10} = 5
pending_buckets{le=100} = 9
pending_buckets{le=1000} = 9
pending_buckets{le=10000} = 10
pending_buckets{le=inf} = 10
This might be strange initially, but aligns with the Prometheus
histogram approach (though we're using gauges here).
7.1.0-dev.99 | 2024-08-06 20:08:37 +0200 7.1.0-dev.99 | 2024-08-06 20:08:37 +0200
* Bump auxil/spicy to latest development snapshot (Arne Welzel, Corelight) * Bump auxil/spicy to latest development snapshot (Arne Welzel, Corelight)

View file

@ -1 +1 @@
7.1.0-dev.99 7.1.0-dev.105

View file

@ -162,7 +162,9 @@ void Manager::InitPostScript() {
}); });
#endif #endif
iosource_mgr->RegisterFd(collector_flare.FD(), this); if ( ! iosource_mgr->RegisterFd(collector_flare.FD(), this) ) {
reporter->FatalError("Failed to register telemetry collector descriptor");
}
} }
void Manager::Terminate() { void Manager::Terminate() {

View file

@ -2,6 +2,8 @@
#include <sys/socket.h> #include <sys/socket.h>
#include <unistd.h> #include <unistd.h>
#include <cstdint>
#include <limits>
#include "zeek/Event.h" #include "zeek/Event.h"
#include "zeek/IPAddr.h" #include "zeek/IPAddr.h"
@ -22,8 +24,7 @@ void HeartbeatTimer::Dispatch(double t, bool is_expire) {
} // namespace detail } // namespace detail
static std::vector<uint64_t> pending_bucket_brackets = {1, 10, 100, static std::vector<uint64_t> pending_bucket_brackets = {1, 10, 100, 1000, 10000, std::numeric_limits<uint64_t>::max()};
1000, 10000, std::numeric_limits<uint64_t>::infinity()};
Manager::Manager() { Manager::Manager() {
DBG_LOG(DBG_THREADING, "Creating thread manager ..."); DBG_LOG(DBG_THREADING, "Creating thread manager ...");
@ -55,22 +56,15 @@ void Manager::InitPostScript() {
for ( const auto& t : thread_mgr->msg_threads ) { for ( const auto& t : thread_mgr->msg_threads ) {
t->GetStats(&thread_stats); t->GetStats(&thread_stats);
thread_mgr->current_bucketed_messages.sent_in_total += thread_stats.sent_in;
thread_mgr->current_bucketed_messages.sent_out_total += thread_stats.sent_out;
thread_mgr->current_bucketed_messages.pending_in_total += thread_stats.pending_in; thread_mgr->current_bucketed_messages.pending_in_total += thread_stats.pending_in;
thread_mgr->current_bucketed_messages.pending_out_total += thread_stats.pending_out; thread_mgr->current_bucketed_messages.pending_out_total += thread_stats.pending_out;
for ( auto upper_limit : pending_bucket_brackets ) { for ( auto upper_limit : pending_bucket_brackets ) {
if ( thread_stats.pending_in < upper_limit ) { if ( thread_stats.pending_in <= upper_limit )
thread_mgr->current_bucketed_messages.pending_in[upper_limit]++; thread_mgr->current_bucketed_messages.pending_in[upper_limit]++;
break;
} if ( thread_stats.pending_out <= upper_limit )
}
for ( auto upper_limit : pending_bucket_brackets ) {
if ( thread_stats.pending_out < upper_limit ) {
thread_mgr->current_bucketed_messages.pending_out[upper_limit]++; thread_mgr->current_bucketed_messages.pending_out[upper_limit]++;
break;
}
} }
} }
@ -92,22 +86,10 @@ void Manager::InitPostScript() {
total_threads_metric = telemetry_mgr->CounterInstance("zeek", "msgthread_threads", {}, "Total number of threads"); total_threads_metric = telemetry_mgr->CounterInstance("zeek", "msgthread_threads", {}, "Total number of threads");
total_messages_in_metric = total_messages_in_metric =
telemetry_mgr->CounterInstance("zeek", "msgthread_in_messages", {}, "Number of inbound messages received", "", telemetry_mgr->CounterInstance("zeek", "msgthread_in_messages", {}, "Number of inbound messages received", "");
[]() -> prometheus::ClientMetric {
auto* s = get_message_thread_stats();
prometheus::ClientMetric metric;
metric.gauge.value = static_cast<double>(s->sent_in_total);
return metric;
});
total_messages_in_metric = total_messages_out_metric =
telemetry_mgr->CounterInstance("zeek", "msgthread_out_messages", {}, "Number of outbound messages sent", "", telemetry_mgr->CounterInstance("zeek", "msgthread_out_messages", {}, "Number of outbound messages sent", "");
[]() -> prometheus::ClientMetric {
auto* s = get_message_thread_stats();
prometheus::ClientMetric metric;
metric.gauge.value = static_cast<double>(s->sent_out_total);
return metric;
});
pending_messages_in_metric = pending_messages_in_metric =
telemetry_mgr->GaugeInstance("zeek", "msgthread_pending_in_messages", {}, "Pending number of inbound messages", telemetry_mgr->GaugeInstance("zeek", "msgthread_pending_in_messages", {}, "Pending number of inbound messages",
@ -127,15 +109,15 @@ void Manager::InitPostScript() {
}); });
pending_message_in_buckets_fam = pending_message_in_buckets_fam =
telemetry_mgr->GaugeFamily("zeek", "msgthread_pending_messages_in_buckets", {"lt"}, telemetry_mgr->GaugeFamily("zeek", "msgthread_pending_messages_in_buckets", {"le"},
"Number of threads with pending inbound messages split into buckets"); "Number of threads with pending inbound messages split into buckets");
pending_message_out_buckets_fam = pending_message_out_buckets_fam =
telemetry_mgr->GaugeFamily("zeek", "msgthread_pending_messages_out_buckets", {"lt"}, telemetry_mgr->GaugeFamily("zeek", "msgthread_pending_messages_out_buckets", {"le"},
"Number of threads with pending outbound messages split into buckets"); "Number of threads with pending outbound messages split into buckets");
for ( auto upper_limit : pending_bucket_brackets ) { for ( auto upper_limit : pending_bucket_brackets ) {
std::string upper_limit_str; std::string upper_limit_str;
if ( upper_limit == std::numeric_limits<uint64_t>::infinity() ) if ( upper_limit == std::numeric_limits<uint64_t>::max() )
upper_limit_str = "inf"; upper_limit_str = "inf";
else else
upper_limit_str = std::to_string(upper_limit); upper_limit_str = std::to_string(upper_limit);
@ -144,7 +126,7 @@ void Manager::InitPostScript() {
current_bucketed_messages.pending_out[upper_limit] = 0; current_bucketed_messages.pending_out[upper_limit] = 0;
pending_message_in_buckets[upper_limit] = pending_message_in_buckets[upper_limit] =
pending_message_in_buckets_fam->GetOrAdd({{"lt", upper_limit_str}}, pending_message_in_buckets_fam->GetOrAdd({{"le", upper_limit_str}},
[upper_limit]() -> prometheus::ClientMetric { [upper_limit]() -> prometheus::ClientMetric {
auto* s = get_message_thread_stats(); auto* s = get_message_thread_stats();
prometheus::ClientMetric metric; prometheus::ClientMetric metric;
@ -153,7 +135,7 @@ void Manager::InitPostScript() {
return metric; return metric;
}); });
pending_message_out_buckets[upper_limit] = pending_message_out_buckets[upper_limit] =
pending_message_out_buckets_fam->GetOrAdd({{"lt", upper_limit_str}}, pending_message_out_buckets_fam->GetOrAdd({{"le", upper_limit_str}},
[upper_limit]() -> prometheus::ClientMetric { [upper_limit]() -> prometheus::ClientMetric {
auto* s = get_message_thread_stats(); auto* s = get_message_thread_stats();
prometheus::ClientMetric metric; prometheus::ClientMetric metric;
@ -263,6 +245,10 @@ void Manager::StartHeartbeatTimer() {
new detail::HeartbeatTimer(run_state::network_time + BifConst::Threading::heartbeat_interval)); new detail::HeartbeatTimer(run_state::network_time + BifConst::Threading::heartbeat_interval));
} }
void Manager::MessageIn() { total_messages_in_metric->Inc(); }
void Manager::MessageOut() { total_messages_out_metric->Inc(); }
// Raise everything in here as warnings so it is passed to scriptland without // Raise everything in here as warnings so it is passed to scriptland without
// looking "fatal". In addition to these warnings, ReaderBackend will queue // looking "fatal". In addition to these warnings, ReaderBackend will queue
// one reporter message. // one reporter message.

View file

@ -153,6 +153,16 @@ protected:
*/ */
void StartHeartbeatTimer(); void StartHeartbeatTimer();
/**
* Called by MsgThread::SendIn() to update metrics.
*/
void MessageIn();
/**
* Called by MsgThread::SendOut() to update metrics.
*/
void MessageOut();
private: private:
using all_thread_list = std::list<BasicThread*>; using all_thread_list = std::list<BasicThread*>;
all_thread_list all_threads; all_thread_list all_threads;
@ -181,8 +191,6 @@ private:
std::map<uint64_t, telemetry::GaugePtr> pending_message_out_buckets; std::map<uint64_t, telemetry::GaugePtr> pending_message_out_buckets;
struct BucketedMessages { struct BucketedMessages {
uint64_t sent_in_total;
uint64_t sent_out_total;
uint64_t pending_in_total; uint64_t pending_in_total;
uint64_t pending_out_total; uint64_t pending_out_total;
std::map<uint64_t, uint64_t> pending_in; std::map<uint64_t, uint64_t> pending_in;

View file

@ -388,6 +388,8 @@ void MsgThread::SendIn(BasicInputMessage* msg, bool force) {
queue_in.Put(msg); queue_in.Put(msg);
++cnt_sent_in; ++cnt_sent_in;
zeek::thread_mgr->MessageIn();
} }
void MsgThread::SendOut(BasicOutputMessage* msg, bool force) { void MsgThread::SendOut(BasicOutputMessage* msg, bool force) {
@ -400,6 +402,8 @@ void MsgThread::SendOut(BasicOutputMessage* msg, bool force) {
++cnt_sent_out; ++cnt_sent_out;
zeek::thread_mgr->MessageOut();
if ( io_source ) if ( io_source )
io_source->Fire(); io_source->Fire();
} }