broker: Hook up generic cluster telemetry

This commit is contained in:
Arne Welzel 2025-06-06 15:05:14 +02:00
parent 4c34274a6c
commit f011e7e667

View file

@ -34,6 +34,7 @@
#include "zeek/broker/comm.bif.h" #include "zeek/broker/comm.bif.h"
#include "zeek/broker/messaging.bif.h" #include "zeek/broker/messaging.bif.h"
#include "zeek/broker/store.bif.h" #include "zeek/broker/store.bif.h"
#include "zeek/cluster/Telemetry.h"
#include "zeek/cluster/serializer/broker/Serializer.h" #include "zeek/cluster/serializer/broker/Serializer.h"
#include "zeek/iosource/Manager.h" #include "zeek/iosource/Manager.h"
#include "zeek/logging/Manager.h" #include "zeek/logging/Manager.h"
@ -838,6 +839,9 @@ bool Manager::DoPublishEvent(const std::string& topic, cluster::detail::Event& e
auto& ev = maybe_ev.value(); auto& ev = maybe_ev.value();
size_t size = ev.as_data().shared_envelope()->raw_bytes().second;
Telemetry().OnOutgoingEvent(topic, event.HandlerName(), cluster::detail::SerializationInfo{size});
DBG_LOG(DBG_BROKER, "Publishing event: %s", RenderEvent(topic, std::string(ev.name()), ev.args()).c_str()); DBG_LOG(DBG_BROKER, "Publishing event: %s", RenderEvent(topic, std::string(ev.name()), ev.args()).c_str());
bstate->endpoint.publish(topic, ev.move_data()); bstate->endpoint.publish(topic, ev.move_data());
num_events_outgoing_metric->Inc(); num_events_outgoing_metric->Inc();
@ -859,6 +863,10 @@ bool Manager::PublishEvent(string topic, std::string name, broker::vector args,
} }
broker::zeek::Event ev(name, args, meta); broker::zeek::Event ev(name, args, meta);
size_t size = ev.as_data().shared_envelope()->raw_bytes().second;
Telemetry().OnOutgoingEvent(topic, name, cluster::detail::SerializationInfo{size});
DBG_LOG(DBG_BROKER, "Publishing event: %s", RenderEvent(topic, std::string(ev.name()), ev.args()).c_str()); DBG_LOG(DBG_BROKER, "Publishing event: %s", RenderEvent(topic, std::string(ev.name()), ev.args()).c_str());
bstate->endpoint.publish(std::move(topic), ev.move_data()); bstate->endpoint.publish(std::move(topic), ev.move_data());
num_events_outgoing_metric->Inc(); num_events_outgoing_metric->Inc();
@ -1583,6 +1591,8 @@ void Manager::ProcessMessage(std::string_view topic, broker::zeek::Event& ev) {
DBG_LOG(DBG_BROKER, "Process event: %s (with %zu metadata entries) %s", std::string{name}.c_str(), DBG_LOG(DBG_BROKER, "Process event: %s (with %zu metadata entries) %s", std::string{name}.c_str(),
meta ? meta->size() : 0, RenderMessage(args).c_str()); meta ? meta->size() : 0, RenderMessage(args).c_str());
num_events_incoming_metric->Inc(); num_events_incoming_metric->Inc();
size_t size = ev.as_data().shared_envelope()->raw_bytes().second;
Telemetry().OnIncomingEvent(topic, name, cluster::detail::SerializationInfo{size});
auto handler = event_registry->Lookup(name); auto handler = event_registry->Lookup(name);
if ( ! handler ) if ( ! handler )