diff --git a/src/cluster/serializer/broker/Serializer.cc b/src/cluster/serializer/broker/Serializer.cc index d47e61018d..e0bdcde4b2 100644 --- a/src/cluster/serializer/broker/Serializer.cc +++ b/src/cluster/serializer/broker/Serializer.cc @@ -2,11 +2,16 @@ #include "zeek/cluster/serializer/broker/Serializer.h" +#include #include +#include "zeek/DebugLogger.h" #include "zeek/Desc.h" +#include "zeek/Event.h" +#include "zeek/EventRegistry.h" #include "zeek/Func.h" #include "zeek/Reporter.h" +#include "zeek/Type.h" #include "zeek/broker/Data.h" #include "zeek/cluster/Backend.h" @@ -20,6 +25,36 @@ using namespace zeek::cluster; +zeek::detail::EventMetadataVectorPtr detail::metadata_vector_from_broker_event(const broker::zeek::Event& ev) { + const auto& broker_meta = ev.metadata(); + if ( broker_meta.size() == 0 ) + return nullptr; + + auto meta = std::make_unique(); + meta->reserve(broker_meta.size()); + + for ( const auto& [id, v] : broker_meta ) { + const auto* desc = zeek::event_registry->LookupMetadata(id); + if ( ! desc ) { + DBG_LOG(DBG_BROKER, "Ignoring event metadata %" PRId64 " value=%s", id, + broker::to_string(v.to_data()).c_str()); + continue; + } + + auto d = v.to_data(); + auto val = zeek::Broker::detail::data_to_val(d, desc->Type().get()); + if ( ! val ) { + zeek::reporter->Error("failure converting metadata '%s' to type %s", broker::to_string(v.to_data()).c_str(), + obj_desc(desc->Type()).c_str()); + continue; + } + + meta->emplace_back(id, val); + } + + return meta; +} + std::optional detail::to_broker_event(const detail::Event& ev) { broker::vector xs; xs.reserve(ev.Args().size()); @@ -40,24 +75,33 @@ std::optional detail::to_broker_event(const detail::Event& } } - return broker::zeek::Event(ev.HandlerName(), xs, broker::to_timestamp(ev.Timestamp())); + // Convert metadata from the cluster::detail::Event event to broker's event metadata format. + broker::vector broker_meta; + if ( const auto* meta = ev.Metadata(); meta != nullptr ) { + broker_meta.reserve(meta->size()); + + for ( const auto& m : *meta ) { + if ( auto res = zeek::Broker::detail::val_to_data(m.Val().get()); res.has_value() ) { + broker::vector entry(2); + entry[0] = static_cast(m.Id()); + entry[1] = res.value(); + broker_meta.push_back(std::move(entry)); + } + else { + // Just for sanity - we should never get here. + zeek::reporter->Error("failure converting metadata '%s' to broker data", + obj_desc_short(m.Val()).c_str()); + } + } + } + + return broker::zeek::Event(ev.HandlerName(), xs, broker_meta); } std::optional detail::to_zeek_event(const broker::zeek::Event& ev) { auto&& name = ev.name(); auto&& args = ev.args(); - // Meh, technically need to convert ev.metadata() and - // expose it to script land as `table[count] of any` - // where consumers then know what to do with it. - // - // For now, handle the timestamp explicitly. - double ts; - if ( auto ev_ts = ev.ts() ) - broker::convert(*ev_ts, ts); - else - ts = zeek::run_state::network_time; - zeek::Args vl; zeek::EventHandlerPtr handler = zeek::event_registry->Lookup(name); if ( handler == nullptr ) { @@ -98,7 +142,8 @@ std::optional detail::to_zeek_event(const broker::zeek::Event& ev } } - return detail::Event{handler, std::move(vl), ts}; + auto meta = cluster::detail::metadata_vector_from_broker_event(ev); + return cluster::detail::Event{handler, std::move(vl), std::move(meta)}; } bool detail::BrokerBinV1_Serializer::SerializeEvent(byte_buffer& buf, const detail::Event& event) { @@ -168,11 +213,21 @@ std::optional detail::BrokerJsonV1_Serializer::UnserializeEvent(b TEST_SUITE_BEGIN("cluster serializer broker"); -#include "zeek/EventRegistry.h" - TEST_CASE("roundtrip") { auto* handler = zeek::event_registry->Lookup("Supervisor::node_status"); - detail::Event e{handler, zeek::Args{zeek::make_intrusive("TEST"), zeek::val_mgr->Count(42)}}; + detail::Event e{handler, zeek::Args{zeek::make_intrusive("TEST"), zeek::val_mgr->Count(42)}, + nullptr}; + + // Register network timestamp metadata. This is idempotent. + auto nts = zeek::id::find_val("EventMetadata::NETWORK_TIMESTAMP"); + REQUIRE(nts); + bool registered = zeek::event_registry->RegisterMetadata(nts, zeek::base_type(zeek::TYPE_TIME)); + REQUIRE(registered); + + // Add network timestamp metadata to the event. In previous Zeek versions this happened magically under the hood. + bool added = e.AddMetadata(nts, zeek::make_intrusive(0.0)); + REQUIRE(added); + zeek::byte_buffer buf; SUBCASE("json") { @@ -189,6 +244,8 @@ TEST_CASE("roundtrip") { CHECK_EQ(result->Handler(), handler); CHECK_EQ(result->HandlerName(), "Supervisor::node_status"); CHECK_EQ(result->Args().size(), 2); + REQUIRE(result->Metadata()); + CHECK_EQ(result->Metadata()->size(), 1); } SUBCASE("binary") { @@ -212,6 +269,8 @@ TEST_CASE("roundtrip") { CHECK_EQ(result->Handler(), handler); CHECK_EQ(result->HandlerName(), "Supervisor::node_status"); CHECK_EQ(result->Args().size(), 2); + REQUIRE(result->Metadata()); + CHECK_EQ(result->Metadata()->size(), 1); } } TEST_SUITE_END(); diff --git a/src/cluster/serializer/broker/Serializer.h b/src/cluster/serializer/broker/Serializer.h index 60e2691dbf..2f817b5cc2 100644 --- a/src/cluster/serializer/broker/Serializer.h +++ b/src/cluster/serializer/broker/Serializer.h @@ -2,13 +2,39 @@ #pragma once +#include +#include + #include "zeek/cluster/Serializer.h" namespace broker::zeek { class Event; } -namespace zeek::cluster::detail { +namespace zeek { + +namespace detail { +class MetadataEntry; + +using EventMetadataVector = std::vector; +using EventMetadataVectorPtr = std::unique_ptr; + +} // namespace detail + +namespace cluster::detail { + +/** + * Produce a EventMetadataVectorPtr from a broker event. + * + * The implementation relies on @ref zeek::EventRegistry::LookupMetadata() + * to find expected metadata types. If there's no metadata at all attached + * to this event, returns a nullptr, + * + * @param ev The broker event. + + * @return Pointer to a @ref zeek::detail::EventMetadataVector holding values for all known metadata. + */ +zeek::detail::EventMetadataVectorPtr metadata_vector_from_broker_event(const broker::zeek::Event& ev); /** * Convert a broker::zeek::Event to cluster::detail::Event by looking @@ -50,4 +76,5 @@ public: std::optional UnserializeEvent(byte_buffer_span buf) override; }; -} // namespace zeek::cluster::detail +} // namespace cluster::detail +} // namespace zeek