mirror of
https://github.com/zeek/zeek.git
synced 2025-10-02 14:48:21 +00:00
Merge remote-tracking branch 'origin/topic/awelzel/cluster-event-out-of-detail'
* origin/topic/awelzel/cluster-event-out-of-detail: cluster::Event: Move implementation into cluster/Event.{h,cc} cluster: Move cluster::detail::Event to cluster::Event
This commit is contained in:
commit
ab282e3637
24 changed files with 194 additions and 168 deletions
13
CHANGES
13
CHANGES
|
@ -1,3 +1,16 @@
|
|||
8.0.0-dev.797 | 2025-07-29 18:23:35 +0200
|
||||
|
||||
* cluster::Event: Move implementation into cluster/Event.{h,cc} (Arne Welzel, Corelight)
|
||||
|
||||
* cluster: Move cluster::detail::Event to cluster::Event (Arne Welzel, Corelight)
|
||||
|
||||
This class is a parameter of virtual methods of the Backend API for users
|
||||
to implement and also a parameter to the HookPublishEvent() API. Seems it
|
||||
shouldn't be in detail and instead we should own it.
|
||||
|
||||
Alternatively, could mark the cluster APIs as not-stable-yet, but I
|
||||
think we can move forward and make it non-detail for 8.0.
|
||||
|
||||
8.0.0-dev.794 | 2025-07-29 07:41:13 -0700
|
||||
|
||||
* Fix ConnKey deprecation warnings from generic fuzzer (Tim Wojtulewicz, Corelight)
|
||||
|
|
2
VERSION
2
VERSION
|
@ -1 +1 @@
|
|||
8.0.0-dev.794
|
||||
8.0.0-dev.797
|
||||
|
|
|
@ -828,7 +828,7 @@ std::vector<broker::peer_info> Manager::Peers() const {
|
|||
|
||||
std::string Manager::NodeID() const { return to_string(bstate->endpoint.node_id()); }
|
||||
|
||||
bool Manager::DoPublishEvent(const std::string& topic, cluster::detail::Event& event) {
|
||||
bool Manager::DoPublishEvent(const std::string& topic, cluster::Event& event) {
|
||||
bool do_publish = PLUGIN_HOOK_WITH_RESULT(HOOK_PUBLISH_EVENT, HookPublishEvent(*this, topic, event), true);
|
||||
if ( ! do_publish )
|
||||
return true;
|
||||
|
|
|
@ -420,7 +420,7 @@ private:
|
|||
void DoTerminate() override;
|
||||
|
||||
// Broker overrides this to do its own serialization.
|
||||
bool DoPublishEvent(const std::string& topic, cluster::detail::Event& event) override;
|
||||
bool DoPublishEvent(const std::string& topic, cluster::Event& event) override;
|
||||
|
||||
// This should never be reached, broker itself doesn't call this and overrides
|
||||
// the generic DoPublishEvent() method that would call this.
|
||||
|
|
|
@ -111,7 +111,7 @@ void WebSocketShim::DoTerminate() {
|
|||
}
|
||||
}
|
||||
|
||||
bool WebSocketShim::DoPublishEvent(const std::string& topic, zeek::cluster::detail::Event& event) {
|
||||
bool WebSocketShim::DoPublishEvent(const std::string& topic, cluster::Event& event) {
|
||||
auto r = cluster::detail::to_broker_event(event);
|
||||
if ( ! r ) {
|
||||
ProcessError("broker_error", "Failed to convert Zeek event to Broker event");
|
||||
|
|
|
@ -45,7 +45,7 @@ private:
|
|||
void DoInitPostScript() override {}
|
||||
bool DoInit() override;
|
||||
void DoTerminate() override;
|
||||
bool DoPublishEvent(const std::string& topic, zeek::cluster::detail::Event& event) override;
|
||||
bool DoPublishEvent(const std::string& topic, zeek::cluster::Event& event) override;
|
||||
bool DoPublishEvent(const std::string& topic, const std::string& format, const zeek::byte_buffer& buf) override {
|
||||
throw new std::logic_error("not implemented");
|
||||
}
|
||||
|
|
|
@ -27,42 +27,7 @@
|
|||
|
||||
using namespace zeek::cluster;
|
||||
|
||||
double detail::Event::Timestamp() const {
|
||||
if ( meta ) {
|
||||
for ( const auto& m : *meta ) {
|
||||
if ( m.Id() == static_cast<zeek_uint_t>(zeek::detail::MetadataType::NetworkTimestamp) )
|
||||
return m.Val()->AsTime();
|
||||
}
|
||||
}
|
||||
|
||||
return zeek::detail::NO_TIMESTAMP;
|
||||
}
|
||||
|
||||
bool detail::Event::AddMetadata(const EnumValPtr& id, zeek::ValPtr val) {
|
||||
if ( ! id || ! val )
|
||||
return false;
|
||||
|
||||
const auto* desc = zeek::event_registry->LookupMetadata(id->Get());
|
||||
if ( ! desc )
|
||||
return false;
|
||||
|
||||
if ( ! same_type(val->GetType(), desc->Type()) )
|
||||
return false;
|
||||
|
||||
if ( ! meta )
|
||||
meta = std::make_unique<zeek::detail::EventMetadataVector>();
|
||||
|
||||
// Internally stored as zeek_uint_t for serializers.
|
||||
meta->emplace_back(desc->Id(), std::move(val));
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
std::tuple<zeek::EventHandlerPtr, zeek::Args, zeek::detail::EventMetadataVectorPtr> detail::Event::Take() && {
|
||||
return {handler, std::move(args), std::move(meta)};
|
||||
}
|
||||
|
||||
bool detail::LocalEventHandlingStrategy::DoProcessEvent(std::string_view topic, detail::Event e) {
|
||||
bool detail::LocalEventHandlingStrategy::DoProcessEvent(std::string_view topic, cluster::Event e) {
|
||||
auto [handler, args, meta] = std::move(e).Take();
|
||||
zeek::event_mgr.Enqueue(std::move(meta), handler, std::move(args), util::detail::SOURCE_BROKER);
|
||||
return true;
|
||||
|
@ -139,7 +104,7 @@ bool Backend::Init(std::string nid) {
|
|||
return DoInit();
|
||||
}
|
||||
|
||||
std::optional<detail::Event> Backend::MakeClusterEvent(FuncValPtr handler, ArgsSpan args) const {
|
||||
std::optional<Event> Backend::MakeClusterEvent(FuncValPtr handler, ArgsSpan args) const {
|
||||
auto checked_args = detail::check_args(handler, args);
|
||||
if ( ! checked_args )
|
||||
return std::nullopt;
|
||||
|
@ -170,7 +135,7 @@ std::optional<detail::Event> Backend::MakeClusterEvent(FuncValPtr handler, ArgsS
|
|||
if ( zeek::BifConst::EventMetadata::add_network_timestamp )
|
||||
meta = zeek::detail::MakeEventMetadataVector(zeek::event_mgr.CurrentEventTime());
|
||||
|
||||
return zeek::cluster::detail::Event{eh, std::move(*checked_args), std::move(meta)};
|
||||
return Event{eh, std::move(*checked_args), std::move(meta)};
|
||||
}
|
||||
|
||||
void Backend::DoReadyToPublishCallback(Backend::ReadyCallback cb) {
|
||||
|
@ -179,7 +144,7 @@ void Backend::DoReadyToPublishCallback(Backend::ReadyCallback cb) {
|
|||
}
|
||||
|
||||
// Default implementation doing the serialization.
|
||||
bool Backend::DoPublishEvent(const std::string& topic, cluster::detail::Event& event) {
|
||||
bool Backend::DoPublishEvent(const std::string& topic, cluster::Event& event) {
|
||||
byte_buffer buf;
|
||||
|
||||
bool do_publish = PLUGIN_HOOK_WITH_RESULT(HOOK_PUBLISH_EVENT, HookPublishEvent(*this, topic, event), true);
|
||||
|
@ -209,7 +174,7 @@ void Backend::EnqueueEvent(EventHandlerPtr h, zeek::Args args) {
|
|||
event_handling_strategy->ProcessLocalEvent(h, std::move(args));
|
||||
}
|
||||
|
||||
bool Backend::ProcessEvent(std::string_view topic, detail::Event e) {
|
||||
bool Backend::ProcessEvent(std::string_view topic, cluster::Event e) {
|
||||
return event_handling_strategy->ProcessEvent(topic, std::move(e));
|
||||
}
|
||||
|
||||
|
@ -316,7 +281,7 @@ TEST_SUITE_BEGIN("cluster event");
|
|||
TEST_CASE("add metadata") {
|
||||
auto* handler = zeek::event_registry->Lookup("Supervisor::node_status");
|
||||
zeek::Args args{zeek::make_intrusive<zeek::StringVal>("TEST"), zeek::val_mgr->Count(42)};
|
||||
zeek::cluster::detail::Event event{handler, std::move(args), nullptr};
|
||||
zeek::cluster::Event event{handler, std::move(args), nullptr};
|
||||
|
||||
auto nts = zeek::id::find_val<zeek::EnumVal>("EventMetadata::NETWORK_TIMESTAMP");
|
||||
REQUIRE(nts);
|
||||
|
|
|
@ -9,13 +9,13 @@
|
|||
#include <string_view>
|
||||
#include <variant>
|
||||
|
||||
#include "zeek/Event.h"
|
||||
#include "zeek/EventHandler.h"
|
||||
#include "zeek/Span.h"
|
||||
#include "zeek/Tag.h"
|
||||
#include "zeek/Val.h"
|
||||
#include "zeek/ZeekArgs.h"
|
||||
#include "zeek/cluster/BifSupport.h"
|
||||
#include "zeek/cluster/Event.h"
|
||||
#include "zeek/cluster/OnLoop.h"
|
||||
#include "zeek/cluster/Serializer.h"
|
||||
#include "zeek/cluster/Telemetry.h"
|
||||
|
@ -32,73 +32,6 @@ namespace cluster {
|
|||
|
||||
namespace detail {
|
||||
|
||||
/**
|
||||
* Cluster event class.
|
||||
*/
|
||||
class Event {
|
||||
public:
|
||||
/**
|
||||
* Constructor.
|
||||
*/
|
||||
Event(const EventHandlerPtr& handler, zeek::Args args, zeek::detail::EventMetadataVectorPtr meta)
|
||||
: handler(handler), args(std::move(args)), meta(std::move(meta)) {}
|
||||
|
||||
/**
|
||||
* @return The name of the event.
|
||||
*/
|
||||
std::string_view HandlerName() const { return handler->Name(); }
|
||||
|
||||
/**
|
||||
* @return The event's handler.
|
||||
*/
|
||||
const EventHandlerPtr& Handler() const { return handler; }
|
||||
|
||||
/**
|
||||
* @return The event's arguments.
|
||||
*/
|
||||
const zeek::Args& Args() const { return args; }
|
||||
/**
|
||||
* @return The event's arguments.
|
||||
*/
|
||||
zeek::Args& Args() { return args; }
|
||||
|
||||
/**
|
||||
* @return The network timestamp metadata of this event or -1.0 if not set.
|
||||
*/
|
||||
double Timestamp() const;
|
||||
|
||||
/**
|
||||
* Add metadata to this cluster event.
|
||||
*
|
||||
* The used metadata \a id has to be registered via the Zeek script-layer
|
||||
* function EventMetadata::register(), or via the C++ API
|
||||
* EventMgr::RegisterMetadata() during an InitPostScript() hook.
|
||||
*
|
||||
* Non-registered metadata will not be added and false is returned.
|
||||
*
|
||||
* @param id The enum value identifying the event metadata.
|
||||
* @param val The value to use.
|
||||
|
||||
* @return true if \a val was was added, else false.
|
||||
*/
|
||||
bool AddMetadata(const EnumValPtr& id, ValPtr val);
|
||||
|
||||
/**
|
||||
* @return A pointer to the metadata vector, or nullptr if no Metadata has been added yet.
|
||||
*/
|
||||
const zeek::detail::EventMetadataVector* Metadata() const { return meta.get(); }
|
||||
|
||||
/**
|
||||
* Move data out of this event as preparation for Enqueue()
|
||||
*/
|
||||
std::tuple<zeek::EventHandlerPtr, zeek::Args, zeek::detail::EventMetadataVectorPtr> Take() &&;
|
||||
|
||||
private:
|
||||
EventHandlerPtr handler;
|
||||
zeek::Args args;
|
||||
zeek::detail::EventMetadataVectorPtr meta;
|
||||
};
|
||||
|
||||
/**
|
||||
* Interface for processing cluster::Event instances received
|
||||
* on a given topic.
|
||||
|
@ -123,7 +56,7 @@ public:
|
|||
*
|
||||
* @return true if the remote event was handled successfully, else false.
|
||||
*/
|
||||
bool ProcessEvent(std::string_view topic, Event e) { return DoProcessEvent(topic, std::move(e)); }
|
||||
bool ProcessEvent(std::string_view topic, cluster::Event e) { return DoProcessEvent(topic, std::move(e)); }
|
||||
|
||||
/**
|
||||
* Method for enquing backend specific events.
|
||||
|
@ -156,7 +89,7 @@ private:
|
|||
*
|
||||
* @return true if the remote event was handled successfully, else false.
|
||||
*/
|
||||
virtual bool DoProcessEvent(std::string_view topic, Event e) = 0;
|
||||
virtual bool DoProcessEvent(std::string_view topic, cluster::Event e) = 0;
|
||||
|
||||
/**
|
||||
* Hook method for implementing ProcessLocalEvent().
|
||||
|
@ -180,7 +113,7 @@ private:
|
|||
*/
|
||||
class LocalEventHandlingStrategy : public EventHandlingStrategy {
|
||||
private:
|
||||
bool DoProcessEvent(std::string_view topic, Event e) override;
|
||||
bool DoProcessEvent(std::string_view topic, cluster::Event e) override;
|
||||
void DoProcessLocalEvent(EventHandlerPtr h, zeek::Args args) override;
|
||||
void DoProcessError(std::string_view tag, std::string_view message) override;
|
||||
};
|
||||
|
@ -223,13 +156,13 @@ public:
|
|||
void Terminate() { DoTerminate(); }
|
||||
|
||||
/**
|
||||
* Create a cluster::detail::Event instance given an event handler and the
|
||||
* Create a cluster::Event instance given an event handler and the
|
||||
* script function arguments to it.
|
||||
*
|
||||
* @param handler A function val representing an event handler.
|
||||
* @param args The arguments for the event handler.
|
||||
*/
|
||||
std::optional<detail::Event> MakeClusterEvent(FuncValPtr handler, ArgsSpan args) const;
|
||||
std::optional<cluster::Event> MakeClusterEvent(FuncValPtr handler, ArgsSpan args) const;
|
||||
|
||||
/**
|
||||
* Publish a cluster::detail::Event instance to a given topic.
|
||||
|
@ -244,7 +177,7 @@ public:
|
|||
*
|
||||
* @return true if the event was successfully published.
|
||||
*/
|
||||
bool PublishEvent(const std::string& topic, cluster::detail::Event& event) { return DoPublishEvent(topic, event); }
|
||||
bool PublishEvent(const std::string& topic, cluster::Event& event) { return DoPublishEvent(topic, event); }
|
||||
|
||||
/**
|
||||
* Status codes for callbacks.
|
||||
|
@ -385,7 +318,7 @@ protected:
|
|||
* @param topic The topic on which the event was received.
|
||||
* @param e The event as cluster::detail::Event.
|
||||
*/
|
||||
bool ProcessEvent(std::string_view topic, detail::Event e);
|
||||
bool ProcessEvent(std::string_view topic, cluster::Event e);
|
||||
|
||||
/**
|
||||
* An error happened, pass it to the event handling strategy.
|
||||
|
@ -465,7 +398,7 @@ private:
|
|||
* This hook method only exists for the existing Broker implementation that
|
||||
* short-circuits serialization. Other backends should not override this.
|
||||
*/
|
||||
virtual bool DoPublishEvent(const std::string& topic, cluster::detail::Event& event);
|
||||
virtual bool DoPublishEvent(const std::string& topic, cluster::Event& event);
|
||||
|
||||
/**
|
||||
* Send a serialized cluster::detail::Event to the given topic.
|
||||
|
|
|
@ -3,7 +3,6 @@
|
|||
#include "zeek/cluster/BifSupport.h"
|
||||
|
||||
#include "zeek/Desc.h"
|
||||
#include "zeek/Event.h"
|
||||
#include "zeek/EventRegistry.h"
|
||||
#include "zeek/Frame.h"
|
||||
#include "zeek/Func.h"
|
||||
|
@ -16,9 +15,9 @@
|
|||
|
||||
namespace {
|
||||
|
||||
// Convert a script-level Cluster::Event to a cluster::detail::Event.
|
||||
std::optional<zeek::cluster::detail::Event> to_cluster_event(const zeek::cluster::Backend* backend,
|
||||
const zeek::RecordValPtr& rec) {
|
||||
// Convert a script-level Cluster::Event to a cluster::Event.
|
||||
std::optional<zeek::cluster::Event> to_cluster_event(const zeek::cluster::Backend* backend,
|
||||
const zeek::RecordValPtr& rec) {
|
||||
const auto& func = rec->GetField<zeek::FuncVal>(0);
|
||||
const auto& vargs = rec->GetField<zeek::VectorVal>(1);
|
||||
|
||||
|
|
|
@ -7,6 +7,7 @@ zeek_add_subdir_library(
|
|||
Backend.cc
|
||||
BifSupport.cc
|
||||
Component.cc
|
||||
Event.cc
|
||||
Manager.cc
|
||||
Telemetry.cc
|
||||
BIFS
|
||||
|
|
43
src/cluster/Event.cc
Normal file
43
src/cluster/Event.cc
Normal file
|
@ -0,0 +1,43 @@
|
|||
// See the file "COPYING" in the main distribution directory for copyright.
|
||||
|
||||
#include "zeek/cluster/Event.h"
|
||||
|
||||
#include "zeek/Event.h"
|
||||
#include "zeek/EventRegistry.h"
|
||||
|
||||
using namespace zeek::cluster;
|
||||
|
||||
double Event::Timestamp() const {
|
||||
if ( meta ) {
|
||||
for ( const auto& m : *meta ) {
|
||||
if ( m.Id() == static_cast<zeek_uint_t>(zeek::detail::MetadataType::NetworkTimestamp) )
|
||||
return m.Val()->AsTime();
|
||||
}
|
||||
}
|
||||
|
||||
return zeek::detail::NO_TIMESTAMP;
|
||||
}
|
||||
|
||||
bool Event::AddMetadata(const EnumValPtr& id, zeek::ValPtr val) {
|
||||
if ( ! id || ! val )
|
||||
return false;
|
||||
|
||||
const auto* desc = zeek::event_registry->LookupMetadata(id->Get());
|
||||
if ( ! desc )
|
||||
return false;
|
||||
|
||||
if ( ! same_type(val->GetType(), desc->Type()) )
|
||||
return false;
|
||||
|
||||
if ( ! meta )
|
||||
meta = std::make_unique<zeek::detail::EventMetadataVector>();
|
||||
|
||||
// Internally stored as zeek_uint_t for serializers.
|
||||
meta->emplace_back(desc->Id(), std::move(val));
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
std::tuple<zeek::EventHandlerPtr, zeek::Args, zeek::detail::EventMetadataVectorPtr> Event::Take() && {
|
||||
return {handler, std::move(args), std::move(meta)};
|
||||
}
|
82
src/cluster/Event.h
Normal file
82
src/cluster/Event.h
Normal file
|
@ -0,0 +1,82 @@
|
|||
// See the file "COPYING" in the main distribution directory for copyright.
|
||||
|
||||
// Val-based representation of an event for cluster communication.
|
||||
|
||||
#pragma once
|
||||
|
||||
#include "zeek/Event.h"
|
||||
#include "zeek/EventHandler.h"
|
||||
#include "zeek/Val.h"
|
||||
#include "zeek/ZeekArgs.h"
|
||||
|
||||
namespace zeek::cluster {
|
||||
|
||||
/**
|
||||
* Cluster event class.
|
||||
*/
|
||||
class Event {
|
||||
public:
|
||||
/**
|
||||
* Constructor.
|
||||
*/
|
||||
Event(const EventHandlerPtr& handler, zeek::Args args, zeek::detail::EventMetadataVectorPtr meta)
|
||||
: handler(handler), args(std::move(args)), meta(std::move(meta)) {}
|
||||
|
||||
/**
|
||||
* @return The name of the event.
|
||||
*/
|
||||
std::string_view HandlerName() const { return handler->Name(); }
|
||||
|
||||
/**
|
||||
* @return The event's handler.
|
||||
*/
|
||||
const EventHandlerPtr& Handler() const { return handler; }
|
||||
|
||||
/**
|
||||
* @return The event's arguments.
|
||||
*/
|
||||
const zeek::Args& Args() const { return args; }
|
||||
/**
|
||||
* @return The event's arguments.
|
||||
*/
|
||||
zeek::Args& Args() { return args; }
|
||||
|
||||
/**
|
||||
* @return The network timestamp metadata of this event or -1.0 if not set.
|
||||
*/
|
||||
double Timestamp() const;
|
||||
|
||||
/**
|
||||
* Add metadata to this cluster event.
|
||||
*
|
||||
* The used metadata \a id has to be registered via the Zeek script-layer
|
||||
* function EventMetadata::register(), or via the C++ API
|
||||
* EventMgr::RegisterMetadata() during an InitPostScript() hook.
|
||||
*
|
||||
* Non-registered metadata will not be added and false is returned.
|
||||
*
|
||||
* @param id The enum value identifying the event metadata.
|
||||
* @param val The value to use.
|
||||
|
||||
* @return true if \a val was was added, else false.
|
||||
*/
|
||||
bool AddMetadata(const EnumValPtr& id, ValPtr val);
|
||||
|
||||
/**
|
||||
* @return A pointer to the metadata vector, or nullptr if no Metadata has been added yet.
|
||||
*/
|
||||
const zeek::detail::EventMetadataVector* Metadata() const { return meta.get(); }
|
||||
|
||||
/**
|
||||
* Move data out of this event as preparation for Enqueue()
|
||||
*/
|
||||
std::tuple<zeek::EventHandlerPtr, zeek::Args, zeek::detail::EventMetadataVectorPtr> Take() &&;
|
||||
|
||||
private:
|
||||
EventHandlerPtr handler;
|
||||
zeek::Args args;
|
||||
zeek::detail::EventMetadataVectorPtr meta;
|
||||
};
|
||||
|
||||
|
||||
} // namespace zeek::cluster
|
|
@ -12,9 +12,7 @@
|
|||
|
||||
namespace zeek::cluster {
|
||||
|
||||
namespace detail {
|
||||
class Event;
|
||||
} // namespace detail
|
||||
|
||||
/**
|
||||
* This class handles encoding of events into byte buffers and back.
|
||||
|
@ -34,7 +32,7 @@ public:
|
|||
*
|
||||
* @returns True on success, false in exceptional cases (e.g. unsupported serialization).
|
||||
*/
|
||||
virtual bool SerializeEvent(byte_buffer& buf, const detail::Event& event) = 0;
|
||||
virtual bool SerializeEvent(byte_buffer& buf, const cluster::Event& event) = 0;
|
||||
|
||||
/**
|
||||
* Unserialize an event from a given byte buffer.
|
||||
|
@ -43,7 +41,7 @@ public:
|
|||
*
|
||||
* @returns The event, or std::nullopt on error.
|
||||
*/
|
||||
virtual std::optional<cluster::detail::Event> UnserializeEvent(byte_buffer_span buf) = 0;
|
||||
virtual std::optional<cluster::Event> UnserializeEvent(byte_buffer_span buf) = 0;
|
||||
|
||||
/**
|
||||
* @returns The name of this event serializer instance.
|
||||
|
|
|
@ -2,7 +2,6 @@
|
|||
|
||||
#include "zeek/cluster/serializer/broker/Serializer.h"
|
||||
|
||||
#include <cinttypes>
|
||||
#include <optional>
|
||||
|
||||
#include "zeek/DebugLogger.h"
|
||||
|
@ -13,7 +12,7 @@
|
|||
#include "zeek/Reporter.h"
|
||||
#include "zeek/Type.h"
|
||||
#include "zeek/broker/Data.h"
|
||||
#include "zeek/cluster/Backend.h"
|
||||
#include "zeek/cluster/Event.h"
|
||||
|
||||
#include "broker/data.bif.h"
|
||||
#include "broker/data_envelope.hh"
|
||||
|
@ -55,7 +54,7 @@ zeek::detail::EventMetadataVectorPtr detail::metadata_vector_from_broker_event(c
|
|||
return meta;
|
||||
}
|
||||
|
||||
std::optional<broker::zeek::Event> detail::to_broker_event(const detail::Event& ev) {
|
||||
std::optional<broker::zeek::Event> detail::to_broker_event(const zeek::cluster::Event& ev) {
|
||||
broker::vector xs;
|
||||
xs.reserve(ev.Args().size());
|
||||
|
||||
|
@ -96,7 +95,7 @@ std::optional<broker::zeek::Event> detail::to_broker_event(const detail::Event&
|
|||
return broker::zeek::Event(ev.HandlerName(), xs, broker_meta);
|
||||
}
|
||||
|
||||
std::optional<detail::Event> detail::to_zeek_event(const broker::zeek::Event& ev) {
|
||||
std::optional<zeek::cluster::Event> detail::to_zeek_event(const broker::zeek::Event& ev) {
|
||||
auto&& name = ev.name();
|
||||
auto&& args = ev.args();
|
||||
|
||||
|
@ -141,10 +140,10 @@ std::optional<detail::Event> detail::to_zeek_event(const broker::zeek::Event& ev
|
|||
}
|
||||
|
||||
auto meta = cluster::detail::metadata_vector_from_broker_event(ev);
|
||||
return cluster::detail::Event{handler, std::move(vl), std::move(meta)};
|
||||
return zeek::cluster::Event{handler, std::move(vl), std::move(meta)};
|
||||
}
|
||||
|
||||
bool detail::BrokerBinV1_Serializer::SerializeEvent(byte_buffer& buf, const detail::Event& event) {
|
||||
bool detail::BrokerBinV1_Serializer::SerializeEvent(byte_buffer& buf, const zeek::cluster::Event& event) {
|
||||
auto ev = to_broker_event(event);
|
||||
if ( ! ev )
|
||||
return false;
|
||||
|
@ -160,7 +159,7 @@ bool detail::BrokerBinV1_Serializer::SerializeEvent(byte_buffer& buf, const deta
|
|||
return true;
|
||||
}
|
||||
|
||||
std::optional<detail::Event> detail::BrokerBinV1_Serializer::UnserializeEvent(byte_buffer_span buf) {
|
||||
std::optional<zeek::cluster::Event> detail::BrokerBinV1_Serializer::UnserializeEvent(byte_buffer_span buf) {
|
||||
auto r = broker::data_envelope::deserialize(broker::endpoint_id::nil(), broker::endpoint_id::nil(), 0, "",
|
||||
buf.data(), buf.size());
|
||||
if ( ! r )
|
||||
|
@ -185,7 +184,7 @@ struct PushBackAdapter {
|
|||
};
|
||||
|
||||
|
||||
bool detail::BrokerJsonV1_Serializer::SerializeEvent(byte_buffer& buf, const detail::Event& event) {
|
||||
bool detail::BrokerJsonV1_Serializer::SerializeEvent(byte_buffer& buf, const zeek::cluster::Event& event) {
|
||||
auto ev = to_broker_event(event);
|
||||
if ( ! ev )
|
||||
return false;
|
||||
|
@ -195,7 +194,7 @@ bool detail::BrokerJsonV1_Serializer::SerializeEvent(byte_buffer& buf, const det
|
|||
return true;
|
||||
}
|
||||
|
||||
std::optional<detail::Event> detail::BrokerJsonV1_Serializer::UnserializeEvent(byte_buffer_span buf) {
|
||||
std::optional<zeek::cluster::Event> detail::BrokerJsonV1_Serializer::UnserializeEvent(byte_buffer_span buf) {
|
||||
broker::variant res;
|
||||
auto err =
|
||||
broker::format::json::v1::decode(std::string_view{reinterpret_cast<const char*>(buf.data()), buf.size()}, res);
|
||||
|
@ -213,8 +212,8 @@ TEST_SUITE_BEGIN("cluster serializer broker");
|
|||
|
||||
TEST_CASE("roundtrip") {
|
||||
auto* handler = zeek::event_registry->Lookup("Supervisor::node_status");
|
||||
detail::Event e{handler, zeek::Args{zeek::make_intrusive<zeek::StringVal>("TEST"), zeek::val_mgr->Count(42)},
|
||||
nullptr};
|
||||
zeek::cluster::Event e{handler, zeek::Args{zeek::make_intrusive<zeek::StringVal>("TEST"), zeek::val_mgr->Count(42)},
|
||||
nullptr};
|
||||
|
||||
// Register network timestamp metadata. This is idempotent.
|
||||
auto nts = zeek::id::find_val<zeek::EnumVal>("EventMetadata::NETWORK_TIMESTAMP");
|
||||
|
|
|
@ -42,9 +42,9 @@ zeek::detail::EventMetadataVectorPtr metadata_vector_from_broker_event(const bro
|
|||
* to the appropriate Val instances.
|
||||
*
|
||||
* @param ev The broker side event.
|
||||
* @returns A zeek::cluster::detail::Event instance, or std::nullopt if the conversion failed.
|
||||
* @returns A zeek::cluster::Event instance, or std::nullopt if the conversion failed.
|
||||
*/
|
||||
std::optional<detail::Event> to_zeek_event(const broker::zeek::Event& ev);
|
||||
std::optional<cluster::Event> to_zeek_event(const broker::zeek::Event& ev);
|
||||
|
||||
/**
|
||||
* Convert a cluster::detail::Event to a broker::zeek::Event.
|
||||
|
@ -52,7 +52,7 @@ std::optional<detail::Event> to_zeek_event(const broker::zeek::Event& ev);
|
|||
* @param ev The cluster::detail::Event
|
||||
* @return A broker::zeek::Event to be serialized, or nullopt in case of errors.
|
||||
*/
|
||||
std::optional<broker::zeek::Event> to_broker_event(const detail::Event& ev);
|
||||
std::optional<broker::zeek::Event> to_broker_event(const cluster::Event& ev);
|
||||
|
||||
// Implementation of the EventSerializer using the existing broker::detail::val_to_data()
|
||||
// and broker::format::bin::v1::encode().
|
||||
|
@ -60,9 +60,9 @@ class BrokerBinV1_Serializer : public EventSerializer {
|
|||
public:
|
||||
BrokerBinV1_Serializer() : EventSerializer("broker-bin-v1") {}
|
||||
|
||||
bool SerializeEvent(byte_buffer& buf, const detail::Event& event) override;
|
||||
bool SerializeEvent(byte_buffer& buf, const cluster::Event& event) override;
|
||||
|
||||
std::optional<detail::Event> UnserializeEvent(byte_buffer_span buf) override;
|
||||
std::optional<cluster::Event> UnserializeEvent(byte_buffer_span buf) override;
|
||||
};
|
||||
|
||||
// Implementation of the EventSerializer that uses the existing broker::detail::val_to_data()
|
||||
|
@ -71,9 +71,9 @@ class BrokerJsonV1_Serializer : public EventSerializer {
|
|||
public:
|
||||
BrokerJsonV1_Serializer() : EventSerializer("broker-json-v1") {}
|
||||
|
||||
bool SerializeEvent(byte_buffer& buf, const detail::Event& event) override;
|
||||
bool SerializeEvent(byte_buffer& buf, const cluster::Event& event) override;
|
||||
|
||||
std::optional<detail::Event> UnserializeEvent(byte_buffer_span buf) override;
|
||||
std::optional<cluster::Event> UnserializeEvent(byte_buffer_span buf) override;
|
||||
};
|
||||
|
||||
} // namespace cluster::detail
|
||||
|
|
|
@ -55,7 +55,7 @@ private:
|
|||
* will need some abstractions if client's can opt to use different encodings
|
||||
* of events in the future.
|
||||
*/
|
||||
bool DoProcessEvent(std::string_view topic, zeek::cluster::detail::Event e) override {
|
||||
bool DoProcessEvent(std::string_view topic, zeek::cluster::Event e) override {
|
||||
// If the client has left, no point in sending it any pending event.
|
||||
if ( wsc->IsTerminated() )
|
||||
return true;
|
||||
|
@ -71,7 +71,7 @@ private:
|
|||
// if that's faster.
|
||||
auto ev = zeek::cluster::detail::to_broker_event(e);
|
||||
if ( ! ev ) {
|
||||
fprintf(stderr, "[ERROR] Unable to go from detail::Event to broker::event\n");
|
||||
fprintf(stderr, "[ERROR] Unable to go from cluster::Event to broker::event\n");
|
||||
return false;
|
||||
}
|
||||
|
||||
|
|
|
@ -938,7 +938,7 @@ void Manager::HookUnprocessedPacket(const Packet* packet) const {
|
|||
}
|
||||
|
||||
bool Manager::HookPublishEvent(zeek::cluster::Backend& backend, const std::string& topic,
|
||||
zeek::cluster::detail::Event& event) const {
|
||||
zeek::cluster::Event& event) const {
|
||||
HookArgumentList args;
|
||||
|
||||
if ( HavePluginForHook(META_HOOK_PRE) ) {
|
||||
|
|
|
@ -456,8 +456,7 @@ public:
|
|||
* @return true if event should be published, false if the publish
|
||||
* operation should be skipped.
|
||||
*/
|
||||
bool HookPublishEvent(zeek::cluster::Backend& backend, const std::string& topic,
|
||||
zeek::cluster::detail::Event& event) const;
|
||||
bool HookPublishEvent(zeek::cluster::Backend& backend, const std::string& topic, zeek::cluster::Event& event) const;
|
||||
|
||||
/**
|
||||
* Internal method that registers a freshly instantiated plugin with
|
||||
|
|
|
@ -350,8 +350,7 @@ bool Plugin::HookReporter(const std::string& prefix, const EventHandlerPtr event
|
|||
|
||||
void Plugin::HookUnprocessedPacket(const Packet* packet) {}
|
||||
|
||||
bool Plugin::HookPublishEvent(zeek::cluster::Backend& backend, const std::string& topic,
|
||||
zeek::cluster::detail::Event& event) {
|
||||
bool Plugin::HookPublishEvent(zeek::cluster::Backend& backend, const std::string& topic, zeek::cluster::Event& event) {
|
||||
return true;
|
||||
}
|
||||
|
||||
|
|
|
@ -54,10 +54,7 @@ class Frame;
|
|||
|
||||
namespace cluster {
|
||||
class Backend;
|
||||
|
||||
namespace detail {
|
||||
class Event;
|
||||
}
|
||||
} // namespace cluster
|
||||
|
||||
namespace plugin {
|
||||
|
@ -433,7 +430,7 @@ public:
|
|||
/**
|
||||
* Constructor with cluster event argument.
|
||||
*/
|
||||
explicit HookArgument(zeek::cluster::detail::Event* event) {
|
||||
explicit HookArgument(zeek::cluster::Event* event) {
|
||||
type = CLUSTER_EVENT;
|
||||
arg.cluster_event = event;
|
||||
}
|
||||
|
@ -592,7 +589,7 @@ public:
|
|||
/**
|
||||
* Returns the value for a cluster event argument.
|
||||
*/
|
||||
const zeek::cluster::detail::Event* AsClusterEvent() const {
|
||||
const zeek::cluster::Event* AsClusterEvent() const {
|
||||
assert(type == CLUSTER_EVENT);
|
||||
return arg.cluster_event;
|
||||
}
|
||||
|
@ -627,7 +624,7 @@ private:
|
|||
const zeek::detail::Location* loc;
|
||||
const Packet* packet;
|
||||
const cluster::Backend* cluster_backend;
|
||||
const cluster::detail::Event* cluster_event;
|
||||
const cluster::Event* cluster_event;
|
||||
} arg;
|
||||
|
||||
// Outside union because these have dtors.
|
||||
|
@ -1148,7 +1145,7 @@ protected:
|
|||
* operation should be skipped.
|
||||
*/
|
||||
virtual bool HookPublishEvent(zeek::cluster::Backend& backend, const std::string& topic,
|
||||
zeek::cluster::detail::Event& event);
|
||||
zeek::cluster::Event& event);
|
||||
|
||||
// Meta hooks.
|
||||
virtual void MetaHookPre(HookType hook, const HookArgumentList& args);
|
||||
|
|
|
@ -43,8 +43,7 @@ static void describe_hook_args(const zeek::plugin::HookArgumentList& args, zeek:
|
|||
}
|
||||
}
|
||||
|
||||
bool Plugin::HookPublishEvent(zeek::cluster::Backend& backend, const std::string& topic,
|
||||
zeek::cluster::detail::Event& event) {
|
||||
bool Plugin::HookPublishEvent(zeek::cluster::Backend& backend, const std::string& topic, zeek::cluster::Event& event) {
|
||||
std::fprintf(stdout, "%.6f %-15s backend=%s topic=%s event=%s\n", zeek::run_state::network_time,
|
||||
" HookPublishEvent", backend.Name().c_str(), topic.c_str(), std::string(event.HandlerName()).c_str());
|
||||
|
||||
|
|
|
@ -12,7 +12,7 @@ protected:
|
|||
void InitPostScript() override;
|
||||
|
||||
bool HookPublishEvent(zeek::cluster::Backend& backend, const std::string& topic,
|
||||
zeek::cluster::detail::Event& event) override;
|
||||
zeek::cluster::Event& event) override;
|
||||
|
||||
void MetaHookPre(zeek::plugin::HookType hook, const zeek::plugin::HookArgumentList& args) override;
|
||||
void MetaHookPost(zeek::plugin::HookType hook, const zeek::plugin::HookArgumentList& args,
|
||||
|
|
|
@ -31,8 +31,7 @@ void Plugin::InitPostScript() {
|
|||
std::fprintf(stdout, "%.6f %-15s\n", zeek::run_state::network_time, " InitPostScript");
|
||||
}
|
||||
|
||||
bool Plugin::HookPublishEvent(zeek::cluster::Backend& backend, const std::string& topic,
|
||||
zeek::cluster::detail::Event& event) {
|
||||
bool Plugin::HookPublishEvent(zeek::cluster::Backend& backend, const std::string& topic, zeek::cluster::Event& event) {
|
||||
std::fprintf(stdout, "%.6f %s backend=%s topic=%s event=%s\n", zeek::run_state::network_time, "HookPublishEvent",
|
||||
backend.Name().c_str(), topic.c_str(), std::string(event.HandlerName()).c_str());
|
||||
|
||||
|
|
|
@ -11,7 +11,7 @@ protected:
|
|||
void InitPostScript() override;
|
||||
|
||||
bool HookPublishEvent(zeek::cluster::Backend& backend, const std::string& topic,
|
||||
zeek::cluster::detail::Event& event) override;
|
||||
zeek::cluster::Event& event) override;
|
||||
};
|
||||
|
||||
extern Plugin plugin;
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue