cluster/serializer/broker: Support generic metadata

Instead of handling just the network timestamp, support extraction of
the whole metadata vector that broker events hold.
This commit is contained in:
Arne Welzel 2025-05-26 15:36:25 +02:00
parent 71412f35b7
commit 46d4b5825b
2 changed files with 104 additions and 18 deletions

View file

@ -2,11 +2,16 @@
#include "zeek/cluster/serializer/broker/Serializer.h" #include "zeek/cluster/serializer/broker/Serializer.h"
#include <cinttypes>
#include <optional> #include <optional>
#include "zeek/DebugLogger.h"
#include "zeek/Desc.h" #include "zeek/Desc.h"
#include "zeek/Event.h"
#include "zeek/EventRegistry.h"
#include "zeek/Func.h" #include "zeek/Func.h"
#include "zeek/Reporter.h" #include "zeek/Reporter.h"
#include "zeek/Type.h"
#include "zeek/broker/Data.h" #include "zeek/broker/Data.h"
#include "zeek/cluster/Backend.h" #include "zeek/cluster/Backend.h"
@ -20,6 +25,36 @@
using namespace zeek::cluster; using namespace zeek::cluster;
zeek::detail::EventMetadataVectorPtr detail::metadata_vector_from_broker_event(const broker::zeek::Event& ev) {
const auto& broker_meta = ev.metadata();
if ( broker_meta.size() == 0 )
return nullptr;
auto meta = std::make_unique<zeek::detail::EventMetadataVector>();
meta->reserve(broker_meta.size());
for ( const auto& [id, v] : broker_meta ) {
const auto* desc = zeek::event_registry->LookupMetadata(id);
if ( ! desc ) {
DBG_LOG(DBG_BROKER, "Ignoring event metadata %" PRId64 " value=%s", id,
broker::to_string(v.to_data()).c_str());
continue;
}
auto d = v.to_data();
auto val = zeek::Broker::detail::data_to_val(d, desc->Type().get());
if ( ! val ) {
zeek::reporter->Error("failure converting metadata '%s' to type %s", broker::to_string(v.to_data()).c_str(),
obj_desc(desc->Type()).c_str());
continue;
}
meta->emplace_back(id, val);
}
return meta;
}
std::optional<broker::zeek::Event> detail::to_broker_event(const detail::Event& ev) { std::optional<broker::zeek::Event> detail::to_broker_event(const detail::Event& ev) {
broker::vector xs; broker::vector xs;
xs.reserve(ev.Args().size()); xs.reserve(ev.Args().size());
@ -40,24 +75,33 @@ std::optional<broker::zeek::Event> detail::to_broker_event(const detail::Event&
} }
} }
return broker::zeek::Event(ev.HandlerName(), xs, broker::to_timestamp(ev.Timestamp())); // Convert metadata from the cluster::detail::Event event to broker's event metadata format.
broker::vector broker_meta;
if ( const auto* meta = ev.Metadata(); meta != nullptr ) {
broker_meta.reserve(meta->size());
for ( const auto& m : *meta ) {
if ( auto res = zeek::Broker::detail::val_to_data(m.Val().get()); res.has_value() ) {
broker::vector entry(2);
entry[0] = static_cast<broker::count>(m.Id());
entry[1] = res.value();
broker_meta.push_back(std::move(entry));
}
else {
// Just for sanity - we should never get here.
zeek::reporter->Error("failure converting metadata '%s' to broker data",
obj_desc_short(m.Val()).c_str());
}
}
}
return broker::zeek::Event(ev.HandlerName(), xs, broker_meta);
} }
std::optional<detail::Event> detail::to_zeek_event(const broker::zeek::Event& ev) { std::optional<detail::Event> detail::to_zeek_event(const broker::zeek::Event& ev) {
auto&& name = ev.name(); auto&& name = ev.name();
auto&& args = ev.args(); auto&& args = ev.args();
// Meh, technically need to convert ev.metadata() and
// expose it to script land as `table[count] of any`
// where consumers then know what to do with it.
//
// For now, handle the timestamp explicitly.
double ts;
if ( auto ev_ts = ev.ts() )
broker::convert(*ev_ts, ts);
else
ts = zeek::run_state::network_time;
zeek::Args vl; zeek::Args vl;
zeek::EventHandlerPtr handler = zeek::event_registry->Lookup(name); zeek::EventHandlerPtr handler = zeek::event_registry->Lookup(name);
if ( handler == nullptr ) { if ( handler == nullptr ) {
@ -98,7 +142,8 @@ std::optional<detail::Event> detail::to_zeek_event(const broker::zeek::Event& ev
} }
} }
return detail::Event{handler, std::move(vl), ts}; auto meta = cluster::detail::metadata_vector_from_broker_event(ev);
return cluster::detail::Event{handler, std::move(vl), std::move(meta)};
} }
bool detail::BrokerBinV1_Serializer::SerializeEvent(byte_buffer& buf, const detail::Event& event) { bool detail::BrokerBinV1_Serializer::SerializeEvent(byte_buffer& buf, const detail::Event& event) {
@ -168,11 +213,21 @@ std::optional<detail::Event> detail::BrokerJsonV1_Serializer::UnserializeEvent(b
TEST_SUITE_BEGIN("cluster serializer broker"); TEST_SUITE_BEGIN("cluster serializer broker");
#include "zeek/EventRegistry.h"
TEST_CASE("roundtrip") { TEST_CASE("roundtrip") {
auto* handler = zeek::event_registry->Lookup("Supervisor::node_status"); auto* handler = zeek::event_registry->Lookup("Supervisor::node_status");
detail::Event e{handler, zeek::Args{zeek::make_intrusive<zeek::StringVal>("TEST"), zeek::val_mgr->Count(42)}}; detail::Event e{handler, zeek::Args{zeek::make_intrusive<zeek::StringVal>("TEST"), zeek::val_mgr->Count(42)},
nullptr};
// Register network timestamp metadata. This is idempotent.
auto nts = zeek::id::find_val<zeek::EnumVal>("EventMetadata::NETWORK_TIMESTAMP");
REQUIRE(nts);
bool registered = zeek::event_registry->RegisterMetadata(nts, zeek::base_type(zeek::TYPE_TIME));
REQUIRE(registered);
// Add network timestamp metadata to the event. In previous Zeek versions this happened magically under the hood.
bool added = e.AddMetadata(nts, zeek::make_intrusive<zeek::TimeVal>(0.0));
REQUIRE(added);
zeek::byte_buffer buf; zeek::byte_buffer buf;
SUBCASE("json") { SUBCASE("json") {
@ -189,6 +244,8 @@ TEST_CASE("roundtrip") {
CHECK_EQ(result->Handler(), handler); CHECK_EQ(result->Handler(), handler);
CHECK_EQ(result->HandlerName(), "Supervisor::node_status"); CHECK_EQ(result->HandlerName(), "Supervisor::node_status");
CHECK_EQ(result->Args().size(), 2); CHECK_EQ(result->Args().size(), 2);
REQUIRE(result->Metadata());
CHECK_EQ(result->Metadata()->size(), 1);
} }
SUBCASE("binary") { SUBCASE("binary") {
@ -212,6 +269,8 @@ TEST_CASE("roundtrip") {
CHECK_EQ(result->Handler(), handler); CHECK_EQ(result->Handler(), handler);
CHECK_EQ(result->HandlerName(), "Supervisor::node_status"); CHECK_EQ(result->HandlerName(), "Supervisor::node_status");
CHECK_EQ(result->Args().size(), 2); CHECK_EQ(result->Args().size(), 2);
REQUIRE(result->Metadata());
CHECK_EQ(result->Metadata()->size(), 1);
} }
} }
TEST_SUITE_END(); TEST_SUITE_END();

View file

@ -2,13 +2,39 @@
#pragma once #pragma once
#include <memory>
#include <vector>
#include "zeek/cluster/Serializer.h" #include "zeek/cluster/Serializer.h"
namespace broker::zeek { namespace broker::zeek {
class Event; class Event;
} }
namespace zeek::cluster::detail { namespace zeek {
namespace detail {
class MetadataEntry;
using EventMetadataVector = std::vector<MetadataEntry>;
using EventMetadataVectorPtr = std::unique_ptr<EventMetadataVector>;
} // namespace detail
namespace cluster::detail {
/**
* Produce a EventMetadataVectorPtr from a broker event.
*
* The implementation relies on @ref zeek::EventRegistry::LookupMetadata()
* to find expected metadata types. If there's no metadata at all attached
* to this event, returns a nullptr,
*
* @param ev The broker event.
* @return Pointer to a @ref zeek::detail::EventMetadataVector holding values for all known metadata.
*/
zeek::detail::EventMetadataVectorPtr metadata_vector_from_broker_event(const broker::zeek::Event& ev);
/** /**
* Convert a broker::zeek::Event to cluster::detail::Event by looking * Convert a broker::zeek::Event to cluster::detail::Event by looking
@ -50,4 +76,5 @@ public:
std::optional<detail::Event> UnserializeEvent(byte_buffer_span buf) override; std::optional<detail::Event> UnserializeEvent(byte_buffer_span buf) override;
}; };
} // namespace zeek::cluster::detail } // namespace cluster::detail
} // namespace zeek