From 0ad1c78baea9e21da36e9c1367727b1c7684913d Mon Sep 17 00:00:00 2001 From: Arne Welzel Date: Wed, 24 Sep 2025 13:29:04 +0200 Subject: [PATCH 01/12] btest/Files/zeromq/metrics: Add missing load for telemetry framework --- testing/btest/Files/zeromq/metrics.zeek | 2 ++ 1 file changed, 2 insertions(+) diff --git a/testing/btest/Files/zeromq/metrics.zeek b/testing/btest/Files/zeromq/metrics.zeek index 0812e4659e..40c26d69f1 100644 --- a/testing/btest/Files/zeromq/metrics.zeek +++ b/testing/btest/Files/zeromq/metrics.zeek @@ -1,3 +1,5 @@ +@load base/frameworks/telemetry + module Cluster::Backend::ZeroMQ; export { From e3373d31f00cdaa3a20c5fdf66d9aac12a756a57 Mon Sep 17 00:00:00 2001 From: Arne Welzel Date: Thu, 25 Sep 2025 16:40:33 +0200 Subject: [PATCH 02/12] btest/broker: Remove get_broker_stats() print They seem mostly irrelevant for the tests and depending on what is logged, the num_logs_outgoing changes, so just stop including them in the baseline for now. --- testing/btest/Baseline/broker.remote_event/recv.recv.out | 1 - .../btest/Baseline/broker.remote_event_any/recv.recv.out | 1 - .../Baseline/broker.remote_event_ssl_auth/recv.recv.out | 1 - testing/btest/broker/remote_event.zeek | 8 -------- testing/btest/broker/remote_event_any.zeek | 8 -------- testing/btest/broker/remote_event_ssl_auth.zeek | 8 -------- 6 files changed, 27 deletions(-) diff --git a/testing/btest/Baseline/broker.remote_event/recv.recv.out b/testing/btest/Baseline/broker.remote_event/recv.recv.out index c13ab4ea41..fcd1b41ad2 100644 --- a/testing/btest/Baseline/broker.remote_event/recv.recv.out +++ b/testing/btest/Baseline/broker.remote_event/recv.recv.out @@ -10,4 +10,3 @@ is_remote should be T, and is, T receiver got ping: my-message, 4 is_remote should be T, and is, T receiver got ping: my-message, 5 -[num_peers=1, num_stores=0, num_pending_queries=0, num_events_incoming=5, num_events_outgoing=4, num_logs_incoming=0, num_logs_outgoing=1, num_ids_incoming=0, num_ids_outgoing=0] diff --git a/testing/btest/Baseline/broker.remote_event_any/recv.recv.out b/testing/btest/Baseline/broker.remote_event_any/recv.recv.out index c13ab4ea41..fcd1b41ad2 100644 --- a/testing/btest/Baseline/broker.remote_event_any/recv.recv.out +++ b/testing/btest/Baseline/broker.remote_event_any/recv.recv.out @@ -10,4 +10,3 @@ is_remote should be T, and is, T receiver got ping: my-message, 4 is_remote should be T, and is, T receiver got ping: my-message, 5 -[num_peers=1, num_stores=0, num_pending_queries=0, num_events_incoming=5, num_events_outgoing=4, num_logs_incoming=0, num_logs_outgoing=1, num_ids_incoming=0, num_ids_outgoing=0] diff --git a/testing/btest/Baseline/broker.remote_event_ssl_auth/recv.recv.out b/testing/btest/Baseline/broker.remote_event_ssl_auth/recv.recv.out index 7a0dc495f2..24ee373fe9 100644 --- a/testing/btest/Baseline/broker.remote_event_ssl_auth/recv.recv.out +++ b/testing/btest/Baseline/broker.remote_event_ssl_auth/recv.recv.out @@ -5,4 +5,3 @@ receiver got ping: my-message, 2 receiver got ping: my-message, 3 receiver got ping: my-message, 4 receiver got ping: my-message, 5 -[num_peers=1, num_stores=0, num_pending_queries=0, num_events_incoming=5, num_events_outgoing=4, num_logs_incoming=0, num_logs_outgoing=1, num_ids_incoming=0, num_ids_outgoing=0] diff --git a/testing/btest/broker/remote_event.zeek b/testing/btest/broker/remote_event.zeek index de44ae87cd..823c9b06dc 100644 --- a/testing/btest/broker/remote_event.zeek +++ b/testing/btest/broker/remote_event.zeek @@ -1,7 +1,5 @@ # @TEST-GROUP: broker # -# @TEST-REQUIRES: $SCRIPTS/have-spicy # The logging of Broker's `num_logs_outgoing` depends on whether the Spicy plugin is loaded or not. -# # @TEST-PORT: BROKER_PORT # # @TEST-EXEC: btest-bg-run recv "zeek -b ../recv.zeek >recv.out" @@ -98,10 +96,4 @@ event ping(msg: string, n: count) local e = Broker::make_event(pong, msg, n); Broker::publish("zeek/event/my_topic", e); } - -event zeek_done() - { - print get_broker_stats(); - } - # @TEST-END-FILE diff --git a/testing/btest/broker/remote_event_any.zeek b/testing/btest/broker/remote_event_any.zeek index 8bd37a36a5..c578cb7073 100644 --- a/testing/btest/broker/remote_event_any.zeek +++ b/testing/btest/broker/remote_event_any.zeek @@ -1,7 +1,5 @@ # @TEST-GROUP: broker # -# @TEST-REQUIRES: $SCRIPTS/have-spicy # The logging of Broker's `num_logs_outgoing` depends on whether the Spicy plugin is loaded or not. -# # @TEST-PORT: BROKER_PORT # # @TEST-EXEC: btest-bg-run recv "zeek -b ../recv.zeek >recv.out" @@ -106,10 +104,4 @@ event ping(msg: string, n: any) # internals should not wrap n into another Broker::Data record Broker::publish("zeek/event/my_topic", pong, msg, n); } - -event zeek_done() - { - print get_broker_stats(); - } - # @TEST-END-FILE diff --git a/testing/btest/broker/remote_event_ssl_auth.zeek b/testing/btest/broker/remote_event_ssl_auth.zeek index cc4b052607..b8cd97a806 100644 --- a/testing/btest/broker/remote_event_ssl_auth.zeek +++ b/testing/btest/broker/remote_event_ssl_auth.zeek @@ -1,7 +1,5 @@ # @TEST-GROUP: broker # -# @TEST-REQUIRES: $SCRIPTS/have-spicy # The logging of Broker's `num_logs_outgoing` depends on whether the Spicy plugin is loaded or not. -# # @TEST-PORT: BROKER_PORT # # @TEST-EXEC: btest-bg-run recv "zeek -b ../recv.zeek >recv.out" @@ -235,10 +233,4 @@ event ping(msg: string, n: count) local e = Broker::make_event(pong, msg, n); Broker::publish("zeek/event/my_topic", e); } - -event zeek_done() - { - print get_broker_stats(); - } - # @TEST-END-FILE From 8777c1522d4fdd346e31e3868db511b18b3efc52 Mon Sep 17 00:00:00 2001 From: Arne Welzel Date: Wed, 24 Sep 2025 12:22:35 +0200 Subject: [PATCH 03/12] btest/broker/remote_event_ts_compat: Replace 15 second sleep with wait-for-file Just so the test runs faster when it can. --- testing/btest/broker/remote_event_ts_compat.zeek | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/testing/btest/broker/remote_event_ts_compat.zeek b/testing/btest/broker/remote_event_ts_compat.zeek index 36a00a74c9..ec71d07272 100644 --- a/testing/btest/broker/remote_event_ts_compat.zeek +++ b/testing/btest/broker/remote_event_ts_compat.zeek @@ -7,8 +7,7 @@ # @TEST-REQUIRES: TOPIC=/zeek/my_topic python3 client.py check # # @TEST-EXEC: TOPIC=/zeek/my_topic btest-bg-run server "zeek %INPUT >output" -# Leave room for Zeek to start up, which can be slow when using -O ZAM -# @TEST-EXEC: sleep 15 +# @TEST-EXEC: wait-for-file server/ready 20 # @TEST-EXEC: TOPIC=/zeek/my_topic btest-bg-run client "python3 ../client.py >output" # # @TEST-EXEC: btest-bg-wait 45 @@ -29,6 +28,8 @@ event zeek_init() Broker::subscribe(getenv("TOPIC")); Broker::listen("127.0.0.1", to_port(getenv("BROKER_PORT"))); set_network_time(double_to_time(42.0)); + + system("touch ready"); } event Broker::peer_added(endpoint: Broker::EndpointInfo, msg: string) From ba8305e1e644a55e05b98ae86f5e3819966a7f87 Mon Sep 17 00:00:00 2001 From: Arne Welzel Date: Tue, 23 Sep 2025 15:46:15 +0200 Subject: [PATCH 04/12] cluster: Add None backend Mostly so we have the CLUSTER_BACKEND_NONE enum value available, but potentially useful for testing, too. The rough idea is that Zeek by default has Cluster::backend set to CLUSTER_BACKEND_NONE, and script packages in frameworks/cluster/backend redef it accordingly. --- src/cluster/backend/CMakeLists.txt | 2 + src/cluster/backend/none/CMakeLists.txt | 3 ++ src/cluster/backend/none/Plugin.cc | 57 +++++++++++++++++++++++++ 3 files changed, 62 insertions(+) create mode 100644 src/cluster/backend/none/CMakeLists.txt create mode 100644 src/cluster/backend/none/Plugin.cc diff --git a/src/cluster/backend/CMakeLists.txt b/src/cluster/backend/CMakeLists.txt index c5a6d1b4a2..c312395c16 100644 --- a/src/cluster/backend/CMakeLists.txt +++ b/src/cluster/backend/CMakeLists.txt @@ -17,3 +17,5 @@ if (ENABLE_CLUSTER_BACKEND_ZEROMQ) add_subdirectory(zeromq) endif () + +add_subdirectory(none) diff --git a/src/cluster/backend/none/CMakeLists.txt b/src/cluster/backend/none/CMakeLists.txt new file mode 100644 index 0000000000..60b3dff0cb --- /dev/null +++ b/src/cluster/backend/none/CMakeLists.txt @@ -0,0 +1,3 @@ +zeek_add_plugin( + Zeek Cluster_Backend_None + SOURCES Plugin.cc) diff --git a/src/cluster/backend/none/Plugin.cc b/src/cluster/backend/none/Plugin.cc new file mode 100644 index 0000000000..2ee9d1d051 --- /dev/null +++ b/src/cluster/backend/none/Plugin.cc @@ -0,0 +1,57 @@ +// See the file "COPYING" in the main distribution directory for copyright. + +#include "zeek/plugin/Plugin.h" + +#include "zeek/cluster/Backend.h" +#include "zeek/cluster/Component.h" +#include "zeek/cluster/Serializer.h" + +namespace { + +using namespace zeek::cluster; + +/** + * A backend that does nothing. + */ +class NoneBackend : public Backend { +private: + bool DoInit() override { return true; }; + void DoTerminate() override {}; + void DoInitPostScript() override {}; + bool DoPublishLogWrites(const zeek::logging::detail::LogWriteHeader& header, const std::string& format, + zeek::byte_buffer& buf) override { + return true; + } + bool DoPublishEvent(const std::string& topic, const std::string& format, const zeek::byte_buffer& buf) override { + return true; + }; + bool DoSubscribe(const std::string& topic_prefix, SubscribeCallback cb) override { return true; }; + bool DoUnsubscribe(const std::string& topic_prefix) override { return true; }; + +public: + NoneBackend(std::unique_ptr es, std::unique_ptr ls, + std::unique_ptr ehs) + : Backend("None", std::move(es), std::move(ls), std::move(ehs)) {} + + static std::unique_ptr Instantiate(std::unique_ptr es, std::unique_ptr ls, + std::unique_ptr ehs) { + return std::make_unique(std::move(es), std::move(ls), std::move(ehs)); + } +}; +} // namespace + + +namespace zeek::plugin::Zeek_Cluster_None { + +class Plugin : public zeek::plugin::Plugin { + zeek::plugin::Configuration Configure() override { + AddComponent(new cluster::BackendComponent("None", NoneBackend::Instantiate)); + + zeek::plugin::Configuration config; + config.name = "Zeek::Cluster_Backend_None"; + config.description = "Cluster backend none"; + return config; + } +} plugin; + +} // namespace zeek::plugin::Zeek_Cluster_None From 2f15f5ce6a05f2714fba3c0d15efc7daf039e5c1 Mon Sep 17 00:00:00 2001 From: Arne Welzel Date: Thu, 25 Sep 2025 15:57:40 +0200 Subject: [PATCH 05/12] cluster: Introduce pubsub.zeek and types.zeek Allow access to the Cluster's subscribe(), unsubscribe(), publish(), publish_hrw() and publish_rr() methods by loading only the base/frameworks/cluster/pubsub, rather than everything that __load__.zeek or also main.zeek pulls in. This can be used by other scripts to use these functions without relying or expecting the rest of the cluster framework to be loaded already. Concretely, this is needed to move the Supervisor framework from Broker to Cluster. --- scripts/base/frameworks/cluster/__load__.zeek | 2 + scripts/base/frameworks/cluster/main.zeek | 165 +----------------- scripts/base/frameworks/cluster/pubsub.zeek | 49 ++++++ scripts/base/frameworks/cluster/types.zeek | 128 ++++++++++++++ .../canonified_loaded_scripts.log | 4 +- .../canonified_loaded_scripts.log | 4 +- testing/btest/Baseline/plugins.hooks/output | 6 + 7 files changed, 193 insertions(+), 165 deletions(-) create mode 100644 scripts/base/frameworks/cluster/pubsub.zeek create mode 100644 scripts/base/frameworks/cluster/types.zeek diff --git a/scripts/base/frameworks/cluster/__load__.zeek b/scripts/base/frameworks/cluster/__load__.zeek index 2072b457a7..2ca60fb67a 100644 --- a/scripts/base/frameworks/cluster/__load__.zeek +++ b/scripts/base/frameworks/cluster/__load__.zeek @@ -1,7 +1,9 @@ # Load the core cluster support. @load ./main @load ./pools +@load ./pubsub @load ./telemetry +@load ./types @if ( Cluster::is_enabled() ) diff --git a/scripts/base/frameworks/cluster/main.zeek b/scripts/base/frameworks/cluster/main.zeek index 7124cf23d8..8b86188996 100644 --- a/scripts/base/frameworks/cluster/main.zeek +++ b/scripts/base/frameworks/cluster/main.zeek @@ -18,6 +18,8 @@ @load base/frameworks/control @load base/frameworks/broker +@load ./types + module Cluster; export { @@ -75,19 +77,6 @@ export { ## :zeek:see:`Cluster::create_store` with the *persistent* argument set true. const default_persistent_backend = Broker::SQLITE &redef; - ## The default maximum queue size for WebSocket event dispatcher instances. - ## - ## If the maximum queue size is reached, events from external WebSocket - ## clients will be stalled and processed once the queue has been drained. - ## - ## An internal metric named ``cluster_onloop_queue_stalls`` and - ## labeled with a ``WebSocketEventDispatcher::`` tag - ## is incremented when the maximum queue size is reached. - const default_websocket_max_event_queue_size = 32 &redef; - - ## The default ping interval for WebSocket clients. - const default_websocket_ping_interval = 5 sec &redef; - ## Setting a default dir will, for persistent backends that have not ## been given an explicit file path via :zeek:see:`Cluster::stores`, ## automatically create a path within this dir that is based on the name of @@ -157,55 +146,6 @@ export { message: string; } &log; - ## Types of nodes that are allowed to participate in the cluster - ## configuration. - type NodeType: enum { - ## A dummy node type indicating the local node is not operating - ## within a cluster. - NONE, - ## A node type which is allowed to view/manipulate the configuration - ## of other nodes in the cluster. - CONTROL, - ## A node type responsible for log management. - LOGGER, - ## A node type responsible for policy management. - MANAGER, - ## A node type for relaying worker node communication and synchronizing - ## worker node state. - PROXY, - ## The node type doing all the actual traffic analysis. - WORKER, - }; - - ## Record type to indicate a node in a cluster. - type Node: record { - ## Identifies the type of cluster node in this node's configuration. - node_type: NodeType; - ## The IP address of the cluster node. - ip: addr; - ## If the *ip* field is a non-global IPv6 address, this field - ## can specify a particular :rfc:`4007` ``zone_id``. - zone_id: string &default=""; - ## The port that this node will listen on for peer connections. - ## A value of ``0/unknown`` means the node is not pre-configured to listen. - p: port &default=0/unknown; - ## Name of the manager node this node uses. For workers and proxies. - manager: string &optional; - ## A unique identifier assigned to the node by the broker framework. - ## This field is only set while a node is connected. - id: string &optional; - ## The port used to expose metrics to Prometheus. Setting this in a cluster - ## configuration will override the setting for Telemetry::metrics_port for - ## the node. - metrics_port: port &optional; - }; - - ## Record to represent a cluster node including its name. - type NamedNode: record { - name: string; - node: Node; - }; - ## This function can be called at any time to determine if the cluster ## framework is being enabled for this run. ## @@ -317,105 +257,14 @@ export { ## Returns: T on success, else F. global init: function(): bool; - ## Subscribe to the given topic. - ## - ## topic: The topic to subscribe to. - ## - ## Returns: T on success, else F. - global subscribe: function(topic: string): bool; - - ## Unsubscribe from the given topic. - ## - ## topic: The topic to unsubscribe from. - ## - ## Returns: T on success, else F. - global unsubscribe: function(topic: string): bool; - - ## An event instance for cluster pub/sub. - ## - ## See :zeek:see:`Cluster::publish` and :zeek:see:`Cluster::make_event`. - type Event: record { - ## The event handler to be invoked on the remote node. - ev: any; - ## The arguments for the event. - args: vector of any; - }; - - ## The TLS options for a WebSocket server. - ## - ## If cert_file and key_file are set, TLS is enabled. If both - ## are unset, TLS is disabled. Any other combination is an error. - type WebSocketTLSOptions: record { - ## The cert file to use. - cert_file: string &optional; - ## The key file to use. - key_file: string &optional; - ## Expect peers to send client certificates. - enable_peer_verification: bool &default=F; - ## The CA certificate or CA bundle used for peer verification. - ## Empty will use the implementations's default when - ## ``enable_peer_verification`` is T. - ca_file: string &default=""; - ## The ciphers to use. Empty will use the implementation's defaults. - ciphers: string &default=""; - }; - - ## WebSocket server options to pass to :zeek:see:`Cluster::listen_websocket`. - type WebSocketServerOptions: record { - ## The address to listen on, cannot be used together with ``listen_host``. - listen_addr: addr &optional; - ## The port the WebSocket server is supposed to listen on. - listen_port: port; - ## The maximum event queue size for this server. - max_event_queue_size: count &default=default_websocket_max_event_queue_size; - ## Ping interval to use. A WebSocket client not responding to - ## the pings will be disconnected. Set to a negative value to - ## disable pings. Subsecond intervals are currently not supported. - ping_interval: interval &default=default_websocket_ping_interval; - ## The TLS options used for this WebSocket server. By default, - ## TLS is disabled. See also :zeek:see:`Cluster::WebSocketTLSOptions`. - tls_options: WebSocketTLSOptions &default=WebSocketTLSOptions(); - }; - ## Start listening on a WebSocket address. ## ## options: The server :zeek:see:`Cluster::WebSocketServerOptions` to use. ## ## Returns: T on success, else F. global listen_websocket: function(options: WebSocketServerOptions): bool; - - ## Network information of an endpoint. - type NetworkInfo: record { - ## The IP address or hostname where the endpoint listens. - address: string; - ## The port where the endpoint is bound to. - bound_port: port; - }; - - ## Information about a WebSocket endpoint. - type EndpointInfo: record { - id: string; - network: NetworkInfo; - ## The value of the X-Application-Name HTTP header, if any. - application_name: string &optional; - }; - - ## A hook invoked for every :zeek:see:`Cluster::subscribe` call. - ## - ## Breaking from this hook has no effect. - ## - ## topic: The topic string as given to :zeek:see:`Cluster::subscribe`. - global on_subscribe: hook(topic: string); - - ## A hook invoked for every :zeek:see:`Cluster::subscribe` call. - ## - ## Breaking from this hook has no effect. - ## - ## topic: The topic string as given to :zeek:see:`Cluster::subscribe`. - global on_unsubscribe: hook(topic: string); } -# Needs declaration of Cluster::Event type. @load base/bif/cluster.bif @load base/bif/plugins/Zeek_Cluster_WebSocket.events.bif.zeek @@ -674,16 +523,6 @@ function init(): bool return Cluster::Backend::__init(Cluster::node_id()); } -function subscribe(topic: string): bool - { - return Cluster::__subscribe(topic); - } - -function unsubscribe(topic: string): bool - { - return Cluster::__unsubscribe(topic); - } - function listen_websocket(options: WebSocketServerOptions): bool { return Cluster::__listen_websocket(options); diff --git a/scripts/base/frameworks/cluster/pubsub.zeek b/scripts/base/frameworks/cluster/pubsub.zeek new file mode 100644 index 0000000000..591a80f986 --- /dev/null +++ b/scripts/base/frameworks/cluster/pubsub.zeek @@ -0,0 +1,49 @@ +# A script that can be loaded by other scripts to access the publish subscribe API. + +@load ./types + +module Cluster; + +export { + ## Subscribe to the given topic. + ## + ## topic: The topic to subscribe to. + ## + ## Returns: T on success, else F. + global subscribe: function(topic: string): bool; + + ## Unsubscribe from the given topic. + ## + ## topic: The topic to unsubscribe from. + ## + ## Returns: T on success, else F. + global unsubscribe: function(topic: string): bool; + + ## A hook invoked for every :zeek:see:`Cluster::subscribe` call. + ## + ## Breaking from this hook has no effect. + ## + ## topic: The topic string as given to :zeek:see:`Cluster::subscribe`. + global on_subscribe: hook(topic: string); + + ## A hook invoked for every :zeek:see:`Cluster::subscribe` call. + ## + ## Breaking from this hook has no effect. + ## + ## topic: The topic string as given to :zeek:see:`Cluster::subscribe`. + global on_unsubscribe: hook(topic: string); +} + +# base/bif/cluster.bif.zeek generated from from src/cluster/cluster.bif contains the +# Cluster::publish(), Cluster::publish_hrw() and Cluster::publish_rr() APIs +@load base/bif/cluster.bif + +function subscribe(topic: string): bool + { + return Cluster::__subscribe(topic); + } + +function unsubscribe(topic: string): bool + { + return Cluster::__unsubscribe(topic); + } diff --git a/scripts/base/frameworks/cluster/types.zeek b/scripts/base/frameworks/cluster/types.zeek new file mode 100644 index 0000000000..f3243a93c6 --- /dev/null +++ b/scripts/base/frameworks/cluster/types.zeek @@ -0,0 +1,128 @@ +# Types used by the Cluster framework. +module Cluster; + +export { + ## Types of nodes that are allowed to participate in the cluster + ## configuration. + type NodeType: enum { + ## A dummy node type indicating the local node is not operating + ## within a cluster. + NONE, + ## A node type which is allowed to view/manipulate the configuration + ## of other nodes in the cluster. + CONTROL, + ## A node type responsible for log management. + LOGGER, + ## A node type responsible for policy management. + MANAGER, + ## A node type for relaying worker node communication and synchronizing + ## worker node state. + PROXY, + ## The node type doing all the actual traffic analysis. + WORKER, + }; + + ## Record type to indicate a node in a cluster. + type Node: record { + ## Identifies the type of cluster node in this node's configuration. + node_type: NodeType; + ## The IP address of the cluster node. + ip: addr; + ## If the *ip* field is a non-global IPv6 address, this field + ## can specify a particular :rfc:`4007` ``zone_id``. + zone_id: string &default=""; + ## The port that this node will listen on for peer connections. + ## A value of ``0/unknown`` means the node is not pre-configured to listen. + p: port &default=0/unknown; + ## Name of the manager node this node uses. For workers and proxies. + manager: string &optional; + ## A unique identifier assigned to the node by the broker framework. + ## This field is only set while a node is connected. + id: string &optional; + ## The port used to expose metrics to Prometheus. Setting this in a cluster + ## configuration will override the setting for Telemetry::metrics_port for + ## the node. + metrics_port: port &optional; + }; + + ## Record to represent a cluster node including its name. + type NamedNode: record { + name: string; + node: Node; + }; + + ## An event instance for cluster pub/sub. + ## + ## See :zeek:see:`Cluster::publish` and :zeek:see:`Cluster::make_event`. + type Event: record { + ## The event handler to be invoked on the remote node. + ev: any; + ## The arguments for the event. + args: vector of any; + }; + + ## The default maximum queue size for WebSocket event dispatcher instances. + ## + ## If the maximum queue size is reached, events from external WebSocket + ## clients will be stalled and processed once the queue has been drained. + ## + ## An internal metric named ``cluster_onloop_queue_stalls`` and + ## labeled with a ``WebSocketEventDispatcher::`` tag + ## is incremented when the maximum queue size is reached. + const default_websocket_max_event_queue_size = 32 &redef; + + ## The default ping interval for WebSocket clients. + const default_websocket_ping_interval = 5 sec &redef; + + ## The TLS options for a WebSocket server. + ## + ## If cert_file and key_file are set, TLS is enabled. If both + ## are unset, TLS is disabled. Any other combination is an error. + type WebSocketTLSOptions: record { + ## The cert file to use. + cert_file: string &optional; + ## The key file to use. + key_file: string &optional; + ## Expect peers to send client certificates. + enable_peer_verification: bool &default=F; + ## The CA certificate or CA bundle used for peer verification. + ## Empty will use the implementations's default when + ## ``enable_peer_verification`` is T. + ca_file: string &default=""; + ## The ciphers to use. Empty will use the implementation's defaults. + ciphers: string &default=""; + }; + + ## WebSocket server options to pass to :zeek:see:`Cluster::listen_websocket`. + type WebSocketServerOptions: record { + ## The address to listen on, cannot be used together with ``listen_host``. + listen_addr: addr &optional; + ## The port the WebSocket server is supposed to listen on. + listen_port: port; + ## The maximum event queue size for this server. + max_event_queue_size: count &default=default_websocket_max_event_queue_size; + ## Ping interval to use. A WebSocket client not responding to + ## the pings will be disconnected. Set to a negative value to + ## disable pings. Subsecond intervals are currently not supported. + ping_interval: interval &default=default_websocket_ping_interval; + ## The TLS options used for this WebSocket server. By default, + ## TLS is disabled. See also :zeek:see:`Cluster::WebSocketTLSOptions`. + tls_options: WebSocketTLSOptions &default=WebSocketTLSOptions(); + }; + + ## Network information of an endpoint. + type NetworkInfo: record { + ## The IP address or hostname where the endpoint listens. + address: string; + ## The port where the endpoint is bound to. + bound_port: port; + }; + + ## Information about a WebSocket endpoint. + type EndpointInfo: record { + id: string; + network: NetworkInfo; + ## The value of the X-Application-Name HTTP header, if any. + application_name: string &optional; + }; +} diff --git a/testing/btest/Baseline/coverage.bare-load-baseline/canonified_loaded_scripts.log b/testing/btest/Baseline/coverage.bare-load-baseline/canonified_loaded_scripts.log index 92ae304ef1..57c9152075 100644 --- a/testing/btest/Baseline/coverage.bare-load-baseline/canonified_loaded_scripts.log +++ b/testing/btest/Baseline/coverage.bare-load-baseline/canonified_loaded_scripts.log @@ -136,10 +136,12 @@ scripts/base/init-frameworks-and-bifs.zeek scripts/base/frameworks/cluster/main.zeek scripts/base/frameworks/control/__load__.zeek scripts/base/frameworks/control/main.zeek - build/scripts/base/bif/cluster.bif.zeek + scripts/base/frameworks/cluster/types.zeek build/scripts/base/bif/plugins/Zeek_Cluster_WebSocket.events.bif.zeek + build/scripts/base/bif/cluster.bif.zeek scripts/base/frameworks/cluster/pools.zeek scripts/base/utils/hash_hrw.zeek + scripts/base/frameworks/cluster/pubsub.zeek scripts/base/frameworks/cluster/telemetry.zeek scripts/base/frameworks/config/__load__.zeek scripts/base/frameworks/config/main.zeek diff --git a/testing/btest/Baseline/coverage.default-load-baseline/canonified_loaded_scripts.log b/testing/btest/Baseline/coverage.default-load-baseline/canonified_loaded_scripts.log index 8d2b58f0e1..2476ec6e1b 100644 --- a/testing/btest/Baseline/coverage.default-load-baseline/canonified_loaded_scripts.log +++ b/testing/btest/Baseline/coverage.default-load-baseline/canonified_loaded_scripts.log @@ -136,10 +136,12 @@ scripts/base/init-frameworks-and-bifs.zeek scripts/base/frameworks/cluster/main.zeek scripts/base/frameworks/control/__load__.zeek scripts/base/frameworks/control/main.zeek - build/scripts/base/bif/cluster.bif.zeek + scripts/base/frameworks/cluster/types.zeek build/scripts/base/bif/plugins/Zeek_Cluster_WebSocket.events.bif.zeek + build/scripts/base/bif/cluster.bif.zeek scripts/base/frameworks/cluster/pools.zeek scripts/base/utils/hash_hrw.zeek + scripts/base/frameworks/cluster/pubsub.zeek scripts/base/frameworks/cluster/telemetry.zeek scripts/base/frameworks/config/__load__.zeek scripts/base/frameworks/config/main.zeek diff --git a/testing/btest/Baseline/plugins.hooks/output b/testing/btest/Baseline/plugins.hooks/output index 8697db08b7..6ad9831781 100644 --- a/testing/btest/Baseline/plugins.hooks/output +++ b/testing/btest/Baseline/plugins.hooks/output @@ -477,6 +477,7 @@ 0.000000 MetaHookPost LoadFile(0, ./polling, <...>/polling.zeek) -> -1 0.000000 MetaHookPost LoadFile(0, ./pools, <...>/pools.zeek) -> -1 0.000000 MetaHookPost LoadFile(0, ./postprocessors, <...>/postprocessors) -> -1 +0.000000 MetaHookPost LoadFile(0, ./pubsub, <...>/pubsub.zeek) -> -1 0.000000 MetaHookPost LoadFile(0, ./removal-hooks, <...>/removal-hooks.zeek) -> -1 0.000000 MetaHookPost LoadFile(0, ./reporter.bif.zeek, <...>/reporter.bif.zeek) -> -1 0.000000 MetaHookPost LoadFile(0, ./scp, <...>/scp.zeek) -> -1 @@ -793,6 +794,7 @@ 0.000000 MetaHookPost LoadFileExtended(0, ./polling, <...>/polling.zeek) -> (-1, ) 0.000000 MetaHookPost LoadFileExtended(0, ./pools, <...>/pools.zeek) -> (-1, ) 0.000000 MetaHookPost LoadFileExtended(0, ./postprocessors, <...>/postprocessors) -> (-1, ) +0.000000 MetaHookPost LoadFileExtended(0, ./pubsub, <...>/pubsub.zeek) -> (-1, ) 0.000000 MetaHookPost LoadFileExtended(0, ./removal-hooks, <...>/removal-hooks.zeek) -> (-1, ) 0.000000 MetaHookPost LoadFileExtended(0, ./reporter.bif.zeek, <...>/reporter.bif.zeek) -> (-1, ) 0.000000 MetaHookPost LoadFileExtended(0, ./scp, <...>/scp.zeek) -> (-1, ) @@ -1419,6 +1421,7 @@ 0.000000 MetaHookPre LoadFile(0, ./polling, <...>/polling.zeek) 0.000000 MetaHookPre LoadFile(0, ./pools, <...>/pools.zeek) 0.000000 MetaHookPre LoadFile(0, ./postprocessors, <...>/postprocessors) +0.000000 MetaHookPre LoadFile(0, ./pubsub, <...>/pubsub.zeek) 0.000000 MetaHookPre LoadFile(0, ./removal-hooks, <...>/removal-hooks.zeek) 0.000000 MetaHookPre LoadFile(0, ./reporter.bif.zeek, <...>/reporter.bif.zeek) 0.000000 MetaHookPre LoadFile(0, ./scp, <...>/scp.zeek) @@ -1735,6 +1738,7 @@ 0.000000 MetaHookPre LoadFileExtended(0, ./polling, <...>/polling.zeek) 0.000000 MetaHookPre LoadFileExtended(0, ./pools, <...>/pools.zeek) 0.000000 MetaHookPre LoadFileExtended(0, ./postprocessors, <...>/postprocessors) +0.000000 MetaHookPre LoadFileExtended(0, ./pubsub, <...>/pubsub.zeek) 0.000000 MetaHookPre LoadFileExtended(0, ./removal-hooks, <...>/removal-hooks.zeek) 0.000000 MetaHookPre LoadFileExtended(0, ./reporter.bif.zeek, <...>/reporter.bif.zeek) 0.000000 MetaHookPre LoadFileExtended(0, ./scp, <...>/scp.zeek) @@ -2371,6 +2375,7 @@ 0.000000 | HookLoadFile ./pools <...>/pools.zeek 0.000000 | HookLoadFile ./postprocessors <...>/postprocessors 0.000000 | HookLoadFile ./programming <...>/programming.sig +0.000000 | HookLoadFile ./pubsub <...>/pubsub.zeek 0.000000 | HookLoadFile ./python <...>/python.sig 0.000000 | HookLoadFile ./removal-hooks <...>/removal-hooks.zeek 0.000000 | HookLoadFile ./reporter.bif.zeek <...>/reporter.bif.zeek @@ -2687,6 +2692,7 @@ 0.000000 | HookLoadFileExtended ./pools <...>/pools.zeek 0.000000 | HookLoadFileExtended ./postprocessors <...>/postprocessors 0.000000 | HookLoadFileExtended ./programming <...>/programming.sig +0.000000 | HookLoadFileExtended ./pubsub <...>/pubsub.zeek 0.000000 | HookLoadFileExtended ./python <...>/python.sig 0.000000 | HookLoadFileExtended ./removal-hooks <...>/removal-hooks.zeek 0.000000 | HookLoadFileExtended ./reporter.bif.zeek <...>/reporter.bif.zeek From 351bd7b81b1428d3afcf974032324aab7b444aac Mon Sep 17 00:00:00 2001 From: Arne Welzel Date: Thu, 25 Sep 2025 10:10:17 +0200 Subject: [PATCH 06/12] cluster: Move Broker connect logic into policy package Place a package into policy/frameworks/cluster/backend/broker and move Broker's connection specific logic there. --- scripts/base/frameworks/cluster/main.zeek | 30 ++-- .../frameworks/cluster/setup-connections.zeek | 110 +------------ .../cluster/backend/broker/__load__.zeek | 1 + .../cluster/backend/broker/main.zeek | 152 ++++++++++++++++++ scripts/test-all-policy.zeek | 2 + 5 files changed, 165 insertions(+), 130 deletions(-) create mode 100644 scripts/policy/frameworks/cluster/backend/broker/__load__.zeek create mode 100644 scripts/policy/frameworks/cluster/backend/broker/main.zeek diff --git a/scripts/base/frameworks/cluster/main.zeek b/scripts/base/frameworks/cluster/main.zeek index 8b86188996..f4e2a17808 100644 --- a/scripts/base/frameworks/cluster/main.zeek +++ b/scripts/base/frameworks/cluster/main.zeek @@ -263,6 +263,15 @@ export { ## ## Returns: T on success, else F. global listen_websocket: function(options: WebSocketServerOptions): bool; + + ## This hook is called when the local node connects to other nodes based on + ## the given cluster layout. Breaking from the hook will prevent connection + ## establishment. + ## + ## This hook only applies to the Broker cluster backend. + ## + ## connectee: The node to connect to. + global connect_node_hook: hook(connectee: NamedNode); } @load base/bif/cluster.bif @@ -383,27 +392,6 @@ event Cluster::hello(name: string, id: string) &priority=10 add active_node_ids[n$node_type][id]; } -event Broker::peer_added(endpoint: Broker::EndpointInfo, msg: string) &priority=10 - { - if ( ! Cluster::is_enabled() ) - return; - - local e = Broker::make_event(Cluster::hello, node, Cluster::node_id()); - Broker::publish(nodeid_topic(endpoint$id), e); - } - -event Broker::peer_lost(endpoint: Broker::EndpointInfo, msg: string) &priority=10 - { - for ( node_name, n in nodes ) - { - if ( n?$id && n$id == endpoint$id ) - { - event Cluster::node_down(node_name, endpoint$id); - break; - } - } - } - event node_down(name: string, id: string) &priority=10 { local found = F; diff --git a/scripts/base/frameworks/cluster/setup-connections.zeek b/scripts/base/frameworks/cluster/setup-connections.zeek index 84c2a7d2fe..ea67b5115a 100644 --- a/scripts/base/frameworks/cluster/setup-connections.zeek +++ b/scripts/base/frameworks/cluster/setup-connections.zeek @@ -3,63 +3,10 @@ @load ./main @load ./pools -@load base/frameworks/broker module Cluster; -export { - ## This hook is called when the local node connects to other nodes based on - ## the given cluster layout. Breaking from the hook will prevent connection - ## establishment. - ## - ## connectee: The node to connect to. - global connect_node_hook: hook(connectee: NamedNode); -} - -function connect_peer(node_type: NodeType, node_name: string) - { - local nn = nodes_with_type(node_type); - - for ( i in nn ) - { - local n = nn[i]; - - if ( n$name != node_name ) - next; - if ( ! hook connect_node_hook(n) ) - return; - - local status = Broker::peer(cat(n$node$ip), n$node$p, - Cluster::retry_interval); - Cluster::log(fmt("initiate peering with %s:%s, retry=%s, status=%s", - n$node$ip, n$node$p, Cluster::retry_interval, - status)); - return; - } - - Reporter::warning(fmt("connect_peer: node '%s' (%s) not found", node_name, node_type)); - } - -function connect_peers_with_type(node_type: NodeType) - { - local nn = nodes_with_type(node_type); - - for ( i in nn ) - { - local n = nn[i]; - - if ( ! hook connect_node_hook(n) ) - next; - - local status = Broker::peer(cat(n$node$ip), n$node$p, - Cluster::retry_interval); - Cluster::log(fmt("initiate peering with %s:%s, retry=%s, status=%s", - n$node$ip, n$node$p, Cluster::retry_interval, - status)); - } - } - -event zeek_init() &priority=-10 +event zeek_init() &priority=-5 { if ( getenv("ZEEKCTL_CHECK_CONFIG") != "" ) return; @@ -98,59 +45,4 @@ event zeek_init() &priority=-10 Cluster::subscribe(nodeid_topic(Cluster::node_id())); Cluster::subscribe(node_topic(node)); - - - # Listening and connecting to other peers is broker specific, - # short circuit if Zeek is configured with a different - # cluster backend. - # - # In the future, this could move into a policy script, but - # for the time being it's easier for backwards compatibility - # to keep this here. - if ( Cluster::backend != Cluster::CLUSTER_BACKEND_BROKER ) - return; - - # Logging setup: Anything handling logging additionally subscribes - # to Broker::default_log_topic_prefix. - switch ( self$node_type ) { - case LOGGER: - Cluster::subscribe(Broker::default_log_topic_prefix); - break; - case MANAGER: - if ( Cluster::manager_is_logger ) - Cluster::subscribe(Broker::default_log_topic_prefix); - break; - } - - if ( self$p != 0/unknown ) - { - Broker::listen(Broker::default_listen_address, - self$p, - Broker::default_listen_retry); - - Cluster::log(fmt("listening on %s:%s", Broker::default_listen_address, self$p)); - } - - - switch ( self$node_type ) { - case MANAGER: - connect_peers_with_type(LOGGER); - - break; - case PROXY: - connect_peers_with_type(LOGGER); - - if ( self?$manager ) - connect_peer(MANAGER, self$manager); - - break; - case WORKER: - connect_peers_with_type(LOGGER); - connect_peers_with_type(PROXY); - - if ( self?$manager ) - connect_peer(MANAGER, self$manager); - - break; - } } diff --git a/scripts/policy/frameworks/cluster/backend/broker/__load__.zeek b/scripts/policy/frameworks/cluster/backend/broker/__load__.zeek new file mode 100644 index 0000000000..a10fe855df --- /dev/null +++ b/scripts/policy/frameworks/cluster/backend/broker/__load__.zeek @@ -0,0 +1 @@ +@load ./main diff --git a/scripts/policy/frameworks/cluster/backend/broker/main.zeek b/scripts/policy/frameworks/cluster/backend/broker/main.zeek new file mode 100644 index 0000000000..91da149856 --- /dev/null +++ b/scripts/policy/frameworks/cluster/backend/broker/main.zeek @@ -0,0 +1,152 @@ +##! Broker cluster backend support. +##! +##! The Broker cluster backend is a peer-to-peer backend that has been +##! in use since Bro 2.6 until Zeek 8.1. Individual Zeek cluster nodes +##! peer with each other using a fixed connection strategy using the information +##! stored in :zeek:see:`Cluster::nodes` populated by the cluster-layout.zeek +##! file or via supervisor internal functionality. +##! +##! Conceptually: +##! +##! * All nodes peer with all logger nodes +##! * All worker nodes peer with all proxy nodes and the manager node +##! * All proxy nodes peer with the manager +##! +##! This implies that logger, manager and proxy nodes are all listening +##! on the ports defined in the cluster layout. +##! +##! Note that publish-subscribe visibility with Broker is limited to nodes +##! that are directly peered. A worker publishing a message to a topic another +##! worker node is subscribed to will not be visible by the other worker. + +module Cluster; + +redef Cluster::backend = Cluster::CLUSTER_BACKEND_BROKER; + +function connect_peer(node_type: NodeType, node_name: string) + { + local nn = nodes_with_type(node_type); + + for ( i in nn ) + { + local n = nn[i]; + + if ( n$name != node_name ) + next; + if ( ! hook connect_node_hook(n) ) + return; + + local status = Broker::peer(cat(n$node$ip), n$node$p, + Cluster::retry_interval); + Cluster::log(fmt("initiate peering with %s:%s, retry=%s, status=%s", + n$node$ip, n$node$p, Cluster::retry_interval, + status)); + return; + } + + Reporter::warning(fmt("connect_peer: node '%s' (%s) not found", node_name, node_type)); + } + +function connect_peers_with_type(node_type: NodeType) + { + local nn = nodes_with_type(node_type); + + for ( i in nn ) + { + local n = nn[i]; + + if ( ! hook connect_node_hook(n) ) + next; + + local status = Broker::peer(cat(n$node$ip), n$node$p, + Cluster::retry_interval); + Cluster::log(fmt("initiate peering with %s:%s, retry=%s, status=%s", + n$node$ip, n$node$p, Cluster::retry_interval, + status)); + } + } + +# Whenever a node adds a Broker peer, it sends Cluster::hello() identifying +# itself to the peer. The other peer then raises Cluster::node_up(), upon +# seeing the Cluster::hello() +event Broker::peer_added(endpoint: Broker::EndpointInfo, msg: string) &priority=10 + { + if ( ! Cluster::is_enabled() ) + return; + + local e = Broker::make_event(Cluster::hello, node, Cluster::node_id()); + Broker::publish(nodeid_topic(endpoint$id), e); + } + +event Broker::peer_lost(endpoint: Broker::EndpointInfo, msg: string) &priority=10 + { + for ( node_name, n in nodes ) + { + if ( n?$id && n$id == endpoint$id ) + { + event Cluster::node_down(node_name, endpoint$id); + break; + } + } + } + +# The event handler setting up subscriptions has priority -5. It runs +# before this handler. Priority -10 also means that a user can fiddle +# with the cluster-layout in zeek_init() for testing. +event zeek_init() &priority=-10 + { + if ( getenv("ZEEKCTL_CHECK_CONFIG") != "" ) + return; + + if ( ! Cluster::is_enabled() ) + return; + + if ( Cluster::backend != Cluster::CLUSTER_BACKEND_BROKER ) + return; + + local self = Cluster::nodes[Cluster::node]; + + # Logging setup: Anything handling logging additionally subscribes + # to Broker::default_log_topic_prefix. + switch ( self$node_type ) { + case LOGGER: + Cluster::subscribe(Broker::default_log_topic_prefix); + break; + case MANAGER: + if ( Cluster::manager_is_logger ) + Cluster::subscribe(Broker::default_log_topic_prefix); + break; + } + + if ( self$p != 0/unknown ) + { + Broker::listen(Broker::default_listen_address, + self$p, + Broker::default_listen_retry); + + Cluster::log(fmt("listening on %s:%s", Broker::default_listen_address, self$p)); + } + + + switch ( self$node_type ) { + case MANAGER: + connect_peers_with_type(LOGGER); + + break; + case PROXY: + connect_peers_with_type(LOGGER); + + if ( self?$manager ) + connect_peer(MANAGER, self$manager); + + break; + case WORKER: + connect_peers_with_type(LOGGER); + connect_peers_with_type(PROXY); + + if ( self?$manager ) + connect_peer(MANAGER, self$manager); + + break; + } + } diff --git a/scripts/test-all-policy.zeek b/scripts/test-all-policy.zeek index 3292921e86..6f1f5cd879 100644 --- a/scripts/test-all-policy.zeek +++ b/scripts/test-all-policy.zeek @@ -14,6 +14,8 @@ @load frameworks/analyzer/packet-segment-logging.zeek # @load frameworks/control/controllee.zeek # @load frameworks/control/controller.zeek +@load frameworks/cluster/backend/broker/__load__.zeek +@load frameworks/cluster/backend/broker/main.zeek @ifdef ( Cluster::CLUSTER_BACKEND_ZEROMQ ) @load frameworks/cluster/backend/zeromq/__load__.zeek # @load frameworks/cluster/backend/zeromq/connect.zeek From d122894d0dd2b6fffff70ab46840fcf174bedf50 Mon Sep 17 00:00:00 2001 From: Arne Welzel Date: Thu, 25 Sep 2025 18:10:38 +0200 Subject: [PATCH 07/12] cluster: Rename setup-connections to setup-subscriptions Now that all the logic about establishing connections among nodes has moved to the broker policy script, the setup-connections name seems unfortunate, transition away from it. --- scripts/base/frameworks/cluster/__load__.zeek | 2 +- scripts/base/frameworks/cluster/pools.zeek | 2 +- .../frameworks/cluster/setup-connections.zeek | 49 +------------------ .../cluster/setup-subscriptions.zeek | 46 +++++++++++++++++ .../Baseline/coverage.bare-mode-errors/errors | 1 + .../coverage.init-default/missing_loads | 1 + 6 files changed, 52 insertions(+), 49 deletions(-) create mode 100644 scripts/base/frameworks/cluster/setup-subscriptions.zeek diff --git a/scripts/base/frameworks/cluster/__load__.zeek b/scripts/base/frameworks/cluster/__load__.zeek index 2ca60fb67a..a7c200b16c 100644 --- a/scripts/base/frameworks/cluster/__load__.zeek +++ b/scripts/base/frameworks/cluster/__load__.zeek @@ -43,7 +43,7 @@ redef Cluster::manager_is_logger = F; @if ( Cluster::node in Cluster::nodes ) -@load ./setup-connections +@load ./setup-subscriptions @if ( Cluster::local_node_type() == Cluster::MANAGER ) @load ./nodes/manager diff --git a/scripts/base/frameworks/cluster/pools.zeek b/scripts/base/frameworks/cluster/pools.zeek index 418706b554..effb491d10 100644 --- a/scripts/base/frameworks/cluster/pools.zeek +++ b/scripts/base/frameworks/cluster/pools.zeek @@ -346,7 +346,7 @@ function pool_sorter(a: Pool, b: Pool): int return strcmp(a$spec$topic, b$spec$topic); } -# Needs to execute before the zeek_init in setup-connections +# Needs to execute before the zeek_init in setup-subscriptions event zeek_init() &priority=-5 { if ( ! Cluster::is_enabled() ) diff --git a/scripts/base/frameworks/cluster/setup-connections.zeek b/scripts/base/frameworks/cluster/setup-connections.zeek index ea67b5115a..ef373ecf47 100644 --- a/scripts/base/frameworks/cluster/setup-connections.zeek +++ b/scripts/base/frameworks/cluster/setup-connections.zeek @@ -1,48 +1,3 @@ -##! This script establishes communication among all nodes in a cluster -##! as defined by :zeek:id:`Cluster::nodes`. +@deprecated "Remove in v9.1: Load base/frameworks/cluster/setup-subscriptions instead" -@load ./main -@load ./pools - -module Cluster; - -event zeek_init() &priority=-5 - { - if ( getenv("ZEEKCTL_CHECK_CONFIG") != "" ) - return; - - local self = nodes[node]; - - for ( i in registered_pools ) - { - local pool = registered_pools[i]; - - if ( node in pool$nodes ) - Cluster::subscribe(pool$spec$topic); - } - - switch ( self$node_type ) { - case NONE: - return; - case CONTROL: - break; - case LOGGER: - Cluster::subscribe(Cluster::logger_topic); - break; - case MANAGER: - Cluster::subscribe(Cluster::manager_topic); - break; - case PROXY: - Cluster::subscribe(Cluster::proxy_topic); - break; - case WORKER: - Cluster::subscribe(Cluster::worker_topic); - break; - default: - Reporter::error(fmt("Unhandled cluster node type: %s", self$node_type)); - return; - } - - Cluster::subscribe(nodeid_topic(Cluster::node_id())); - Cluster::subscribe(node_topic(node)); - } +@load ./setup-subscriptions diff --git a/scripts/base/frameworks/cluster/setup-subscriptions.zeek b/scripts/base/frameworks/cluster/setup-subscriptions.zeek new file mode 100644 index 0000000000..9aef5f302b --- /dev/null +++ b/scripts/base/frameworks/cluster/setup-subscriptions.zeek @@ -0,0 +1,46 @@ +##! This script contains the common subscription setup logic. +@load ./main +@load ./pools + +module Cluster; + +event zeek_init() &priority=-5 + { + if ( getenv("ZEEKCTL_CHECK_CONFIG") != "" ) + return; + + local self = nodes[node]; + + for ( i in registered_pools ) + { + local pool = registered_pools[i]; + + if ( node in pool$nodes ) + Cluster::subscribe(pool$spec$topic); + } + + switch ( self$node_type ) { + case NONE: + return; + case CONTROL: + break; + case LOGGER: + Cluster::subscribe(Cluster::logger_topic); + break; + case MANAGER: + Cluster::subscribe(Cluster::manager_topic); + break; + case PROXY: + Cluster::subscribe(Cluster::proxy_topic); + break; + case WORKER: + Cluster::subscribe(Cluster::worker_topic); + break; + default: + Reporter::error(fmt("Unhandled cluster node type: %s", self$node_type)); + return; + } + + Cluster::subscribe(nodeid_topic(Cluster::node_id())); + Cluster::subscribe(node_topic(node)); + } diff --git a/testing/btest/Baseline/coverage.bare-mode-errors/errors b/testing/btest/Baseline/coverage.bare-mode-errors/errors index b1bb951e92..5ed62ac0af 100644 --- a/testing/btest/Baseline/coverage.bare-mode-errors/errors +++ b/testing/btest/Baseline/coverage.bare-mode-errors/errors @@ -1,2 +1,3 @@ ### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. ### NOTE: This file has been sorted with diff-sort. +warning in <...>/setup-connections.zeek, line 1: deprecated script loaded from command line arguments "Remove in v9.1: Load base<...>/setup-subscriptions instead" diff --git a/testing/btest/Baseline/coverage.init-default/missing_loads b/testing/btest/Baseline/coverage.init-default/missing_loads index 93af34614e..44c9f1ea1f 100644 --- a/testing/btest/Baseline/coverage.init-default/missing_loads +++ b/testing/btest/Baseline/coverage.init-default/missing_loads @@ -7,6 +7,7 @@ -./frameworks/cluster/nodes/proxy.zeek -./frameworks/cluster/nodes/worker.zeek -./frameworks/cluster/setup-connections.zeek +-./frameworks/cluster/setup-subscriptions.zeek -./frameworks/cluster/supervisor.zeek -./frameworks/intel/cluster.zeek -./frameworks/netcontrol/cluster.zeek From 78ce72307821522de44d512b8a85051721403529 Mon Sep 17 00:00:00 2001 From: Arne Welzel Date: Thu, 25 Sep 2025 11:15:18 +0200 Subject: [PATCH 08/12] cluster: Move more Broker specific scripts to policy/ --- scripts/base/frameworks/cluster/__load__.zeek | 6 ------ .../policy/frameworks/cluster/backend/broker/__load__.zeek | 3 +++ .../frameworks/cluster/backend/broker/backpressure.zeek} | 0 .../frameworks/cluster/backend/broker/telemetry.zeek} | 0 scripts/test-all-policy.zeek | 2 ++ testing/btest/Baseline/coverage.init-default/missing_loads | 2 -- 6 files changed, 5 insertions(+), 8 deletions(-) rename scripts/{base/frameworks/cluster/broker-backpressure.zeek => policy/frameworks/cluster/backend/broker/backpressure.zeek} (100%) rename scripts/{base/frameworks/cluster/broker-telemetry.zeek => policy/frameworks/cluster/backend/broker/telemetry.zeek} (100%) diff --git a/scripts/base/frameworks/cluster/__load__.zeek b/scripts/base/frameworks/cluster/__load__.zeek index a7c200b16c..c1656ef734 100644 --- a/scripts/base/frameworks/cluster/__load__.zeek +++ b/scripts/base/frameworks/cluster/__load__.zeek @@ -17,12 +17,6 @@ redef Broker::log_topic = Cluster::rr_log_topic; # Add a cluster prefix. @prefixes += cluster -# Broker-specific additions: -@if ( Cluster::backend == Cluster::CLUSTER_BACKEND_BROKER ) -@load ./broker-backpressure -@load ./broker-telemetry -@endif - @if ( Supervisor::is_supervised() ) # When running a supervised cluster, populate Cluster::nodes from the node table # the Supervisor provides to new Zeek nodes. The management framework configures diff --git a/scripts/policy/frameworks/cluster/backend/broker/__load__.zeek b/scripts/policy/frameworks/cluster/backend/broker/__load__.zeek index a10fe855df..afbc86f258 100644 --- a/scripts/policy/frameworks/cluster/backend/broker/__load__.zeek +++ b/scripts/policy/frameworks/cluster/backend/broker/__load__.zeek @@ -1 +1,4 @@ @load ./main + +@load ./backpressure +@load ./telemetry diff --git a/scripts/base/frameworks/cluster/broker-backpressure.zeek b/scripts/policy/frameworks/cluster/backend/broker/backpressure.zeek similarity index 100% rename from scripts/base/frameworks/cluster/broker-backpressure.zeek rename to scripts/policy/frameworks/cluster/backend/broker/backpressure.zeek diff --git a/scripts/base/frameworks/cluster/broker-telemetry.zeek b/scripts/policy/frameworks/cluster/backend/broker/telemetry.zeek similarity index 100% rename from scripts/base/frameworks/cluster/broker-telemetry.zeek rename to scripts/policy/frameworks/cluster/backend/broker/telemetry.zeek diff --git a/scripts/test-all-policy.zeek b/scripts/test-all-policy.zeek index 6f1f5cd879..ba016a0de4 100644 --- a/scripts/test-all-policy.zeek +++ b/scripts/test-all-policy.zeek @@ -15,7 +15,9 @@ # @load frameworks/control/controllee.zeek # @load frameworks/control/controller.zeek @load frameworks/cluster/backend/broker/__load__.zeek +@load frameworks/cluster/backend/broker/backpressure.zeek @load frameworks/cluster/backend/broker/main.zeek +@load frameworks/cluster/backend/broker/telemetry.zeek @ifdef ( Cluster::CLUSTER_BACKEND_ZEROMQ ) @load frameworks/cluster/backend/zeromq/__load__.zeek # @load frameworks/cluster/backend/zeromq/connect.zeek diff --git a/testing/btest/Baseline/coverage.init-default/missing_loads b/testing/btest/Baseline/coverage.init-default/missing_loads index 44c9f1ea1f..711e65bea1 100644 --- a/testing/btest/Baseline/coverage.init-default/missing_loads +++ b/testing/btest/Baseline/coverage.init-default/missing_loads @@ -1,7 +1,5 @@ ### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. --./frameworks/cluster/broker-backpressure.zeek -./frameworks/cluster/broker-stores.zeek --./frameworks/cluster/broker-telemetry.zeek -./frameworks/cluster/nodes/logger.zeek -./frameworks/cluster/nodes/manager.zeek -./frameworks/cluster/nodes/proxy.zeek From 1570dd6a96584b40b1f29821359b2eadb1130914 Mon Sep 17 00:00:00 2001 From: Arne Welzel Date: Wed, 24 Sep 2025 14:39:30 +0200 Subject: [PATCH 09/12] init-bare: Default to CLUSTER_BACKEND_NONE --- scripts/base/init-bare.zeek | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/scripts/base/init-bare.zeek b/scripts/base/init-bare.zeek index 07aeece67d..01f899777e 100644 --- a/scripts/base/init-bare.zeek +++ b/scripts/base/init-bare.zeek @@ -6011,8 +6011,8 @@ module Cluster; export { type Cluster::Pool: record {}; - ## Cluster backend to use. Default is the broker backend. - const backend = Cluster::CLUSTER_BACKEND_BROKER &redef; + ## Cluster backend to use. Default is the None backend. + const backend = Cluster::CLUSTER_BACKEND_NONE &redef; ## The event serializer to use by the cluster backend. ## From 5a10772b7d736ea6c912bb2269483407aafa90b8 Mon Sep 17 00:00:00 2001 From: Arne Welzel Date: Thu, 25 Sep 2025 16:34:19 +0200 Subject: [PATCH 10/12] btest: Use Broker for Broker specific tests Now that Cluster::backend defaults to NONE by default, some tests won't just work anymore for various reasons (mostly due to the logging framework not sending log writes to broker anymore). Load the new frameworks/cluster/backend/broker package for these to keep them functional. Also add the @load to the common broker/cluster-layout.zeek file to avoid changing all of the files that use that layout. --- testing/btest/Files/broker/cluster-layout.zeek | 6 ++++++ testing/btest/cluster/generic/cluster-publish-errors.zeek | 2 +- .../btest/cluster/websocket/terminate-while-queuing.zeek | 3 +++ testing/btest/core/suspend_processing/websocket.zeek | 2 ++ testing/btest/plugins/publish-event-hook.zeek | 3 ++- .../base/frameworks/control/configuration_update.zeek | 2 ++ testing/btest/scripts/base/frameworks/control/id_value.zeek | 2 ++ testing/btest/scripts/base/frameworks/control/shutdown.zeek | 2 ++ .../scripts/base/frameworks/logging/length-checking.zeek | 3 +++ .../frameworks/cluster/cluster_started_restart_manager.zeek | 1 + .../frameworks/cluster/cluster_started_restart_worker.zeek | 1 + .../scripts/policy/frameworks/telemetry/prometheus.zeek | 1 + testing/btest/supervisor/config-cluster.zeek | 1 + 13 files changed, 27 insertions(+), 2 deletions(-) diff --git a/testing/btest/Files/broker/cluster-layout.zeek b/testing/btest/Files/broker/cluster-layout.zeek index f495e8da29..d8cc306386 100644 --- a/testing/btest/Files/broker/cluster-layout.zeek +++ b/testing/btest/Files/broker/cluster-layout.zeek @@ -19,6 +19,12 @@ # to Cluster::nodes. +# Using this testing cluster-layout.zeek switches Zeek to use Broker. +# +# This is side-loaded here to keep all existing tests that rely on +# this Broker testing cluster setup to continue working. +@load frameworks/cluster/backend/broker + # Redef'ed to F if logger-1 or logger-2 are active. redef Cluster::manager_is_logger = T; diff --git a/testing/btest/cluster/generic/cluster-publish-errors.zeek b/testing/btest/cluster/generic/cluster-publish-errors.zeek index 5e106012a0..25cc5cd733 100644 --- a/testing/btest/cluster/generic/cluster-publish-errors.zeek +++ b/testing/btest/cluster/generic/cluster-publish-errors.zeek @@ -1,7 +1,7 @@ # @TEST-DOC: Test errors of cluster bifs # # @TEST-EXEC: zeek --parse-only -b %INPUT -# @TEST-EXEC: zeek -b %INPUT +# @TEST-EXEC: zeek -b %INPUT frameworks/cluster/backend/broker # @TEST-EXEC: TEST_DIFF_CANONIFIER=$SCRIPTS/diff-remove-abspath btest-diff .stderr # @TEST-EXEC: TEST_DIFF_CANONIFIER=$SCRIPTS/diff-remove-abspath btest-diff .stdout diff --git a/testing/btest/cluster/websocket/terminate-while-queuing.zeek b/testing/btest/cluster/websocket/terminate-while-queuing.zeek index 787f02671a..1a057ec983 100644 --- a/testing/btest/cluster/websocket/terminate-while-queuing.zeek +++ b/testing/btest/cluster/websocket/terminate-while-queuing.zeek @@ -19,6 +19,9 @@ # @TEST-EXEC: btest-diff ./client/.stderr # @TEST-START-FILE manager.zeek + +@load frameworks/cluster/backend/broker + redef exit_only_after_terminate = T; # Force dispatcher queue being full quickly! diff --git a/testing/btest/core/suspend_processing/websocket.zeek b/testing/btest/core/suspend_processing/websocket.zeek index 631d57dd95..4201014673 100644 --- a/testing/btest/core/suspend_processing/websocket.zeek +++ b/testing/btest/core/suspend_processing/websocket.zeek @@ -15,6 +15,8 @@ # @TEST-EXEC: TEST_DIFF_CANONIFIER=$SCRIPTS/diff-remove-abspath btest-diff worker/.stdout # @TEST-EXEC: TEST_DIFF_CANONIFIER=$SCRIPTS/diff-remove-abspath btest-diff client/.stdout +@load frameworks/cluster/backend/broker + global my_new_connection: event(uid: string, c: count); event zeek_init() diff --git a/testing/btest/plugins/publish-event-hook.zeek b/testing/btest/plugins/publish-event-hook.zeek index 704a1ccc59..c5efda7caa 100644 --- a/testing/btest/plugins/publish-event-hook.zeek +++ b/testing/btest/plugins/publish-event-hook.zeek @@ -4,8 +4,9 @@ # @TEST-EXEC: ZEEK_PLUGIN_PATH=`pwd` zeek -b Demo::PublishEvent %INPUT > output # @TEST-EXEC: btest-diff output -redef allow_network_time_forward = F; +@load frameworks/cluster/backend/broker +redef allow_network_time_forward = F; module App; diff --git a/testing/btest/scripts/base/frameworks/control/configuration_update.zeek b/testing/btest/scripts/base/frameworks/control/configuration_update.zeek index d2507f7ee3..2afe60c0a8 100644 --- a/testing/btest/scripts/base/frameworks/control/configuration_update.zeek +++ b/testing/btest/scripts/base/frameworks/control/configuration_update.zeek @@ -5,6 +5,8 @@ # @TEST-EXEC: btest-bg-wait 30 # @TEST-EXEC: btest-diff controllee/.stdout +@load frameworks/cluster/backend/broker + @load base/frameworks/control const test_var = "ORIGINAL VALUE (this should be printed out first)" &redef; diff --git a/testing/btest/scripts/base/frameworks/control/id_value.zeek b/testing/btest/scripts/base/frameworks/control/id_value.zeek index c12d137c6a..33e8b525cf 100644 --- a/testing/btest/scripts/base/frameworks/control/id_value.zeek +++ b/testing/btest/scripts/base/frameworks/control/id_value.zeek @@ -9,6 +9,8 @@ # @TEST-EXEC: btest-bg-wait 30 # @TEST-EXEC: btest-diff controller/.stdout +@load frameworks/cluster/backend/broker + @load base/frameworks/control # This value shouldn't ever be printed to the controllers stdout. diff --git a/testing/btest/scripts/base/frameworks/control/shutdown.zeek b/testing/btest/scripts/base/frameworks/control/shutdown.zeek index 832ca8a591..3741cf98df 100644 --- a/testing/btest/scripts/base/frameworks/control/shutdown.zeek +++ b/testing/btest/scripts/base/frameworks/control/shutdown.zeek @@ -4,3 +4,5 @@ # @TEST-EXEC: btest-bg-run controller ZEEKPATH=$ZEEKPATH:.. zeek -b %INPUT frameworks/control/controller Control::host=127.0.0.1 Control::host_port=$BROKER_PORT Control::cmd=shutdown # @TEST-EXEC: btest-bg-wait 20 +@load frameworks/cluster/backend/broker + diff --git a/testing/btest/scripts/base/frameworks/logging/length-checking.zeek b/testing/btest/scripts/base/frameworks/logging/length-checking.zeek index 95e966c3d6..cee6e8cc25 100644 --- a/testing/btest/scripts/base/frameworks/logging/length-checking.zeek +++ b/testing/btest/scripts/base/frameworks/logging/length-checking.zeek @@ -17,6 +17,9 @@ # @TEST-START-FILE common.zeek @load base/frameworks/notice/weird +# Ensure logging is done via Broker +@load frameworks/cluster/backend/broker + module Test; # Disable the string and container length filtering. diff --git a/testing/btest/scripts/policy/frameworks/cluster/cluster_started_restart_manager.zeek b/testing/btest/scripts/policy/frameworks/cluster/cluster_started_restart_manager.zeek index fcc9f7e3e9..a762121483 100644 --- a/testing/btest/scripts/policy/frameworks/cluster/cluster_started_restart_manager.zeek +++ b/testing/btest/scripts/policy/frameworks/cluster/cluster_started_restart_manager.zeek @@ -12,6 +12,7 @@ # @TEST-EXEC: btest-diff zeek/worker-1/stdout # @TEST-EXEC: btest-diff zeek/proxy-1/stdout +@load frameworks/cluster/backend/broker @load policy/frameworks/cluster/experimental # So the supervised node doesn't terminate right away. diff --git a/testing/btest/scripts/policy/frameworks/cluster/cluster_started_restart_worker.zeek b/testing/btest/scripts/policy/frameworks/cluster/cluster_started_restart_worker.zeek index 457d34db11..112559c10c 100644 --- a/testing/btest/scripts/policy/frameworks/cluster/cluster_started_restart_worker.zeek +++ b/testing/btest/scripts/policy/frameworks/cluster/cluster_started_restart_worker.zeek @@ -13,6 +13,7 @@ # @TEST-EXEC: btest-diff zeek/worker-2/stdout # @TEST-EXEC: btest-diff zeek/proxy-1/stdout +@load frameworks/cluster/backend/broker @load policy/frameworks/cluster/experimental # So the supervised node doesn't terminate right away. diff --git a/testing/btest/scripts/policy/frameworks/telemetry/prometheus.zeek b/testing/btest/scripts/policy/frameworks/telemetry/prometheus.zeek index 9b09784e1f..aeef3df959 100644 --- a/testing/btest/scripts/policy/frameworks/telemetry/prometheus.zeek +++ b/testing/btest/scripts/policy/frameworks/telemetry/prometheus.zeek @@ -55,6 +55,7 @@ for host in $(echo ${services_data} | jq -r '.[0].targets[]' | sort); do done # @TEST-END-FILE +@load policy/frameworks/cluster/backend/broker @load policy/frameworks/cluster/experimental @load base/frameworks/telemetry diff --git a/testing/btest/supervisor/config-cluster.zeek b/testing/btest/supervisor/config-cluster.zeek index f551ec5044..ebdaaf7019 100644 --- a/testing/btest/supervisor/config-cluster.zeek +++ b/testing/btest/supervisor/config-cluster.zeek @@ -11,6 +11,7 @@ # @TEST-EXEC: btest-diff zeek/worker-1/stdout # @TEST-EXEC: btest-diff zeek/proxy-1/stdout +@load policy/frameworks/cluster/backend/broker @load policy/frameworks/cluster/experimental # So the supervised node doesn't terminate right away. From 9dc1f9b211e7fa976e200b74f7a3667fcc47e086 Mon Sep 17 00:00:00 2001 From: Arne Welzel Date: Thu, 25 Sep 2025 16:43:29 +0200 Subject: [PATCH 11/12] btest/broker: Fix logging tests This is a bit annoying: The logging framework looks at Cluster::backend to decide what should happen with log writes and only do the right thing if it was switched to broker. --- testing/btest/broker/remote_log.zeek | 3 +++ testing/btest/broker/remote_log_batch.zeek | 3 +++ testing/btest/broker/remote_log_late_join.zeek | 3 +++ testing/btest/broker/remote_log_types.zeek | 3 +++ 4 files changed, 12 insertions(+) diff --git a/testing/btest/broker/remote_log.zeek b/testing/btest/broker/remote_log.zeek index 9a030b2d9e..c3f0cc9303 100644 --- a/testing/btest/broker/remote_log.zeek +++ b/testing/btest/broker/remote_log.zeek @@ -13,6 +13,9 @@ # @TEST-START-FILE common.zeek +# Relies on Cluster::backend being properly set for logging. +@load frameworks/cluster/backend/broker + redef exit_only_after_terminate = T; module Test; diff --git a/testing/btest/broker/remote_log_batch.zeek b/testing/btest/broker/remote_log_batch.zeek index d0f3b44ce8..f8b2694bb2 100644 --- a/testing/btest/broker/remote_log_batch.zeek +++ b/testing/btest/broker/remote_log_batch.zeek @@ -13,6 +13,9 @@ # @TEST-START-FILE common.zeek +# Relies on Cluster::backend being properly set for logging. +@load frameworks/cluster/backend/broker + redef exit_only_after_terminate = T; module Test; diff --git a/testing/btest/broker/remote_log_late_join.zeek b/testing/btest/broker/remote_log_late_join.zeek index bc5660f424..294605385e 100644 --- a/testing/btest/broker/remote_log_late_join.zeek +++ b/testing/btest/broker/remote_log_late_join.zeek @@ -13,6 +13,9 @@ # @TEST-START-FILE common.zeek +# Relies on Cluster::backend being properly set for logging. +@load frameworks/cluster/backend/broker + redef exit_only_after_terminate = T; module Test; diff --git a/testing/btest/broker/remote_log_types.zeek b/testing/btest/broker/remote_log_types.zeek index cb4e74251c..1817fe4cf6 100644 --- a/testing/btest/broker/remote_log_types.zeek +++ b/testing/btest/broker/remote_log_types.zeek @@ -16,6 +16,9 @@ # @TEST-START-FILE common.zeek +# Relies on Cluster::backend being properly set for logging. +@load frameworks/cluster/backend/broker + redef exit_only_after_terminate = T; global quit_receiver: event(); From 101060e194844c8c7f066aa568025520f17f2e51 Mon Sep 17 00:00:00 2001 From: Arne Welzel Date: Thu, 25 Sep 2025 19:01:49 +0200 Subject: [PATCH 12/12] supervisor: Switch from Broker:: to Cluster:: --- scripts/base/frameworks/supervisor/main.zeek | 14 ++++++++------ .../canonified_loaded_scripts.log | 6 +++--- .../canonified_loaded_scripts.log | 6 +++--- testing/btest/Baseline/plugins.hooks/output | 18 ++++++++++++------ .../management/controller/agent-checkin.zeek | 1 + 5 files changed, 27 insertions(+), 18 deletions(-) diff --git a/scripts/base/frameworks/supervisor/main.zeek b/scripts/base/frameworks/supervisor/main.zeek index fbc7cf1009..2b6f0ed86a 100644 --- a/scripts/base/frameworks/supervisor/main.zeek +++ b/scripts/base/frameworks/supervisor/main.zeek @@ -1,6 +1,8 @@ ##! Implements Zeek process supervision API and default behavior for its ##! associated (remote) control events. +@load base/frameworks/cluster/pubsub + @load ./api @load ./control @@ -49,7 +51,7 @@ event zeek_init() &priority=10 Broker::listen(); } - Broker::subscribe(SupervisorControl::topic_prefix); + Cluster::subscribe(SupervisorControl::topic_prefix); } event SupervisorControl::stop_request() @@ -67,7 +69,7 @@ event SupervisorControl::status_request(reqid: string, node: string) local res = Supervisor::status(node); local topic = SupervisorControl::topic_prefix + fmt("/status_response/%s", reqid); - Broker::publish(topic, SupervisorControl::status_response, reqid, res); + Cluster::publish(topic, SupervisorControl::status_response, reqid, res); } event SupervisorControl::create_request(reqid: string, node: Supervisor::NodeConfig) @@ -77,7 +79,7 @@ event SupervisorControl::create_request(reqid: string, node: Supervisor::NodeCon local res = Supervisor::create(node); local topic = SupervisorControl::topic_prefix + fmt("/create_response/%s", reqid); - Broker::publish(topic, SupervisorControl::create_response, reqid, res); + Cluster::publish(topic, SupervisorControl::create_response, reqid, res); } event SupervisorControl::destroy_request(reqid: string, node: string) @@ -87,7 +89,7 @@ event SupervisorControl::destroy_request(reqid: string, node: string) local res = Supervisor::destroy(node); local topic = SupervisorControl::topic_prefix + fmt("/destroy_response/%s", reqid); - Broker::publish(topic, SupervisorControl::destroy_response, reqid, res); + Cluster::publish(topic, SupervisorControl::destroy_response, reqid, res); } event SupervisorControl::restart_request(reqid: string, node: string) @@ -97,7 +99,7 @@ event SupervisorControl::restart_request(reqid: string, node: string) local res = Supervisor::restart(node); local topic = SupervisorControl::topic_prefix + fmt("/restart_response/%s", reqid); - Broker::publish(topic, SupervisorControl::restart_response, reqid, res); + Cluster::publish(topic, SupervisorControl::restart_response, reqid, res); } event Supervisor::node_status(node: string, pid: count) @@ -106,5 +108,5 @@ event Supervisor::node_status(node: string, pid: count) return; local topic = SupervisorControl::topic_prefix + "/node_status"; - Broker::publish(topic, SupervisorControl::node_status, node, pid); + Cluster::publish(topic, SupervisorControl::node_status, node, pid); } diff --git a/testing/btest/Baseline/coverage.bare-load-baseline/canonified_loaded_scripts.log b/testing/btest/Baseline/coverage.bare-load-baseline/canonified_loaded_scripts.log index 57c9152075..bb8a7653d4 100644 --- a/testing/btest/Baseline/coverage.bare-load-baseline/canonified_loaded_scripts.log +++ b/testing/btest/Baseline/coverage.bare-load-baseline/canonified_loaded_scripts.log @@ -123,6 +123,9 @@ scripts/base/init-frameworks-and-bifs.zeek scripts/base/frameworks/supervisor/__load__.zeek scripts/base/frameworks/supervisor/control.zeek scripts/base/frameworks/supervisor/main.zeek + scripts/base/frameworks/cluster/pubsub.zeek + scripts/base/frameworks/cluster/types.zeek + build/scripts/base/bif/cluster.bif.zeek scripts/base/frameworks/input/__load__.zeek scripts/base/frameworks/input/main.zeek build/scripts/base/bif/input.bif.zeek @@ -136,12 +139,9 @@ scripts/base/init-frameworks-and-bifs.zeek scripts/base/frameworks/cluster/main.zeek scripts/base/frameworks/control/__load__.zeek scripts/base/frameworks/control/main.zeek - scripts/base/frameworks/cluster/types.zeek build/scripts/base/bif/plugins/Zeek_Cluster_WebSocket.events.bif.zeek - build/scripts/base/bif/cluster.bif.zeek scripts/base/frameworks/cluster/pools.zeek scripts/base/utils/hash_hrw.zeek - scripts/base/frameworks/cluster/pubsub.zeek scripts/base/frameworks/cluster/telemetry.zeek scripts/base/frameworks/config/__load__.zeek scripts/base/frameworks/config/main.zeek diff --git a/testing/btest/Baseline/coverage.default-load-baseline/canonified_loaded_scripts.log b/testing/btest/Baseline/coverage.default-load-baseline/canonified_loaded_scripts.log index 2476ec6e1b..bdc4807cdf 100644 --- a/testing/btest/Baseline/coverage.default-load-baseline/canonified_loaded_scripts.log +++ b/testing/btest/Baseline/coverage.default-load-baseline/canonified_loaded_scripts.log @@ -123,6 +123,9 @@ scripts/base/init-frameworks-and-bifs.zeek scripts/base/frameworks/supervisor/__load__.zeek scripts/base/frameworks/supervisor/control.zeek scripts/base/frameworks/supervisor/main.zeek + scripts/base/frameworks/cluster/pubsub.zeek + scripts/base/frameworks/cluster/types.zeek + build/scripts/base/bif/cluster.bif.zeek scripts/base/frameworks/input/__load__.zeek scripts/base/frameworks/input/main.zeek build/scripts/base/bif/input.bif.zeek @@ -136,12 +139,9 @@ scripts/base/init-frameworks-and-bifs.zeek scripts/base/frameworks/cluster/main.zeek scripts/base/frameworks/control/__load__.zeek scripts/base/frameworks/control/main.zeek - scripts/base/frameworks/cluster/types.zeek build/scripts/base/bif/plugins/Zeek_Cluster_WebSocket.events.bif.zeek - build/scripts/base/bif/cluster.bif.zeek scripts/base/frameworks/cluster/pools.zeek scripts/base/utils/hash_hrw.zeek - scripts/base/frameworks/cluster/pubsub.zeek scripts/base/frameworks/cluster/telemetry.zeek scripts/base/frameworks/config/__load__.zeek scripts/base/frameworks/config/main.zeek diff --git a/testing/btest/Baseline/plugins.hooks/output b/testing/btest/Baseline/plugins.hooks/output index 6ad9831781..560b182567 100644 --- a/testing/btest/Baseline/plugins.hooks/output +++ b/testing/btest/Baseline/plugins.hooks/output @@ -18,13 +18,13 @@ 0.000000 MetaHookPost CallFunction(Analyzer::register_for_port, , (Analyzer::ANALYZER_HTTP, 81/tcp)) -> 0.000000 MetaHookPost CallFunction(Analyzer::register_for_port, , (Analyzer::ANALYZER_HTTP, 8888/tcp)) -> 0.000000 MetaHookPost CallFunction(Analyzer::register_for_ports, , (Analyzer::ANALYZER_HTTP, {80<...>/tcp})) -> -0.000000 MetaHookPost CallFunction(Broker::__subscribe, , (zeek/supervisor)) -> -0.000000 MetaHookPost CallFunction(Broker::subscribe, , (zeek/supervisor)) -> +0.000000 MetaHookPost CallFunction(Cluster::__subscribe, , (zeek/supervisor)) -> 0.000000 MetaHookPost CallFunction(Cluster::is_enabled, , ()) -> 0.000000 MetaHookPost CallFunction(Cluster::is_enabled, , ()) -> 0.000000 MetaHookPost CallFunction(Cluster::register_pool, , ([topic=zeek<...>/logger, node_type=Cluster::LOGGER, max_nodes=, exclusive=F])) -> 0.000000 MetaHookPost CallFunction(Cluster::register_pool, , ([topic=zeek<...>/proxy, node_type=Cluster::PROXY, max_nodes=, exclusive=F])) -> 0.000000 MetaHookPost CallFunction(Cluster::register_pool, , ([topic=zeek<...>/worker, node_type=Cluster::WORKER, max_nodes=, exclusive=F])) -> +0.000000 MetaHookPost CallFunction(Cluster::subscribe, , (zeek/supervisor)) -> 0.000000 MetaHookPost CallFunction(Config::config_option_changed, , (Site::local_nets, {64:ff9b:1::<...>/15,fc00::<...>/10,::/128,2002:ffff:ffff::/48,::1/128,fec0::/10,2002:cb00:7100::/40,2002:c633:6400::<...>/4,2002:a00::/24,100::<...>/8,2001:2::<...>/12,2002:c000:200::/40,2002:f000::/20,2002:7f00::/24,2001::/23,2002:6440::/26,2002:c000::<...>/16,2002:ac10::/28,2002:a9fe::<...>/16,2002:c612::/31,2002::/24,fe80::/10,2001:db8::/32,2002:ef00::<...>/24,2002:e000::/40,2002:c0a8::<...>/24}, )) -> 0.000000 MetaHookPost CallFunction(Files::register_protocol, , (Analyzer::ANALYZER_HTTP, [get_file_handle=HTTP::get_file_handle: function(c:connection, is_orig:bool) : string{ if (!HTTP::c?$http) return ()if (HTTP::c$http$range_request && !HTTP::is_orig) { return (cat(Analyzer::ANALYZER_HTTP, HTTP::is_orig, HTTP::c$id$orig_h, HTTP::build_url(HTTP::c$http)))}else{ HTTP::mime_depth = HTTP::is_orig ? HTTP::c$http$orig_mime_depth : HTTP::c$http$resp_mime_depthreturn (cat(Analyzer::ANALYZER_HTTP, HTTP::c$start_time, HTTP::is_orig, HTTP::c$http$trans_depth, HTTP::mime_depth, id_string(HTTP::c$id)))}}, describe=HTTP::describe_file: function(f:fa_file) : string{ if (HTTP::f$source != HTTP) return ()for ([HTTP::_], HTTP::c in HTTP::f$conns) { if (HTTP::c?$http) return (HTTP::build_url_http(HTTP::c$http))}return ()}])) -> 0.000000 MetaHookPost CallFunction(Log::__add_filter, , (Analyzer::Logging::LOG, [name=default, writer=Log::WRITER_ASCII, path=analyzer, path_func=, include=, exclude=, log_local=T, log_remote=T, field_name_map={}, scope_sep=., ext_prefix=_, ext_func=lambda_<4692973652431675528>: function(path:string) : void, interv=0 secs, postprocessor=, config={}, policy=])) -> @@ -583,6 +583,7 @@ 0.000000 MetaHookPost LoadFile(0, base<...>/ppp, <...>/ppp) -> -1 0.000000 MetaHookPost LoadFile(0, base<...>/ppp_serial, <...>/ppp_serial) -> -1 0.000000 MetaHookPost LoadFile(0, base<...>/pppoe, <...>/pppoe) -> -1 +0.000000 MetaHookPost LoadFile(0, base<...>/pubsub, <...>/pubsub.zeek) -> -1 0.000000 MetaHookPost LoadFile(0, base<...>/removal-hooks, <...>/removal-hooks.zeek) -> -1 0.000000 MetaHookPost LoadFile(0, base<...>/reporter.bif, <...>/reporter.bif.zeek) -> -1 0.000000 MetaHookPost LoadFile(0, base<...>/root, <...>/root) -> -1 @@ -900,6 +901,7 @@ 0.000000 MetaHookPost LoadFileExtended(0, base<...>/ppp, <...>/ppp) -> (-1, ) 0.000000 MetaHookPost LoadFileExtended(0, base<...>/ppp_serial, <...>/ppp_serial) -> (-1, ) 0.000000 MetaHookPost LoadFileExtended(0, base<...>/pppoe, <...>/pppoe) -> (-1, ) +0.000000 MetaHookPost LoadFileExtended(0, base<...>/pubsub, <...>/pubsub.zeek) -> (-1, ) 0.000000 MetaHookPost LoadFileExtended(0, base<...>/removal-hooks, <...>/removal-hooks.zeek) -> (-1, ) 0.000000 MetaHookPost LoadFileExtended(0, base<...>/reporter.bif, <...>/reporter.bif.zeek) -> (-1, ) 0.000000 MetaHookPost LoadFileExtended(0, base<...>/root, <...>/root) -> (-1, ) @@ -962,13 +964,13 @@ 0.000000 MetaHookPre CallFunction(Analyzer::register_for_port, , (Analyzer::ANALYZER_HTTP, 81/tcp)) 0.000000 MetaHookPre CallFunction(Analyzer::register_for_port, , (Analyzer::ANALYZER_HTTP, 8888/tcp)) 0.000000 MetaHookPre CallFunction(Analyzer::register_for_ports, , (Analyzer::ANALYZER_HTTP, {80<...>/tcp})) -0.000000 MetaHookPre CallFunction(Broker::__subscribe, , (zeek/supervisor)) -0.000000 MetaHookPre CallFunction(Broker::subscribe, , (zeek/supervisor)) +0.000000 MetaHookPre CallFunction(Cluster::__subscribe, , (zeek/supervisor)) 0.000000 MetaHookPre CallFunction(Cluster::is_enabled, , ()) 0.000000 MetaHookPre CallFunction(Cluster::is_enabled, , ()) 0.000000 MetaHookPre CallFunction(Cluster::register_pool, , ([topic=zeek<...>/logger, node_type=Cluster::LOGGER, max_nodes=, exclusive=F])) 0.000000 MetaHookPre CallFunction(Cluster::register_pool, , ([topic=zeek<...>/proxy, node_type=Cluster::PROXY, max_nodes=, exclusive=F])) 0.000000 MetaHookPre CallFunction(Cluster::register_pool, , ([topic=zeek<...>/worker, node_type=Cluster::WORKER, max_nodes=, exclusive=F])) +0.000000 MetaHookPre CallFunction(Cluster::subscribe, , (zeek/supervisor)) 0.000000 MetaHookPre CallFunction(Config::config_option_changed, , (Site::local_nets, {64:ff9b:1::<...>/15,fc00::<...>/10,::/128,2002:ffff:ffff::/48,::1/128,fec0::/10,2002:cb00:7100::/40,2002:c633:6400::<...>/4,2002:a00::/24,100::<...>/8,2001:2::<...>/12,2002:c000:200::/40,2002:f000::/20,2002:7f00::/24,2001::/23,2002:6440::/26,2002:c000::<...>/16,2002:ac10::/28,2002:a9fe::<...>/16,2002:c612::/31,2002::/24,fe80::/10,2001:db8::/32,2002:ef00::<...>/24,2002:e000::/40,2002:c0a8::<...>/24}, )) 0.000000 MetaHookPre CallFunction(Files::register_protocol, , (Analyzer::ANALYZER_HTTP, [get_file_handle=HTTP::get_file_handle: function(c:connection, is_orig:bool) : string{ if (!HTTP::c?$http) return ()if (HTTP::c$http$range_request && !HTTP::is_orig) { return (cat(Analyzer::ANALYZER_HTTP, HTTP::is_orig, HTTP::c$id$orig_h, HTTP::build_url(HTTP::c$http)))}else{ HTTP::mime_depth = HTTP::is_orig ? HTTP::c$http$orig_mime_depth : HTTP::c$http$resp_mime_depthreturn (cat(Analyzer::ANALYZER_HTTP, HTTP::c$start_time, HTTP::is_orig, HTTP::c$http$trans_depth, HTTP::mime_depth, id_string(HTTP::c$id)))}}, describe=HTTP::describe_file: function(f:fa_file) : string{ if (HTTP::f$source != HTTP) return ()for ([HTTP::_], HTTP::c in HTTP::f$conns) { if (HTTP::c?$http) return (HTTP::build_url_http(HTTP::c$http))}return ()}])) 0.000000 MetaHookPre CallFunction(Log::__add_filter, , (Analyzer::Logging::LOG, [name=default, writer=Log::WRITER_ASCII, path=analyzer, path_func=, include=, exclude=, log_local=T, log_remote=T, field_name_map={}, scope_sep=., ext_prefix=_, ext_func=lambda_<4692973652431675528>: function(path:string) : void, interv=0 secs, postprocessor=, config={}, policy=])) @@ -1527,6 +1529,7 @@ 0.000000 MetaHookPre LoadFile(0, base<...>/ppp, <...>/ppp) 0.000000 MetaHookPre LoadFile(0, base<...>/ppp_serial, <...>/ppp_serial) 0.000000 MetaHookPre LoadFile(0, base<...>/pppoe, <...>/pppoe) +0.000000 MetaHookPre LoadFile(0, base<...>/pubsub, <...>/pubsub.zeek) 0.000000 MetaHookPre LoadFile(0, base<...>/removal-hooks, <...>/removal-hooks.zeek) 0.000000 MetaHookPre LoadFile(0, base<...>/reporter.bif, <...>/reporter.bif.zeek) 0.000000 MetaHookPre LoadFile(0, base<...>/root, <...>/root) @@ -1844,6 +1847,7 @@ 0.000000 MetaHookPre LoadFileExtended(0, base<...>/ppp, <...>/ppp) 0.000000 MetaHookPre LoadFileExtended(0, base<...>/ppp_serial, <...>/ppp_serial) 0.000000 MetaHookPre LoadFileExtended(0, base<...>/pppoe, <...>/pppoe) +0.000000 MetaHookPre LoadFileExtended(0, base<...>/pubsub, <...>/pubsub.zeek) 0.000000 MetaHookPre LoadFileExtended(0, base<...>/removal-hooks, <...>/removal-hooks.zeek) 0.000000 MetaHookPre LoadFileExtended(0, base<...>/reporter.bif, <...>/reporter.bif.zeek) 0.000000 MetaHookPre LoadFileExtended(0, base<...>/root, <...>/root) @@ -1906,12 +1910,12 @@ 0.000000 | HookCallFunction Analyzer::register_for_port(Analyzer::ANALYZER_HTTP, 81/tcp) 0.000000 | HookCallFunction Analyzer::register_for_port(Analyzer::ANALYZER_HTTP, 8888/tcp) 0.000000 | HookCallFunction Analyzer::register_for_ports(Analyzer::ANALYZER_HTTP, {80<...>/tcp}) -0.000000 | HookCallFunction Broker::__subscribe(zeek/supervisor) -0.000000 | HookCallFunction Broker::subscribe(zeek/supervisor) +0.000000 | HookCallFunction Cluster::__subscribe(zeek/supervisor) 0.000000 | HookCallFunction Cluster::is_enabled() 0.000000 | HookCallFunction Cluster::register_pool([topic=zeek<...>/logger, node_type=Cluster::LOGGER, max_nodes=, exclusive=F]) 0.000000 | HookCallFunction Cluster::register_pool([topic=zeek<...>/proxy, node_type=Cluster::PROXY, max_nodes=, exclusive=F]) 0.000000 | HookCallFunction Cluster::register_pool([topic=zeek<...>/worker, node_type=Cluster::WORKER, max_nodes=, exclusive=F]) +0.000000 | HookCallFunction Cluster::subscribe(zeek/supervisor) 0.000000 | HookCallFunction Config::config_option_changed(Site::local_nets, {64:ff9b:1::<...>/15,fc00::<...>/10,::/128,2002:ffff:ffff::/48,::1/128,fec0::/10,2002:cb00:7100::/40,2002:c633:6400::<...>/4,2002:a00::/24,100::<...>/8,2001:2::<...>/12,2002:c000:200::/40,2002:f000::/20,2002:7f00::/24,2001::/23,2002:6440::/26,2002:c000::<...>/16,2002:ac10::/28,2002:a9fe::<...>/16,2002:c612::/31,2002::/24,fe80::/10,2001:db8::/32,2002:ef00::<...>/24,2002:e000::/40,2002:c0a8::<...>/24}, ) 0.000000 | HookCallFunction Files::register_protocol(Analyzer::ANALYZER_HTTP, [get_file_handle=HTTP::get_file_handle: function(c:connection, is_orig:bool) : string{ if (!HTTP::c?$http) return ()if (HTTP::c$http$range_request && !HTTP::is_orig) { return (cat(Analyzer::ANALYZER_HTTP, HTTP::is_orig, HTTP::c$id$orig_h, HTTP::build_url(HTTP::c$http)))}else{ HTTP::mime_depth = HTTP::is_orig ? HTTP::c$http$orig_mime_depth : HTTP::c$http$resp_mime_depthreturn (cat(Analyzer::ANALYZER_HTTP, HTTP::c$start_time, HTTP::is_orig, HTTP::c$http$trans_depth, HTTP::mime_depth, id_string(HTTP::c$id)))}}, describe=HTTP::describe_file: function(f:fa_file) : string{ if (HTTP::f$source != HTTP) return ()for ([HTTP::_], HTTP::c in HTTP::f$conns) { if (HTTP::c?$http) return (HTTP::build_url_http(HTTP::c$http))}return ()}]) 0.000000 | HookCallFunction Log::__add_filter(Analyzer::Logging::LOG, [name=default, writer=Log::WRITER_ASCII, path=analyzer, path_func=, include=, exclude=, log_local=T, log_remote=T, field_name_map={}, scope_sep=., ext_prefix=_, ext_func=lambda_<4692973652431675528>: function(path:string) : void, interv=0 secs, postprocessor=, config={}, policy=]) @@ -2483,6 +2487,7 @@ 0.000000 | HookLoadFile base<...>/ppp <...>/ppp 0.000000 | HookLoadFile base<...>/ppp_serial <...>/ppp_serial 0.000000 | HookLoadFile base<...>/pppoe <...>/pppoe +0.000000 | HookLoadFile base<...>/pubsub <...>/pubsub.zeek 0.000000 | HookLoadFile base<...>/removal-hooks <...>/removal-hooks.zeek 0.000000 | HookLoadFile base<...>/reporter.bif <...>/reporter.bif.zeek 0.000000 | HookLoadFile base<...>/root <...>/root @@ -2800,6 +2805,7 @@ 0.000000 | HookLoadFileExtended base<...>/ppp <...>/ppp 0.000000 | HookLoadFileExtended base<...>/ppp_serial <...>/ppp_serial 0.000000 | HookLoadFileExtended base<...>/pppoe <...>/pppoe +0.000000 | HookLoadFileExtended base<...>/pubsub <...>/pubsub.zeek 0.000000 | HookLoadFileExtended base<...>/removal-hooks <...>/removal-hooks.zeek 0.000000 | HookLoadFileExtended base<...>/reporter.bif <...>/reporter.bif.zeek 0.000000 | HookLoadFileExtended base<...>/root <...>/root diff --git a/testing/btest/scripts/policy/frameworks/management/controller/agent-checkin.zeek b/testing/btest/scripts/policy/frameworks/management/controller/agent-checkin.zeek index 26f049a6e6..c441f0a9e5 100644 --- a/testing/btest/scripts/policy/frameworks/management/controller/agent-checkin.zeek +++ b/testing/btest/scripts/policy/frameworks/management/controller/agent-checkin.zeek @@ -11,6 +11,7 @@ # @TEST-EXEC: btest-bg-wait 10 # @TEST-EXEC: btest-diff zeek/nodes/controller/stdout +@load policy/frameworks/cluster/backend/broker @load policy/frameworks/management/agent @load policy/frameworks/management/controller