diff --git a/CHANGES b/CHANGES index c26df4fbea..b9834f78a0 100644 --- a/CHANGES +++ b/CHANGES @@ -1,3 +1,44 @@ +7.1.0-dev.629 | 2024-11-26 17:45:08 +0100 + + * ci/test.sh: Run doctest with TZ=UTC (Arne Welzel, Corelight) + + Broker's JSON serialization is TZ dependent (which seems a bug). For now + do the same as we do in btest.cfg and run doctests with TZ set to UTC. + + Reported in zeek/broker#434. + + * cluster/setup-connections: Switch to Cluster::subscribe(), short-circuit broker (Arne Welzel, Corelight) + + For the time being, this is easiest, otherwise we'd need to + conditionally load a broker-specific policy script based on + Cluster::backend being set. + + * cluster/serializer: Add Broker based event serializers (Arne Welzel, Corelight) + + This adds the first event serializers that use + broker functionality. Binary and JSON formats. + + * cluster: Add Cluster scoped bifs (Arne Welzel, Corelight) + + ... and a broker based test using Cluster::publish() and + Cluster::subscribe(). + + * Reporter: Add ScriptLocationScope helper (Arne Welzel, Corelight) + + * init-bare/zeek-setup: Add Cluster::backend const &redef (Arne Welzel, Corelight) + + * broker: Implement cluster::Backend interface (Arne Welzel, Corelight) + + * Broker: Fix some error messages (Arne Welzel, Corelight) + + * broker: Remove MakeEvent(ArgsSpan) (Arne Welzel, Corelight) + + This was added previously in the 7.1 cycle. Now that MakeEvent() was + removed from cluster::Backend, there's no need for Broker to provide + this version. + + * Update 3dparty submodule (Johanna Amann, Corelight) + 7.1.0-dev.618 | 2024-11-26 17:17:00 +0100 * GH-4052: More robust memory management for ZAM execution - fixes #4052 (Vern Paxson, Corelight) diff --git a/VERSION b/VERSION index ed7b075349..90674fa1c9 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -7.1.0-dev.618 +7.1.0-dev.629 diff --git a/ci/test.sh b/ci/test.sh index 1d5da9e2a7..ff3935579a 100755 --- a/ci/test.sh +++ b/ci/test.sh @@ -48,7 +48,7 @@ function run_unit_tests { banner "Running unit tests" pushd build - (. ./zeek-path-dev.sh && zeek --test --no-skip) || result=1 + (. ./zeek-path-dev.sh && TZ=UTC zeek --test --no-skip) || result=1 popd return 0 } diff --git a/scripts/base/frameworks/cluster/main.zeek b/scripts/base/frameworks/cluster/main.zeek index d27a9160a9..caf2e6a11d 100644 --- a/scripts/base/frameworks/cluster/main.zeek +++ b/scripts/base/frameworks/cluster/main.zeek @@ -281,7 +281,30 @@ export { ## a given cluster node. global nodeid_topic: function(id: string): string; + ## Initialize the cluster backend. + ## + ## Cluster backends usually invoke this from a :zeek:see:`zeek_init` handler. + ## + ## Returns: T on success, else F. + global init: function(): bool; + + ## Subscribe to the given topic. + ## + ## topic: The topic to subscribe to. + ## + ## Returns: T on success, else F. + global subscribe: function(topic: string): bool; + + ## Unsubscribe from the given topic. + ## + ## topic: The topic to unsubscribe from. + ## + ## Returns: T on success, else F. + global unsubscribe: function(topic: string): bool; + ## An event instance for cluster pub/sub. + ## + ## See :zeek:see:`Cluster::publish` and :zeek:see:`Cluster::make_event`. type Event: record { ## The event handler to be invoked on the remote node. ev: any; @@ -290,6 +313,10 @@ export { }; } +# Needs declaration of Cluster::Event type. +@load base/bif/cluster.bif + + # Track active nodes per type. global active_node_ids: table[NodeType] of set[string]; @@ -528,3 +555,18 @@ function log(msg: string) { Log::write(Cluster::LOG, [$ts = network_time(), $node = node, $message = msg]); } + +function init(): bool + { + return Cluster::Backend::__init(); + } + +function subscribe(topic: string): bool + { + return Cluster::__subscribe(topic); + } + +function unsubscribe(topic: string): bool + { + return Cluster::__unsubscribe(topic); + } diff --git a/scripts/base/frameworks/cluster/setup-connections.zeek b/scripts/base/frameworks/cluster/setup-connections.zeek index ba3010129c..453658067f 100644 --- a/scripts/base/frameworks/cluster/setup-connections.zeek +++ b/scripts/base/frameworks/cluster/setup-connections.zeek @@ -69,7 +69,7 @@ event zeek_init() &priority=-10 local pool = registered_pools[i]; if ( node in pool$nodes ) - Broker::subscribe(pool$spec$topic); + Cluster::subscribe(pool$spec$topic); } switch ( self$node_type ) { @@ -78,29 +78,47 @@ event zeek_init() &priority=-10 case CONTROL: break; case LOGGER: - Broker::subscribe(Cluster::logger_topic); - Broker::subscribe(Broker::default_log_topic_prefix); + Cluster::subscribe(Cluster::logger_topic); break; case MANAGER: - Broker::subscribe(Cluster::manager_topic); - - if ( Cluster::manager_is_logger ) - Broker::subscribe(Broker::default_log_topic_prefix); - + Cluster::subscribe(Cluster::manager_topic); break; case PROXY: - Broker::subscribe(Cluster::proxy_topic); + Cluster::subscribe(Cluster::proxy_topic); break; case WORKER: - Broker::subscribe(Cluster::worker_topic); + Cluster::subscribe(Cluster::worker_topic); break; default: Reporter::error(fmt("Unhandled cluster node type: %s", self$node_type)); return; } - Broker::subscribe(nodeid_topic(Broker::node_id())); - Broker::subscribe(node_topic(node)); + Cluster::subscribe(nodeid_topic(Broker::node_id())); + Cluster::subscribe(node_topic(node)); + + + # Listening and connecting to other peers is broker specific, + # short circuit if Zeek is configured with a different + # cluster backend. + # + # In the future, this could move into a policy script, but + # for the time being it's easier for backwards compatibility + # to keep this here. + if ( Cluster::backend != Cluster::CLUSTER_BACKEND_BROKER ) + return; + + # Logging setup: Anything handling logging additionally subscribes + # to Broker::default_log_topic_prefix. + switch ( self$node_type ) { + case LOGGER: + Cluster::subscribe(Broker::default_log_topic_prefix); + break; + case MANAGER: + if ( Cluster::manager_is_logger ) + Cluster::subscribe(Broker::default_log_topic_prefix); + break; + } if ( self$p != 0/unknown ) { diff --git a/scripts/base/init-bare.zeek b/scripts/base/init-bare.zeek index 9ee646300e..55cac66883 100644 --- a/scripts/base/init-bare.zeek +++ b/scripts/base/init-bare.zeek @@ -5753,6 +5753,9 @@ export { module Cluster; export { type Cluster::Pool: record {}; + + ## Cluster backend to use. Default is the broker backend. + const backend = Cluster::CLUSTER_BACKEND_BROKER &redef; } module Weird; diff --git a/src/Reporter.cc b/src/Reporter.cc index 6c8b1981e9..0368f72478 100644 --- a/src/Reporter.cc +++ b/src/Reporter.cc @@ -14,6 +14,7 @@ #include "zeek/Event.h" #include "zeek/EventHandler.h" #include "zeek/Expr.h" +#include "zeek/Frame.h" #include "zeek/ID.h" #include "zeek/NetVar.h" #include "zeek/RunState.h" @@ -649,4 +650,10 @@ void Reporter::DoLog(const char* prefix, EventHandlerPtr event, FILE* out, Conne bool Reporter::EmitToStderr(bool flag) { return flag || ! run_state::detail::zeek_init_done; } +ScriptLocationScope::ScriptLocationScope(const zeek::detail::Frame* frame) { + zeek::reporter->PushLocation(frame->GetCallLocation()); +} + +ScriptLocationScope::~ScriptLocationScope() { zeek::reporter->PopLocation(); } + } // namespace zeek diff --git a/src/Reporter.h b/src/Reporter.h index d67a68c5a8..9c03920dce 100644 --- a/src/Reporter.h +++ b/src/Reporter.h @@ -31,8 +31,9 @@ using StringValPtr = IntrusivePtr; namespace detail { class AssertStmt; -class Location; class Expr; +class Frame; +class Location; } // namespace detail @@ -334,6 +335,20 @@ private: bool ignore_deprecations; }; +/** + * Helper class pushing the frame's call location onto + * the reporter's location stack, associating the caller's + * location with subsequent output. + * + * The frame's location must remain valid until the object + * is destroyed. + */ +class ScriptLocationScope { +public: + ScriptLocationScope(const zeek::detail::Frame* frame); + ~ScriptLocationScope(); +}; + extern Reporter* reporter; } // namespace zeek diff --git a/src/broker/Manager.cc b/src/broker/Manager.cc index 7b6178187b..1b0ba2cdaf 100644 --- a/src/broker/Manager.cc +++ b/src/broker/Manager.cc @@ -28,6 +28,7 @@ #include "zeek/broker/store.bif.h" #include "zeek/iosource/Manager.h" #include "zeek/logging/Manager.h" +#include "zeek/logging/Types.h" #include "zeek/telemetry/Manager.h" #include "zeek/util.h" @@ -252,7 +253,7 @@ std::string RenderEvent(const std::string& topic, const std::string& name, const } // namespace #endif -Manager::Manager(bool arg_use_real_time) { +Manager::Manager(bool arg_use_real_time) : Backend(nullptr, nullptr) { bound_port = 0; use_real_time = arg_use_real_time; peer_count = 0; @@ -262,7 +263,7 @@ Manager::Manager(bool arg_use_real_time) { writer_id_type = nullptr; } -void Manager::InitPostScript() { +void Manager::DoInitPostScript() { DBG_LOG(DBG_BROKER, "Initializing"); log_batch_size = get_option("Broker::log_batch_size")->AsCount(); @@ -404,7 +405,7 @@ void Manager::InitializeBrokerStoreForwarding() { } } -void Manager::Terminate() { +void Manager::DoTerminate() { FlushLogBuffers(); iosource_mgr->UnregisterFd(bstate->subscriber.fd(), this); @@ -545,6 +546,22 @@ std::vector Manager::Peers() const { std::string Manager::NodeID() const { return to_string(bstate->endpoint.node_id()); } +bool Manager::DoPublishEvent(const std::string& topic, const cluster::detail::Event& event) { + broker::vector xs; + xs.reserve(event.args.size()); + for ( const auto& a : event.args ) { + auto r = detail::val_to_data(a.get()); + if ( ! r ) { + Error("Failed to convert %s to broker::data", zeek::obj_desc(a.get()).c_str()); + return false; + } + xs.emplace_back(std::move(r.value())); + } + + std::string name(event.HandlerName()); + return PublishEvent(topic, name, std::move(xs), event.timestamp); +} + bool Manager::PublishEvent(string topic, std::string name, broker::vector args, double ts) { if ( bstate->endpoint.is_shutdown() ) return true; @@ -862,10 +879,6 @@ RecordVal* Manager::MakeEvent(ValPList* args, zeek::detail::Frame* frame) { zeek::RecordValPtr Manager::MakeEvent(ArgsSpan args, zeek::detail::Frame* frame) { scoped_reporter_location srl{frame}; - return MakeEvent(args); -} - -zeek::RecordValPtr Manager::MakeEvent(ArgsSpan args) { auto rval = zeek::make_intrusive(BifType::Record::Broker::Event); auto arg_vec = make_intrusive(vector_of_data_type); rval->Assign(1, arg_vec); @@ -877,7 +890,8 @@ zeek::RecordValPtr Manager::MakeEvent(ArgsSpan args) { // Event val must come first. if ( arg_val->GetType()->Tag() != TYPE_FUNC ) { - Error("attempt to convert non-event into an event type"); + Error("attempt to convert non-event into an event type (%s)", + zeek::obj_desc_short(arg_val.get()).c_str()); return rval; } @@ -891,7 +905,7 @@ zeek::RecordValPtr Manager::MakeEvent(ArgsSpan args) { auto num_args = static_cast(func->GetType()->Params()->NumFields()); if ( num_args != args.size() - 1 ) { - Error("bad # of arguments: got %zu, expect %zu", args.size() - 1, num_args + 1); + Error("bad # of arguments: got %zu, expect %zu", args.size() - 1, num_args); return rval; } @@ -933,7 +947,7 @@ zeek::RecordValPtr Manager::MakeEvent(ArgsSpan args) { return rval; } -bool Manager::Subscribe(const string& topic_prefix) { +bool Manager::DoSubscribe(const string& topic_prefix) { DBG_LOG(DBG_BROKER, "Subscribing to topic prefix %s", topic_prefix.c_str()); bstate->subscriber.add_topic(topic_prefix, ! run_state::detail::zeek_init_done); @@ -951,7 +965,7 @@ bool Manager::Forward(string topic_prefix) { return true; } -bool Manager::Unsubscribe(const string& topic_prefix) { +bool Manager::DoUnsubscribe(const string& topic_prefix) { for ( size_t i = 0; i < forwarded_prefixes.size(); ++i ) if ( forwarded_prefixes[i] == topic_prefix ) { DBG_LOG(DBG_BROKER, "Unforwarding topic prefix %s", topic_prefix.c_str()); diff --git a/src/broker/Manager.h b/src/broker/Manager.h index 297d7a5f05..a27e61ec14 100644 --- a/src/broker/Manager.h +++ b/src/broker/Manager.h @@ -10,13 +10,16 @@ #include #include #include +#include #include #include #include "zeek/IntrusivePtr.h" #include "zeek/Span.h" #include "zeek/broker/Data.h" +#include "zeek/cluster/Backend.h" #include "zeek/iosource/IOSource.h" +#include "zeek/logging/Types.h" #include "zeek/logging/WriterBackend.h" namespace zeek { @@ -75,7 +78,7 @@ struct Stats { * Manages various forms of communication between peer Zeek processes * or other external applications via use of the Broker messaging library. */ -class Manager : public iosource::IOSource { +class Manager : public zeek::cluster::Backend, public iosource::IOSource { public: /** Broker protocol to expect on a listening port. */ enum class BrokerProtocol { @@ -95,17 +98,6 @@ public: */ ~Manager() override = default; - /** - * Initialization of the manager. This is called late during Zeek's - * initialization after any scripts are processed. - */ - void InitPostScript(); - - /** - * Shuts Broker down at termination. - */ - void Terminate(); - /** * Returns true if any Broker communication is currently active. */ @@ -193,6 +185,8 @@ public: return PublishEvent(std::move(topic), std::move(name), std::move(broker::get(args.value_)), ts); } + using cluster::Backend::PublishEvent; + /** * Send an event to any interested peers. * @param topic a topic string associated with the message. @@ -268,14 +262,6 @@ public: using ArgsSpan = Span; - /** - * Create an `Event` record value from an event and its arguments. - * @param args A span pointing at the event arguments. - * @return an `Event` record value. If an invalid event or arguments - * were supplied the optional "name" field will not be set. - */ - zeek::RecordValPtr MakeEvent(ArgsSpan args); - /** * Create an `Event` record value from an event and its arguments. * @param args A span pointing at the event arguments. @@ -285,15 +271,6 @@ public: */ zeek::RecordValPtr MakeEvent(ArgsSpan args, zeek::detail::Frame* frame); - /** - * Register interest in peer event messages that use a certain topic prefix. - * @param topic_prefix a prefix to match against remote message topics. - * e.g. an empty prefix will match everything and "a" will match "alice" - * and "amy" but not "bob". - * @return true if it's a new event subscription and it is now registered. - */ - bool Subscribe(const std::string& topic_prefix); - /** * Register interest in peer event messages that use a certain topic prefix, * but that should not be raised locally, just forwarded to any subscribing @@ -305,14 +282,6 @@ public: */ bool Forward(std::string topic_prefix); - /** - * Unregister interest in peer event messages. - * @param topic_prefix a prefix previously supplied to a successful call - * to zeek::Broker::Manager::Subscribe() or zeek::Broker::Manager::Forward(). - * @return true if interest in topic prefix is no longer advertised. - */ - bool Unsubscribe(const std::string& topic_prefix); - /** * Create a new *master* data store. * @param name The name of the store. @@ -402,6 +371,48 @@ public: }; private: + // Register interest in peer event messages that use a certain topic prefix. + bool DoSubscribe(const std::string& topic_prefix) override; + + // Unregister interest in peer event messages. + bool DoUnsubscribe(const std::string& topic_prefix) override; + + // Initialization of the manager. This is called late during Zeek's + // initialization after any scripts are processed. + void DoInitPostScript() override; + + // Broker doesn't do anything during Broker::Backend::init(). + bool DoInit() override { return true; } + + // Shuts Broker down at termination. + void DoTerminate() override; + + // Broker overrides this to do its own serialization. + bool DoPublishEvent(const std::string& topic, const cluster::detail::Event& event) override; + + // This should never be reached, broker itself doesn't call this and overrides + // the generic DoPublishEvent() method that would call this. + bool DoPublishEvent(const std::string& topic, const std::string& format, + const cluster::detail::byte_buffer& buf) override { + throw std::logic_error("not implemented"); + } + + // WriterFrontend instances are broker-aware and never call this + // method and instead call the existing PublishLogWrite() method. + // + // TODO: Move log buffering out of broker and implement. + bool DoPublishLogWrites(const logging::detail::LogWriteHeader& header, + zeek::Span records) override { + // Not implemented by broker. + throw std::logic_error("not implemented"); + } + + bool DoPublishLogWrites(const logging::detail::LogWriteHeader& header, const std::string& format, + cluster::detail::byte_buffer& buf) override { + // Not implemented by broker. + throw std::logic_error("not implemented"); + } + // Process events used for Broker store backed zeek tables void ProcessStoreEvent(broker::data msg); // Common functionality for processing insert and update events. diff --git a/src/cluster/BifSupport.cc b/src/cluster/BifSupport.cc new file mode 100644 index 0000000000..1ed4b49760 --- /dev/null +++ b/src/cluster/BifSupport.cc @@ -0,0 +1,137 @@ +#include "zeek/cluster/BifSupport.h" + +#include "zeek/Desc.h" +#include "zeek/Event.h" +#include "zeek/EventRegistry.h" +#include "zeek/Frame.h" +#include "zeek/Func.h" +#include "zeek/IntrusivePtr.h" +#include "zeek/Reporter.h" +#include "zeek/Type.h" +#include "zeek/Val.h" +#include "zeek/broker/Manager.h" // For publishing to broker_mgr directly. +#include "zeek/cluster/Backend.h" + +namespace { + +// Convert a script-level Cluster::Event to a cluster::detail::Event. +std::optional to_cluster_event(const zeek::RecordValPtr& rec) { + const auto& func = rec->GetField(0); + const auto& vargs = rec->GetField(1); + + const auto& eh = zeek::event_registry->Lookup(func->AsFuncPtr()->GetName()); + if ( ! eh ) { + zeek::emit_builtin_error( + zeek::util::fmt("event registry lookup of '%s' failed", zeek::obj_desc_short(func.get()).c_str())); + return std::nullopt; + } + + // Need to copy from VectorVal to zeek::Args + zeek::Args args(vargs->Size()); + for ( size_t i = 0; i < vargs->Size(); i++ ) + args[i] = vargs->ValAt(i); + + // TODO: Support configurable timestamps or custom metadata on the record. + auto timestamp = zeek::event_mgr.CurrentEventTime(); + + return zeek::cluster::detail::Event(eh, std::move(args), timestamp); +} +} // namespace + + +namespace zeek::cluster::detail::bif { + +zeek::RecordValPtr make_event(zeek::ArgsSpan args) { + static const auto& any_vec_type = zeek::id::find_type("any_vec"); + static const auto& event_record_type = zeek::id::find_type("Cluster::Event"); + auto rec = zeek::make_intrusive(event_record_type); + + if ( args.empty() ) { + zeek::emit_builtin_error("not enough arguments"); + return rec; + } + + const auto& maybe_func_val = args[0]; + + if ( maybe_func_val->GetType()->Tag() != zeek::TYPE_FUNC ) { + zeek::emit_builtin_error( + zeek::util::fmt("got non-event type '%s'", zeek::obj_desc_short(maybe_func_val->GetType().get()).c_str())); + return rec; + } + + const auto func = zeek::FuncValPtr{zeek::NewRef{}, maybe_func_val->AsFuncVal()}; + auto checked_args = cluster::detail::check_args(func, args.subspan(1)); + if ( ! checked_args ) + return rec; + + // Making a copy from zeek::Args to a VectorVal and then back again on publish. + auto vec = zeek::make_intrusive(any_vec_type); + vec->Reserve(checked_args->size()); + rec->Assign(0, maybe_func_val); + for ( const auto& arg : *checked_args ) + vec->Append(arg); + + rec->Assign(1, vec); // Args + + return rec; +} + +zeek::ValPtr publish_event(const zeek::ValPtr& topic, zeek::ArgsSpan args) { + static const auto& cluster_event_type = zeek::id::find_type("Cluster::Event"); + static const auto& broker_event_type = zeek::id::find_type("Broker::Event"); + + if ( args.empty() ) { + zeek::emit_builtin_error("no event arguments given"); + return zeek::val_mgr->False(); + } + + if ( topic->GetType()->Tag() != zeek::TYPE_STRING ) { + zeek::emit_builtin_error("topic is not a string"); + return zeek::val_mgr->False(); + } + + const auto topic_str = topic->AsStringVal()->ToStdString(); + + auto timestamp = zeek::event_mgr.CurrentEventTime(); + + if ( args[0]->GetType()->Tag() == zeek::TYPE_FUNC ) { + auto event = zeek::cluster::backend->MakeClusterEvent({zeek::NewRef{}, args[0]->AsFuncVal()}, args.subspan(1), + timestamp); + if ( event ) + return zeek::val_mgr->Bool(zeek::cluster::backend->PublishEvent(topic_str, *event)); + + return zeek::val_mgr->False(); + } + else if ( args[0]->GetType()->Tag() == zeek::TYPE_RECORD ) { + if ( args[0]->GetType() == cluster_event_type ) { // Handling Cluster::Event record type + auto ev = to_cluster_event(cast_intrusive(args[0])); + if ( ! ev ) + return zeek::val_mgr->False(); + + return zeek::val_mgr->Bool(zeek::cluster::backend->PublishEvent(topic_str, *ev)); + } + else if ( args[0]->GetType() == broker_event_type ) { + // Handling Broker::Event record type created by Broker::make_event() + // only works if the backend is broker_mgr! + if ( zeek::cluster::backend != zeek::broker_mgr ) { + zeek::emit_builtin_error( + zeek::util::fmt("Publish of Broker::Event record instance with type '%s' to a non-Broker backend", + zeek::obj_desc_short(args[0]->GetType().get()).c_str())); + + return zeek::val_mgr->False(); + } + + return zeek::val_mgr->Bool(zeek::broker_mgr->PublishEvent(topic_str, args[0]->AsRecordVal())); + } + else { + zeek::emit_builtin_error(zeek::util::fmt("Publish of unknown record type '%s'", + zeek::obj_desc_short(args[0]->GetType().get()).c_str())); + return zeek::val_mgr->False(); + } + } + + zeek::emit_builtin_error(zeek::util::fmt("expected function or record as first argument, got %s", + zeek::obj_desc_short(args[0]->GetType().get()).c_str())); + return zeek::val_mgr->False(); +} +} // namespace zeek::cluster::detail::bif diff --git a/src/cluster/BifSupport.h b/src/cluster/BifSupport.h new file mode 100644 index 0000000000..2434482796 --- /dev/null +++ b/src/cluster/BifSupport.h @@ -0,0 +1,49 @@ +// See the file "COPYING" in the main distribution directory for copyright. + +#pragma once + +#include "zeek/IntrusivePtr.h" +#include "zeek/Span.h" + +// Helpers for cluster.bif + +namespace zeek { + +namespace detail { +class Frame; +} + +class RecordVal; +using RecordValPtr = IntrusivePtr; + +class Val; +using ValPtr = IntrusivePtr; +using ArgsSpan = Span; + +namespace cluster::detail::bif { + +/** + * Cluster::make_event() implementation. + * + * @param topic The topic to publish to. Should be a StringVal. + * @param args The arguments to the BiF function. May either be a prepared event from make_event(), + * or a FuncValPtr and it's arguments + * + * @return A RecordValPtr representing a Cluster::Event record instance. + */ +zeek::RecordValPtr make_event(zeek::ArgsSpan args); + +/** + * Publish helper. + * + * @param topic The topic to publish to. Should be a StringVal. + * @param args The arguments to the BiF function. May either be a prepared event from make_event(), + * or a FuncValPtr and it's arguments + * + * @return A BoolValPtr that's true if the event was published, else false. + */ +zeek::ValPtr publish_event(const zeek::ValPtr& topic, zeek::ArgsSpan args); + +} // namespace cluster::detail::bif + +} // namespace zeek diff --git a/src/cluster/CMakeLists.txt b/src/cluster/CMakeLists.txt index 0395d28786..14a063943d 100644 --- a/src/cluster/CMakeLists.txt +++ b/src/cluster/CMakeLists.txt @@ -6,4 +6,9 @@ zeek_add_subdir_library( SOURCES Component.cc Backend.cc - Manager.cc) + BifSupport.cc + Manager.cc + BIFS + cluster.bif) + +add_subdirectory(serializer) diff --git a/src/cluster/cluster.bif b/src/cluster/cluster.bif new file mode 100644 index 0000000000..cdbe5edf9d --- /dev/null +++ b/src/cluster/cluster.bif @@ -0,0 +1,71 @@ +%%{ +#include + +#include "zeek/cluster/Backend.h" +#include "zeek/cluster/BifSupport.h" + +using namespace zeek::cluster::detail::bif; + +%%} + +module Cluster; + +type Cluster::Event: record; + +## Publishes an event to a given topic. +## +## topic: a topic associated with the event message. +## +## args: Either the event arguments as already made by +## :zeek:see:`Cluster::make_event` or the argument list to pass along +## to it. +## +## Returns: true if the message is sent. +function Cluster::publish%(topic: string, ...%): bool + %{ + ScriptLocationScope scope{frame}; + + auto args = zeek::ArgsSpan{*@ARGS@}.subspan(1); + return publish_event({zeek::NewRef{}, topic}, args); + %} + +## Create a data structure that may be used to send a remote event via +## :zeek:see:`Broker::publish`. +## +## args: an event, followed by a list of argument values that may be used +## to call it. +## +## Returns: A :zeek:type:`Cluster::Event` instance that can be published via +## :zeek:see:`Cluster::publish`, :zeek:see:`Cluster::publish_rr` +## or :zeek:see:`Cluster::publish_hrw`. +function Cluster::make_event%(...%): Cluster::Event + %{ + ScriptLocationScope scope{frame}; + + return make_event(zeek::ArgsSpan{*@ARGS@}); + %} + +function Cluster::__subscribe%(topic_prefix: string%): bool + %{ + ScriptLocationScope scope{frame}; + + auto rval = zeek::cluster::backend->Subscribe(topic_prefix->CheckString()); + return zeek::val_mgr->Bool(rval); + %} + +function Cluster::__unsubscribe%(topic_prefix: string%): bool + %{ + ScriptLocationScope scope{frame}; + + auto rval = zeek::cluster::backend->Unsubscribe(topic_prefix->CheckString()); + return zeek::val_mgr->Bool(rval); + %} + +## Initialize the global cluster backend. +## +## Returns: true on success. +function Cluster::Backend::__init%(%): bool + %{ + auto rval = zeek::cluster::backend->Init(); + return zeek::val_mgr->Bool(rval); + %} diff --git a/src/cluster/serializer/CMakeLists.txt b/src/cluster/serializer/CMakeLists.txt new file mode 100644 index 0000000000..4f3a423cd9 --- /dev/null +++ b/src/cluster/serializer/CMakeLists.txt @@ -0,0 +1 @@ +add_subdirectory(broker) diff --git a/src/cluster/serializer/broker/CMakeLists.txt b/src/cluster/serializer/broker/CMakeLists.txt new file mode 100644 index 0000000000..4854cbb333 --- /dev/null +++ b/src/cluster/serializer/broker/CMakeLists.txt @@ -0,0 +1,9 @@ +zeek_add_plugin( + Zeek + Cluster_Serializer_Binary_Serialization_Format + INCLUDE_DIRS + ${CMAKE_CURRENT_SOURCE_DIR} + ${CMAKE_CURRENT_BINARY_DIR} + SOURCES + Plugin.cc + Serializer.cc) diff --git a/src/cluster/serializer/broker/Plugin.cc b/src/cluster/serializer/broker/Plugin.cc new file mode 100644 index 0000000000..aa9ab83d79 --- /dev/null +++ b/src/cluster/serializer/broker/Plugin.cc @@ -0,0 +1,24 @@ +#include "Plugin.h" + +#include + +#include "zeek/cluster/Component.h" + +#include "Serializer.h" + +using namespace zeek::cluster; +using namespace zeek::plugin::Broker_Serializer; + +zeek::plugin::Configuration Plugin::Configure() { + AddComponent(new EventSerializerComponent("BROKER_BIN_V1", []() -> std::unique_ptr { + return std::make_unique(); + })); + AddComponent(new EventSerializerComponent("BROKER_JSON_V1", []() -> std::unique_ptr { + return std::make_unique(); + })); + + zeek::plugin::Configuration config; + config.name = "Zeek::Broker_Serializer"; + config.description = "Event serialization using Broker event formats (binary and json)"; + return config; +} diff --git a/src/cluster/serializer/broker/Plugin.h b/src/cluster/serializer/broker/Plugin.h new file mode 100644 index 0000000000..a14eb8a8ea --- /dev/null +++ b/src/cluster/serializer/broker/Plugin.h @@ -0,0 +1,14 @@ +// See the file "COPYING" in the main distribution directory for copyright. + +#pragma once + +#include "zeek/plugin/Plugin.h" + +namespace zeek::plugin::Broker_Serializer { + +class Plugin : public zeek::plugin::Plugin { +public: + zeek::plugin::Configuration Configure() override; +} plugin; + +} // namespace zeek::plugin::Broker_Serializer diff --git a/src/cluster/serializer/broker/README b/src/cluster/serializer/broker/README new file mode 100644 index 0000000000..b316631357 --- /dev/null +++ b/src/cluster/serializer/broker/README @@ -0,0 +1 @@ +Contains event serializers using Broker's BIN and JSON formats. diff --git a/src/cluster/serializer/broker/Serializer.cc b/src/cluster/serializer/broker/Serializer.cc new file mode 100644 index 0000000000..068dd20767 --- /dev/null +++ b/src/cluster/serializer/broker/Serializer.cc @@ -0,0 +1,219 @@ +#include "zeek/cluster/serializer/broker/Serializer.h" + +#include + +#include "zeek/Desc.h" +#include "zeek/Func.h" +#include "zeek/Reporter.h" +#include "zeek/broker/Data.h" +#include "zeek/cluster/Backend.h" + +#include "broker/data_envelope.hh" +#include "broker/error.hh" +#include "broker/format/json.hh" +#include "broker/zeek.hh" + +#include "zeek/3rdparty/doctest.h" + +using namespace zeek::cluster; + +namespace { + +/** + * Convert a cluster::detail::Event to a broker::zeek::Event. + * + * @param ev The cluster::detail::Event + * @return A broker::zeek::Event to be serialized, or nullopt in case of errors. + */ +std::optional to_broker_event(const detail::Event& ev) { + broker::vector xs; + xs.reserve(ev.args.size()); + for ( const auto& a : ev.args ) { + if ( auto res = zeek::Broker::detail::val_to_data(a.get()) ) { + xs.emplace_back(std::move(res.value())); + } + else { + return std::nullopt; + } + } + + return broker::zeek::Event(ev.HandlerName(), xs, broker::to_timestamp(ev.timestamp)); +} + +/** + * Convert a broker::zeek::Event to cluster::detail::Event by looking + * it up in Zeek's event handler registry and converting event arguments + * to the appropriate Val instances. + * + * @param broker_ev The broker side event. + * @returns A zeek::cluster::detail::Event instance, or std::nullopt if the conversion failed. + */ +std::optional to_zeek_event(const broker::zeek::Event& ev) { + auto&& name = ev.name(); + auto&& args = ev.args(); + + // Meh, technically need to convert ev.metadata() and + // expose it to script land as `table[count] of any` + // where consumers then know what to do with it. + // + // For now, handle the timestamp explicitly. + double ts; + if ( auto ev_ts = ev.ts() ) + broker::convert(*ev_ts, ts); + else + ts = zeek::run_state::network_time; + + zeek::Args vl; + zeek::EventHandlerPtr handler = zeek::event_registry->Lookup(name); + if ( ! handler ) { + zeek::reporter->Error("Failed to lookup handler for '%s'", std::string(name).c_str()); + return std::nullopt; + } + + const auto& arg_types = handler->GetFunc()->GetType()->ParamList()->GetTypes(); + + if ( arg_types.size() != args.size() ) { + std::string event_name(name); + zeek::reporter->Error("Unserialize error '%s' arg_types.size()=%zu and args.size()=%zu", event_name.c_str(), + arg_types.size(), args.size()); + + return std::nullopt; + } + + for ( size_t i = 0; i < args.size(); ++i ) { + const auto& expected_type = arg_types[i]; + auto arg = args[i].to_data(); + auto val = zeek::Broker::detail::data_to_val(arg, expected_type.get()); + if ( val ) + vl.emplace_back(std::move(val)); + else { + std::string event_name(name); + auto got_type = args[i].get_type_name(); + std::string argstr = broker::to_string(arg); + zeek::reporter + ->Error("Unserialize error for event '%s': broker value '%s' type '%s' to Zeek type '%s' failed", + event_name.c_str(), argstr.c_str(), got_type, obj_desc(expected_type.get()).c_str()); + + return std::nullopt; + } + } + + return detail::Event{handler, std::move(vl), ts}; +} + +} // namespace + +bool detail::BrokerBinV1_Serializer::SerializeEvent(detail::byte_buffer& buf, const detail::Event& event) { + auto ev = to_broker_event(event); + if ( ! ev ) + return false; + + // The produced broker::zeek::Event is already in bin::v1 format after + // constructing it, so we can take the raw bytes directly rather than + // going through encode() again. + // + // broker::format::bin::v1::encode(ev->move_data(), std::back_inserter(buf)); + assert(ev->raw()->shared_envelope() != nullptr); + auto [raw, size] = ev->raw().shared_envelope()->raw_bytes(); + buf.insert(buf.begin(), raw, raw + size); + return true; +} + +std::optional detail::BrokerBinV1_Serializer::UnserializeEvent(detail::byte_buffer_span buf) { + auto r = broker::data_envelope::deserialize(broker::endpoint_id::nil(), broker::endpoint_id::nil(), 0, "", + buf.data(), buf.size()); + if ( ! r ) + return std::nullopt; + + broker::zeek::Event ev(*r); + return to_zeek_event(ev); +} + + +// Convert char to std::byte during push_back() so that +// we don't need to copy from std::vector to a +// std::vector when rendering JSON. +template +struct PushBackAdapter { + explicit PushBackAdapter(T& c) : container(&c) {} + using value_type = char; + + void push_back(char c) { container->push_back(static_cast(c)); } + + T* container; +}; + + +bool detail::BrokerJsonV1_Serializer::SerializeEvent(byte_buffer& buf, const detail::Event& event) { + auto ev = to_broker_event(event); + if ( ! ev ) + return false; + + auto push_back_adaptor = PushBackAdapter(buf); + broker::format::json::v1::encode(ev->move_data(), std::back_inserter(push_back_adaptor)); + return true; +} + +std::optional detail::BrokerJsonV1_Serializer::UnserializeEvent(detail::byte_buffer_span buf) { + broker::variant res; + auto err = + broker::format::json::v1::decode(std::string_view{reinterpret_cast(buf.data()), buf.size()}, res); + if ( err ) { + zeek::reporter->Error("Decode error for JSON payload: '%s'", + err.message() ? err.message()->c_str() : "unknown"); + return std::nullopt; + } + + broker::zeek::Event ev(std::move(res)); + return to_zeek_event(ev); +} + +TEST_SUITE_BEGIN("cluster serializer broker"); + +#include "zeek/EventRegistry.h" + +TEST_CASE("roundtrip") { + auto* handler = zeek::event_registry->Lookup("Supervisor::node_status"); + detail::Event e{handler, zeek::Args{zeek::make_intrusive("TEST"), zeek::val_mgr->Count(42)}}; + detail::byte_buffer buf; + + SUBCASE("json") { + detail::BrokerJsonV1_Serializer serializer; + std::string expected = + R"({"@data-type":"vector","data":[{"@data-type":"count","data":1},{"@data-type":"count","data":1},{"@data-type":"vector","data":[{"@data-type":"string","data":"Supervisor::node_status"},{"@data-type":"vector","data":[{"@data-type":"string","data":"TEST"},{"@data-type":"count","data":42}]},{"@data-type":"vector","data":[{"@data-type":"vector","data":[{"@data-type":"count","data":1},{"@data-type":"timestamp","data":"1970-01-01T00:00:00.000"}]}]}]}]})"; + + serializer.SerializeEvent(buf, e); + + CHECK_EQ(expected, std::string{reinterpret_cast(buf.data()), buf.size()}); + + auto result = serializer.UnserializeEvent(buf); + REQUIRE(result); + CHECK_EQ(result->Handler(), handler); + CHECK_EQ(result->HandlerName(), "Supervisor::node_status"); + CHECK_EQ(result->args.size(), 2); + } + + SUBCASE("binary") { + detail::BrokerBinV1_Serializer serializer; + unsigned char expected_bytes[] = {0x0e, 0x03, 0x02, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x02, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x0e, 0x03, 0x05, 0x17, 0x53, 0x75, + 0x70, 0x65, 0x72, 0x76, 0x69, 0x73, 0x6f, 0x72, 0x3a, 0x3a, 0x6e, 0x6f, 0x64, + 0x65, 0x5f, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x0e, 0x02, 0x05, 0x04, 0x54, + 0x45, 0x53, 0x54, 0x02, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x2a, 0x0e, + 0x01, 0x0e, 0x02, 0x02, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x09, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00}; + std::byte* p = reinterpret_cast(&expected_bytes[0]); + detail::byte_buffer expected{p, p + sizeof(expected_bytes)}; + + serializer.SerializeEvent(buf, e); + + CHECK_EQ(expected, buf); + + auto result = serializer.UnserializeEvent(buf); + REQUIRE(result); + CHECK_EQ(result->Handler(), handler); + CHECK_EQ(result->HandlerName(), "Supervisor::node_status"); + CHECK_EQ(result->args.size(), 2); + } +} +TEST_SUITE_END(); diff --git a/src/cluster/serializer/broker/Serializer.h b/src/cluster/serializer/broker/Serializer.h new file mode 100644 index 0000000000..00973b90c1 --- /dev/null +++ b/src/cluster/serializer/broker/Serializer.h @@ -0,0 +1,31 @@ +// See the file "COPYING" in the main distribution directory for copyright. + +#pragma once + +#include "zeek/cluster/Serializer.h" + +namespace zeek::cluster::detail { + +// Implementation of the EventSerializer using the existing broker::detail::val_to_data() +// and broker::format::bin::v1::encode(). +class BrokerBinV1_Serializer : public EventSerializer { +public: + BrokerBinV1_Serializer() : EventSerializer("broker-bin-v1") {} + + bool SerializeEvent(detail::byte_buffer& buf, const detail::Event& event) override; + + std::optional UnserializeEvent(detail::byte_buffer_span buf) override; +}; + +// Implementation of the EventSerializer that uses the existing broker::detail::val_to_data() +// and broker::format::json::v1::encode() +class BrokerJsonV1_Serializer : public EventSerializer { +public: + BrokerJsonV1_Serializer() : EventSerializer("broker-json-v1") {} + + bool SerializeEvent(zeek::cluster::detail::byte_buffer& buf, const detail::Event& event) override; + + std::optional UnserializeEvent(detail::byte_buffer_span buf) override; +}; + +} // namespace zeek::cluster::detail diff --git a/src/zeek-setup.cc b/src/zeek-setup.cc index 293bc3febe..5beab093df 100644 --- a/src/zeek-setup.cc +++ b/src/zeek-setup.cc @@ -50,6 +50,7 @@ #include "zeek/analyzer/Manager.h" #include "zeek/binpac_zeek.h" #include "zeek/broker/Manager.h" +#include "zeek/cluster/Backend.h" #include "zeek/cluster/Manager.h" #include "zeek/file_analysis/Manager.h" #include "zeek/input.h" @@ -179,6 +180,7 @@ zeek::spicy::Manager* zeek::spicy_mgr = nullptr; #endif zeek::cluster::Manager* zeek::cluster::manager = nullptr; +zeek::cluster::Backend* zeek::cluster::backend = nullptr; std::vector zeek::detail::zeek_script_prefixes; zeek::detail::Stmt* zeek::detail::stmts = nullptr; @@ -376,7 +378,11 @@ static void terminate_zeek() { log_mgr->Terminate(); input_mgr->Terminate(); thread_mgr->Terminate(); + broker_mgr->Terminate(); + if ( cluster::backend != broker_mgr ) + cluster::backend->Terminate(); + telemetry_mgr->Terminate(); event_mgr.Drain(); @@ -814,7 +820,26 @@ SetupResult setup(int argc, char** argv, Options* zopts) { log_mgr->InitPostScript(); plugin_mgr->InitPostScript(); zeekygen_mgr->InitPostScript(); + + // If Cluster::backend is set to broker, just set zeek::cluster::backend + // to broker_mgr like it has always been. If it's an alternative + // implementation, instantiate it. + const auto& cluster_backend_val = id::find_val("Cluster::backend"); + const auto& cluster_backend_type = zeek::id::find_type("Cluster::BackendTag"); + zeek_int_t broker_enum = cluster_backend_type->Lookup("Cluster::CLUSTER_BACKEND_BROKER"); + if ( broker_enum == cluster_backend_val->AsEnum() ) { + cluster::backend = broker_mgr; + } + else { + reporter->Error("Unsupported cluster backend configured: %s", + zeek::obj_desc_short(cluster_backend_val.get()).c_str()); + exit(1); + } + broker_mgr->InitPostScript(); + if ( cluster::backend != broker_mgr ) + cluster::backend->InitPostScript(); + timer_mgr->InitPostScript(); event_mgr.InitPostScript(); diff --git a/testing/btest/Baseline/cluster.broker.cluster-publish/recv.recv.out b/testing/btest/Baseline/cluster.broker.cluster-publish/recv.recv.out new file mode 100644 index 0000000000..fcd1b41ad2 --- /dev/null +++ b/testing/btest/Baseline/cluster.broker.cluster-publish/recv.recv.out @@ -0,0 +1,12 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +receiver added peer: endpoint=127.0.0.1 msg=handshake successful +is_remote should be T, and is, T +receiver got ping: my-message, 1 +is_remote should be T, and is, T +receiver got ping: my-message, 2 +is_remote should be T, and is, T +receiver got ping: my-message, 3 +is_remote should be T, and is, T +receiver got ping: my-message, 4 +is_remote should be T, and is, T +receiver got ping: my-message, 5 diff --git a/testing/btest/Baseline/cluster.broker.cluster-publish/send.send.out b/testing/btest/Baseline/cluster.broker.cluster-publish/send.send.out new file mode 100644 index 0000000000..de80e44f07 --- /dev/null +++ b/testing/btest/Baseline/cluster.broker.cluster-publish/send.send.out @@ -0,0 +1,11 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +sender added peer: endpoint=127.0.0.1 msg=handshake successful +is_remote should be T, and is, T +sender got pong: my-message, 1 +is_remote should be T, and is, T +sender got pong: my-message, 2 +is_remote should be T, and is, T +sender got pong: my-message, 3 +is_remote should be T, and is, T +sender got pong: my-message, 4 +sender lost peer: endpoint=127.0.0.1 msg=lost connection to remote peer diff --git a/testing/btest/Baseline/cluster.generic.errors/.stderr b/testing/btest/Baseline/cluster.generic.errors/.stderr new file mode 100644 index 0000000000..f26dbca5a0 --- /dev/null +++ b/testing/btest/Baseline/cluster.generic.errors/.stderr @@ -0,0 +1,15 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +error in <...>/errors.zeek, line 20: no event arguments given (Cluster::publish(topic)) +error in <...>/errors.zeek, line 21: not enough arguments (Cluster::make_event()) +error in <...>/errors.zeek, line 24: bad number of arguments for ping1: got 0, expect 2 +error in <...>/errors.zeek, line 25: bad number of arguments for ping1: got 0, expect 2 +error in <...>/errors.zeek, line 28: bad number of arguments for ping1: got 1, expect 2 +error in <...>/errors.zeek, line 29: bad number of arguments for ping1: got 1, expect 2 +error in <...>/errors.zeek, line 32: bad number of arguments for ping1: got 3, expect 2 +error in <...>/errors.zeek, line 33: bad number of arguments for ping1: got 3, expect 2 +error in <...>/errors.zeek, line 41: event parameter #2 type mismatch, got count, expecting string +error in <...>/errors.zeek, line 42: event parameter #2 type mismatch, got count, expecting string +error in <...>/errors.zeek, line 45: unexpected function type for hook1: hook +error in <...>/errors.zeek, line 46: unexpected function type for hook1: hook +error in <...>/errors.zeek, line 49: expected function or record as first argument, got count (Cluster::publish(topic, 1)) +error in <...>/errors.zeek, line 50: got non-event type 'count' (Cluster::make_event(1)) diff --git a/testing/btest/Baseline/cluster.generic.errors/.stdout b/testing/btest/Baseline/cluster.generic.errors/.stdout new file mode 100644 index 0000000000..ee431b2959 --- /dev/null +++ b/testing/btest/Baseline/cluster.generic.errors/.stdout @@ -0,0 +1,10 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +wrong number of args +r1, [ev=, args=[]] +r2, [ev=, args=[]] +r3, [ev=, args=[]] +r4, [ev=, args=[]] +wrong types +r1, [ev=, args=[]] +r2, [ev=, args=[]] +r3, [ev=, args=[]] diff --git a/testing/btest/Baseline/cluster.generic.make_event/.stderr b/testing/btest/Baseline/cluster.generic.make_event/.stderr new file mode 100644 index 0000000000..e5dfcdb9f7 --- /dev/null +++ b/testing/btest/Baseline/cluster.generic.make_event/.stderr @@ -0,0 +1,7 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +error in <...>/make_event.zeek, line 32: not enough arguments (Cluster::make_event()) +error in <...>/make_event.zeek, line 37: got non-event type 'string' (Cluster::make_event(a)) +error in <...>/make_event.zeek, line 42: unexpected function type for test_fun: function +error in <...>/make_event.zeek, line 47: unexpected function type for test_hook: hook +error in <...>/make_event.zeek, line 52: bad number of arguments for test_event2: got 0, expect 1 +error in <...>/make_event.zeek, line 57: bad number of arguments for test_event2: got 2, expect 1 diff --git a/testing/btest/Baseline/cluster.generic.make_event/out b/testing/btest/Baseline/cluster.generic.make_event/out new file mode 100644 index 0000000000..1f379ec328 --- /dev/null +++ b/testing/btest/Baseline/cluster.generic.make_event/out @@ -0,0 +1,3 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +event(), [] +event(s:string), [abc] diff --git a/testing/btest/Baseline/cluster.serializer-enum/out b/testing/btest/Baseline/cluster.serializer-enum/out new file mode 100644 index 0000000000..adf5b130cd --- /dev/null +++ b/testing/btest/Baseline/cluster.serializer-enum/out @@ -0,0 +1,7 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +Zeek::Broker_Serializer - Event serialization using Broker event formats (binary and json) (built-in) + [Event Serializer] BROKER_BIN_V1 (Cluster::EVENT_SERIALIZER_BROKER_BIN_V1) + [Event Serializer] BROKER_JSON_V1 (Cluster::EVENT_SERIALIZER_BROKER_JSON_V1) + +Cluster::EVENT_SERIALIZER_BROKER_BIN_V1, Cluster::EventSerializerTag +Cluster::EVENT_SERIALIZER_BROKER_JSON_V1, Cluster::EventSerializerTag diff --git a/testing/btest/Baseline/coverage.bare-load-baseline/canonified_loaded_scripts.log b/testing/btest/Baseline/coverage.bare-load-baseline/canonified_loaded_scripts.log index 38dbbc40f4..b64871a39f 100644 --- a/testing/btest/Baseline/coverage.bare-load-baseline/canonified_loaded_scripts.log +++ b/testing/btest/Baseline/coverage.bare-load-baseline/canonified_loaded_scripts.log @@ -135,6 +135,7 @@ scripts/base/init-frameworks-and-bifs.zeek scripts/base/frameworks/cluster/main.zeek scripts/base/frameworks/control/__load__.zeek scripts/base/frameworks/control/main.zeek + build/scripts/base/bif/cluster.bif.zeek scripts/base/frameworks/cluster/pools.zeek scripts/base/utils/hash_hrw.zeek scripts/base/frameworks/config/__load__.zeek diff --git a/testing/btest/Baseline/coverage.default-load-baseline/canonified_loaded_scripts.log b/testing/btest/Baseline/coverage.default-load-baseline/canonified_loaded_scripts.log index 829f16e0e9..99cb5d53a3 100644 --- a/testing/btest/Baseline/coverage.default-load-baseline/canonified_loaded_scripts.log +++ b/testing/btest/Baseline/coverage.default-load-baseline/canonified_loaded_scripts.log @@ -135,6 +135,7 @@ scripts/base/init-frameworks-and-bifs.zeek scripts/base/frameworks/cluster/main.zeek scripts/base/frameworks/control/__load__.zeek scripts/base/frameworks/control/main.zeek + build/scripts/base/bif/cluster.bif.zeek scripts/base/frameworks/cluster/pools.zeek scripts/base/utils/hash_hrw.zeek scripts/base/frameworks/config/__load__.zeek diff --git a/testing/btest/Baseline/plugins.hooks/output b/testing/btest/Baseline/plugins.hooks/output index 884e2bf4ca..9c4ee0795b 100644 --- a/testing/btest/Baseline/plugins.hooks/output +++ b/testing/btest/Baseline/plugins.hooks/output @@ -462,6 +462,7 @@ 0.000000 MetaHookPost LoadFile(0, ./api, <...>/api.zeek) -> -1 0.000000 MetaHookPost LoadFile(0, ./bloom-filter.bif.zeek, <...>/bloom-filter.bif.zeek) -> -1 0.000000 MetaHookPost LoadFile(0, ./cardinality-counter.bif.zeek, <...>/cardinality-counter.bif.zeek) -> -1 +0.000000 MetaHookPost LoadFile(0, ./cluster.bif.zeek, <...>/cluster.bif.zeek) -> -1 0.000000 MetaHookPost LoadFile(0, ./comm.bif.zeek, <...>/comm.bif.zeek) -> -1 0.000000 MetaHookPost LoadFile(0, ./communityid.bif.zeek, <...>/communityid.bif.zeek) -> -1 0.000000 MetaHookPost LoadFile(0, ./const.bif.zeek, <...>/const.bif.zeek) -> -1 @@ -540,6 +541,7 @@ 0.000000 MetaHookPost LoadFile(0, base<...>/ayiya, <...>/ayiya) -> -1 0.000000 MetaHookPost LoadFile(0, base<...>/broker, <...>/broker) -> -1 0.000000 MetaHookPost LoadFile(0, base<...>/cluster, <...>/cluster) -> -1 +0.000000 MetaHookPost LoadFile(0, base<...>/cluster.bif, <...>/cluster.bif.zeek) -> -1 0.000000 MetaHookPost LoadFile(0, base<...>/comm.bif, <...>/comm.bif.zeek) -> -1 0.000000 MetaHookPost LoadFile(0, base<...>/communityid.bif, <...>/communityid.bif.zeek) -> -1 0.000000 MetaHookPost LoadFile(0, base<...>/config, <...>/config) -> -1 @@ -763,6 +765,7 @@ 0.000000 MetaHookPost LoadFileExtended(0, ./api, <...>/api.zeek) -> (-1, ) 0.000000 MetaHookPost LoadFileExtended(0, ./bloom-filter.bif.zeek, <...>/bloom-filter.bif.zeek) -> (-1, ) 0.000000 MetaHookPost LoadFileExtended(0, ./cardinality-counter.bif.zeek, <...>/cardinality-counter.bif.zeek) -> (-1, ) +0.000000 MetaHookPost LoadFileExtended(0, ./cluster.bif.zeek, <...>/cluster.bif.zeek) -> (-1, ) 0.000000 MetaHookPost LoadFileExtended(0, ./comm.bif.zeek, <...>/comm.bif.zeek) -> (-1, ) 0.000000 MetaHookPost LoadFileExtended(0, ./communityid.bif.zeek, <...>/communityid.bif.zeek) -> (-1, ) 0.000000 MetaHookPost LoadFileExtended(0, ./const.bif.zeek, <...>/const.bif.zeek) -> (-1, ) @@ -841,6 +844,7 @@ 0.000000 MetaHookPost LoadFileExtended(0, base<...>/ayiya, <...>/ayiya) -> (-1, ) 0.000000 MetaHookPost LoadFileExtended(0, base<...>/broker, <...>/broker) -> (-1, ) 0.000000 MetaHookPost LoadFileExtended(0, base<...>/cluster, <...>/cluster) -> (-1, ) +0.000000 MetaHookPost LoadFileExtended(0, base<...>/cluster.bif, <...>/cluster.bif.zeek) -> (-1, ) 0.000000 MetaHookPost LoadFileExtended(0, base<...>/comm.bif, <...>/comm.bif.zeek) -> (-1, ) 0.000000 MetaHookPost LoadFileExtended(0, base<...>/communityid.bif, <...>/communityid.bif.zeek) -> (-1, ) 0.000000 MetaHookPost LoadFileExtended(0, base<...>/config, <...>/config) -> (-1, ) @@ -1396,6 +1400,7 @@ 0.000000 MetaHookPre LoadFile(0, ./api, <...>/api.zeek) 0.000000 MetaHookPre LoadFile(0, ./bloom-filter.bif.zeek, <...>/bloom-filter.bif.zeek) 0.000000 MetaHookPre LoadFile(0, ./cardinality-counter.bif.zeek, <...>/cardinality-counter.bif.zeek) +0.000000 MetaHookPre LoadFile(0, ./cluster.bif.zeek, <...>/cluster.bif.zeek) 0.000000 MetaHookPre LoadFile(0, ./comm.bif.zeek, <...>/comm.bif.zeek) 0.000000 MetaHookPre LoadFile(0, ./communityid.bif.zeek, <...>/communityid.bif.zeek) 0.000000 MetaHookPre LoadFile(0, ./const.bif.zeek, <...>/const.bif.zeek) @@ -1474,6 +1479,7 @@ 0.000000 MetaHookPre LoadFile(0, base<...>/ayiya, <...>/ayiya) 0.000000 MetaHookPre LoadFile(0, base<...>/broker, <...>/broker) 0.000000 MetaHookPre LoadFile(0, base<...>/cluster, <...>/cluster) +0.000000 MetaHookPre LoadFile(0, base<...>/cluster.bif, <...>/cluster.bif.zeek) 0.000000 MetaHookPre LoadFile(0, base<...>/comm.bif, <...>/comm.bif.zeek) 0.000000 MetaHookPre LoadFile(0, base<...>/communityid.bif, <...>/communityid.bif.zeek) 0.000000 MetaHookPre LoadFile(0, base<...>/config, <...>/config) @@ -1697,6 +1703,7 @@ 0.000000 MetaHookPre LoadFileExtended(0, ./api, <...>/api.zeek) 0.000000 MetaHookPre LoadFileExtended(0, ./bloom-filter.bif.zeek, <...>/bloom-filter.bif.zeek) 0.000000 MetaHookPre LoadFileExtended(0, ./cardinality-counter.bif.zeek, <...>/cardinality-counter.bif.zeek) +0.000000 MetaHookPre LoadFileExtended(0, ./cluster.bif.zeek, <...>/cluster.bif.zeek) 0.000000 MetaHookPre LoadFileExtended(0, ./comm.bif.zeek, <...>/comm.bif.zeek) 0.000000 MetaHookPre LoadFileExtended(0, ./communityid.bif.zeek, <...>/communityid.bif.zeek) 0.000000 MetaHookPre LoadFileExtended(0, ./const.bif.zeek, <...>/const.bif.zeek) @@ -1775,6 +1782,7 @@ 0.000000 MetaHookPre LoadFileExtended(0, base<...>/ayiya, <...>/ayiya) 0.000000 MetaHookPre LoadFileExtended(0, base<...>/broker, <...>/broker) 0.000000 MetaHookPre LoadFileExtended(0, base<...>/cluster, <...>/cluster) +0.000000 MetaHookPre LoadFileExtended(0, base<...>/cluster.bif, <...>/cluster.bif.zeek) 0.000000 MetaHookPre LoadFileExtended(0, base<...>/comm.bif, <...>/comm.bif.zeek) 0.000000 MetaHookPre LoadFileExtended(0, base<...>/communityid.bif, <...>/communityid.bif.zeek) 0.000000 MetaHookPre LoadFileExtended(0, base<...>/config, <...>/config) @@ -2331,6 +2339,7 @@ 0.000000 | HookLoadFile ./audio <...>/audio.sig 0.000000 | HookLoadFile ./bloom-filter.bif.zeek <...>/bloom-filter.bif.zeek 0.000000 | HookLoadFile ./cardinality-counter.bif.zeek <...>/cardinality-counter.bif.zeek +0.000000 | HookLoadFile ./cluster.bif.zeek <...>/cluster.bif.zeek 0.000000 | HookLoadFile ./comm.bif.zeek <...>/comm.bif.zeek 0.000000 | HookLoadFile ./communityid.bif.zeek <...>/communityid.bif.zeek 0.000000 | HookLoadFile ./const.bif.zeek <...>/const.bif.zeek @@ -2419,6 +2428,7 @@ 0.000000 | HookLoadFile base<...>/ayiya <...>/ayiya 0.000000 | HookLoadFile base<...>/broker <...>/broker 0.000000 | HookLoadFile base<...>/cluster <...>/cluster +0.000000 | HookLoadFile base<...>/cluster.bif <...>/cluster.bif.zeek 0.000000 | HookLoadFile base<...>/comm.bif <...>/comm.bif.zeek 0.000000 | HookLoadFile base<...>/communityid.bif <...>/communityid.bif.zeek 0.000000 | HookLoadFile base<...>/config <...>/config @@ -2632,6 +2642,7 @@ 0.000000 | HookLoadFileExtended ./audio <...>/audio.sig 0.000000 | HookLoadFileExtended ./bloom-filter.bif.zeek <...>/bloom-filter.bif.zeek 0.000000 | HookLoadFileExtended ./cardinality-counter.bif.zeek <...>/cardinality-counter.bif.zeek +0.000000 | HookLoadFileExtended ./cluster.bif.zeek <...>/cluster.bif.zeek 0.000000 | HookLoadFileExtended ./comm.bif.zeek <...>/comm.bif.zeek 0.000000 | HookLoadFileExtended ./communityid.bif.zeek <...>/communityid.bif.zeek 0.000000 | HookLoadFileExtended ./const.bif.zeek <...>/const.bif.zeek @@ -2720,6 +2731,7 @@ 0.000000 | HookLoadFileExtended base<...>/ayiya <...>/ayiya 0.000000 | HookLoadFileExtended base<...>/broker <...>/broker 0.000000 | HookLoadFileExtended base<...>/cluster <...>/cluster +0.000000 | HookLoadFileExtended base<...>/cluster.bif <...>/cluster.bif.zeek 0.000000 | HookLoadFileExtended base<...>/comm.bif <...>/comm.bif.zeek 0.000000 | HookLoadFileExtended base<...>/communityid.bif <...>/communityid.bif.zeek 0.000000 | HookLoadFileExtended base<...>/config <...>/config diff --git a/testing/btest/cluster/broker/cluster-publish.zeek b/testing/btest/cluster/broker/cluster-publish.zeek new file mode 100644 index 0000000000..b96c984e6a --- /dev/null +++ b/testing/btest/cluster/broker/cluster-publish.zeek @@ -0,0 +1,106 @@ +# @TEST-DOC: Use Cluster::subscribe() and Cluster::publish() with Broker +# @TEST-GROUP: cluster +# +# @TEST-PORT: BROKER_PORT +# +# @TEST-EXEC: btest-bg-run recv "zeek -b ../recv.zeek >recv.out" +# @TEST-EXEC: btest-bg-run send "zeek -b ../send.zeek >send.out" +# +# @TEST-EXEC: btest-bg-wait 30 +# @TEST-EXEC: btest-diff recv/recv.out +# @TEST-EXEC: btest-diff send/send.out + +@TEST-START-FILE send.zeek + +redef Cluster::backend = Cluster::CLUSTER_BACKEND_BROKER; +redef exit_only_after_terminate = T; + +global event_count = 0; + +global ping: event(msg: string, c: count); + +event zeek_init() + { + Cluster::init(); + Broker::peer("127.0.0.1", to_port(getenv("BROKER_PORT"))); + + Cluster::subscribe("zeek/event/my_topic"); + } + +function send_event() + { + ++event_count; + local e = Cluster::make_event(ping, "my-message", event_count); + Cluster::publish("zeek/event/my_topic", e); + } + +event Broker::peer_added(endpoint: Broker::EndpointInfo, msg: string) + { + print fmt("sender added peer: endpoint=%s msg=%s", + endpoint$network$address, msg); + send_event(); + } + +event Broker::peer_lost(endpoint: Broker::EndpointInfo, msg: string) + { + print fmt("sender lost peer: endpoint=%s msg=%s", + endpoint$network$address, msg); + Cluster::unsubscribe("zeek/event/my_topic"); + terminate(); + } + +event pong(msg: string, n: count) + { + print "is_remote should be T, and is", is_remote_event(); + print fmt("sender got pong: %s, %s", msg, n); + send_event(); + } + +@TEST-END-FILE + + +@TEST-START-FILE recv.zeek + +redef Cluster::backend = Cluster::CLUSTER_BACKEND_BROKER; +redef exit_only_after_terminate = T; + +const events_to_recv = 5; + +global handler: event(msg: string, c: count); +global auto_handler: event(msg: string, c: count); + +global pong: event(msg: string, c: count); + +event zeek_init() + { + Cluster::init(); + Broker::listen("127.0.0.1", to_port(getenv("BROKER_PORT"))); + + Cluster::subscribe("zeek/event/my_topic"); + } + +event Broker::peer_added(endpoint: Broker::EndpointInfo, msg: string) + { + print fmt("receiver added peer: endpoint=%s msg=%s", endpoint$network$address, msg); + } + +event Broker::peer_lost(endpoint: Broker::EndpointInfo, msg: string) + { + print fmt("receiver lost peer: endpoint=%s msg=%s", endpoint$network$address, msg); + } + +event ping(msg: string, n: count) + { + print "is_remote should be T, and is", is_remote_event(); + print fmt("receiver got ping: %s, %s", msg, n); + + if ( n == events_to_recv ) + { + Cluster::unsubscribe("zeek/event/my_topic"); + terminate(); + return; + } + + Cluster::publish("zeek/event/my_topic", pong, msg, n); + } +@TEST-END-FILE diff --git a/testing/btest/cluster/generic/errors.zeek b/testing/btest/cluster/generic/errors.zeek new file mode 100644 index 0000000000..b428af3e1d --- /dev/null +++ b/testing/btest/cluster/generic/errors.zeek @@ -0,0 +1,52 @@ +# @TEST-DOC: Test some validation errors of cluster bifs +# +# @TEST-EXEC: zeek --parse-only -b %INPUT +# @TEST-EXEC: zeek -b %INPUT +# @TEST-EXEC: TEST_DIFF_CANONIFIER=$SCRIPTS/diff-remove-abspath btest-diff .stderr +# @TEST-EXEC: TEST_DIFF_CANONIFIER=$SCRIPTS/diff-remove-abspath btest-diff .stdout + +event ping1(c: count, how: string) &is_used + { + } + +hook hook1(c: count, how: string) &is_used + { + } + +event zeek_init() &priority=-1 + { + print "wrong number of args"; + + Cluster::publish("topic"); + local r1 = Cluster::make_event(); + print "r1", r1; + + Cluster::publish("topic", ping1); + local r2 = Cluster::make_event(ping1); + print "r2", r2; + + Cluster::publish("topic", ping1, 1); + local r3 = Cluster::make_event(ping1, 1); + print "r3", r3; + + Cluster::publish("topic", ping1, 1, "args", 1.2.3.4); + local r4 = Cluster::make_event(ping1, 1, "event", 1.2.3.4); + print "r4", r4; + } + +event zeek_init() &priority=-2 + { + print "wrong types"; + + Cluster::publish("topic", ping1, 1, 2); + local r1 = Cluster::make_event(ping1, 1, 2); + print "r1", r1; + + Cluster::publish("topic", hook1, 1, "hook"); + local r2 = Cluster::make_event(hook1, 1, "hook"); + print "r2", r2; + + Cluster::publish("topic", 1); + local r3 = Cluster::make_event(1); + print "r3", r2; + } diff --git a/testing/btest/cluster/generic/make_event.zeek b/testing/btest/cluster/generic/make_event.zeek new file mode 100644 index 0000000000..fd551b1263 --- /dev/null +++ b/testing/btest/cluster/generic/make_event.zeek @@ -0,0 +1,58 @@ +# @TEST-DOC: Test make_event behavior. +# +# @TEST-EXEC: zeek -b %INPUT >out +# +# @TEST-EXEC: btest-diff out +# @TEST-EXEC: TEST_DIFF_CANONIFIER=$SCRIPTS/diff-remove-abspath btest-diff .stderr + +function test_fun() { } +hook test_hook() { } +event test_event() { } +event test_event2(s: string) { } + +function as_cluster_event(e: any): Cluster::Event + { + assert e is Cluster::Event; + return e as Cluster::Event; + } + +event zeek_init() &priority=10 + { + local e1 = Cluster::make_event(test_event); + local ce1 = as_cluster_event(e1); + print type_name(ce1$ev), ce1$args; + + local e2 = Cluster::make_event(test_event2, "abc"); + local ce2 = as_cluster_event(e2); + print type_name(ce2$ev), ce2$args; + } + +event zeek_init() &priority=-10 + { + local e = Cluster::make_event(); + } + +event zeek_init() &priority=-11 + { + local e = Cluster::make_event("a"); + } + +event zeek_init() &priority=-12 + { + local e = Cluster::make_event(test_fun); + } + +event zeek_init() &priority=-13 + { + local e = Cluster::make_event(test_hook); + } + +event zeek_init() &priority=-14 + { + local e = Cluster::make_event(test_event2); + } + +event zeek_init() &priority=-15 + { + local e = Cluster::make_event(test_event2, "a", "b"); + } diff --git a/testing/btest/cluster/serializer-enum.zeek b/testing/btest/cluster/serializer-enum.zeek new file mode 100644 index 0000000000..c619ef557f --- /dev/null +++ b/testing/btest/cluster/serializer-enum.zeek @@ -0,0 +1,11 @@ +# @TEST-DOC: Test cluster backend enum +# +# @TEST-EXEC: zeek -NN Zeek::Broker_Serializer >>out +# @TEST-EXEC: zeek -b %INPUT >>out +# @TEST-EXEC: btest-diff out + +event zeek_init() + { + print Cluster::EVENT_SERIALIZER_BROKER_BIN_V1, type_name(Cluster::EVENT_SERIALIZER_BROKER_BIN_V1); + print Cluster::EVENT_SERIALIZER_BROKER_JSON_V1, type_name(Cluster::EVENT_SERIALIZER_BROKER_JSON_V1); + }