From e1f70164e0d6a273924b2fba312821a6e6c1c86b Mon Sep 17 00:00:00 2001 From: Arne Welzel Date: Mon, 26 May 2025 16:35:50 +0200 Subject: [PATCH] broker/Manager: Fetch and forward all metadata from events Also use the generic metadata version for publishing, keep the ts-based API for now, but only add timestamps when EventMetadata::add_network_timestamp is T. I'm not sure what the right way forward here is, maybe deprecating Broker's publish event variations and funneling through cluster. --- src/broker/Manager.cc | 32 ++++++++++++++++---------------- 1 file changed, 16 insertions(+), 16 deletions(-) diff --git a/src/broker/Manager.cc b/src/broker/Manager.cc index 2e36509a2f..48e3a50396 100644 --- a/src/broker/Manager.cc +++ b/src/broker/Manager.cc @@ -8,6 +8,7 @@ #include #include #include +#include #include #include #include @@ -42,6 +43,8 @@ #include "zeek/telemetry/Manager.h" #include "zeek/util.h" +#include "const.bif.netvar_h" + using namespace std; namespace { @@ -849,8 +852,15 @@ bool Manager::PublishEvent(string topic, std::string name, broker::vector args, if ( peer_count == 0 && hub_count == 0 ) return true; - broker::zeek::Event ev(name, args, broker::to_timestamp(ts)); - DBG_LOG(DBG_BROKER, "Publishing event: %s", RenderEvent(topic, name, ev.args()).c_str()); + broker::vector meta; + if ( BifConst::EventMetadata::add_network_timestamp ) { + broker::vector entry{static_cast(zeek::detail::MetadataType::NetworkTimestamp), + broker::to_timestamp(ts)}; + meta.emplace_back(std::move(entry)); + } + + broker::zeek::Event ev(name, args, meta); + DBG_LOG(DBG_BROKER, "Publishing event: %s", RenderEvent(topic, std::string(ev.name()), ev.args()).c_str()); bstate->endpoint.publish(std::move(topic), ev.move_data()); num_events_outgoing_metric->Inc(); return true; @@ -1571,15 +1581,10 @@ void Manager::ProcessMessage(std::string_view topic, broker::zeek::Event& ev) { auto&& name = ev.name(); auto&& args = ev.args(); - double ts; + auto meta = cluster::detail::metadata_vector_from_broker_event(ev); - if ( auto ev_ts = ev.ts() ) - broker::convert(*ev_ts, ts); - else - // Default to current network time, if the received event did not contain a timestamp. - ts = run_state::network_time; - - DBG_LOG(DBG_BROKER, "Process event: %s (%.6f) %s", std::string{name}.c_str(), ts, RenderMessage(args).c_str()); + DBG_LOG(DBG_BROKER, "Process event: %s (with %zu metadata entries) %s", std::string{name}.c_str(), + meta ? meta->size() : 0, RenderMessage(args).c_str()); num_events_incoming_metric->Inc(); auto handler = event_registry->Lookup(name); @@ -1655,13 +1660,8 @@ void Manager::ProcessMessage(std::string_view topic, broker::zeek::Event& ev) { } } - if ( vl.size() == args.size() ) { - zeek::detail::EventMetadataVectorPtr meta; - if ( ts >= 0.0 ) - meta = zeek::detail::MakeEventMetadataVector(ts); - + if ( vl.size() == args.size() ) event_mgr.Enqueue(std::move(meta), handler, std::move(vl), util::detail::SOURCE_BROKER); - } } bool Manager::ProcessMessage(std::string_view, broker::zeek::LogCreate& lc) {