From 349d88153f25996b6aeff10bfaeb2470fa65dae9 Mon Sep 17 00:00:00 2001 From: Arne Welzel Date: Thu, 21 Nov 2024 18:45:23 +0100 Subject: [PATCH 01/13] broker: Pass frame to MakeEvent() This was lost in a prior change. --- src/broker/messaging.bif | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/broker/messaging.bif b/src/broker/messaging.bif index 632cf06ce5..aba84fe344 100644 --- a/src/broker/messaging.bif +++ b/src/broker/messaging.bif @@ -99,7 +99,7 @@ function Broker::make_event%(...%): Broker::Event %{ zeek::Broker::Manager::ScriptScopeGuard ssg; - auto ev = zeek::broker_mgr->MakeEvent(ArgsSpan{*@ARGS@}); + auto ev = zeek::broker_mgr->MakeEvent(ArgsSpan{*@ARGS@}, frame); return zeek::cast_intrusive(ev); %} From 35eadf0ceb0c09e5c3c460654caaae2b76921301 Mon Sep 17 00:00:00 2001 From: Arne Welzel Date: Wed, 13 Nov 2024 10:30:57 +0100 Subject: [PATCH 02/13] plugin: Add component enums for pluggable cluster backends --- src/plugin/Component.cc | 6 ++++++ src/plugin/Component.h | 21 ++++++++++++--------- 2 files changed, 18 insertions(+), 9 deletions(-) diff --git a/src/plugin/Component.cc b/src/plugin/Component.cc index 5740d7f19a..96476f2aea 100644 --- a/src/plugin/Component.cc +++ b/src/plugin/Component.cc @@ -39,6 +39,12 @@ void Component::Describe(ODesc* d) const { case component::SESSION_ADAPTER: d->Add("Session Adapter"); break; + case component::CLUSTER_BACKEND: d->Add("Cluster Backend"); break; + + case component::EVENT_SERIALIZER: d->Add("Event Serializer"); break; + + case component::LOG_SERIALIZER: d->Add("Log Serializer"); break; + default: reporter->InternalWarning("unknown component type in plugin::Component::Describe"); d->Add(""); diff --git a/src/plugin/Component.h b/src/plugin/Component.h index d4eb296bf3..f61fe0555b 100644 --- a/src/plugin/Component.h +++ b/src/plugin/Component.h @@ -21,15 +21,18 @@ namespace component { * Component types. */ enum Type { - READER, /// An input reader (not currently used). 
- WRITER, /// A logging writer (not currently used). - ANALYZER, /// A protocol analyzer. - PACKET_ANALYZER, /// A packet analyzer. - FILE_ANALYZER, /// A file analyzer. - IOSOURCE, /// An I/O source, excluding packet sources. - PKTSRC, /// A packet source. - PKTDUMPER, /// A packet dumper. - SESSION_ADAPTER, /// A session adapter analyzer. + READER, /// An input reader (not currently used). + WRITER, /// A logging writer (not currently used). + ANALYZER, /// A protocol analyzer. + PACKET_ANALYZER, /// A packet analyzer. + FILE_ANALYZER, /// A file analyzer. + IOSOURCE, /// An I/O source, excluding packet sources. + PKTSRC, /// A packet source. + PKTDUMPER, /// A packet dumper. + SESSION_ADAPTER, /// A session adapter analyzer. + CLUSTER_BACKEND, /// A cluster backend. + EVENT_SERIALIZER, /// A serializer for events, used by cluster backends. + LOG_SERIALIZER, /// A serializer for log batches, used by cluster backends. }; } // namespace component From d2633163bd97af2a77383dfa1668be516e7c68e0 Mon Sep 17 00:00:00 2001 From: Arne Welzel Date: Wed, 13 Nov 2024 10:28:18 +0100 Subject: [PATCH 03/13] DebugLogger: Add cluster debugging stream --- src/DebugLogger.cc | 3 ++- src/DebugLogger.h | 1 + 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/src/DebugLogger.cc b/src/DebugLogger.cc index 8e964aac32..c9a17b97f8 100644 --- a/src/DebugLogger.cc +++ b/src/DebugLogger.cc @@ -19,7 +19,8 @@ DebugLogger::Stream DebugLogger::streams[NUM_DBGS] = {"main-loop", 0, false}, {"dpd", 0, false}, {"packet_analysis", 0, false}, {"file_analysis", 0, false}, {"tm", 0, false}, {"logging", 0, false}, {"input", 0, false}, {"threading", 0, false}, {"plugins", 0, false}, {"zeekygen", 0, false}, {"pktio", 0, false}, {"broker", 0, false}, - {"scripts", 0, false}, {"supervisor", 0, false}, {"hashkey", 0, false}, {"spicy", 0, false}}; + {"scripts", 0, false}, {"supervisor", 0, false}, {"hashkey", 0, false}, {"spicy", 0, false}, + {"cluster", 0, false}}; DebugLogger::DebugLogger() { 
verbose = false; diff --git a/src/DebugLogger.h b/src/DebugLogger.h index a8df9cd977..3bb34e559d 100644 --- a/src/DebugLogger.h +++ b/src/DebugLogger.h @@ -54,6 +54,7 @@ enum DebugStream { DBG_SUPERVISOR, // Process supervisor DBG_HASHKEY, // HashKey buffers DBG_SPICY, // Spicy functionality + DBG_CLUSTER, // Cluster functionality NUM_DBGS // Has to be last }; From 60fb212f17bec69091097afddda96c92632c9707 Mon Sep 17 00:00:00 2001 From: Arne Welzel Date: Wed, 13 Nov 2024 12:23:21 +0100 Subject: [PATCH 04/13] SerialTypes/Field: Allow default construction and add move constructor This is in preparation of using SerialTypes to serialize and unserialize complete log batches which include Field instances and not just Value's. --- src/threading/SerialTypes.h | 39 ++++++++++++++++++++++++++++--------- 1 file changed, 30 insertions(+), 9 deletions(-) diff --git a/src/threading/SerialTypes.h b/src/threading/SerialTypes.h index dabf27210e..1a0ea21f1b 100644 --- a/src/threading/SerialTypes.h +++ b/src/threading/SerialTypes.h @@ -19,13 +19,21 @@ namespace zeek::threading { * Definition of a log file, i.e., one column of a log stream. */ struct Field { - const char* name; //! Name of the field. + const char* name = nullptr; //! Name of the field. //! Needed by input framework. Port fields have two names (one for the //! port, one for the type), and this specifies the secondary name. - const char* secondary_name; - TypeTag type; //! Type of the field. - TypeTag subtype; //! Inner type for sets and vectors. - bool optional; //! True if field is optional. + const char* secondary_name = nullptr; + TypeTag type = TYPE_ERROR; //! Type of the field. + TypeTag subtype = TYPE_ERROR; //! Inner type for sets and vectors. + bool optional = false; //! True if field is optional. + + + /** + * Constructor. + * + * For Read() usage. Initializes with TYPE_ERROR. + */ + Field() = default; /** * Constructor. 
@@ -47,6 +55,23 @@ struct Field { subtype(other.subtype), optional(other.optional) {} + /** + * Move constructor. + */ + Field(Field&& other) noexcept { + name = other.name; + secondary_name = other.secondary_name; + type = other.type; + subtype = other.subtype; + optional = other.optional; + + other.name = nullptr; + other.secondary_name = nullptr; + other.type = TYPE_ERROR; + other.subtype = TYPE_ERROR; + other.optional = false; + } + ~Field() { delete[] name; delete[] secondary_name; @@ -91,10 +116,6 @@ struct Field { * thread-safe. */ std::string TypeName() const; - -private: - // Force usage of constructor above. - Field() {} }; /** From a946b27faff9d74d32e66cc74684131ce1611796 Mon Sep 17 00:00:00 2001 From: Arne Welzel Date: Wed, 13 Nov 2024 11:31:45 +0100 Subject: [PATCH 05/13] logging: Introduce logging/Types.h Header and implementation for types shared between the cluster and logging framework. The logging framework will be adapted later to use these. For now, the new cluster components will simply reference them. --- src/logging/CMakeLists.txt | 1 + src/logging/Types.cc | 48 ++++++++++++++++++ src/logging/Types.h | 99 ++++++++++++++++++++++++++++++++++++++ 3 files changed, 148 insertions(+) create mode 100644 src/logging/Types.cc create mode 100644 src/logging/Types.h diff --git a/src/logging/CMakeLists.txt b/src/logging/CMakeLists.txt index 7690998520..d064fb278a 100644 --- a/src/logging/CMakeLists.txt +++ b/src/logging/CMakeLists.txt @@ -5,6 +5,7 @@ zeek_add_subdir_library( Manager.cc WriterBackend.cc WriterFrontend.cc + Types.cc BIFS logging.bif) diff --git a/src/logging/Types.cc b/src/logging/Types.cc new file mode 100644 index 0000000000..9fab8baf05 --- /dev/null +++ b/src/logging/Types.cc @@ -0,0 +1,48 @@ +// See the file "COPYING" in the main distribution directory for copyright. 
+ +#include "zeek/logging/Types.h" + +#include "zeek/Desc.h" +#include "zeek/Type.h" +#include "zeek/Val.h" + +namespace zeek::logging::detail { + +LogWriteHeader::LogWriteHeader() = default; + +LogWriteHeader::LogWriteHeader(EnumValPtr arg_stream_id, EnumValPtr arg_writer_id, std::string arg_filter_name, + std::string arg_path) + : stream_id(std::move(arg_stream_id)), + writer_id(std::move(arg_writer_id)), + filter_name(std::move(arg_filter_name)), + path(std::move(arg_path)) { + stream_name = obj_desc_short(stream_id.get()); + writer_name = obj_desc_short(writer_id.get()); +} + +LogWriteHeader& LogWriteHeader::operator=(const LogWriteHeader& other) = default; + +LogWriteHeader::~LogWriteHeader() = default; + +bool LogWriteHeader::PopulateEnumVals() { + static const auto& stream_id_type = zeek::id::find_type("Log::ID"); + static const auto& writer_id_type = zeek::id::find_type("Log::Writer"); + + if ( stream_name.empty() || writer_name.empty() ) + return false; + + auto sid = stream_id_type->Lookup(stream_name); + if ( sid < 0 ) + return false; + + auto wid = writer_id_type->Lookup(writer_name); + if ( wid < 0 ) + return false; + + stream_id = stream_id_type->GetEnumVal(sid); + writer_id = writer_id_type->GetEnumVal(wid); + + return true; +} + +} // namespace zeek::logging::detail diff --git a/src/logging/Types.h b/src/logging/Types.h new file mode 100644 index 0000000000..06d83897a8 --- /dev/null +++ b/src/logging/Types.h @@ -0,0 +1,99 @@ +// See the file "COPYING" in the main distribution directory for copyright. + +// Header for types shared between cluster and logging components. +// +// Currently these are in detail, but over time may move into the +// public namespace once established. + +#pragma once + +#include +#include + +#include "zeek/IntrusivePtr.h" +#include "zeek/threading/SerialTypes.h" + +namespace zeek { +class EnumVal; +using EnumValPtr = IntrusivePtr; + +namespace logging::detail { + +/** + * A single log record. 
+ * + * This is what a Zeek record value passed into Log::write() + * is converted into before being passed to a local log writer or + * sent via the cluster to a remote node. + */ +using LogRecord = std::vector; + +/** + * A struct holding all necessary information that relates to + * log writes for a given path. These values are constant over + * the lifetime of a \a WriterFrontend. + * + * Note that the constructor, destructor and assignment operator are + * defaulted in Types.cc. This is to avoid a Val.h include here. + */ +struct LogWriteHeader { + /** + * Default constructor. + */ + LogWriteHeader(); + + /** + * Constructor that populates stream_name and writer_name. + * + * @param stream_id Enum value representing the stream. + * @param writer_id Enum value representing the writer. + * @param filter_name The filter name of the writer frontend. + * @param path The path of the writer frontend. + */ + LogWriteHeader(EnumValPtr stream_id, EnumValPtr writer_id, std::string filter_name, std::string path); + + /** + * Assignment operator. + */ + LogWriteHeader& operator=(const LogWriteHeader& other); + + /** + * Destructor. + */ + ~LogWriteHeader(); + + /** + * Helper to populate stream_id and writer_id after the + * stream_name and writer_name members were set. + * + * @return true if matching enum values were found, else false. + */ + bool PopulateEnumVals(); + + EnumValPtr stream_id; // The enum identifying the stream. + std::string stream_name; // The name of the stream, e.g. Conn::LOG + EnumValPtr writer_id; // The enum identifying the writer. Mostly for backwards compat with broker. + std::string writer_name; // The name of the writer, e.g. WRITER_ASCII. + std::string filter_name; // The name of the filter. + std::string path; // The path as configured or produced by the filter's path_func. + std::vector fields; // The schema describing a log record. +}; + +/** + * A batch of log records including their header. 
+ * + * This is the object created when unserializing a log-write + * from another cluster node. + * + * This layout currently implies that during de-serialization + * data is copied into LogRecord / threading::Value structures. + * If the need for zero-copy approaches arises, might need a + * different approach to free the underlying buffer (capnproto). + */ +struct LogWriteBatch { + LogWriteHeader header; + std::vector records; +}; + +} // namespace logging::detail +} // namespace zeek From e94e30616d2863c9fba50622ba33907243efdf2c Mon Sep 17 00:00:00 2001 From: Arne Welzel Date: Wed, 13 Nov 2024 11:41:25 +0100 Subject: [PATCH 06/13] cluster/Serializer: Interface for event and log serializers --- src/cluster/Serializer.h | 107 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 107 insertions(+) create mode 100644 src/cluster/Serializer.h diff --git a/src/cluster/Serializer.h b/src/cluster/Serializer.h new file mode 100644 index 0000000000..2be72ceca2 --- /dev/null +++ b/src/cluster/Serializer.h @@ -0,0 +1,107 @@ +// See the file "COPYING" in the main distribution directory for copyright. + +// Interfaces to be implemented by event and log serializer components. + +#pragma once + +#include +#include +#include + +#include "zeek/Span.h" +#include "zeek/logging/Types.h" + +namespace zeek::cluster { + +namespace detail { +class Event; + +using byte_buffer = std::vector; +using byte_buffer_span = Span; + +} // namespace detail + + +/** + * This class handles encoding of events into byte buffers and back. + * + * An event and its parameters can be serialized as a message which + * another node can unserialize and then enqueue as an event. + */ +class EventSerializer { +public: + virtual ~EventSerializer() = default; + + /** + * Serialize an event into the given byte buffer. + * + * @param buf The buffer to use for serialization. + * @param event The event to serialize. + * + * @returns True on success, false in exceptional cases (e.g. unsupported serialization). 
+ */ + virtual bool SerializeEvent(detail::byte_buffer& buf, const detail::Event& event) = 0; + + /** + * Unserialize an event from a given byte buffer. + * + * @param buf A span representing a received remote event. + * + * @returns The event, or std::nullopt on error. + */ + virtual std::optional UnserializeEvent(detail::byte_buffer_span buf) = 0; + + /** + * @returns The name of this event serializer instance. + */ + const std::string& Name() { return name; } + +protected: + /** + * Constructor. + */ + EventSerializer(std::string name) : name(std::move(name)) {} + +private: + std::string name; +}; + +/** + * Interface for a serializer for logging::LogRecord instances. + */ +class LogSerializer { +public: + /** + * Constructor. + */ + explicit LogSerializer(std::string name) : name(std::move(name)) {}; + + virtual ~LogSerializer() = default; + + /** + * Serialize log records into a byte buffer. + * + * @param buf The buffer to serialize into. + * @param header The log batch header. + * @param records The actual log writes. + */ + virtual bool SerializeLogWrite(detail::byte_buffer& buf, const logging::detail::LogWriteHeader& header, + zeek::Span records) = 0; + + /** + * Unserialize log writes from a given byte buffer. + * + * @param buf The span representing received log writes. + */ + virtual std::optional UnserializeLogWrite(detail::byte_buffer_span buf) = 0; + + /** + * @returns The name of this log serializer instance. 
+ */ + const std::string& Name() { return name; } + +private: + std::string name; +}; + +} // namespace zeek::cluster From fb23a06f6fc46d16807b1c58d3fe34e03d9e10a6 Mon Sep 17 00:00:00 2001 From: Arne Welzel Date: Wed, 13 Nov 2024 11:52:15 +0100 Subject: [PATCH 07/13] cluster/Backend: Interface for cluster backends --- scripts/base/frameworks/cluster/main.zeek | 8 + src/cluster/Backend.cc | 214 ++++++++++++ src/cluster/Backend.h | 399 ++++++++++++++++++++++ 3 files changed, 621 insertions(+) create mode 100644 src/cluster/Backend.cc create mode 100644 src/cluster/Backend.h diff --git a/scripts/base/frameworks/cluster/main.zeek b/scripts/base/frameworks/cluster/main.zeek index 3ecdc52420..d27a9160a9 100644 --- a/scripts/base/frameworks/cluster/main.zeek +++ b/scripts/base/frameworks/cluster/main.zeek @@ -280,6 +280,14 @@ export { ## Returns: a topic string that may used to send a message exclusively to ## a given cluster node. global nodeid_topic: function(id: string): string; + + ## An event instance for cluster pub/sub. + type Event: record { + ## The event handler to be invoked on the remote node. + ev: any; + ## The arguments for the event. + args: vector of any; + }; } # Track active nodes per type. diff --git a/src/cluster/Backend.cc b/src/cluster/Backend.cc new file mode 100644 index 0000000000..3737b9134f --- /dev/null +++ b/src/cluster/Backend.cc @@ -0,0 +1,214 @@ +// See the file "COPYING" in the main distribution directory for copyright. 
+ +#include "zeek/cluster/Backend.h" + +#include + +#include "zeek/Desc.h" +#include "zeek/Event.h" +#include "zeek/EventRegistry.h" +#include "zeek/Func.h" +#include "zeek/Reporter.h" +#include "zeek/Type.h" +#include "zeek/cluster/Serializer.h" +#include "zeek/iosource/Manager.h" + +using namespace zeek::cluster; + +std::optional detail::check_args(const zeek::FuncValPtr& handler, zeek::ArgsSpan args) { + const auto& func_type = handler->GetType(); + + if ( func_type->Flavor() != zeek::FUNC_FLAVOR_EVENT ) { + zeek::reporter->Error("unexpected function type for %s: %s", handler->AsFunc()->GetName().c_str(), + func_type->FlavorString().c_str()); + return std::nullopt; + } + + const auto& types = func_type->ParamList()->GetTypes(); + if ( args.size() != types.size() ) { + zeek::reporter->Error("bad number of arguments for %s: got %zu, expect %zu", + handler->AsFunc()->GetName().c_str(), args.size(), types.size()); + return std::nullopt; + } + + zeek::Args result(args.size()); + + for ( size_t i = 0; i < args.size(); i++ ) { + const auto& a = args[i]; + const auto& got_type = a->GetType(); + const auto& expected_type = types[i]; + + if ( ! same_type(got_type, expected_type) ) { + zeek::reporter->Error("event parameter #%zu type mismatch, got %s, expecting %s", i + 1, + zeek::obj_desc_short(got_type.get()).c_str(), + zeek::obj_desc_short(expected_type.get()).c_str()); + return std::nullopt; + } + + result[i] = args[i]; + } + + return result; +} + +std::optional Backend::MakeClusterEvent(FuncValPtr handler, ArgsSpan args, double timestamp) const { + auto checked_args = detail::check_args(handler, args); + if ( ! checked_args ) + return std::nullopt; + + if ( timestamp == 0.0 ) + timestamp = zeek::event_mgr.CurrentEventTime(); + + const auto& eh = zeek::event_registry->Lookup(handler->AsFuncPtr()->GetName()); + if ( ! 
eh ) { + zeek::reporter->Error("event registry lookup of '%s' failed", obj_desc(handler.get()).c_str()); + return std::nullopt; + } + + return zeek::cluster::detail::Event{eh, std::move(*checked_args), timestamp}; +} + +// Default implementation doing the serialization. +bool Backend::DoPublishEvent(const std::string& topic, const cluster::detail::Event& event) { + cluster::detail::byte_buffer buf; + + if ( ! event_serializer->SerializeEvent(buf, event) ) + return false; + + return DoPublishEvent(topic, event_serializer->Name(), buf); +} + +// Default implementation doing log record serialization. +bool Backend::DoPublishLogWrites(const zeek::logging::detail::LogWriteHeader& header, + zeek::Span records) { + cluster::detail::byte_buffer buf; + + if ( ! log_serializer->SerializeLogWrite(buf, header, records) ) + return false; + + return DoPublishLogWrites(header, log_serializer->Name(), buf); +} + +bool Backend::ProcessEventMessage(const std::string_view& topic, const std::string_view& format, + const detail::byte_buffer_span payload) { + if ( format != event_serializer->Name() ) { + zeek::reporter->Error("ProcessEventMessage: Wrong format: %s vs %s", std::string{format}.c_str(), + event_serializer->Name().c_str()); + return false; + } + + auto r = event_serializer->UnserializeEvent(payload); + + if ( ! r ) { + auto escaped = + util::get_escaped_string(std::string(reinterpret_cast(payload.data()), payload.size()), false); + zeek::reporter->Error("Failed to unserialize message: %s: %s", std::string{topic}.c_str(), escaped.c_str()); + return false; + } + + auto& event = *r; + zeek::event_mgr.Enqueue(event.Handler(), std::move(event.args), util::detail::SOURCE_BROKER, 0, nullptr, + event.timestamp); + + return true; +} + +bool Backend::ProcessLogMessage(const std::string_view& format, detail::byte_buffer_span payload) { + // We could also dynamically lookup the right de-serializer, but + // for now assume we just receive what is configured. 
+ if ( format != log_serializer->Name() ) { + zeek::reporter->Error("Got log message in format '%s', but have deserializer '%s'", std::string{format}.c_str(), + log_serializer->Name().c_str()); + return false; + } + + auto result = log_serializer->UnserializeLogWrite(payload); + + if ( ! result ) { + zeek::reporter->Error("Failed to unserialize log message using '%s'", std::string{format}.c_str()); + return false; + } + + // TODO: Send the whole batch to the logging manager. + // return zeek::log_mgr->WritesFromRemote(result->header, std::move(result->records)); + zeek::reporter->FatalError("not implemented"); + return false; +} + +bool ThreadedBackend::ProcessBackendMessage(int tag, detail::byte_buffer_span payload) { + return DoProcessBackendMessage(tag, payload); +} + +namespace { + +bool register_io_source(zeek::iosource::IOSource* src, int fd, bool dont_count) { + constexpr bool manage_lifetime = true; + + zeek::iosource_mgr->Register(src, dont_count, manage_lifetime); + + if ( ! zeek::iosource_mgr->RegisterFd(fd, src) ) { + zeek::reporter->Error("Failed to register messages_flare with IO manager"); + return false; + } + + return true; +} +} // namespace + +bool ThreadedBackend::DoInit() { + // Register as counting during DoInit() to avoid Zeek from shutting down. + return register_io_source(this, messages_flare.FD(), false); +} + +void ThreadedBackend::DoInitPostScript() { + // Register non-counting after parsing scripts. + register_io_source(this, messages_flare.FD(), true); +} + +void ThreadedBackend::QueueForProcessing(QueueMessages&& qmessages) { + bool fire = false; + + // Enqueue under lock. 
+ { + std::scoped_lock lock(messages_mtx); + fire = messages.empty(); + + if ( messages.empty() ) { + messages = std::move(qmessages); + } + else { + messages.reserve(messages.size() + qmessages.size()); + for ( auto& qmsg : qmessages ) + messages.emplace_back(std::move(qmsg)); + } + } + + if ( fire ) + messages_flare.Fire(); +} + +void ThreadedBackend::Process() { + QueueMessages to_process; + { + std::scoped_lock lock(messages_mtx); + to_process = std::move(messages); + messages_flare.Extinguish(); + messages.clear(); + } + + for ( const auto& msg : to_process ) { + // sonarlint wants to use std::visit. not sure... + if ( auto* emsg = std::get_if(&msg) ) { + ProcessEventMessage(emsg->topic, emsg->format, emsg->payload_span()); + } + else if ( auto* lmsg = std::get_if(&msg) ) { + ProcessLogMessage(lmsg->format, lmsg->payload_span()); + } + else if ( auto* bmsg = std::get_if(&msg) ) { + ProcessBackendMessage(bmsg->tag, bmsg->payload_span()); + } + else { + zeek::reporter->FatalError("Unimplemented QueueMessage %zu", msg.index()); + } + } +} diff --git a/src/cluster/Backend.h b/src/cluster/Backend.h new file mode 100644 index 0000000000..55d2928736 --- /dev/null +++ b/src/cluster/Backend.h @@ -0,0 +1,399 @@ +// See the file "COPYING" in the main distribution directory for copyright. + +// The interface for cluster backends and remote events. + +#pragma once + +#include +#include +#include +#include +#include + +#include "zeek/EventHandler.h" +#include "zeek/Flare.h" +#include "zeek/IntrusivePtr.h" +#include "zeek/Span.h" +#include "zeek/cluster/Serializer.h" +#include "zeek/iosource/IOSource.h" +#include "zeek/logging/Types.h" + +namespace zeek { + +class FuncVal; + +using FuncValPtr = IntrusivePtr; + +class Val; +using ValPtr = IntrusivePtr; +using ArgsSpan = Span; + +namespace cluster { + +namespace detail { + +/** + * Cluster event class. + */ +class Event { +public: + /** + * Constructor. 
+ */ + Event(const EventHandlerPtr& handler, zeek::Args args, double timestamp = 0.0) + : handler(handler), args(std::move(args)), timestamp(timestamp) {} + + EventHandlerPtr handler; + zeek::Args args; + double timestamp; // TODO: This should be more generic, possibly holding a + // vector of key/value metadata, rather than just + // the timestamp. + + std::string_view HandlerName() const { return handler->Name(); } + const EventHandlerPtr& Handler() const { return handler; } +}; + +/** + * Validate that the provided args are suitable for handler. + * + * @param handler An event handler. + * @param args The provided arguments for the handler as a span. + * + * @return A zeek::Args instance if successful, else std::nullopt. + */ +std::optional check_args(const zeek::FuncValPtr& handler, zeek::ArgsSpan args); +} // namespace detail + +/** + * Interface for a cluster backend implementing publish/subscribe communication. + * Serialization of events should be done using the serializers injected into + * the constructor. + */ +class Backend { +public: + virtual ~Backend() = default; + + /** + * Hook invoked after all scripts have been parsed. + */ + void InitPostScript() { DoInitPostScript(); } + + /** + * Method invoked from the Cluster::Backend::__init() bif. + */ + bool Init() { return DoInit(); } + + /** + * Hook invoked when Zeek is about to terminate. + */ + void Terminate() { DoTerminate(); } + + /** + * Create a cluster::detail::Event instance given an event handler and the + * script function arguments to it. + * + * @param handler A function val representing an event handler. + * @param args The arguments for the event handler. + * @param timestamp The network time to add to the event as metadata. + */ + std::optional MakeClusterEvent(FuncValPtr handler, ArgsSpan args, double timestamp = 0.0) const; + + /** + * Publish a cluster::detail::Event instance to a given topic. + * + * @param topic The topic string to publish the event to. 
+ * @param event The event to publish. + * + * @return true if the event was successfully published. + */ + bool PublishEvent(const std::string& topic, const cluster::detail::Event& event) { + return DoPublishEvent(topic, event); + } + + /** + * Register interest in messages that use a certain topic prefix. + * + * @param topic_prefix a prefix to match against remote message topics. + * @return true if it's a new event subscription and it is now registered. + */ + bool Subscribe(const std::string& topic_prefix) { return DoSubscribe(topic_prefix); } + + /** + * Unregister interest in messages on a certain topic. + * + * @param topic_prefix a prefix previously supplied to Subscribe() + * @return true if interest in topic prefix is no longer advertised. + */ + bool Unsubscribe(const std::string& topic_prefix) { return DoUnsubscribe(topic_prefix); } + + /** + * Publish multiple log records. + * + * All log records belong to the (stream, filter, path) tuple that is + * described by \a header. + * + * @param header Fixed information about the stream, writer, filter and schema of the records. + * @param records A span of logging::detail::LogRecords to be published. + */ + bool PublishLogWrites(const zeek::logging::detail::LogWriteHeader& header, + zeek::Span records) { + return DoPublishLogWrites(header, records); + } + +protected: + /** + * Constructor. + */ + Backend(std::unique_ptr es, std::unique_ptr ls) + : event_serializer(std::move(es)), log_serializer(std::move(ls)) {} + + /** + * Process an incoming event message. + */ + bool ProcessEventMessage(const std::string_view& topic, const std::string_view& format, + detail::byte_buffer_span payload); + + /** + * Process an incoming log message. + */ + bool ProcessLogMessage(const std::string_view& format, detail::byte_buffer_span payload); + +private: + /** + * Called after all Zeek scripts have been loaded. + * + * A cluster backend should initialize itself based on script variables, + * register any IO sources, etc. 
It should not yet start any connections, that + * should happen in DoInit() instead. + */ + virtual void DoInitPostScript() = 0; + + /** + * Called from Cluster::Backend::__init(). + * + * Backend implementations should start connections with + * remote systems or other nodes, open listening ports or + * do whatever is needed to be functional. + */ + virtual bool DoInit() = 0; + + /** + * Called at termination time. + * + * This should be used to shut down connectivity. Any last messages + * to be published should be sent from script land, rather than in + * DoTerminate(). A backend may wait for a bounded and configurable + * amount of time to flush any last messages out. + */ + virtual void DoTerminate() = 0; + + /** + * Publish a cluster::detail::Event to the given topic. + * + * The default implementation serializes to a detail::byte_buffer and + * calls DoPublishEvent() with the resulting buffer. + * + * This hook method only exists for the existing Broker implementation that + * short-circuits serialization. Other backends should not override this. + */ + virtual bool DoPublishEvent(const std::string& topic, const cluster::detail::Event& event); + + /** + * Send a serialized cluster::detail::Event to the given topic. + * + * Semantics of this call are "fire-and-forget". An implementation should + * ensure the message is enqueued for delivery, but may not have been sent out + * let alone received by any subscribers of the topic when this call returns. + * + * If the backend has not established a connection, the published message is + * allowed to be discarded. + * + * @param topic a topic string associated with the message. + * @param format the format/serializer used for serialization of the message payload. + * @param buf the serialized Event. + * @return true if the message has been published successfully. 
+ */ + virtual bool DoPublishEvent(const std::string& topic, const std::string& format, + const detail::byte_buffer& buf) = 0; + + /** + * Register interest in messages that use a certain topic prefix. + * + * If the backend hasn't yet established a connection, any subscriptions + * should be queued until they can be processed. + * + * @param topic_prefix a prefix to match against remote message topics. + * + * @return true if it's a new event subscription and now registered. + */ + virtual bool DoSubscribe(const std::string& topic_prefix) = 0; + + /** + * Unregister interest in messages on a certain topic. + * + * @param topic_prefix a prefix previously supplied to Subscribe() + * @return true if interest in topic prefix is no longer advertised. + */ + virtual bool DoUnsubscribe(const std::string& topic_prefix) = 0; + + /** + * Serialize a log batch, then forward it to DoPublishLogWrites() below. + + * The default implementation serializes to a detail::byte_buffer and + * calls DoPublishLogWrites() with the resulting buffer. + * + * This hook method only exists for the existing Broker implementation that + * short-circuits serialization. Other backends should not override this. + * + * @param header The header describing the writer frontend where the records originate from. + * @param records Records to be serialized. + * + * @return true if the message has been published successfully. + */ + virtual bool DoPublishLogWrites(const zeek::logging::detail::LogWriteHeader& header, + zeek::Span records); + + /** + * Send out a serialized log batch. + * + * A backend implementation may use the values from \a header to + * construct a topic to write the logs to. + * + * Semantics of this call are "fire-and-forget". An implementation should + * ensure the message is enqueued for delivery, but may not have been sent out + * let alone received by the destination when this call returns. + * + * Sharding log writes to multiple receivers (logger nodes) is backend specific. 
+ * Broker, for example, involves Zeek script layer cluster pool concepts. + * Other backends may use appropriate native mechanisms that may be more + * efficient. + * + * @param header the header describing the writer frontend where the records originate from. + * @param format the format/serializer used for serialization of the message payload. + * @param buf the serialized log batch. This is the message payload. + * @return true if the message has been published successfully. + */ + virtual bool DoPublishLogWrites(const zeek::logging::detail::LogWriteHeader& header, const std::string& format, + detail::byte_buffer& buf) = 0; + + std::unique_ptr event_serializer; + std::unique_ptr log_serializer; +}; + +/** + * A cluster backend may receive event and log messages asynchronously + * through threads. The following structs can be used with QueueForProcessing() + * to enqueue these messages onto the main IO loop for processing. + * + * EventMessage and LogMessage are processed in a generic fashion in + * the Process() method. The BackendMessage can be intercepted with + * DoProcessBackendMessage(). DoProcessBackendMessage() is guaranteed + * to run on Zeek's main thread. + */ + +/** + * A message on a topic for events was received. + */ +struct EventMessage { + std::string topic; + std::string format; + detail::byte_buffer payload; + + auto payload_span() const { return Span(payload.data(), payload.size()); }; +}; + +/** + * A message that represents log records. + */ +struct LogMessage { + std::string format; + detail::byte_buffer payload; + + auto payload_span() const { return Span(payload.data(), payload.size()); }; +}; + +/** + * A backend specific message. + * + * This provides a mechanism to transfer auxiliary information + * from a background thread to Zeek's main thread. 
+ */ +struct BackendMessage { + int tag; + detail::byte_buffer payload; + + auto payload_span() const { return Span(payload.data(), payload.size()); }; +}; + +using QueueMessage = std::variant; +using QueueMessages = std::vector; + +/** + * Support for backends that use background threads or invoke + * callbacks on non-main threads. + */ +class ThreadedBackend : public Backend, public zeek::iosource::IOSource { +public: + using Backend::Backend; + +protected: + /** + * To be used by implementations to enqueue messages for processing on the IO loop. + * + * It's safe to call this method from any thread. + * + * @param messages Messages to be enqueued. + */ + void QueueForProcessing(QueueMessages&& messages); + + void Process() override; + + double GetNextTimeout() override { return -1; } + + /** + * The DoInitPostScript() implementation of ThreadedBackend + * registers itself as a non-counting IO source. + * + * Classes deriving from ThreadedBackend and providing their + * own DoInitPostScript() method should invoke the ThreadedBackend's + * implementation to register themselves as a non-counting + * IO source with the IO loop. + */ + void DoInitPostScript() override; + + /** + * The default DoInit() implementation of ThreadedBackend + * registers itself as a counting IO source to keep the IO + * loop alive after initialization. + * + * Classes deriving from ThreadedBackend and providing their + * own DoInit() method should invoke the ThreadedBackend's + * implementation to register themselves as a counting + * IO source with the IO loop. + */ + bool DoInit() override; + +private: + /** + * Process a backend specific message queued as BackendMessage. + */ + bool ProcessBackendMessage(int tag, detail::byte_buffer_span payload); + + /** + * If a cluster backend produces messages of type BackendMessage, + * this method will be invoked by the main thread to process it. 
+ */ + virtual bool DoProcessBackendMessage(int tag, detail::byte_buffer_span payload) { return false; }; + + // Members used for communication with the main thread. + std::mutex messages_mtx; + std::vector messages; + zeek::detail::Flare messages_flare; +}; + + +// Cluster backend instance used for publish() and subscribe() calls. +extern Backend* backend; + +} // namespace cluster +} // namespace zeek From 23ca625c039d31e3c1a6088a38c88ab1d44d240b Mon Sep 17 00:00:00 2001 From: Arne Welzel Date: Wed, 13 Nov 2024 11:24:08 +0100 Subject: [PATCH 08/13] cluster: Add Components and ComponentManager for new components --- src/cluster/CMakeLists.txt | 9 +++ src/cluster/Component.cc | 56 ++++++++++++++++ src/cluster/Component.h | 131 +++++++++++++++++++++++++++++++++++++ src/cluster/Manager.cc | 27 ++++++++ src/cluster/Manager.h | 80 ++++++++++++++++++++++ src/zeekygen/ScriptInfo.cc | 8 +++ 6 files changed, 311 insertions(+) create mode 100644 src/cluster/CMakeLists.txt create mode 100644 src/cluster/Component.cc create mode 100644 src/cluster/Component.h create mode 100644 src/cluster/Manager.cc create mode 100644 src/cluster/Manager.h diff --git a/src/cluster/CMakeLists.txt b/src/cluster/CMakeLists.txt new file mode 100644 index 0000000000..0395d28786 --- /dev/null +++ b/src/cluster/CMakeLists.txt @@ -0,0 +1,9 @@ +zeek_add_subdir_library( + cluster + INCLUDE_DIRS + ${CMAKE_CURRENT_SOURCE_DIR} + ${CMAKE_CURRENT_BINARY_DIR} + SOURCES + Component.cc + Backend.cc + Manager.cc) diff --git a/src/cluster/Component.cc b/src/cluster/Component.cc new file mode 100644 index 0000000000..09dd74c938 --- /dev/null +++ b/src/cluster/Component.cc @@ -0,0 +1,56 @@ +// See the file "COPYING" in the main distribution directory for copyright. 
+ +#include "zeek/cluster/Component.h" + +#include "zeek/Desc.h" +#include "zeek/Tag.h" +#include "zeek/cluster/Manager.h" +#include "zeek/util.h" + +using namespace zeek::cluster; + +BackendComponent::BackendComponent(const std::string& name, factory_callback arg_factory) + : plugin::Component(plugin::component::CLUSTER_BACKEND, name, 0, cluster::manager->Backends().GetTagType()) { + factory = arg_factory; +} + +void BackendComponent::Initialize() { + InitializeTag(); + cluster::manager->Backends().RegisterComponent(this, "CLUSTER_BACKEND_"); +} + +void BackendComponent::DoDescribe(ODesc* d) const { + d->Add("Cluster::CLUSTER_BACKEND_"); + d->Add(CanonicalName()); +} + +EventSerializerComponent::EventSerializerComponent(const std::string& name, factory_callback arg_factory) + : plugin::Component(plugin::component::EVENT_SERIALIZER, name, 0, + cluster::manager->EventSerializers().GetTagType()) { + factory = arg_factory; +} + +void EventSerializerComponent::Initialize() { + InitializeTag(); + cluster::manager->EventSerializers().RegisterComponent(this, "EVENT_SERIALIZER_"); +} + +void EventSerializerComponent::DoDescribe(ODesc* d) const { + d->Add("Cluster::EVENT_SERIALIZER_"); + d->Add(CanonicalName()); +} + +LogSerializerComponent::LogSerializerComponent(const std::string& name, factory_callback arg_factory) + : plugin::Component(plugin::component::LOG_SERIALIZER, name, 0, cluster::manager->LogSerializers().GetTagType()) { + factory = arg_factory; +} + +void LogSerializerComponent::Initialize() { + InitializeTag(); + cluster::manager->LogSerializers().RegisterComponent(this, "LOG_SERIALIZER_"); +} + +void LogSerializerComponent::DoDescribe(ODesc* d) const { + d->Add("Cluster::LOG_SERIALIZER_"); + d->Add(CanonicalName()); +} diff --git a/src/cluster/Component.h b/src/cluster/Component.h new file mode 100644 index 0000000000..944557feae --- /dev/null +++ b/src/cluster/Component.h @@ -0,0 +1,131 @@ +// See the file "COPYING" in the main distribution directory for 
copyright. + +#pragma once + +#include + +#include "zeek/cluster/Backend.h" +#include "zeek/cluster/Serializer.h" +#include "zeek/plugin/Component.h" + +namespace zeek::cluster { + +class BackendComponent : public plugin::Component { +public: + using factory_callback = std::unique_ptr (*)(std::unique_ptr, + std::unique_ptr); + + /** + * Constructor. + * + * @param name The name of the cluster backend. A Zeek script-level enum + * with the name Cluster::CLUSTER_BACKEND_ will be created. + * + * @param factory A factory function to instantiate instances of the + * cluster backend. + */ + BackendComponent(const std::string& name, factory_callback factory); + + /** + * Destructor. + */ + ~BackendComponent() override = default; + + /** + * Initialization function. This function has to be called before any + * plugin component functionality is used; it is used to add the + * plugin component to the list of components and to initialize tags + */ + void Initialize() override; + + /** + * Returns the analyzer's factory function. + */ + factory_callback Factory() const { return factory; } + +protected: + void DoDescribe(ODesc* d) const override; + +private: + factory_callback factory; +}; + + +class EventSerializerComponent : public plugin::Component { +public: + using factory_callback = std::unique_ptr (*)(); + + /** + * Constructor. + * + * @param name The name of the event serializer. A Zeek script-level enum + * with the name Cluster::EVENT_SERIALIZER_ will be created. + * + * @param factory A factory function to instantiate instances of the + * event serializer. + */ + EventSerializerComponent(const std::string& name, factory_callback factory); + + /** + * Destructor. + */ + ~EventSerializerComponent() override = default; + + /** + * Initialization function. 
This function has to be called before any + * plugin component functionality is used; it is used to add the + * plugin component to the list of components and to initialize tags + */ + void Initialize() override; + + /** + * Returns the analyzer's factory function. + */ + factory_callback Factory() const { return factory; } + +protected: + void DoDescribe(ODesc* d) const override; + +private: + factory_callback factory; +}; + +class LogSerializerComponent : public plugin::Component { +public: + using factory_callback = std::unique_ptr (*)(); + + /** + * Constructor. + * + * @param name The name of the log serializer. A Zeek script-level enum + * with the name Cluster::LOG_SERIALIZER_ will be created. + * + * @param factory A factory function to instantiate instances of the + * log serializer. + */ + LogSerializerComponent(const std::string& name, factory_callback factory); + + /** + * Destructor. + */ + ~LogSerializerComponent() override = default; + + /** + * Initialization function. This function has to be called before any + * plugin component functionality is used; it is used to add the + * plugin component to the list of components and to initialize tags + */ + void Initialize() override; + + /** + * Returns the analyzer's factory function. 
+ */ + factory_callback Factory() const { return factory; } + +protected: + void DoDescribe(ODesc* d) const override; + +private: + factory_callback factory; +}; +} // namespace zeek::cluster diff --git a/src/cluster/Manager.cc b/src/cluster/Manager.cc new file mode 100644 index 0000000000..eff6196164 --- /dev/null +++ b/src/cluster/Manager.cc @@ -0,0 +1,27 @@ +#include "zeek/cluster/Manager.h" + +#include "zeek/cluster/Serializer.h" + +using namespace zeek::cluster; + +Manager::Manager() + : backends(plugin::ComponentManager("Cluster", "BackendTag")), + event_serializers(plugin::ComponentManager("Cluster", "EventSerializerTag")), + log_serializers(plugin::ComponentManager("Cluster", "LogSerializerTag")) {} + +std::unique_ptr Manager::InstantiateBackend(const zeek::EnumValPtr& tag, + std::unique_ptr event_serializer, + std::unique_ptr log_serializer) { + const BackendComponent* c = Backends().Lookup(tag); + return c ? c->Factory()(std::move(event_serializer), std::move(log_serializer)) : nullptr; +} + +std::unique_ptr Manager::InstantiateEventSerializer(const zeek::EnumValPtr& tag) { + const EventSerializerComponent* c = EventSerializers().Lookup(tag); + return c ? c->Factory()() : nullptr; +} + +std::unique_ptr Manager::InstantiateLogSerializer(const zeek::EnumValPtr& tag) { + const LogSerializerComponent* c = LogSerializers().Lookup(tag); + return c ? c->Factory()() : nullptr; +} diff --git a/src/cluster/Manager.h b/src/cluster/Manager.h new file mode 100644 index 0000000000..fd4a44f31c --- /dev/null +++ b/src/cluster/Manager.h @@ -0,0 +1,80 @@ +// See the file "COPYING" in the main distribution directory for copyright. + +#pragma once + +#include + +#include "zeek/cluster/Component.h" +#include "zeek/cluster/Serializer.h" +#include "zeek/plugin/ComponentManager.h" + +namespace zeek::cluster { + +/** + * Manager to allow registration of cluster components. 
+ * + * This manager holds three component manager for event and log serializers + * components, as well as backend components themselves. + */ +class Manager { +public: + Manager(); + + /** + * Instantiate a cluster backend with the given enum value and + * pre-instantiated event and log serializers. + * + * @param tag The enum value identifying the backend. + * @param event_serializer The event serializer to inject. + * @param log_serializer The log serializer to inject. + * + * @return New ClusterBackend instance, or null if there's no such component. + */ + std::unique_ptr InstantiateBackend(const EnumValPtr& tag, + std::unique_ptr event_serializer, + std::unique_ptr log_serializer); + + /** + * Instantiate a event serializer with the given enum value. + * + * @param tag The enum value identifying a serializer. + * + * @return New Serializer instance, or null if there's no such component. + */ + std::unique_ptr InstantiateEventSerializer(const EnumValPtr& tag); + + /** + * Instantiate a log serializer with the given enum value. + * + * @param tag The enum value identifying a serializer. + * + * @return New Serializer instance, or null if there's no such component. + */ + std::unique_ptr InstantiateLogSerializer(const EnumValPtr& tag); + + /** + * @return The ComponentManager for backends. + */ + plugin::ComponentManager& Backends() { return backends; }; + + /** + * @return The ComponentManager for event serializers. + */ + plugin::ComponentManager& EventSerializers() { return event_serializers; }; + + /** + * @return The ComponentManager for serializers. + */ + plugin::ComponentManager& LogSerializers() { return log_serializers; }; + +private: + plugin::ComponentManager backends; + plugin::ComponentManager event_serializers; + plugin::ComponentManager log_serializers; +}; + +// This manager instance only exists for plugins to register components, +// not for actual cluster functionality. 
+extern Manager* manager; + +} // namespace zeek::cluster diff --git a/src/zeekygen/ScriptInfo.cc b/src/zeekygen/ScriptInfo.cc index 817174cca9..70347baa88 100644 --- a/src/zeekygen/ScriptInfo.cc +++ b/src/zeekygen/ScriptInfo.cc @@ -313,6 +313,14 @@ void ScriptInfo::DoInitPostScript() { const auto& id = zeek::detail::global_scope()->Find("Log::Writer"); types.push_back(new IdentifierInfo(id, this)); } + else if ( name == "base/frameworks/cluster/main.zeek" ) { + const auto& backend_id = zeek::detail::global_scope()->Find("Cluster::BackendTag"); + types.push_back(new IdentifierInfo(backend_id, this)); + const auto& event_serializer_id = zeek::detail::global_scope()->Find("Cluster::EventSerializerTag"); + types.push_back(new IdentifierInfo(event_serializer_id, this)); + const auto& log_serializer_id = zeek::detail::global_scope()->Find("Cluster::LogSerializerTag"); + types.push_back(new IdentifierInfo(log_serializer_id, this)); + } } vector ScriptInfo::GetComments() const { return comments; } From ac9594ffaee98ef422fda7e09ec42b4ba2f309df Mon Sep 17 00:00:00 2001 From: Arne Welzel Date: Wed, 13 Nov 2024 15:19:34 +0100 Subject: [PATCH 09/13] cluster: Add to src/CMakeLists.txt --- src/CMakeLists.txt | 1 + 1 file changed, 1 insertion(+) diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 3c5dfacf25..50bc006f26 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -194,6 +194,7 @@ gen_zam_target(${GEN_ZAM_SRC_DIR}) option(USE_SQLITE "Should Zeek use SQLite?" ON) add_subdirectory(analyzer) +add_subdirectory(cluster) add_subdirectory(packet_analysis) add_subdirectory(broker) add_subdirectory(telemetry) From 60b5cf97301c9775b2c2e60f0b79d159f4f8ace6 Mon Sep 17 00:00:00 2001 From: Arne Welzel Date: Wed, 13 Nov 2024 13:33:31 +0100 Subject: [PATCH 10/13] zeek-setup: Instantiate backend::manager Required to allow registration of cluster components. 
--- src/zeek-setup.cc | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/zeek-setup.cc b/src/zeek-setup.cc index 3cf44a5b7f..0e449843e5 100644 --- a/src/zeek-setup.cc +++ b/src/zeek-setup.cc @@ -54,6 +54,7 @@ #include "zeek/analyzer/Manager.h" #include "zeek/binpac_zeek.h" #include "zeek/broker/Manager.h" +#include "zeek/cluster/Manager.h" #include "zeek/file_analysis/Manager.h" #include "zeek/input.h" #include "zeek/input/Manager.h" @@ -179,6 +180,8 @@ zeek::detail::trigger::Manager* zeek::detail::trigger_mgr = nullptr; zeek::spicy::Manager* zeek::spicy_mgr = nullptr; #endif +zeek::cluster::Manager* zeek::cluster::manager = nullptr; + std::vector zeek::detail::zeek_script_prefixes; zeek::detail::Stmt* zeek::detail::stmts = nullptr; zeek::EventRegistry* zeek::event_registry = nullptr; @@ -391,6 +394,7 @@ static void terminate_zeek() { delete packet_mgr; delete analyzer_mgr; delete file_mgr; + delete cluster::manager; // broker_mgr, timer_mgr, supervisor, and dns_mgr are deleted via iosource_mgr delete iosource_mgr; delete event_registry; @@ -668,6 +672,7 @@ SetupResult setup(int argc, char** argv, Options* zopts) { log_mgr = new logging::Manager(); input_mgr = new input::Manager(); file_mgr = new file_analysis::Manager(); + cluster::manager = new cluster::Manager(); auto broker_real_time = ! options.pcap_file && ! options.deterministic_mode; broker_mgr = new Broker::Manager(broker_real_time); trigger_mgr = new trigger::Manager(); From 6fb73aa9da9b0c24ea43df53eeb71f1df6afd34a Mon Sep 17 00:00:00 2001 From: Arne Welzel Date: Wed, 13 Nov 2024 12:49:28 +0100 Subject: [PATCH 11/13] broker: Add shim plugin adding a backend component For broker, this isn't really functioning, but at least makes the CLUSTER_BACKEND_BROKER enum available. 
--- src/broker/CMakeLists.txt | 3 ++ src/broker/Plugin.cc | 28 +++++++++++++++++++ src/broker/Plugin.h | 14 ++++++++++ .../scripts.base.files.x509.files/files.log | 12 ++++---- 4 files changed, 51 insertions(+), 6 deletions(-) create mode 100644 src/broker/Plugin.cc create mode 100644 src/broker/Plugin.h diff --git a/src/broker/CMakeLists.txt b/src/broker/CMakeLists.txt index 472f67e11d..210f5be138 100644 --- a/src/broker/CMakeLists.txt +++ b/src/broker/CMakeLists.txt @@ -12,3 +12,6 @@ zeek_add_subdir_library( data.bif messaging.bif store.bif) + +# Small plugin shim to make the CLUSTER_BACKEND_BROKER enum value available. +zeek_add_plugin(Zeek Cluster_Backend_Broker SOURCES Plugin.cc) diff --git a/src/broker/Plugin.cc b/src/broker/Plugin.cc new file mode 100644 index 0000000000..3f252f2690 --- /dev/null +++ b/src/broker/Plugin.cc @@ -0,0 +1,28 @@ +#include "zeek/broker/Plugin.h" + +#include + +#include "zeek/cluster/Backend.h" +#include "zeek/cluster/Component.h" +#include "zeek/cluster/Serializer.h" + +using namespace zeek::plugin::Zeek_Cluster_Backend_Broker; + +zeek::plugin::Configuration Plugin::Configure() { + // For now, there's always the broker_mgr instance that's explicitly + // instantiated in zeek-setup.cc. Don't even allow to instantiate + // a second one via the plugin mechanism. In the future, this could + // be changed so that broker is instantiated on demand only. 
+ auto fail_instantiate = [](std::unique_ptr, + std::unique_ptr) -> std::unique_ptr { + zeek::reporter->FatalError("do not instantiate broker explicitly"); + return nullptr; + }; + + AddComponent(new cluster::BackendComponent("BROKER", fail_instantiate)); + + zeek::plugin::Configuration config; + config.name = "Zeek::Cluster_Backend_Broker"; + config.description = "Cluster backend using Broker"; + return config; +} diff --git a/src/broker/Plugin.h b/src/broker/Plugin.h new file mode 100644 index 0000000000..2ce9b48b75 --- /dev/null +++ b/src/broker/Plugin.h @@ -0,0 +1,14 @@ +// See the file "COPYING" in the main distribution directory for copyright. + +#pragma once + +#include "zeek/plugin/Plugin.h" + +namespace zeek::plugin::Zeek_Cluster_Backend_Broker { + +class Plugin : public zeek::plugin::Plugin { +public: + zeek::plugin::Configuration Configure() override; +} plugin; + +} // namespace zeek::plugin::Zeek_Cluster_Backend_Broker diff --git a/testing/btest/Baseline/scripts.base.files.x509.files/files.log b/testing/btest/Baseline/scripts.base.files.x509.files/files.log index e64dfc52c0..ce19924fa1 100644 --- a/testing/btest/Baseline/scripts.base.files.x509.files/files.log +++ b/testing/btest/Baseline/scripts.base.files.x509.files/files.log @@ -7,10 +7,10 @@ #open XXXX-XX-XX-XX-XX-XX #fields ts fuid uid id.orig_h id.orig_p id.resp_h id.resp_p source depth analyzers mime_type filename duration local_orig is_orig seen_bytes total_bytes missing_bytes overflow_bytes timedout parent_fuid md5 sha1 sha256 #types time string string addr port addr port string count set[string] string string interval bool bool count count count count bool string string string string -XXXXXXXXXX.XXXXXX FgN3AE3of2TRIqaeQe CHhAvVGS1DHFjwGM9 192.168.4.149 60623 74.125.239.129 443 SSL 0 SHA256,X509,SHA1,MD5 application/x-x509-user-cert - 0.000000 F F 1859 - 0 0 F - 7af07aca6d5c6e8e87fe4bb34786edc0 548b9e03bc183d1cd39f93a37985cb3950f8f06f 
6bacfa4536150ed996f2b0c05ab6e345a257225f449aeb9d2018ccd88f4ede43 -XXXXXXXXXX.XXXXXX Fv2Agc4z5boBOacQi6 CHhAvVGS1DHFjwGM9 192.168.4.149 60623 74.125.239.129 443 SSL 0 SHA256,X509,SHA1,MD5 application/x-x509-ca-cert - 0.000000 F F 1032 - 0 0 F - 9e4ac96474245129d9766700412a1f89 d83c1a7f4d0446bb2081b81a1670f8183451ca24 a047a37fa2d2e118a4f5095fe074d6cfe0e352425a7632bf8659c03919a6c81d -XXXXXXXXXX.XXXXXX Ftmyeg2qgI2V38Dt3g CHhAvVGS1DHFjwGM9 192.168.4.149 60623 74.125.239.129 443 SSL 0 SHA256,X509,SHA1,MD5 application/x-x509-ca-cert - 0.000000 F F 897 - 0 0 F - 2e7db2a31d0e3da4b25f49b9542a2e1a 7359755c6df9a0abc3060bce369564c8ec4542a3 3c35cc963eb004451323d3275d05b353235053490d9cd83729a2faf5e7ca1cc0 -XXXXXXXXXX.XXXXXX FUFNf84cduA0IJCp07 ClEkJM2Vm5giqnMf4h 192.168.4.149 60624 74.125.239.129 443 SSL 0 SHA256,X509,SHA1,MD5 application/x-x509-user-cert - 0.000000 F F 1859 - 0 0 F - 7af07aca6d5c6e8e87fe4bb34786edc0 548b9e03bc183d1cd39f93a37985cb3950f8f06f 6bacfa4536150ed996f2b0c05ab6e345a257225f449aeb9d2018ccd88f4ede43 -XXXXXXXXXX.XXXXXX F1H4bd2OKGbLPEdHm4 ClEkJM2Vm5giqnMf4h 192.168.4.149 60624 74.125.239.129 443 SSL 0 SHA256,X509,SHA1,MD5 application/x-x509-ca-cert - 0.000000 F F 1032 - 0 0 F - 9e4ac96474245129d9766700412a1f89 d83c1a7f4d0446bb2081b81a1670f8183451ca24 a047a37fa2d2e118a4f5095fe074d6cfe0e352425a7632bf8659c03919a6c81d -XXXXXXXXXX.XXXXXX Fgsbci2jxFXYMOHOhi ClEkJM2Vm5giqnMf4h 192.168.4.149 60624 74.125.239.129 443 SSL 0 SHA256,X509,SHA1,MD5 application/x-x509-ca-cert - 0.000000 F F 897 - 0 0 F - 2e7db2a31d0e3da4b25f49b9542a2e1a 7359755c6df9a0abc3060bce369564c8ec4542a3 3c35cc963eb004451323d3275d05b353235053490d9cd83729a2faf5e7ca1cc0 +XXXXXXXXXX.XXXXXX FgN3AE3of2TRIqaeQe CHhAvVGS1DHFjwGM9 192.168.4.149 60623 74.125.239.129 443 SSL 0 X509,SHA256,SHA1,MD5 application/x-x509-user-cert - 0.000000 F F 1859 - 0 0 F - 7af07aca6d5c6e8e87fe4bb34786edc0 548b9e03bc183d1cd39f93a37985cb3950f8f06f 6bacfa4536150ed996f2b0c05ab6e345a257225f449aeb9d2018ccd88f4ede43 +XXXXXXXXXX.XXXXXX 
Fv2Agc4z5boBOacQi6 CHhAvVGS1DHFjwGM9 192.168.4.149 60623 74.125.239.129 443 SSL 0 X509,SHA256,SHA1,MD5 application/x-x509-ca-cert - 0.000000 F F 1032 - 0 0 F - 9e4ac96474245129d9766700412a1f89 d83c1a7f4d0446bb2081b81a1670f8183451ca24 a047a37fa2d2e118a4f5095fe074d6cfe0e352425a7632bf8659c03919a6c81d +XXXXXXXXXX.XXXXXX Ftmyeg2qgI2V38Dt3g CHhAvVGS1DHFjwGM9 192.168.4.149 60623 74.125.239.129 443 SSL 0 X509,SHA256,SHA1,MD5 application/x-x509-ca-cert - 0.000000 F F 897 - 0 0 F - 2e7db2a31d0e3da4b25f49b9542a2e1a 7359755c6df9a0abc3060bce369564c8ec4542a3 3c35cc963eb004451323d3275d05b353235053490d9cd83729a2faf5e7ca1cc0 +XXXXXXXXXX.XXXXXX FUFNf84cduA0IJCp07 ClEkJM2Vm5giqnMf4h 192.168.4.149 60624 74.125.239.129 443 SSL 0 X509,SHA256,SHA1,MD5 application/x-x509-user-cert - 0.000000 F F 1859 - 0 0 F - 7af07aca6d5c6e8e87fe4bb34786edc0 548b9e03bc183d1cd39f93a37985cb3950f8f06f 6bacfa4536150ed996f2b0c05ab6e345a257225f449aeb9d2018ccd88f4ede43 +XXXXXXXXXX.XXXXXX F1H4bd2OKGbLPEdHm4 ClEkJM2Vm5giqnMf4h 192.168.4.149 60624 74.125.239.129 443 SSL 0 X509,SHA256,SHA1,MD5 application/x-x509-ca-cert - 0.000000 F F 1032 - 0 0 F - 9e4ac96474245129d9766700412a1f89 d83c1a7f4d0446bb2081b81a1670f8183451ca24 a047a37fa2d2e118a4f5095fe074d6cfe0e352425a7632bf8659c03919a6c81d +XXXXXXXXXX.XXXXXX Fgsbci2jxFXYMOHOhi ClEkJM2Vm5giqnMf4h 192.168.4.149 60624 74.125.239.129 443 SSL 0 X509,SHA256,SHA1,MD5 application/x-x509-ca-cert - 0.000000 F F 897 - 0 0 F - 2e7db2a31d0e3da4b25f49b9542a2e1a 7359755c6df9a0abc3060bce369564c8ec4542a3 3c35cc963eb004451323d3275d05b353235053490d9cd83729a2faf5e7ca1cc0 #close XXXX-XX-XX-XX-XX-XX From de9d39cd01b54eb5ba0d996768614888d031f097 Mon Sep 17 00:00:00 2001 From: Arne Welzel Date: Wed, 13 Nov 2024 12:57:44 +0100 Subject: [PATCH 12/13] btest: Add cluster dir, minimal test for enum value --- testing/btest/Baseline/cluster.backend-enum/out | 5 +++++ testing/btest/btest.cfg | 2 +- testing/btest/cluster/backend-enum.zeek | 10 ++++++++++ 3 files changed, 16 insertions(+), 1 
deletion(-) create mode 100644 testing/btest/Baseline/cluster.backend-enum/out create mode 100644 testing/btest/cluster/backend-enum.zeek diff --git a/testing/btest/Baseline/cluster.backend-enum/out b/testing/btest/Baseline/cluster.backend-enum/out new file mode 100644 index 0000000000..1948596eaa --- /dev/null +++ b/testing/btest/Baseline/cluster.backend-enum/out @@ -0,0 +1,5 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +Zeek::Cluster_Backend_Broker - Cluster backend using Broker (built-in) + [Cluster Backend] BROKER (Cluster::CLUSTER_BACKEND_BROKER) + +Cluster::CLUSTER_BACKEND_BROKER, Cluster::BackendTag diff --git a/testing/btest/btest.cfg b/testing/btest/btest.cfg index 9b459a0a05..405ad41f3c 100644 --- a/testing/btest/btest.cfg +++ b/testing/btest/btest.cfg @@ -4,7 +4,7 @@ build_dir = build [btest] -TestDirs = af_packet doc bifs language core scripts coverage signatures plugins broker spicy supervisor telemetry javascript misc opt dns_mgr +TestDirs = af_packet doc bifs language core scripts coverage signatures plugins broker spicy supervisor telemetry javascript misc opt dns_mgr cluster TmpDir = %(testbase)s/.tmp BaselineDir = %(testbase)s/Baseline IgnoreDirs = .svn CVS .tmp diff --git a/testing/btest/cluster/backend-enum.zeek b/testing/btest/cluster/backend-enum.zeek new file mode 100644 index 0000000000..04d4b89963 --- /dev/null +++ b/testing/btest/cluster/backend-enum.zeek @@ -0,0 +1,10 @@ +# @TEST-DOC: Test cluster backend enum +# +# @TEST-EXEC: zeek -NN Zeek::Cluster_Backend_Broker >>out +# @TEST-EXEC: zeek -b %INPUT >>out +# @TEST-EXEC: btest-diff out + +event zeek_init() + { + print Cluster::CLUSTER_BACKEND_BROKER, type_name(Cluster::CLUSTER_BACKEND_BROKER); + } From baca6ba769b14644b96e138ee4698d902ee23ac3 Mon Sep 17 00:00:00 2001 From: Arne Welzel Date: Thu, 21 Nov 2024 18:04:29 +0100 Subject: [PATCH 13/13] btest: Test Broker::make_event() together with Cluster::publish_hrw() 
--- .../manager-1..stdout | 52 ++++++++++ .../proxy-1..stdout | 10 ++ .../proxy-2..stdout | 24 +++++ .../topic_distribution_make_event.zeek | 99 +++++++++++++++++++ 4 files changed, 185 insertions(+) create mode 100644 testing/btest/Baseline/scripts.base.frameworks.cluster.topic_distribution_make_event/manager-1..stdout create mode 100644 testing/btest/Baseline/scripts.base.frameworks.cluster.topic_distribution_make_event/proxy-1..stdout create mode 100644 testing/btest/Baseline/scripts.base.frameworks.cluster.topic_distribution_make_event/proxy-2..stdout create mode 100644 testing/btest/scripts/base/frameworks/cluster/topic_distribution_make_event.zeek diff --git a/testing/btest/Baseline/scripts.base.frameworks.cluster.topic_distribution_make_event/manager-1..stdout b/testing/btest/Baseline/scripts.base.frameworks.cluster.topic_distribution_make_event/manager-1..stdout new file mode 100644 index 0000000000..b966781c80 --- /dev/null +++ b/testing/btest/Baseline/scripts.base.frameworks.cluster.topic_distribution_make_event/manager-1..stdout @@ -0,0 +1,52 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. 
+1st stuff +hrw, 0, T +hrw, 1, T +hrw, 2, T +hrw, 3, T +hrw, 13, T +hrw, 37, T +hrw, 42, T +hrw, 101, T +rr, T +rr, T +rr, T +rr, T +rr, T +rr, T +rr, T +rr, T +2nd stuff +hrw, 0, T +hrw, 1, T +hrw, 2, T +hrw, 3, T +hrw, 13, T +hrw, 37, T +hrw, 42, T +hrw, 101, T +rr, T +rr, T +rr, T +rr, T +rr, T +rr, T +rr, T +rr, T +no stuff +hrw, 0, F +hrw, 1, F +hrw, 2, F +hrw, 3, F +hrw, 13, F +hrw, 37, F +hrw, 42, F +hrw, 101, F +rr, F +rr, F +rr, F +rr, F +rr, F +rr, F +rr, F +rr, F diff --git a/testing/btest/Baseline/scripts.base.frameworks.cluster.topic_distribution_make_event/proxy-1..stdout b/testing/btest/Baseline/scripts.base.frameworks.cluster.topic_distribution_make_event/proxy-1..stdout new file mode 100644 index 0000000000..c4e16958ba --- /dev/null +++ b/testing/btest/Baseline/scripts.base.frameworks.cluster.topic_distribution_make_event/proxy-1..stdout @@ -0,0 +1,10 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +got distributed event hrw, 1 +got distributed event hrw, 13 +got distributed event hrw, 37 +got distributed event hrw, 42 +got distributed event hrw, 101 +got distributed event rr, 0 +got distributed event rr, 2 +got distributed event rr, 13 +got distributed event rr, 42 diff --git a/testing/btest/Baseline/scripts.base.frameworks.cluster.topic_distribution_make_event/proxy-2..stdout b/testing/btest/Baseline/scripts.base.frameworks.cluster.topic_distribution_make_event/proxy-2..stdout new file mode 100644 index 0000000000..fec1ae9fe4 --- /dev/null +++ b/testing/btest/Baseline/scripts.base.frameworks.cluster.topic_distribution_make_event/proxy-2..stdout @@ -0,0 +1,24 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. 
+got distributed event hrw, 0 +got distributed event hrw, 2 +got distributed event hrw, 3 +got distributed event rr, 1 +got distributed event rr, 3 +got distributed event rr, 37 +got distributed event rr, 101 +got distributed event hrw, 0 +got distributed event hrw, 1 +got distributed event hrw, 2 +got distributed event hrw, 3 +got distributed event hrw, 13 +got distributed event hrw, 37 +got distributed event hrw, 42 +got distributed event hrw, 101 +got distributed event rr, 0 +got distributed event rr, 1 +got distributed event rr, 2 +got distributed event rr, 3 +got distributed event rr, 13 +got distributed event rr, 37 +got distributed event rr, 42 +got distributed event rr, 101 diff --git a/testing/btest/scripts/base/frameworks/cluster/topic_distribution_make_event.zeek b/testing/btest/scripts/base/frameworks/cluster/topic_distribution_make_event.zeek new file mode 100644 index 0000000000..e966fc73ce --- /dev/null +++ b/testing/btest/scripts/base/frameworks/cluster/topic_distribution_make_event.zeek @@ -0,0 +1,99 @@ +# @TEST-DOC: Broker::make_event() together with Cluster::publish_hrw() and Cluster::publish_rr() +# @TEST-PORT: BROKER_PORT1 +# @TEST-PORT: BROKER_PORT2 +# @TEST-PORT: BROKER_PORT3 +# @TEST-PORT: BROKER_PORT4 +# @TEST-PORT: BROKER_PORT5 +# +# @TEST-EXEC: zeek -b --parse-only %INPUT +# @TEST-EXEC: btest-bg-run manager-1 ZEEKPATH=$ZEEKPATH:.. CLUSTER_NODE=manager-1 zeek -b %INPUT +# @TEST-EXEC: btest-bg-run proxy-1 ZEEKPATH=$ZEEKPATH:.. CLUSTER_NODE=proxy-1 zeek -b %INPUT +# @TEST-EXEC: btest-bg-run proxy-2 ZEEKPATH=$ZEEKPATH:.. 
CLUSTER_NODE=proxy-2 zeek -b %INPUT +# @TEST-EXEC: btest-bg-wait 30 +# @TEST-EXEC: btest-diff manager-1/.stdout +# @TEST-EXEC: btest-diff proxy-1/.stdout +# @TEST-EXEC: btest-diff proxy-2/.stdout + +@load policy/frameworks/cluster/experimental + +@TEST-START-FILE cluster-layout.zeek +redef Cluster::nodes = { + ["manager-1"] = [$node_type=Cluster::MANAGER, $ip=127.0.0.1, $p=to_port(getenv("BROKER_PORT1"))], + ["proxy-1"] = [$node_type=Cluster::PROXY, $ip=127.0.0.1, $p=to_port(getenv("BROKER_PORT2")), $manager="manager-1"], + ["proxy-2"] = [$node_type=Cluster::PROXY, $ip=127.0.0.1, $p=to_port(getenv("BROKER_PORT3")), $manager="manager-1"], +}; +@TEST-END-FILE + +global q = 0; + +event go_away() + { + terminate(); + } + +event distributed_event_hrw(c: count) + { + print "got distributed event hrw", c; + } + +event distributed_event_rr(c: count) + { + print "got distributed event rr", c; + } + +function send_stuff(heading: string) + { + print heading; + + local v: vector of count = vector(0, 1, 2, 3, 13, 37, 42, 101); + local e: Broker::Event; + + for ( i in v ) + { + e = Broker::make_event(distributed_event_hrw, v[i]); + print "hrw", v[i], Cluster::publish_hrw(Cluster::proxy_pool, v[i], e); + } + + local rr_key = "test"; + + for ( i in v ) + { + e = Broker::make_event(distributed_event_rr, v[i]); + print "rr", Cluster::publish_rr(Cluster::proxy_pool, rr_key, e); + } + } + +event Cluster::Experimental::cluster_started() + { + if ( Cluster::node != "manager-1" ) + return; + + send_stuff("1st stuff"); + local e = Broker::make_event(go_away); + Broker::publish(Cluster::node_topic("proxy-1"), e); + } + +event Cluster::node_down(name: string, id: string) + { + if ( Cluster::node != "manager-1" ) + return; + + if ( name == "proxy-1" ) + { + send_stuff("2nd stuff"); + local e = Broker::make_event(go_away); + Broker::publish(Cluster::node_topic("proxy-2"), e); + } + + if ( name == "proxy-2" ) + { + send_stuff("no stuff"); + terminate(); + } + } + +event 
Cluster::node_down(name: string, id: string) + { + if ( name == "manager-1" ) + terminate(); + }