mirror of
https://github.com/zeek/zeek.git
synced 2025-10-02 14:48:21 +00:00
cluster: Move cluster::detail::Event to cluster::Event
This class is a parameter of virtual methods of the Backend API for users to implement and also a parameter to the HookPublishEvent() API. Seems it shouldn't be in detail and instead we should own it. Alternatively, could mark the cluster APIs as not-stable-yet, but I think we can move forward and make it non-detail for 8.0.
This commit is contained in:
parent
cd7836dda2
commit
bda70067ec
19 changed files with 57 additions and 68 deletions
|
@ -828,7 +828,7 @@ std::vector<broker::peer_info> Manager::Peers() const {
|
||||||
|
|
||||||
std::string Manager::NodeID() const { return to_string(bstate->endpoint.node_id()); }
|
std::string Manager::NodeID() const { return to_string(bstate->endpoint.node_id()); }
|
||||||
|
|
||||||
bool Manager::DoPublishEvent(const std::string& topic, cluster::detail::Event& event) {
|
bool Manager::DoPublishEvent(const std::string& topic, cluster::Event& event) {
|
||||||
bool do_publish = PLUGIN_HOOK_WITH_RESULT(HOOK_PUBLISH_EVENT, HookPublishEvent(*this, topic, event), true);
|
bool do_publish = PLUGIN_HOOK_WITH_RESULT(HOOK_PUBLISH_EVENT, HookPublishEvent(*this, topic, event), true);
|
||||||
if ( ! do_publish )
|
if ( ! do_publish )
|
||||||
return true;
|
return true;
|
||||||
|
|
|
@ -420,7 +420,7 @@ private:
|
||||||
void DoTerminate() override;
|
void DoTerminate() override;
|
||||||
|
|
||||||
// Broker overrides this to do its own serialization.
|
// Broker overrides this to do its own serialization.
|
||||||
bool DoPublishEvent(const std::string& topic, cluster::detail::Event& event) override;
|
bool DoPublishEvent(const std::string& topic, cluster::Event& event) override;
|
||||||
|
|
||||||
// This should never be reached, broker itself doesn't call this and overrides
|
// This should never be reached, broker itself doesn't call this and overrides
|
||||||
// the generic DoPublishEvent() method that would call this.
|
// the generic DoPublishEvent() method that would call this.
|
||||||
|
|
|
@ -111,7 +111,7 @@ void WebSocketShim::DoTerminate() {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
bool WebSocketShim::DoPublishEvent(const std::string& topic, zeek::cluster::detail::Event& event) {
|
bool WebSocketShim::DoPublishEvent(const std::string& topic, cluster::Event& event) {
|
||||||
auto r = cluster::detail::to_broker_event(event);
|
auto r = cluster::detail::to_broker_event(event);
|
||||||
if ( ! r ) {
|
if ( ! r ) {
|
||||||
ProcessError("broker_error", "Failed to convert Zeek event to Broker event");
|
ProcessError("broker_error", "Failed to convert Zeek event to Broker event");
|
||||||
|
|
|
@ -45,7 +45,7 @@ private:
|
||||||
void DoInitPostScript() override {}
|
void DoInitPostScript() override {}
|
||||||
bool DoInit() override;
|
bool DoInit() override;
|
||||||
void DoTerminate() override;
|
void DoTerminate() override;
|
||||||
bool DoPublishEvent(const std::string& topic, zeek::cluster::detail::Event& event) override;
|
bool DoPublishEvent(const std::string& topic, zeek::cluster::Event& event) override;
|
||||||
bool DoPublishEvent(const std::string& topic, const std::string& format, const zeek::byte_buffer& buf) override {
|
bool DoPublishEvent(const std::string& topic, const std::string& format, const zeek::byte_buffer& buf) override {
|
||||||
throw new std::logic_error("not implemented");
|
throw new std::logic_error("not implemented");
|
||||||
}
|
}
|
||||||
|
|
|
@ -27,7 +27,7 @@
|
||||||
|
|
||||||
using namespace zeek::cluster;
|
using namespace zeek::cluster;
|
||||||
|
|
||||||
double detail::Event::Timestamp() const {
|
double Event::Timestamp() const {
|
||||||
if ( meta ) {
|
if ( meta ) {
|
||||||
for ( const auto& m : *meta ) {
|
for ( const auto& m : *meta ) {
|
||||||
if ( m.Id() == static_cast<zeek_uint_t>(zeek::detail::MetadataType::NetworkTimestamp) )
|
if ( m.Id() == static_cast<zeek_uint_t>(zeek::detail::MetadataType::NetworkTimestamp) )
|
||||||
|
@ -38,7 +38,7 @@ double detail::Event::Timestamp() const {
|
||||||
return zeek::detail::NO_TIMESTAMP;
|
return zeek::detail::NO_TIMESTAMP;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool detail::Event::AddMetadata(const EnumValPtr& id, zeek::ValPtr val) {
|
bool Event::AddMetadata(const EnumValPtr& id, zeek::ValPtr val) {
|
||||||
if ( ! id || ! val )
|
if ( ! id || ! val )
|
||||||
return false;
|
return false;
|
||||||
|
|
||||||
|
@ -58,11 +58,11 @@ bool detail::Event::AddMetadata(const EnumValPtr& id, zeek::ValPtr val) {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
std::tuple<zeek::EventHandlerPtr, zeek::Args, zeek::detail::EventMetadataVectorPtr> detail::Event::Take() && {
|
std::tuple<zeek::EventHandlerPtr, zeek::Args, zeek::detail::EventMetadataVectorPtr> Event::Take() && {
|
||||||
return {handler, std::move(args), std::move(meta)};
|
return {handler, std::move(args), std::move(meta)};
|
||||||
}
|
}
|
||||||
|
|
||||||
bool detail::LocalEventHandlingStrategy::DoProcessEvent(std::string_view topic, detail::Event e) {
|
bool detail::LocalEventHandlingStrategy::DoProcessEvent(std::string_view topic, cluster::Event e) {
|
||||||
auto [handler, args, meta] = std::move(e).Take();
|
auto [handler, args, meta] = std::move(e).Take();
|
||||||
zeek::event_mgr.Enqueue(std::move(meta), handler, std::move(args), util::detail::SOURCE_BROKER);
|
zeek::event_mgr.Enqueue(std::move(meta), handler, std::move(args), util::detail::SOURCE_BROKER);
|
||||||
return true;
|
return true;
|
||||||
|
@ -139,7 +139,7 @@ bool Backend::Init(std::string nid) {
|
||||||
return DoInit();
|
return DoInit();
|
||||||
}
|
}
|
||||||
|
|
||||||
std::optional<detail::Event> Backend::MakeClusterEvent(FuncValPtr handler, ArgsSpan args) const {
|
std::optional<Event> Backend::MakeClusterEvent(FuncValPtr handler, ArgsSpan args) const {
|
||||||
auto checked_args = detail::check_args(handler, args);
|
auto checked_args = detail::check_args(handler, args);
|
||||||
if ( ! checked_args )
|
if ( ! checked_args )
|
||||||
return std::nullopt;
|
return std::nullopt;
|
||||||
|
@ -170,7 +170,7 @@ std::optional<detail::Event> Backend::MakeClusterEvent(FuncValPtr handler, ArgsS
|
||||||
if ( zeek::BifConst::EventMetadata::add_network_timestamp )
|
if ( zeek::BifConst::EventMetadata::add_network_timestamp )
|
||||||
meta = zeek::detail::MakeEventMetadataVector(zeek::event_mgr.CurrentEventTime());
|
meta = zeek::detail::MakeEventMetadataVector(zeek::event_mgr.CurrentEventTime());
|
||||||
|
|
||||||
return zeek::cluster::detail::Event{eh, std::move(*checked_args), std::move(meta)};
|
return Event{eh, std::move(*checked_args), std::move(meta)};
|
||||||
}
|
}
|
||||||
|
|
||||||
void Backend::DoReadyToPublishCallback(Backend::ReadyCallback cb) {
|
void Backend::DoReadyToPublishCallback(Backend::ReadyCallback cb) {
|
||||||
|
@ -179,7 +179,7 @@ void Backend::DoReadyToPublishCallback(Backend::ReadyCallback cb) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Default implementation doing the serialization.
|
// Default implementation doing the serialization.
|
||||||
bool Backend::DoPublishEvent(const std::string& topic, cluster::detail::Event& event) {
|
bool Backend::DoPublishEvent(const std::string& topic, cluster::Event& event) {
|
||||||
byte_buffer buf;
|
byte_buffer buf;
|
||||||
|
|
||||||
bool do_publish = PLUGIN_HOOK_WITH_RESULT(HOOK_PUBLISH_EVENT, HookPublishEvent(*this, topic, event), true);
|
bool do_publish = PLUGIN_HOOK_WITH_RESULT(HOOK_PUBLISH_EVENT, HookPublishEvent(*this, topic, event), true);
|
||||||
|
@ -209,7 +209,7 @@ void Backend::EnqueueEvent(EventHandlerPtr h, zeek::Args args) {
|
||||||
event_handling_strategy->ProcessLocalEvent(h, std::move(args));
|
event_handling_strategy->ProcessLocalEvent(h, std::move(args));
|
||||||
}
|
}
|
||||||
|
|
||||||
bool Backend::ProcessEvent(std::string_view topic, detail::Event e) {
|
bool Backend::ProcessEvent(std::string_view topic, cluster::Event e) {
|
||||||
return event_handling_strategy->ProcessEvent(topic, std::move(e));
|
return event_handling_strategy->ProcessEvent(topic, std::move(e));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -316,7 +316,7 @@ TEST_SUITE_BEGIN("cluster event");
|
||||||
TEST_CASE("add metadata") {
|
TEST_CASE("add metadata") {
|
||||||
auto* handler = zeek::event_registry->Lookup("Supervisor::node_status");
|
auto* handler = zeek::event_registry->Lookup("Supervisor::node_status");
|
||||||
zeek::Args args{zeek::make_intrusive<zeek::StringVal>("TEST"), zeek::val_mgr->Count(42)};
|
zeek::Args args{zeek::make_intrusive<zeek::StringVal>("TEST"), zeek::val_mgr->Count(42)};
|
||||||
zeek::cluster::detail::Event event{handler, std::move(args), nullptr};
|
zeek::cluster::Event event{handler, std::move(args), nullptr};
|
||||||
|
|
||||||
auto nts = zeek::id::find_val<zeek::EnumVal>("EventMetadata::NETWORK_TIMESTAMP");
|
auto nts = zeek::id::find_val<zeek::EnumVal>("EventMetadata::NETWORK_TIMESTAMP");
|
||||||
REQUIRE(nts);
|
REQUIRE(nts);
|
||||||
|
|
|
@ -30,8 +30,6 @@ class OnLoopProcess;
|
||||||
|
|
||||||
namespace cluster {
|
namespace cluster {
|
||||||
|
|
||||||
namespace detail {
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Cluster event class.
|
* Cluster event class.
|
||||||
*/
|
*/
|
||||||
|
@ -99,6 +97,8 @@ private:
|
||||||
zeek::detail::EventMetadataVectorPtr meta;
|
zeek::detail::EventMetadataVectorPtr meta;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
namespace detail {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Interface for processing cluster::Event instances received
|
* Interface for processing cluster::Event instances received
|
||||||
* on a given topic.
|
* on a given topic.
|
||||||
|
@ -123,7 +123,7 @@ public:
|
||||||
*
|
*
|
||||||
* @return true if the remote event was handled successfully, else false.
|
* @return true if the remote event was handled successfully, else false.
|
||||||
*/
|
*/
|
||||||
bool ProcessEvent(std::string_view topic, Event e) { return DoProcessEvent(topic, std::move(e)); }
|
bool ProcessEvent(std::string_view topic, cluster::Event e) { return DoProcessEvent(topic, std::move(e)); }
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Method for enquing backend specific events.
|
* Method for enquing backend specific events.
|
||||||
|
@ -156,7 +156,7 @@ private:
|
||||||
*
|
*
|
||||||
* @return true if the remote event was handled successfully, else false.
|
* @return true if the remote event was handled successfully, else false.
|
||||||
*/
|
*/
|
||||||
virtual bool DoProcessEvent(std::string_view topic, Event e) = 0;
|
virtual bool DoProcessEvent(std::string_view topic, cluster::Event e) = 0;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Hook method for implementing ProcessLocalEvent().
|
* Hook method for implementing ProcessLocalEvent().
|
||||||
|
@ -180,7 +180,7 @@ private:
|
||||||
*/
|
*/
|
||||||
class LocalEventHandlingStrategy : public EventHandlingStrategy {
|
class LocalEventHandlingStrategy : public EventHandlingStrategy {
|
||||||
private:
|
private:
|
||||||
bool DoProcessEvent(std::string_view topic, Event e) override;
|
bool DoProcessEvent(std::string_view topic, cluster::Event e) override;
|
||||||
void DoProcessLocalEvent(EventHandlerPtr h, zeek::Args args) override;
|
void DoProcessLocalEvent(EventHandlerPtr h, zeek::Args args) override;
|
||||||
void DoProcessError(std::string_view tag, std::string_view message) override;
|
void DoProcessError(std::string_view tag, std::string_view message) override;
|
||||||
};
|
};
|
||||||
|
@ -223,13 +223,13 @@ public:
|
||||||
void Terminate() { DoTerminate(); }
|
void Terminate() { DoTerminate(); }
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Create a cluster::detail::Event instance given an event handler and the
|
* Create a cluster::Event instance given an event handler and the
|
||||||
* script function arguments to it.
|
* script function arguments to it.
|
||||||
*
|
*
|
||||||
* @param handler A function val representing an event handler.
|
* @param handler A function val representing an event handler.
|
||||||
* @param args The arguments for the event handler.
|
* @param args The arguments for the event handler.
|
||||||
*/
|
*/
|
||||||
std::optional<detail::Event> MakeClusterEvent(FuncValPtr handler, ArgsSpan args) const;
|
std::optional<cluster::Event> MakeClusterEvent(FuncValPtr handler, ArgsSpan args) const;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Publish a cluster::detail::Event instance to a given topic.
|
* Publish a cluster::detail::Event instance to a given topic.
|
||||||
|
@ -244,7 +244,7 @@ public:
|
||||||
*
|
*
|
||||||
* @return true if the event was successfully published.
|
* @return true if the event was successfully published.
|
||||||
*/
|
*/
|
||||||
bool PublishEvent(const std::string& topic, cluster::detail::Event& event) { return DoPublishEvent(topic, event); }
|
bool PublishEvent(const std::string& topic, cluster::Event& event) { return DoPublishEvent(topic, event); }
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Status codes for callbacks.
|
* Status codes for callbacks.
|
||||||
|
@ -385,7 +385,7 @@ protected:
|
||||||
* @param topic The topic on which the event was received.
|
* @param topic The topic on which the event was received.
|
||||||
* @param e The event as cluster::detail::Event.
|
* @param e The event as cluster::detail::Event.
|
||||||
*/
|
*/
|
||||||
bool ProcessEvent(std::string_view topic, detail::Event e);
|
bool ProcessEvent(std::string_view topic, cluster::Event e);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* An error happened, pass it to the event handling strategy.
|
* An error happened, pass it to the event handling strategy.
|
||||||
|
@ -465,7 +465,7 @@ private:
|
||||||
* This hook method only exists for the existing Broker implementation that
|
* This hook method only exists for the existing Broker implementation that
|
||||||
* short-circuits serialization. Other backends should not override this.
|
* short-circuits serialization. Other backends should not override this.
|
||||||
*/
|
*/
|
||||||
virtual bool DoPublishEvent(const std::string& topic, cluster::detail::Event& event);
|
virtual bool DoPublishEvent(const std::string& topic, cluster::Event& event);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Send a serialized cluster::detail::Event to the given topic.
|
* Send a serialized cluster::detail::Event to the given topic.
|
||||||
|
|
|
@ -3,7 +3,6 @@
|
||||||
#include "zeek/cluster/BifSupport.h"
|
#include "zeek/cluster/BifSupport.h"
|
||||||
|
|
||||||
#include "zeek/Desc.h"
|
#include "zeek/Desc.h"
|
||||||
#include "zeek/Event.h"
|
|
||||||
#include "zeek/EventRegistry.h"
|
#include "zeek/EventRegistry.h"
|
||||||
#include "zeek/Frame.h"
|
#include "zeek/Frame.h"
|
||||||
#include "zeek/Func.h"
|
#include "zeek/Func.h"
|
||||||
|
@ -16,8 +15,8 @@
|
||||||
|
|
||||||
namespace {
|
namespace {
|
||||||
|
|
||||||
// Convert a script-level Cluster::Event to a cluster::detail::Event.
|
// Convert a script-level Cluster::Event to a cluster::Event.
|
||||||
std::optional<zeek::cluster::detail::Event> to_cluster_event(const zeek::cluster::Backend* backend,
|
std::optional<zeek::cluster::Event> to_cluster_event(const zeek::cluster::Backend* backend,
|
||||||
const zeek::RecordValPtr& rec) {
|
const zeek::RecordValPtr& rec) {
|
||||||
const auto& func = rec->GetField<zeek::FuncVal>(0);
|
const auto& func = rec->GetField<zeek::FuncVal>(0);
|
||||||
const auto& vargs = rec->GetField<zeek::VectorVal>(1);
|
const auto& vargs = rec->GetField<zeek::VectorVal>(1);
|
||||||
|
|
|
@ -12,9 +12,7 @@
|
||||||
|
|
||||||
namespace zeek::cluster {
|
namespace zeek::cluster {
|
||||||
|
|
||||||
namespace detail {
|
|
||||||
class Event;
|
class Event;
|
||||||
} // namespace detail
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This class handles encoding of events into byte buffers and back.
|
* This class handles encoding of events into byte buffers and back.
|
||||||
|
@ -34,7 +32,7 @@ public:
|
||||||
*
|
*
|
||||||
* @returns True on success, false in exceptional cases (e.g. unsupported serialization).
|
* @returns True on success, false in exceptional cases (e.g. unsupported serialization).
|
||||||
*/
|
*/
|
||||||
virtual bool SerializeEvent(byte_buffer& buf, const detail::Event& event) = 0;
|
virtual bool SerializeEvent(byte_buffer& buf, const cluster::Event& event) = 0;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Unserialize an event from a given byte buffer.
|
* Unserialize an event from a given byte buffer.
|
||||||
|
@ -43,7 +41,7 @@ public:
|
||||||
*
|
*
|
||||||
* @returns The event, or std::nullopt on error.
|
* @returns The event, or std::nullopt on error.
|
||||||
*/
|
*/
|
||||||
virtual std::optional<cluster::detail::Event> UnserializeEvent(byte_buffer_span buf) = 0;
|
virtual std::optional<cluster::Event> UnserializeEvent(byte_buffer_span buf) = 0;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @returns The name of this event serializer instance.
|
* @returns The name of this event serializer instance.
|
||||||
|
|
|
@ -2,7 +2,6 @@
|
||||||
|
|
||||||
#include "zeek/cluster/serializer/broker/Serializer.h"
|
#include "zeek/cluster/serializer/broker/Serializer.h"
|
||||||
|
|
||||||
#include <cinttypes>
|
|
||||||
#include <optional>
|
#include <optional>
|
||||||
|
|
||||||
#include "zeek/DebugLogger.h"
|
#include "zeek/DebugLogger.h"
|
||||||
|
@ -55,7 +54,7 @@ zeek::detail::EventMetadataVectorPtr detail::metadata_vector_from_broker_event(c
|
||||||
return meta;
|
return meta;
|
||||||
}
|
}
|
||||||
|
|
||||||
std::optional<broker::zeek::Event> detail::to_broker_event(const detail::Event& ev) {
|
std::optional<broker::zeek::Event> detail::to_broker_event(const zeek::cluster::Event& ev) {
|
||||||
broker::vector xs;
|
broker::vector xs;
|
||||||
xs.reserve(ev.Args().size());
|
xs.reserve(ev.Args().size());
|
||||||
|
|
||||||
|
@ -96,7 +95,7 @@ std::optional<broker::zeek::Event> detail::to_broker_event(const detail::Event&
|
||||||
return broker::zeek::Event(ev.HandlerName(), xs, broker_meta);
|
return broker::zeek::Event(ev.HandlerName(), xs, broker_meta);
|
||||||
}
|
}
|
||||||
|
|
||||||
std::optional<detail::Event> detail::to_zeek_event(const broker::zeek::Event& ev) {
|
std::optional<zeek::cluster::Event> detail::to_zeek_event(const broker::zeek::Event& ev) {
|
||||||
auto&& name = ev.name();
|
auto&& name = ev.name();
|
||||||
auto&& args = ev.args();
|
auto&& args = ev.args();
|
||||||
|
|
||||||
|
@ -141,10 +140,10 @@ std::optional<detail::Event> detail::to_zeek_event(const broker::zeek::Event& ev
|
||||||
}
|
}
|
||||||
|
|
||||||
auto meta = cluster::detail::metadata_vector_from_broker_event(ev);
|
auto meta = cluster::detail::metadata_vector_from_broker_event(ev);
|
||||||
return cluster::detail::Event{handler, std::move(vl), std::move(meta)};
|
return zeek::cluster::Event{handler, std::move(vl), std::move(meta)};
|
||||||
}
|
}
|
||||||
|
|
||||||
bool detail::BrokerBinV1_Serializer::SerializeEvent(byte_buffer& buf, const detail::Event& event) {
|
bool detail::BrokerBinV1_Serializer::SerializeEvent(byte_buffer& buf, const zeek::cluster::Event& event) {
|
||||||
auto ev = to_broker_event(event);
|
auto ev = to_broker_event(event);
|
||||||
if ( ! ev )
|
if ( ! ev )
|
||||||
return false;
|
return false;
|
||||||
|
@ -160,7 +159,7 @@ bool detail::BrokerBinV1_Serializer::SerializeEvent(byte_buffer& buf, const deta
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
std::optional<detail::Event> detail::BrokerBinV1_Serializer::UnserializeEvent(byte_buffer_span buf) {
|
std::optional<zeek::cluster::Event> detail::BrokerBinV1_Serializer::UnserializeEvent(byte_buffer_span buf) {
|
||||||
auto r = broker::data_envelope::deserialize(broker::endpoint_id::nil(), broker::endpoint_id::nil(), 0, "",
|
auto r = broker::data_envelope::deserialize(broker::endpoint_id::nil(), broker::endpoint_id::nil(), 0, "",
|
||||||
buf.data(), buf.size());
|
buf.data(), buf.size());
|
||||||
if ( ! r )
|
if ( ! r )
|
||||||
|
@ -185,7 +184,7 @@ struct PushBackAdapter {
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
||||||
bool detail::BrokerJsonV1_Serializer::SerializeEvent(byte_buffer& buf, const detail::Event& event) {
|
bool detail::BrokerJsonV1_Serializer::SerializeEvent(byte_buffer& buf, const zeek::cluster::Event& event) {
|
||||||
auto ev = to_broker_event(event);
|
auto ev = to_broker_event(event);
|
||||||
if ( ! ev )
|
if ( ! ev )
|
||||||
return false;
|
return false;
|
||||||
|
@ -195,7 +194,7 @@ bool detail::BrokerJsonV1_Serializer::SerializeEvent(byte_buffer& buf, const det
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
std::optional<detail::Event> detail::BrokerJsonV1_Serializer::UnserializeEvent(byte_buffer_span buf) {
|
std::optional<zeek::cluster::Event> detail::BrokerJsonV1_Serializer::UnserializeEvent(byte_buffer_span buf) {
|
||||||
broker::variant res;
|
broker::variant res;
|
||||||
auto err =
|
auto err =
|
||||||
broker::format::json::v1::decode(std::string_view{reinterpret_cast<const char*>(buf.data()), buf.size()}, res);
|
broker::format::json::v1::decode(std::string_view{reinterpret_cast<const char*>(buf.data()), buf.size()}, res);
|
||||||
|
@ -213,7 +212,7 @@ TEST_SUITE_BEGIN("cluster serializer broker");
|
||||||
|
|
||||||
TEST_CASE("roundtrip") {
|
TEST_CASE("roundtrip") {
|
||||||
auto* handler = zeek::event_registry->Lookup("Supervisor::node_status");
|
auto* handler = zeek::event_registry->Lookup("Supervisor::node_status");
|
||||||
detail::Event e{handler, zeek::Args{zeek::make_intrusive<zeek::StringVal>("TEST"), zeek::val_mgr->Count(42)},
|
zeek::cluster::Event e{handler, zeek::Args{zeek::make_intrusive<zeek::StringVal>("TEST"), zeek::val_mgr->Count(42)},
|
||||||
nullptr};
|
nullptr};
|
||||||
|
|
||||||
// Register network timestamp metadata. This is idempotent.
|
// Register network timestamp metadata. This is idempotent.
|
||||||
|
|
|
@ -42,9 +42,9 @@ zeek::detail::EventMetadataVectorPtr metadata_vector_from_broker_event(const bro
|
||||||
* to the appropriate Val instances.
|
* to the appropriate Val instances.
|
||||||
*
|
*
|
||||||
* @param ev The broker side event.
|
* @param ev The broker side event.
|
||||||
* @returns A zeek::cluster::detail::Event instance, or std::nullopt if the conversion failed.
|
* @returns A zeek::cluster::Event instance, or std::nullopt if the conversion failed.
|
||||||
*/
|
*/
|
||||||
std::optional<detail::Event> to_zeek_event(const broker::zeek::Event& ev);
|
std::optional<cluster::Event> to_zeek_event(const broker::zeek::Event& ev);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Convert a cluster::detail::Event to a broker::zeek::Event.
|
* Convert a cluster::detail::Event to a broker::zeek::Event.
|
||||||
|
@ -52,7 +52,7 @@ std::optional<detail::Event> to_zeek_event(const broker::zeek::Event& ev);
|
||||||
* @param ev The cluster::detail::Event
|
* @param ev The cluster::detail::Event
|
||||||
* @return A broker::zeek::Event to be serialized, or nullopt in case of errors.
|
* @return A broker::zeek::Event to be serialized, or nullopt in case of errors.
|
||||||
*/
|
*/
|
||||||
std::optional<broker::zeek::Event> to_broker_event(const detail::Event& ev);
|
std::optional<broker::zeek::Event> to_broker_event(const cluster::Event& ev);
|
||||||
|
|
||||||
// Implementation of the EventSerializer using the existing broker::detail::val_to_data()
|
// Implementation of the EventSerializer using the existing broker::detail::val_to_data()
|
||||||
// and broker::format::bin::v1::encode().
|
// and broker::format::bin::v1::encode().
|
||||||
|
@ -60,9 +60,9 @@ class BrokerBinV1_Serializer : public EventSerializer {
|
||||||
public:
|
public:
|
||||||
BrokerBinV1_Serializer() : EventSerializer("broker-bin-v1") {}
|
BrokerBinV1_Serializer() : EventSerializer("broker-bin-v1") {}
|
||||||
|
|
||||||
bool SerializeEvent(byte_buffer& buf, const detail::Event& event) override;
|
bool SerializeEvent(byte_buffer& buf, const cluster::Event& event) override;
|
||||||
|
|
||||||
std::optional<detail::Event> UnserializeEvent(byte_buffer_span buf) override;
|
std::optional<cluster::Event> UnserializeEvent(byte_buffer_span buf) override;
|
||||||
};
|
};
|
||||||
|
|
||||||
// Implementation of the EventSerializer that uses the existing broker::detail::val_to_data()
|
// Implementation of the EventSerializer that uses the existing broker::detail::val_to_data()
|
||||||
|
@ -71,9 +71,9 @@ class BrokerJsonV1_Serializer : public EventSerializer {
|
||||||
public:
|
public:
|
||||||
BrokerJsonV1_Serializer() : EventSerializer("broker-json-v1") {}
|
BrokerJsonV1_Serializer() : EventSerializer("broker-json-v1") {}
|
||||||
|
|
||||||
bool SerializeEvent(byte_buffer& buf, const detail::Event& event) override;
|
bool SerializeEvent(byte_buffer& buf, const cluster::Event& event) override;
|
||||||
|
|
||||||
std::optional<detail::Event> UnserializeEvent(byte_buffer_span buf) override;
|
std::optional<cluster::Event> UnserializeEvent(byte_buffer_span buf) override;
|
||||||
};
|
};
|
||||||
|
|
||||||
} // namespace cluster::detail
|
} // namespace cluster::detail
|
||||||
|
|
|
@ -55,7 +55,7 @@ private:
|
||||||
* will need some abstractions if client's can opt to use different encodings
|
* will need some abstractions if client's can opt to use different encodings
|
||||||
* of events in the future.
|
* of events in the future.
|
||||||
*/
|
*/
|
||||||
bool DoProcessEvent(std::string_view topic, zeek::cluster::detail::Event e) override {
|
bool DoProcessEvent(std::string_view topic, zeek::cluster::Event e) override {
|
||||||
// If the client has left, no point in sending it any pending event.
|
// If the client has left, no point in sending it any pending event.
|
||||||
if ( wsc->IsTerminated() )
|
if ( wsc->IsTerminated() )
|
||||||
return true;
|
return true;
|
||||||
|
@ -71,7 +71,7 @@ private:
|
||||||
// if that's faster.
|
// if that's faster.
|
||||||
auto ev = zeek::cluster::detail::to_broker_event(e);
|
auto ev = zeek::cluster::detail::to_broker_event(e);
|
||||||
if ( ! ev ) {
|
if ( ! ev ) {
|
||||||
fprintf(stderr, "[ERROR] Unable to go from detail::Event to broker::event\n");
|
fprintf(stderr, "[ERROR] Unable to go from cluster::Event to broker::event\n");
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -938,7 +938,7 @@ void Manager::HookUnprocessedPacket(const Packet* packet) const {
|
||||||
}
|
}
|
||||||
|
|
||||||
bool Manager::HookPublishEvent(zeek::cluster::Backend& backend, const std::string& topic,
|
bool Manager::HookPublishEvent(zeek::cluster::Backend& backend, const std::string& topic,
|
||||||
zeek::cluster::detail::Event& event) const {
|
zeek::cluster::Event& event) const {
|
||||||
HookArgumentList args;
|
HookArgumentList args;
|
||||||
|
|
||||||
if ( HavePluginForHook(META_HOOK_PRE) ) {
|
if ( HavePluginForHook(META_HOOK_PRE) ) {
|
||||||
|
|
|
@ -456,8 +456,7 @@ public:
|
||||||
* @return true if event should be published, false if the publish
|
* @return true if event should be published, false if the publish
|
||||||
* operation should be skipped.
|
* operation should be skipped.
|
||||||
*/
|
*/
|
||||||
bool HookPublishEvent(zeek::cluster::Backend& backend, const std::string& topic,
|
bool HookPublishEvent(zeek::cluster::Backend& backend, const std::string& topic, zeek::cluster::Event& event) const;
|
||||||
zeek::cluster::detail::Event& event) const;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Internal method that registers a freshly instantiated plugin with
|
* Internal method that registers a freshly instantiated plugin with
|
||||||
|
|
|
@ -350,8 +350,7 @@ bool Plugin::HookReporter(const std::string& prefix, const EventHandlerPtr event
|
||||||
|
|
||||||
void Plugin::HookUnprocessedPacket(const Packet* packet) {}
|
void Plugin::HookUnprocessedPacket(const Packet* packet) {}
|
||||||
|
|
||||||
bool Plugin::HookPublishEvent(zeek::cluster::Backend& backend, const std::string& topic,
|
bool Plugin::HookPublishEvent(zeek::cluster::Backend& backend, const std::string& topic, zeek::cluster::Event& event) {
|
||||||
zeek::cluster::detail::Event& event) {
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -54,10 +54,7 @@ class Frame;
|
||||||
|
|
||||||
namespace cluster {
|
namespace cluster {
|
||||||
class Backend;
|
class Backend;
|
||||||
|
|
||||||
namespace detail {
|
|
||||||
class Event;
|
class Event;
|
||||||
}
|
|
||||||
} // namespace cluster
|
} // namespace cluster
|
||||||
|
|
||||||
namespace plugin {
|
namespace plugin {
|
||||||
|
@ -433,7 +430,7 @@ public:
|
||||||
/**
|
/**
|
||||||
* Constructor with cluster event argument.
|
* Constructor with cluster event argument.
|
||||||
*/
|
*/
|
||||||
explicit HookArgument(zeek::cluster::detail::Event* event) {
|
explicit HookArgument(zeek::cluster::Event* event) {
|
||||||
type = CLUSTER_EVENT;
|
type = CLUSTER_EVENT;
|
||||||
arg.cluster_event = event;
|
arg.cluster_event = event;
|
||||||
}
|
}
|
||||||
|
@ -592,7 +589,7 @@ public:
|
||||||
/**
|
/**
|
||||||
* Returns the value for a cluster event argument.
|
* Returns the value for a cluster event argument.
|
||||||
*/
|
*/
|
||||||
const zeek::cluster::detail::Event* AsClusterEvent() const {
|
const zeek::cluster::Event* AsClusterEvent() const {
|
||||||
assert(type == CLUSTER_EVENT);
|
assert(type == CLUSTER_EVENT);
|
||||||
return arg.cluster_event;
|
return arg.cluster_event;
|
||||||
}
|
}
|
||||||
|
@ -627,7 +624,7 @@ private:
|
||||||
const zeek::detail::Location* loc;
|
const zeek::detail::Location* loc;
|
||||||
const Packet* packet;
|
const Packet* packet;
|
||||||
const cluster::Backend* cluster_backend;
|
const cluster::Backend* cluster_backend;
|
||||||
const cluster::detail::Event* cluster_event;
|
const cluster::Event* cluster_event;
|
||||||
} arg;
|
} arg;
|
||||||
|
|
||||||
// Outside union because these have dtors.
|
// Outside union because these have dtors.
|
||||||
|
@ -1148,7 +1145,7 @@ protected:
|
||||||
* operation should be skipped.
|
* operation should be skipped.
|
||||||
*/
|
*/
|
||||||
virtual bool HookPublishEvent(zeek::cluster::Backend& backend, const std::string& topic,
|
virtual bool HookPublishEvent(zeek::cluster::Backend& backend, const std::string& topic,
|
||||||
zeek::cluster::detail::Event& event);
|
zeek::cluster::Event& event);
|
||||||
|
|
||||||
// Meta hooks.
|
// Meta hooks.
|
||||||
virtual void MetaHookPre(HookType hook, const HookArgumentList& args);
|
virtual void MetaHookPre(HookType hook, const HookArgumentList& args);
|
||||||
|
|
|
@ -43,8 +43,7 @@ static void describe_hook_args(const zeek::plugin::HookArgumentList& args, zeek:
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
bool Plugin::HookPublishEvent(zeek::cluster::Backend& backend, const std::string& topic,
|
bool Plugin::HookPublishEvent(zeek::cluster::Backend& backend, const std::string& topic, zeek::cluster::Event& event) {
|
||||||
zeek::cluster::detail::Event& event) {
|
|
||||||
std::fprintf(stdout, "%.6f %-15s backend=%s topic=%s event=%s\n", zeek::run_state::network_time,
|
std::fprintf(stdout, "%.6f %-15s backend=%s topic=%s event=%s\n", zeek::run_state::network_time,
|
||||||
" HookPublishEvent", backend.Name().c_str(), topic.c_str(), std::string(event.HandlerName()).c_str());
|
" HookPublishEvent", backend.Name().c_str(), topic.c_str(), std::string(event.HandlerName()).c_str());
|
||||||
|
|
||||||
|
|
|
@ -12,7 +12,7 @@ protected:
|
||||||
void InitPostScript() override;
|
void InitPostScript() override;
|
||||||
|
|
||||||
bool HookPublishEvent(zeek::cluster::Backend& backend, const std::string& topic,
|
bool HookPublishEvent(zeek::cluster::Backend& backend, const std::string& topic,
|
||||||
zeek::cluster::detail::Event& event) override;
|
zeek::cluster::Event& event) override;
|
||||||
|
|
||||||
void MetaHookPre(zeek::plugin::HookType hook, const zeek::plugin::HookArgumentList& args) override;
|
void MetaHookPre(zeek::plugin::HookType hook, const zeek::plugin::HookArgumentList& args) override;
|
||||||
void MetaHookPost(zeek::plugin::HookType hook, const zeek::plugin::HookArgumentList& args,
|
void MetaHookPost(zeek::plugin::HookType hook, const zeek::plugin::HookArgumentList& args,
|
||||||
|
|
|
@ -31,8 +31,7 @@ void Plugin::InitPostScript() {
|
||||||
std::fprintf(stdout, "%.6f %-15s\n", zeek::run_state::network_time, " InitPostScript");
|
std::fprintf(stdout, "%.6f %-15s\n", zeek::run_state::network_time, " InitPostScript");
|
||||||
}
|
}
|
||||||
|
|
||||||
bool Plugin::HookPublishEvent(zeek::cluster::Backend& backend, const std::string& topic,
|
bool Plugin::HookPublishEvent(zeek::cluster::Backend& backend, const std::string& topic, zeek::cluster::Event& event) {
|
||||||
zeek::cluster::detail::Event& event) {
|
|
||||||
std::fprintf(stdout, "%.6f %s backend=%s topic=%s event=%s\n", zeek::run_state::network_time, "HookPublishEvent",
|
std::fprintf(stdout, "%.6f %s backend=%s topic=%s event=%s\n", zeek::run_state::network_time, "HookPublishEvent",
|
||||||
backend.Name().c_str(), topic.c_str(), std::string(event.HandlerName()).c_str());
|
backend.Name().c_str(), topic.c_str(), std::string(event.HandlerName()).c_str());
|
||||||
|
|
||||||
|
|
|
@ -11,7 +11,7 @@ protected:
|
||||||
void InitPostScript() override;
|
void InitPostScript() override;
|
||||||
|
|
||||||
bool HookPublishEvent(zeek::cluster::Backend& backend, const std::string& topic,
|
bool HookPublishEvent(zeek::cluster::Backend& backend, const std::string& topic,
|
||||||
zeek::cluster::detail::Event& event) override;
|
zeek::cluster::Event& event) override;
|
||||||
};
|
};
|
||||||
|
|
||||||
extern Plugin plugin;
|
extern Plugin plugin;
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue