Merge remote-tracking branch 'origin/topic/awelzel/4177-4178-custom-event-metadata-part-2'

* origin/topic/awelzel/4177-4178-custom-event-metadata-part-2:
  Event: Bail on add_missing_remote_network_timestamp without add_network_timestamp
  btest/plugin: Test custom metadata publish
  NEWS: Add note about generic event metadata
  cluster: Remove deprecated Event constructor
  cluster: Remove some explicit timestamp handling
  broker/Manager: Fetch and forward all metadata from events
  Event/init-bare: Add add_missing_remote_network_timestamp logic
  cluster/Backend/DoProcessEvent: Use generic metadata, not just timestamps
  cluster/Event: Support moving args and metadata from event
  cluster/serializer/broker: Support generic metadata
  cluster/Event: Generic metadata support
  Event: Use -1.0 for undefined/unset timestamps
  cluster: Use shorter obj_desc versions
  Desc: Add obj_desc() / obj_desc_short() overloads for IntrusivePtr
Arne Welzel 2025-06-02 17:32:39 +02:00
commit 0a34b39e7a
28 changed files with 682 additions and 99 deletions

CHANGES

@ -1,3 +1,66 @@
8.0.0-dev.324 | 2025-06-02 17:32:39 +0200
* Event: Bail on add_missing_remote_network_timestamp without add_network_timestamp (Arne Welzel, Corelight)
* btest/plugin: Test custom metadata publish (Arne Welzel, Corelight)
Usage demo for plugin writers to add custom event metadata and access it
in Zeek scripts.
* NEWS: Add note about generic event metadata (Arne Welzel, Corelight)
* cluster: Remove deprecated Event constructor (Arne Welzel, Corelight)
It is now unused, ditch it. This wasn't available in an LTS release yet
and anyhow is in the detail namespace.
* cluster: Remove some explicit timestamp handling (Arne Welzel, Corelight)
Backend::MakeClusterEvent() for now is the only place to add implicit
network timestamp metadata within the cluster component.
* broker/Manager: Fetch and forward all metadata from events (Arne Welzel, Corelight)
Also use the generic metadata version for publishing, keep the
ts-based API for now, but only add timestamps when
EventMetadata::add_network_timestamp is T. I'm not sure what the
right way forward here is, maybe deprecating Broker's publish event
variations and funneling through cluster.
* Event/init-bare: Add add_missing_remote_network_timestamp logic (Arne Welzel, Corelight)
Make defaulting to the local network timestamp for remote events opt-in.
* cluster/Backend/DoProcessEvent: Use generic metadata, not just timestamps (Arne Welzel, Corelight)
* cluster/Event: Support moving args and metadata from event (Arne Welzel, Corelight)
* cluster/serializer/broker: Support generic metadata (Arne Welzel, Corelight)
Instead of handling just the network timestamp, support extraction of
the whole metadata vector that broker events hold.
* cluster/Event: Generic metadata support (Arne Welzel, Corelight)
Instead of a timestamp attribute, switch to holding an EventMetadataVectorPtr
like zeek::Event instances do. Keep the old constructor until the end of
the patch series.
* Event: Use -1.0 for undefined/unset timestamps (Arne Welzel, Corelight)
This can happen if either there's no network timestamp associated with
an event, or there's currently no event being dispatched. Using 0.0
isn't great as it's the normal start timestamp before reading a network
packet. Using -1.0 gives the caller a chance to check and realize what's
going on.
* cluster: Use shorter obj_desc versions (Arne Welzel, Corelight)
* Desc: Add obj_desc() / obj_desc_short() overloads for IntrusivePtr (Arne Welzel, Corelight)
When using these helpers in code, one rarely has raw pointers, and the
repeated `.get()` calls are cumbersome and noisy.
8.0.0-dev.309 | 2025-06-02 10:17:14 +0200
* Prefer `std::move` over copy (Benjamin Bannier, Corelight)

NEWS

@ -35,9 +35,26 @@ Breaking Changes
redef LogAscii::json_timestamps = JSON::TS_MILLIS_UNSIGNED;
- The ``current_event_time()`` builtin function as well as ``Event::Time()``
and ``EventMgr::CurrentEventTime()`` now return ``-1.0`` if no timestamp
metadata is available for the current event, or if no event is being
dispatched. Previously this would've been 0.0, or the timestamp of the previously
dispatched event.
- Missing network timestamp metadata on remote events is not set to the local
network time anymore by default. This potentially hid useful debugging information
about another node not sending timestamp metadata. The old behavior can be
re-enabled as follows:
redef EventMetadata::add_missing_remote_network_timestamp = T;
New Functionality
-----------------
- Generic event metadata support. A new ``EventMetadata`` module was added that allows
registering generic event metadata types and accessing the current event's metadata
using the functions ``current()`` and ``current_all()`` of this module.
- A new plugin hook, ``HookPublishEvent()``, has been added for intercepting
publishing of Zeek events. This hook may be used for monitoring purposes,
modifying or rerouting remote events.


@ -1 +1 @@
8.0.0-dev.309
8.0.0-dev.324


@ -601,6 +601,16 @@ export {
## might be a value before the network_time() when the event
## was actually dispatched.
const add_network_timestamp: bool = F &redef;
## By default, remote events without network timestamp metadata
## will yield a negative :zeek:see:`current_event_time` during
## processing. To have the receiving Zeek node set the event's
## network timestamp metadata with its current local network time,
## set this option to true.
##
## This setting is only in effect if :zeek:see:`EventMetadata::add_network_timestamp`
## is also set to true.
const add_missing_remote_network_timestamp: bool = F &redef;
}
module FTP;


@ -7,6 +7,7 @@
#include <string>
#include <utility>
#include "zeek/IntrusivePtr.h"
#include "zeek/ZeekString.h" // for byte_vec
#include "zeek/util.h" // for zeek_int_t
@ -211,13 +212,13 @@ protected:
};
// Returns a string representation of an object's description. Used for
// debugging and error messages. takes a bare pointer rather than an
// IntrusivePtr because the latter is harder to deal with when making
// calls from a debugger like lldb, which is the main use of this function.
// debugging and error messages.
class Obj;
std::string obj_desc(const Obj* o);
inline std::string obj_desc(const IntrusivePtr<Obj>& o) { return obj_desc(o.get()); }
// Same as obj_desc(), but ensure it is short and don't include location info.
std::string obj_desc_short(const Obj* o);
inline std::string obj_desc_short(const IntrusivePtr<Obj>& o) { return obj_desc_short(o.get()); }
} // namespace zeek
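The two inline overloads added in this hunk let call sites pass a smart pointer directly instead of writing `.get()` each time. A minimal standalone sketch of that forwarding pattern follows. It assumes `std::shared_ptr` as a stand-in for `zeek::IntrusivePtr`, and `Obj`, `describe()` and `describe_example()` are hypothetical names for illustration, not the real Zeek declarations.

```cpp
#include <cassert>
#include <memory>
#include <string>

// Hypothetical stand-in for zeek::Obj; not the real class.
struct Obj {
    std::string name;
};

// Raw-pointer version, analogous in spirit to obj_desc(const Obj*).
inline std::string describe(const Obj* o) { return o ? o->name : "<null>"; }

// Convenience overload: forwards to the raw-pointer version so callers
// no longer need to spell out .get(). std::shared_ptr is an assumption
// standing in for zeek::IntrusivePtr.
inline std::string describe(const std::shared_ptr<Obj>& o) { return describe(o.get()); }

// Both call styles yield the same description.
inline void describe_example() {
    auto p = std::make_shared<Obj>(Obj{"conn"});
    assert(describe(p.get()) == "conn"); // old call style
    assert(describe(p) == "conn");       // new call style
}
```

Keeping the raw-pointer function as the single implementation means the overload adds no behavior of its own, which is why it can live inline in the header.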


@ -11,6 +11,7 @@
#include "zeek/Val.h"
#include "zeek/iosource/Manager.h"
#include "zeek/plugin/Manager.h"
#include "zeek/util.h"
#include "const.bif.netvar_h"
#include "event.bif.netvar_h"
@ -96,7 +97,7 @@ zeek::VectorValPtr Event::MetadataValues(const EnumValPtr& id) const {
double Event::Time() const {
if ( ! meta )
return 0.0;
return detail::NO_TIMESTAMP;
for ( const auto& m : *meta )
if ( m.Id() == static_cast<zeek_uint_t>(detail::MetadataType::NetworkTimestamp) ) {
@ -109,7 +110,7 @@ double Event::Time() const {
return m.Val()->AsTime();
}
return 0.0;
return detail::NO_TIMESTAMP;
}
void Event::Describe(ODesc* d) const {
@ -165,12 +166,19 @@ void EventMgr::Enqueue(const EventHandlerPtr& h, Args vl, util::detail::SourceID
detail::EventMetadataVectorPtr meta;
double ts = double(deprecated_ts);
if ( src == util::detail::SOURCE_LOCAL && BifConst::EventMetadata::add_network_timestamp ) {
// If this is a local event and EventMetadata::add_network_timestamp is
// enabled automatically set the network timestamp for this event to the
// current network time when it is < 0 (default is -1.0).
// enabled, automatically set the network timestamp for this event to the
// current network time when it is < 0 (default of deprecated_ts is -1.0).
//
// See the other Enqueue() implementation for the local motivation.
// See the other Enqueue() implementation for the local vs broker/remote
// motivation of want_network_timestamp.
bool want_network_timestamp =
BifConst::EventMetadata::add_network_timestamp &&
((src == util::detail::SOURCE_LOCAL) ||
(src == util::detail::SOURCE_BROKER && BifConst::EventMetadata::add_missing_remote_network_timestamp));
if ( want_network_timestamp ) {
if ( ts < 0.0 )
ts = run_state::network_time;
@ -189,15 +197,23 @@ void EventMgr::Enqueue(const EventHandlerPtr& h, Args vl, util::detail::SourceID
void EventMgr::Enqueue(detail::EventMetadataVectorPtr meta, const EventHandlerPtr& h, Args vl,
util::detail::SourceID src, analyzer::ID aid, Obj* obj) {
if ( src == util::detail::SOURCE_LOCAL && BifConst::EventMetadata::add_network_timestamp ) {
// If all events are supposed to have a network time attached, ensure
// that the meta vector was passed *and* contains a network timestamp.
// Attach network timestamps to all events if EventMetadata::add_network_timestamp is T and
//
// This is only done for local events, however. For remote events (src == BROKER)
// that do not hold network timestamp metadata, it seems less surprising to keep
// it unset. If it is required that a remote node sends *their* network timestamp,
// defaulting to this node's network time seems more confusing and error prone
// than just leaving it unset and having the consumer deal with the situation.
// 1) this event is locally generated
// or
// 2) this is a remote event and EventMetadata::add_missing_remote_network_timestamp is T
//
// Why so complicated? It seems less surprising behavior to keep network timestamp metadata unset
// if a remote event didn't have any attached. It should help to more easily figure out what's
// actually going on compared to setting it to the local network time. If all nodes are required to
// send *their* network timestamp, filling it with this node's network time seems more confusing
// and error prone compared to just leaving it unset and having the consumer deal with the situation.
bool want_network_timestamp =
BifConst::EventMetadata::add_network_timestamp &&
((src == util::detail::SOURCE_LOCAL) ||
(src == util::detail::SOURCE_BROKER && BifConst::EventMetadata::add_missing_remote_network_timestamp));
if ( want_network_timestamp ) {
bool has_time = false;
if ( ! meta ) {
@ -348,6 +364,14 @@ void EventMgr::InitPostScript() {
if ( ! zeek::event_registry->RegisterMetadata(net_ts_val, zeek::base_type(zeek::TYPE_TIME)) )
zeek::reporter->FatalError("Failed to register NETWORK_TIMESTAMP metadata");
// Remove this if there's ever a use-case to not use them together.
if ( BifConst::EventMetadata::add_missing_remote_network_timestamp &&
! BifConst::EventMetadata::add_network_timestamp )
zeek::reporter->FatalError(
"Setting EventMetadata::add_missing_remote_network_timestamp is only valid together with "
"EventMetadata::add_network_timestamp");
iosource_mgr->Register(this, true, false);
}
} // namespace zeek
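The `want_network_timestamp` condition and the `InitPostScript()` check in this file can be read as a small truth table over two options and the event source. The sketch below restates that logic in isolation. `Source`, `Options`, and `options_valid()` are hypothetical stand-ins for `util::detail::SourceID`, the `BifConst::EventMetadata` constants, and the fatal-error check; only the boolean expressions are taken from the diff.

```cpp
#include <cassert>

// Hypothetical stand-in for the two SourceID values used in the diff.
enum class Source { Local, Broker };

// Hypothetical mirror of the two script-level options; both default to F.
struct Options {
    bool add_network_timestamp = false;
    bool add_missing_remote_network_timestamp = false;
};

// Same boolean expression as want_network_timestamp in EventMgr::Enqueue():
// local events get a timestamp when the main option is on, remote (Broker)
// events only when the opt-in option is on as well.
inline bool want_network_timestamp(const Options& o, Source src) {
    return o.add_network_timestamp &&
           (src == Source::Local || (src == Source::Broker && o.add_missing_remote_network_timestamp));
}

// Same condition that InitPostScript() turns into a fatal error when false:
// the remote opt-in is only valid together with the main option.
inline bool options_valid(const Options& o) {
    return ! o.add_missing_remote_network_timestamp || o.add_network_timestamp;
}
```

With the defaults, neither local nor remote events receive an implicit timestamp. Enabling only `add_network_timestamp` covers local events, and remote events additionally need the opt-in.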


@ -49,6 +49,8 @@ using EventMetadataVectorPtr = std::unique_ptr<EventMetadataVector>;
*/
EventMetadataVectorPtr MakeEventMetadataVector(double t);
constexpr double NO_TIMESTAMP = -1.0;
} // namespace detail
class Event final : public Obj {
@ -170,8 +172,8 @@ public:
// the event was intended to be executed. For scheduled events, this is the time the event
// was scheduled to. For any other event, this is the time when the event was created.
//
// If no event is being processed, returns 0.0.
double CurrentEventTime() const { return current ? current->Time() : 0.0; }
// If no event is being processed or there is no timestamp information, returns -1.0
double CurrentEventTime() const { return current ? current->Time() : detail::NO_TIMESTAMP; }
int Size() const { return num_events_queued - num_events_dispatched; }
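The switch from `0.0` to `-1.0` matters because `0.0` is a legitimate network time before the first packet is read, so it cannot also signal "unset". A rough sketch of the lookup and the caller-side check is below. `MetaEntry`, `NETWORK_TIMESTAMP_ID`, `event_time()` and `has_timestamp()` are hypothetical simplifications. Only the `NO_TIMESTAMP` value and the first-match behavior come from the diff.

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

constexpr double NO_TIMESTAMP = -1.0;             // same sentinel value as in the diff
constexpr std::uint64_t NETWORK_TIMESTAMP_ID = 1; // hypothetical metadata id

// Hypothetical, simplified metadata entry holding only a double value.
struct MetaEntry {
    std::uint64_t id;
    double value;
};
using MetaVector = std::vector<MetaEntry>;

// Returns the first network timestamp found, or NO_TIMESTAMP when the
// vector is missing or carries no timestamp entry.
inline double event_time(const MetaVector* meta) {
    if ( ! meta )
        return NO_TIMESTAMP;

    for ( const auto& m : *meta )
        if ( m.id == NETWORK_TIMESTAMP_ID )
            return m.value;

    return NO_TIMESTAMP;
}

// Caller-side check: any non-negative value is a real timestamp,
// including 0.0.
inline bool has_timestamp(double ts) { return ts >= 0.0; }
```

A consumer can then distinguish "timestamp is 0.0" from "no timestamp at all", which the previous `0.0` default made impossible.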


@ -8,6 +8,7 @@
#include <broker/event.hh>
#include <broker/event_observer.hh>
#include <broker/logger.hh>
#include <broker/time.hh>
#include <broker/variant.hh>
#include <broker/zeek.hh>
#include <unistd.h>
@ -42,6 +43,8 @@
#include "zeek/telemetry/Manager.h"
#include "zeek/util.h"
#include "const.bif.netvar_h"
using namespace std;
namespace {
@ -850,8 +853,15 @@ bool Manager::PublishEvent(string topic, std::string name, broker::vector args,
if ( peer_count == 0 && hub_count == 0 )
return true;
broker::zeek::Event ev(name, args, broker::to_timestamp(ts));
DBG_LOG(DBG_BROKER, "Publishing event: %s", RenderEvent(topic, name, ev.args()).c_str());
broker::vector meta;
if ( BifConst::EventMetadata::add_network_timestamp ) {
broker::vector entry{static_cast<broker::count>(zeek::detail::MetadataType::NetworkTimestamp),
broker::to_timestamp(ts)};
meta.emplace_back(std::move(entry));
}
broker::zeek::Event ev(name, args, meta);
DBG_LOG(DBG_BROKER, "Publishing event: %s", RenderEvent(topic, std::string(ev.name()), ev.args()).c_str());
bstate->endpoint.publish(std::move(topic), ev.move_data());
num_events_outgoing_metric->Inc();
return true;
@ -1570,15 +1580,10 @@ void Manager::ProcessMessage(std::string_view topic, broker::zeek::Event& ev) {
auto&& name = ev.name();
auto&& args = ev.args();
double ts;
auto meta = cluster::detail::metadata_vector_from_broker_event(ev);
if ( auto ev_ts = ev.ts() )
broker::convert(*ev_ts, ts);
else
// Default to current network time, if the received event did not contain a timestamp.
ts = run_state::network_time;
DBG_LOG(DBG_BROKER, "Process event: %s (%.6f) %s", std::string{name}.c_str(), ts, RenderMessage(args).c_str());
DBG_LOG(DBG_BROKER, "Process event: %s (with %zu metadata entries) %s", std::string{name}.c_str(),
meta ? meta->size() : 0, RenderMessage(args).c_str());
num_events_incoming_metric->Inc();
auto handler = event_registry->Lookup(name);
@ -1654,13 +1659,8 @@ void Manager::ProcessMessage(std::string_view topic, broker::zeek::Event& ev) {
}
}
if ( vl.size() == args.size() ) {
zeek::detail::EventMetadataVectorPtr meta;
if ( ts >= 0.0 )
meta = zeek::detail::MakeEventMetadataVector(ts);
if ( vl.size() == args.size() )
event_mgr.Enqueue(std::move(meta), handler, std::move(vl), util::detail::SOURCE_BROKER);
}
}
bool Manager::ProcessMessage(std::string_view, broker::zeek::LogCreate& lc) {


@ -2,14 +2,17 @@
#include "zeek/cluster/Backend.h"
#include <memory>
#include <optional>
#include "zeek/Desc.h"
#include "zeek/Event.h"
#include "zeek/EventHandler.h"
#include "zeek/EventRegistry.h"
#include "zeek/Func.h"
#include "zeek/Reporter.h"
#include "zeek/Type.h"
#include "zeek/Val.h"
#include "zeek/cluster/Manager.h"
#include "zeek/cluster/OnLoop.h"
#include "zeek/cluster/Serializer.h"
@ -19,15 +22,48 @@
#include "zeek/plugin/Plugin.h"
#include "zeek/util.h"
#include "zeek/3rdparty/doctest.h"
using namespace zeek::cluster;
double detail::Event::Timestamp() const {
if ( meta ) {
for ( const auto& m : *meta ) {
if ( m.Id() == static_cast<zeek_uint_t>(zeek::detail::MetadataType::NetworkTimestamp) )
return m.Val()->AsTime();
}
}
return zeek::detail::NO_TIMESTAMP;
}
bool detail::Event::AddMetadata(const EnumValPtr& id, zeek::ValPtr val) {
if ( ! id || ! val )
return false;
const auto* desc = zeek::event_registry->LookupMetadata(id->Get());
if ( ! desc )
return false;
if ( ! same_type(val->GetType(), desc->Type()) )
return false;
if ( ! meta )
meta = std::make_unique<zeek::detail::EventMetadataVector>();
// Internally stored as zeek_uint_t for serializers.
meta->emplace_back(desc->Id(), std::move(val));
return true;
}
std::tuple<zeek::EventHandlerPtr, zeek::Args, zeek::detail::EventMetadataVectorPtr> detail::Event::Take() && {
return {handler, std::move(args), std::move(meta)};
}
bool detail::LocalEventHandlingStrategy::DoProcessEvent(std::string_view topic, detail::Event e) {
zeek::detail::EventMetadataVectorPtr meta;
if ( auto ts = e.Timestamp(); ts >= 0.0 )
meta = zeek::detail::MakeEventMetadataVector(e.Timestamp());
zeek::event_mgr.Enqueue(std::move(meta), e.Handler(), std::move(e.Args()), util::detail::SOURCE_BROKER);
auto [handler, args, meta] = std::move(e).Take();
zeek::event_mgr.Enqueue(std::move(meta), handler, std::move(args), util::detail::SOURCE_BROKER);
return true;
}
@ -73,8 +109,7 @@ std::optional<zeek::Args> detail::check_args(const zeek::FuncValPtr& handler, ze
if ( ! same_type(got_type, expected_type) ) {
zeek::reporter->Error("event parameter #%zu type mismatch, got %s, expecting %s", i + 1,
zeek::obj_desc_short(got_type.get()).c_str(),
zeek::obj_desc_short(expected_type.get()).c_str());
zeek::obj_desc_short(got_type).c_str(), zeek::obj_desc_short(expected_type).c_str());
return std::nullopt;
}
@ -100,21 +135,38 @@ bool Backend::Init(std::string nid) {
return DoInit();
}
std::optional<detail::Event> Backend::MakeClusterEvent(FuncValPtr handler, ArgsSpan args, double timestamp) const {
std::optional<detail::Event> Backend::MakeClusterEvent(FuncValPtr handler, ArgsSpan args) const {
auto checked_args = detail::check_args(handler, args);
if ( ! checked_args )
return std::nullopt;
if ( timestamp == 0.0 )
timestamp = zeek::event_mgr.CurrentEventTime();
const auto& eh = zeek::event_registry->Lookup(handler->AsFuncPtr()->GetName());
if ( ! eh ) {
zeek::reporter->Error("event registry lookup of '%s' failed", obj_desc(handler.get()).c_str());
zeek::reporter->Error("event registry lookup of '%s' failed", obj_desc_short(handler).c_str());
return std::nullopt;
}
return zeek::cluster::detail::Event{eh, std::move(*checked_args), timestamp};
/**
* If you ever stare at this and wonder: Currently, if someone calls
* Cluster::publish() from within a remote event in script land, then
* CurrentEventTime() below will yield the network timestamp of the
* remote event. That means that the outgoing event from this node will
* have the network timestamp of the originating node, which may be
* different from what the local network time is.
*
* This could be confusing and another policy might be to always set
* the network timestamp metadata for outgoing events to the local
* network time instead, even when currently handling a remote event.
*
* @J-Gras prefers the current behavior. @awelzel wonders if there should
* be an opt-in/opt-out for this behavior. Procrastinating it for now.
*/
zeek::detail::EventMetadataVectorPtr meta;
if ( zeek::BifConst::EventMetadata::add_network_timestamp )
meta = zeek::detail::MakeEventMetadataVector(zeek::event_mgr.CurrentEventTime());
return zeek::cluster::detail::Event{eh, std::move(*checked_args), std::move(meta)};
}
void Backend::DoReadyToPublishCallback(Backend::ReadyCallback cb) {
@ -244,3 +296,48 @@ void ThreadedBackend::Process(QueueMessage&& msg) {
zeek::reporter->FatalError("Unimplemented QueueMessage %zu", msg.index());
}
}
TEST_SUITE_BEGIN("cluster event");
TEST_CASE("add metadata") {
auto* handler = zeek::event_registry->Lookup("Supervisor::node_status");
zeek::Args args{zeek::make_intrusive<zeek::StringVal>("TEST"), zeek::val_mgr->Count(42)};
zeek::cluster::detail::Event event{handler, args, nullptr};
auto nts = zeek::id::find_val<zeek::EnumVal>("EventMetadata::NETWORK_TIMESTAMP");
REQUIRE(nts);
auto unk = zeek::id::find_val<zeek::EnumVal>("Log::UNKNOWN");
REQUIRE(unk);
bool registered = zeek::event_registry->RegisterMetadata(nts, zeek::base_type(zeek::TYPE_TIME));
REQUIRE(registered);
SUBCASE("valid") {
CHECK_EQ(event.Timestamp(), -1.0);
CHECK(event.AddMetadata(nts, zeek::make_intrusive<zeek::TimeVal>(42.0)));
CHECK_EQ(event.Timestamp(), 42.0);
}
SUBCASE("valid-two-times") {
CHECK_EQ(event.Timestamp(), -1.0);
CHECK(event.AddMetadata(nts, zeek::make_intrusive<zeek::TimeVal>(42.0)));
CHECK(event.AddMetadata(nts, zeek::make_intrusive<zeek::TimeVal>(43.0)));
CHECK_EQ(event.Timestamp(), 42.0); // finds the first one
CHECK_EQ(event.Metadata()->size(), 2); // both are stored
}
SUBCASE("invalid-value-type") {
CHECK_EQ(event.Timestamp(), -1.0);
CHECK_FALSE(event.AddMetadata(nts, zeek::make_intrusive<zeek::DoubleVal>(42.0)));
CHECK_EQ(event.Timestamp(), -1.0);
CHECK_EQ(event.Metadata(), nullptr);
}
SUBCASE("unregistered-metadata-identifier") {
CHECK_EQ(event.Timestamp(), -1.0);
CHECK_FALSE(event.AddMetadata(unk, zeek::make_intrusive<zeek::DoubleVal>(42.0)));
CHECK_EQ(event.Timestamp(), -1.0);
CHECK_EQ(event.Metadata(), nullptr);
}
}
TEST_SUITE_END();
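`detail::Event::Take() &&` in this file is an rvalue-qualified member function: it can only be called on an expiring object, which makes the hand-off to `Enqueue()` explicit at the call site (`std::move(e).Take()`). The sketch below shows the idiom in isolation. The `Event` class here is a hypothetical stand-in with `std::string`, `std::vector<std::string>` and `std::unique_ptr<std::vector<int>>` in place of the real handler, args and metadata types.

```cpp
#include <cassert>
#include <memory>
#include <string>
#include <tuple>
#include <utility>
#include <vector>

using Args = std::vector<std::string>;          // hypothetical stand-in for zeek::Args
using Meta = std::unique_ptr<std::vector<int>>; // hypothetical stand-in for EventMetadataVectorPtr

class Event {
public:
    Event(std::string handler, Args args, Meta meta)
        : handler(std::move(handler)), args(std::move(args)), meta(std::move(meta)) {}

    // The trailing && restricts this to rvalues, so callers must write
    // std::move(e).Take(). The handler is copied, args and meta are moved out.
    std::tuple<std::string, Args, Meta> Take() && {
        return std::tuple<std::string, Args, Meta>{handler, std::move(args), std::move(meta)};
    }

private:
    std::string handler;
    Args args;
    Meta meta;
};

// Usage: structured bindings receive the moved-out parts.
inline std::size_t take_example() {
    Event e{"App::test_event", Args{"a", "b"}, std::make_unique<std::vector<int>>(3, 7)};
    auto [handler, args, meta] = std::move(e).Take();
    assert(handler == "App::test_event");
    return args.size() + meta->size();
}
```

Calling `e.Take()` on an lvalue does not compile with this signature, which is the point: the move-out cannot happen by accident.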


@ -9,6 +9,7 @@
#include <string_view>
#include <variant>
#include "zeek/Event.h"
#include "zeek/EventHandler.h"
#include "zeek/Span.h"
#include "zeek/Tag.h"
@ -37,8 +38,8 @@ public:
/**
* Constructor.
*/
Event(const EventHandlerPtr& handler, zeek::Args args, double timestamp = 0.0)
: handler(handler), args(std::move(args)), timestamp(timestamp) {}
Event(const EventHandlerPtr& handler, zeek::Args args, zeek::detail::EventMetadataVectorPtr meta)
: handler(handler), args(std::move(args)), meta(std::move(meta)) {}
/**
* @return The name of the event.
@ -60,16 +61,40 @@ public:
zeek::Args& Args() { return args; }
/**
* @return The network timestamp metadata of this event or 0.0.
* @return The network timestamp metadata of this event or -1.0 if not set.
*/
double Timestamp() const { return timestamp; }
double Timestamp() const;
/**
* Add metadata to this cluster event.
*
* The used metadata \a id has to be registered via the Zeek script-layer
* function EventMetadata::register(), or via the C++ API
* EventMgr::RegisterMetadata() during an InitPostScript() hook.
*
* Non-registered metadata will not be added and false is returned.
*
* @param id The enum value identifying the event metadata.
* @param val The value to use.
* @return true if \a val was added, else false.
*/
bool AddMetadata(const EnumValPtr& id, ValPtr val);
/**
* @return A pointer to the metadata vector, or nullptr if no Metadata has been added yet.
*/
const zeek::detail::EventMetadataVector* Metadata() const { return meta.get(); }
/**
* Move data out of this event as preparation for Enqueue()
*/
std::tuple<zeek::EventHandlerPtr, zeek::Args, zeek::detail::EventMetadataVectorPtr> Take() &&;
private:
EventHandlerPtr handler;
zeek::Args args;
double timestamp; // TODO: This should be more generic, possibly holding a
// vector of key/value metadata, rather than just
// the timestamp.
zeek::detail::EventMetadataVectorPtr meta;
};
/**
@ -201,9 +226,8 @@ public:
*
* @param handler A function val representing an event handler.
* @param args The arguments for the event handler.
* @param timestamp The network time to add to the event as metadata.
*/
std::optional<detail::Event> MakeClusterEvent(FuncValPtr handler, ArgsSpan args, double timestamp = 0.0) const;
std::optional<detail::Event> MakeClusterEvent(FuncValPtr handler, ArgsSpan args) const;
/**
* Publish a cluster::detail::Event instance to a given topic.
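The `AddMetadata()` contract documented in this header (reject unregistered ids and mismatched value types, allocate the vector lazily, allow repeated ids) can be sketched without any Zeek types. Everything below is a hypothetical model: `Value` is a `std::variant` standing in for `zeek::ValPtr`, and `Registry` maps an id to the expected variant index instead of going through `zeek::event_registry`.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <map>
#include <memory>
#include <string>
#include <utility>
#include <variant>
#include <vector>

// Hypothetical value type: index 0 = double, 1 = count, 2 = string.
using Value = std::variant<double, std::uint64_t, std::string>;

// Hypothetical registry: metadata id -> expected variant alternative index.
using Registry = std::map<std::uint64_t, std::size_t>;

struct Entry {
    std::uint64_t id;
    Value val;
};

class Event {
public:
    explicit Event(const Registry& reg) : registry(reg) {}

    bool AddMetadata(std::uint64_t id, Value val) {
        auto it = registry.find(id);
        if ( it == registry.end() )
            return false; // id was never registered

        if ( it->second != val.index() )
            return false; // value type does not match the registration

        if ( ! meta )
            meta = std::make_unique<std::vector<Entry>>(); // allocate on first use

        meta->push_back(Entry{id, std::move(val)});
        return true;
    }

    // nullptr until the first successful AddMetadata() call.
    const std::vector<Entry>* Metadata() const { return meta.get(); }

private:
    const Registry& registry;
    std::unique_ptr<std::vector<Entry>> meta;
};
```

As in the unit test added by this commit, a failed add leaves `Metadata()` at `nullptr`, and adding the same id twice stores both entries.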


@ -17,29 +17,20 @@
namespace {
// Convert a script-level Cluster::Event to a cluster::detail::Event.
std::optional<zeek::cluster::detail::Event> to_cluster_event(const zeek::RecordValPtr& rec) {
std::optional<zeek::cluster::detail::Event> to_cluster_event(const zeek::cluster::Backend* backend,
const zeek::RecordValPtr& rec) {
const auto& func = rec->GetField<zeek::FuncVal>(0);
const auto& vargs = rec->GetField<zeek::VectorVal>(1);
if ( ! func )
return std::nullopt;
const auto& eh = zeek::event_registry->Lookup(func->AsFuncPtr()->GetName());
if ( ! eh ) {
zeek::emit_builtin_error(
zeek::util::fmt("event registry lookup of '%s' failed", zeek::obj_desc_short(func.get()).c_str()));
return std::nullopt;
}
// Need to copy from VectorVal to zeek::Args
zeek::Args args(vargs->Size());
for ( size_t i = 0; i < vargs->Size(); i++ )
args[i] = vargs->ValAt(i);
// TODO: Support configurable timestamps or custom metadata on the record.
auto timestamp = zeek::event_mgr.CurrentEventTime();
return zeek::cluster::detail::Event(eh, std::move(args), timestamp);
return backend->MakeClusterEvent(func, zeek::Span{args});
}
} // namespace
@ -60,7 +51,7 @@ zeek::RecordValPtr make_event(zeek::ArgsSpan args) {
if ( maybe_func_val->GetType()->Tag() != zeek::TYPE_FUNC ) {
zeek::emit_builtin_error(
zeek::util::fmt("got non-event type '%s'", zeek::obj_desc_short(maybe_func_val->GetType().get()).c_str()));
zeek::util::fmt("got non-event type '%s'", zeek::obj_desc_short(maybe_func_val->GetType()).c_str()));
return rec;
}
@ -97,11 +88,8 @@ zeek::ValPtr publish_event(const zeek::ValPtr& topic, zeek::ArgsSpan args) {
auto topic_str = topic->AsStringVal()->ToStdString();
auto timestamp = zeek::event_mgr.CurrentEventTime();
if ( args[0]->GetType()->Tag() == zeek::TYPE_FUNC ) {
auto event = zeek::cluster::backend->MakeClusterEvent({zeek::NewRef{}, args[0]->AsFuncVal()}, args.subspan(1),
timestamp);
auto event = zeek::cluster::backend->MakeClusterEvent({zeek::NewRef{}, args[0]->AsFuncVal()}, args.subspan(1));
if ( event )
return zeek::val_mgr->Bool(zeek::cluster::backend->PublishEvent(topic_str, *event));
@ -109,7 +97,7 @@ zeek::ValPtr publish_event(const zeek::ValPtr& topic, zeek::ArgsSpan args) {
}
else if ( args[0]->GetType()->Tag() == zeek::TYPE_RECORD ) {
if ( args[0]->GetType() == cluster_event_type ) { // Handling Cluster::Event record type
auto ev = to_cluster_event(zeek::cast_intrusive<zeek::RecordVal>(args[0]));
auto ev = to_cluster_event(zeek::cluster::backend, zeek::cast_intrusive<zeek::RecordVal>(args[0]));
if ( ! ev )
return zeek::val_mgr->False();
@ -121,7 +109,7 @@ zeek::ValPtr publish_event(const zeek::ValPtr& topic, zeek::ArgsSpan args) {
if ( zeek::cluster::backend != zeek::broker_mgr ) {
zeek::emit_builtin_error(
zeek::util::fmt("Publish of Broker::Event record instance with type '%s' to a non-Broker backend",
zeek::obj_desc_short(args[0]->GetType().get()).c_str()));
zeek::obj_desc_short(args[0]->GetType()).c_str()));
return zeek::val_mgr->False();
}
@ -130,13 +118,13 @@ zeek::ValPtr publish_event(const zeek::ValPtr& topic, zeek::ArgsSpan args) {
}
else {
zeek::emit_builtin_error(zeek::util::fmt("Publish of unknown record type '%s'",
zeek::obj_desc_short(args[0]->GetType().get()).c_str()));
zeek::obj_desc_short(args[0]->GetType()).c_str()));
return zeek::val_mgr->False();
}
}
zeek::emit_builtin_error(zeek::util::fmt("expected function or record as first argument, got %s",
zeek::obj_desc_short(args[0]->GetType().get()).c_str()));
zeek::obj_desc_short(args[0]->GetType()).c_str()));
return zeek::val_mgr->False();
}


@ -2,11 +2,16 @@
#include "zeek/cluster/serializer/broker/Serializer.h"
#include <cinttypes>
#include <optional>
#include "zeek/DebugLogger.h"
#include "zeek/Desc.h"
#include "zeek/Event.h"
#include "zeek/EventRegistry.h"
#include "zeek/Func.h"
#include "zeek/Reporter.h"
#include "zeek/Type.h"
#include "zeek/broker/Data.h"
#include "zeek/cluster/Backend.h"
@ -20,6 +25,36 @@
using namespace zeek::cluster;
zeek::detail::EventMetadataVectorPtr detail::metadata_vector_from_broker_event(const broker::zeek::Event& ev) {
const auto& broker_meta = ev.metadata();
if ( broker_meta.size() == 0 )
return nullptr;
auto meta = std::make_unique<zeek::detail::EventMetadataVector>();
meta->reserve(broker_meta.size());
for ( const auto& [id, v] : broker_meta ) {
const auto* desc = zeek::event_registry->LookupMetadata(id);
if ( ! desc ) {
DBG_LOG(DBG_BROKER, "Ignoring event metadata %" PRId64 " value=%s", id,
broker::to_string(v.to_data()).c_str());
continue;
}
auto d = v.to_data();
auto val = zeek::Broker::detail::data_to_val(d, desc->Type().get());
if ( ! val ) {
zeek::reporter->Error("failure converting metadata '%s' to type %s", broker::to_string(v.to_data()).c_str(),
obj_desc(desc->Type()).c_str());
continue;
}
meta->emplace_back(id, val);
}
return meta;
}
std::optional<broker::zeek::Event> detail::to_broker_event(const detail::Event& ev) {
broker::vector xs;
xs.reserve(ev.Args().size());
@ -40,24 +75,33 @@ std::optional<broker::zeek::Event> detail::to_broker_event(const detail::Event&
}
}
return broker::zeek::Event(ev.HandlerName(), xs, broker::to_timestamp(ev.Timestamp()));
// Convert metadata from the cluster::detail::Event event to broker's event metadata format.
broker::vector broker_meta;
if ( const auto* meta = ev.Metadata(); meta != nullptr ) {
broker_meta.reserve(meta->size());
for ( const auto& m : *meta ) {
if ( auto res = zeek::Broker::detail::val_to_data(m.Val().get()); res.has_value() ) {
broker::vector entry(2);
entry[0] = static_cast<broker::count>(m.Id());
entry[1] = res.value();
broker_meta.push_back(std::move(entry));
}
else {
// Just for sanity - we should never get here.
zeek::reporter->Error("failure converting metadata '%s' to broker data",
obj_desc_short(m.Val()).c_str());
}
}
}
return broker::zeek::Event(ev.HandlerName(), xs, broker_meta);
}
std::optional<detail::Event> detail::to_zeek_event(const broker::zeek::Event& ev) {
auto&& name = ev.name();
auto&& args = ev.args();
// Meh, technically need to convert ev.metadata() and
// expose it to script land as `table[count] of any`
// where consumers then know what to do with it.
//
// For now, handle the timestamp explicitly.
double ts;
if ( auto ev_ts = ev.ts() )
broker::convert(*ev_ts, ts);
else
ts = zeek::run_state::network_time;
zeek::Args vl;
zeek::EventHandlerPtr handler = zeek::event_registry->Lookup(name);
if ( handler == nullptr ) {
@ -98,7 +142,8 @@ std::optional<detail::Event> detail::to_zeek_event(const broker::zeek::Event& ev
}
}
return detail::Event{handler, std::move(vl), ts};
auto meta = cluster::detail::metadata_vector_from_broker_event(ev);
return cluster::detail::Event{handler, std::move(vl), std::move(meta)};
}
bool detail::BrokerBinV1_Serializer::SerializeEvent(byte_buffer& buf, const detail::Event& event) {
@ -168,11 +213,21 @@ std::optional<detail::Event> detail::BrokerJsonV1_Serializer::UnserializeEvent(b
TEST_SUITE_BEGIN("cluster serializer broker");
#include "zeek/EventRegistry.h"
TEST_CASE("roundtrip") {
auto* handler = zeek::event_registry->Lookup("Supervisor::node_status");
detail::Event e{handler, zeek::Args{zeek::make_intrusive<zeek::StringVal>("TEST"), zeek::val_mgr->Count(42)}};
detail::Event e{handler, zeek::Args{zeek::make_intrusive<zeek::StringVal>("TEST"), zeek::val_mgr->Count(42)},
nullptr};
// Register network timestamp metadata. This is idempotent.
auto nts = zeek::id::find_val<zeek::EnumVal>("EventMetadata::NETWORK_TIMESTAMP");
REQUIRE(nts);
bool registered = zeek::event_registry->RegisterMetadata(nts, zeek::base_type(zeek::TYPE_TIME));
REQUIRE(registered);
// Add network timestamp metadata to the event. In previous Zeek versions this happened magically under the hood.
bool added = e.AddMetadata(nts, zeek::make_intrusive<zeek::TimeVal>(0.0));
REQUIRE(added);
zeek::byte_buffer buf;
SUBCASE("json") {
@ -189,6 +244,8 @@ TEST_CASE("roundtrip") {
CHECK_EQ(result->Handler(), handler);
CHECK_EQ(result->HandlerName(), "Supervisor::node_status");
CHECK_EQ(result->Args().size(), 2);
REQUIRE(result->Metadata());
CHECK_EQ(result->Metadata()->size(), 1);
}
SUBCASE("binary") {
@ -212,6 +269,8 @@ TEST_CASE("roundtrip") {
CHECK_EQ(result->Handler(), handler);
CHECK_EQ(result->HandlerName(), "Supervisor::node_status");
CHECK_EQ(result->Args().size(), 2);
REQUIRE(result->Metadata());
CHECK_EQ(result->Metadata()->size(), 1);
}
}
TEST_SUITE_END();
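`metadata_vector_from_broker_event()` in this file returns `nullptr` when the event carries no metadata, and otherwise keeps only entries whose id is registered, skipping unknown ones instead of failing the whole event. A simplified sketch of that filtering is below. `WireEntry`, `metadata_from_wire()` and the `known_ids` set are hypothetical. The real code looks ids up via `zeek::event_registry->LookupMetadata()` and converts Broker data to Zeek values, which is omitted here.

```cpp
#include <cassert>
#include <cstdint>
#include <memory>
#include <set>
#include <utility>
#include <vector>

// Hypothetical wire representation: (metadata id, value).
using WireEntry = std::pair<std::uint64_t, double>;
using MetaVector = std::vector<WireEntry>;

inline std::unique_ptr<MetaVector> metadata_from_wire(const std::vector<WireEntry>& wire,
                                                      const std::set<std::uint64_t>& known_ids) {
    if ( wire.empty() )
        return nullptr; // no metadata attached at all

    auto meta = std::make_unique<MetaVector>();
    meta->reserve(wire.size());

    for ( const auto& [id, value] : wire ) {
        if ( known_ids.count(id) == 0 )
            continue; // unknown id: ignore the entry, keep the event

        meta->emplace_back(id, value);
    }

    return meta;
}
```

Note one consequence that this sketch shares with the diff: if metadata was present but every id was unknown, the result is an empty vector rather than `nullptr`.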


@ -2,13 +2,39 @@
#pragma once
#include <memory>
#include <vector>
#include "zeek/cluster/Serializer.h"
namespace broker::zeek {
class Event;
}
namespace zeek::cluster::detail {
namespace zeek {
namespace detail {
class MetadataEntry;
using EventMetadataVector = std::vector<MetadataEntry>;
using EventMetadataVectorPtr = std::unique_ptr<EventMetadataVector>;
} // namespace detail
namespace cluster::detail {
/**
* Produce an EventMetadataVectorPtr from a broker event.
*
* The implementation relies on @ref zeek::EventRegistry::LookupMetadata()
* to find expected metadata types. If there's no metadata at all attached
* to this event, returns a nullptr.
*
* @param ev The broker event.
* @return Pointer to a @ref zeek::detail::EventMetadataVector holding values for all known metadata.
*/
zeek::detail::EventMetadataVectorPtr metadata_vector_from_broker_event(const broker::zeek::Event& ev);
/**
* Convert a broker::zeek::Event to cluster::detail::Event by looking
@ -50,4 +76,5 @@ public:
std::optional<detail::Event> UnserializeEvent(byte_buffer_span buf) override;
};
} // namespace zeek::cluster::detail
} // namespace cluster::detail
} // namespace zeek


@ -36,3 +36,4 @@ const Log::write_buffer_size: count;
const Storage::expire_interval: interval;
const EventMetadata::add_network_timestamp: bool;
const EventMetadata::add_missing_remote_network_timestamp: bool;


@ -0,0 +1,2 @@
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
fatal error: Setting EventMetadata::add_missing_remote_network_timestamp is only valid together with EventMetadata::add_network_timestamp


@ -0,0 +1,3 @@
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
new_connection add_network_timestamp=T current_event_time=1362692526.869344 network_timestamp=[1362692526.869344]
new_connection add_network_timestamp=F current_event_time=-1.0 network_timestamp=[]


@ -0,0 +1,2 @@
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
received termination signal


@ -0,0 +1,33 @@
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
0.000000 InitPostScript
App::test_event(1) |mdv|=1
[id=App::CUSTOM_METADATA_STRING, val=testing string metadata]
custom metadata string, [testing string metadata]
custom metadata count, []
custom metadata table, []
App::test_event(2) |mdv|=1
[id=App::CUSTOM_METADATA_COUNT, val=42424242]
custom metadata string, []
custom metadata count, [42424242]
custom metadata table, []
App::test_event(3) |mdv|=1
[id=App::CUSTOM_METADATA_TABLE, val={
[key1] = val1
}]
custom metadata string, []
custom metadata count, []
custom metadata table, [{
[key1] = val1
}]
App::test_event(4) |mdv|=4
[id=App::CUSTOM_METADATA_TABLE, val={
[key1] = val1
}]
[id=App::CUSTOM_METADATA_COUNT, val=41414242]
[id=App::CUSTOM_METADATA_STRING, val=testing string metadata]
[id=App::CUSTOM_METADATA_STRING, val=more string metadata]
custom metadata string, [testing string metadata, more string metadata]
custom metadata count, [41414242]
custom metadata table, [{
[key1] = val1
}]


@ -0,0 +1,4 @@
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
0.000000 InitPostScript
0.000000 HookPublishEvent backend=Broker topic=/test/topic event=App::test_event
0.000000 HookPublishEvent /test/topic(App::test_event)


@ -0,0 +1,2 @@
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
received termination signal


@ -0,0 +1,6 @@
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
0.000000 InitPostScript
0.000000 HookPublishEvent backend=Broker topic=topic1 event=App::test_event
0.000000 HookPublishEvent backend=Broker topic=topic2 event=App::test_event
0.000000 HookPublishEvent backend=Broker topic=topic3 event=App::test_event
0.000000 HookPublishEvent backend=Broker topic=topic4 event=App::test_event


@ -19,6 +19,11 @@ redef exit_only_after_terminate = T;
redef allow_network_time_forward = F;
redef EventMetadata::add_network_timestamp = T;
# This is needed so that the receiving node sets its
# own local network timestamp on remote events that do
# not have network timestamp metadata.
redef EventMetadata::add_missing_remote_network_timestamp = T;
event zeek_init()
{
Broker::subscribe(getenv("TOPIC"));


@ -0,0 +1,7 @@
# @TEST-DOC: Using add_missing_remote_network_timestamp without add_network_timestamp is an error.
#
# @TEST-EXEC-FAIL: zeek -b %INPUT
# @TEST-EXEC: TEST_DIFF_CANONIFIER=$SCRIPTS/diff-remove-abspath btest-diff .stderr
redef EventMetadata::add_network_timestamp = F;
redef EventMetadata::add_missing_remote_network_timestamp = T;
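
The test above expects startup to fail when `add_missing_remote_network_timestamp` is enabled without `add_network_timestamp`. A rough sketch of such a consistency check follows. The `Options` struct and `ValidateOptions()` helper are hypothetical names for illustration and are not Zeek's actual implementation; only the message text is taken from the baseline.

```cpp
#include <optional>
#include <string>

// Hypothetical mirror of the two EventMetadata options; not Zeek's real types.
struct Options {
    bool add_network_timestamp = false;
    bool add_missing_remote_network_timestamp = false;
};

// Returns an error message for an invalid combination, or std::nullopt if valid.
std::optional<std::string> ValidateOptions(const Options& opts) {
    // Filling in missing remote timestamps only makes sense when
    // network timestamps are attached to events in the first place.
    if ( opts.add_missing_remote_network_timestamp && ! opts.add_network_timestamp )
        return "Setting EventMetadata::add_missing_remote_network_timestamp is only valid "
               "together with EventMetadata::add_network_timestamp";

    return std::nullopt;
}
```

A caller would presumably treat a returned message as fatal, which is what the `@TEST-EXEC-FAIL` line checks for.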


@ -0,0 +1,14 @@
# @TEST-DOC: Check current_event_time() produces the same as event metadata, or else -1.0
#
# @TEST-EXEC: zeek -r $TRACES/http/get.trace %INPUT EventMetadata::add_network_timestamp=T >> output 2>&1
# @TEST-EXEC: zeek -r $TRACES/http/get.trace %INPUT EventMetadata::add_network_timestamp=F >> output 2>&1
#
# @TEST-EXEC: TEST_DIFF_CANONIFIER= btest-diff output
event new_connection(c: connection)
{
print fmt("new_connection add_network_timestamp=%s current_event_time=%s network_timestamp=%s",
EventMetadata::add_network_timestamp, current_event_time(),
EventMetadata::current(EventMetadata::NETWORK_TIMESTAMP));
}


@ -0,0 +1,96 @@
#include "Plugin.h"
#include <zeek/Desc.h>
#include <zeek/ID.h>
#include <zeek/Reporter.h>
#include <zeek/Val.h>
#include <zeek/cluster/Backend.h>
#include <cstdio>
#include <string>
namespace btest::plugin::Demo_PublishEventMetadata {
Plugin plugin;
}
using namespace btest::plugin::Demo_PublishEventMetadata;
zeek::plugin::Configuration Plugin::Configure() {
EnableHook(zeek::plugin::HOOK_PUBLISH_EVENT);
zeek::plugin::Configuration config;
config.name = "Demo::PublishEventMetadata";
config.description = "For testing metadata publish";
config.version.major = 1;
config.version.minor = 0;
config.version.patch = 0;
return config;
}
void Plugin::InitPostScript() {
std::fprintf(stdout, "%.6f %-15s\n", zeek::run_state::network_time, " InitPostScript");
}
bool Plugin::HookPublishEvent(zeek::cluster::Backend& backend, const std::string& topic,
zeek::cluster::detail::Event& event) {
std::fprintf(stdout, "%.6f %s backend=%s topic=%s event=%s\n", zeek::run_state::network_time, "HookPublishEvent",
backend.Name().c_str(), topic.c_str(), std::string(event.HandlerName()).c_str());
const auto& table_type = zeek::id::find_type<zeek::TableType>("table_string_of_string");
const auto& string_md = zeek::id::find_val<zeek::EnumVal>("App::CUSTOM_METADATA_STRING");
auto count_md = zeek::id::find_val<zeek::EnumVal>("App::CUSTOM_METADATA_COUNT");
auto table_md = zeek::id::find_val<zeek::EnumVal>("App::CUSTOM_METADATA_TABLE");
if ( ! string_md || ! count_md || ! table_md )
zeek::reporter->FatalError("Could not find required enum values");
if ( topic == "topic1" ) {
if ( ! event.AddMetadata(string_md, zeek::make_intrusive<zeek::StringVal>("testing string metadata")) ) {
zeek::reporter->FatalError("Failed to add string metadata");
}
}
else if ( topic == "topic2" ) {
if ( ! event.AddMetadata(count_md, zeek::val_mgr->Count(42424242)) ) {
zeek::reporter->FatalError("Failed to add count metadata");
}
}
else if ( topic == "topic3" ) {
auto tv = zeek::make_intrusive<zeek::TableVal>(table_type);
if ( ! tv->Assign(zeek::make_intrusive<zeek::StringVal>("key1"),
zeek::make_intrusive<zeek::StringVal>("val1")) )
zeek::reporter->FatalError("Could not update table value");
if ( ! event.AddMetadata(table_md, tv) ) {
zeek::reporter->FatalError("Failed to add table metadata");
}
}
else if ( topic == "topic4" ) {
auto tv = zeek::make_intrusive<zeek::TableVal>(table_type);
if ( ! tv->Assign(zeek::make_intrusive<zeek::StringVal>("key1"),
zeek::make_intrusive<zeek::StringVal>("val1")) )
zeek::reporter->FatalError("Could not update table value");
if ( ! event.AddMetadata(table_md, tv) ) {
zeek::reporter->FatalError("Failed to add table metadata");
}
if ( ! event.AddMetadata(count_md, zeek::val_mgr->Count(41414242)) ) {
zeek::reporter->FatalError("Failed to add count metadata");
}
if ( ! event.AddMetadata(string_md, zeek::make_intrusive<zeek::StringVal>("testing string metadata")) ) {
zeek::reporter->FatalError("Failed to add string metadata");
}
// Event metadata is just a vector, so can have duplicate entries.
if ( ! event.AddMetadata(string_md, zeek::make_intrusive<zeek::StringVal>("more string metadata")) ) {
zeek::reporter->FatalError("Failed to add string metadata");
}
}
else {
zeek::reporter->FatalError("Unhandled topic %s", topic.c_str());
}
return true;
}
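
The `topic4` branch relies on event metadata being a plain vector, so one identifier can appear more than once. That matches the manager baseline, where `App::test_event(4)` reports four entries and two string values. Below is a minimal standalone sketch of that behavior. `Entry` and `MetadataModel` are hypothetical names, not part of Zeek's API, and plain strings stand in for Zeek values.

```cpp
#include <cstddef>
#include <cstdint>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for a metadata entry: an identifier plus a value.
struct Entry {
    std::uint64_t id;
    std::string value;
};

// Hypothetical model of per-event metadata as an append-only vector.
class MetadataModel {
public:
    // Appending never replaces an existing entry with the same id.
    void Add(std::uint64_t id, std::string value) { entries.push_back({id, std::move(value)}); }

    // Return all values stored under id, in insertion order.
    std::vector<std::string> Lookup(std::uint64_t id) const {
        std::vector<std::string> result;
        for ( const auto& e : entries )
            if ( e.id == id )
                result.push_back(e.value);
        return result;
    }

    // Total number of entries, counting duplicates.
    std::size_t Size() const { return entries.size(); }

private:
    std::vector<Entry> entries;
};
```

In this model, adding two values under the same identifier makes `Lookup()` return both in insertion order, while an identifier that was never added yields an empty vector.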


@ -0,0 +1,19 @@
#pragma once
#include <zeek/plugin/Plugin.h>
#include <string>
namespace btest::plugin::Demo_PublishEventMetadata {
class Plugin : public zeek::plugin::Plugin {
protected:
zeek::plugin::Configuration Configure() override;
void InitPostScript() override;
bool HookPublishEvent(zeek::cluster::Backend& backend, const std::string& topic,
zeek::cluster::detail::Event& event) override;
};
extern Plugin plugin;
} // namespace btest::plugin::Demo_PublishEventMetadata


@ -0,0 +1,77 @@
# @TEST-DOC: Smoke test sending metadata from a worker to a manager. The manager uses script level functions.
#
# @TEST-EXEC: ${DIST}/auxil/zeek-aux/plugin-support/init-plugin -u . Demo PublishEventMetadata
# @TEST-EXEC: cp -r %DIR/publish-event-metadata-plugin/* .
# @TEST-EXEC: ./configure --zeek-dist=${DIST} && make
#
# @TEST-PORT: BROKER_MANAGER_PORT
# @TEST-PORT: BROKER_WORKER1_PORT
#
# @TEST-EXEC: cp $FILES/broker/cluster-layout.zeek .
#
# @TEST-EXEC: btest-bg-run manager ZEEKPATH=$ZEEKPATH:.. ZEEK_PLUGIN_PATH=`pwd` CLUSTER_NODE=manager zeek -b Demo::PublishEventMetadata %INPUT
# @TEST-EXEC: btest-bg-run worker-1 ZEEKPATH=$ZEEKPATH:.. ZEEK_PLUGIN_PATH=`pwd` CLUSTER_NODE=worker-1 zeek -b Demo::PublishEventMetadata %INPUT
# @TEST-EXEC: btest-bg-wait 10
# @TEST-EXEC: btest-diff manager/.stdout
# @TEST-EXEC: btest-diff manager/.stderr
# @TEST-EXEC: btest-diff worker-1/.stdout
# @TEST-EXEC: TEST_DIFF_CANONIFIER='grep -v PEER_UNAVAILABLE' btest-diff worker-1/.stderr
redef allow_network_time_forward = F;
@load frameworks/cluster/experimental
module App;
export {
global test_event: event(c: count);
redef enum EventMetadata::ID += {
CUSTOM_METADATA_STRING = 4711,
CUSTOM_METADATA_COUNT = 4712,
CUSTOM_METADATA_TABLE = 4713,
};
}
event App::test_event(c: count)
{
local mdv = EventMetadata::current_all();
print fmt("App::test_event(%s) |mdv|=%s", c, |mdv|);
for ( _, md in mdv )
print md;
print "custom metadata string", EventMetadata::current(App::CUSTOM_METADATA_STRING);
print "custom metadata count", EventMetadata::current(App::CUSTOM_METADATA_COUNT);
print "custom metadata table", EventMetadata::current(App::CUSTOM_METADATA_TABLE);
if ( c == 4 )
terminate();
}
event zeek_init() &priority=20
{
assert EventMetadata::register(CUSTOM_METADATA_STRING, string);
assert EventMetadata::register(CUSTOM_METADATA_COUNT, count);
assert EventMetadata::register(CUSTOM_METADATA_TABLE, table[string] of string);
Cluster::subscribe("topic1");
Cluster::subscribe("topic2");
Cluster::subscribe("topic3");
Cluster::subscribe("topic4");
}
event Cluster::Experimental::cluster_started()
{
if ( Cluster::node == "worker-1" )
{
Cluster::publish("topic1", test_event, 1);
Cluster::publish("topic2", test_event, 2);
Cluster::publish("topic3", test_event, 3);
Cluster::publish("topic4", test_event, 4);
}
}
event Cluster::node_down(name: string, id: string)
{
terminate();
}