broker/Manager: Re-use broker serializer for conversion

This commit is contained in:
Arne Welzel 2025-04-07 14:03:01 +02:00
parent 0e027fa4e3
commit c4a48baeda

View file

@ -34,6 +34,7 @@
#include "zeek/broker/data.bif.h" #include "zeek/broker/data.bif.h"
#include "zeek/broker/messaging.bif.h" #include "zeek/broker/messaging.bif.h"
#include "zeek/broker/store.bif.h" #include "zeek/broker/store.bif.h"
#include "zeek/cluster/serializer/broker/Serializer.h"
#include "zeek/iosource/Manager.h" #include "zeek/iosource/Manager.h"
#include "zeek/logging/Manager.h" #include "zeek/logging/Manager.h"
#include "zeek/logging/Types.h" #include "zeek/logging/Types.h"
@ -624,28 +625,16 @@ std::vector<broker::peer_info> Manager::Peers() const {
std::string Manager::NodeID() const { return to_string(bstate->endpoint.node_id()); } std::string Manager::NodeID() const { return to_string(bstate->endpoint.node_id()); }
bool Manager::DoPublishEvent(const std::string& topic, const cluster::detail::Event& event) { bool Manager::DoPublishEvent(const std::string& topic, const cluster::detail::Event& event) {
broker::vector xs; auto maybe_ev = zeek::cluster::detail::to_broker_event(event);
xs.reserve(event.args.size()); if ( ! maybe_ev )
return false;
for ( const auto& a : event.args ) { auto& ev = maybe_ev.value();
if ( a->GetType() == zeek::BifType::Record::Broker::Data ) {
// When encountering a Broker::Data instance within args, pick out
// the broker::data directly to avoid double encoding of the record.
const auto& val = a->AsRecordVal()->GetField(0);
auto* data_val = static_cast<zeek::Broker::detail::DataVal*>(val.get());
xs.emplace_back(data_val->data);
}
else if ( auto r = detail::val_to_data(a.get()) ) {
xs.emplace_back(std::move(r.value()));
}
else {
Error("Failed to convert %s to broker::data", zeek::obj_desc(a.get()).c_str());
return false;
}
}
std::string name(event.HandlerName()); DBG_LOG(DBG_BROKER, "Publishing event: %s", RenderEvent(topic, std::string(ev.name()), ev.args()).c_str());
return PublishEvent(topic, std::move(name), std::move(xs), event.timestamp); bstate->endpoint.publish(topic, ev.move_data());
num_events_outgoing_metric->Inc();
return true;
} }
bool Manager::PublishEvent(string topic, std::string name, broker::vector args, double ts) { bool Manager::PublishEvent(string topic, std::string name, broker::vector args, double ts) {