cluster/serializer/broker: Do not special case Broker::Data anymore

The previous approach ignored the fact that nested / inner values might
also be Broker::Data values. I'm not super sure about the validity of
the test, because it's essentially demonstrating any-nesting, but
it's not leading to extra Broker::Data encoding.
This commit is contained in:
Arne Welzel 2025-08-15 21:33:52 +02:00
parent 9e70d8b8ad
commit f57a1263d4
4 changed files with 24 additions and 16 deletions

View file

@ -59,20 +59,11 @@ std::optional<broker::zeek::Event> detail::to_broker_event(const zeek::cluster::
xs.reserve(ev.Args().size()); xs.reserve(ev.Args().size());
for ( const auto& a : ev.Args() ) { for ( const auto& a : ev.Args() ) {
if ( a->GetType() == zeek::BifType::Record::Broker::Data ) { if ( auto res = zeek::Broker::detail::val_to_data(a.get(), /*flatten_broker_dataval=*/true) )
// When encountering a Broker::Data instance within args, pick out
// the broker::data directly to avoid double encoding, Broker::Data.
const auto& val = a->AsRecordVal()->GetField(0);
auto* data_val = static_cast<zeek::Broker::detail::DataVal*>(val.get());
xs.emplace_back(data_val->data);
}
else if ( auto res = zeek::Broker::detail::val_to_data(a.get()) ) {
xs.emplace_back(std::move(res.value())); xs.emplace_back(std::move(res.value()));
} else
else {
return std::nullopt; return std::nullopt;
} }
}
// Convert metadata from the cluster::detail::Event event to broker's event metadata format. // Convert metadata from the cluster::detail::Event event to broker's event metadata format.
broker::vector broker_meta; broker::vector broker_meta;

View file

@ -35,3 +35,10 @@ got pong, 27, with, 4, time (cluster publish), Broker::Data, [data=broker::data{
got pong, 28, with, 4, time (cluster event ), Broker::Data, [data=broker::data{42000000000ns}] got pong, 28, with, 4, time (cluster event ), Broker::Data, [data=broker::data{42000000000ns}]
got pong, 29, with, 4, time (cluster publish), Broker::Data, [data=broker::data{42000000000ns}] got pong, 29, with, 4, time (cluster publish), Broker::Data, [data=broker::data{42000000000ns}]
got pong, 30, with, 4, time (cluster event ), Broker::Data, [data=broker::data{42000000000ns}] got pong, 30, with, 4, time (cluster event ), Broker::Data, [data=broker::data{42000000000ns}]
sending pings, 5, R, [c=42, a=[[c=42, a=hello]]]
got pong, 31, with, 5, R (cluster publish), Broker::Data, [data=broker::data{(42, ((42, hello)))}]
got pong, 32, with, 5, R (cluster event ), Broker::Data, [data=broker::data{(42, ((42, hello)))}]
got pong, 33, with, 5, R (cluster publish), Broker::Data, [data=broker::data{(42, ((42, hello)))}]
got pong, 34, with, 5, R (cluster event ), Broker::Data, [data=broker::data{(42, ((42, hello)))}]
got pong, 35, with, 5, R (cluster publish), Broker::Data, [data=broker::data{(42, ((42, hello)))}]
got pong, 36, with, 5, R (cluster event ), Broker::Data, [data=broker::data{(42, ((42, hello)))}]

View file

@ -14,4 +14,7 @@ got ping, 3, vector of count, Broker::Data, [data=broker::data{(1, 2, 3)}]
got ping, 4, time, Broker::Data, [data=broker::data{42000000000ns}] got ping, 4, time, Broker::Data, [data=broker::data{42000000000ns}]
got ping, 4, time, Broker::Data, [data=broker::data{42000000000ns}] got ping, 4, time, Broker::Data, [data=broker::data{42000000000ns}]
got ping, 4, time, Broker::Data, [data=broker::data{42000000000ns}] got ping, 4, time, Broker::Data, [data=broker::data{42000000000ns}]
got ping, 5, R, Broker::Data, [data=broker::data{(42, ((42, hello)))}]
got ping, 5, R, Broker::Data, [data=broker::data{(42, ((42, hello)))}]
got ping, 5, R, Broker::Data, [data=broker::data{(42, ((42, hello)))}]
got finish! got finish!

View file

@ -34,9 +34,14 @@ global pong: event(c: count, what: string, val: any) &is_used;
global i = 0; global i = 0;
global pongs = 0; global pongs = 0;
type R: record {
c: count;
a: any;
};
event send_any() event send_any()
{ {
if ( i > 4 ) if ( i > 5 )
return; return;
local val: any; local val: any;
@ -48,8 +53,10 @@ event send_any()
val = 42/tcp; val = 42/tcp;
else if ( i == 3 ) else if ( i == 3 )
val = vector(1, 2, 3); val = vector(1, 2, 3);
else else if ( i == 4 )
val = double_to_time(42.0); val = double_to_time(42.0);
else
val = R($c=42, $a=vector(R($c=42, $a="hello")));
print "sending pings", i, type_name(val), val; print "sending pings", i, type_name(val), val;
Cluster::publish_hrw(Cluster::worker_pool, cat(i), ping, i, type_name(val), val); Cluster::publish_hrw(Cluster::worker_pool, cat(i), ping, i, type_name(val), val);
@ -64,10 +71,10 @@ event pong(c: count, what: string, val: any)
++pongs; ++pongs;
print "got pong", pongs, "with", c, what, type_name(val), val; print "got pong", pongs, "with", c, what, type_name(val), val;
# The manager sends 5 types of pings, in 3 different ways. The worker # The manager sends 6 types of pings, in 3 different ways. The worker
# answers each ping in two ways, for a total of 30 expected pongs at the # answers each ping in two ways, for a total of 36 expected pongs at the
# manager. Every batch of pings involves 6 pongs. # manager. Every batch of pings involves 6 pongs.
if ( pongs == 30 ) if ( pongs == 36 )
Cluster::publish(Cluster::worker_topic, finish); Cluster::publish(Cluster::worker_topic, finish);
else if ( pongs > 0 && pongs % 6 == 0 ) else if ( pongs > 0 && pongs % 6 == 0 )
{ {