From 9e70d8b8ad6dd611a036107f349f2e1161e39d64 Mon Sep 17 00:00:00 2001 From: Arne Welzel Date: Fri, 15 Aug 2025 21:28:18 +0200 Subject: [PATCH 1/2] broker/Data: Support unwrapping Broker::Data records Calling val_to_data() on a Broker::Data record ends up wrapping the Broker::Data record instead of using the contained broker::value directly. It seems this should be the default behavior, and I wonder if the flag even makes sense, but for an 8.0 backport an opt-in flag seems more reasonable. --- src/broker/Data.cc | 28 ++++++++++++++++++++++------ src/broker/Data.h | 3 ++- 2 files changed, 24 insertions(+), 7 deletions(-) diff --git a/src/broker/Data.cc b/src/broker/Data.cc index bc3db0007e..466c4f8d7b 100644 --- a/src/broker/Data.cc +++ b/src/broker/Data.cc @@ -12,6 +12,7 @@ #include "zeek/IntrusivePtr.h" #include "zeek/RE.h" #include "zeek/Scope.h" +#include "zeek/Type.h" #include "zeek/broker/data.bif.h" #include "zeek/module_util.h" @@ -718,7 +719,7 @@ ValPtr data_to_val(broker::data& d, Type* type) { return visit(val_converter{type}, d); } -std::optional val_to_data(const Val* v) { +std::optional val_to_data(const Val* v, bool unwrap_broker_data) { switch ( v->GetType()->Tag() ) { case TYPE_BOOL: return {v->AsBool()}; case TYPE_INT: return {v->AsInt()}; @@ -804,7 +805,7 @@ std::optional val_to_data(const Val* v) { composite_key.reserve(vl->Length()); for ( auto k = 0; k < vl->Length(); ++k ) { - auto key_part = val_to_data(vl->Idx(k).get()); + auto key_part = val_to_data(vl->Idx(k).get(), unwrap_broker_data); if ( ! key_part ) return std::nullopt; @@ -822,7 +823,7 @@ std::optional val_to_data(const Val* v) { if ( is_set ) get(rval).emplace(std::move(key)); else { - auto val = val_to_data(te.value->GetVal().get()); + auto val = val_to_data(te.value->GetVal().get(), unwrap_broker_data); if ( ! 
val ) return std::nullopt; @@ -846,7 +847,7 @@ std::optional val_to_data(const Val* v) { return std::nullopt; } - auto item = val_to_data(item_val.get()); + auto item = val_to_data(item_val.get(), unwrap_broker_data); if ( ! item ) return std::nullopt; @@ -871,7 +872,7 @@ std::optional val_to_data(const Val* v) { return std::nullopt; } - auto item = val_to_data(item_val.get()); + auto item = val_to_data(item_val.get(), unwrap_broker_data); if ( ! item ) return std::nullopt; @@ -883,6 +884,21 @@ std::optional val_to_data(const Val* v) { } case TYPE_RECORD: { auto rec = v->AsRecordVal(); + + // If unwrap_broker_data is set and this record is a Broker::Data record, + // use the contained data field directly. + if ( unwrap_broker_data && rec->GetType() == BifType::Record::Broker::Data ) { + const auto ov = rec->GetField(0); + // Sanity. + if ( ov->GetType() != opaque_of_data_type ) { + reporter->Error("Broker::Data data field has wrong type: %s", + obj_desc_short(ov->GetType()).c_str()); + return std::nullopt; + } + + return static_cast(ov.get())->data; + } + broker::vector rval; size_t num_fields = v->GetType()->AsRecordType()->NumFields(); rval.reserve(num_fields); @@ -895,7 +911,7 @@ std::optional val_to_data(const Val* v) { continue; } - auto item = val_to_data(item_val.get()); + auto item = val_to_data(item_val.get(), unwrap_broker_data); if ( ! item ) return std::nullopt; diff --git a/src/broker/Data.h b/src/broker/Data.h index 0735acb28b..568e72e7ee 100644 --- a/src/broker/Data.h +++ b/src/broker/Data.h @@ -76,9 +76,10 @@ EnumValPtr get_data_type(RecordVal* v, zeek::detail::Frame* frame); /** * Convert a Zeek value to a Broker data value. * @param v a Zeek value. + * @param unwrap_broker_data If v or any nested value is a Broker::Data record, use its data broker::value directly. * @return a Broker data value if the Zeek value could be converted to one. 
*/ -std::optional val_to_data(const Val* v); +std::optional val_to_data(const Val* v, bool unwrap_broker_data = false); /** * Convert a Broker data value to a Zeek value. From f57a1263d4c3ab2c5811e31ffd282de7cf2bf08e Mon Sep 17 00:00:00 2001 From: Arne Welzel Date: Fri, 15 Aug 2025 21:33:52 +0200 Subject: [PATCH 2/2] cluster/serializer/broker: Do not special case Broker::Data anymore The previous approach ignored the fact that nested / inner values might also be Broker::Data values. I'm not super sure about the validity of the test, because it's essentially demonstrating any-nesting, but it's not leading to extra Broker::Data encoding. --- src/cluster/serializer/broker/Serializer.cc | 13 ++----------- .../..manager..stdout | 7 +++++++ .../..worker-1..stdout | 3 +++ testing/btest/cluster/generic/publish-any.zeek | 17 ++++++++++++----- 4 files changed, 24 insertions(+), 16 deletions(-) diff --git a/src/cluster/serializer/broker/Serializer.cc b/src/cluster/serializer/broker/Serializer.cc index e17ea9a38d..5bbb8a2bcd 100644 --- a/src/cluster/serializer/broker/Serializer.cc +++ b/src/cluster/serializer/broker/Serializer.cc @@ -59,19 +59,10 @@ std::optional detail::to_broker_event(const zeek::cluster:: xs.reserve(ev.Args().size()); for ( const auto& a : ev.Args() ) { - if ( a->GetType() == zeek::BifType::Record::Broker::Data ) { - // When encountering a Broker::Data instance within args, pick out - // the broker::data directly to avoid double encoding, Broker::Data. - const auto& val = a->AsRecordVal()->GetField(0); - auto* data_val = static_cast(val.get()); - xs.emplace_back(data_val->data); - } - else if ( auto res = zeek::Broker::detail::val_to_data(a.get()) ) { + if ( auto res = zeek::Broker::detail::val_to_data(a.get(), /*unwrap_broker_data=*/true) ) xs.emplace_back(std::move(res.value())); - } - else { + else return std::nullopt; - } } // Convert metadata from the cluster::detail::Event event to broker's event metadata format. 
diff --git a/testing/btest/Baseline/cluster.generic.publish-any/..manager..stdout b/testing/btest/Baseline/cluster.generic.publish-any/..manager..stdout index 1bb58756f0..f8a26eff5b 100644 --- a/testing/btest/Baseline/cluster.generic.publish-any/..manager..stdout +++ b/testing/btest/Baseline/cluster.generic.publish-any/..manager..stdout @@ -35,3 +35,10 @@ got pong, 27, with, 4, time (cluster publish), Broker::Data, [data=broker::data{ got pong, 28, with, 4, time (cluster event ), Broker::Data, [data=broker::data{42000000000ns}] got pong, 29, with, 4, time (cluster publish), Broker::Data, [data=broker::data{42000000000ns}] got pong, 30, with, 4, time (cluster event ), Broker::Data, [data=broker::data{42000000000ns}] +sending pings, 5, R, [c=42, a=[[c=42, a=hello]]] +got pong, 31, with, 5, R (cluster publish), Broker::Data, [data=broker::data{(42, ((42, hello)))}] +got pong, 32, with, 5, R (cluster event ), Broker::Data, [data=broker::data{(42, ((42, hello)))}] +got pong, 33, with, 5, R (cluster publish), Broker::Data, [data=broker::data{(42, ((42, hello)))}] +got pong, 34, with, 5, R (cluster event ), Broker::Data, [data=broker::data{(42, ((42, hello)))}] +got pong, 35, with, 5, R (cluster publish), Broker::Data, [data=broker::data{(42, ((42, hello)))}] +got pong, 36, with, 5, R (cluster event ), Broker::Data, [data=broker::data{(42, ((42, hello)))}] diff --git a/testing/btest/Baseline/cluster.generic.publish-any/..worker-1..stdout b/testing/btest/Baseline/cluster.generic.publish-any/..worker-1..stdout index 805fd81613..eae3c74d8c 100644 --- a/testing/btest/Baseline/cluster.generic.publish-any/..worker-1..stdout +++ b/testing/btest/Baseline/cluster.generic.publish-any/..worker-1..stdout @@ -14,4 +14,7 @@ got ping, 3, vector of count, Broker::Data, [data=broker::data{(1, 2, 3)}] got ping, 4, time, Broker::Data, [data=broker::data{42000000000ns}] got ping, 4, time, Broker::Data, [data=broker::data{42000000000ns}] got ping, 4, time, Broker::Data, 
[data=broker::data{42000000000ns}] +got ping, 5, R, Broker::Data, [data=broker::data{(42, ((42, hello)))}] +got ping, 5, R, Broker::Data, [data=broker::data{(42, ((42, hello)))}] +got ping, 5, R, Broker::Data, [data=broker::data{(42, ((42, hello)))}] got finish! diff --git a/testing/btest/cluster/generic/publish-any.zeek b/testing/btest/cluster/generic/publish-any.zeek index 3866bd9812..5a09dc7687 100644 --- a/testing/btest/cluster/generic/publish-any.zeek +++ b/testing/btest/cluster/generic/publish-any.zeek @@ -34,9 +34,14 @@ global pong: event(c: count, what: string, val: any) &is_used; global i = 0; global pongs = 0; +type R: record { + c: count; + a: any; +}; + event send_any() { - if ( i > 4 ) + if ( i > 5 ) return; local val: any; @@ -48,8 +53,10 @@ event send_any() val = 42/tcp; else if ( i == 3 ) val = vector(1, 2, 3); - else + else if ( i == 4 ) val = double_to_time(42.0); + else + val = R($c=42, $a=vector(R($c=42, $a="hello"))); print "sending pings", i, type_name(val), val; Cluster::publish_hrw(Cluster::worker_pool, cat(i), ping, i, type_name(val), val); @@ -64,10 +71,10 @@ event pong(c: count, what: string, val: any) ++pongs; print "got pong", pongs, "with", c, what, type_name(val), val; - # The manager sends 5 types of pings, in 3 different ways. The worker - # answers each ping in two ways, for a total of 30 expected pongs at the + # The manager sends 6 types of pings, in 3 different ways. The worker + # answers each ping in two ways, for a total of 36 expected pongs at the # manager. Every batch of pings involves 6 pongs. - if ( pongs == 30 ) + if ( pongs == 36 ) Cluster::publish(Cluster::worker_topic, finish); else if ( pongs > 0 && pongs % 6 == 0 ) {