diff --git a/src/broker/Manager.cc b/src/broker/Manager.cc index e036f82c27..0fe9332830 100644 --- a/src/broker/Manager.cc +++ b/src/broker/Manager.cc @@ -34,6 +34,7 @@ #include "zeek/broker/data.bif.h" #include "zeek/broker/messaging.bif.h" #include "zeek/broker/store.bif.h" +#include "zeek/cluster/serializer/broker/Serializer.h" #include "zeek/iosource/Manager.h" #include "zeek/logging/Manager.h" #include "zeek/logging/Types.h" @@ -624,28 +625,16 @@ std::vector Manager::Peers() const { std::string Manager::NodeID() const { return to_string(bstate->endpoint.node_id()); } bool Manager::DoPublishEvent(const std::string& topic, const cluster::detail::Event& event) { - broker::vector xs; - xs.reserve(event.args.size()); + auto maybe_ev = zeek::cluster::detail::to_broker_event(event); + if ( ! maybe_ev ) + return false; - for ( const auto& a : event.args ) { - if ( a->GetType() == zeek::BifType::Record::Broker::Data ) { - // When encountering a Broker::Data instance within args, pick out - // the broker::data directly to avoid double encoding of the record. - const auto& val = a->AsRecordVal()->GetField(0); - auto* data_val = static_cast(val.get()); - xs.emplace_back(data_val->data); - } - else if ( auto r = detail::val_to_data(a.get()) ) { - xs.emplace_back(std::move(r.value())); - } - else { - Error("Failed to convert %s to broker::data", zeek::obj_desc(a.get()).c_str()); - return false; - } - } + auto& ev = maybe_ev.value(); - std::string name(event.HandlerName()); - return PublishEvent(topic, std::move(name), std::move(xs), event.timestamp); + DBG_LOG(DBG_BROKER, "Publishing event: %s", RenderEvent(topic, std::string(ev.name()), ev.args()).c_str()); + bstate->endpoint.publish(topic, ev.move_data()); + num_events_outgoing_metric->Inc(); + return true; } bool Manager::PublishEvent(string topic, std::string name, broker::vector args, double ts) {