cluster/Backend: Make backend event processing customizable

This allows configurability at the code level to decide what to do with
a received remote events and events produced by a backend. For now, only
enqueue events into the process's script layer, but for the WebSocket
interface, the action would be to send out the event on a WebSocket
connection instead.
This commit is contained in:
Arne Welzel 2025-01-09 09:58:32 +01:00
parent 337b62960b
commit 0b7a660a34
10 changed files with 139 additions and 27 deletions

View file

@ -213,7 +213,7 @@ std::string RenderEvent(const std::string& topic, const std::string& name, const
} // namespace
#endif
Manager::Manager(bool arg_use_real_time) : Backend(nullptr, nullptr) {
Manager::Manager(bool arg_use_real_time) : Backend(nullptr, nullptr, nullptr) {
bound_port = 0;
use_real_time = arg_use_real_time;
peer_count = 0;

View file

@ -15,8 +15,9 @@ zeek::plugin::Configuration Plugin::Configure() {
// instantiated in zeek-setup.cc. Don't even allow to instantiate
// a second one via the plugin mechanism. In the future, this could
// be changed so that broker is instantiated on demand only.
auto fail_instantiate = [](std::unique_ptr<cluster::EventSerializer>,
std::unique_ptr<cluster::LogSerializer>) -> std::unique_ptr<cluster::Backend> {
auto fail_instantiate =
[](std::unique_ptr<cluster::EventSerializer>, std::unique_ptr<cluster::LogSerializer>,
std::unique_ptr<cluster::detail::EventHandlingStrategy>) -> std::unique_ptr<cluster::Backend> {
zeek::reporter->FatalError("do not instantiate broker explicitly");
return nullptr;
};

View file

@ -13,9 +13,20 @@
#include "zeek/cluster/Serializer.h"
#include "zeek/iosource/Manager.h"
#include "zeek/logging/Manager.h"
#include "zeek/util.h"
using namespace zeek::cluster;
bool detail::LocalEventHandlingStrategy::DoHandleRemoteEvent(std::string_view topic, detail::Event e) {
zeek::event_mgr.Enqueue(e.Handler(), std::move(e.args), util::detail::SOURCE_BROKER, 0, nullptr, e.timestamp);
return true;
}
void detail::LocalEventHandlingStrategy::DoEnqueueLocalEvent(EventHandlerPtr h, zeek::Args args) {
zeek::event_mgr.Enqueue(h, std::move(args));
}
std::optional<zeek::Args> detail::check_args(const zeek::FuncValPtr& handler, zeek::ArgsSpan args) {
const auto& func_type = handler->GetType<zeek::FuncType>();
@ -58,6 +69,10 @@ std::optional<zeek::Args> detail::check_args(const zeek::FuncValPtr& handler, ze
return result;
}
Backend::Backend(std::unique_ptr<EventSerializer> es, std::unique_ptr<LogSerializer> ls,
std::unique_ptr<detail::EventHandlingStrategy> ehs)
: event_serializer(std::move(es)), log_serializer(std::move(ls)), event_handling_strategy(std::move(ehs)) {}
std::optional<detail::Event> Backend::MakeClusterEvent(FuncValPtr handler, ArgsSpan args, double timestamp) const {
auto checked_args = detail::check_args(handler, args);
if ( ! checked_args )
@ -96,6 +111,10 @@ bool Backend::DoPublishLogWrites(const zeek::logging::detail::LogWriteHeader& he
return DoPublishLogWrites(header, log_serializer->Name(), buf);
}
void Backend::EnqueueEvent(EventHandlerPtr h, zeek::Args args) {
event_handling_strategy->EnqueueLocalEvent(h, std::move(args));
}
bool Backend::ProcessEventMessage(std::string_view topic, std::string_view format,
const detail::byte_buffer_span payload) {
if ( format != event_serializer->Name() ) {
@ -113,11 +132,7 @@ bool Backend::ProcessEventMessage(std::string_view topic, std::string_view forma
return false;
}
auto& event = *r;
zeek::event_mgr.Enqueue(event.Handler(), std::move(event.args), util::detail::SOURCE_BROKER, 0, nullptr,
event.timestamp);
return true;
return event_handling_strategy->HandleRemoteEvent(topic, std::move(*r));
}
bool Backend::ProcessLogMessage(std::string_view format, detail::byte_buffer_span payload) {

View file

@ -53,6 +53,75 @@ public:
const EventHandlerPtr& Handler() const { return handler; }
};
/**
* Interface for processing cluster::Event instances received
* on a given topic.
*
* An instances is injected into Backend instances to allow
* modifying the behavior for received events. For instance,
* for backends instantiated for WebSocket clients, events
* should not be raised as Zeek events locally and instead
* transmitted to the WebSocket client.
*/
class EventHandlingStrategy {
public:
virtual ~EventHandlingStrategy() = default;
/**
* Method for processing a remote event received on the given topic.
*
* When handling the remote event fails, this method should return false.
*
* @param topic The topic on which the event was received.
* @param ev The parsed event that was received.
*
* @return true if the remote event was handled successfully, else false.
*/
bool HandleRemoteEvent(std::string_view topic, Event e) { return DoHandleRemoteEvent(topic, std::move(e)); }
/**
* Method for enquing backend specific events.
*
* Some backend's may raise events destined for the local
* scripting layer. That's usually wanted, but not always.
* When the backend is instantiated for a WebSocket client,
* local scripting layer should not raise events for the
* WebSocket client.
* @param h The event handler to use.
* @param args The event arguments.
*/
void EnqueueLocalEvent(EventHandlerPtr h, zeek::Args args) { DoEnqueueLocalEvent(h, std::move(args)); }
private:
/**
* Hook method for implementing HandleRemoteEvent().
*
* @param topic The topic on which the event was received.
* @param ev The parsed event that was received.
*
* @return true if the remote event was handled successfully, else false.
*/
virtual bool DoHandleRemoteEvent(std::string_view topic, Event e) = 0;
/**
* Hook method for implementing EnqueueLocalEvent().
*
* @param h The event handler to use.
* @param args The event arguments.
*/
virtual void DoEnqueueLocalEvent(EventHandlerPtr h, zeek::Args args) = 0;
};
/**
* Strategy enqueueing events into this process's Zeek event loop.
*/
class LocalEventHandlingStrategy : public EventHandlingStrategy {
private:
bool DoHandleRemoteEvent(std::string_view topic, Event e) override;
void DoEnqueueLocalEvent(EventHandlerPtr h, zeek::Args args) override;
};
/**
* Validate that the provided args are suitable for handler.
*
@ -143,9 +212,24 @@ public:
protected:
/**
* Constructor.
*
* @param es The event serializer to use.
* @param ls The log batch serializer to use.
* @param ehs The event handling strategy to use for this backend.
*/
Backend(std::unique_ptr<EventSerializer> es, std::unique_ptr<LogSerializer> ls)
: event_serializer(std::move(es)), log_serializer(std::move(ls)) {}
Backend(std::unique_ptr<EventSerializer> es, std::unique_ptr<LogSerializer> ls,
std::unique_ptr<detail::EventHandlingStrategy> ehs);
/**
* Enqueue an event to be raised to this process Zeek scripting layer.
*
* When a backend is used for a WebSocket client connection, events
* raised through this method are blackholed.
*
* @param h The event handler.
* @param args The event arguments.
*/
void EnqueueEvent(EventHandlerPtr h, zeek::Args args);
/**
* Process an incoming event message.
@ -277,6 +361,7 @@ private:
std::unique_ptr<EventSerializer> event_serializer;
std::unique_ptr<LogSerializer> log_serializer;
std::unique_ptr<detail::EventHandlingStrategy> event_handling_strategy;
};
/**

View file

@ -13,7 +13,8 @@ namespace zeek::cluster {
class BackendComponent : public plugin::Component {
public:
using factory_callback = std::unique_ptr<Backend> (*)(std::unique_ptr<EventSerializer>,
std::unique_ptr<LogSerializer>);
std::unique_ptr<LogSerializer>,
std::unique_ptr<detail::EventHandlingStrategy>);
/**
* Constructor.

View file

@ -11,11 +11,14 @@ Manager::Manager()
event_serializers(plugin::ComponentManager<EventSerializerComponent>("Cluster", "EventSerializerTag")),
log_serializers(plugin::ComponentManager<LogSerializerComponent>("Cluster", "LogSerializerTag")) {}
std::unique_ptr<Backend> Manager::InstantiateBackend(const zeek::EnumValPtr& tag,
std::unique_ptr<EventSerializer> event_serializer,
std::unique_ptr<LogSerializer> log_serializer) {
const BackendComponent* c = Backends().Lookup(tag);
return c ? c->Factory()(std::move(event_serializer), std::move(log_serializer)) : nullptr;
std::unique_ptr<Backend> Manager::InstantiateBackend(
const zeek::EnumValPtr& tag, std::unique_ptr<EventSerializer> event_serializer,
std::unique_ptr<LogSerializer> log_serializer,
std::unique_ptr<detail::EventHandlingStrategy> event_handling_strategy) {
if ( const auto* c = Backends().Lookup(tag) )
return c->Factory()(std::move(event_serializer), std::move(log_serializer), std::move(event_handling_strategy));
return nullptr;
}
std::unique_ptr<EventSerializer> Manager::InstantiateEventSerializer(const zeek::EnumValPtr& tag) {

View file

@ -27,12 +27,14 @@ public:
* @param tag The enum value identifying the backend.
* @param event_serializer The event serializer to inject.
* @param log_serializer The log serializer to inject.
* @param event_handling_strategy The event handling strategy to inject.
*
* @return New ClusterBackend instance, or null if there's no such component.
*/
std::unique_ptr<Backend> InstantiateBackend(const EnumValPtr& tag,
std::unique_ptr<EventSerializer> event_serializer,
std::unique_ptr<LogSerializer> log_serializer);
std::unique_ptr<LogSerializer> log_serializer,
std::unique_ptr<detail::EventHandlingStrategy> event_handling_strategy);
/**
* Instantiate a event serializer with the given enum value.

View file

@ -14,7 +14,6 @@
#include <zmq.hpp>
#include "zeek/DebugLogger.h"
#include "zeek/Event.h"
#include "zeek/EventRegistry.h"
#include "zeek/IntrusivePtr.h"
#include "zeek/Reporter.h"
@ -66,9 +65,9 @@ void self_thread_fun(void* arg) {
} // namespace
// Constructor.
ZeroMQBackend::ZeroMQBackend(std::unique_ptr<EventSerializer> es, std::unique_ptr<LogSerializer> ls)
: ThreadedBackend(std::move(es), std::move(ls)) {
ZeroMQBackend::ZeroMQBackend(std::unique_ptr<EventSerializer> es, std::unique_ptr<LogSerializer> ls,
std::unique_ptr<detail::EventHandlingStrategy> ehs)
: ThreadedBackend(std::move(es), std::move(ls), std::move(ehs)) {
xsub = zmq::socket_t(ctx, zmq::socket_type::xsub);
xpub = zmq::socket_t(ctx, zmq::socket_type::xpub);
log_push = zmq::socket_t(ctx, zmq::socket_type::push);
@ -555,7 +554,7 @@ bool ZeroMQBackend::DoProcessBackendMessage(int tag, detail::byte_buffer_span pa
zeek::EventHandlerPtr eh = tag == 1 ? event_subscription : event_unsubscription;
ZEROMQ_DEBUG("BackendMessage: %s for %s", eh->Name(), topic.c_str());
zeek::event_mgr.Enqueue(eh, zeek::make_intrusive<zeek::StringVal>(topic));
EnqueueEvent(eh, zeek::Args{zeek::make_intrusive<zeek::StringVal>(topic)});
return true;
}
else {

View file

@ -17,7 +17,8 @@ public:
/**
* Constructor.
*/
ZeroMQBackend(std::unique_ptr<EventSerializer> es, std::unique_ptr<LogSerializer> ls);
ZeroMQBackend(std::unique_ptr<EventSerializer> es, std::unique_ptr<LogSerializer> ls,
std::unique_ptr<detail::EventHandlingStrategy> ehs);
/**
* Spawns a thread running zmq_proxy() for the configured XPUB/XSUB listen
@ -34,8 +35,9 @@ public:
* Component factory.
*/
static std::unique_ptr<Backend> Instantiate(std::unique_ptr<EventSerializer> event_serializer,
std::unique_ptr<LogSerializer> log_serializer) {
return std::make_unique<ZeroMQBackend>(std::move(event_serializer), std::move(log_serializer));
std::unique_ptr<LogSerializer> log_serializer,
std::unique_ptr<detail::EventHandlingStrategy> ehs) {
return std::make_unique<ZeroMQBackend>(std::move(event_serializer), std::move(log_serializer), std::move(ehs));
}
private:

View file

@ -849,8 +849,12 @@ SetupResult setup(int argc, char** argv, Options* zopts) {
exit(1);
}
auto backend = cluster::manager->InstantiateBackend(cluster_backend_val, std::move(event_serializer),
std::move(log_serializer));
auto event_handling_strategy = std::make_unique<cluster::detail::LocalEventHandlingStrategy>();
auto backend =
cluster::manager->InstantiateBackend(cluster_backend_val, std::move(event_serializer),
std::move(log_serializer), std::move(event_handling_strategy));
if ( ! backend ) {
reporter->Error("Failed to instantiate cluster backend: %s",
zeek::obj_desc_short(cluster_backend_val.get()).c_str());