From f011e7e6676daf745fe62ead578fdeabe5bff371 Mon Sep 17 00:00:00 2001 From: Arne Welzel Date: Fri, 6 Jun 2025 15:05:14 +0200 Subject: [PATCH] broker: Hook up generic cluster telemetry --- src/broker/Manager.cc | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/src/broker/Manager.cc b/src/broker/Manager.cc index 06cbb431be..67204893f7 100644 --- a/src/broker/Manager.cc +++ b/src/broker/Manager.cc @@ -34,6 +34,7 @@ #include "zeek/broker/comm.bif.h" #include "zeek/broker/messaging.bif.h" #include "zeek/broker/store.bif.h" +#include "zeek/cluster/Telemetry.h" #include "zeek/cluster/serializer/broker/Serializer.h" #include "zeek/iosource/Manager.h" #include "zeek/logging/Manager.h" @@ -838,6 +839,9 @@ bool Manager::DoPublishEvent(const std::string& topic, cluster::detail::Event& e auto& ev = maybe_ev.value(); + size_t size = ev.as_data().shared_envelope()->raw_bytes().second; + Telemetry().OnOutgoingEvent(topic, event.HandlerName(), cluster::detail::SerializationInfo{size}); + DBG_LOG(DBG_BROKER, "Publishing event: %s", RenderEvent(topic, std::string(ev.name()), ev.args()).c_str()); bstate->endpoint.publish(topic, ev.move_data()); num_events_outgoing_metric->Inc(); @@ -859,6 +863,10 @@ bool Manager::PublishEvent(string topic, std::string name, broker::vector args, } broker::zeek::Event ev(name, args, meta); + + size_t size = ev.as_data().shared_envelope()->raw_bytes().second; + Telemetry().OnOutgoingEvent(topic, name, cluster::detail::SerializationInfo{size}); + DBG_LOG(DBG_BROKER, "Publishing event: %s", RenderEvent(topic, std::string(ev.name()), ev.args()).c_str()); bstate->endpoint.publish(std::move(topic), ev.move_data()); num_events_outgoing_metric->Inc(); @@ -1583,6 +1591,8 @@ void Manager::ProcessMessage(std::string_view topic, broker::zeek::Event& ev) { DBG_LOG(DBG_BROKER, "Process event: %s (with %zu metadata entries) %s", std::string{name}.c_str(), meta ? meta->size() : 0, RenderMessage(args).c_str()); num_events_incoming_metric->Inc(); + size_t size = ev.as_data().shared_envelope()->raw_bytes().second; + Telemetry().OnIncomingEvent(topic, name, cluster::detail::SerializationInfo{size}); auto handler = event_registry->Lookup(name); if ( ! handler )