From 206f5cd52282a0f049b00bd78618998946da42b4 Mon Sep 17 00:00:00 2001 From: Tim Wojtulewicz Date: Thu, 6 Jun 2024 14:51:09 -0700 Subject: [PATCH] Move broker statistics to be telemetry metrics --- src/broker/Manager.cc | 63 ++++++++++++++++++++++++++++++++++++------- src/broker/Manager.h | 17 ++++++++++-- 2 files changed, 69 insertions(+), 11 deletions(-) diff --git a/src/broker/Manager.cc b/src/broker/Manager.cc index 015f53d10e..dec03afb8e 100644 --- a/src/broker/Manager.cc +++ b/src/broker/Manager.cc @@ -340,6 +340,43 @@ void Manager::InitPostScript() { bstate->subscriber.add_topic(broker::topic::store_events(), true); InitializeBrokerStoreForwarding(); + + num_peers_metric = + telemetry_mgr->GaugeInstance("zeek", "broker_peers", {}, "Current number of peers connected via broker", "", + []() -> prometheus::ClientMetric { + prometheus::ClientMetric metric; + metric.gauge.value = static_cast(broker_mgr->peer_count); + return metric; + }); + + num_stores_metric = + telemetry_mgr->GaugeInstance("zeek", "broker_stores", {}, "Current number of stores connected via broker", "", + []() -> prometheus::ClientMetric { + prometheus::ClientMetric metric; + metric.gauge.value = static_cast(broker_mgr->data_stores.size()); + return metric; + }); + + num_pending_queries_metric = + telemetry_mgr->GaugeInstance("zeek", "broker_pending_queries", {}, "Current number of pending broker queries", + "", []() -> prometheus::ClientMetric { + prometheus::ClientMetric metric; + metric.gauge.value = static_cast(broker_mgr->pending_queries.size()); + return metric; + }); + + num_events_incoming_metric = telemetry_mgr->CounterInstance("zeek", "broker_incoming_events", {}, + "Total number of incoming events via broker"); + num_events_outgoing_metric = telemetry_mgr->CounterInstance("zeek", "broker_outgoing_events", {}, + "Total number of outgoing events via broker"); + num_logs_incoming_metric = + telemetry_mgr->CounterInstance("zeek", "broker_incoming_logs", {}, "Total number of incoming logs via broker"); + num_logs_outgoing_metric = + telemetry_mgr->CounterInstance("zeek", "broker_outgoing_logs", {}, "Total number of outgoing logs via broker"); + num_ids_incoming_metric = + telemetry_mgr->CounterInstance("zeek", "broker_incoming_ids", {}, "Total number of incoming ids via broker"); + num_ids_outgoing_metric = + telemetry_mgr->CounterInstance("zeek", "broker_outgoing_ids", {}, "Total number of outgoing ids via broker"); } void Manager::InitializeBrokerStoreForwarding() { @@ -528,7 +565,7 @@ bool Manager::PublishEvent(string topic, std::string name, broker::vector args, DBG_LOG(DBG_BROKER, "Publishing event: %s", RenderEvent(topic, name, args).c_str()); broker::zeek::Event ev(std::move(name), std::move(args), broker::to_timestamp(ts)); bstate->endpoint.publish(std::move(topic), ev.move_data()); - ++statistics.num_events_outgoing; + num_events_outgoing_metric->Inc(); return true; } @@ -588,7 +625,7 @@ bool Manager::PublishIdentifier(std::string topic, std::string id) { broker::zeek::IdentifierUpdate msg(std::move(id), std::move(data.value_)); DBG_LOG(DBG_BROKER, "Publishing id-update: %s", RenderMessage(topic, msg.as_data()).c_str()); bstate->endpoint.publish(std::move(topic), msg.move_data()); - ++statistics.num_ids_outgoing; + num_ids_outgoing_metric->Inc(); return true; } @@ -715,8 +752,10 @@ bool Manager::PublishLogWrite(EnumVal* stream, EnumVal* writer, string path, int ++lb.message_count; lb.msgs[topic].add(std::move(msg)); - if ( lb.message_count >= log_batch_size ) - statistics.num_logs_outgoing += lb.Flush(bstate->endpoint, log_batch_size); + if ( lb.message_count >= log_batch_size ) { + auto outgoing_logs = static_cast(lb.Flush(bstate->endpoint, log_batch_size)); + num_logs_outgoing_metric->Inc(outgoing_logs); + } return true; } @@ -746,7 +785,8 @@ size_t Manager::FlushLogBuffers() { for ( auto& lb : log_buffers ) rval += lb.Flush(bstate->endpoint, log_batch_size); - statistics.num_logs_outgoing += rval; + num_logs_outgoing_metric->Inc(rval); + return rval; } @@ -1141,7 +1181,7 @@ void Manager::ProcessMessage(std::string_view topic, broker::zeek::Event& ev) { ts = run_state::network_time; DBG_LOG(DBG_BROKER, "Process event: %s (%.6f) %s", c_str_safe(name).c_str(), ts, RenderMessage(args).c_str()); - ++statistics.num_events_incoming; + num_events_incoming_metric->Inc(); auto handler = event_registry->Lookup(name); if ( ! handler ) @@ -1286,7 +1326,7 @@ bool Manager::ProcessMessage(std::string_view, broker::zeek::LogWrite& lw) { return false; } - ++statistics.num_logs_incoming; + num_logs_incoming_metric->Inc(); auto&& stream_id_name = lw.stream_id().name; // Get stream ID. @@ -1352,7 +1392,7 @@ bool Manager::ProcessMessage(std::string_view, broker::zeek::IdentifierUpdate& i return false; } - ++statistics.num_ids_incoming; + num_ids_incoming_metric->Inc(); auto id_name = c_str_safe(iu.id_name()); auto id_value = convert_if_broker_variant_or_move(iu.id_value()); const auto& id = zeek::detail::global_scope()->Find(id_name); @@ -1706,7 +1746,12 @@ const Stats& Manager::GetStatistics() { statistics.num_stores = data_stores.size(); statistics.num_pending_queries = pending_queries.size(); - // The other attributes are set as activity happens. + statistics.num_events_incoming = static_cast(num_events_incoming_metric->Value()); + statistics.num_events_outgoing = static_cast(num_events_outgoing_metric->Value()); + statistics.num_logs_incoming = static_cast(num_logs_incoming_metric->Value()); + statistics.num_logs_outgoing = static_cast(num_logs_outgoing_metric->Value()); + statistics.num_ids_incoming = static_cast(num_ids_incoming_metric->Value()); + statistics.num_ids_outgoing = static_cast(num_ids_outgoing_metric->Value()); return statistics; } diff --git a/src/broker/Manager.h b/src/broker/Manager.h index d373d8883e..43bd00fbc9 100644 --- a/src/broker/Manager.h +++ b/src/broker/Manager.h @@ -27,8 +27,11 @@ using VectorTypePtr = IntrusivePtr; using TableValPtr = IntrusivePtr; namespace telemetry { -class Manager; -} +class Gauge; +class Counter; +using GaugePtr = std::shared_ptr; +using CounterPtr = std::shared_ptr; +} // namespace telemetry namespace detail { class Frame; @@ -451,6 +454,16 @@ private: std::string zeek_table_db_directory; static int script_scope; + + telemetry::GaugePtr num_peers_metric; + telemetry::GaugePtr num_stores_metric; + telemetry::GaugePtr num_pending_queries_metric; + telemetry::CounterPtr num_events_incoming_metric; + telemetry::CounterPtr num_events_outgoing_metric; + telemetry::CounterPtr num_logs_incoming_metric; + telemetry::CounterPtr num_logs_outgoing_metric; + telemetry::CounterPtr num_ids_incoming_metric; + telemetry::CounterPtr num_ids_outgoing_metric; }; } // namespace Broker