From 77c05357b5dc45381e90edb0b17a3acdab55bfb5 Mon Sep 17 00:00:00 2001 From: Tim Wojtulewicz Date: Thu, 30 May 2024 16:00:41 -0700 Subject: [PATCH 01/11] Move pulling of global state inside 'expensive' check for stats --- src/Stats.cc | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/src/Stats.cc b/src/Stats.cc index 7f49585121..fb7766eaac 100644 --- a/src/Stats.cc +++ b/src/Stats.cc @@ -213,14 +213,12 @@ void ProfileLogger::Log() { cs.num_events_outgoing, cs.num_logs_incoming, cs.num_logs_outgoing, cs.num_ids_incoming, cs.num_ids_outgoing)); - // Script-level state. - const auto& globals = global_scope()->Vars(); - if ( expensive ) { + // Script-level state. int total_table_entries = 0; int total_table_rentries = 0; - for ( const auto& global : globals ) { + for ( const auto& global : global_scope()->Vars() ) { auto& id = global.second; // We don't show/count internal globals as they are always From 8b4af064840f32e88f030d8fa9f4d707440792d3 Mon Sep 17 00:00:00 2001 From: Tim Wojtulewicz Date: Tue, 28 May 2024 16:53:10 -0700 Subject: [PATCH 02/11] Move trigger stats to telemetry instruments --- src/Trigger.cc | 19 ++++++++++++++++--- src/Trigger.h | 10 +++++++++- 2 files changed, 25 insertions(+), 4 deletions(-) diff --git a/src/Trigger.cc b/src/Trigger.cc index dae99bab37..3dfe76e2dc 100644 --- a/src/Trigger.cc +++ b/src/Trigger.cc @@ -13,6 +13,7 @@ #include "zeek/Traverse.h" #include "zeek/Val.h" #include "zeek/iosource/Manager.h" +#include "zeek/telemetry/Manager.h" using namespace zeek::detail; using namespace zeek::detail::trigger; @@ -437,7 +438,19 @@ Manager::Manager() : iosource::IOSource() { pending = new TriggerList(); } Manager::~Manager() { delete pending; } -void Manager::InitPostScript() { iosource_mgr->Register(this, true); } +void Manager::InitPostScript() { + trigger_count = telemetry_mgr->CounterInstance("zeek", "triggers", {}, "Total number of triggers scheduled"); + trigger_pending = + telemetry_mgr->GaugeInstance("zeek", "pending_triggers", {}, "Pending number of triggers", "", + []() -> prometheus::ClientMetric { + prometheus::ClientMetric metric; + metric.gauge.value = + trigger_mgr ? static_cast(trigger_mgr->pending->size()) : 0.0; + return metric; + }); + + iosource_mgr->Register(this, true); +} double Manager::GetNextTimeout() { return pending->empty() ? -1 : run_state::network_time + 0.100; } @@ -468,13 +481,13 @@ void Manager::Queue(Trigger* trigger) { if ( std::find(pending->begin(), pending->end(), trigger) == pending->end() ) { Ref(trigger); pending->push_back(trigger); - total_triggers++; + trigger_count->Inc(); iosource_mgr->Wakeup(Tag()); } } void Manager::GetStats(Stats* stats) { - stats->total = total_triggers; + stats->total = static_cast(trigger_count->Value()); stats->pending = pending->size(); } diff --git a/src/Trigger.h b/src/Trigger.h index 6c4fade3be..5955cc25b4 100644 --- a/src/Trigger.h +++ b/src/Trigger.h @@ -18,6 +18,13 @@ class Val; using ValPtr = IntrusivePtr; +namespace telemetry { +class Gauge; +class Counter; +using GaugePtr = std::shared_ptr; +using CounterPtr = std::shared_ptr; +} // namespace telemetry + namespace detail { class Frame; @@ -187,7 +194,8 @@ public: private: using TriggerList = std::list; TriggerList* pending; - unsigned long total_triggers = 0; + telemetry::CounterPtr trigger_count; + telemetry::GaugePtr trigger_pending; }; } // namespace trigger From d1f7999f61137bae6070c352e421969d97d1a26f Mon Sep 17 00:00:00 2001 From: Tim Wojtulewicz Date: Tue, 28 May 2024 19:52:49 -0700 Subject: [PATCH 03/11] Move dns_mgr stats to telemetry instruments --- src/DNS_Mgr.cc | 111 ++++++++++++++++++++++++++++++++++++++----------- src/DNS_Mgr.h | 38 +++++++++++++---- src/Stats.cc | 2 +- src/stats.bif | 8 ++-- 4 files changed, 122 insertions(+), 37 deletions(-) diff --git a/src/DNS_Mgr.cc b/src/DNS_Mgr.cc index a707d8ead0..33da7dd0ae 100644 --- a/src/DNS_Mgr.cc +++ b/src/DNS_Mgr.cc @@ -45,6 +45,7 @@ using ztd::out_ptr::out_ptr; #include "zeek/Val.h" #include "zeek/ZeekString.h" #include "zeek/iosource/Manager.h" +#include "zeek/telemetry/Manager.h" // Number of seconds we'll wait for a reply. constexpr int DNS_TIMEOUT = 5; @@ -545,6 +546,55 @@ void DNS_Mgr::InitSource() { } void DNS_Mgr::InitPostScript() { + num_requests_metric = + telemetry_mgr->CounterInstance("zeek", "dnsmgr_requests", {}, "Total number of requests through DNS_Mgr"); + successful_metric = telemetry_mgr->CounterInstance("zeek", "dnsmgr_successful_requests", {}, + "Total number of successful requests through DNS_Mgr"); + failed_metric = telemetry_mgr->CounterInstance("zeek", "dnsmgr_failed_requests", {}, + "Total number of failed requests through DNS_Mgr"); + asyncs_pending_metric = telemetry_mgr->GaugeInstance("zeek", "dnsmgr_pending_asyncs_requests", {}, + "Number of pending async requests through DNS_Mgr"); + + cached_hosts_metric = + telemetry_mgr->GaugeInstance("zeek", "dnsmgr_cache_entries", {{"type", "host"}}, + "Number of cached hosts in DNS_Mgr", "", []() -> prometheus::ClientMetric { + prometheus::ClientMetric metric; + metric.gauge.value = 0; + + if ( dns_mgr ) { + dns_mgr->UpdateCachedStats(false); + metric.gauge.value = static_cast(dns_mgr->last_cached_stats.hosts); + } + return metric; + }); + + cached_addresses_metric = + telemetry_mgr->GaugeInstance("zeek", "dnsmgr_cache_entries", {{"type", "address"}}, + "Number of cached addresses in DNS_Mgr", "", []() -> prometheus::ClientMetric { + prometheus::ClientMetric metric; + metric.gauge.value = 0; + + if ( dns_mgr ) { + dns_mgr->UpdateCachedStats(false); + metric.gauge.value = + static_cast(dns_mgr->last_cached_stats.addresses); + } + return metric; + }); + + cached_texts_metric = + telemetry_mgr->GaugeInstance("zeek", "dnsmgr_cache_entries", {{"type", "text"}}, + "Number of cached texts in DNS_Mgr", "", []() -> prometheus::ClientMetric { + prometheus::ClientMetric metric; + metric.gauge.value = 0; + + if ( dns_mgr ) { + dns_mgr->UpdateCachedStats(false); + metric.gauge.value = static_cast(dns_mgr->last_cached_stats.texts); + } + return metric; + }); + if ( ! doctest::is_running_in_test ) { dm_rec = id::find_type("dns_mapping"); @@ -1158,7 +1208,7 @@ void DNS_Mgr::IssueAsyncRequests() { AsyncRequest* req = asyncs_queued.front(); asyncs_queued.pop_front(); - ++num_requests; + num_requests_metric->Inc(); req->time = util::current_time(); if ( req->type == T_PTR ) @@ -1173,6 +1223,7 @@ void DNS_Mgr::IssueAsyncRequests() { dns_req->MakeRequest(channel, this); ++asyncs_pending; + asyncs_pending_metric->Inc(); } } @@ -1182,11 +1233,11 @@ void DNS_Mgr::CheckAsyncHostRequest(const std::string& host, bool timeout) { if ( i != asyncs.end() ) { if ( timeout ) { - ++failed; + failed_metric->Inc(); i->second->Timeout(); } else if ( auto addrs = LookupNameInCache(host, true, false) ) { - ++successful; + successful_metric->Inc(); i->second->Resolved(addrs); } else @@ -1195,6 +1246,7 @@ void DNS_Mgr::CheckAsyncHostRequest(const std::string& host, bool timeout) { delete i->second; asyncs.erase(i); --asyncs_pending; + asyncs_pending_metric->Dec(); } } @@ -1207,11 +1259,11 @@ void DNS_Mgr::CheckAsyncAddrRequest(const IPAddr& addr, bool timeout) { if ( i != asyncs.end() ) { if ( timeout ) { - ++failed; + failed_metric->Inc(); i->second->Timeout(); } else if ( auto name = LookupAddrInCache(addr, true, false) ) { - ++successful; + successful_metric->Inc(); i->second->Resolved(name->CheckString()); } else @@ -1220,6 +1272,7 @@ void DNS_Mgr::CheckAsyncAddrRequest(const IPAddr& addr, bool timeout) { delete i->second; asyncs.erase(i); --asyncs_pending; + asyncs_pending_metric->Dec(); } } @@ -1229,11 +1282,11 @@ void DNS_Mgr::CheckAsyncOtherRequest(const std::string& host, bool timeout, int auto i = asyncs.find(std::make_pair(request_type, host)); if ( i != asyncs.end() ) { if ( timeout ) { - ++failed; + failed_metric->Inc(); i->second->Timeout(); } else if ( auto name = LookupOtherInCache(host, request_type, true) ) { - ++successful; + successful_metric->Inc(); i->second->Resolved(name->CheckString()); } else @@ -1242,6 +1295,7 @@ void DNS_Mgr::CheckAsyncOtherRequest(const std::string& host, bool timeout, int delete i->second; asyncs.erase(i); --asyncs_pending; + asyncs_pending_metric->Dec(); } } @@ -1293,26 +1347,35 @@ void DNS_Mgr::Process() { ares_process_fd(channel, ARES_SOCKET_BAD, ARES_SOCKET_BAD); } +void DNS_Mgr::UpdateCachedStats(bool force) { + double now = util::current_time(); + if ( force || last_cached_stats_update < now - 0.01 ) { + last_cached_stats.hosts = 0; + last_cached_stats.addresses = 0; + last_cached_stats.texts = 0; + last_cached_stats.total = all_mappings.size(); + + for ( const auto& [key, mapping] : all_mappings ) { + if ( mapping->ReqType() == T_PTR ) + last_cached_stats.addresses++; + else if ( mapping->ReqType() == T_A ) + last_cached_stats.hosts++; + else + last_cached_stats.texts++; + } + + last_cached_stats_update = now; + } +} + void DNS_Mgr::GetStats(Stats* stats) { - // TODO: can this use the telemetry framework? - stats->requests = num_requests; - stats->successful = successful; - stats->failed = failed; + stats->requests = static_cast(num_requests_metric->Value()); + stats->successful = static_cast(successful_metric->Value()); + stats->failed = static_cast(failed_metric->Value()); stats->pending = asyncs_pending; - stats->cached_hosts = 0; - stats->cached_addresses = 0; - stats->cached_texts = 0; - stats->cached_total = all_mappings.size(); - - for ( const auto& [key, mapping] : all_mappings ) { - if ( mapping->ReqType() == T_PTR ) - stats->cached_addresses++; - else if ( mapping->ReqType() == T_A ) - stats->cached_hosts++; - else - stats->cached_texts++; - } + UpdateCachedStats(true); + stats->cached = last_cached_stats; } void DNS_Mgr::AsyncRequest::Resolved(const std::string& name) { diff --git a/src/DNS_Mgr.h b/src/DNS_Mgr.h index 5d0f9a84b7..7e063b28a3 100644 --- a/src/DNS_Mgr.h +++ b/src/DNS_Mgr.h @@ -42,6 +42,13 @@ using TableValPtr = IntrusivePtr; using StringValPtr = IntrusivePtr; using RecordValPtr = IntrusivePtr; +namespace telemetry { +class Gauge; +class Counter; +using GaugePtr = std::shared_ptr; +using CounterPtr = std::shared_ptr; +} // namespace telemetry + } // namespace zeek namespace zeek::detail { @@ -198,15 +205,19 @@ public: */ bool Save(); + struct CachedStats { + unsigned long hosts; + unsigned long addresses; + unsigned long texts; + unsigned long total; + }; + struct Stats { unsigned long requests; // These count only async requests. unsigned long successful; unsigned long failed; unsigned long pending; - unsigned long cached_hosts; - unsigned long cached_addresses; - unsigned long cached_texts; - unsigned long cached_total; + CachedStats cached; }; /** @@ -285,6 +296,8 @@ protected: const char* Tag() override { return "DNS_Mgr"; } double GetNextTimeout() override; + void UpdateCachedStats(bool force); + DNS_MgrMode mode; MappingMap all_mappings; @@ -293,7 +306,6 @@ protected: std::string dir; // directory in which cache_name resides bool did_init = false; - int asyncs_pending = 0; RecordTypePtr dm_rec; @@ -327,9 +339,19 @@ protected: using QueuedList = std::list; QueuedList asyncs_queued; - unsigned long num_requests = 0; - unsigned long successful = 0; - unsigned long failed = 0; + telemetry::CounterPtr num_requests_metric; + telemetry::CounterPtr successful_metric; + telemetry::CounterPtr failed_metric; + telemetry::GaugePtr asyncs_pending_metric; + + telemetry::GaugePtr cached_hosts_metric; + telemetry::GaugePtr cached_addresses_metric; + telemetry::GaugePtr cached_texts_metric; + + double last_cached_stats_update = 0; + CachedStats last_cached_stats; + + int asyncs_pending = 0; std::set socket_fds; std::set write_socket_fds; diff --git a/src/Stats.cc b/src/Stats.cc index fb7766eaac..3742c42528 100644 --- a/src/Stats.cc +++ b/src/Stats.cc @@ -173,7 +173,7 @@ void ProfileLogger::Log() { util::fmt("%.06f DNS_Mgr: requests=%lu successful=%lu failed=%lu pending=%lu " "cached_hosts=%lu cached_addrs=%lu\n", run_state::network_time, dstats.requests, dstats.successful, dstats.failed, dstats.pending, - dstats.cached_hosts, dstats.cached_addresses)); + dstats.cached.hosts, dstats.cached.addresses)); trigger::Manager::Stats tstats; trigger_mgr->GetStats(&tstats); diff --git a/src/stats.bif b/src/stats.bif index 50dcc5685f..4763eedea2 100644 --- a/src/stats.bif +++ b/src/stats.bif @@ -252,10 +252,10 @@ function get_dns_stats%(%): DNSStats r->Assign(n++, static_cast(dstats.successful)); r->Assign(n++, static_cast(dstats.failed)); r->Assign(n++, static_cast(dstats.pending)); - r->Assign(n++, static_cast(dstats.cached_hosts)); - r->Assign(n++, static_cast(dstats.cached_addresses)); - r->Assign(n++, static_cast(dstats.cached_texts)); - r->Assign(n++, static_cast(dstats.cached_total)); + r->Assign(n++, static_cast(dstats.cached.hosts)); + r->Assign(n++, static_cast(dstats.cached.addresses)); + r->Assign(n++, static_cast(dstats.cached.texts)); + r->Assign(n++, static_cast(dstats.cached.total)); return std::move(r); %} From a81f6ab9a6a0b4f176080198d1c49ff44c70d2ed Mon Sep 17 00:00:00 2001 From: Tim Wojtulewicz Date: Wed, 29 May 2024 17:28:11 -0700 Subject: [PATCH 04/11] Add extra metrics to session_mgr - Sessions killed by activity - Current number of sessions across all types --- src/Stats.cc | 2 +- src/session/Manager.cc | 16 +++++++++++++--- src/session/Manager.h | 11 ++++++++++- src/stats.bif | 2 +- .../scripts.base.frameworks.telemetry.basic/out | 3 ++- .../telemetry.log.filtered | 2 ++ 6 files changed, 29 insertions(+), 7 deletions(-) diff --git a/src/Stats.cc b/src/Stats.cc index 3742c42528..d223002294 100644 --- a/src/Stats.cc +++ b/src/Stats.cc @@ -119,7 +119,7 @@ void ProfileLogger::Log() { // TODO: This previously output the number of connections, but now that we're storing // sessions as well as connections, this might need to be renamed. - file->Write(util::fmt("%.06f Conns: total=%" PRIu64 " current=%" PRIu64 "/%u\n", run_state::network_time, + file->Write(util::fmt("%.06f Conns: total=%" PRIu64 " current=%" PRIu64 "/%zu\n", run_state::network_time, Connection::TotalConnections(), Connection::CurrentConnections(), session_mgr->CurrentSessions())); diff --git a/src/session/Manager.cc b/src/session/Manager.cc index 8f2fbc6881..d33182a705 100644 --- a/src/session/Manager.cc +++ b/src/session/Manager.cc @@ -46,9 +46,9 @@ public: ProtocolMap::iterator InitCounters(const std::string& protocol) { auto active_family = - telemetry_mgr->GaugeFamily("zeek", "active-sessions", {"protocol"}, "Active Zeek Sessions"); + telemetry_mgr->GaugeFamily("zeek", "active_sessions", {"protocol"}, "Active Zeek Sessions"); auto total_family = - telemetry_mgr->CounterFamily("zeek", "total-sessions", {"protocol"}, "Total number of sessions"); + telemetry_mgr->CounterFamily("zeek", "total_sessions", {"protocol"}, "Total number of sessions"); auto [it, inserted] = entries.insert({protocol, Protocol{active_family, total_family, protocol}}); @@ -75,7 +75,17 @@ private: } // namespace detail -Manager::Manager() { stats = new detail::ProtocolStats(); } +Manager::Manager() { + stats = new detail::ProtocolStats(); + ended_sessions_metric_family = telemetry_mgr->CounterFamily("zeek", "ended_sessions", {"reason"}, + "Number of sessions ended for specific reasons"); + ended_by_inactivity_metric = + ended_sessions_metric_family->GetOrAdd({{"reason", "inactivity"}}, []() -> prometheus::ClientMetric { + prometheus::ClientMetric metric; + metric.counter.value = static_cast(zeek::detail::killed_by_inactivity); + return metric; + }); +} Manager::~Manager() { Clear(); diff --git a/src/session/Manager.h b/src/session/Manager.h index 6bbb128d95..eb02e87498 100644 --- a/src/session/Manager.h +++ b/src/session/Manager.h @@ -13,6 +13,13 @@ namespace zeek { +namespace telemetry { +class CounterFamily; +using CounterFamilyPtr = std::shared_ptr; +class Counter; +using CounterPtr = std::shared_ptr; +} // namespace telemetry + namespace detail { class PacketFilter; } @@ -82,7 +89,7 @@ public: void Weird(const char* name, const Packet* pkt, const char* addl = "", const char* source = ""); void Weird(const char* name, const IP_Hdr* ip, const char* addl = ""); - unsigned int CurrentSessions() { return session_map.size(); } + size_t CurrentSessions() { return session_map.size(); } private: using SessionMap = std::unordered_map; @@ -96,6 +103,8 @@ private: SessionMap session_map; detail::ProtocolStats* stats; + telemetry::CounterFamilyPtr ended_sessions_metric_family; + telemetry::CounterPtr ended_by_inactivity_metric; }; } // namespace session diff --git a/src/stats.bif b/src/stats.bif index 4763eedea2..295cb7e9d1 100644 --- a/src/stats.bif +++ b/src/stats.bif @@ -83,7 +83,7 @@ function get_conn_stats%(%): ConnStats r->Assign(n++, Connection::TotalConnections()); r->Assign(n++, Connection::CurrentConnections()); - r->Assign(n++, session_mgr->CurrentSessions()); + r->Assign(n++, static_cast(session_mgr->CurrentSessions())); session::Stats s; if ( session_mgr ) diff --git a/testing/btest/Baseline/scripts.base.frameworks.telemetry.basic/out b/testing/btest/Baseline/scripts.base.frameworks.telemetry.basic/out index d35b64e3d4..fdde7d52ff 100644 --- a/testing/btest/Baseline/scripts.base.frameworks.telemetry.basic/out +++ b/testing/btest/Baseline/scripts.base.frameworks.telemetry.basic/out @@ -1,5 +1,6 @@ ### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. -### zeek_session_metrics |2| +### zeek_session_metrics |3| +Telemetry::COUNTER, zeek, zeek_ended_sessions_total, [reason], [inactivity], 0.0 Telemetry::COUNTER, zeek, zeek_total_sessions_total, [protocol], [tcp], 500.0 Telemetry::GAUGE, zeek, zeek_active_sessions, [protocol], [tcp], 500.0 ### bt* metrics |5| diff --git a/testing/btest/Baseline/scripts.policy.frameworks.telemetry.log/telemetry.log.filtered b/testing/btest/Baseline/scripts.policy.frameworks.telemetry.log/telemetry.log.filtered index c7b26a1f28..b5c04c3f44 100644 --- a/testing/btest/Baseline/scripts.policy.frameworks.telemetry.log/telemetry.log.filtered +++ b/testing/btest/Baseline/scripts.policy.frameworks.telemetry.log/telemetry.log.filtered @@ -1,5 +1,7 @@ ### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +XXXXXXXXXX.XXXXXX zeek counter zeek_ended_sessions_total reason inactivity 0.0 XXXXXXXXXX.XXXXXX zeek counter zeek_total_sessions_total protocol tcp 1.0 XXXXXXXXXX.XXXXXX zeek gauge zeek_active_sessions protocol tcp 1.0 +XXXXXXXXXX.XXXXXX zeek counter zeek_ended_sessions_total reason inactivity 0.0 XXXXXXXXXX.XXXXXX zeek counter zeek_total_sessions_total protocol tcp 500.0 XXXXXXXXXX.XXXXXX zeek gauge zeek_active_sessions protocol tcp 500.0 From 4face43462704e43b1121eb43106701b988233fc Mon Sep 17 00:00:00 2001 From: Tim Wojtulewicz Date: Thu, 30 May 2024 11:26:59 -0700 Subject: [PATCH 05/11] Move thread manager stats to telemetry metric --- src/Stats.cc | 2 +- src/stats.bif | 2 +- src/threading/Manager.cc | 12 ++++++ src/threading/Manager.h | 14 ++++++- src/threading/MsgThread.cc | 77 ++++++++++++++++++++++++++++++++++++++ src/zeek-setup.cc | 1 + 6 files changed, 105 insertions(+), 3 deletions(-) diff --git a/src/Stats.cc b/src/Stats.cc index d223002294..7b776b0138 100644 --- a/src/Stats.cc +++ b/src/Stats.cc @@ -188,7 +188,7 @@ void ProfileLogger::Log() { timer_type_to_string(static_cast(i)), current_timers[i])); } - file->Write(util::fmt("%0.6f Threads: current=%d\n", run_state::network_time, thread_mgr->NumThreads())); + file->Write(util::fmt("%0.6f Threads: current=%zu\n", run_state::network_time, thread_mgr->NumThreads())); const threading::Manager::msg_stats_list& thread_stats = thread_mgr->GetMsgThreadStats(); for ( threading::Manager::msg_stats_list::const_iterator i = thread_stats.begin(); i != thread_stats.end(); ++i ) { diff --git a/src/stats.bif b/src/stats.bif index 295cb7e9d1..5d410bc9bf 100644 --- a/src/stats.bif +++ b/src/stats.bif @@ -337,7 +337,7 @@ function get_thread_stats%(%): ThreadStats auto r = zeek::make_intrusive(ThreadStats); int n = 0; - r->Assign(n++, zeek::thread_mgr->NumThreads()); + r->Assign(n++, static_cast(zeek::thread_mgr->NumThreads())); return std::move(r); %} diff --git a/src/threading/Manager.cc b/src/threading/Manager.cc index 5620a0bf80..36a4ced939 100644 --- a/src/threading/Manager.cc +++ b/src/threading/Manager.cc @@ -8,6 +8,7 @@ #include "zeek/NetVar.h" #include "zeek/RunState.h" #include "zeek/iosource/Manager.h" +#include "zeek/telemetry/Manager.h" namespace zeek::threading { namespace detail { @@ -36,6 +37,17 @@ Manager::~Manager() { Terminate(); } +void Manager::InitPostScript() { + num_threads_metric = + telemetry_mgr->GaugeInstance("zeek", "active_threads", {}, "Number of active threads", "", + []() -> prometheus::ClientMetric { + prometheus::ClientMetric metric; + metric.gauge.value = + thread_mgr ? static_cast(thread_mgr->all_threads.size()) : 0.0; + return metric; + }); +} + void Manager::Terminate() { DBG_LOG(DBG_THREADING, "Terminating thread manager ..."); terminating = true; diff --git a/src/threading/Manager.h b/src/threading/Manager.h index b075e6a70d..306063081f 100644 --- a/src/threading/Manager.h +++ b/src/threading/Manager.h @@ -7,6 +7,11 @@ #include "zeek/threading/MsgThread.h" namespace zeek { + +namespace telemetry { +class Gauge; +} + namespace threading { namespace detail { @@ -46,6 +51,12 @@ public: */ ~Manager(); + /** + * Performs initialization that can only happen after script parsing has + * completed. + */ + void InitPostScript(); + /** * Terminates the manager's processor. The method signals all threads * to terminates and wait for them to do so. It then joins them and @@ -77,7 +88,7 @@ public: * threads that are not yet joined, including any potentially in * Terminating() state. */ - int NumThreads() const { return all_threads.size(); } + size_t NumThreads() const { return all_threads.size(); } /** * Signals a specific threads to terminate immediately. @@ -151,6 +162,7 @@ private: msg_stats_list stats; bool heartbeat_timer_running = false; + std::shared_ptr num_threads_metric; }; } // namespace threading diff --git a/src/threading/MsgThread.cc b/src/threading/MsgThread.cc index 022a8ce2b4..c33c9d5bd7 100644 --- a/src/threading/MsgThread.cc +++ b/src/threading/MsgThread.cc @@ -229,6 +229,83 @@ MsgThread::MsgThread() : BasicThread(), queue_in(this, nullptr), queue_out(nullp // Register IOSource as non-counting lifetime managed IO source. iosource_mgr->Register(io_source, true); + + cnt_sent_in_metric = telemetry_mgr->CounterInstance("zeek", "msg_thread_msgs_sent_in", {{"thread_name", Name()}}, + "Number of messages sent into thread"); + cnt_sent_out_metric = telemetry_mgr->CounterInstance("zeek", "msg_thread_msgs_sent_out", {{"thread_name", Name()}}, + "Number of messages sent from thread"); + pending_in_metric = telemetry_mgr->GaugeInstance("zeek", "msg_thread_msgs_pending_in", {{"thread_name", Name()}}, + "Number of pending messages sent into thread", "", + [this]() -> prometheus::ClientMetric { + prometheus::ClientMetric metric; + metric.gauge.value = static_cast(queue_in.Size()); + return metric; + }); + pending_out_metric = telemetry_mgr->GaugeInstance("zeek", "msg_thread_msgs_pending_in", {{"thread_name", Name()}}, + "Number of pending messages sent from thread", "", + [this]() -> prometheus::ClientMetric { + prometheus::ClientMetric metric; + metric.gauge.value = static_cast(queue_out.Size()); + return metric; + }); + + static auto get_queue_in_stats = [this]() -> const Queue::Stats { + double now = util::current_time(); + if ( this->queue_in_stats_last_updated < now - 0.01 ) { + queue_in.GetStats(&queue_in_last_stats); + this->queue_in_stats_last_updated = now; + } + + return queue_in_last_stats; + }; + + queue_in_num_reads_metric = + telemetry_mgr->CounterInstance("zeek", "msg_thread_queue_in_reads", {{"thread_name", Name()}}, + "Number of reads from msg thread input queue", "", + []() -> prometheus::ClientMetric { + prometheus::ClientMetric metric; + auto stats = get_queue_in_stats(); + metric.gauge.value = static_cast(stats.num_reads); + return metric; + }); + queue_in_num_writes_metric = + telemetry_mgr->CounterInstance("zeek", "msg_thread_queue_in_writes", {{"thread_name", Name()}}, + "Number of writes from msg thread input queue", "", + []() -> prometheus::ClientMetric { + prometheus::ClientMetric metric; + auto stats = get_queue_in_stats(); + metric.gauge.value = static_cast(stats.num_writes); + return metric; + }); + + static auto get_queue_out_stats = [this]() -> const Queue::Stats { + double now = util::current_time(); + if ( this->queue_out_stats_last_updated < now - 0.01 ) { + queue_out.GetStats(&queue_out_last_stats); + this->queue_out_stats_last_updated = now; + } + + return queue_out_last_stats; + }; + + queue_out_num_reads_metric = + telemetry_mgr->CounterInstance("zeek", "msg_thread_queue_out_reads", {{"thread_name", Name()}}, + "Number of reads from msg thread input queue", "", + []() -> prometheus::ClientMetric { + prometheus::ClientMetric metric; + auto stats = get_queue_out_stats(); + metric.gauge.value = static_cast(stats.num_reads); + return metric; + }); + queue_out_num_writes_metric = + telemetry_mgr->CounterInstance("zeek", "msg_thread_queue_out_writes", {{"thread_name", Name()}}, + "Number of writes from msg thread input queue", "", + []() -> prometheus::ClientMetric { + prometheus::ClientMetric metric; + auto stats = get_queue_out_stats(); + metric.gauge.value = static_cast(stats.num_writes); + return metric; + }); } MsgThread::~MsgThread() { diff --git a/src/zeek-setup.cc b/src/zeek-setup.cc index 3f5549b78a..c97458984c 100644 --- a/src/zeek-setup.cc +++ b/src/zeek-setup.cc @@ -802,6 +802,7 @@ SetupResult setup(int argc, char** argv, Options* zopts) { RecordType::InitPostScript(); telemetry_mgr->InitPostScript(); + thread_mgr->InitPostScript(); iosource_mgr->InitPostScript(); log_mgr->InitPostScript(); plugin_mgr->InitPostScript(); From 44860676a281815c44c0b7423634f0dc2aa44778 Mon Sep 17 00:00:00 2001 From: Tim Wojtulewicz Date: Thu, 30 May 2024 15:22:35 -0700 Subject: [PATCH 06/11] Add timer counts as telemetry metrics --- src/Timer.cc | 29 +++++++++++++++++++++++++++++ src/Timer.h | 17 +++++++++++++---- 2 files changed, 42 insertions(+), 4 deletions(-) diff --git a/src/Timer.cc b/src/Timer.cc index ef2763071a..191d9c9e6e 100644 --- a/src/Timer.cc +++ b/src/Timer.cc @@ -10,6 +10,7 @@ #include "zeek/broker/Manager.h" #include "zeek/iosource/Manager.h" #include "zeek/iosource/PktSrc.h" +#include "zeek/telemetry/Manager.h" #include "zeek/util.h" namespace zeek::detail { @@ -97,6 +98,34 @@ void TimerMgr::InitPostScript() { iosource_mgr->Register(this, true); dispatch_all_expired = zeek::detail::max_timer_expires == 0; + + cumulative_num_metric = telemetry_mgr->CounterInstance("zeek", "timers", {}, "Cumulative number of timers", "", + []() -> prometheus::ClientMetric { + prometheus::ClientMetric metric; + metric.counter.value = + static_cast(timer_mgr->CumulativeNum()); + return metric; + }); + + lag_time_metric = telemetry_mgr->GaugeInstance("zeek", "timers_lag_time", {}, + "Lag between current network time and last expired timer", "seconds", + []() -> prometheus::ClientMetric { + prometheus::ClientMetric metric; + metric.gauge.value = + run_state::network_time - timer_mgr->last_timestamp; + return metric; + }); + + std::shared_ptr family = + telemetry_mgr->GaugeFamily("zeek", "timers_pending", {"type"}, "Number of timers for a certain type"); + for ( int i = 0; i < NUM_TIMER_TYPES; i++ ) { + current_timer_metrics[i] = family->GetOrAdd({{"type", timer_type_to_string(static_cast(i))}}, + [i]() -> prometheus::ClientMetric { + prometheus::ClientMetric metric; + metric.gauge.value = TimerMgr::CurrentTimers()[i]; + return metric; + }); + } } void TimerMgr::Add(Timer* timer) { diff --git a/src/Timer.h b/src/Timer.h index ff535f64e1..45ba50314e 100644 --- a/src/Timer.h +++ b/src/Timer.h @@ -10,7 +10,14 @@ namespace zeek { class ODesc; -} + +namespace telemetry { +class Gauge; +class Counter; +using GaugePtr = std::shared_ptr; +using CounterPtr = std::shared_ptr; +} // namespace telemetry +} // namespace zeek namespace zeek::detail { @@ -153,10 +160,12 @@ private: // for the max_timer_expires=0 case. bool dispatch_all_expired = false; - size_t peak_size = 0; - size_t cumulative_num = 0; - static unsigned int current_timers[NUM_TIMER_TYPES]; + + telemetry::CounterPtr cumulative_num_metric; + telemetry::GaugePtr lag_time_metric; + telemetry::GaugePtr current_timer_metrics[NUM_TIMER_TYPES]; + std::unique_ptr q; }; From 206f5cd52282a0f049b00bd78618998946da42b4 Mon Sep 17 00:00:00 2001 From: Tim Wojtulewicz Date: Thu, 6 Jun 2024 14:51:09 -0700 Subject: [PATCH 07/11] Move broker statistics to be telemetry metrics --- src/broker/Manager.cc | 63 ++++++++++++++++++++++++++++++++++++------- src/broker/Manager.h | 17 ++++++++++-- 2 files changed, 69 insertions(+), 11 deletions(-) diff --git a/src/broker/Manager.cc b/src/broker/Manager.cc index 015f53d10e..dec03afb8e 100644 --- a/src/broker/Manager.cc +++ b/src/broker/Manager.cc @@ -340,6 +340,43 @@ void Manager::InitPostScript() { bstate->subscriber.add_topic(broker::topic::store_events(), true); InitializeBrokerStoreForwarding(); + + num_peers_metric = + telemetry_mgr->GaugeInstance("zeek", "broker_peers", {}, "Current number of peers connected via broker", "", + []() -> prometheus::ClientMetric { + prometheus::ClientMetric metric; + metric.gauge.value = static_cast(broker_mgr->peer_count); + return metric; + }); + + num_stores_metric = + telemetry_mgr->GaugeInstance("zeek", "broker_stores", {}, "Current number of stores connected via broker", "", + []() -> prometheus::ClientMetric { + prometheus::ClientMetric metric; + metric.gauge.value = static_cast(broker_mgr->data_stores.size()); + return metric; + }); + + num_pending_queries_metric = + telemetry_mgr->GaugeInstance("zeek", "broker_pending_queries", {}, "Current number of pending broker queries", + "", []() -> prometheus::ClientMetric { + prometheus::ClientMetric metric; + metric.gauge.value = static_cast(broker_mgr->pending_queries.size()); + return metric; + }); + + num_events_incoming_metric = telemetry_mgr->CounterInstance("zeek", "broker_incoming_events", {}, + "Total number of incoming events via broker"); + num_events_outgoing_metric = telemetry_mgr->CounterInstance("zeek", "broker_outgoing_events", {}, + "Total number of outgoing events via broker"); + num_logs_incoming_metric = + telemetry_mgr->CounterInstance("zeek", "broker_incoming_logs", {}, "Total number of incoming logs via broker"); + num_logs_outgoing_metric = + telemetry_mgr->CounterInstance("zeek", "broker_outgoing_logs", {}, "Total number of outgoing logs via broker"); + num_ids_incoming_metric = + telemetry_mgr->CounterInstance("zeek", "broker_incoming_ids", {}, "Total number of incoming ids via broker"); + num_ids_outgoing_metric = + telemetry_mgr->CounterInstance("zeek", "broker_outgoing_ids", {}, "Total number of outgoing ids via broker"); } void Manager::InitializeBrokerStoreForwarding() { @@ -528,7 +565,7 @@ bool Manager::PublishEvent(string topic, std::string name, broker::vector args, DBG_LOG(DBG_BROKER, "Publishing event: %s", RenderEvent(topic, name, args).c_str()); broker::zeek::Event ev(std::move(name), std::move(args), broker::to_timestamp(ts)); bstate->endpoint.publish(std::move(topic), ev.move_data()); - ++statistics.num_events_outgoing; + num_events_outgoing_metric->Inc(); return true; } @@ -588,7 +625,7 @@ bool Manager::PublishIdentifier(std::string topic, std::string id) { broker::zeek::IdentifierUpdate msg(std::move(id), std::move(data.value_)); DBG_LOG(DBG_BROKER, "Publishing id-update: %s", RenderMessage(topic, msg.as_data()).c_str()); bstate->endpoint.publish(std::move(topic), msg.move_data()); - ++statistics.num_ids_outgoing; + num_ids_outgoing_metric->Inc(); return true; } @@ -715,8 +752,10 @@ bool Manager::PublishLogWrite(EnumVal* stream, EnumVal* writer, string path, int ++lb.message_count; lb.msgs[topic].add(std::move(msg)); - if ( lb.message_count >= log_batch_size ) - statistics.num_logs_outgoing += lb.Flush(bstate->endpoint, log_batch_size); + if ( lb.message_count >= log_batch_size ) { + auto outgoing_logs = static_cast(lb.Flush(bstate->endpoint, log_batch_size)); + num_logs_outgoing_metric->Inc(outgoing_logs); + } return true; } @@ -746,7 +785,8 @@ size_t Manager::FlushLogBuffers() { for ( auto& lb : log_buffers ) rval += lb.Flush(bstate->endpoint, log_batch_size); - statistics.num_logs_outgoing += rval; + num_logs_outgoing_metric->Inc(rval); + return rval; } @@ -1141,7 +1181,7 @@ void Manager::ProcessMessage(std::string_view topic, broker::zeek::Event& ev) { ts = run_state::network_time; DBG_LOG(DBG_BROKER, "Process event: %s (%.6f) %s", c_str_safe(name).c_str(), ts, RenderMessage(args).c_str()); - ++statistics.num_events_incoming; + num_events_incoming_metric->Inc(); auto handler = event_registry->Lookup(name); if ( ! handler ) @@ -1286,7 +1326,7 @@ bool Manager::ProcessMessage(std::string_view, broker::zeek::LogWrite& lw) { return false; } - ++statistics.num_logs_incoming; + num_logs_incoming_metric->Inc(); auto&& stream_id_name = lw.stream_id().name; // Get stream ID. @@ -1352,7 +1392,7 @@ bool Manager::ProcessMessage(std::string_view, broker::zeek::IdentifierUpdate& i return false; } - ++statistics.num_ids_incoming; + num_ids_incoming_metric->Inc(); auto id_name = c_str_safe(iu.id_name()); auto id_value = convert_if_broker_variant_or_move(iu.id_value()); const auto& id = zeek::detail::global_scope()->Find(id_name); @@ -1706,7 +1746,12 @@ const Stats& Manager::GetStatistics() { statistics.num_stores = data_stores.size(); statistics.num_pending_queries = pending_queries.size(); - // The other attributes are set as activity happens. + statistics.num_events_incoming = static_cast(num_events_incoming_metric->Value()); + statistics.num_events_outgoing = static_cast(num_events_outgoing_metric->Value()); + statistics.num_logs_incoming = static_cast(num_logs_incoming_metric->Value()); + statistics.num_logs_outgoing = static_cast(num_logs_outgoing_metric->Value()); + statistics.num_ids_incoming = static_cast(num_ids_incoming_metric->Value()); + statistics.num_ids_outgoing = static_cast(num_ids_outgoing_metric->Value()); return statistics; } diff --git a/src/broker/Manager.h b/src/broker/Manager.h index d373d8883e..43bd00fbc9 100644 --- a/src/broker/Manager.h +++ b/src/broker/Manager.h @@ -27,8 +27,11 @@ using VectorTypePtr = IntrusivePtr; using TableValPtr = IntrusivePtr; namespace telemetry { -class Manager; -} +class Gauge; +class Counter; +using GaugePtr = std::shared_ptr; +using CounterPtr = std::shared_ptr; +} // namespace telemetry namespace detail { class Frame; @@ -451,6 +454,16 @@ private: std::string zeek_table_db_directory; static int script_scope; + + telemetry::GaugePtr num_peers_metric; + telemetry::GaugePtr num_stores_metric; + telemetry::GaugePtr num_pending_queries_metric; + telemetry::CounterPtr num_events_incoming_metric; + telemetry::CounterPtr num_events_outgoing_metric; + telemetry::CounterPtr num_logs_incoming_metric; + telemetry::CounterPtr num_logs_outgoing_metric; + telemetry::CounterPtr num_ids_incoming_metric; + telemetry::CounterPtr num_ids_outgoing_metric; }; } // namespace Broker From a6843067e9157ad0a7ca6dd1abb6d969cc770e31 Mon Sep 17 00:00:00 2001 From: Tim Wojtulewicz Date: Mon, 5 Aug 2024 13:06:49 -0700 Subject: [PATCH 08/11] Split cpu time metric into user/system components like prof.log The total can be calculated from the two parts via Prometheus/Grafana if desired, so it's more informative to pass them as separate parts. --- src/telemetry/Manager.cc | 34 +++++++++++++++------------------- src/telemetry/Manager.h | 3 ++- src/telemetry/ProcessStats.cc | 16 ++++++++++------ src/telemetry/ProcessStats.h | 3 ++- 4 files changed, 29 insertions(+), 27 deletions(-) diff --git a/src/telemetry/Manager.cc b/src/telemetry/Manager.cc index 6ae4e06ec6..6bfcdc71a9 100644 --- a/src/telemetry/Manager.cc +++ b/src/telemetry/Manager.cc @@ -137,13 +137,21 @@ void Manager::InitPostScript() { return metric; }); - cpu_gauge = GaugeInstance("process", "cpu", {}, "Total user and system CPU time spent", "seconds", - []() -> prometheus::ClientMetric { - auto* s = get_stats(); - prometheus::ClientMetric metric; - metric.gauge.value = s->cpu; - return metric; - }); + cpu_user_counter = CounterInstance("process", "cpu_user", {}, "Total user CPU time spent", "seconds", + []() -> prometheus::ClientMetric { + auto* s = get_stats(); + prometheus::ClientMetric metric; + metric.gauge.value = s->cpu_user; + return metric; + }); + + cpu_system_counter = CounterInstance("process", "cpu_system", {}, "Total system CPU time spent", "seconds", + []() -> prometheus::ClientMetric { + auto* s = get_stats(); + prometheus::ClientMetric metric; + metric.gauge.value = s->cpu_system; + return metric; + }); fds_gauge = GaugeInstance("process", "open_fds", {}, "Number of open file descriptors", "", []() -> prometheus::ClientMetric { @@ -623,18 +631,6 @@ void Manager::WaitForPrometheusCallbacks() { using namespace std::literals; using namespace zeek::telemetry; -namespace { - -template -auto toVector(zeek::Span xs) { - std::vector> result; - for ( auto&& x : xs ) - result.emplace_back(x); - return result; -} - -} // namespace - SCENARIO("telemetry managers provide access to counter families") { GIVEN("a telemetry manager") { Manager mgr; diff --git a/src/telemetry/Manager.h b/src/telemetry/Manager.h index d967fe43c0..26647b7cf7 100644 --- a/src/telemetry/Manager.h +++ b/src/telemetry/Manager.h @@ -263,7 +263,8 @@ private: GaugePtr rss_gauge; GaugePtr vms_gauge; - GaugePtr cpu_gauge; + CounterPtr cpu_user_counter; + CounterPtr cpu_system_counter; GaugePtr fds_gauge; std::shared_ptr prometheus_registry; diff --git a/src/telemetry/ProcessStats.cc b/src/telemetry/ProcessStats.cc index f2a0447b63..476efd4487 100644 --- a/src/telemetry/ProcessStats.cc +++ b/src/telemetry/ProcessStats.cc @@ -34,10 +34,10 @@ process_stats get_process_stats() { if ( task_info(mach_task_self(), TASK_THREAD_TIMES_INFO, reinterpret_cast(&info), &count) == KERN_SUCCESS ) { // Round to milliseconds. - result.cpu += info.user_time.seconds; - result.cpu += ceil(info.user_time.microseconds / 1000.0) / 1000.0; - result.cpu += info.system_time.seconds; - result.cpu += ceil(info.system_time.microseconds / 1000.0) / 1000.0; + result.cpu_user += info.user_time.seconds; + result.cpu_user += ceil(info.user_time.microseconds / 1000.0) / 1000.0; + result.cpu_system += info.system_time.seconds; + result.cpu_system += ceil(info.system_time.microseconds / 1000.0) / 1000.0; } } // Fetch open file handles. @@ -154,7 +154,8 @@ process_stats get_process_stats() { result.rss = rss_pages * page_size; result.vms = vmsize_bytes; - result.cpu = static_cast(utime_ticks + stime_ticks) / ticks_per_second; + result.cpu_user = static_cast(utime_ticks) / ticks_per_second; + result.cpu_system = static_cast(stime_ticks) / ticks_per_second; result.fds = count_entries_in_directory("/proc/self/fd"); } @@ -187,7 +188,10 @@ process_stats get_process_stats() { if ( kp ) { result.vms = kp->ki_size; result.rss = kp->ki_rssize * getpagesize(); - result.cpu = static_cast(kp->ki_runtime) / 1000000.0; + result.cpu_user = static_cast(kp->ki_rusage.ru_utime.tv_sec) + + (static_cast(kp->ki_rusage.ru_utime.tv_usec) / 1e6); + result.cpu_system = static_cast(kp->ki_rusage.ru_stime.tv_sec) + + (static_cast(kp->ki_rusage.ru_stime.tv_usec) / 1e6); struct procstat* procstat = procstat_open_sysctl(); struct filestat_list* files = procstat_getfiles(procstat, kp, 0); diff --git a/src/telemetry/ProcessStats.h b/src/telemetry/ProcessStats.h index 02581362cc..d79bb2cb5f 100644 --- a/src/telemetry/ProcessStats.h +++ b/src/telemetry/ProcessStats.h @@ -9,7 +9,8 @@ namespace zeek::telemetry::detail { struct process_stats { int64_t rss = 0; int64_t vms = 0; - double cpu = 0.0; + double cpu_user = 0.0; + double cpu_system = 0.0; int64_t fds = 0; }; From 73f71e652d0da37f016652983225a789c1c81e69 Mon Sep 17 00:00:00 2001 From: Tim Wojtulewicz Date: Mon, 5 Aug 2024 13:06:55 -0700 Subject: [PATCH 09/11] Make telemetry metrics out of MsgThread statistics --- src/telemetry/Counter.cc | 2 +- src/telemetry/Counter.h | 1 + src/telemetry/Gauge.cc | 2 +- src/telemetry/Gauge.h | 1 + src/threading/Manager.cc | 121 ++++++++++++++++++++++++++++++++++++- src/threading/Manager.h | 32 +++++++++- src/threading/MsgThread.cc | 78 +----------------------- 7 files changed, 155 insertions(+), 82 deletions(-) diff --git a/src/telemetry/Counter.cc b/src/telemetry/Counter.cc index 8b34624254..ef3679329e 100644 --- a/src/telemetry/Counter.cc +++ b/src/telemetry/Counter.cc @@ -3,7 +3,7 @@ using namespace zeek::telemetry; Counter::Counter(FamilyType* family, const prometheus::Labels& labels, prometheus::CollectCallbackPtr callback) noexcept - : handle(family->Add(labels)), labels(labels) { + : family(family), handle(family->Add(labels)), labels(labels) { if ( callback ) { handle.AddCollectCallback(std::move(callback)); has_callback = true; diff --git a/src/telemetry/Counter.h b/src/telemetry/Counter.h index f6c49315b7..a0186ad219 100644 --- a/src/telemetry/Counter.h +++ b/src/telemetry/Counter.h @@ -56,6 +56,7 @@ public: bool CompareLabels(const prometheus::Labels& lbls) const { return labels == lbls; } private: + FamilyType* family = nullptr; Handle& handle; prometheus::Labels labels; bool has_callback = false; diff --git a/src/telemetry/Gauge.cc b/src/telemetry/Gauge.cc index 273c9a57bf..114ada3811 100644 --- a/src/telemetry/Gauge.cc +++ b/src/telemetry/Gauge.cc @@ -15,7 +15,7 @@ double Gauge::Value() const noexcept { Gauge::Gauge(FamilyType* family, const prometheus::Labels& labels, prometheus::CollectCallbackPtr callback) noexcept - : handle(family->Add(labels)), labels(labels) { + : family(family), handle(family->Add(labels)), labels(labels) { if ( callback ) { handle.AddCollectCallback(std::move(callback)); has_callback = true; diff --git a/src/telemetry/Gauge.h b/src/telemetry/Gauge.h index 900cb7b784..652ff72667 100644 --- a/src/telemetry/Gauge.h +++ b/src/telemetry/Gauge.h @@ -74,6 +74,7 @@ public: bool CompareLabels(const prometheus::Labels& lbls) const { return labels == lbls; } private: + FamilyType* family = nullptr; Handle& handle; prometheus::Labels labels; bool has_callback = false; diff --git a/src/threading/Manager.cc b/src/threading/Manager.cc index 36a4ced939..f97427b08c 100644 --- a/src/threading/Manager.cc +++ b/src/threading/Manager.cc @@ -23,6 +23,9 @@ void HeartbeatTimer::Dispatch(double t, bool is_expire) { } // namespace detail +static std::vector pending_bucket_brackets = {1, 10, 100, + 1000, 10000, std::numeric_limits::infinity()}; + Manager::Manager() { DBG_LOG(DBG_THREADING, "Creating thread manager ..."); @@ -38,14 +41,128 @@ Manager::~Manager() { } void Manager::InitPostScript() { + static auto get_message_thread_stats = []() -> const BucketedMessages* { + if ( ! thread_mgr->terminating ) { + double now = util::current_time(); + if ( thread_mgr->bucketed_messages_last_updated < now - 1 ) { + thread_mgr->current_bucketed_messages.pending_in_total = 0; + thread_mgr->current_bucketed_messages.pending_out_total = 0; + for ( auto& m : thread_mgr->current_bucketed_messages.pending_in ) + m.second = 0; + for ( auto& m : thread_mgr->current_bucketed_messages.pending_out ) + m.second = 0; + + MsgThread::Stats thread_stats; + for ( const auto& t : thread_mgr->msg_threads ) { + t->GetStats(&thread_stats); + + thread_mgr->current_bucketed_messages.sent_in_total += thread_stats.sent_in; + thread_mgr->current_bucketed_messages.sent_out_total += thread_stats.sent_out; + thread_mgr->current_bucketed_messages.pending_in_total += thread_stats.pending_in; + thread_mgr->current_bucketed_messages.pending_out_total += thread_stats.pending_out; + + for ( auto upper_limit : pending_bucket_brackets ) { + if ( thread_stats.pending_in < upper_limit ) { + thread_mgr->current_bucketed_messages.pending_in[upper_limit]++; + break; + } + } + for ( auto upper_limit : pending_bucket_brackets ) { + if ( thread_stats.pending_out < upper_limit ) { + thread_mgr->current_bucketed_messages.pending_out[upper_limit]++; + break; + } + } + } + + thread_mgr->bucketed_messages_last_updated = now; + } + } + + return &thread_mgr->current_bucketed_messages; + }; + num_threads_metric = - telemetry_mgr->GaugeInstance("zeek", "active_threads", {}, "Number of active threads", "", + telemetry_mgr->GaugeInstance("zeek", "msgthread_active_threads", {}, "Number of active threads", "", []() -> prometheus::ClientMetric { prometheus::ClientMetric metric; metric.gauge.value = thread_mgr ? static_cast(thread_mgr->all_threads.size()) : 0.0; return metric; }); + + total_threads_metric = telemetry_mgr->CounterInstance("zeek", "msgthread_threads", {}, "Total number of threads"); + total_messages_in_metric = + telemetry_mgr->CounterInstance("zeek", "msgthread_in_messages", {}, "Number of inbound messages received", "", + []() -> prometheus::ClientMetric { + auto* s = get_message_thread_stats(); + prometheus::ClientMetric metric; + metric.gauge.value = static_cast(s->sent_in_total); + return metric; + }); + + total_messages_in_metric = + telemetry_mgr->CounterInstance("zeek", "msgthread_out_messages", {}, "Number of outbound messages sent", "", + []() -> prometheus::ClientMetric { + auto* s = get_message_thread_stats(); + prometheus::ClientMetric metric; + metric.gauge.value = static_cast(s->sent_out_total); + return metric; + }); + + pending_messages_in_metric = + telemetry_mgr->GaugeInstance("zeek", "msgthread_pending_in_messages", {}, "Pending number of inbound messages", + "", []() -> prometheus::ClientMetric { + auto* s = get_message_thread_stats(); + prometheus::ClientMetric metric; + metric.gauge.value = static_cast(s->pending_in_total); + return metric; + }); + pending_messages_out_metric = + telemetry_mgr->GaugeInstance("zeek", "msgthread_pending_out_messages", {}, + "Pending number of outbound messages", "", []() -> prometheus::ClientMetric { + auto* s = get_message_thread_stats(); + prometheus::ClientMetric metric; + metric.gauge.value = static_cast(s->pending_out_total); + return metric; + }); + + pending_message_in_buckets_fam = + telemetry_mgr->GaugeFamily("zeek", "msgthread_pending_messages_in_buckets", {"lt"}, + "Number of threads with pending inbound messages split into buckets"); + pending_message_out_buckets_fam = + telemetry_mgr->GaugeFamily("zeek", "msgthread_pending_messages_out_buckets", {"lt"}, + "Number of threads with pending outbound messages split into buckets"); + + for ( auto upper_limit : pending_bucket_brackets ) { + std::string upper_limit_str; + if ( upper_limit == std::numeric_limits::infinity() ) + upper_limit_str = "inf"; + else + upper_limit_str = std::to_string(upper_limit); + + current_bucketed_messages.pending_in[upper_limit] = 0; + current_bucketed_messages.pending_out[upper_limit] = 0; + + pending_message_in_buckets[upper_limit] = + pending_message_in_buckets_fam->GetOrAdd({{"lt", upper_limit_str}}, + [upper_limit]() -> prometheus::ClientMetric { + auto* s = get_message_thread_stats(); + prometheus::ClientMetric metric; + metric.gauge.value = + static_cast(s->pending_in.at(upper_limit)); + return metric; + }); + pending_message_out_buckets[upper_limit] = + pending_message_out_buckets_fam->GetOrAdd({{"lt", upper_limit_str}}, + [upper_limit]() -> prometheus::ClientMetric { + auto* s = get_message_thread_stats(); + prometheus::ClientMetric metric; + metric.gauge.value = + static_cast(s->pending_out.at(upper_limit)); + return metric; + }); + } } void Manager::Terminate() { @@ -90,6 +207,8 @@ void Manager::AddThread(BasicThread* thread) { if ( ! heartbeat_timer_running ) StartHeartbeatTimer(); + + total_threads_metric->Inc(); } void Manager::AddMsgThread(MsgThread* thread) { diff --git a/src/threading/Manager.h b/src/threading/Manager.h index 306063081f..3b4763a497 100644 --- a/src/threading/Manager.h +++ b/src/threading/Manager.h @@ -1,6 +1,7 @@ #pragma once #include +#include #include #include "zeek/Timer.h" @@ -10,7 +11,12 @@ namespace zeek { namespace telemetry { class Gauge; -} +using GaugePtr = std::shared_ptr; +class GaugeFamily; +using GaugeFamilyPtr = std::shared_ptr; +class Counter; +using CounterPtr = std::shared_ptr; +} // namespace telemetry namespace threading { namespace detail { @@ -162,7 +168,29 @@ private: msg_stats_list stats; bool heartbeat_timer_running = false; - std::shared_ptr num_threads_metric; + telemetry::GaugePtr num_threads_metric; + telemetry::CounterPtr total_threads_metric; + telemetry::CounterPtr total_messages_in_metric; + telemetry::CounterPtr total_messages_out_metric; + telemetry::GaugePtr pending_messages_in_metric; + telemetry::GaugePtr pending_messages_out_metric; + + telemetry::GaugeFamilyPtr pending_message_in_buckets_fam; + telemetry::GaugeFamilyPtr pending_message_out_buckets_fam; + std::map pending_message_in_buckets; + std::map pending_message_out_buckets; + + struct BucketedMessages { + uint64_t sent_in_total; + uint64_t sent_out_total; + uint64_t pending_in_total; + uint64_t pending_out_total; + std::map pending_in; + std::map pending_out; + }; + + BucketedMessages current_bucketed_messages; + double bucketed_messages_last_updated = 0.0; }; } // namespace threading diff --git a/src/threading/MsgThread.cc b/src/threading/MsgThread.cc index c33c9d5bd7..6854f5f188 100644 --- a/src/threading/MsgThread.cc +++ b/src/threading/MsgThread.cc @@ -9,6 +9,7 @@ #include "zeek/Obj.h" #include "zeek/RunState.h" #include "zeek/iosource/Manager.h" +#include "zeek/telemetry/Manager.h" #include "zeek/threading/Manager.h" // Set by Zeek's main signal handler. @@ -229,83 +230,6 @@ MsgThread::MsgThread() : BasicThread(), queue_in(this, nullptr), queue_out(nullp // Register IOSource as non-counting lifetime managed IO source. iosource_mgr->Register(io_source, true); - - cnt_sent_in_metric = telemetry_mgr->CounterInstance("zeek", "msg_thread_msgs_sent_in", {{"thread_name", Name()}}, - "Number of messages sent into thread"); - cnt_sent_out_metric = telemetry_mgr->CounterInstance("zeek", "msg_thread_msgs_sent_out", {{"thread_name", Name()}}, - "Number of messages sent from thread"); - pending_in_metric = telemetry_mgr->GaugeInstance("zeek", "msg_thread_msgs_pending_in", {{"thread_name", Name()}}, - "Number of pending messages sent into thread", "", - [this]() -> prometheus::ClientMetric { - prometheus::ClientMetric metric; - metric.gauge.value = static_cast(queue_in.Size()); - return metric; - }); - pending_out_metric = telemetry_mgr->GaugeInstance("zeek", "msg_thread_msgs_pending_in", {{"thread_name", Name()}}, - "Number of pending messages sent from thread", "", - [this]() -> prometheus::ClientMetric { - prometheus::ClientMetric metric; - metric.gauge.value = static_cast(queue_out.Size()); - return metric; - }); - - static auto get_queue_in_stats = [this]() -> const Queue::Stats { - double now = util::current_time(); - if ( this->queue_in_stats_last_updated < now - 0.01 ) { - queue_in.GetStats(&queue_in_last_stats); - this->queue_in_stats_last_updated = now; - } - - return queue_in_last_stats; - }; - - queue_in_num_reads_metric = - telemetry_mgr->CounterInstance("zeek", "msg_thread_queue_in_reads", {{"thread_name", Name()}}, - "Number of reads from msg thread input queue", "", - []() -> prometheus::ClientMetric { - prometheus::ClientMetric metric; - auto stats = get_queue_in_stats(); - metric.gauge.value = static_cast(stats.num_reads); - return metric; - }); - queue_in_num_writes_metric = - telemetry_mgr->CounterInstance("zeek", "msg_thread_queue_in_writes", {{"thread_name", Name()}}, - "Number of writes from msg thread input queue", "", - []() -> prometheus::ClientMetric { - prometheus::ClientMetric metric; - auto stats = get_queue_in_stats(); - metric.gauge.value = static_cast(stats.num_writes); - return metric; - }); - - static auto get_queue_out_stats = [this]() -> const Queue::Stats { - double now = util::current_time(); - if ( this->queue_out_stats_last_updated < now - 0.01 ) { - queue_out.GetStats(&queue_out_last_stats); - this->queue_out_stats_last_updated = now; - } - - return queue_out_last_stats; - }; - - queue_out_num_reads_metric = - telemetry_mgr->CounterInstance("zeek", "msg_thread_queue_out_reads", {{"thread_name", Name()}}, - "Number of reads from msg thread input queue", "", - []() -> prometheus::ClientMetric { - prometheus::ClientMetric metric; - auto stats = get_queue_out_stats(); - metric.gauge.value = static_cast(stats.num_reads); - return metric; - }); - queue_out_num_writes_metric = - telemetry_mgr->CounterInstance("zeek", "msg_thread_queue_out_writes", {{"thread_name", Name()}}, - "Number of writes from msg thread input queue", "", - []() -> prometheus::ClientMetric { - prometheus::ClientMetric metric; - auto stats = get_queue_out_stats(); - metric.gauge.value = static_cast(stats.num_writes); - return metric; - }); } MsgThread::~MsgThread() { From 7a1eb78b672183ee49a25ef9bb74027b42ff168b Mon Sep 17 00:00:00 2001 From: Tim Wojtulewicz Date: Fri, 2 Aug 2024 16:58:57 -0700 Subject: [PATCH 10/11] Avoid capturing 'this' for callback in telemetry::Manager --- src/telemetry/Manager.cc | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/telemetry/Manager.cc b/src/telemetry/Manager.cc index 6bfcdc71a9..69b4ed485a 100644 --- a/src/telemetry/Manager.cc +++ b/src/telemetry/Manager.cc @@ -112,14 +112,14 @@ void Manager::InitPostScript() { } #ifdef HAVE_PROCESS_STAT_METRICS - static auto get_stats = [this]() -> const detail::process_stats* { + static auto get_stats = []() -> const detail::process_stats* { double now = util::current_time(); - if ( this->process_stats_last_updated < now - 0.01 ) { - this->current_process_stats = detail::get_process_stats(); - this->process_stats_last_updated = now; + if ( telemetry_mgr->process_stats_last_updated < now - 0.01 ) { + telemetry_mgr->current_process_stats = detail::get_process_stats(); + telemetry_mgr->process_stats_last_updated = now; } - return &this->current_process_stats; + return &telemetry_mgr->current_process_stats; }; rss_gauge = GaugeInstance("process", "resident_memory", {}, "Resident memory size", "bytes", []() -> prometheus::ClientMetric { From 1325e16a0d30f3a514b5d229302c2294090c73da Mon Sep 17 00:00:00 2001 From: Tim Wojtulewicz Date: Wed, 10 Jul 2024 15:07:46 -0700 Subject: [PATCH 11/11] Remove some unnecessary #includes --- src/Timer.cc | 1 - src/threading/Manager.cc | 1 - src/threading/MsgThread.h | 2 -- 3 files changed, 4 deletions(-) diff --git a/src/Timer.cc b/src/Timer.cc index 191d9c9e6e..f348731c73 100644 --- a/src/Timer.cc +++ b/src/Timer.cc @@ -9,7 +9,6 @@ #include "zeek/RunState.h" #include "zeek/broker/Manager.h" #include "zeek/iosource/Manager.h" -#include "zeek/iosource/PktSrc.h" #include "zeek/telemetry/Manager.h" #include "zeek/util.h" diff --git a/src/threading/Manager.cc b/src/threading/Manager.cc index f97427b08c..5499cbb8ed 100644 --- a/src/threading/Manager.cc +++ b/src/threading/Manager.cc @@ -7,7 +7,6 @@ #include "zeek/IPAddr.h" #include "zeek/NetVar.h" #include "zeek/RunState.h" -#include "zeek/iosource/Manager.h" #include "zeek/telemetry/Manager.h" namespace zeek::threading { diff --git a/src/threading/MsgThread.h b/src/threading/MsgThread.h index 259e64b11f..0bb3dbaec4 100644 --- a/src/threading/MsgThread.h +++ b/src/threading/MsgThread.h @@ -3,8 +3,6 @@ #include #include "zeek/DebugLogger.h" -#include "zeek/Flare.h" -#include "zeek/iosource/IOSource.h" #include "zeek/threading/BasicThread.h" #include "zeek/threading/Queue.h"