diff --git a/CHANGES b/CHANGES index b162cc4502..50a02d5220 100644 --- a/CHANGES +++ b/CHANGES @@ -1,3 +1,47 @@ +7.1.0-dev.607 | 2024-11-22 12:32:21 +0100 + + * btest: Test Broker::make_event() together with Cluster::publish_hrw() (Arne Welzel, Corelight) + + * btest: Add cluster dir, minimal test for enum value (Arne Welzel, Corelight) + + * broker: Add shim plugin adding a backend component (Arne Welzel, Corelight) + + For broker, this isn't really functioning, but at least makes the + CLUSTER_BACKEND_BROKER enum available. + + * zeek-setup: Instantiate backend::manager (Arne Welzel, Corelight) + + Required to allow registration of cluster components. + + * cluster: Add to src/CMakeLists.txt (Arne Welzel, Corelight) + + * cluster: Add Components and ComponentManager for new components (Arne Welzel, Corelight) + + * cluster/Backend: Interface for cluster backends (Arne Welzel, Corelight) + + * cluster/Serializer: Interface for event and log serializers (Arne Welzel, Corelight) + + * logging: Introduce logging/Types.h (Arne Welzel, Corelight) + + Header and implementation for types shared between the cluster and + logging framework. The logging framework will be adapted later to + use these. For now, the new cluster components will simply reference + them. + + * SerialTypes/Field: Allow default construction and add move constructor (Arne Welzel, Corelight) + + This is in preparation of using SerialTypes to serialize and + unserialize complete log batches which include Field instances + and not just Value's. + + * DebugLogger: Add cluster debugging stream (Arne Welzel, Corelight) + + * plugin: Add component enums for pluggable cluster backends (Arne Welzel, Corelight) + + * broker: Pass frame to MakeEvent() (Arne Welzel, Corelight) + + This was lost in a prior change. 
+ 7.1.0-dev.592 | 2024-11-21 16:38:55 +0100 * sumstat/non-cluster: Move last epoch processing to zeek_done() (Arne Welzel, Corelight) diff --git a/VERSION b/VERSION index b1f64c2b30..be93068fd5 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -7.1.0-dev.592 +7.1.0-dev.607 diff --git a/scripts/base/frameworks/cluster/main.zeek b/scripts/base/frameworks/cluster/main.zeek index 3ecdc52420..d27a9160a9 100644 --- a/scripts/base/frameworks/cluster/main.zeek +++ b/scripts/base/frameworks/cluster/main.zeek @@ -280,6 +280,14 @@ export { ## Returns: a topic string that may used to send a message exclusively to ## a given cluster node. global nodeid_topic: function(id: string): string; + + ## An event instance for cluster pub/sub. + type Event: record { + ## The event handler to be invoked on the remote node. + ev: any; + ## The arguments for the event. + args: vector of any; + }; } # Track active nodes per type. diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 3c5dfacf25..50bc006f26 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -194,6 +194,7 @@ gen_zam_target(${GEN_ZAM_SRC_DIR}) option(USE_SQLITE "Should Zeek use SQLite?" 
ON) add_subdirectory(analyzer) +add_subdirectory(cluster) add_subdirectory(packet_analysis) add_subdirectory(broker) add_subdirectory(telemetry) diff --git a/src/DebugLogger.cc b/src/DebugLogger.cc index 5e97d1e629..3feb37f8d0 100644 --- a/src/DebugLogger.cc +++ b/src/DebugLogger.cc @@ -19,7 +19,8 @@ DebugLogger::Stream DebugLogger::streams[NUM_DBGS] = {"main-loop", 0, false}, {"dpd", 0, false}, {"packet-analysis", 0, false}, {"file-analysis", 0, false}, {"tm", 0, false}, {"logging", 0, false}, {"input", 0, false}, {"threading", 0, false}, {"plugins", 0, false}, {"zeekygen", 0, false}, {"pktio", 0, false}, {"broker", 0, false}, - {"scripts", 0, false}, {"supervisor", 0, false}, {"hashkey", 0, false}, {"spicy", 0, false}}; + {"scripts", 0, false}, {"supervisor", 0, false}, {"hashkey", 0, false}, {"spicy", 0, false}, + {"cluster", 0, false}}; DebugLogger::~DebugLogger() { if ( file && file != stderr ) diff --git a/src/DebugLogger.h b/src/DebugLogger.h index 7e3a4c0b6b..4cfa7c9e32 100644 --- a/src/DebugLogger.h +++ b/src/DebugLogger.h @@ -54,6 +54,7 @@ enum DebugStream { DBG_SUPERVISOR, // Process supervisor DBG_HASHKEY, // HashKey buffers DBG_SPICY, // Spicy functionality + DBG_CLUSTER, // Cluster functionality NUM_DBGS // Has to be last }; diff --git a/src/broker/CMakeLists.txt b/src/broker/CMakeLists.txt index 472f67e11d..210f5be138 100644 --- a/src/broker/CMakeLists.txt +++ b/src/broker/CMakeLists.txt @@ -12,3 +12,6 @@ zeek_add_subdir_library( data.bif messaging.bif store.bif) + +# Small plugin shim to make the CLUSTER_BACKEND_BROKER enum value available. 
+zeek_add_plugin(Zeek Cluster_Backend_Broker SOURCES Plugin.cc) diff --git a/src/broker/Plugin.cc b/src/broker/Plugin.cc new file mode 100644 index 0000000000..3f252f2690 --- /dev/null +++ b/src/broker/Plugin.cc @@ -0,0 +1,28 @@ +#include "zeek/broker/Plugin.h" + +#include + +#include "zeek/cluster/Backend.h" +#include "zeek/cluster/Component.h" +#include "zeek/cluster/Serializer.h" + +using namespace zeek::plugin::Zeek_Cluster_Backend_Broker; + +zeek::plugin::Configuration Plugin::Configure() { + // For now, there's always the broker_mgr instance that's explicitly + // instantiated in zeek-setup.cc. Don't even allow to instantiate + // a second one via the plugin mechanism. In the future, this could + // be changed so that broker is instantiated on demand only. + auto fail_instantiate = [](std::unique_ptr, + std::unique_ptr) -> std::unique_ptr { + zeek::reporter->FatalError("do not instantiate broker explicitly"); + return nullptr; + }; + + AddComponent(new cluster::BackendComponent("BROKER", fail_instantiate)); + + zeek::plugin::Configuration config; + config.name = "Zeek::Cluster_Backend_Broker"; + config.description = "Cluster backend using Broker"; + return config; +} diff --git a/src/broker/Plugin.h b/src/broker/Plugin.h new file mode 100644 index 0000000000..2ce9b48b75 --- /dev/null +++ b/src/broker/Plugin.h @@ -0,0 +1,14 @@ +// See the file "COPYING" in the main distribution directory for copyright. 
+ +#pragma once + +#include "zeek/plugin/Plugin.h" + +namespace zeek::plugin::Zeek_Cluster_Backend_Broker { + +class Plugin : public zeek::plugin::Plugin { +public: + zeek::plugin::Configuration Configure() override; +} plugin; + +} // namespace zeek::plugin::Zeek_Cluster_Backend_Broker diff --git a/src/broker/messaging.bif b/src/broker/messaging.bif index 632cf06ce5..aba84fe344 100644 --- a/src/broker/messaging.bif +++ b/src/broker/messaging.bif @@ -99,7 +99,7 @@ function Broker::make_event%(...%): Broker::Event %{ zeek::Broker::Manager::ScriptScopeGuard ssg; - auto ev = zeek::broker_mgr->MakeEvent(ArgsSpan{*@ARGS@}); + auto ev = zeek::broker_mgr->MakeEvent(ArgsSpan{*@ARGS@}, frame); return zeek::cast_intrusive(ev); %} diff --git a/src/cluster/Backend.cc b/src/cluster/Backend.cc new file mode 100644 index 0000000000..3737b9134f --- /dev/null +++ b/src/cluster/Backend.cc @@ -0,0 +1,214 @@ +// See the file "COPYING" in the main distribution directory for copyright. + +#include "zeek/cluster/Backend.h" + +#include + +#include "zeek/Desc.h" +#include "zeek/Event.h" +#include "zeek/EventRegistry.h" +#include "zeek/Func.h" +#include "zeek/Reporter.h" +#include "zeek/Type.h" +#include "zeek/cluster/Serializer.h" +#include "zeek/iosource/Manager.h" + +using namespace zeek::cluster; + +std::optional detail::check_args(const zeek::FuncValPtr& handler, zeek::ArgsSpan args) { + const auto& func_type = handler->GetType(); + + if ( func_type->Flavor() != zeek::FUNC_FLAVOR_EVENT ) { + zeek::reporter->Error("unexpected function type for %s: %s", handler->AsFunc()->GetName().c_str(), + func_type->FlavorString().c_str()); + return std::nullopt; + } + + const auto& types = func_type->ParamList()->GetTypes(); + if ( args.size() != types.size() ) { + zeek::reporter->Error("bad number of arguments for %s: got %zu, expect %zu", + handler->AsFunc()->GetName().c_str(), args.size(), types.size()); + return std::nullopt; + } + + zeek::Args result(args.size()); + + for ( size_t i = 0; i < 
args.size(); i++ ) { + const auto& a = args[i]; + const auto& got_type = a->GetType(); + const auto& expected_type = types[i]; + + if ( ! same_type(got_type, expected_type) ) { + zeek::reporter->Error("event parameter #%zu type mismatch, got %s, expecting %s", i + 1, + zeek::obj_desc_short(got_type.get()).c_str(), + zeek::obj_desc_short(expected_type.get()).c_str()); + return std::nullopt; + } + + result[i] = args[i]; + } + + return result; +} + +std::optional Backend::MakeClusterEvent(FuncValPtr handler, ArgsSpan args, double timestamp) const { + auto checked_args = detail::check_args(handler, args); + if ( ! checked_args ) + return std::nullopt; + + if ( timestamp == 0.0 ) + timestamp = zeek::event_mgr.CurrentEventTime(); + + const auto& eh = zeek::event_registry->Lookup(handler->AsFuncPtr()->GetName()); + if ( ! eh ) { + zeek::reporter->Error("event registry lookup of '%s' failed", obj_desc(handler.get()).c_str()); + return std::nullopt; + } + + return zeek::cluster::detail::Event{eh, std::move(*checked_args), timestamp}; +} + +// Default implementation doing the serialization. +bool Backend::DoPublishEvent(const std::string& topic, const cluster::detail::Event& event) { + cluster::detail::byte_buffer buf; + + if ( ! event_serializer->SerializeEvent(buf, event) ) + return false; + + return DoPublishEvent(topic, event_serializer->Name(), buf); +} + +// Default implementation doing log record serialization. +bool Backend::DoPublishLogWrites(const zeek::logging::detail::LogWriteHeader& header, + zeek::Span records) { + cluster::detail::byte_buffer buf; + + if ( ! 
log_serializer->SerializeLogWrite(buf, header, records) ) + return false; + + return DoPublishLogWrites(header, log_serializer->Name(), buf); +} + +bool Backend::ProcessEventMessage(const std::string_view& topic, const std::string_view& format, + const detail::byte_buffer_span payload) { + if ( format != event_serializer->Name() ) { + zeek::reporter->Error("ProcessEventMessage: Wrong format: %s vs %s", std::string{format}.c_str(), + event_serializer->Name().c_str()); + return false; + } + + auto r = event_serializer->UnserializeEvent(payload); + + if ( ! r ) { + auto escaped = + util::get_escaped_string(std::string(reinterpret_cast(payload.data()), payload.size()), false); + zeek::reporter->Error("Failed to unserialize message: %s: %s", std::string{topic}.c_str(), escaped.c_str()); + return false; + } + + auto& event = *r; + zeek::event_mgr.Enqueue(event.Handler(), std::move(event.args), util::detail::SOURCE_BROKER, 0, nullptr, + event.timestamp); + + return true; +} + +bool Backend::ProcessLogMessage(const std::string_view& format, detail::byte_buffer_span payload) { + // We could also dynamically lookup the right de-serializer, but + // for now assume we just receive what is configured. + if ( format != log_serializer->Name() ) { + zeek::reporter->Error("Got log message in format '%s', but have deserializer '%s'", std::string{format}.c_str(), + log_serializer->Name().c_str()); + return false; + } + + auto result = log_serializer->UnserializeLogWrite(payload); + + if ( ! result ) { + zeek::reporter->Error("Failed to unserialize log message using '%s'", std::string{format}.c_str()); + return false; + } + + // TODO: Send the whole batch to the logging manager. 
+ // return zeek::log_mgr->WritesFromRemote(result->header, std::move(result->records)); + zeek::reporter->FatalError("not implemented"); + return false; +} + +bool ThreadedBackend::ProcessBackendMessage(int tag, detail::byte_buffer_span payload) { + return DoProcessBackendMessage(tag, payload); +} + +namespace { + +bool register_io_source(zeek::iosource::IOSource* src, int fd, bool dont_count) { + constexpr bool manage_lifetime = true; + + zeek::iosource_mgr->Register(src, dont_count, manage_lifetime); + + if ( ! zeek::iosource_mgr->RegisterFd(fd, src) ) { + zeek::reporter->Error("Failed to register messages_flare with IO manager"); + return false; + } + + return true; +} +} // namespace + +bool ThreadedBackend::DoInit() { + // Register as counting during DoInit() to avoid Zeek from shutting down. + return register_io_source(this, messages_flare.FD(), false); +} + +void ThreadedBackend::DoInitPostScript() { + // Register non-counting after parsing scripts. + register_io_source(this, messages_flare.FD(), true); +} + +void ThreadedBackend::QueueForProcessing(QueueMessages&& qmessages) { + bool fire = false; + + // Enqueue under lock. + { + std::scoped_lock lock(messages_mtx); + fire = messages.empty(); + + if ( messages.empty() ) { + messages = std::move(qmessages); + } + else { + messages.reserve(messages.size() + qmessages.size()); + for ( auto& qmsg : qmessages ) + messages.emplace_back(std::move(qmsg)); + } + } + + if ( fire ) + messages_flare.Fire(); +} + +void ThreadedBackend::Process() { + QueueMessages to_process; + { + std::scoped_lock lock(messages_mtx); + to_process = std::move(messages); + messages_flare.Extinguish(); + messages.clear(); + } + + for ( const auto& msg : to_process ) { + // sonarlint wants to use std::visit. not sure... 
+ if ( auto* emsg = std::get_if(&msg) ) { + ProcessEventMessage(emsg->topic, emsg->format, emsg->payload_span()); + } + else if ( auto* lmsg = std::get_if(&msg) ) { + ProcessLogMessage(lmsg->format, lmsg->payload_span()); + } + else if ( auto* bmsg = std::get_if(&msg) ) { + ProcessBackendMessage(bmsg->tag, bmsg->payload_span()); + } + else { + zeek::reporter->FatalError("Unimplemented QueueMessage %zu", msg.index()); + } + } +} diff --git a/src/cluster/Backend.h b/src/cluster/Backend.h new file mode 100644 index 0000000000..55d2928736 --- /dev/null +++ b/src/cluster/Backend.h @@ -0,0 +1,399 @@ +// See the file "COPYING" in the main distribution directory for copyright. + +// The interface for cluster backends and remote events. + +#pragma once + +#include +#include +#include +#include +#include + +#include "zeek/EventHandler.h" +#include "zeek/Flare.h" +#include "zeek/IntrusivePtr.h" +#include "zeek/Span.h" +#include "zeek/cluster/Serializer.h" +#include "zeek/iosource/IOSource.h" +#include "zeek/logging/Types.h" + +namespace zeek { + +class FuncVal; + +using FuncValPtr = IntrusivePtr; + +class Val; +using ValPtr = IntrusivePtr; +using ArgsSpan = Span; + +namespace cluster { + +namespace detail { + +/** + * Cluster event class. + */ +class Event { +public: + /** + * Constructor. + */ + Event(const EventHandlerPtr& handler, zeek::Args args, double timestamp = 0.0) + : handler(handler), args(std::move(args)), timestamp(timestamp) {} + + EventHandlerPtr handler; + zeek::Args args; + double timestamp; // TODO: This should be more generic, possibly holding a + // vector of key/value metadata, rather than just + // the timestamp. + + std::string_view HandlerName() const { return handler->Name(); } + const EventHandlerPtr& Handler() const { return handler; } +}; + +/** + * Validate that the provided args are suitable for handler. + * + * @param handler An event handler. + * @param args The provided arguments for the handler as a span. 
+ * + * @return A zeek::Args instance if successful, else std::nullopt. + */ +std::optional check_args(const zeek::FuncValPtr& handler, zeek::ArgsSpan args); +} // namespace detail + +/** + * Interface for a cluster backend implementing publish/subscribe communication. + * Serialization of events should be done using the serializers injected into + * the constructor. + */ +class Backend { +public: + virtual ~Backend() = default; + + /** + * Hook invoked after all scripts have been parsed. + */ + void InitPostScript() { DoInitPostScript(); } + + /** + * Method invoked from the Cluster::Backend::__init() bif. + */ + bool Init() { return DoInit(); } + + /** + * Hook invoked when Zeek is about to terminate. + */ + void Terminate() { DoTerminate(); } + + /** + * Create a cluster::detail::Event instance given an event handler and the + * script function arguments to it. + * + * @param handler A function val representing an event handler. + * @param args The arguments for the event handler. + * @param timestamp The network time to add to the event as metadata. + */ + std::optional MakeClusterEvent(FuncValPtr handler, ArgsSpan args, double timestamp = 0.0) const; + + /** + * Publish a cluster::detail::Event instance to a given topic. + * + * @param topic The topic string to publish the event to. + * @param event The event to publish. + * + * @return true if the event was successfully published. + */ + bool PublishEvent(const std::string& topic, const cluster::detail::Event& event) { + return DoPublishEvent(topic, event); + } + + /** + * Register interest in messages that use a certain topic prefix. + * + * @param topic_prefix a prefix to match against remote message topics. + * @return true if it's a new event subscription and it is now registered. + */ + bool Subscribe(const std::string& topic_prefix) { return DoSubscribe(topic_prefix); } + + /** + * Unregister interest in messages on a certain topic. 
+ * + * @param topic_prefix a prefix previously supplied to Subscribe() + * @return true if interest in topic prefix is no longer advertised. + */ + bool Unsubscribe(const std::string& topic_prefix) { return DoUnsubscribe(topic_prefix); } + + /** + * Publish multiple log records. + * + * All log records belong to the (stream, filter, path) tuple that is + * described by \a header. + * + * @param header Fixed information about the stream, writer, filter and schema of the records. + * @param records A span of logging::detail::LogRecords to be published. + */ + bool PublishLogWrites(const zeek::logging::detail::LogWriteHeader& header, + zeek::Span records) { + return DoPublishLogWrites(header, records); + } + +protected: + /** + * Constructor. + */ + Backend(std::unique_ptr es, std::unique_ptr ls) + : event_serializer(std::move(es)), log_serializer(std::move(ls)) {} + + /** + * Process an incoming event message. + */ + bool ProcessEventMessage(const std::string_view& topic, const std::string_view& format, + detail::byte_buffer_span payload); + + /** + * Process an incoming log message. + */ + bool ProcessLogMessage(const std::string_view& format, detail::byte_buffer_span payload); + +private: + /** + * Called after all Zeek scripts have been loaded. + * + * A cluster backend should initialize itself based on script variables, + * register any IO sources, etc. It should not yet start any connections, that + * should happen in DoInit() instead. + */ + virtual void DoInitPostScript() = 0; + + /** + * Called from Cluster::Backend::__init(). + * + * Backend implementations should start connections with + * remote systems or other nodes, open listening ports or + * do whatever is needed to be functional. + */ + virtual bool DoInit() = 0; + + /** + * Called at termination time. + * + * This should be used to shut down connectivity. Any last messages + * to be published should be sent from script land, rather than in + * DoTerminate(). 
A backend may wait for a bounded and configurable + * amount of time to flush any last messages out. + */ + virtual void DoTerminate() = 0; + + /** + * Publish a cluster::detail::Event to the given topic. + * + * The default implementation serializes to a detail::byte_buffer and + * calls DoPublishEvent() with the resulting buffer. + * + * This hook method only exists for the existing Broker implementation that + * short-circuits serialization. Other backends should not override this. + */ + virtual bool DoPublishEvent(const std::string& topic, const cluster::detail::Event& event); + + /** + * Send a serialized cluster::detail::Event to the given topic. + * + * Semantics of this call are "fire-and-forget". An implementation should + * ensure the message is enqueued for delivery, but may not have been sent out + * let alone received by any subscribers of the topic when this call returns. + * + * If the backend has not established a connection, the published message is + * allowed to be discarded. + * + * @param topic a topic string associated with the message. + * @param format the format/serializer used for serialization of the message payload. + * @param buf the serialized Event. + * @return true if the message has been published successfully. + */ + virtual bool DoPublishEvent(const std::string& topic, const std::string& format, + const detail::byte_buffer& buf) = 0; + + /** + * Register interest in messages that use a certain topic prefix. + * + * If the backend hasn't yet established a connection, any subscriptions + * should be queued until they can be processed. + * + * @param topic_prefix a prefix to match against remote message topics. + * + * @return true if it's a new event subscription and now registered. + */ + virtual bool DoSubscribe(const std::string& topic_prefix) = 0; + + /** + * Unregister interest in messages on a certain topic. 
+ * + * @param topic_prefix a prefix previously supplied to Subscribe() + * @return true if interest in topic prefix is no longer advertised. + */ + virtual bool DoUnsubscribe(const std::string& topic_prefix) = 0; + + /** + * Serialize a log batch, then forward it to DoPublishLogWrites() below. + + * The default implementation serializes to a detail::byte_buffer and + * calls DoPublishLogWrites() with the resulting buffer. + * + * This hook method only exists for the existing Broker implementation that + * short-circuits serialization. Other backends should not override this. + * + * @param header The header describing the writer frontend where the records originate from. + * @param records Records to be serialized. + * + * @return true if the message has been published successfully. + */ + virtual bool DoPublishLogWrites(const zeek::logging::detail::LogWriteHeader& header, + zeek::Span records); + + /** + * Send out a serialized log batch. + * + * A backend implementation may use the values from \a header to + * construct a topic to write the logs to. + * + * Semantics of this call are "fire-and-forget". An implementation should + * ensure the message is enqueued for delivery, but may not have been sent out + * let alone received by the destination when this call returns. + * + * Sharding log writes to multiple receivers (logger nodes) is backend specific. + * Broker, for example, involves Zeek script layer cluster pool concepts. + * Other backends may use appropriate native mechanisms that may be more + * efficient. + * + * @param header the header describing the writer frontend where the records originate from. + * @param format the format/serializer used for serialization of the message payload. + * @param buf the serialized log batch. This is the message payload. + * @return true if the message has been published successfully. 
+ */ + virtual bool DoPublishLogWrites(const zeek::logging::detail::LogWriteHeader& header, const std::string& format, + detail::byte_buffer& buf) = 0; + + std::unique_ptr event_serializer; + std::unique_ptr log_serializer; +}; + +/** + * A cluster backend may receive event and log messages asynchronously + * through threads. The following structs can be used with QueueForProcessing() + * to enqueue these messages onto the main IO loop for processing. + * + * EventMessage and LogMessage are processed in a generic fashion in + * the Process() method. The BackendMessage can be intercepted with + * DoProcessBackendMessage(). DoProcessBackendMessage() is guaranteed + * to run on Zeek's main thread. + */ + +/** + * A message on a topic for events was received. + */ +struct EventMessage { + std::string topic; + std::string format; + detail::byte_buffer payload; + + auto payload_span() const { return Span(payload.data(), payload.size()); }; +}; + +/** + * A message that represents log records. + */ +struct LogMessage { + std::string format; + detail::byte_buffer payload; + + auto payload_span() const { return Span(payload.data(), payload.size()); }; +}; + +/** + * A backend specific message. + * + * This provides a mechanism to transfer auxiliary information + * from a background thread to Zeek's main thread. + */ +struct BackendMessage { + int tag; + detail::byte_buffer payload; + + auto payload_span() const { return Span(payload.data(), payload.size()); }; +}; + +using QueueMessage = std::variant; +using QueueMessages = std::vector; + +/** + * Support for backends that use background threads or invoke + * callbacks on non-main threads. + */ +class ThreadedBackend : public Backend, public zeek::iosource::IOSource { +public: + using Backend::Backend; + +protected: + /** + * To be used by implementations to enqueue messages for processing on the IO loop. + * + * It's safe to call this method from any thread. + * + * @param messages Messages to be enqueued. 
+ */ + void QueueForProcessing(QueueMessages&& messages); + + void Process() override; + + double GetNextTimeout() override { return -1; } + + /** + * The DoInitPostScript() implementation of ThreadedBackend + * registers itself as a non-counting IO source. + * + * Classes deriving from ThreadedBackend and providing their + * own DoInitPostScript() method should invoke the ThreadedBackend's + * implementation to register themselves as a non-counting + * IO source with the IO loop. + */ + void DoInitPostScript() override; + + /** + * The default DoInit() implementation of ThreadedBackend + * registers itself as a counting IO source to keep the IO + * loop alive after initialization. + * + * Classes deriving from ThreadedBackend and providing their + * own DoInit() method should invoke the ThreadedBackend's + * implementation to register themselves as a counting + * IO source with the IO loop. + */ + bool DoInit() override; + +private: + /** + * Process a backend specific message queued as BackendMessage. + */ + bool ProcessBackendMessage(int tag, detail::byte_buffer_span payload); + + /** + * If a cluster backend produces messages of type BackendMessage, + * this method will be invoked by the main thread to process it. + */ + virtual bool DoProcessBackendMessage(int tag, detail::byte_buffer_span payload) { return false; }; + + // Members used for communication with the main thread. + std::mutex messages_mtx; + std::vector messages; + zeek::detail::Flare messages_flare; +}; + + +// Cluster backend instance used for publish() and subscribe() calls. 
+extern Backend* backend; + +} // namespace cluster +} // namespace zeek diff --git a/src/cluster/CMakeLists.txt b/src/cluster/CMakeLists.txt new file mode 100644 index 0000000000..0395d28786 --- /dev/null +++ b/src/cluster/CMakeLists.txt @@ -0,0 +1,9 @@ +zeek_add_subdir_library( + cluster + INCLUDE_DIRS + ${CMAKE_CURRENT_SOURCE_DIR} + ${CMAKE_CURRENT_BINARY_DIR} + SOURCES + Component.cc + Backend.cc + Manager.cc) diff --git a/src/cluster/Component.cc b/src/cluster/Component.cc new file mode 100644 index 0000000000..09dd74c938 --- /dev/null +++ b/src/cluster/Component.cc @@ -0,0 +1,56 @@ +// See the file "COPYING" in the main distribution directory for copyright. + +#include "zeek/cluster/Component.h" + +#include "zeek/Desc.h" +#include "zeek/Tag.h" +#include "zeek/cluster/Manager.h" +#include "zeek/util.h" + +using namespace zeek::cluster; + +BackendComponent::BackendComponent(const std::string& name, factory_callback arg_factory) + : plugin::Component(plugin::component::CLUSTER_BACKEND, name, 0, cluster::manager->Backends().GetTagType()) { + factory = arg_factory; +} + +void BackendComponent::Initialize() { + InitializeTag(); + cluster::manager->Backends().RegisterComponent(this, "CLUSTER_BACKEND_"); +} + +void BackendComponent::DoDescribe(ODesc* d) const { + d->Add("Cluster::CLUSTER_BACKEND_"); + d->Add(CanonicalName()); +} + +EventSerializerComponent::EventSerializerComponent(const std::string& name, factory_callback arg_factory) + : plugin::Component(plugin::component::EVENT_SERIALIZER, name, 0, + cluster::manager->EventSerializers().GetTagType()) { + factory = arg_factory; +} + +void EventSerializerComponent::Initialize() { + InitializeTag(); + cluster::manager->EventSerializers().RegisterComponent(this, "EVENT_SERIALIZER_"); +} + +void EventSerializerComponent::DoDescribe(ODesc* d) const { + d->Add("Cluster::EVENT_SERIALIZER_"); + d->Add(CanonicalName()); +} + +LogSerializerComponent::LogSerializerComponent(const std::string& name, factory_callback 
arg_factory) + : plugin::Component(plugin::component::LOG_SERIALIZER, name, 0, cluster::manager->LogSerializers().GetTagType()) { + factory = arg_factory; +} + +void LogSerializerComponent::Initialize() { + InitializeTag(); + cluster::manager->LogSerializers().RegisterComponent(this, "LOG_SERIALIZER_"); +} + +void LogSerializerComponent::DoDescribe(ODesc* d) const { + d->Add("Cluster::LOG_SERIALIZER_"); + d->Add(CanonicalName()); +} diff --git a/src/cluster/Component.h b/src/cluster/Component.h new file mode 100644 index 0000000000..944557feae --- /dev/null +++ b/src/cluster/Component.h @@ -0,0 +1,131 @@ +// See the file "COPYING" in the main distribution directory for copyright. + +#pragma once + +#include + +#include "zeek/cluster/Backend.h" +#include "zeek/cluster/Serializer.h" +#include "zeek/plugin/Component.h" + +namespace zeek::cluster { + +class BackendComponent : public plugin::Component { +public: + using factory_callback = std::unique_ptr (*)(std::unique_ptr, + std::unique_ptr); + + /** + * Constructor. + * + * @param name The name of the cluster backend. A Zeek script-level enum + * with the name Cluster::CLUSTER_BACKEND_ will be created. + * + * @param factory A factory function to instantiate instances of the + * cluster backend. + */ + BackendComponent(const std::string& name, factory_callback factory); + + /** + * Destructor. + */ + ~BackendComponent() override = default; + + /** + * Initialization function. This function has to be called before any + * plugin component functionality is used; it is used to add the + * plugin component to the list of components and to initialize tags + */ + void Initialize() override; + + /** + * Returns the cluster backend's factory function. 
+ */ + factory_callback Factory() const { return factory; } + +protected: + void DoDescribe(ODesc* d) const override; + +private: + factory_callback factory; +}; + + +class EventSerializerComponent : public plugin::Component { +public: + using factory_callback = std::unique_ptr (*)(); + + /** + * Constructor. + * + * @param name The name of the event serializer. A Zeek script-level enum + * with the name Cluster::EVENT_SERIALIZER_ will be created. + * + * @param factory A factory function to instantiate instances of the + * event serializer. + */ + EventSerializerComponent(const std::string& name, factory_callback factory); + + /** + * Destructor. + */ + ~EventSerializerComponent() override = default; + + /** + * Initialization function. This function has to be called before any + * plugin component functionality is used; it is used to add the + * plugin component to the list of components and to initialize tags + */ + void Initialize() override; + + /** + * Returns the event serializer's factory function. + */ + factory_callback Factory() const { return factory; } + +protected: + void DoDescribe(ODesc* d) const override; + +private: + factory_callback factory; +}; + +class LogSerializerComponent : public plugin::Component { +public: + using factory_callback = std::unique_ptr (*)(); + + /** + * Constructor. + * + * @param name The name of the log serializer. A Zeek script-level enum + * with the name Cluster::LOG_SERIALIZER_ will be created. + * + * @param factory A factory function to instantiate instances of the + * log serializer. + */ + LogSerializerComponent(const std::string& name, factory_callback factory); + + /** + * Destructor. + */ + ~LogSerializerComponent() override = default; + + /** + * Initialization function. 
This function has to be called before any + * plugin component functionality is used; it is used to add the + * plugin component to the list of components and to initialize tags + */ + void Initialize() override; + + /** + * Returns the log serializer's factory function. + */ + factory_callback Factory() const { return factory; } + +protected: + void DoDescribe(ODesc* d) const override; + +private: + factory_callback factory; +}; +} // namespace zeek::cluster diff --git a/src/cluster/Manager.cc b/src/cluster/Manager.cc new file mode 100644 index 0000000000..eff6196164 --- /dev/null +++ b/src/cluster/Manager.cc @@ -0,0 +1,27 @@ +#include "zeek/cluster/Manager.h" + +#include "zeek/cluster/Serializer.h" + +using namespace zeek::cluster; + +Manager::Manager() + : backends(plugin::ComponentManager("Cluster", "BackendTag")), + event_serializers(plugin::ComponentManager("Cluster", "EventSerializerTag")), + log_serializers(plugin::ComponentManager("Cluster", "LogSerializerTag")) {} + +std::unique_ptr Manager::InstantiateBackend(const zeek::EnumValPtr& tag, + std::unique_ptr event_serializer, + std::unique_ptr log_serializer) { + const BackendComponent* c = Backends().Lookup(tag); + return c ? c->Factory()(std::move(event_serializer), std::move(log_serializer)) : nullptr; +} + +std::unique_ptr Manager::InstantiateEventSerializer(const zeek::EnumValPtr& tag) { + const EventSerializerComponent* c = EventSerializers().Lookup(tag); + return c ? c->Factory()() : nullptr; +} + +std::unique_ptr Manager::InstantiateLogSerializer(const zeek::EnumValPtr& tag) { + const LogSerializerComponent* c = LogSerializers().Lookup(tag); + return c ? c->Factory()() : nullptr; +} diff --git a/src/cluster/Manager.h b/src/cluster/Manager.h new file mode 100644 index 0000000000..fd4a44f31c --- /dev/null +++ b/src/cluster/Manager.h @@ -0,0 +1,80 @@ +// See the file "COPYING" in the main distribution directory for copyright. 
+ +#pragma once + +#include + +#include "zeek/cluster/Component.h" +#include "zeek/cluster/Serializer.h" +#include "zeek/plugin/ComponentManager.h" + +namespace zeek::cluster { + +/** + * Manager to allow registration of cluster components. + * + * This manager holds three component managers: one each for event serializer + * components, log serializer components, and backend components. + */ +class Manager { +public: + Manager(); + + /** + * Instantiate a cluster backend with the given enum value and + * pre-instantiated event and log serializers. + * + * @param tag The enum value identifying the backend. + * @param event_serializer The event serializer to inject. + * @param log_serializer The log serializer to inject. + * + * @return New cluster backend instance, or null if there's no such component. + */ + std::unique_ptr InstantiateBackend(const EnumValPtr& tag, + std::unique_ptr event_serializer, + std::unique_ptr log_serializer); + + /** + * Instantiate an event serializer with the given enum value. + * + * @param tag The enum value identifying a serializer. + * + * @return New event serializer instance, or null if there's no such component. + */ + std::unique_ptr InstantiateEventSerializer(const EnumValPtr& tag); + + /** + * Instantiate a log serializer with the given enum value. + * + * @param tag The enum value identifying a serializer. + * + * @return New log serializer instance, or null if there's no such component. + */ + std::unique_ptr InstantiateLogSerializer(const EnumValPtr& tag); + + /** + * @return The ComponentManager for backends. + */ + plugin::ComponentManager& Backends() { return backends; }; + + /** + * @return The ComponentManager for event serializers. + */ + plugin::ComponentManager& EventSerializers() { return event_serializers; }; + + /** + * @return The ComponentManager for log serializers.
+ */ + plugin::ComponentManager& LogSerializers() { return log_serializers; }; + +private: + plugin::ComponentManager backends; + plugin::ComponentManager event_serializers; + plugin::ComponentManager log_serializers; +}; + +// This manager instance only exists for plugins to register components, +// not for actual cluster functionality. +extern Manager* manager; + +} // namespace zeek::cluster diff --git a/src/cluster/Serializer.h b/src/cluster/Serializer.h new file mode 100644 index 0000000000..2be72ceca2 --- /dev/null +++ b/src/cluster/Serializer.h @@ -0,0 +1,107 @@ +// See the file "COPYING" in the main distribution directory for copyright. + +// Interfaces to be implemented by event and log serializer components. + +#pragma once + +#include +#include +#include + +#include "zeek/Span.h" +#include "zeek/logging/Types.h" + +namespace zeek::cluster { + +namespace detail { +class Event; + +using byte_buffer = std::vector; +using byte_buffer_span = Span; + +} // namespace detail + + +/** + * This class handles encoding of events into byte buffers and back. + * + * An event and its parameters can be serialized as a message which + * another node can unserialize and then enqueue as an event. + */ +class EventSerializer { +public: + virtual ~EventSerializer() = default; + + /** + * Serialize an event into the given byte buffer. + * + * @param buf The buffer to use for serialization. + * @param event The event to serialize. + * + * @returns True on success, false in exceptional cases (e.g. unsupported serialization). + */ + virtual bool SerializeEvent(detail::byte_buffer& buf, const detail::Event& event) = 0; + + /** + * Unserialize an event from a given byte buffer. + * + * @param buf A span representing a received remote event. + * + * @returns The event, or std::nullopt on error. + */ + virtual std::optional UnserializeEvent(detail::byte_buffer_span buf) = 0; + + /** + * @returns The name of this event serializer instance.
+ */ + const std::string& Name() { return name; } + +protected: + /** + * Constructor; stores the name that Name() returns. + */ + EventSerializer(std::string name) : name(std::move(name)) {} + +private: + std::string name; +}; + +/** + * Interface for a serializer for logging::LogRecord instances. + */ +class LogSerializer { +public: + /** + * Constructor; stores the name that Name() returns. + */ + explicit LogSerializer(std::string name) : name(std::move(name)) {}; + + virtual ~LogSerializer() = default; + + /** + * Serialize log records into a byte buffer. + * + * @param buf The buffer to serialize into. + * @param header The log batch header. + * @param records The actual log writes. + */ + virtual bool SerializeLogWrite(detail::byte_buffer& buf, const logging::detail::LogWriteHeader& header, + zeek::Span records) = 0; + + /** + * Unserialize log writes from a given byte buffer. + * + * @param buf The span representing received log writes. + */ + virtual std::optional UnserializeLogWrite(detail::byte_buffer_span buf) = 0; + + /** + * @returns The name of this log serializer instance. + */ + const std::string& Name() { return name; } + +private: + std::string name; +}; + +} // namespace zeek::cluster diff --git a/src/logging/CMakeLists.txt b/src/logging/CMakeLists.txt index 7690998520..d064fb278a 100644 --- a/src/logging/CMakeLists.txt +++ b/src/logging/CMakeLists.txt @@ -5,6 +5,7 @@ zeek_add_subdir_library( Manager.cc WriterBackend.cc WriterFrontend.cc + Types.cc BIFS logging.bif) diff --git a/src/logging/Types.cc b/src/logging/Types.cc new file mode 100644 index 0000000000..9fab8baf05 --- /dev/null +++ b/src/logging/Types.cc @@ -0,0 +1,48 @@ +// See the file "COPYING" in the main distribution directory for copyright.
+ +#include "zeek/logging/Types.h" + +#include "zeek/Desc.h" +#include "zeek/Type.h" +#include "zeek/Val.h" + +namespace zeek::logging::detail { + +LogWriteHeader::LogWriteHeader() = default; + +LogWriteHeader::LogWriteHeader(EnumValPtr arg_stream_id, EnumValPtr arg_writer_id, std::string arg_filter_name, + std::string arg_path) + : stream_id(std::move(arg_stream_id)), + writer_id(std::move(arg_writer_id)), + filter_name(std::move(arg_filter_name)), + path(std::move(arg_path)) { + stream_name = obj_desc_short(stream_id.get()); + writer_name = obj_desc_short(writer_id.get()); +} + +LogWriteHeader& LogWriteHeader::operator=(const LogWriteHeader& other) = default; + +LogWriteHeader::~LogWriteHeader() = default; + +bool LogWriteHeader::PopulateEnumVals() { + static const auto& stream_id_type = zeek::id::find_type("Log::ID"); + static const auto& writer_id_type = zeek::id::find_type("Log::Writer"); + + if ( stream_name.empty() || writer_name.empty() ) + return false; + + auto sid = stream_id_type->Lookup(stream_name); + if ( sid < 0 ) + return false; + + auto wid = writer_id_type->Lookup(writer_name); + if ( wid < 0 ) + return false; + + stream_id = stream_id_type->GetEnumVal(sid); + writer_id = writer_id_type->GetEnumVal(wid); + + return true; +} + +} // namespace zeek::logging::detail diff --git a/src/logging/Types.h b/src/logging/Types.h new file mode 100644 index 0000000000..06d83897a8 --- /dev/null +++ b/src/logging/Types.h @@ -0,0 +1,99 @@ +// See the file "COPYING" in the main distribution directory for copyright. + +// Header for types shared between cluster and logging components. +// +// Currently these live in the detail namespace, but over time may move into the +// public namespace once established. + +#pragma once + +#include +#include + +#include "zeek/IntrusivePtr.h" +#include "zeek/threading/SerialTypes.h" + +namespace zeek { +class EnumVal; +using EnumValPtr = IntrusivePtr; + +namespace logging::detail { + +/** + * A single log record.
+ * + * This is what a Zeek record value passed into Log::write() + * is converted into before being passed to a local log writer or + * sent via the cluster to a remote node. + */ +using LogRecord = std::vector; + +/** + * A struct holding all necessary information that relates to + * log writes for a given path. These values are constant over + * the lifetime of a \a WriterFrontend. + * + * Note that the constructor, destructor and assignment operator are + * defaulted in Types.cc. This is to avoid a Val.h include here. + */ +struct LogWriteHeader { + /** + * Default constructor. + */ + LogWriteHeader(); + + /** + * Constructor that populates stream_name and writer_name. + * + * @param stream_id Enum value representing the stream. + * @param writer_id Enum value representing the writer. + * @param filter_name The filter name of the writer frontend. + * @param path The path of the writer frontend. + */ + LogWriteHeader(EnumValPtr stream_id, EnumValPtr writer_id, std::string filter_name, std::string path); + + /** + * Copy assignment operator. + */ + LogWriteHeader& operator=(const LogWriteHeader& other); + + /** + * Destructor. + */ + ~LogWriteHeader(); + + /** + * Helper to populate stream_id and writer_id after the + * stream_name and writer_name members were set. + * + * @return true if matching enum values were found, else false. + */ + bool PopulateEnumVals(); + + EnumValPtr stream_id; // The enum identifying the stream. + std::string stream_name; // The name of the stream, e.g. Conn::LOG + EnumValPtr writer_id; // The enum identifying the writer. Mostly for backwards compat with broker. + std::string writer_name; // The name of the writer, e.g. WRITER_ASCII. + std::string filter_name; // The name of the filter. + std::string path; // The path as configured or produced by the filter's path_func. + std::vector fields; // The schema describing a log record. +}; + +/** + * A batch of log records including their header.
+ * + * This is the object created when unserializing a log-write + * from another cluster node. + * + * This layout currently implies that during de-serialization + * data is copied into LogRecord / threading::Value structures. + * If the need for zero-copy approaches arises, this might need a + * different approach to free the underlying buffer (capnproto). + */ +struct LogWriteBatch { + LogWriteHeader header; + std::vector records; +}; + +} // namespace logging::detail +} // namespace zeek diff --git a/src/plugin/Component.cc b/src/plugin/Component.cc index 5740d7f19a..96476f2aea 100644 --- a/src/plugin/Component.cc +++ b/src/plugin/Component.cc @@ -39,6 +39,12 @@ void Component::Describe(ODesc* d) const { case component::SESSION_ADAPTER: d->Add("Session Adapter"); break; + case component::CLUSTER_BACKEND: d->Add("Cluster Backend"); break; + + case component::EVENT_SERIALIZER: d->Add("Event Serializer"); break; + + case component::LOG_SERIALIZER: d->Add("Log Serializer"); break; + default: reporter->InternalWarning("unknown component type in plugin::Component::Describe"); d->Add(""); diff --git a/src/plugin/Component.h b/src/plugin/Component.h index d4eb296bf3..f61fe0555b 100644 --- a/src/plugin/Component.h +++ b/src/plugin/Component.h @@ -21,15 +21,18 @@ namespace component { * Component types. */ enum Type { - READER, /// An input reader (not currently used). - WRITER, /// A logging writer (not currently used). - ANALYZER, /// A protocol analyzer. - PACKET_ANALYZER, /// A packet analyzer. - FILE_ANALYZER, /// A file analyzer. - IOSOURCE, /// An I/O source, excluding packet sources. - PKTSRC, /// A packet source. - PKTDUMPER, /// A packet dumper. - SESSION_ADAPTER, /// A session adapter analyzer. + READER, /// An input reader (not currently used). + WRITER, /// A logging writer (not currently used). + ANALYZER, /// A protocol analyzer. + PACKET_ANALYZER, /// A packet analyzer. + FILE_ANALYZER, /// A file analyzer.
+ IOSOURCE, /// An I/O source, excluding packet sources. + PKTSRC, /// A packet source. + PKTDUMPER, /// A packet dumper. + SESSION_ADAPTER, /// A session adapter analyzer. + CLUSTER_BACKEND, /// A cluster backend. + EVENT_SERIALIZER, /// A serializer for events, used by cluster backends. + LOG_SERIALIZER, /// A serializer for log batches, used by cluster backends. }; } // namespace component diff --git a/src/threading/SerialTypes.h b/src/threading/SerialTypes.h index dabf27210e..1a0ea21f1b 100644 --- a/src/threading/SerialTypes.h +++ b/src/threading/SerialTypes.h @@ -19,13 +19,21 @@ namespace zeek::threading { * Definition of a log file, i.e., one column of a log stream. */ struct Field { - const char* name; //! Name of the field. + const char* name = nullptr; //! Name of the field. //! Needed by input framework. Port fields have two names (one for the //! port, one for the type), and this specifies the secondary name. - const char* secondary_name; - TypeTag type; //! Type of the field. - TypeTag subtype; //! Inner type for sets and vectors. - bool optional; //! True if field is optional. + const char* secondary_name = nullptr; + TypeTag type = TYPE_ERROR; //! Type of the field. + TypeTag subtype = TYPE_ERROR; //! Inner type for sets and vectors. + bool optional = false; //! True if field is optional. + + + /** + * Constructor. + * + * For Read() usage. Initializes with TYPE_ERROR. + */ + Field() = default; /** * Constructor. @@ -47,6 +55,23 @@ struct Field { subtype(other.subtype), optional(other.optional) {} + /** + * Move constructor. + */ + Field(Field&& other) noexcept { + name = other.name; + secondary_name = other.secondary_name; + type = other.type; + subtype = other.subtype; + optional = other.optional; + + other.name = nullptr; + other.secondary_name = nullptr; + other.type = TYPE_ERROR; + other.subtype = TYPE_ERROR; + other.optional = false; + } + ~Field() { delete[] name; delete[] secondary_name; @@ -91,10 +116,6 @@ struct Field { * thread-safe. 
*/ std::string TypeName() const; - -private: - // Force usage of constructor above. - Field() {} }; /** diff --git a/src/zeek-setup.cc b/src/zeek-setup.cc index 8fdfd5111a..293bc3febe 100644 --- a/src/zeek-setup.cc +++ b/src/zeek-setup.cc @@ -50,6 +50,7 @@ #include "zeek/analyzer/Manager.h" #include "zeek/binpac_zeek.h" #include "zeek/broker/Manager.h" +#include "zeek/cluster/Manager.h" #include "zeek/file_analysis/Manager.h" #include "zeek/input.h" #include "zeek/input/Manager.h" @@ -177,6 +178,8 @@ zeek::detail::trigger::Manager* zeek::detail::trigger_mgr = nullptr; zeek::spicy::Manager* zeek::spicy_mgr = nullptr; #endif +zeek::cluster::Manager* zeek::cluster::manager = nullptr; + std::vector zeek::detail::zeek_script_prefixes; zeek::detail::Stmt* zeek::detail::stmts = nullptr; zeek::EventRegistry* zeek::event_registry = nullptr; @@ -389,6 +392,7 @@ static void terminate_zeek() { delete packet_mgr; delete analyzer_mgr; delete file_mgr; + delete cluster::manager; // broker_mgr, timer_mgr, supervisor, and dns_mgr are deleted via iosource_mgr delete iosource_mgr; delete event_registry; @@ -666,6 +670,7 @@ SetupResult setup(int argc, char** argv, Options* zopts) { log_mgr = new logging::Manager(); input_mgr = new input::Manager(); file_mgr = new file_analysis::Manager(); + cluster::manager = new cluster::Manager(); auto broker_real_time = ! options.pcap_file && ! 
options.deterministic_mode; broker_mgr = new Broker::Manager(broker_real_time); trigger_mgr = new trigger::Manager(); diff --git a/src/zeekygen/ScriptInfo.cc b/src/zeekygen/ScriptInfo.cc index 817174cca9..70347baa88 100644 --- a/src/zeekygen/ScriptInfo.cc +++ b/src/zeekygen/ScriptInfo.cc @@ -313,6 +313,14 @@ void ScriptInfo::DoInitPostScript() { const auto& id = zeek::detail::global_scope()->Find("Log::Writer"); types.push_back(new IdentifierInfo(id, this)); } + else if ( name == "base/frameworks/cluster/main.zeek" ) { + const auto& backend_id = zeek::detail::global_scope()->Find("Cluster::BackendTag"); + types.push_back(new IdentifierInfo(backend_id, this)); + const auto& event_serializer_id = zeek::detail::global_scope()->Find("Cluster::EventSerializerTag"); + types.push_back(new IdentifierInfo(event_serializer_id, this)); + const auto& log_serializer_id = zeek::detail::global_scope()->Find("Cluster::LogSerializerTag"); + types.push_back(new IdentifierInfo(log_serializer_id, this)); + } } vector ScriptInfo::GetComments() const { return comments; } diff --git a/testing/btest/Baseline/cluster.backend-enum/out b/testing/btest/Baseline/cluster.backend-enum/out new file mode 100644 index 0000000000..1948596eaa --- /dev/null +++ b/testing/btest/Baseline/cluster.backend-enum/out @@ -0,0 +1,5 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. 
+Zeek::Cluster_Backend_Broker - Cluster backend using Broker (built-in) + [Cluster Backend] BROKER (Cluster::CLUSTER_BACKEND_BROKER) + +Cluster::CLUSTER_BACKEND_BROKER, Cluster::BackendTag diff --git a/testing/btest/Baseline/scripts.base.files.x509.files/files.log b/testing/btest/Baseline/scripts.base.files.x509.files/files.log index e64dfc52c0..ce19924fa1 100644 --- a/testing/btest/Baseline/scripts.base.files.x509.files/files.log +++ b/testing/btest/Baseline/scripts.base.files.x509.files/files.log @@ -7,10 +7,10 @@ #open XXXX-XX-XX-XX-XX-XX #fields ts fuid uid id.orig_h id.orig_p id.resp_h id.resp_p source depth analyzers mime_type filename duration local_orig is_orig seen_bytes total_bytes missing_bytes overflow_bytes timedout parent_fuid md5 sha1 sha256 #types time string string addr port addr port string count set[string] string string interval bool bool count count count count bool string string string string -XXXXXXXXXX.XXXXXX FgN3AE3of2TRIqaeQe CHhAvVGS1DHFjwGM9 192.168.4.149 60623 74.125.239.129 443 SSL 0 SHA256,X509,SHA1,MD5 application/x-x509-user-cert - 0.000000 F F 1859 - 0 0 F - 7af07aca6d5c6e8e87fe4bb34786edc0 548b9e03bc183d1cd39f93a37985cb3950f8f06f 6bacfa4536150ed996f2b0c05ab6e345a257225f449aeb9d2018ccd88f4ede43 -XXXXXXXXXX.XXXXXX Fv2Agc4z5boBOacQi6 CHhAvVGS1DHFjwGM9 192.168.4.149 60623 74.125.239.129 443 SSL 0 SHA256,X509,SHA1,MD5 application/x-x509-ca-cert - 0.000000 F F 1032 - 0 0 F - 9e4ac96474245129d9766700412a1f89 d83c1a7f4d0446bb2081b81a1670f8183451ca24 a047a37fa2d2e118a4f5095fe074d6cfe0e352425a7632bf8659c03919a6c81d -XXXXXXXXXX.XXXXXX Ftmyeg2qgI2V38Dt3g CHhAvVGS1DHFjwGM9 192.168.4.149 60623 74.125.239.129 443 SSL 0 SHA256,X509,SHA1,MD5 application/x-x509-ca-cert - 0.000000 F F 897 - 0 0 F - 2e7db2a31d0e3da4b25f49b9542a2e1a 7359755c6df9a0abc3060bce369564c8ec4542a3 3c35cc963eb004451323d3275d05b353235053490d9cd83729a2faf5e7ca1cc0 -XXXXXXXXXX.XXXXXX FUFNf84cduA0IJCp07 ClEkJM2Vm5giqnMf4h 192.168.4.149 60624 74.125.239.129 443 SSL 0 
SHA256,X509,SHA1,MD5 application/x-x509-user-cert - 0.000000 F F 1859 - 0 0 F - 7af07aca6d5c6e8e87fe4bb34786edc0 548b9e03bc183d1cd39f93a37985cb3950f8f06f 6bacfa4536150ed996f2b0c05ab6e345a257225f449aeb9d2018ccd88f4ede43 -XXXXXXXXXX.XXXXXX F1H4bd2OKGbLPEdHm4 ClEkJM2Vm5giqnMf4h 192.168.4.149 60624 74.125.239.129 443 SSL 0 SHA256,X509,SHA1,MD5 application/x-x509-ca-cert - 0.000000 F F 1032 - 0 0 F - 9e4ac96474245129d9766700412a1f89 d83c1a7f4d0446bb2081b81a1670f8183451ca24 a047a37fa2d2e118a4f5095fe074d6cfe0e352425a7632bf8659c03919a6c81d -XXXXXXXXXX.XXXXXX Fgsbci2jxFXYMOHOhi ClEkJM2Vm5giqnMf4h 192.168.4.149 60624 74.125.239.129 443 SSL 0 SHA256,X509,SHA1,MD5 application/x-x509-ca-cert - 0.000000 F F 897 - 0 0 F - 2e7db2a31d0e3da4b25f49b9542a2e1a 7359755c6df9a0abc3060bce369564c8ec4542a3 3c35cc963eb004451323d3275d05b353235053490d9cd83729a2faf5e7ca1cc0 +XXXXXXXXXX.XXXXXX FgN3AE3of2TRIqaeQe CHhAvVGS1DHFjwGM9 192.168.4.149 60623 74.125.239.129 443 SSL 0 X509,SHA256,SHA1,MD5 application/x-x509-user-cert - 0.000000 F F 1859 - 0 0 F - 7af07aca6d5c6e8e87fe4bb34786edc0 548b9e03bc183d1cd39f93a37985cb3950f8f06f 6bacfa4536150ed996f2b0c05ab6e345a257225f449aeb9d2018ccd88f4ede43 +XXXXXXXXXX.XXXXXX Fv2Agc4z5boBOacQi6 CHhAvVGS1DHFjwGM9 192.168.4.149 60623 74.125.239.129 443 SSL 0 X509,SHA256,SHA1,MD5 application/x-x509-ca-cert - 0.000000 F F 1032 - 0 0 F - 9e4ac96474245129d9766700412a1f89 d83c1a7f4d0446bb2081b81a1670f8183451ca24 a047a37fa2d2e118a4f5095fe074d6cfe0e352425a7632bf8659c03919a6c81d +XXXXXXXXXX.XXXXXX Ftmyeg2qgI2V38Dt3g CHhAvVGS1DHFjwGM9 192.168.4.149 60623 74.125.239.129 443 SSL 0 X509,SHA256,SHA1,MD5 application/x-x509-ca-cert - 0.000000 F F 897 - 0 0 F - 2e7db2a31d0e3da4b25f49b9542a2e1a 7359755c6df9a0abc3060bce369564c8ec4542a3 3c35cc963eb004451323d3275d05b353235053490d9cd83729a2faf5e7ca1cc0 +XXXXXXXXXX.XXXXXX FUFNf84cduA0IJCp07 ClEkJM2Vm5giqnMf4h 192.168.4.149 60624 74.125.239.129 443 SSL 0 X509,SHA256,SHA1,MD5 application/x-x509-user-cert - 0.000000 F F 1859 - 0 0 F - 
7af07aca6d5c6e8e87fe4bb34786edc0 548b9e03bc183d1cd39f93a37985cb3950f8f06f 6bacfa4536150ed996f2b0c05ab6e345a257225f449aeb9d2018ccd88f4ede43 +XXXXXXXXXX.XXXXXX F1H4bd2OKGbLPEdHm4 ClEkJM2Vm5giqnMf4h 192.168.4.149 60624 74.125.239.129 443 SSL 0 X509,SHA256,SHA1,MD5 application/x-x509-ca-cert - 0.000000 F F 1032 - 0 0 F - 9e4ac96474245129d9766700412a1f89 d83c1a7f4d0446bb2081b81a1670f8183451ca24 a047a37fa2d2e118a4f5095fe074d6cfe0e352425a7632bf8659c03919a6c81d +XXXXXXXXXX.XXXXXX Fgsbci2jxFXYMOHOhi ClEkJM2Vm5giqnMf4h 192.168.4.149 60624 74.125.239.129 443 SSL 0 X509,SHA256,SHA1,MD5 application/x-x509-ca-cert - 0.000000 F F 897 - 0 0 F - 2e7db2a31d0e3da4b25f49b9542a2e1a 7359755c6df9a0abc3060bce369564c8ec4542a3 3c35cc963eb004451323d3275d05b353235053490d9cd83729a2faf5e7ca1cc0 #close XXXX-XX-XX-XX-XX-XX diff --git a/testing/btest/Baseline/scripts.base.frameworks.cluster.topic_distribution_make_event/manager-1..stdout b/testing/btest/Baseline/scripts.base.frameworks.cluster.topic_distribution_make_event/manager-1..stdout new file mode 100644 index 0000000000..b966781c80 --- /dev/null +++ b/testing/btest/Baseline/scripts.base.frameworks.cluster.topic_distribution_make_event/manager-1..stdout @@ -0,0 +1,52 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. 
+1st stuff +hrw, 0, T +hrw, 1, T +hrw, 2, T +hrw, 3, T +hrw, 13, T +hrw, 37, T +hrw, 42, T +hrw, 101, T +rr, T +rr, T +rr, T +rr, T +rr, T +rr, T +rr, T +rr, T +2nd stuff +hrw, 0, T +hrw, 1, T +hrw, 2, T +hrw, 3, T +hrw, 13, T +hrw, 37, T +hrw, 42, T +hrw, 101, T +rr, T +rr, T +rr, T +rr, T +rr, T +rr, T +rr, T +rr, T +no stuff +hrw, 0, F +hrw, 1, F +hrw, 2, F +hrw, 3, F +hrw, 13, F +hrw, 37, F +hrw, 42, F +hrw, 101, F +rr, F +rr, F +rr, F +rr, F +rr, F +rr, F +rr, F +rr, F diff --git a/testing/btest/Baseline/scripts.base.frameworks.cluster.topic_distribution_make_event/proxy-1..stdout b/testing/btest/Baseline/scripts.base.frameworks.cluster.topic_distribution_make_event/proxy-1..stdout new file mode 100644 index 0000000000..c4e16958ba --- /dev/null +++ b/testing/btest/Baseline/scripts.base.frameworks.cluster.topic_distribution_make_event/proxy-1..stdout @@ -0,0 +1,10 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +got distributed event hrw, 1 +got distributed event hrw, 13 +got distributed event hrw, 37 +got distributed event hrw, 42 +got distributed event hrw, 101 +got distributed event rr, 0 +got distributed event rr, 2 +got distributed event rr, 13 +got distributed event rr, 42 diff --git a/testing/btest/Baseline/scripts.base.frameworks.cluster.topic_distribution_make_event/proxy-2..stdout b/testing/btest/Baseline/scripts.base.frameworks.cluster.topic_distribution_make_event/proxy-2..stdout new file mode 100644 index 0000000000..fec1ae9fe4 --- /dev/null +++ b/testing/btest/Baseline/scripts.base.frameworks.cluster.topic_distribution_make_event/proxy-2..stdout @@ -0,0 +1,24 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. 
+got distributed event hrw, 0 +got distributed event hrw, 2 +got distributed event hrw, 3 +got distributed event rr, 1 +got distributed event rr, 3 +got distributed event rr, 37 +got distributed event rr, 101 +got distributed event hrw, 0 +got distributed event hrw, 1 +got distributed event hrw, 2 +got distributed event hrw, 3 +got distributed event hrw, 13 +got distributed event hrw, 37 +got distributed event hrw, 42 +got distributed event hrw, 101 +got distributed event rr, 0 +got distributed event rr, 1 +got distributed event rr, 2 +got distributed event rr, 3 +got distributed event rr, 13 +got distributed event rr, 37 +got distributed event rr, 42 +got distributed event rr, 101 diff --git a/testing/btest/btest.cfg b/testing/btest/btest.cfg index 9b459a0a05..405ad41f3c 100644 --- a/testing/btest/btest.cfg +++ b/testing/btest/btest.cfg @@ -4,7 +4,7 @@ build_dir = build [btest] -TestDirs = af_packet doc bifs language core scripts coverage signatures plugins broker spicy supervisor telemetry javascript misc opt dns_mgr +TestDirs = af_packet doc bifs language core scripts coverage signatures plugins broker spicy supervisor telemetry javascript misc opt dns_mgr cluster TmpDir = %(testbase)s/.tmp BaselineDir = %(testbase)s/Baseline IgnoreDirs = .svn CVS .tmp diff --git a/testing/btest/cluster/backend-enum.zeek b/testing/btest/cluster/backend-enum.zeek new file mode 100644 index 0000000000..04d4b89963 --- /dev/null +++ b/testing/btest/cluster/backend-enum.zeek @@ -0,0 +1,10 @@ +# @TEST-DOC: Test cluster backend enum +# +# @TEST-EXEC: zeek -NN Zeek::Cluster_Backend_Broker >>out +# @TEST-EXEC: zeek -b %INPUT >>out +# @TEST-EXEC: btest-diff out + +event zeek_init() + { + print Cluster::CLUSTER_BACKEND_BROKER, type_name(Cluster::CLUSTER_BACKEND_BROKER); + } diff --git a/testing/btest/scripts/base/frameworks/cluster/topic_distribution_make_event.zeek b/testing/btest/scripts/base/frameworks/cluster/topic_distribution_make_event.zeek new file mode 100644 index 
0000000000..e966fc73ce --- /dev/null +++ b/testing/btest/scripts/base/frameworks/cluster/topic_distribution_make_event.zeek @@ -0,0 +1,99 @@ +# @TEST-DOC: Broker::make_event() together with Cluster::publish_hrw() and Cluster::publish_rr() +# @TEST-PORT: BROKER_PORT1 +# @TEST-PORT: BROKER_PORT2 +# @TEST-PORT: BROKER_PORT3 +# @TEST-PORT: BROKER_PORT4 +# @TEST-PORT: BROKER_PORT5 +# +# @TEST-EXEC: zeek -b --parse-only %INPUT +# @TEST-EXEC: btest-bg-run manager-1 ZEEKPATH=$ZEEKPATH:.. CLUSTER_NODE=manager-1 zeek -b %INPUT +# @TEST-EXEC: btest-bg-run proxy-1 ZEEKPATH=$ZEEKPATH:.. CLUSTER_NODE=proxy-1 zeek -b %INPUT +# @TEST-EXEC: btest-bg-run proxy-2 ZEEKPATH=$ZEEKPATH:.. CLUSTER_NODE=proxy-2 zeek -b %INPUT +# @TEST-EXEC: btest-bg-wait 30 +# @TEST-EXEC: btest-diff manager-1/.stdout +# @TEST-EXEC: btest-diff proxy-1/.stdout +# @TEST-EXEC: btest-diff proxy-2/.stdout + +@load policy/frameworks/cluster/experimental + +@TEST-START-FILE cluster-layout.zeek +redef Cluster::nodes = { + ["manager-1"] = [$node_type=Cluster::MANAGER, $ip=127.0.0.1, $p=to_port(getenv("BROKER_PORT1"))], + ["proxy-1"] = [$node_type=Cluster::PROXY, $ip=127.0.0.1, $p=to_port(getenv("BROKER_PORT2")), $manager="manager-1"], + ["proxy-2"] = [$node_type=Cluster::PROXY, $ip=127.0.0.1, $p=to_port(getenv("BROKER_PORT3")), $manager="manager-1"], +}; +@TEST-END-FILE + +global q = 0; + +event go_away() + { + terminate(); + } + +event distributed_event_hrw(c: count) + { + print "got distributed event hrw", c; + } + +event distributed_event_rr(c: count) + { + print "got distributed event rr", c; + } + +function send_stuff(heading: string) + { + print heading; + + local v: vector of count = vector(0, 1, 2, 3, 13, 37, 42, 101); + local e: Broker::Event; + + for ( i in v ) + { + e = Broker::make_event(distributed_event_hrw, v[i]); + print "hrw", v[i], Cluster::publish_hrw(Cluster::proxy_pool, v[i], e); + } + + local rr_key = "test"; + + for ( i in v ) + { + e = Broker::make_event(distributed_event_rr, v[i]); + print 
"rr", Cluster::publish_rr(Cluster::proxy_pool, rr_key, e); + } + } + +event Cluster::Experimental::cluster_started() + { + if ( Cluster::node != "manager-1" ) + return; + + send_stuff("1st stuff"); + local e = Broker::make_event(go_away); + Broker::publish(Cluster::node_topic("proxy-1"), e); + } + +event Cluster::node_down(name: string, id: string) + { + if ( Cluster::node != "manager-1" ) + return; + + if ( name == "proxy-1" ) + { + send_stuff("2nd stuff"); + local e = Broker::make_event(go_away); + Broker::publish(Cluster::node_topic("proxy-2"), e); + } + + if ( name == "proxy-2" ) + { + send_stuff("no stuff"); + terminate(); + } + } + +event Cluster::node_down(name: string, id: string) + { + if ( name == "manager-1" ) + terminate(); + }