diff --git a/src/DNS_Mgr.cc b/src/DNS_Mgr.cc index a707d8ead0..33da7dd0ae 100644 --- a/src/DNS_Mgr.cc +++ b/src/DNS_Mgr.cc @@ -45,6 +45,7 @@ using ztd::out_ptr::out_ptr; #include "zeek/Val.h" #include "zeek/ZeekString.h" #include "zeek/iosource/Manager.h" +#include "zeek/telemetry/Manager.h" // Number of seconds we'll wait for a reply. constexpr int DNS_TIMEOUT = 5; @@ -545,6 +546,55 @@ void DNS_Mgr::InitSource() { } void DNS_Mgr::InitPostScript() { + num_requests_metric = + telemetry_mgr->CounterInstance("zeek", "dnsmgr_requests", {}, "Total number of requests through DNS_Mgr"); + successful_metric = telemetry_mgr->CounterInstance("zeek", "dnsmgr_successful_requests", {}, + "Total number of successful requests through DNS_Mgr"); + failed_metric = telemetry_mgr->CounterInstance("zeek", "dnsmgr_failed_requests", {}, + "Total number of failed requests through DNS_Mgr"); + asyncs_pending_metric = telemetry_mgr->GaugeInstance("zeek", "dnsmgr_pending_asyncs_requests", {}, + "Number of pending async requests through DNS_Mgr"); + + cached_hosts_metric = + telemetry_mgr->GaugeInstance("zeek", "dnsmgr_cache_entries", {{"type", "host"}}, + "Number of cached hosts in DNS_Mgr", "", []() -> prometheus::ClientMetric { + prometheus::ClientMetric metric; + metric.gauge.value = 0; + + if ( dns_mgr ) { + dns_mgr->UpdateCachedStats(false); + metric.gauge.value = static_cast(dns_mgr->last_cached_stats.hosts); + } + return metric; + }); + + cached_addresses_metric = + telemetry_mgr->GaugeInstance("zeek", "dnsmgr_cache_entries", {{"type", "address"}}, + "Number of cached addresses in DNS_Mgr", "", []() -> prometheus::ClientMetric { + prometheus::ClientMetric metric; + metric.gauge.value = 0; + + if ( dns_mgr ) { + dns_mgr->UpdateCachedStats(false); + metric.gauge.value = + static_cast(dns_mgr->last_cached_stats.addresses); + } + return metric; + }); + + cached_texts_metric = + telemetry_mgr->GaugeInstance("zeek", "dnsmgr_cache_entries", {{"type", "text"}}, + "Number of cached texts in 
DNS_Mgr", "", []() -> prometheus::ClientMetric { + prometheus::ClientMetric metric; + metric.gauge.value = 0; + + if ( dns_mgr ) { + dns_mgr->UpdateCachedStats(false); + metric.gauge.value = static_cast(dns_mgr->last_cached_stats.texts); + } + return metric; + }); + if ( ! doctest::is_running_in_test ) { dm_rec = id::find_type("dns_mapping"); @@ -1158,7 +1208,7 @@ void DNS_Mgr::IssueAsyncRequests() { AsyncRequest* req = asyncs_queued.front(); asyncs_queued.pop_front(); - ++num_requests; + num_requests_metric->Inc(); req->time = util::current_time(); if ( req->type == T_PTR ) @@ -1173,6 +1223,7 @@ void DNS_Mgr::IssueAsyncRequests() { dns_req->MakeRequest(channel, this); ++asyncs_pending; + asyncs_pending_metric->Inc(); } } @@ -1182,11 +1233,11 @@ void DNS_Mgr::CheckAsyncHostRequest(const std::string& host, bool timeout) { if ( i != asyncs.end() ) { if ( timeout ) { - ++failed; + failed_metric->Inc(); i->second->Timeout(); } else if ( auto addrs = LookupNameInCache(host, true, false) ) { - ++successful; + successful_metric->Inc(); i->second->Resolved(addrs); } else @@ -1195,6 +1246,7 @@ void DNS_Mgr::CheckAsyncHostRequest(const std::string& host, bool timeout) { delete i->second; asyncs.erase(i); --asyncs_pending; + asyncs_pending_metric->Dec(); } } @@ -1207,11 +1259,11 @@ void DNS_Mgr::CheckAsyncAddrRequest(const IPAddr& addr, bool timeout) { if ( i != asyncs.end() ) { if ( timeout ) { - ++failed; + failed_metric->Inc(); i->second->Timeout(); } else if ( auto name = LookupAddrInCache(addr, true, false) ) { - ++successful; + successful_metric->Inc(); i->second->Resolved(name->CheckString()); } else @@ -1220,6 +1272,7 @@ void DNS_Mgr::CheckAsyncAddrRequest(const IPAddr& addr, bool timeout) { delete i->second; asyncs.erase(i); --asyncs_pending; + asyncs_pending_metric->Dec(); } } @@ -1229,11 +1282,11 @@ void DNS_Mgr::CheckAsyncOtherRequest(const std::string& host, bool timeout, int auto i = asyncs.find(std::make_pair(request_type, host)); if ( i != asyncs.end() ) { if 
( timeout ) { - ++failed; + failed_metric->Inc(); i->second->Timeout(); } else if ( auto name = LookupOtherInCache(host, request_type, true) ) { - ++successful; + successful_metric->Inc(); i->second->Resolved(name->CheckString()); } else @@ -1242,6 +1295,7 @@ void DNS_Mgr::CheckAsyncOtherRequest(const std::string& host, bool timeout, int delete i->second; asyncs.erase(i); --asyncs_pending; + asyncs_pending_metric->Dec(); } } @@ -1293,26 +1347,35 @@ void DNS_Mgr::Process() { ares_process_fd(channel, ARES_SOCKET_BAD, ARES_SOCKET_BAD); } +void DNS_Mgr::UpdateCachedStats(bool force) { + double now = util::current_time(); + if ( force || last_cached_stats_update < now - 0.01 ) { + last_cached_stats.hosts = 0; + last_cached_stats.addresses = 0; + last_cached_stats.texts = 0; + last_cached_stats.total = all_mappings.size(); + + for ( const auto& [key, mapping] : all_mappings ) { + if ( mapping->ReqType() == T_PTR ) + last_cached_stats.addresses++; + else if ( mapping->ReqType() == T_A ) + last_cached_stats.hosts++; + else + last_cached_stats.texts++; + } + + last_cached_stats_update = now; + } +} + void DNS_Mgr::GetStats(Stats* stats) { - // TODO: can this use the telemetry framework? 
- stats->requests = num_requests; - stats->successful = successful; - stats->failed = failed; + stats->requests = static_cast(num_requests_metric->Value()); + stats->successful = static_cast(successful_metric->Value()); + stats->failed = static_cast(failed_metric->Value()); stats->pending = asyncs_pending; - stats->cached_hosts = 0; - stats->cached_addresses = 0; - stats->cached_texts = 0; - stats->cached_total = all_mappings.size(); - - for ( const auto& [key, mapping] : all_mappings ) { - if ( mapping->ReqType() == T_PTR ) - stats->cached_addresses++; - else if ( mapping->ReqType() == T_A ) - stats->cached_hosts++; - else - stats->cached_texts++; - } + UpdateCachedStats(true); + stats->cached = last_cached_stats; } void DNS_Mgr::AsyncRequest::Resolved(const std::string& name) { diff --git a/src/DNS_Mgr.h b/src/DNS_Mgr.h index 5d0f9a84b7..7e063b28a3 100644 --- a/src/DNS_Mgr.h +++ b/src/DNS_Mgr.h @@ -42,6 +42,13 @@ using TableValPtr = IntrusivePtr; using StringValPtr = IntrusivePtr; using RecordValPtr = IntrusivePtr; +namespace telemetry { +class Gauge; +class Counter; +using GaugePtr = std::shared_ptr; +using CounterPtr = std::shared_ptr; +} // namespace telemetry + } // namespace zeek namespace zeek::detail { @@ -198,15 +205,19 @@ public: */ bool Save(); + struct CachedStats { + unsigned long hosts; + unsigned long addresses; + unsigned long texts; + unsigned long total; + }; + struct Stats { unsigned long requests; // These count only async requests. 
unsigned long successful; unsigned long failed; unsigned long pending; - unsigned long cached_hosts; - unsigned long cached_addresses; - unsigned long cached_texts; - unsigned long cached_total; + CachedStats cached; }; /** @@ -285,6 +296,8 @@ protected: const char* Tag() override { return "DNS_Mgr"; } double GetNextTimeout() override; + void UpdateCachedStats(bool force); + DNS_MgrMode mode; MappingMap all_mappings; @@ -293,7 +306,6 @@ protected: std::string dir; // directory in which cache_name resides bool did_init = false; - int asyncs_pending = 0; RecordTypePtr dm_rec; @@ -327,9 +339,19 @@ protected: using QueuedList = std::list; QueuedList asyncs_queued; - unsigned long num_requests = 0; - unsigned long successful = 0; - unsigned long failed = 0; + telemetry::CounterPtr num_requests_metric; + telemetry::CounterPtr successful_metric; + telemetry::CounterPtr failed_metric; + telemetry::GaugePtr asyncs_pending_metric; + + telemetry::GaugePtr cached_hosts_metric; + telemetry::GaugePtr cached_addresses_metric; + telemetry::GaugePtr cached_texts_metric; + + double last_cached_stats_update = 0; + CachedStats last_cached_stats; + + int asyncs_pending = 0; std::set socket_fds; std::set write_socket_fds; diff --git a/src/Stats.cc b/src/Stats.cc index 7f49585121..7b776b0138 100644 --- a/src/Stats.cc +++ b/src/Stats.cc @@ -119,7 +119,7 @@ void ProfileLogger::Log() { // TODO: This previously output the number of connections, but now that we're storing // sessions as well as connections, this might need to be renamed. 
- file->Write(util::fmt("%.06f Conns: total=%" PRIu64 " current=%" PRIu64 "/%u\n", run_state::network_time, + file->Write(util::fmt("%.06f Conns: total=%" PRIu64 " current=%" PRIu64 "/%zu\n", run_state::network_time, Connection::TotalConnections(), Connection::CurrentConnections(), session_mgr->CurrentSessions())); @@ -173,7 +173,7 @@ void ProfileLogger::Log() { util::fmt("%.06f DNS_Mgr: requests=%lu successful=%lu failed=%lu pending=%lu " "cached_hosts=%lu cached_addrs=%lu\n", run_state::network_time, dstats.requests, dstats.successful, dstats.failed, dstats.pending, - dstats.cached_hosts, dstats.cached_addresses)); + dstats.cached.hosts, dstats.cached.addresses)); trigger::Manager::Stats tstats; trigger_mgr->GetStats(&tstats); @@ -188,7 +188,7 @@ void ProfileLogger::Log() { timer_type_to_string(static_cast(i)), current_timers[i])); } - file->Write(util::fmt("%0.6f Threads: current=%d\n", run_state::network_time, thread_mgr->NumThreads())); + file->Write(util::fmt("%0.6f Threads: current=%zu\n", run_state::network_time, thread_mgr->NumThreads())); const threading::Manager::msg_stats_list& thread_stats = thread_mgr->GetMsgThreadStats(); for ( threading::Manager::msg_stats_list::const_iterator i = thread_stats.begin(); i != thread_stats.end(); ++i ) { @@ -213,14 +213,12 @@ void ProfileLogger::Log() { cs.num_events_outgoing, cs.num_logs_incoming, cs.num_logs_outgoing, cs.num_ids_incoming, cs.num_ids_outgoing)); - // Script-level state. - const auto& globals = global_scope()->Vars(); - if ( expensive ) { + // Script-level state. 
int total_table_entries = 0; int total_table_rentries = 0; - for ( const auto& global : globals ) { + for ( const auto& global : global_scope()->Vars() ) { auto& id = global.second; // We don't show/count internal globals as they are always diff --git a/src/Timer.cc b/src/Timer.cc index ef2763071a..f348731c73 100644 --- a/src/Timer.cc +++ b/src/Timer.cc @@ -9,7 +9,7 @@ #include "zeek/RunState.h" #include "zeek/broker/Manager.h" #include "zeek/iosource/Manager.h" -#include "zeek/iosource/PktSrc.h" +#include "zeek/telemetry/Manager.h" #include "zeek/util.h" namespace zeek::detail { @@ -97,6 +97,34 @@ void TimerMgr::InitPostScript() { iosource_mgr->Register(this, true); dispatch_all_expired = zeek::detail::max_timer_expires == 0; + + cumulative_num_metric = telemetry_mgr->CounterInstance("zeek", "timers", {}, "Cumulative number of timers", "", + []() -> prometheus::ClientMetric { + prometheus::ClientMetric metric; + metric.counter.value = + static_cast(timer_mgr->CumulativeNum()); + return metric; + }); + + lag_time_metric = telemetry_mgr->GaugeInstance("zeek", "timers_lag_time", {}, + "Lag between current network time and last expired timer", "seconds", + []() -> prometheus::ClientMetric { + prometheus::ClientMetric metric; + metric.gauge.value = + run_state::network_time - timer_mgr->last_timestamp; + return metric; + }); + + std::shared_ptr family = + telemetry_mgr->GaugeFamily("zeek", "timers_pending", {"type"}, "Number of timers for a certain type"); + for ( int i = 0; i < NUM_TIMER_TYPES; i++ ) { + current_timer_metrics[i] = family->GetOrAdd({{"type", timer_type_to_string(static_cast(i))}}, + [i]() -> prometheus::ClientMetric { + prometheus::ClientMetric metric; + metric.gauge.value = TimerMgr::CurrentTimers()[i]; + return metric; + }); + } } void TimerMgr::Add(Timer* timer) { diff --git a/src/Timer.h b/src/Timer.h index ff535f64e1..45ba50314e 100644 --- a/src/Timer.h +++ b/src/Timer.h @@ -10,7 +10,14 @@ namespace zeek { class ODesc; -} + +namespace telemetry { 
+class Gauge; +class Counter; +using GaugePtr = std::shared_ptr; +using CounterPtr = std::shared_ptr; +} // namespace telemetry +} // namespace zeek namespace zeek::detail { @@ -153,10 +160,12 @@ private: // for the max_timer_expires=0 case. bool dispatch_all_expired = false; - size_t peak_size = 0; - size_t cumulative_num = 0; - static unsigned int current_timers[NUM_TIMER_TYPES]; + + telemetry::CounterPtr cumulative_num_metric; + telemetry::GaugePtr lag_time_metric; + telemetry::GaugePtr current_timer_metrics[NUM_TIMER_TYPES]; + std::unique_ptr q; }; diff --git a/src/Trigger.cc b/src/Trigger.cc index dae99bab37..3dfe76e2dc 100644 --- a/src/Trigger.cc +++ b/src/Trigger.cc @@ -13,6 +13,7 @@ #include "zeek/Traverse.h" #include "zeek/Val.h" #include "zeek/iosource/Manager.h" +#include "zeek/telemetry/Manager.h" using namespace zeek::detail; using namespace zeek::detail::trigger; @@ -437,7 +438,19 @@ Manager::Manager() : iosource::IOSource() { pending = new TriggerList(); } Manager::~Manager() { delete pending; } -void Manager::InitPostScript() { iosource_mgr->Register(this, true); } +void Manager::InitPostScript() { + trigger_count = telemetry_mgr->CounterInstance("zeek", "triggers", {}, "Total number of triggers scheduled"); + trigger_pending = + telemetry_mgr->GaugeInstance("zeek", "pending_triggers", {}, "Pending number of triggers", "", + []() -> prometheus::ClientMetric { + prometheus::ClientMetric metric; + metric.gauge.value = + trigger_mgr ? static_cast(trigger_mgr->pending->size()) : 0.0; + return metric; + }); + + iosource_mgr->Register(this, true); +} double Manager::GetNextTimeout() { return pending->empty() ? 
-1 : run_state::network_time + 0.100; } @@ -468,13 +481,13 @@ void Manager::Queue(Trigger* trigger) { if ( std::find(pending->begin(), pending->end(), trigger) == pending->end() ) { Ref(trigger); pending->push_back(trigger); - total_triggers++; + trigger_count->Inc(); iosource_mgr->Wakeup(Tag()); } } void Manager::GetStats(Stats* stats) { - stats->total = total_triggers; + stats->total = static_cast(trigger_count->Value()); stats->pending = pending->size(); } diff --git a/src/Trigger.h b/src/Trigger.h index 6c4fade3be..5955cc25b4 100644 --- a/src/Trigger.h +++ b/src/Trigger.h @@ -18,6 +18,13 @@ class Val; using ValPtr = IntrusivePtr; +namespace telemetry { +class Gauge; +class Counter; +using GaugePtr = std::shared_ptr; +using CounterPtr = std::shared_ptr; +} // namespace telemetry + namespace detail { class Frame; @@ -187,7 +194,8 @@ public: private: using TriggerList = std::list; TriggerList* pending; - unsigned long total_triggers = 0; + telemetry::CounterPtr trigger_count; + telemetry::GaugePtr trigger_pending; }; } // namespace trigger diff --git a/src/broker/Manager.cc b/src/broker/Manager.cc index 015f53d10e..dec03afb8e 100644 --- a/src/broker/Manager.cc +++ b/src/broker/Manager.cc @@ -340,6 +340,43 @@ void Manager::InitPostScript() { bstate->subscriber.add_topic(broker::topic::store_events(), true); InitializeBrokerStoreForwarding(); + + num_peers_metric = + telemetry_mgr->GaugeInstance("zeek", "broker_peers", {}, "Current number of peers connected via broker", "", + []() -> prometheus::ClientMetric { + prometheus::ClientMetric metric; + metric.gauge.value = static_cast(broker_mgr->peer_count); + return metric; + }); + + num_stores_metric = + telemetry_mgr->GaugeInstance("zeek", "broker_stores", {}, "Current number of stores connected via broker", "", + []() -> prometheus::ClientMetric { + prometheus::ClientMetric metric; + metric.gauge.value = static_cast(broker_mgr->data_stores.size()); + return metric; + }); + + num_pending_queries_metric = + 
telemetry_mgr->GaugeInstance("zeek", "broker_pending_queries", {}, "Current number of pending broker queries", + "", []() -> prometheus::ClientMetric { + prometheus::ClientMetric metric; + metric.gauge.value = static_cast(broker_mgr->pending_queries.size()); + return metric; + }); + + num_events_incoming_metric = telemetry_mgr->CounterInstance("zeek", "broker_incoming_events", {}, + "Total number of incoming events via broker"); + num_events_outgoing_metric = telemetry_mgr->CounterInstance("zeek", "broker_outgoing_events", {}, + "Total number of outgoing events via broker"); + num_logs_incoming_metric = + telemetry_mgr->CounterInstance("zeek", "broker_incoming_logs", {}, "Total number of incoming logs via broker"); + num_logs_outgoing_metric = + telemetry_mgr->CounterInstance("zeek", "broker_outgoing_logs", {}, "Total number of outgoing logs via broker"); + num_ids_incoming_metric = + telemetry_mgr->CounterInstance("zeek", "broker_incoming_ids", {}, "Total number of incoming ids via broker"); + num_ids_outgoing_metric = + telemetry_mgr->CounterInstance("zeek", "broker_outgoing_ids", {}, "Total number of outgoing ids via broker"); } void Manager::InitializeBrokerStoreForwarding() { @@ -528,7 +565,7 @@ bool Manager::PublishEvent(string topic, std::string name, broker::vector args, DBG_LOG(DBG_BROKER, "Publishing event: %s", RenderEvent(topic, name, args).c_str()); broker::zeek::Event ev(std::move(name), std::move(args), broker::to_timestamp(ts)); bstate->endpoint.publish(std::move(topic), ev.move_data()); - ++statistics.num_events_outgoing; + num_events_outgoing_metric->Inc(); return true; } @@ -588,7 +625,7 @@ bool Manager::PublishIdentifier(std::string topic, std::string id) { broker::zeek::IdentifierUpdate msg(std::move(id), std::move(data.value_)); DBG_LOG(DBG_BROKER, "Publishing id-update: %s", RenderMessage(topic, msg.as_data()).c_str()); bstate->endpoint.publish(std::move(topic), msg.move_data()); - ++statistics.num_ids_outgoing; + 
num_ids_outgoing_metric->Inc(); return true; } @@ -715,8 +752,10 @@ bool Manager::PublishLogWrite(EnumVal* stream, EnumVal* writer, string path, int ++lb.message_count; lb.msgs[topic].add(std::move(msg)); - if ( lb.message_count >= log_batch_size ) - statistics.num_logs_outgoing += lb.Flush(bstate->endpoint, log_batch_size); + if ( lb.message_count >= log_batch_size ) { + auto outgoing_logs = static_cast(lb.Flush(bstate->endpoint, log_batch_size)); + num_logs_outgoing_metric->Inc(outgoing_logs); + } return true; } @@ -746,7 +785,8 @@ size_t Manager::FlushLogBuffers() { for ( auto& lb : log_buffers ) rval += lb.Flush(bstate->endpoint, log_batch_size); - statistics.num_logs_outgoing += rval; + num_logs_outgoing_metric->Inc(rval); + return rval; } @@ -1141,7 +1181,7 @@ void Manager::ProcessMessage(std::string_view topic, broker::zeek::Event& ev) { ts = run_state::network_time; DBG_LOG(DBG_BROKER, "Process event: %s (%.6f) %s", c_str_safe(name).c_str(), ts, RenderMessage(args).c_str()); - ++statistics.num_events_incoming; + num_events_incoming_metric->Inc(); auto handler = event_registry->Lookup(name); if ( ! handler ) @@ -1286,7 +1326,7 @@ bool Manager::ProcessMessage(std::string_view, broker::zeek::LogWrite& lw) { return false; } - ++statistics.num_logs_incoming; + num_logs_incoming_metric->Inc(); auto&& stream_id_name = lw.stream_id().name; // Get stream ID. @@ -1352,7 +1392,7 @@ bool Manager::ProcessMessage(std::string_view, broker::zeek::IdentifierUpdate& i return false; } - ++statistics.num_ids_incoming; + num_ids_incoming_metric->Inc(); auto id_name = c_str_safe(iu.id_name()); auto id_value = convert_if_broker_variant_or_move(iu.id_value()); const auto& id = zeek::detail::global_scope()->Find(id_name); @@ -1706,7 +1746,12 @@ const Stats& Manager::GetStatistics() { statistics.num_stores = data_stores.size(); statistics.num_pending_queries = pending_queries.size(); - // The other attributes are set as activity happens. 
+ statistics.num_events_incoming = static_cast(num_events_incoming_metric->Value()); + statistics.num_events_outgoing = static_cast(num_events_outgoing_metric->Value()); + statistics.num_logs_incoming = static_cast(num_logs_incoming_metric->Value()); + statistics.num_logs_outgoing = static_cast(num_logs_outgoing_metric->Value()); + statistics.num_ids_incoming = static_cast(num_ids_incoming_metric->Value()); + statistics.num_ids_outgoing = static_cast(num_ids_outgoing_metric->Value()); return statistics; } diff --git a/src/broker/Manager.h b/src/broker/Manager.h index d373d8883e..43bd00fbc9 100644 --- a/src/broker/Manager.h +++ b/src/broker/Manager.h @@ -27,8 +27,11 @@ using VectorTypePtr = IntrusivePtr; using TableValPtr = IntrusivePtr; namespace telemetry { -class Manager; -} +class Gauge; +class Counter; +using GaugePtr = std::shared_ptr; +using CounterPtr = std::shared_ptr; +} // namespace telemetry namespace detail { class Frame; @@ -451,6 +454,16 @@ private: std::string zeek_table_db_directory; static int script_scope; + + telemetry::GaugePtr num_peers_metric; + telemetry::GaugePtr num_stores_metric; + telemetry::GaugePtr num_pending_queries_metric; + telemetry::CounterPtr num_events_incoming_metric; + telemetry::CounterPtr num_events_outgoing_metric; + telemetry::CounterPtr num_logs_incoming_metric; + telemetry::CounterPtr num_logs_outgoing_metric; + telemetry::CounterPtr num_ids_incoming_metric; + telemetry::CounterPtr num_ids_outgoing_metric; }; } // namespace Broker diff --git a/src/session/Manager.cc b/src/session/Manager.cc index 8f2fbc6881..d33182a705 100644 --- a/src/session/Manager.cc +++ b/src/session/Manager.cc @@ -46,9 +46,9 @@ public: ProtocolMap::iterator InitCounters(const std::string& protocol) { auto active_family = - telemetry_mgr->GaugeFamily("zeek", "active-sessions", {"protocol"}, "Active Zeek Sessions"); + telemetry_mgr->GaugeFamily("zeek", "active_sessions", {"protocol"}, "Active Zeek Sessions"); auto total_family = - 
telemetry_mgr->CounterFamily("zeek", "total-sessions", {"protocol"}, "Total number of sessions"); + telemetry_mgr->CounterFamily("zeek", "total_sessions", {"protocol"}, "Total number of sessions"); auto [it, inserted] = entries.insert({protocol, Protocol{active_family, total_family, protocol}}); @@ -75,7 +75,17 @@ private: } // namespace detail -Manager::Manager() { stats = new detail::ProtocolStats(); } +Manager::Manager() { + stats = new detail::ProtocolStats(); + ended_sessions_metric_family = telemetry_mgr->CounterFamily("zeek", "ended_sessions", {"reason"}, + "Number of sessions ended for specific reasons"); + ended_by_inactivity_metric = + ended_sessions_metric_family->GetOrAdd({{"reason", "inactivity"}}, []() -> prometheus::ClientMetric { + prometheus::ClientMetric metric; + metric.counter.value = static_cast(zeek::detail::killed_by_inactivity); + return metric; + }); +} Manager::~Manager() { Clear(); diff --git a/src/session/Manager.h b/src/session/Manager.h index 6bbb128d95..eb02e87498 100644 --- a/src/session/Manager.h +++ b/src/session/Manager.h @@ -13,6 +13,13 @@ namespace zeek { +namespace telemetry { +class CounterFamily; +using CounterFamilyPtr = std::shared_ptr; +class Counter; +using CounterPtr = std::shared_ptr; +} // namespace telemetry + namespace detail { class PacketFilter; } @@ -82,7 +89,7 @@ public: void Weird(const char* name, const Packet* pkt, const char* addl = "", const char* source = ""); void Weird(const char* name, const IP_Hdr* ip, const char* addl = ""); - unsigned int CurrentSessions() { return session_map.size(); } + size_t CurrentSessions() { return session_map.size(); } private: using SessionMap = std::unordered_map; @@ -96,6 +103,8 @@ private: SessionMap session_map; detail::ProtocolStats* stats; + telemetry::CounterFamilyPtr ended_sessions_metric_family; + telemetry::CounterPtr ended_by_inactivity_metric; }; } // namespace session diff --git a/src/stats.bif b/src/stats.bif index 50dcc5685f..5d410bc9bf 100644 --- 
a/src/stats.bif +++ b/src/stats.bif @@ -83,7 +83,7 @@ function get_conn_stats%(%): ConnStats r->Assign(n++, Connection::TotalConnections()); r->Assign(n++, Connection::CurrentConnections()); - r->Assign(n++, session_mgr->CurrentSessions()); + r->Assign(n++, static_cast(session_mgr->CurrentSessions())); session::Stats s; if ( session_mgr ) @@ -252,10 +252,10 @@ function get_dns_stats%(%): DNSStats r->Assign(n++, static_cast(dstats.successful)); r->Assign(n++, static_cast(dstats.failed)); r->Assign(n++, static_cast(dstats.pending)); - r->Assign(n++, static_cast(dstats.cached_hosts)); - r->Assign(n++, static_cast(dstats.cached_addresses)); - r->Assign(n++, static_cast(dstats.cached_texts)); - r->Assign(n++, static_cast(dstats.cached_total)); + r->Assign(n++, static_cast(dstats.cached.hosts)); + r->Assign(n++, static_cast(dstats.cached.addresses)); + r->Assign(n++, static_cast(dstats.cached.texts)); + r->Assign(n++, static_cast(dstats.cached.total)); return std::move(r); %} @@ -337,7 +337,7 @@ function get_thread_stats%(%): ThreadStats auto r = zeek::make_intrusive(ThreadStats); int n = 0; - r->Assign(n++, zeek::thread_mgr->NumThreads()); + r->Assign(n++, static_cast(zeek::thread_mgr->NumThreads())); return std::move(r); %} diff --git a/src/telemetry/Counter.cc b/src/telemetry/Counter.cc index 8b34624254..ef3679329e 100644 --- a/src/telemetry/Counter.cc +++ b/src/telemetry/Counter.cc @@ -3,7 +3,7 @@ using namespace zeek::telemetry; Counter::Counter(FamilyType* family, const prometheus::Labels& labels, prometheus::CollectCallbackPtr callback) noexcept - : handle(family->Add(labels)), labels(labels) { + : family(family), handle(family->Add(labels)), labels(labels) { if ( callback ) { handle.AddCollectCallback(std::move(callback)); has_callback = true; diff --git a/src/telemetry/Counter.h b/src/telemetry/Counter.h index f6c49315b7..a0186ad219 100644 --- a/src/telemetry/Counter.h +++ b/src/telemetry/Counter.h @@ -56,6 +56,7 @@ public: bool CompareLabels(const 
prometheus::Labels& lbls) const { return labels == lbls; } private: + FamilyType* family = nullptr; Handle& handle; prometheus::Labels labels; bool has_callback = false; diff --git a/src/telemetry/Gauge.cc b/src/telemetry/Gauge.cc index 273c9a57bf..114ada3811 100644 --- a/src/telemetry/Gauge.cc +++ b/src/telemetry/Gauge.cc @@ -15,7 +15,7 @@ double Gauge::Value() const noexcept { Gauge::Gauge(FamilyType* family, const prometheus::Labels& labels, prometheus::CollectCallbackPtr callback) noexcept - : handle(family->Add(labels)), labels(labels) { + : family(family), handle(family->Add(labels)), labels(labels) { if ( callback ) { handle.AddCollectCallback(std::move(callback)); has_callback = true; diff --git a/src/telemetry/Gauge.h b/src/telemetry/Gauge.h index 900cb7b784..652ff72667 100644 --- a/src/telemetry/Gauge.h +++ b/src/telemetry/Gauge.h @@ -74,6 +74,7 @@ public: bool CompareLabels(const prometheus::Labels& lbls) const { return labels == lbls; } private: + FamilyType* family = nullptr; Handle& handle; prometheus::Labels labels; bool has_callback = false; diff --git a/src/telemetry/Manager.cc b/src/telemetry/Manager.cc index 6ae4e06ec6..69b4ed485a 100644 --- a/src/telemetry/Manager.cc +++ b/src/telemetry/Manager.cc @@ -112,14 +112,14 @@ void Manager::InitPostScript() { } #ifdef HAVE_PROCESS_STAT_METRICS - static auto get_stats = [this]() -> const detail::process_stats* { + static auto get_stats = []() -> const detail::process_stats* { double now = util::current_time(); - if ( this->process_stats_last_updated < now - 0.01 ) { - this->current_process_stats = detail::get_process_stats(); - this->process_stats_last_updated = now; + if ( telemetry_mgr->process_stats_last_updated < now - 0.01 ) { + telemetry_mgr->current_process_stats = detail::get_process_stats(); + telemetry_mgr->process_stats_last_updated = now; } - return &this->current_process_stats; + return &telemetry_mgr->current_process_stats; }; rss_gauge = GaugeInstance("process", "resident_memory", {}, 
"Resident memory size", "bytes", []() -> prometheus::ClientMetric { @@ -137,13 +137,21 @@ void Manager::InitPostScript() { return metric; }); - cpu_gauge = GaugeInstance("process", "cpu", {}, "Total user and system CPU time spent", "seconds", - []() -> prometheus::ClientMetric { - auto* s = get_stats(); - prometheus::ClientMetric metric; - metric.gauge.value = s->cpu; - return metric; - }); + cpu_user_counter = CounterInstance("process", "cpu_user", {}, "Total user CPU time spent", "seconds", + []() -> prometheus::ClientMetric { + auto* s = get_stats(); + prometheus::ClientMetric metric; + metric.counter.value = s->cpu_user; /* counter metric: the sample belongs in .counter, not .gauge */ + return metric; + }); + + cpu_system_counter = CounterInstance("process", "cpu_system", {}, "Total system CPU time spent", "seconds", + []() -> prometheus::ClientMetric { + auto* s = get_stats(); + prometheus::ClientMetric metric; + metric.counter.value = s->cpu_system; /* counter metric: the sample belongs in .counter, not .gauge */ + return metric; + }); fds_gauge = GaugeInstance("process", "open_fds", {}, "Number of open file descriptors", "", []() -> prometheus::ClientMetric { @@ -623,18 +631,6 @@ void Manager::WaitForPrometheusCallbacks() { using namespace std::literals; using namespace zeek::telemetry; -namespace { - -template -auto toVector(zeek::Span xs) { - std::vector> result; - for ( auto&& x : xs ) - result.emplace_back(x); - return result; -} - -} // namespace - SCENARIO("telemetry managers provide access to counter families") { GIVEN("a telemetry manager") { Manager mgr; diff --git a/src/telemetry/Manager.h b/src/telemetry/Manager.h index d967fe43c0..26647b7cf7 100644 --- a/src/telemetry/Manager.h +++ b/src/telemetry/Manager.h @@ -263,7 +263,8 @@ private: GaugePtr rss_gauge; GaugePtr vms_gauge; - GaugePtr cpu_gauge; + CounterPtr cpu_user_counter; + CounterPtr cpu_system_counter; GaugePtr fds_gauge; std::shared_ptr prometheus_registry; diff --git a/src/telemetry/ProcessStats.cc b/src/telemetry/ProcessStats.cc index f2a0447b63..476efd4487 100644 --- a/src/telemetry/ProcessStats.cc +++ 
b/src/telemetry/ProcessStats.cc @@ -34,10 +34,10 @@ process_stats get_process_stats() { if ( task_info(mach_task_self(), TASK_THREAD_TIMES_INFO, reinterpret_cast(&info), &count) == KERN_SUCCESS ) { // Round to milliseconds. - result.cpu += info.user_time.seconds; - result.cpu += ceil(info.user_time.microseconds / 1000.0) / 1000.0; - result.cpu += info.system_time.seconds; - result.cpu += ceil(info.system_time.microseconds / 1000.0) / 1000.0; + result.cpu_user += info.user_time.seconds; + result.cpu_user += ceil(info.user_time.microseconds / 1000.0) / 1000.0; + result.cpu_system += info.system_time.seconds; + result.cpu_system += ceil(info.system_time.microseconds / 1000.0) / 1000.0; } } // Fetch open file handles. @@ -154,7 +154,8 @@ process_stats get_process_stats() { result.rss = rss_pages * page_size; result.vms = vmsize_bytes; - result.cpu = static_cast(utime_ticks + stime_ticks) / ticks_per_second; + result.cpu_user = static_cast(utime_ticks) / ticks_per_second; + result.cpu_system = static_cast(stime_ticks) / ticks_per_second; result.fds = count_entries_in_directory("/proc/self/fd"); } @@ -187,7 +188,10 @@ process_stats get_process_stats() { if ( kp ) { result.vms = kp->ki_size; result.rss = kp->ki_rssize * getpagesize(); - result.cpu = static_cast(kp->ki_runtime) / 1000000.0; + result.cpu_user = static_cast(kp->ki_rusage.ru_utime.tv_sec) + + (static_cast(kp->ki_rusage.ru_utime.tv_usec) / 1e6); + result.cpu_system = static_cast(kp->ki_rusage.ru_stime.tv_sec) + + (static_cast(kp->ki_rusage.ru_stime.tv_usec) / 1e6); struct procstat* procstat = procstat_open_sysctl(); struct filestat_list* files = procstat_getfiles(procstat, kp, 0); diff --git a/src/telemetry/ProcessStats.h b/src/telemetry/ProcessStats.h index 02581362cc..d79bb2cb5f 100644 --- a/src/telemetry/ProcessStats.h +++ b/src/telemetry/ProcessStats.h @@ -9,7 +9,8 @@ namespace zeek::telemetry::detail { struct process_stats { int64_t rss = 0; int64_t vms = 0; - double cpu = 0.0; + double cpu_user = 0.0; + 
double cpu_system = 0.0;
     int64_t fds = 0;
 };
 
diff --git a/src/threading/Manager.cc b/src/threading/Manager.cc
index 5620a0bf80..5499cbb8ed 100644
--- a/src/threading/Manager.cc
+++ b/src/threading/Manager.cc
@@ -7,7 +7,7 @@
 #include "zeek/IPAddr.h"
 #include "zeek/NetVar.h"
 #include "zeek/RunState.h"
-#include "zeek/iosource/Manager.h"
+#include "zeek/telemetry/Manager.h"
 
 namespace zeek::threading {
 namespace detail {
@@ -22,6 +22,9 @@ void HeartbeatTimer::Dispatch(double t, bool is_expire) {
 
 } // namespace detail
 
+static std::vector pending_bucket_brackets = {1,    10,    100,
+                                              1000, 10000, std::numeric_limits::infinity()};
+
 Manager::Manager() {
     DBG_LOG(DBG_THREADING, "Creating thread manager ...");
 
@@ -36,6 +39,143 @@ Manager::~Manager() {
         Terminate();
 }
 
+// Registers the thread manager's telemetry instruments. The aggregated message
+// statistics are computed lazily by the callbacks installed here.
+void Manager::InitPostScript() {
+    static auto get_message_thread_stats = []() -> const BucketedMessages* {
+        // Refresh the snapshot only if it is more than 1 old (assuming util::current_time()
+        // is in seconds) and the manager is not terminating; otherwise reuse the last one.
+        if ( ! thread_mgr->terminating ) {
+            double now = util::current_time();
+            if ( thread_mgr->bucketed_messages_last_updated < now - 1 ) {
+                thread_mgr->current_bucketed_messages.pending_in_total = 0;
+                thread_mgr->current_bucketed_messages.pending_out_total = 0;
+                for ( auto& m : thread_mgr->current_bucketed_messages.pending_in )
+                    m.second = 0;
+                for ( auto& m : thread_mgr->current_bucketed_messages.pending_out )
+                    m.second = 0;
+
+                MsgThread::Stats thread_stats;
+                for ( const auto& t : thread_mgr->msg_threads ) {
+                    t->GetStats(&thread_stats);
+
+                    // NOTE(review): unlike the pending_* totals, sent_*_total are not reset
+                    // before this loop, so every refresh adds each thread's sent_in/sent_out
+                    // again. If those fields are cumulative this over-counts -- confirm
+                    // against MsgThread::GetStats() before relying on these counters.
+                    thread_mgr->current_bucketed_messages.sent_in_total += thread_stats.sent_in;
+                    thread_mgr->current_bucketed_messages.sent_out_total += thread_stats.sent_out;
+                    thread_mgr->current_bucketed_messages.pending_in_total += thread_stats.pending_in;
+                    thread_mgr->current_bucketed_messages.pending_out_total += thread_stats.pending_out;
+
+                    // Count this thread in the first bucket whose upper limit exceeds its
+                    // pending count, separately for each direction.
+                    for ( auto upper_limit : pending_bucket_brackets ) {
+                        if ( thread_stats.pending_in < upper_limit ) {
+                            thread_mgr->current_bucketed_messages.pending_in[upper_limit]++;
+                            break;
+                        }
+                    }
+                    for ( auto upper_limit : pending_bucket_brackets ) {
+                        if ( thread_stats.pending_out < upper_limit ) {
+                            thread_mgr->current_bucketed_messages.pending_out[upper_limit]++;
+                            break;
+                        }
+                    }
+                }
+
+                thread_mgr->bucketed_messages_last_updated = now;
+            }
+        }
+
+        return &thread_mgr->current_bucketed_messages;
+    };
+
+    num_threads_metric =
+        telemetry_mgr->GaugeInstance("zeek", "msgthread_active_threads", {}, "Number of active threads", "",
+                                     []() -> prometheus::ClientMetric {
+                                         prometheus::ClientMetric metric;
+                                         metric.gauge.value =
+                                             thread_mgr ? static_cast(thread_mgr->all_threads.size()) : 0.0;
+                                         return metric;
+                                     });
+
+    total_threads_metric = telemetry_mgr->CounterInstance("zeek", "msgthread_threads", {}, "Total number of threads");
+    // Counter callbacks report through metric.counter; metric.gauge is only read for gauges.
+    total_messages_in_metric =
+        telemetry_mgr->CounterInstance("zeek", "msgthread_in_messages", {}, "Number of inbound messages received", "",
+                                       []() -> prometheus::ClientMetric {
+                                           auto* s = get_message_thread_stats();
+                                           prometheus::ClientMetric metric;
+                                           metric.counter.value = static_cast(s->sent_in_total);
+                                           return metric;
+                                       });
+
+    total_messages_out_metric =
+        telemetry_mgr->CounterInstance("zeek", "msgthread_out_messages", {}, "Number of outbound messages sent", "",
+                                       []() -> prometheus::ClientMetric {
+                                           auto* s = get_message_thread_stats();
+                                           prometheus::ClientMetric metric;
+                                           metric.counter.value = static_cast(s->sent_out_total);
+                                           return metric;
+                                       });
+
+    pending_messages_in_metric =
+        telemetry_mgr->GaugeInstance("zeek", "msgthread_pending_in_messages", {}, "Pending number of inbound messages",
+                                     "", []() -> prometheus::ClientMetric {
+                                         auto* s = get_message_thread_stats();
+                                         prometheus::ClientMetric metric;
+                                         metric.gauge.value = static_cast(s->pending_in_total);
+                                         return metric;
+                                     });
+    pending_messages_out_metric =
+        telemetry_mgr->GaugeInstance("zeek", "msgthread_pending_out_messages", {},
+                                     "Pending number of outbound messages", "", []() -> prometheus::ClientMetric {
+                                         auto* s = get_message_thread_stats();
+                                         prometheus::ClientMetric metric;
+                                         metric.gauge.value = static_cast(s->pending_out_total);
+                                         return metric;
+                                     });
+
+    pending_message_in_buckets_fam =
+        telemetry_mgr->GaugeFamily("zeek", "msgthread_pending_messages_in_buckets", {"lt"},
+                                   "Number of threads with pending inbound messages split into buckets");
+    pending_message_out_buckets_fam =
+        telemetry_mgr->GaugeFamily("zeek", "msgthread_pending_messages_out_buckets", {"lt"},
+                                   "Number of threads with pending outbound messages split into buckets");
+
+    // One labelled gauge per bucket and direction; the "lt" label carries the bucket's upper limit.
+    for ( auto upper_limit : pending_bucket_brackets ) {
+        std::string upper_limit_str;
+        if ( upper_limit == std::numeric_limits::infinity() )
+            upper_limit_str = "inf";
+        else
+            upper_limit_str = std::to_string(upper_limit);
+
+        current_bucketed_messages.pending_in[upper_limit] = 0;
+        current_bucketed_messages.pending_out[upper_limit] = 0;
+
+        pending_message_in_buckets[upper_limit] =
+            pending_message_in_buckets_fam->GetOrAdd({{"lt", upper_limit_str}},
+                                                     [upper_limit]() -> prometheus::ClientMetric {
+                                                         auto* s = get_message_thread_stats();
+                                                         prometheus::ClientMetric metric;
+                                                         metric.gauge.value =
+                                                             static_cast(s->pending_in.at(upper_limit));
+                                                         return metric;
+                                                     });
+        pending_message_out_buckets[upper_limit] =
+            pending_message_out_buckets_fam->GetOrAdd({{"lt", upper_limit_str}},
+                                                      [upper_limit]() -> prometheus::ClientMetric {
+                                                          auto* s = get_message_thread_stats();
+                                                          prometheus::ClientMetric metric;
+                                                          metric.gauge.value =
+                                                              static_cast(s->pending_out.at(upper_limit));
+                                                          return metric;
+                                                      });
+    }
+}
+
 void Manager::Terminate() {
     DBG_LOG(DBG_THREADING, "Terminating thread manager ...");
     terminating = true;
@@ -78,6 +218,8 @@ void Manager::AddThread(BasicThread* thread) {
 
     if ( !
heartbeat_timer_running ) StartHeartbeatTimer(); + + total_threads_metric->Inc(); } void Manager::AddMsgThread(MsgThread* thread) { diff --git a/src/threading/Manager.h b/src/threading/Manager.h index b075e6a70d..3b4763a497 100644 --- a/src/threading/Manager.h +++ b/src/threading/Manager.h @@ -1,12 +1,23 @@ #pragma once #include +#include #include #include "zeek/Timer.h" #include "zeek/threading/MsgThread.h" namespace zeek { + +namespace telemetry { +class Gauge; +using GaugePtr = std::shared_ptr; +class GaugeFamily; +using GaugeFamilyPtr = std::shared_ptr; +class Counter; +using CounterPtr = std::shared_ptr; +} // namespace telemetry + namespace threading { namespace detail { @@ -46,6 +57,12 @@ public: */ ~Manager(); + /** + * Performs initialization that can only happen after script parsing has + * completed. + */ + void InitPostScript(); + /** * Terminates the manager's processor. The method signals all threads * to terminates and wait for them to do so. It then joins them and @@ -77,7 +94,7 @@ public: * threads that are not yet joined, including any potentially in * Terminating() state. */ - int NumThreads() const { return all_threads.size(); } + size_t NumThreads() const { return all_threads.size(); } /** * Signals a specific threads to terminate immediately. 
@@ -151,6 +168,31 @@ private:
     msg_stats_list stats;
 
     bool heartbeat_timer_running = false;
+    telemetry::GaugePtr num_threads_metric;
+    telemetry::CounterPtr total_threads_metric;
+    telemetry::CounterPtr total_messages_in_metric;
+    telemetry::CounterPtr total_messages_out_metric;
+    telemetry::GaugePtr pending_messages_in_metric;
+    telemetry::GaugePtr pending_messages_out_metric;
+
+    telemetry::GaugeFamilyPtr pending_message_in_buckets_fam;
+    telemetry::GaugeFamilyPtr pending_message_out_buckets_fam;
+    std::map pending_message_in_buckets;
+    std::map pending_message_out_buckets;
+
+    // Aggregate of per-thread message statistics read by the telemetry callbacks.
+    // The totals are zero-initialized so a read before the first refresh is well defined.
+    struct BucketedMessages {
+        uint64_t sent_in_total = 0;     // sum of MsgThread::Stats::sent_in
+        uint64_t sent_out_total = 0;    // sum of MsgThread::Stats::sent_out
+        uint64_t pending_in_total = 0;  // sum of MsgThread::Stats::pending_in
+        uint64_t pending_out_total = 0; // sum of MsgThread::Stats::pending_out
+        std::map pending_in;            // bucket upper limit -> number of threads
+        std::map pending_out;           // bucket upper limit -> number of threads
+    };
+
+    BucketedMessages current_bucketed_messages;
+    double bucketed_messages_last_updated = 0.0; // util::current_time() of the last refresh
 };
 
 } // namespace threading
diff --git a/src/threading/MsgThread.cc b/src/threading/MsgThread.cc
index 022a8ce2b4..6854f5f188 100644
--- a/src/threading/MsgThread.cc
+++ b/src/threading/MsgThread.cc
@@ -9,6 +9,7 @@
 #include "zeek/Obj.h"
 #include "zeek/RunState.h"
 #include "zeek/iosource/Manager.h"
+#include "zeek/telemetry/Manager.h"
 #include "zeek/threading/Manager.h"
 
 // Set by Zeek's main signal handler.
diff --git a/src/threading/MsgThread.h b/src/threading/MsgThread.h index 259e64b11f..0bb3dbaec4 100644 --- a/src/threading/MsgThread.h +++ b/src/threading/MsgThread.h @@ -3,8 +3,6 @@ #include #include "zeek/DebugLogger.h" -#include "zeek/Flare.h" -#include "zeek/iosource/IOSource.h" #include "zeek/threading/BasicThread.h" #include "zeek/threading/Queue.h" diff --git a/src/zeek-setup.cc b/src/zeek-setup.cc index 3f5549b78a..c97458984c 100644 --- a/src/zeek-setup.cc +++ b/src/zeek-setup.cc @@ -802,6 +802,7 @@ SetupResult setup(int argc, char** argv, Options* zopts) { RecordType::InitPostScript(); telemetry_mgr->InitPostScript(); + thread_mgr->InitPostScript(); iosource_mgr->InitPostScript(); log_mgr->InitPostScript(); plugin_mgr->InitPostScript(); diff --git a/testing/btest/Baseline/scripts.base.frameworks.telemetry.basic/out b/testing/btest/Baseline/scripts.base.frameworks.telemetry.basic/out index d35b64e3d4..fdde7d52ff 100644 --- a/testing/btest/Baseline/scripts.base.frameworks.telemetry.basic/out +++ b/testing/btest/Baseline/scripts.base.frameworks.telemetry.basic/out @@ -1,5 +1,6 @@ ### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. 
-### zeek_session_metrics |2| +### zeek_session_metrics |3| +Telemetry::COUNTER, zeek, zeek_ended_sessions_total, [reason], [inactivity], 0.0 Telemetry::COUNTER, zeek, zeek_total_sessions_total, [protocol], [tcp], 500.0 Telemetry::GAUGE, zeek, zeek_active_sessions, [protocol], [tcp], 500.0 ### bt* metrics |5| diff --git a/testing/btest/Baseline/scripts.policy.frameworks.telemetry.log/telemetry.log.filtered b/testing/btest/Baseline/scripts.policy.frameworks.telemetry.log/telemetry.log.filtered index c7b26a1f28..b5c04c3f44 100644 --- a/testing/btest/Baseline/scripts.policy.frameworks.telemetry.log/telemetry.log.filtered +++ b/testing/btest/Baseline/scripts.policy.frameworks.telemetry.log/telemetry.log.filtered @@ -1,5 +1,7 @@ ### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +XXXXXXXXXX.XXXXXX zeek counter zeek_ended_sessions_total reason inactivity 0.0 XXXXXXXXXX.XXXXXX zeek counter zeek_total_sessions_total protocol tcp 1.0 XXXXXXXXXX.XXXXXX zeek gauge zeek_active_sessions protocol tcp 1.0 +XXXXXXXXXX.XXXXXX zeek counter zeek_ended_sessions_total reason inactivity 0.0 XXXXXXXXXX.XXXXXX zeek counter zeek_total_sessions_total protocol tcp 500.0 XXXXXXXXXX.XXXXXX zeek gauge zeek_active_sessions protocol tcp 500.0