diff --git a/CHANGES b/CHANGES index 0991dfa3f8..b5aed4de4a 100644 --- a/CHANGES +++ b/CHANGES @@ -1,3 +1,66 @@ +8.0.0-dev.324 | 2025-06-02 17:32:39 +0200 + + * Event: Bail on add_missing_remote_network_timestamp without add_network_timestamp (Arne Welzel, Corelight) + + * btest/plugin: Test custom metadata publish (Arne Welzel, Corelight) + + Usage demo for plugin writers to add custom event metadata and access in + in Zeek scripts. + + * NEWS: Add note about generic event metadata (Arne Welzel, Corelight) + + * cluster: Remove deprecated Event constructor (Arne Welzel, Corelight) + + It is now unused, ditch it. This wasn't available in an LTS release yet + and anyhow is in the detail namespace. + + * cluster: Remove some explicit timestamp handling (Arne Welzel, Corelight) + + Backend::MakeClusterEvent() for now is the only place to add implicit + network timestamp metadata within the cluster component. + + * broker/Manager: Fetch and forward all metadata from events (Arne Welzel, Corelight) + + Also use the generic metadata version for publishing, keep the + ts-based API for now, but only add timestamps when + EventMetadata::add_network_timestamp is T. I'm not sure what the + right way forward here is, maybe deprecating Broker's publish event + variations and funneling through cluster. + + * Event/init-bare: Add add_missing_remote_network_timestamp logic (Arne Welzel, Corelight) + + Make defaulting to the local network timestamp for remote events opt-in. + + * cluster/Backend/DoProcessEvent: Use generic metadata, not just timestamps (Arne Welzel, Corelight) + + * cluster/Event: Support moving args and metadata from event (Arne Welzel, Corelight) + + * cluster/serializer/broker: Support generic metadata (Arne Welzel, Corelight) + + Instead of handling just the network timestamp, support extraction of + the whole metadata vector that broker events hold. + + * cluster/Event: Generic metadata support (Arne Welzel, Corelight) + + Instead of a timestamp attribute, switch to holding a EventMetadataVectorPtr + like zeek::Event instances do. Keep the old constructor until the end of + the patch series. + + * Event: Use -1.0 for undefined/unset timestamps (Arne Welzel, Corelight) + + This can happen if either there's no network timestamp associated with + an event, or there's currently no event being dispatched. Using 0.0 + isn't great as it's the normal start timestamp before reading a network + packet. Using -1.0 gives the caller a chance to check and realize what's + going on. + + * cluster: Use shorter obj_desc versions (Arne Welzel, Corelight) + + * Desc: Add obj_desc() / obj_desc_short() overloads for IntrusivePtr (Arne Welzel, Corelight) + + When using these helpers in code, one barely has raw pointers and the + repeated `.get()` call cumbersome and noisy. + 8.0.0-dev.309 | 2025-06-02 10:17:14 +0200 * Prefer `std::move` over copy (Benjamin Bannier, Corelight) diff --git a/NEWS b/NEWS index 65ca985ca9..799b25e971 100644 --- a/NEWS +++ b/NEWS @@ -35,9 +35,26 @@ Breaking Changes redef LogAscii::json_timestamps = JSON::TS_MILLIS_UNSIGNED; +- The ``current_event_time()`` builtin function as well as ``Event::Time()`` + and ``EventMgr::CurrentEventTime()`` now return ``-1.0`` if no timestamp + metadata is available for the current event, or if no event is being + dispatched. Previously this would've been 0.0, or the timestamp of the previously + dispatched event. + +- Missing network timestamp metadata on remote events is not set to the local + network time anymore by default. This potentially hid useful debugging information + about another node not sending timestamp metadata. The old behavior can be + re-enabled as follows: + + redef EventMetadata::add_missing_remote_network_timestamp = T; + New Functionality ----------------- +- Generic event metadata support. A new ``EventMetadata`` module was added allowing + to register generic event metadata types and accessing the current event's metadata + using the functions ``current()`` and ``current_all()`` of this module. + - A new plugin hook, ``HookPublishEvent()``, has been added for intercepting publishing of Zeek events. This hook may be used for monitoring purposes, modifying or rerouting remote events. diff --git a/VERSION b/VERSION index 9f9d005d81..a4a35023bc 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -8.0.0-dev.309 +8.0.0-dev.324 diff --git a/scripts/base/init-bare.zeek b/scripts/base/init-bare.zeek index d19bb85a88..178b0e9cd4 100644 --- a/scripts/base/init-bare.zeek +++ b/scripts/base/init-bare.zeek @@ -601,6 +601,16 @@ export { ## might be a value before the network_time() when the event ## was actually dispatched. const add_network_timestamp: bool = F &redef; + + ## By default, remote events without network timestamp metadata + ## will yield a negative zeek:see:`current_event_time` during + ## processing. To have the receiving Zeek node set the event's + ## network timestamp metadata with its current local network time, + ## set this option to true. + ## + ## This setting is only in effect if :zeek:see:`EventMetadata::add_network_timestamp` + ## is also set to true. + const add_missing_remote_network_timestamp: bool = F &redef; } module FTP; diff --git a/src/Desc.h b/src/Desc.h index ed092cb0b3..b2f4bb56fa 100644 --- a/src/Desc.h +++ b/src/Desc.h @@ -7,6 +7,7 @@ #include #include +#include "zeek/IntrusivePtr.h" #include "zeek/ZeekString.h" // for byte_vec #include "zeek/util.h" // for zeek_int_t @@ -211,13 +212,13 @@ protected: }; // Returns a string representation of an object's description. Used for -// debugging and error messages. takes a bare pointer rather than an -// IntrusivePtr because the latter is harder to deal with when making -// calls from a debugger like lldb, which is the main use of this function. +// debugging and error messages. class Obj; std::string obj_desc(const Obj* o); +inline std::string obj_desc(const IntrusivePtr& o) { return obj_desc(o.get()); } // Same as obj_desc(), but ensure it is short and don't include location info. std::string obj_desc_short(const Obj* o); +inline std::string obj_desc_short(const IntrusivePtr& o) { return obj_desc_short(o.get()); } } // namespace zeek diff --git a/src/Event.cc b/src/Event.cc index 8164ba3dd8..4c6680415d 100644 --- a/src/Event.cc +++ b/src/Event.cc @@ -11,6 +11,7 @@ #include "zeek/Val.h" #include "zeek/iosource/Manager.h" #include "zeek/plugin/Manager.h" +#include "zeek/util.h" #include "const.bif.netvar_h" #include "event.bif.netvar_h" @@ -96,7 +97,7 @@ zeek::VectorValPtr Event::MetadataValues(const EnumValPtr& id) const { double Event::Time() const { if ( ! meta ) - return 0.0; + return detail::NO_TIMESTAMP; for ( const auto& m : *meta ) if ( m.Id() == static_cast(detail::MetadataType::NetworkTimestamp) ) { @@ -109,7 +110,7 @@ double Event::Time() const { return m.Val()->AsTime(); } - return 0.0; + return detail::NO_TIMESTAMP; } void Event::Describe(ODesc* d) const { @@ -165,12 +166,19 @@ void EventMgr::Enqueue(const EventHandlerPtr& h, Args vl, util::detail::SourceID detail::EventMetadataVectorPtr meta; double ts = double(deprecated_ts); - if ( src == util::detail::SOURCE_LOCAL && BifConst::EventMetadata::add_network_timestamp ) { - // If this is a local event and EventMetadata::add_network_timestamp is - // enabled automatically set the network timestamp for this event to the - // current network time when it is < 0 (default is -1.0). - // - // See the other Enqueue() implementation for the local motivation. + + // If this is a local event and EventMetadata::add_network_timestamp is + // enabled, automatically set the network timestamp for this event to the + // current network time when it is < 0 (default of deprecated_ts is -1.0). + // + // See the other Enqueue() implementation for the local vs broker/remote + // motivation of want_network_timestamp. + bool want_network_timestamp = + BifConst::EventMetadata::add_network_timestamp && + ((src == util::detail::SOURCE_LOCAL) || + (src == util::detail::SOURCE_BROKER && BifConst::EventMetadata::add_missing_remote_network_timestamp)); + + if ( want_network_timestamp ) { if ( ts < 0.0 ) ts = run_state::network_time; @@ -189,15 +197,23 @@ void EventMgr::Enqueue(const EventHandlerPtr& h, Args vl, util::detail::SourceID void EventMgr::Enqueue(detail::EventMetadataVectorPtr meta, const EventHandlerPtr& h, Args vl, util::detail::SourceID src, analyzer::ID aid, Obj* obj) { - if ( src == util::detail::SOURCE_LOCAL && BifConst::EventMetadata::add_network_timestamp ) { - // If all events are supposed to have a network time attached, ensure - // that the meta vector was passed *and* contains a network timestamp. - // - // This is only done for local events, however. For remote events (src == BROKER) - // that do not hold network timestamp metadata, it seems less surprising to keep - // it unset. If it is required that a remote node sends *their* network timestamp, - // defaulting to this node's network time seems more confusing and error prone - // than just leaving it unset and having the consumer deal with the situation. + // Attach network timestamps to all events if EventMetadata::add_network_timestamp is T and + // + // 1) this event is locally generated + // or + // 2) this is a remote event and EventMetadata::add_missing_remote_network_timestamp is T + // + // Why so complicated? It seems less surprising behavior to keep network timestamp metadata unset + // if a remote event didn't have any attached. It should help to more easily figure out what's + // actually going on compared to setting it to the local network time. If all nodes are required to + // send *their* network timestamp, filling it with this node's network time seems more confusing + // and error prone compared to just leaving it unset and having the consumer deal with the situation. + bool want_network_timestamp = + BifConst::EventMetadata::add_network_timestamp && + ((src == util::detail::SOURCE_LOCAL) || + (src == util::detail::SOURCE_BROKER && BifConst::EventMetadata::add_missing_remote_network_timestamp)); + + if ( want_network_timestamp ) { bool has_time = false; if ( ! meta ) { @@ -348,6 +364,14 @@ void EventMgr::InitPostScript() { if ( ! zeek::event_registry->RegisterMetadata(net_ts_val, zeek::base_type(zeek::TYPE_TIME)) ) zeek::reporter->FatalError("Failed to register NETWORK_TIMESTAMP metadata"); + // Remove this if there's ever a use-case to not use them together. + if ( BifConst::EventMetadata::add_missing_remote_network_timestamp && + ! BifConst::EventMetadata::add_network_timestamp ) + zeek::reporter->FatalError( + "Setting EventMetadata::add_missing_remote_network_timestamp is only valid together with " + "EventMetadata::add_network_timestamp"); + + iosource_mgr->Register(this, true, false); } } // namespace zeek diff --git a/src/Event.h b/src/Event.h index 49c78dae6f..82804ca58d 100644 --- a/src/Event.h +++ b/src/Event.h @@ -49,6 +49,8 @@ using EventMetadataVectorPtr = std::unique_ptr; */ EventMetadataVectorPtr MakeEventMetadataVector(double t); +constexpr double NO_TIMESTAMP = -1.0; + } // namespace detail class Event final : public Obj { @@ -170,8 +172,8 @@ public: // the event was intended to be executed. For scheduled events, this is the time the event // was scheduled to. For any other event, this is the time when the event was created. // - // If no event is being processed, returns 0.0. - double CurrentEventTime() const { return current ? current->Time() : 0.0; } + // If no event is being processed or there is no timestamp information, returns -1.0 + double CurrentEventTime() const { return current ? current->Time() : detail::NO_TIMESTAMP; } int Size() const { return num_events_queued - num_events_dispatched; } diff --git a/src/broker/Manager.cc b/src/broker/Manager.cc index d3d7020cf9..22c70981d7 100644 --- a/src/broker/Manager.cc +++ b/src/broker/Manager.cc @@ -8,6 +8,7 @@ #include #include #include +#include #include #include #include @@ -42,6 +43,8 @@ #include "zeek/telemetry/Manager.h" #include "zeek/util.h" +#include "const.bif.netvar_h" + using namespace std; namespace { @@ -850,8 +853,15 @@ bool Manager::PublishEvent(string topic, std::string name, broker::vector args, if ( peer_count == 0 && hub_count == 0 ) return true; - broker::zeek::Event ev(name, args, broker::to_timestamp(ts)); - DBG_LOG(DBG_BROKER, "Publishing event: %s", RenderEvent(topic, name, ev.args()).c_str()); + broker::vector meta; + if ( BifConst::EventMetadata::add_network_timestamp ) { + broker::vector entry{static_cast(zeek::detail::MetadataType::NetworkTimestamp), + broker::to_timestamp(ts)}; + meta.emplace_back(std::move(entry)); + } + + broker::zeek::Event ev(name, args, meta); + DBG_LOG(DBG_BROKER, "Publishing event: %s", RenderEvent(topic, std::string(ev.name()), ev.args()).c_str()); bstate->endpoint.publish(std::move(topic), ev.move_data()); num_events_outgoing_metric->Inc(); return true; @@ -1570,15 +1580,10 @@ void Manager::ProcessMessage(std::string_view topic, broker::zeek::Event& ev) { auto&& name = ev.name(); auto&& args = ev.args(); - double ts; + auto meta = cluster::detail::metadata_vector_from_broker_event(ev); - if ( auto ev_ts = ev.ts() ) - broker::convert(*ev_ts, ts); - else - // Default to current network time, if the received event did not contain a timestamp. - ts = run_state::network_time; - - DBG_LOG(DBG_BROKER, "Process event: %s (%.6f) %s", std::string{name}.c_str(), ts, RenderMessage(args).c_str()); + DBG_LOG(DBG_BROKER, "Process event: %s (with %zu metadata entries) %s", std::string{name}.c_str(), + meta ? meta->size() : 0, RenderMessage(args).c_str()); num_events_incoming_metric->Inc(); auto handler = event_registry->Lookup(name); @@ -1654,13 +1659,8 @@ void Manager::ProcessMessage(std::string_view topic, broker::zeek::Event& ev) { } } - if ( vl.size() == args.size() ) { - zeek::detail::EventMetadataVectorPtr meta; - if ( ts >= 0.0 ) - meta = zeek::detail::MakeEventMetadataVector(ts); - + if ( vl.size() == args.size() ) event_mgr.Enqueue(std::move(meta), handler, std::move(vl), util::detail::SOURCE_BROKER); - } } bool Manager::ProcessMessage(std::string_view, broker::zeek::LogCreate& lc) { diff --git a/src/cluster/Backend.cc b/src/cluster/Backend.cc index ae23a5fbf1..b0a86ef600 100644 --- a/src/cluster/Backend.cc +++ b/src/cluster/Backend.cc @@ -2,14 +2,17 @@ #include "zeek/cluster/Backend.h" +#include #include #include "zeek/Desc.h" #include "zeek/Event.h" +#include "zeek/EventHandler.h" #include "zeek/EventRegistry.h" #include "zeek/Func.h" #include "zeek/Reporter.h" #include "zeek/Type.h" +#include "zeek/Val.h" #include "zeek/cluster/Manager.h" #include "zeek/cluster/OnLoop.h" #include "zeek/cluster/Serializer.h" @@ -19,15 +22,48 @@ #include "zeek/plugin/Plugin.h" #include "zeek/util.h" +#include "zeek/3rdparty/doctest.h" + using namespace zeek::cluster; +double detail::Event::Timestamp() const { + if ( meta ) { + for ( const auto& m : *meta ) { + if ( m.Id() == static_cast(zeek::detail::MetadataType::NetworkTimestamp) ) + return m.Val()->AsTime(); + } + } + + return zeek::detail::NO_TIMESTAMP; +} + +bool detail::Event::AddMetadata(const EnumValPtr& id, zeek::ValPtr val) { + if ( ! id || ! val ) + return false; + + const auto* desc = zeek::event_registry->LookupMetadata(id->Get()); + if ( ! desc ) + return false; + + if ( ! same_type(val->GetType(), desc->Type()) ) + return false; + + if ( ! meta ) + meta = std::make_unique(); + + // Internally stored as zeek_uint_t for serializers. + meta->emplace_back(desc->Id(), std::move(val)); + + return true; +} + +std::tuple detail::Event::Take() && { + return {handler, std::move(args), std::move(meta)}; +} bool detail::LocalEventHandlingStrategy::DoProcessEvent(std::string_view topic, detail::Event e) { - zeek::detail::EventMetadataVectorPtr meta; - if ( auto ts = e.Timestamp(); ts >= 0.0 ) - meta = zeek::detail::MakeEventMetadataVector(e.Timestamp()); - - zeek::event_mgr.Enqueue(std::move(meta), e.Handler(), std::move(e.Args()), util::detail::SOURCE_BROKER); + auto [handler, args, meta] = std::move(e).Take(); + zeek::event_mgr.Enqueue(std::move(meta), handler, std::move(args), util::detail::SOURCE_BROKER); return true; } @@ -73,8 +109,7 @@ std::optional detail::check_args(const zeek::FuncValPtr& handler, ze if ( ! same_type(got_type, expected_type) ) { zeek::reporter->Error("event parameter #%zu type mismatch, got %s, expecting %s", i + 1, - zeek::obj_desc_short(got_type.get()).c_str(), - zeek::obj_desc_short(expected_type.get()).c_str()); + zeek::obj_desc_short(got_type).c_str(), zeek::obj_desc_short(expected_type).c_str()); return std::nullopt; } @@ -100,21 +135,38 @@ bool Backend::Init(std::string nid) { return DoInit(); } -std::optional Backend::MakeClusterEvent(FuncValPtr handler, ArgsSpan args, double timestamp) const { +std::optional Backend::MakeClusterEvent(FuncValPtr handler, ArgsSpan args) const { auto checked_args = detail::check_args(handler, args); if ( ! checked_args ) return std::nullopt; - if ( timestamp == 0.0 ) - timestamp = zeek::event_mgr.CurrentEventTime(); const auto& eh = zeek::event_registry->Lookup(handler->AsFuncPtr()->GetName()); if ( ! eh ) { - zeek::reporter->Error("event registry lookup of '%s' failed", obj_desc(handler.get()).c_str()); + zeek::reporter->Error("event registry lookup of '%s' failed", obj_desc_short(handler).c_str()); return std::nullopt; } - return zeek::cluster::detail::Event{eh, std::move(*checked_args), timestamp}; + /** + * If you ever stare at this and wonder: Currently, if someone calls + * Cluster::publish() from within a remote event in script land, then + * CurrentEventTime() below will yield the network timestamp of the + * remote event. That means that the outgoing event from this node will + * have the network timestamp of the originating node, which may be + * different from what the local network time is. + * + * This could be confusing and another policy might be to always set + * the network timestamp metadata for for outgoing events to the local + * network time instead, even when currently handling a remote event. + * + * @J-Gras prefers the current behavior. @awelzel wonders if there should + * be an opt-in/opt-out for this behavior. Procrastinating it for now. + */ + zeek::detail::EventMetadataVectorPtr meta; + if ( zeek::BifConst::EventMetadata::add_network_timestamp ) + meta = zeek::detail::MakeEventMetadataVector(zeek::event_mgr.CurrentEventTime()); + + return zeek::cluster::detail::Event{eh, std::move(*checked_args), std::move(meta)}; } void Backend::DoReadyToPublishCallback(Backend::ReadyCallback cb) { @@ -244,3 +296,48 @@ void ThreadedBackend::Process(QueueMessage&& msg) { zeek::reporter->FatalError("Unimplemented QueueMessage %zu", msg.index()); } } + +TEST_SUITE_BEGIN("cluster event"); + +TEST_CASE("add metadata") { + auto* handler = zeek::event_registry->Lookup("Supervisor::node_status"); + zeek::Args args{zeek::make_intrusive("TEST"), zeek::val_mgr->Count(42)}; + zeek::cluster::detail::Event event{handler, args, nullptr}; + + auto nts = zeek::id::find_val("EventMetadata::NETWORK_TIMESTAMP"); + REQUIRE(nts); + auto unk = zeek::id::find_val("Log::UNKNOWN"); + REQUIRE(unk); + + bool registered = zeek::event_registry->RegisterMetadata(nts, zeek::base_type(zeek::TYPE_TIME)); + REQUIRE(registered); + + SUBCASE("valid") { + CHECK_EQ(event.Timestamp(), -1.0); + CHECK(event.AddMetadata(nts, zeek::make_intrusive(42.0))); + CHECK_EQ(event.Timestamp(), 42.0); + } + + SUBCASE("valid-two-times") { + CHECK_EQ(event.Timestamp(), -1.0); + CHECK(event.AddMetadata(nts, zeek::make_intrusive(42.0))); + CHECK(event.AddMetadata(nts, zeek::make_intrusive(43.0))); + CHECK_EQ(event.Timestamp(), 42.0); // finds the first one + CHECK_EQ(event.Metadata()->size(), 2); // both are stored + } + + SUBCASE("invalid-value-type") { + CHECK_EQ(event.Timestamp(), -1.0); + CHECK_FALSE(event.AddMetadata(nts, zeek::make_intrusive(42.0))); + CHECK_EQ(event.Timestamp(), -1.0); + CHECK_EQ(event.Metadata(), nullptr); + } + + SUBCASE("unregistered-metadata-identifier") { + CHECK_EQ(event.Timestamp(), -1.0); + CHECK_FALSE(event.AddMetadata(unk, zeek::make_intrusive(42.0))); + CHECK_EQ(event.Timestamp(), -1.0); + CHECK_EQ(event.Metadata(), nullptr); + } +} +TEST_SUITE_END(); diff --git a/src/cluster/Backend.h b/src/cluster/Backend.h index c3506cc3e2..1c6ba82e2e 100644 --- a/src/cluster/Backend.h +++ b/src/cluster/Backend.h @@ -9,6 +9,7 @@ #include #include +#include "zeek/Event.h" #include "zeek/EventHandler.h" #include "zeek/Span.h" #include "zeek/Tag.h" @@ -37,8 +38,8 @@ public: /** * Constructor. */ - Event(const EventHandlerPtr& handler, zeek::Args args, double timestamp = 0.0) - : handler(handler), args(std::move(args)), timestamp(timestamp) {} + Event(const EventHandlerPtr& handler, zeek::Args args, zeek::detail::EventMetadataVectorPtr meta) + : handler(handler), args(std::move(args)), meta(std::move(meta)) {} /** * @return The name of the event. @@ -60,16 +61,40 @@ public: zeek::Args& Args() { return args; } /** - * @return The network timestamp metadata of this event or 0.0. + * @return The network timestamp metadata of this event or -1.0 if not set. */ - double Timestamp() const { return timestamp; } + double Timestamp() const; + + /** + * Add metadata to this cluster event. + * + * The used metadata \a id has to be registered via the Zeek script-layer + * function EventMetadata::register(), or via the C++ API + * EventMgr::RegisterMetadata() during an InitPostScript() hook. + * + * Non-registered metadata will not be added and false is returned. + * + * @param id The enum value identifying the event metadata. + * @param val The value to use. + + * @return true if \a val was was added, else false. + */ + bool AddMetadata(const EnumValPtr& id, ValPtr val); + + /** + * @return A pointer to the metadata vector, or nullptr if no Metadata has been added yet. + */ + const zeek::detail::EventMetadataVector* Metadata() const { return meta.get(); } + + /** + * Move data out of this event as preparation for Enqueue() + */ + std::tuple Take() &&; private: EventHandlerPtr handler; zeek::Args args; - double timestamp; // TODO: This should be more generic, possibly holding a - // vector of key/value metadata, rather than just - // the timestamp. + zeek::detail::EventMetadataVectorPtr meta; }; /** @@ -201,9 +226,8 @@ public: * * @param handler A function val representing an event handler. * @param args The arguments for the event handler. - * @param timestamp The network time to add to the event as metadata. */ - std::optional MakeClusterEvent(FuncValPtr handler, ArgsSpan args, double timestamp = 0.0) const; + std::optional MakeClusterEvent(FuncValPtr handler, ArgsSpan args) const; /** * Publish a cluster::detail::Event instance to a given topic. diff --git a/src/cluster/BifSupport.cc b/src/cluster/BifSupport.cc index 434ab2b1e0..7632ee8763 100644 --- a/src/cluster/BifSupport.cc +++ b/src/cluster/BifSupport.cc @@ -17,29 +17,20 @@ namespace { // Convert a script-level Cluster::Event to a cluster::detail::Event. -std::optional to_cluster_event(const zeek::RecordValPtr& rec) { +std::optional to_cluster_event(const zeek::cluster::Backend* backend, + const zeek::RecordValPtr& rec) { const auto& func = rec->GetField(0); const auto& vargs = rec->GetField(1); if ( ! func ) return std::nullopt; - const auto& eh = zeek::event_registry->Lookup(func->AsFuncPtr()->GetName()); - if ( ! eh ) { - zeek::emit_builtin_error( - zeek::util::fmt("event registry lookup of '%s' failed", zeek::obj_desc_short(func.get()).c_str())); - return std::nullopt; - } - // Need to copy from VectorVal to zeek::Args zeek::Args args(vargs->Size()); for ( size_t i = 0; i < vargs->Size(); i++ ) args[i] = vargs->ValAt(i); - // TODO: Support configurable timestamps or custom metadata on the record. - auto timestamp = zeek::event_mgr.CurrentEventTime(); - - return zeek::cluster::detail::Event(eh, std::move(args), timestamp); + return backend->MakeClusterEvent(func, zeek::Span{args}); } } // namespace @@ -60,7 +51,7 @@ zeek::RecordValPtr make_event(zeek::ArgsSpan args) { if ( maybe_func_val->GetType()->Tag() != zeek::TYPE_FUNC ) { zeek::emit_builtin_error( - zeek::util::fmt("got non-event type '%s'", zeek::obj_desc_short(maybe_func_val->GetType().get()).c_str())); + zeek::util::fmt("got non-event type '%s'", zeek::obj_desc_short(maybe_func_val->GetType()).c_str())); return rec; } @@ -97,11 +88,8 @@ zeek::ValPtr publish_event(const zeek::ValPtr& topic, zeek::ArgsSpan args) { auto topic_str = topic->AsStringVal()->ToStdString(); - auto timestamp = zeek::event_mgr.CurrentEventTime(); - if ( args[0]->GetType()->Tag() == zeek::TYPE_FUNC ) { - auto event = zeek::cluster::backend->MakeClusterEvent({zeek::NewRef{}, args[0]->AsFuncVal()}, args.subspan(1), - timestamp); + auto event = zeek::cluster::backend->MakeClusterEvent({zeek::NewRef{}, args[0]->AsFuncVal()}, args.subspan(1)); if ( event ) return zeek::val_mgr->Bool(zeek::cluster::backend->PublishEvent(topic_str, *event)); @@ -109,7 +97,7 @@ zeek::ValPtr publish_event(const zeek::ValPtr& topic, zeek::ArgsSpan args) { } else if ( args[0]->GetType()->Tag() == zeek::TYPE_RECORD ) { if ( args[0]->GetType() == cluster_event_type ) { // Handling Cluster::Event record type - auto ev = to_cluster_event(zeek::cast_intrusive(args[0])); + auto ev = to_cluster_event(zeek::cluster::backend, zeek::cast_intrusive(args[0])); if ( ! ev ) return zeek::val_mgr->False(); @@ -121,7 +109,7 @@ zeek::ValPtr publish_event(const zeek::ValPtr& topic, zeek::ArgsSpan args) { if ( zeek::cluster::backend != zeek::broker_mgr ) { zeek::emit_builtin_error( zeek::util::fmt("Publish of Broker::Event record instance with type '%s' to a non-Broker backend", - zeek::obj_desc_short(args[0]->GetType().get()).c_str())); + zeek::obj_desc_short(args[0]->GetType()).c_str())); return zeek::val_mgr->False(); } @@ -130,13 +118,13 @@ zeek::ValPtr publish_event(const zeek::ValPtr& topic, zeek::ArgsSpan args) { } else { zeek::emit_builtin_error(zeek::util::fmt("Publish of unknown record type '%s'", - zeek::obj_desc_short(args[0]->GetType().get()).c_str())); + zeek::obj_desc_short(args[0]->GetType()).c_str())); return zeek::val_mgr->False(); } } zeek::emit_builtin_error(zeek::util::fmt("expected function or record as first argument, got %s", - zeek::obj_desc_short(args[0]->GetType().get()).c_str())); + zeek::obj_desc_short(args[0]->GetType()).c_str())); return zeek::val_mgr->False(); } diff --git a/src/cluster/serializer/broker/Serializer.cc b/src/cluster/serializer/broker/Serializer.cc index d47e61018d..e0bdcde4b2 100644 --- a/src/cluster/serializer/broker/Serializer.cc +++ b/src/cluster/serializer/broker/Serializer.cc @@ -2,11 +2,16 @@ #include "zeek/cluster/serializer/broker/Serializer.h" +#include #include +#include "zeek/DebugLogger.h" #include "zeek/Desc.h" +#include "zeek/Event.h" +#include "zeek/EventRegistry.h" #include "zeek/Func.h" #include "zeek/Reporter.h" +#include "zeek/Type.h" #include "zeek/broker/Data.h" #include "zeek/cluster/Backend.h" @@ -20,6 +25,36 @@ using namespace zeek::cluster; +zeek::detail::EventMetadataVectorPtr detail::metadata_vector_from_broker_event(const broker::zeek::Event& ev) { + const auto& broker_meta = ev.metadata(); + if ( broker_meta.size() == 0 ) + return nullptr; + + auto meta = std::make_unique(); + meta->reserve(broker_meta.size()); + + for ( const auto& [id, v] : broker_meta ) { + const auto* desc = zeek::event_registry->LookupMetadata(id); + if ( ! desc ) { + DBG_LOG(DBG_BROKER, "Ignoring event metadata %" PRId64 " value=%s", id, + broker::to_string(v.to_data()).c_str()); + continue; + } + + auto d = v.to_data(); + auto val = zeek::Broker::detail::data_to_val(d, desc->Type().get()); + if ( ! val ) { + zeek::reporter->Error("failure converting metadata '%s' to type %s", broker::to_string(v.to_data()).c_str(), + obj_desc(desc->Type()).c_str()); + continue; + } + + meta->emplace_back(id, val); + } + + return meta; +} + std::optional detail::to_broker_event(const detail::Event& ev) { broker::vector xs; xs.reserve(ev.Args().size()); @@ -40,24 +75,33 @@ std::optional detail::to_broker_event(const detail::Event& } } - return broker::zeek::Event(ev.HandlerName(), xs, broker::to_timestamp(ev.Timestamp())); + // Convert metadata from the cluster::detail::Event event to broker's event metadata format. + broker::vector broker_meta; + if ( const auto* meta = ev.Metadata(); meta != nullptr ) { + broker_meta.reserve(meta->size()); + + for ( const auto& m : *meta ) { + if ( auto res = zeek::Broker::detail::val_to_data(m.Val().get()); res.has_value() ) { + broker::vector entry(2); + entry[0] = static_cast(m.Id()); + entry[1] = res.value(); + broker_meta.push_back(std::move(entry)); + } + else { + // Just for sanity - we should never get here. + zeek::reporter->Error("failure converting metadata '%s' to broker data", + obj_desc_short(m.Val()).c_str()); + } + } + } + + return broker::zeek::Event(ev.HandlerName(), xs, broker_meta); } std::optional detail::to_zeek_event(const broker::zeek::Event& ev) { auto&& name = ev.name(); auto&& args = ev.args(); - // Meh, technically need to convert ev.metadata() and - // expose it to script land as `table[count] of any` - // where consumers then know what to do with it. - // - // For now, handle the timestamp explicitly. - double ts; - if ( auto ev_ts = ev.ts() ) - broker::convert(*ev_ts, ts); - else - ts = zeek::run_state::network_time; - zeek::Args vl; zeek::EventHandlerPtr handler = zeek::event_registry->Lookup(name); if ( handler == nullptr ) { @@ -98,7 +142,8 @@ std::optional detail::to_zeek_event(const broker::zeek::Event& ev } } - return detail::Event{handler, std::move(vl), ts}; + auto meta = cluster::detail::metadata_vector_from_broker_event(ev); + return cluster::detail::Event{handler, std::move(vl), std::move(meta)}; } bool detail::BrokerBinV1_Serializer::SerializeEvent(byte_buffer& buf, const detail::Event& event) { @@ -168,11 +213,21 @@ std::optional detail::BrokerJsonV1_Serializer::UnserializeEvent(b TEST_SUITE_BEGIN("cluster serializer broker"); -#include "zeek/EventRegistry.h" - TEST_CASE("roundtrip") { auto* handler = zeek::event_registry->Lookup("Supervisor::node_status"); - detail::Event e{handler, zeek::Args{zeek::make_intrusive("TEST"), zeek::val_mgr->Count(42)}}; + detail::Event e{handler, zeek::Args{zeek::make_intrusive("TEST"), zeek::val_mgr->Count(42)}, + nullptr}; + + // Register network timestamp metadata. This is idempotent. + auto nts = zeek::id::find_val("EventMetadata::NETWORK_TIMESTAMP"); + REQUIRE(nts); + bool registered = zeek::event_registry->RegisterMetadata(nts, zeek::base_type(zeek::TYPE_TIME)); + REQUIRE(registered); + + // Add network timestamp metadata to the event. In previous Zeek versions this happened magically under the hood. + bool added = e.AddMetadata(nts, zeek::make_intrusive(0.0)); + REQUIRE(added); + zeek::byte_buffer buf; SUBCASE("json") { @@ -189,6 +244,8 @@ TEST_CASE("roundtrip") { CHECK_EQ(result->Handler(), handler); CHECK_EQ(result->HandlerName(), "Supervisor::node_status"); CHECK_EQ(result->Args().size(), 2); + REQUIRE(result->Metadata()); + CHECK_EQ(result->Metadata()->size(), 1); } SUBCASE("binary") { @@ -212,6 +269,8 @@ TEST_CASE("roundtrip") { CHECK_EQ(result->Handler(), handler); CHECK_EQ(result->HandlerName(), "Supervisor::node_status"); CHECK_EQ(result->Args().size(), 2); + REQUIRE(result->Metadata()); + CHECK_EQ(result->Metadata()->size(), 1); } } TEST_SUITE_END(); diff --git a/src/cluster/serializer/broker/Serializer.h b/src/cluster/serializer/broker/Serializer.h index 60e2691dbf..2f817b5cc2 100644 --- a/src/cluster/serializer/broker/Serializer.h +++ b/src/cluster/serializer/broker/Serializer.h @@ -2,13 +2,39 @@ #pragma once +#include +#include + #include "zeek/cluster/Serializer.h" namespace broker::zeek { class Event; } -namespace zeek::cluster::detail { +namespace zeek { + +namespace detail { +class MetadataEntry; + +using EventMetadataVector = std::vector; +using EventMetadataVectorPtr = std::unique_ptr; + +} // namespace detail + +namespace cluster::detail { + +/** + * Produce a EventMetadataVectorPtr from a broker event. + * + * The implementation relies on @ref zeek::EventRegistry::LookupMetadata() + * to find expected metadata types. If there's no metadata at all attached + * to this event, returns a nullptr, + * + * @param ev The broker event. + + * @return Pointer to a @ref zeek::detail::EventMetadataVector holding values for all known metadata. + */ +zeek::detail::EventMetadataVectorPtr metadata_vector_from_broker_event(const broker::zeek::Event& ev); /** * Convert a broker::zeek::Event to cluster::detail::Event by looking @@ -50,4 +76,5 @@ public: std::optional UnserializeEvent(byte_buffer_span buf) override; }; -} // namespace zeek::cluster::detail +} // namespace cluster::detail +} // namespace zeek diff --git a/src/const.bif b/src/const.bif index 482a325588..7874352d6d 100644 --- a/src/const.bif +++ b/src/const.bif @@ -36,3 +36,4 @@ const Log::write_buffer_size: count; const Storage::expire_interval: interval; const EventMetadata::add_network_timestamp: bool; +const EventMetadata::add_missing_remote_network_timestamp: bool; diff --git a/testing/btest/Baseline/core.event-metadata.add-missing-remote-network-timestamp-error/.stderr b/testing/btest/Baseline/core.event-metadata.add-missing-remote-network-timestamp-error/.stderr new file mode 100644 index 0000000000..bfe7ca4e75 --- /dev/null +++ b/testing/btest/Baseline/core.event-metadata.add-missing-remote-network-timestamp-error/.stderr @@ -0,0 +1,2 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +fatal error: Setting EventMetadata::add_missing_remote_network_timestamp is only valid together with EventMetadata::add_network_timestamp diff --git a/testing/btest/Baseline/core.event-metadata.current-event-time/output b/testing/btest/Baseline/core.event-metadata.current-event-time/output new file mode 100644 index 0000000000..f4112240a4 --- /dev/null +++ b/testing/btest/Baseline/core.event-metadata.current-event-time/output @@ -0,0 +1,3 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +new_connection add_network_timestamp=T current_event_time=1362692526.869344 network_timestamp=[1362692526.869344] +new_connection add_network_timestamp=F current_event_time=-1.0 network_timestamp=[] diff --git a/testing/btest/Baseline/plugins.publish-event-metadata/manager..stderr b/testing/btest/Baseline/plugins.publish-event-metadata/manager..stderr new file mode 100644 index 0000000000..e3f6131b1d --- /dev/null +++ b/testing/btest/Baseline/plugins.publish-event-metadata/manager..stderr @@ -0,0 +1,2 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +received termination signal diff --git a/testing/btest/Baseline/plugins.publish-event-metadata/manager..stdout b/testing/btest/Baseline/plugins.publish-event-metadata/manager..stdout new file mode 100644 index 0000000000..902f2cf0d0 --- /dev/null +++ b/testing/btest/Baseline/plugins.publish-event-metadata/manager..stdout @@ -0,0 +1,33 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +0.000000 InitPostScript +App::test_event(1) |mdv|=1 +[id=App::CUSTOM_METADATA_STRING, val=testing string metadata] +custom metadata string, [testing string metadata] +custom metadata count, [] +custom metadata table, [] +App::test_event(2) |mdv|=1 +[id=App::CUSTOM_METADATA_COUNT, val=42424242] +custom metadata string, [] +custom metadata count, [42424242] +custom metadata table, [] +App::test_event(3) |mdv|=1 +[id=App::CUSTOM_METADATA_TABLE, val={ +[key1] = val1 +}] +custom metadata string, [] +custom metadata count, [] +custom metadata table, [{ +[key1] = val1 +}] +App::test_event(4) |mdv|=4 +[id=App::CUSTOM_METADATA_TABLE, val={ +[key1] = val1 +}] +[id=App::CUSTOM_METADATA_COUNT, val=41414242] +[id=App::CUSTOM_METADATA_STRING, val=testing string metadata] +[id=App::CUSTOM_METADATA_STRING, val=more string metadata] +custom metadata string, [testing string metadata, more string metadata] +custom metadata count, [41414242] +custom metadata table, [{ +[key1] = val1 +}] diff --git a/testing/btest/Baseline/plugins.publish-event-metadata/output b/testing/btest/Baseline/plugins.publish-event-metadata/output new file mode 100644 index 0000000000..b88529121f --- /dev/null +++ b/testing/btest/Baseline/plugins.publish-event-metadata/output @@ -0,0 +1,4 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +0.000000 InitPostScript +0.000000 HookPublishEvent backend=Broker topic=/test/topic event=App::test_event +0.000000 HookPublishEvent /test/topic(App::test_event) diff --git a/testing/btest/Baseline/plugins.publish-event-metadata/worker-1..stderr b/testing/btest/Baseline/plugins.publish-event-metadata/worker-1..stderr new file mode 100644 index 0000000000..e3f6131b1d --- /dev/null +++ b/testing/btest/Baseline/plugins.publish-event-metadata/worker-1..stderr @@ -0,0 +1,2 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +received termination signal diff --git a/testing/btest/Baseline/plugins.publish-event-metadata/worker-1..stdout b/testing/btest/Baseline/plugins.publish-event-metadata/worker-1..stdout new file mode 100644 index 0000000000..ce8310d655 --- /dev/null +++ b/testing/btest/Baseline/plugins.publish-event-metadata/worker-1..stdout @@ -0,0 +1,6 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +0.000000 InitPostScript +0.000000 HookPublishEvent backend=Broker topic=topic1 event=App::test_event +0.000000 HookPublishEvent backend=Broker topic=topic2 event=App::test_event +0.000000 HookPublishEvent backend=Broker topic=topic3 event=App::test_event +0.000000 HookPublishEvent backend=Broker topic=topic4 event=App::test_event diff --git a/testing/btest/broker/remote_event_ts_compat.zeek b/testing/btest/broker/remote_event_ts_compat.zeek index 991e425759..36a00a74c9 100644 --- a/testing/btest/broker/remote_event_ts_compat.zeek +++ b/testing/btest/broker/remote_event_ts_compat.zeek @@ -19,6 +19,11 @@ redef exit_only_after_terminate = T; redef allow_network_time_forward = F; redef EventMetadata::add_network_timestamp = T; +# This is needed so that the receiving node sets its +# own local network timestamp on remote events that do +# not have network timestamp metadata. +redef EventMetadata::add_missing_remote_network_timestamp = T; + event zeek_init() { Broker::subscribe(getenv("TOPIC")); diff --git a/testing/btest/core/event-metadata/add-missing-remote-network-timestamp-error.zeek b/testing/btest/core/event-metadata/add-missing-remote-network-timestamp-error.zeek new file mode 100644 index 0000000000..df3c563689 --- /dev/null +++ b/testing/btest/core/event-metadata/add-missing-remote-network-timestamp-error.zeek @@ -0,0 +1,7 @@ +# @TEST-DOC: Using add_missing_remote_network_timestamp without add_network_timestamp is an error. +# +# @TEST-EXEC-FAIL: zeek -b %INPUT +# @TEST-EXEC: TEST_DIFF_CANONIFIER=$SCRIPTS/diff-remove-abspath btest-diff .stderr + +redef EventMetadata::add_network_timestamp = F; +redef EventMetadata::add_missing_remote_network_timestamp = T; diff --git a/testing/btest/core/event-metadata/current-event-time.zeek b/testing/btest/core/event-metadata/current-event-time.zeek new file mode 100644 index 0000000000..dd81321d8d --- /dev/null +++ b/testing/btest/core/event-metadata/current-event-time.zeek @@ -0,0 +1,14 @@ +# @TEST-DOC: Check current_event_time() produces the same as event metadata, or else -1.0 +# +# @TEST-EXEC: zeek -r $TRACES/http/get.trace %INPUT EventMetadata::add_network_timestamp=T >> output 2>&1 +# @TEST-EXEC: zeek -r $TRACES/http/get.trace %INPUT EventMetadata::add_network_timestamp=F >> output 2>&1 +# +# @TEST-EXEC: TEST_DIFF_CANONIFIER= btest-diff output + + +event new_connection(c: connection) + { + print fmt("new_connection add_network_timestamp=%s current_event_time=%s network_timestamp=%s", + EventMetadata::add_network_timestamp, current_event_time(), + EventMetadata::current(EventMetadata::NETWORK_TIMESTAMP)); + } diff --git a/testing/btest/plugins/publish-event-metadata-plugin/.btest-ignore b/testing/btest/plugins/publish-event-metadata-plugin/.btest-ignore new file mode 100644 index 0000000000..e69de29bb2 diff --git a/testing/btest/plugins/publish-event-metadata-plugin/src/Plugin.cc b/testing/btest/plugins/publish-event-metadata-plugin/src/Plugin.cc new file mode 100644 index 0000000000..7dba9bbb82 --- /dev/null +++ b/testing/btest/plugins/publish-event-metadata-plugin/src/Plugin.cc @@ -0,0 +1,96 @@ + +#include "Plugin.h" + +#include +#include +#include +#include +#include +#include +#include + +namespace btest::plugin::Demo_PublishEventMetadata { +Plugin plugin; +} + +using namespace btest::plugin::Demo_PublishEventMetadata; + +zeek::plugin::Configuration Plugin::Configure() { + EnableHook(zeek::plugin::HOOK_PUBLISH_EVENT); + + zeek::plugin::Configuration config; + config.name = "Demo::PublishEventMetadata"; + config.description = "For testing metadata publish"; + config.version.major = 1; + config.version.minor = 0; + config.version.patch = 0; + return config; +} + +void Plugin::InitPostScript() { + std::fprintf(stdout, "%.6f %-15s\n", zeek::run_state::network_time, " InitPostScript"); +} + +bool Plugin::HookPublishEvent(zeek::cluster::Backend& backend, const std::string& topic, + zeek::cluster::detail::Event& event) { + std::fprintf(stdout, "%.6f %s backend=%s topic=%s event=%s\n", zeek::run_state::network_time, "HookPublishEvent", + backend.Name().c_str(), topic.c_str(), std::string(event.HandlerName()).c_str()); + + const auto& table_type = zeek::id::find_type("table_string_of_string"); + const auto& string_md = zeek::id::find_val("App::CUSTOM_METADATA_STRING"); + auto count_md = zeek::id::find_val("App::CUSTOM_METADATA_COUNT"); + auto table_md = zeek::id::find_val("App::CUSTOM_METADATA_TABLE"); + + if ( ! count_md || ! table_md ) + zeek::reporter->FatalError("Could not find required enum values"); + + if ( topic == "topic1" ) { + if ( ! event.AddMetadata(string_md, zeek::make_intrusive("testing string metadata")) ) { + zeek::reporter->FatalError("Failed to add string metadata"); + } + } + else if ( topic == "topic2" ) { + if ( ! event.AddMetadata(count_md, zeek::val_mgr->Count(42424242)) ) { + zeek::reporter->FatalError("Failed to add count metadata"); + } + } + else if ( topic == "topic3" ) { + auto tv = zeek::make_intrusive(table_type); + if ( ! tv->Assign(zeek::make_intrusive("key1"), + zeek::make_intrusive("val1")) ) + zeek::reporter->FatalError("Could not update table value"); + + if ( ! event.AddMetadata(table_md, tv) ) { + zeek::reporter->FatalError("Failed to add table metadata"); + } + } + else if ( topic == "topic4" ) { + auto tv = zeek::make_intrusive(table_type); + if ( ! tv->Assign(zeek::make_intrusive("key1"), + zeek::make_intrusive("val1")) ) + zeek::reporter->FatalError("Could not update table value"); + + if ( ! event.AddMetadata(table_md, tv) ) { + zeek::reporter->FatalError("Failed to add table metadata"); + } + + if ( ! event.AddMetadata(count_md, zeek::val_mgr->Count(41414242)) ) { + zeek::reporter->FatalError("Failed to add string metadata"); + } + + if ( ! event.AddMetadata(string_md, zeek::make_intrusive("testing string metadata")) ) { + zeek::reporter->FatalError("Failed to add string metadata"); + } + + // Event metadata is just a vector, so can have duplicate entries. + if ( ! event.AddMetadata(string_md, zeek::make_intrusive("more string metadata")) ) { + zeek::reporter->FatalError("Failed to add string metadata"); + } + } + else { + zeek::reporter->FatalError("Unhandled topic %s", topic.c_str()); + } + + + return true; +} diff --git a/testing/btest/plugins/publish-event-metadata-plugin/src/Plugin.h b/testing/btest/plugins/publish-event-metadata-plugin/src/Plugin.h new file mode 100644 index 0000000000..724e201e88 --- /dev/null +++ b/testing/btest/plugins/publish-event-metadata-plugin/src/Plugin.h @@ -0,0 +1,19 @@ +#pragma once + +#include +#include + +namespace btest::plugin::Demo_PublishEventMetadata { + +class Plugin : public zeek::plugin::Plugin { +protected: + zeek::plugin::Configuration Configure() override; + void InitPostScript() override; + + bool HookPublishEvent(zeek::cluster::Backend& backend, const std::string& topic, + zeek::cluster::detail::Event& event) override; +}; + +extern Plugin plugin; + +} // namespace btest::plugin::Demo_PublishEventMetadata diff --git a/testing/btest/plugins/publish-event-metadata.zeek b/testing/btest/plugins/publish-event-metadata.zeek new file mode 100644 index 0000000000..6e714b2a8e --- /dev/null +++ b/testing/btest/plugins/publish-event-metadata.zeek @@ -0,0 +1,77 @@ +# @TEST-DOC: Smoke test sending metadata from a worker to a manager. The manager uses script level functions. +# +# @TEST-EXEC: ${DIST}/auxil/zeek-aux/plugin-support/init-plugin -u . Demo PublishEventMetadata +# @TEST-EXEC: cp -r %DIR/publish-event-metadata-plugin/* . +# @TEST-EXEC: ./configure --zeek-dist=${DIST} && make +# +# @TEST-PORT: BROKER_MANAGER_PORT +# @TEST-PORT: BROKER_WORKER1_PORT +# +# @TEST-EXEC: cp $FILES/broker/cluster-layout.zeek . +# +# @TEST-EXEC: btest-bg-run manager ZEEKPATH=$ZEEKPATH:.. ZEEK_PLUGIN_PATH=`pwd` CLUSTER_NODE=manager zeek -b Demo::PublishEventMetadata %INPUT +# @TEST-EXEC: btest-bg-run worker-1 ZEEKPATH=$ZEEKPATH:.. ZEEK_PLUGIN_PATH=`pwd` CLUSTER_NODE=worker-1 zeek -b Demo::PublishEventMetadata %INPUT +# @TEST-EXEC: btest-bg-wait 10 +# @TEST-EXEC: btest-diff manager/.stdout +# @TEST-EXEC: btest-diff manager/.stderr +# @TEST-EXEC: btest-diff worker-1/.stdout +# @TEST-EXEC: TEST_DIFF_CANONIFIER='grep -v PEER_UNAVAILABLE' btest-diff worker-1/.stderr + +redef allow_network_time_forward = F; + +@load frameworks/cluster/experimental + +module App; + +export { + global test_event: event(c: count); + + redef enum EventMetadata::ID += { + CUSTOM_METADATA_STRING = 4711, + CUSTOM_METADATA_COUNT = 4712, + CUSTOM_METADATA_TABLE = 4713, + }; +} + +event App::test_event(c: count) + { + local mdv = EventMetadata::current_all(); + print fmt("App::test_event(%s) |mdv|=%s", c, |mdv|); + for ( _, md in mdv ) + print md; + + print "custom metadata string", EventMetadata::current(App::CUSTOM_METADATA_STRING); + print "custom metadata count", EventMetadata::current(App::CUSTOM_METADATA_COUNT); + print "custom metadata table", EventMetadata::current(App::CUSTOM_METADATA_TABLE); + + if ( c == 4 ) + terminate(); + } + +event zeek_init() &priority=20 + { + assert EventMetadata::register(CUSTOM_METADATA_STRING, string); + assert EventMetadata::register(CUSTOM_METADATA_COUNT, count); + assert EventMetadata::register(CUSTOM_METADATA_TABLE, table[string] of string); + + Cluster::subscribe("topic1"); + Cluster::subscribe("topic2"); + Cluster::subscribe("topic3"); + Cluster::subscribe("topic4"); + } + +event Cluster::Experimental::cluster_started() + { + if ( Cluster::node == "worker-1" ) + { + Cluster::publish("topic1", test_event, 1); + Cluster::publish("topic2", test_event, 2); + Cluster::publish("topic3", test_event, 3); + Cluster::publish("topic4", test_event, 4); + } + } + +event Cluster::node_down(name: string, id: string) + { + terminate(); + }