From 0b7a660a34a6c5299a8218eefc9e5116bf05924a Mon Sep 17 00:00:00 2001 From: Arne Welzel Date: Thu, 9 Jan 2025 09:58:32 +0100 Subject: [PATCH] cluster/Backend: Make backend event processing customizable This allows configurability at the code level to decide what to do with a received remote events and events produced by a backend. For now, only enqueue events into the process's script layer, but for the WebSocket interface, the action would be to send out the event on a WebSocket connection instead. --- src/broker/Manager.cc | 2 +- src/broker/Plugin.cc | 5 +- src/cluster/Backend.cc | 25 ++++++-- src/cluster/Backend.h | 89 +++++++++++++++++++++++++++- src/cluster/Component.h | 3 +- src/cluster/Manager.cc | 13 ++-- src/cluster/Manager.h | 4 +- src/cluster/backend/zeromq/ZeroMQ.cc | 9 ++- src/cluster/backend/zeromq/ZeroMQ.h | 8 ++- src/zeek-setup.cc | 8 ++- 10 files changed, 139 insertions(+), 27 deletions(-) diff --git a/src/broker/Manager.cc b/src/broker/Manager.cc index 1dec2ef084..eb4d953889 100644 --- a/src/broker/Manager.cc +++ b/src/broker/Manager.cc @@ -213,7 +213,7 @@ std::string RenderEvent(const std::string& topic, const std::string& name, const } // namespace #endif -Manager::Manager(bool arg_use_real_time) : Backend(nullptr, nullptr) { +Manager::Manager(bool arg_use_real_time) : Backend(nullptr, nullptr, nullptr) { bound_port = 0; use_real_time = arg_use_real_time; peer_count = 0; diff --git a/src/broker/Plugin.cc b/src/broker/Plugin.cc index 7fd982efa9..ea22a8ce89 100644 --- a/src/broker/Plugin.cc +++ b/src/broker/Plugin.cc @@ -15,8 +15,9 @@ zeek::plugin::Configuration Plugin::Configure() { // instantiated in zeek-setup.cc. Don't even allow to instantiate // a second one via the plugin mechanism. In the future, this could // be changed so that broker is instantiated on demand only. - auto fail_instantiate = [](std::unique_ptr, - std::unique_ptr) -> std::unique_ptr { + auto fail_instantiate = + [](std::unique_ptr, std::unique_ptr, + std::unique_ptr) -> std::unique_ptr { zeek::reporter->FatalError("do not instantiate broker explicitly"); return nullptr; }; diff --git a/src/cluster/Backend.cc b/src/cluster/Backend.cc index 8a491e72be..177997238b 100644 --- a/src/cluster/Backend.cc +++ b/src/cluster/Backend.cc @@ -13,9 +13,20 @@ #include "zeek/cluster/Serializer.h" #include "zeek/iosource/Manager.h" #include "zeek/logging/Manager.h" +#include "zeek/util.h" using namespace zeek::cluster; + +bool detail::LocalEventHandlingStrategy::DoHandleRemoteEvent(std::string_view topic, detail::Event e) { + zeek::event_mgr.Enqueue(e.Handler(), std::move(e.args), util::detail::SOURCE_BROKER, 0, nullptr, e.timestamp); + return true; +} + +void detail::LocalEventHandlingStrategy::DoEnqueueLocalEvent(EventHandlerPtr h, zeek::Args args) { + zeek::event_mgr.Enqueue(h, std::move(args)); +} + std::optional detail::check_args(const zeek::FuncValPtr& handler, zeek::ArgsSpan args) { const auto& func_type = handler->GetType(); @@ -58,6 +69,10 @@ std::optional detail::check_args(const zeek::FuncValPtr& handler, ze return result; } +Backend::Backend(std::unique_ptr es, std::unique_ptr ls, + std::unique_ptr ehs) + : event_serializer(std::move(es)), log_serializer(std::move(ls)), event_handling_strategy(std::move(ehs)) {} + std::optional Backend::MakeClusterEvent(FuncValPtr handler, ArgsSpan args, double timestamp) const { auto checked_args = detail::check_args(handler, args); if ( ! checked_args ) @@ -96,6 +111,10 @@ bool Backend::DoPublishLogWrites(const zeek::logging::detail::LogWriteHeader& he return DoPublishLogWrites(header, log_serializer->Name(), buf); } +void Backend::EnqueueEvent(EventHandlerPtr h, zeek::Args args) { + event_handling_strategy->EnqueueLocalEvent(h, std::move(args)); +} + bool Backend::ProcessEventMessage(std::string_view topic, std::string_view format, const detail::byte_buffer_span payload) { if ( format != event_serializer->Name() ) { @@ -113,11 +132,7 @@ bool Backend::ProcessEventMessage(std::string_view topic, std::string_view forma return false; } - auto& event = *r; - zeek::event_mgr.Enqueue(event.Handler(), std::move(event.args), util::detail::SOURCE_BROKER, 0, nullptr, - event.timestamp); - - return true; + return event_handling_strategy->HandleRemoteEvent(topic, std::move(*r)); } bool Backend::ProcessLogMessage(std::string_view format, detail::byte_buffer_span payload) { diff --git a/src/cluster/Backend.h b/src/cluster/Backend.h index df8edf01b3..bef6e3fa4e 100644 --- a/src/cluster/Backend.h +++ b/src/cluster/Backend.h @@ -53,6 +53,75 @@ public: const EventHandlerPtr& Handler() const { return handler; } }; +/** + * Interface for processing cluster::Event instances received + * on a given topic. + * + * An instances is injected into Backend instances to allow + * modifying the behavior for received events. For instance, + * for backends instantiated for WebSocket clients, events + * should not be raised as Zeek events locally and instead + * transmitted to the WebSocket client. + */ +class EventHandlingStrategy { +public: + virtual ~EventHandlingStrategy() = default; + + /** + * Method for processing a remote event received on the given topic. + * + * When handling the remote event fails, this method should return false. + * + * @param topic The topic on which the event was received. + * @param ev The parsed event that was received. + * + * @return true if the remote event was handled successfully, else false. + */ + bool HandleRemoteEvent(std::string_view topic, Event e) { return DoHandleRemoteEvent(topic, std::move(e)); } + + /** + * Method for enquing backend specific events. + * + * Some backend's may raise events destined for the local + * scripting layer. That's usually wanted, but not always. + * When the backend is instantiated for a WebSocket client, + * local scripting layer should not raise events for the + * WebSocket client. + + * @param h The event handler to use. + * @param args The event arguments. + */ + void EnqueueLocalEvent(EventHandlerPtr h, zeek::Args args) { DoEnqueueLocalEvent(h, std::move(args)); } + +private: + /** + * Hook method for implementing HandleRemoteEvent(). + * + * @param topic The topic on which the event was received. + * @param ev The parsed event that was received. + * + * @return true if the remote event was handled successfully, else false. + */ + virtual bool DoHandleRemoteEvent(std::string_view topic, Event e) = 0; + + /** + * Hook method for implementing EnqueueLocalEvent(). + * + * @param h The event handler to use. + * @param args The event arguments. + */ + virtual void DoEnqueueLocalEvent(EventHandlerPtr h, zeek::Args args) = 0; +}; + +/** + * Strategy enqueueing events into this process's Zeek event loop. + */ +class LocalEventHandlingStrategy : public EventHandlingStrategy { +private: + bool DoHandleRemoteEvent(std::string_view topic, Event e) override; + void DoEnqueueLocalEvent(EventHandlerPtr h, zeek::Args args) override; +}; + /** * Validate that the provided args are suitable for handler. * @@ -143,9 +212,24 @@ public: protected: /** * Constructor. + * + * @param es The event serializer to use. + * @param ls The log batch serializer to use. + * @param ehs The event handling strategy to use for this backend. */ - Backend(std::unique_ptr es, std::unique_ptr ls) - : event_serializer(std::move(es)), log_serializer(std::move(ls)) {} + Backend(std::unique_ptr es, std::unique_ptr ls, + std::unique_ptr ehs); + + /** + * Enqueue an event to be raised to this process Zeek scripting layer. + * + * When a backend is used for a WebSocket client connection, events + * raised through this method are blackholed. + * + * @param h The event handler. + * @param args The event arguments. + */ + void EnqueueEvent(EventHandlerPtr h, zeek::Args args); /** * Process an incoming event message. @@ -277,6 +361,7 @@ private: std::unique_ptr event_serializer; std::unique_ptr log_serializer; + std::unique_ptr event_handling_strategy; }; /** diff --git a/src/cluster/Component.h b/src/cluster/Component.h index 944557feae..49f8491a56 100644 --- a/src/cluster/Component.h +++ b/src/cluster/Component.h @@ -13,7 +13,8 @@ namespace zeek::cluster { class BackendComponent : public plugin::Component { public: using factory_callback = std::unique_ptr (*)(std::unique_ptr, - std::unique_ptr); + std::unique_ptr, + std::unique_ptr); /** * Constructor. diff --git a/src/cluster/Manager.cc b/src/cluster/Manager.cc index e4e1ac8f71..ffac82c396 100644 --- a/src/cluster/Manager.cc +++ b/src/cluster/Manager.cc @@ -11,11 +11,14 @@ Manager::Manager() event_serializers(plugin::ComponentManager("Cluster", "EventSerializerTag")), log_serializers(plugin::ComponentManager("Cluster", "LogSerializerTag")) {} -std::unique_ptr Manager::InstantiateBackend(const zeek::EnumValPtr& tag, - std::unique_ptr event_serializer, - std::unique_ptr log_serializer) { - const BackendComponent* c = Backends().Lookup(tag); - return c ? c->Factory()(std::move(event_serializer), std::move(log_serializer)) : nullptr; +std::unique_ptr Manager::InstantiateBackend( + const zeek::EnumValPtr& tag, std::unique_ptr event_serializer, + std::unique_ptr log_serializer, + std::unique_ptr event_handling_strategy) { + if ( const auto* c = Backends().Lookup(tag) ) + return c->Factory()(std::move(event_serializer), std::move(log_serializer), std::move(event_handling_strategy)); + + return nullptr; } std::unique_ptr Manager::InstantiateEventSerializer(const zeek::EnumValPtr& tag) { diff --git a/src/cluster/Manager.h b/src/cluster/Manager.h index fd4a44f31c..3e64924039 100644 --- a/src/cluster/Manager.h +++ b/src/cluster/Manager.h @@ -27,12 +27,14 @@ public: * @param tag The enum value identifying the backend. * @param event_serializer The event serializer to inject. * @param log_serializer The log serializer to inject. + * @param event_handling_strategy The event handling strategy to inject. * * @return New ClusterBackend instance, or null if there's no such component. */ std::unique_ptr InstantiateBackend(const EnumValPtr& tag, std::unique_ptr event_serializer, - std::unique_ptr log_serializer); + std::unique_ptr log_serializer, + std::unique_ptr event_handling_strategy); /** * Instantiate a event serializer with the given enum value. diff --git a/src/cluster/backend/zeromq/ZeroMQ.cc b/src/cluster/backend/zeromq/ZeroMQ.cc index 02fd795bbf..7825ab5d82 100644 --- a/src/cluster/backend/zeromq/ZeroMQ.cc +++ b/src/cluster/backend/zeromq/ZeroMQ.cc @@ -14,7 +14,6 @@ #include #include "zeek/DebugLogger.h" -#include "zeek/Event.h" #include "zeek/EventRegistry.h" #include "zeek/IntrusivePtr.h" #include "zeek/Reporter.h" @@ -66,9 +65,9 @@ void self_thread_fun(void* arg) { } // namespace -// Constructor. -ZeroMQBackend::ZeroMQBackend(std::unique_ptr es, std::unique_ptr ls) - : ThreadedBackend(std::move(es), std::move(ls)) { +ZeroMQBackend::ZeroMQBackend(std::unique_ptr es, std::unique_ptr ls, + std::unique_ptr ehs) + : ThreadedBackend(std::move(es), std::move(ls), std::move(ehs)) { xsub = zmq::socket_t(ctx, zmq::socket_type::xsub); xpub = zmq::socket_t(ctx, zmq::socket_type::xpub); log_push = zmq::socket_t(ctx, zmq::socket_type::push); @@ -555,7 +554,7 @@ bool ZeroMQBackend::DoProcessBackendMessage(int tag, detail::byte_buffer_span pa zeek::EventHandlerPtr eh = tag == 1 ? event_subscription : event_unsubscription; ZEROMQ_DEBUG("BackendMessage: %s for %s", eh->Name(), topic.c_str()); - zeek::event_mgr.Enqueue(eh, zeek::make_intrusive(topic)); + EnqueueEvent(eh, zeek::Args{zeek::make_intrusive(topic)}); return true; } else { diff --git a/src/cluster/backend/zeromq/ZeroMQ.h b/src/cluster/backend/zeromq/ZeroMQ.h index 8a715b8c28..20df495453 100644 --- a/src/cluster/backend/zeromq/ZeroMQ.h +++ b/src/cluster/backend/zeromq/ZeroMQ.h @@ -17,7 +17,8 @@ public: /** * Constructor. */ - ZeroMQBackend(std::unique_ptr es, std::unique_ptr ls); + ZeroMQBackend(std::unique_ptr es, std::unique_ptr ls, + std::unique_ptr ehs); /** * Spawns a thread running zmq_proxy() for the configured XPUB/XSUB listen @@ -34,8 +35,9 @@ public: * Component factory. */ static std::unique_ptr Instantiate(std::unique_ptr event_serializer, - std::unique_ptr log_serializer) { - return std::make_unique(std::move(event_serializer), std::move(log_serializer)); + std::unique_ptr log_serializer, + std::unique_ptr ehs) { + return std::make_unique(std::move(event_serializer), std::move(log_serializer), std::move(ehs)); } private: diff --git a/src/zeek-setup.cc b/src/zeek-setup.cc index 446ff82215..99c94d8c21 100644 --- a/src/zeek-setup.cc +++ b/src/zeek-setup.cc @@ -849,8 +849,12 @@ SetupResult setup(int argc, char** argv, Options* zopts) { exit(1); } - auto backend = cluster::manager->InstantiateBackend(cluster_backend_val, std::move(event_serializer), - std::move(log_serializer)); + auto event_handling_strategy = std::make_unique(); + + auto backend = + cluster::manager->InstantiateBackend(cluster_backend_val, std::move(event_serializer), + std::move(log_serializer), std::move(event_handling_strategy)); + if ( ! backend ) { reporter->Error("Failed to instantiate cluster backend: %s", zeek::obj_desc_short(cluster_backend_val.get()).c_str());