diff --git a/scripts/base/frameworks/cluster/__load__.zeek b/scripts/base/frameworks/cluster/__load__.zeek index 2072b457a7..c1656ef734 100644 --- a/scripts/base/frameworks/cluster/__load__.zeek +++ b/scripts/base/frameworks/cluster/__load__.zeek @@ -1,7 +1,9 @@ # Load the core cluster support. @load ./main @load ./pools +@load ./pubsub @load ./telemetry +@load ./types @if ( Cluster::is_enabled() ) @@ -15,12 +17,6 @@ redef Broker::log_topic = Cluster::rr_log_topic; # Add a cluster prefix. @prefixes += cluster -# Broker-specific additions: -@if ( Cluster::backend == Cluster::CLUSTER_BACKEND_BROKER ) -@load ./broker-backpressure -@load ./broker-telemetry -@endif - @if ( Supervisor::is_supervised() ) # When running a supervised cluster, populate Cluster::nodes from the node table # the Supervisor provides to new Zeek nodes. The management framework configures @@ -41,7 +37,7 @@ redef Cluster::manager_is_logger = F; @if ( Cluster::node in Cluster::nodes ) -@load ./setup-connections +@load ./setup-subscriptions @if ( Cluster::local_node_type() == Cluster::MANAGER ) @load ./nodes/manager diff --git a/scripts/base/frameworks/cluster/main.zeek b/scripts/base/frameworks/cluster/main.zeek index 7124cf23d8..f4e2a17808 100644 --- a/scripts/base/frameworks/cluster/main.zeek +++ b/scripts/base/frameworks/cluster/main.zeek @@ -18,6 +18,8 @@ @load base/frameworks/control @load base/frameworks/broker +@load ./types + module Cluster; export { @@ -75,19 +77,6 @@ export { ## :zeek:see:`Cluster::create_store` with the *persistent* argument set true. const default_persistent_backend = Broker::SQLITE &redef; - ## The default maximum queue size for WebSocket event dispatcher instances. - ## - ## If the maximum queue size is reached, events from external WebSocket - ## clients will be stalled and processed once the queue has been drained. - ## - ## An internal metric named ``cluster_onloop_queue_stalls`` and - ## labeled with a ``WebSocketEventDispatcher::`` tag - ## is incremented when the maximum queue size is reached. - const default_websocket_max_event_queue_size = 32 &redef; - - ## The default ping interval for WebSocket clients. - const default_websocket_ping_interval = 5 sec &redef; - ## Setting a default dir will, for persistent backends that have not ## been given an explicit file path via :zeek:see:`Cluster::stores`, ## automatically create a path within this dir that is based on the name of @@ -157,55 +146,6 @@ export { message: string; } &log; - ## Types of nodes that are allowed to participate in the cluster - ## configuration. - type NodeType: enum { - ## A dummy node type indicating the local node is not operating - ## within a cluster. - NONE, - ## A node type which is allowed to view/manipulate the configuration - ## of other nodes in the cluster. - CONTROL, - ## A node type responsible for log management. - LOGGER, - ## A node type responsible for policy management. - MANAGER, - ## A node type for relaying worker node communication and synchronizing - ## worker node state. - PROXY, - ## The node type doing all the actual traffic analysis. - WORKER, - }; - - ## Record type to indicate a node in a cluster. - type Node: record { - ## Identifies the type of cluster node in this node's configuration. - node_type: NodeType; - ## The IP address of the cluster node. - ip: addr; - ## If the *ip* field is a non-global IPv6 address, this field - ## can specify a particular :rfc:`4007` ``zone_id``. - zone_id: string &default=""; - ## The port that this node will listen on for peer connections. - ## A value of ``0/unknown`` means the node is not pre-configured to listen. - p: port &default=0/unknown; - ## Name of the manager node this node uses. For workers and proxies. - manager: string &optional; - ## A unique identifier assigned to the node by the broker framework. - ## This field is only set while a node is connected. - id: string &optional; - ## The port used to expose metrics to Prometheus. Setting this in a cluster - ## configuration will override the setting for Telemetry::metrics_port for - ## the node. - metrics_port: port &optional; - }; - - ## Record to represent a cluster node including its name. - type NamedNode: record { - name: string; - node: Node; - }; - ## This function can be called at any time to determine if the cluster ## framework is being enabled for this run. ## @@ -317,66 +257,6 @@ export { ## Returns: T on success, else F. global init: function(): bool; - ## Subscribe to the given topic. - ## - ## topic: The topic to subscribe to. - ## - ## Returns: T on success, else F. - global subscribe: function(topic: string): bool; - - ## Unsubscribe from the given topic. - ## - ## topic: The topic to unsubscribe from. - ## - ## Returns: T on success, else F. - global unsubscribe: function(topic: string): bool; - - ## An event instance for cluster pub/sub. - ## - ## See :zeek:see:`Cluster::publish` and :zeek:see:`Cluster::make_event`. - type Event: record { - ## The event handler to be invoked on the remote node. - ev: any; - ## The arguments for the event. - args: vector of any; - }; - - ## The TLS options for a WebSocket server. - ## - ## If cert_file and key_file are set, TLS is enabled. If both - ## are unset, TLS is disabled. Any other combination is an error. - type WebSocketTLSOptions: record { - ## The cert file to use. - cert_file: string &optional; - ## The key file to use. - key_file: string &optional; - ## Expect peers to send client certificates. - enable_peer_verification: bool &default=F; - ## The CA certificate or CA bundle used for peer verification. - ## Empty will use the implementations's default when - ## ``enable_peer_verification`` is T. - ca_file: string &default=""; - ## The ciphers to use. Empty will use the implementation's defaults. - ciphers: string &default=""; - }; - - ## WebSocket server options to pass to :zeek:see:`Cluster::listen_websocket`. - type WebSocketServerOptions: record { - ## The address to listen on, cannot be used together with ``listen_host``. - listen_addr: addr &optional; - ## The port the WebSocket server is supposed to listen on. - listen_port: port; - ## The maximum event queue size for this server. - max_event_queue_size: count &default=default_websocket_max_event_queue_size; - ## Ping interval to use. A WebSocket client not responding to - ## the pings will be disconnected. Set to a negative value to - ## disable pings. Subsecond intervals are currently not supported. - ping_interval: interval &default=default_websocket_ping_interval; - ## The TLS options used for this WebSocket server. By default, - ## TLS is disabled. See also :zeek:see:`Cluster::WebSocketTLSOptions`. - tls_options: WebSocketTLSOptions &default=WebSocketTLSOptions(); - }; - ## Start listening on a WebSocket address. ## ## options: The server :zeek:see:`Cluster::WebSocketServerOptions` to use. @@ -384,38 +264,16 @@ export { ## Returns: T on success, else F. global listen_websocket: function(options: WebSocketServerOptions): bool; - ## Network information of an endpoint. - type NetworkInfo: record { - ## The IP address or hostname where the endpoint listens. - address: string; - ## The port where the endpoint is bound to. - bound_port: port; - }; - - ## Information about a WebSocket endpoint. - type EndpointInfo: record { - id: string; - network: NetworkInfo; - ## The value of the X-Application-Name HTTP header, if any. - application_name: string &optional; - }; - - ## A hook invoked for every :zeek:see:`Cluster::subscribe` call. + ## This hook is called when the local node connects to other nodes based on + ## the given cluster layout. Breaking from the hook will prevent connection + ## establishment. ## - ## Breaking from this hook has no effect. + ## This hook only applies to the Broker cluster backend. ## - ## topic: The topic string as given to :zeek:see:`Cluster::subscribe`. - global on_subscribe: hook(topic: string); - - ## A hook invoked for every :zeek:see:`Cluster::subscribe` call. - ## - ## Breaking from this hook has no effect. - ## - ## topic: The topic string as given to :zeek:see:`Cluster::subscribe`. - global on_unsubscribe: hook(topic: string); + ## connectee: The node to connect to. + global connect_node_hook: hook(connectee: NamedNode); } -# Needs declaration of Cluster::Event type. @load base/bif/cluster.bif @load base/bif/plugins/Zeek_Cluster_WebSocket.events.bif.zeek @@ -534,27 +392,6 @@ event Cluster::hello(name: string, id: string) &priority=10 add active_node_ids[n$node_type][id]; } -event Broker::peer_added(endpoint: Broker::EndpointInfo, msg: string) &priority=10 - { - if ( ! Cluster::is_enabled() ) - return; - - local e = Broker::make_event(Cluster::hello, node, Cluster::node_id()); - Broker::publish(nodeid_topic(endpoint$id), e); - } - -event Broker::peer_lost(endpoint: Broker::EndpointInfo, msg: string) &priority=10 - { - for ( node_name, n in nodes ) - { - if ( n?$id && n$id == endpoint$id ) - { - event Cluster::node_down(node_name, endpoint$id); - break; - } - } - } - event node_down(name: string, id: string) &priority=10 { local found = F; @@ -674,16 +511,6 @@ function init(): bool return Cluster::Backend::__init(Cluster::node_id()); } -function subscribe(topic: string): bool - { - return Cluster::__subscribe(topic); - } - -function unsubscribe(topic: string): bool - { - return Cluster::__unsubscribe(topic); - } - function listen_websocket(options: WebSocketServerOptions): bool { return Cluster::__listen_websocket(options); diff --git a/scripts/base/frameworks/cluster/pools.zeek b/scripts/base/frameworks/cluster/pools.zeek index 418706b554..effb491d10 100644 --- a/scripts/base/frameworks/cluster/pools.zeek +++ b/scripts/base/frameworks/cluster/pools.zeek @@ -346,7 +346,7 @@ function pool_sorter(a: Pool, b: Pool): int return strcmp(a$spec$topic, b$spec$topic); } -# Needs to execute before the zeek_init in setup-connections +# Needs to execute before the zeek_init in setup-subscriptions event zeek_init() &priority=-5 { if ( ! Cluster::is_enabled() ) diff --git a/scripts/base/frameworks/cluster/pubsub.zeek b/scripts/base/frameworks/cluster/pubsub.zeek new file mode 100644 index 0000000000..591a80f986 --- /dev/null +++ b/scripts/base/frameworks/cluster/pubsub.zeek @@ -0,0 +1,49 @@ +# A script that can be loaded by other scripts to access the publish subscribe API. + +@load ./types + +module Cluster; + +export { + ## Subscribe to the given topic. + ## + ## topic: The topic to subscribe to. + ## + ## Returns: T on success, else F. + global subscribe: function(topic: string): bool; + + ## Unsubscribe from the given topic. + ## + ## topic: The topic to unsubscribe from. + ## + ## Returns: T on success, else F. + global unsubscribe: function(topic: string): bool; + + ## A hook invoked for every :zeek:see:`Cluster::subscribe` call. + ## + ## Breaking from this hook has no effect. + ## + ## topic: The topic string as given to :zeek:see:`Cluster::subscribe`. + global on_subscribe: hook(topic: string); + + ## A hook invoked for every :zeek:see:`Cluster::subscribe` call. + ## + ## Breaking from this hook has no effect. + ## + ## topic: The topic string as given to :zeek:see:`Cluster::subscribe`. + global on_unsubscribe: hook(topic: string); +} + +# base/bif/cluster.bif.zeek generated from from src/cluster/cluster.bif contains the +# Cluster::publish(), Cluster::publish_hrw() and Cluster::publish_rr() APIs +@load base/bif/cluster.bif + +function subscribe(topic: string): bool + { + return Cluster::__subscribe(topic); + } + +function unsubscribe(topic: string): bool + { + return Cluster::__unsubscribe(topic); + } diff --git a/scripts/base/frameworks/cluster/setup-connections.zeek b/scripts/base/frameworks/cluster/setup-connections.zeek index 84c2a7d2fe..ef373ecf47 100644 --- a/scripts/base/frameworks/cluster/setup-connections.zeek +++ b/scripts/base/frameworks/cluster/setup-connections.zeek @@ -1,156 +1,3 @@ -##! This script establishes communication among all nodes in a cluster -##! as defined by :zeek:id:`Cluster::nodes`. +@deprecated "Remove in v9.1: Load base/frameworks/cluster/setup-subscriptions instead" -@load ./main -@load ./pools -@load base/frameworks/broker - -module Cluster; - -export { - ## This hook is called when the local node connects to other nodes based on - ## the given cluster layout. Breaking from the hook will prevent connection - ## establishment. - ## - ## connectee: The node to connect to. - global connect_node_hook: hook(connectee: NamedNode); -} - -function connect_peer(node_type: NodeType, node_name: string) - { - local nn = nodes_with_type(node_type); - - for ( i in nn ) - { - local n = nn[i]; - - if ( n$name != node_name ) - next; - if ( ! hook connect_node_hook(n) ) - return; - - local status = Broker::peer(cat(n$node$ip), n$node$p, - Cluster::retry_interval); - Cluster::log(fmt("initiate peering with %s:%s, retry=%s, status=%s", - n$node$ip, n$node$p, Cluster::retry_interval, - status)); - return; - } - - Reporter::warning(fmt("connect_peer: node '%s' (%s) not found", node_name, node_type)); - } - -function connect_peers_with_type(node_type: NodeType) - { - local nn = nodes_with_type(node_type); - - for ( i in nn ) - { - local n = nn[i]; - - if ( ! hook connect_node_hook(n) ) - next; - - local status = Broker::peer(cat(n$node$ip), n$node$p, - Cluster::retry_interval); - Cluster::log(fmt("initiate peering with %s:%s, retry=%s, status=%s", - n$node$ip, n$node$p, Cluster::retry_interval, - status)); - } - } - -event zeek_init() &priority=-10 - { - if ( getenv("ZEEKCTL_CHECK_CONFIG") != "" ) - return; - - local self = nodes[node]; - - for ( i in registered_pools ) - { - local pool = registered_pools[i]; - - if ( node in pool$nodes ) - Cluster::subscribe(pool$spec$topic); - } - - switch ( self$node_type ) { - case NONE: - return; - case CONTROL: - break; - case LOGGER: - Cluster::subscribe(Cluster::logger_topic); - break; - case MANAGER: - Cluster::subscribe(Cluster::manager_topic); - break; - case PROXY: - Cluster::subscribe(Cluster::proxy_topic); - break; - case WORKER: - Cluster::subscribe(Cluster::worker_topic); - break; - default: - Reporter::error(fmt("Unhandled cluster node type: %s", self$node_type)); - return; - } - - Cluster::subscribe(nodeid_topic(Cluster::node_id())); - Cluster::subscribe(node_topic(node)); - - - # Listening and connecting to other peers is broker specific, - # short circuit if Zeek is configured with a different - # cluster backend. - # - # In the future, this could move into a policy script, but - # for the time being it's easier for backwards compatibility - # to keep this here. - if ( Cluster::backend != Cluster::CLUSTER_BACKEND_BROKER ) - return; - - # Logging setup: Anything handling logging additionally subscribes - # to Broker::default_log_topic_prefix. - switch ( self$node_type ) { - case LOGGER: - Cluster::subscribe(Broker::default_log_topic_prefix); - break; - case MANAGER: - if ( Cluster::manager_is_logger ) - Cluster::subscribe(Broker::default_log_topic_prefix); - break; - } - - if ( self$p != 0/unknown ) - { - Broker::listen(Broker::default_listen_address, - self$p, - Broker::default_listen_retry); - - Cluster::log(fmt("listening on %s:%s", Broker::default_listen_address, self$p)); - } - - - switch ( self$node_type ) { - case MANAGER: - connect_peers_with_type(LOGGER); - - break; - case PROXY: - connect_peers_with_type(LOGGER); - - if ( self?$manager ) - connect_peer(MANAGER, self$manager); - - break; - case WORKER: - connect_peers_with_type(LOGGER); - connect_peers_with_type(PROXY); - - if ( self?$manager ) - connect_peer(MANAGER, self$manager); - - break; - } - } +@load ./setup-subscriptions diff --git a/scripts/base/frameworks/cluster/setup-subscriptions.zeek b/scripts/base/frameworks/cluster/setup-subscriptions.zeek new file mode 100644 index 0000000000..9aef5f302b --- /dev/null +++ b/scripts/base/frameworks/cluster/setup-subscriptions.zeek @@ -0,0 +1,46 @@ +##! This script contains the common subscription setup logic. +@load ./main +@load ./pools + +module Cluster; + +event zeek_init() &priority=-5 + { + if ( getenv("ZEEKCTL_CHECK_CONFIG") != "" ) + return; + + local self = nodes[node]; + + for ( i in registered_pools ) + { + local pool = registered_pools[i]; + + if ( node in pool$nodes ) + Cluster::subscribe(pool$spec$topic); + } + + switch ( self$node_type ) { + case NONE: + return; + case CONTROL: + break; + case LOGGER: + Cluster::subscribe(Cluster::logger_topic); + break; + case MANAGER: + Cluster::subscribe(Cluster::manager_topic); + break; + case PROXY: + Cluster::subscribe(Cluster::proxy_topic); + break; + case WORKER: + Cluster::subscribe(Cluster::worker_topic); + break; + default: + Reporter::error(fmt("Unhandled cluster node type: %s", self$node_type)); + return; + } + + Cluster::subscribe(nodeid_topic(Cluster::node_id())); + Cluster::subscribe(node_topic(node)); + } diff --git a/scripts/base/frameworks/cluster/types.zeek b/scripts/base/frameworks/cluster/types.zeek new file mode 100644 index 0000000000..f3243a93c6 --- /dev/null +++ b/scripts/base/frameworks/cluster/types.zeek @@ -0,0 +1,128 @@ +# Types used by the Cluster framework. +module Cluster; + +export { + ## Types of nodes that are allowed to participate in the cluster + ## configuration. + type NodeType: enum { + ## A dummy node type indicating the local node is not operating + ## within a cluster. + NONE, + ## A node type which is allowed to view/manipulate the configuration + ## of other nodes in the cluster. + CONTROL, + ## A node type responsible for log management. + LOGGER, + ## A node type responsible for policy management. + MANAGER, + ## A node type for relaying worker node communication and synchronizing + ## worker node state. + PROXY, + ## The node type doing all the actual traffic analysis. + WORKER, + }; + + ## Record type to indicate a node in a cluster. + type Node: record { + ## Identifies the type of cluster node in this node's configuration. + node_type: NodeType; + ## The IP address of the cluster node. + ip: addr; + ## If the *ip* field is a non-global IPv6 address, this field + ## can specify a particular :rfc:`4007` ``zone_id``. + zone_id: string &default=""; + ## The port that this node will listen on for peer connections. + ## A value of ``0/unknown`` means the node is not pre-configured to listen. + p: port &default=0/unknown; + ## Name of the manager node this node uses. For workers and proxies. + manager: string &optional; + ## A unique identifier assigned to the node by the broker framework. + ## This field is only set while a node is connected. + id: string &optional; + ## The port used to expose metrics to Prometheus. Setting this in a cluster + ## configuration will override the setting for Telemetry::metrics_port for + ## the node. + metrics_port: port &optional; + }; + + ## Record to represent a cluster node including its name. + type NamedNode: record { + name: string; + node: Node; + }; + + ## An event instance for cluster pub/sub. + ## + ## See :zeek:see:`Cluster::publish` and :zeek:see:`Cluster::make_event`. + type Event: record { + ## The event handler to be invoked on the remote node. + ev: any; + ## The arguments for the event. + args: vector of any; + }; + + ## The default maximum queue size for WebSocket event dispatcher instances. + ## + ## If the maximum queue size is reached, events from external WebSocket + ## clients will be stalled and processed once the queue has been drained. + ## + ## An internal metric named ``cluster_onloop_queue_stalls`` and + ## labeled with a ``WebSocketEventDispatcher::`` tag + ## is incremented when the maximum queue size is reached. + const default_websocket_max_event_queue_size = 32 &redef; + + ## The default ping interval for WebSocket clients. + const default_websocket_ping_interval = 5 sec &redef; + + ## The TLS options for a WebSocket server. + ## + ## If cert_file and key_file are set, TLS is enabled. If both + ## are unset, TLS is disabled. Any other combination is an error. + type WebSocketTLSOptions: record { + ## The cert file to use. + cert_file: string &optional; + ## The key file to use. + key_file: string &optional; + ## Expect peers to send client certificates. + enable_peer_verification: bool &default=F; + ## The CA certificate or CA bundle used for peer verification. + ## Empty will use the implementations's default when + ## ``enable_peer_verification`` is T. + ca_file: string &default=""; + ## The ciphers to use. Empty will use the implementation's defaults. + ciphers: string &default=""; + }; + + ## WebSocket server options to pass to :zeek:see:`Cluster::listen_websocket`. + type WebSocketServerOptions: record { + ## The address to listen on, cannot be used together with ``listen_host``. + listen_addr: addr &optional; + ## The port the WebSocket server is supposed to listen on. + listen_port: port; + ## The maximum event queue size for this server. + max_event_queue_size: count &default=default_websocket_max_event_queue_size; + ## Ping interval to use. A WebSocket client not responding to + ## the pings will be disconnected. Set to a negative value to + ## disable pings. Subsecond intervals are currently not supported. + ping_interval: interval &default=default_websocket_ping_interval; + ## The TLS options used for this WebSocket server. By default, + ## TLS is disabled. See also :zeek:see:`Cluster::WebSocketTLSOptions`. + tls_options: WebSocketTLSOptions &default=WebSocketTLSOptions(); + }; + + ## Network information of an endpoint. + type NetworkInfo: record { + ## The IP address or hostname where the endpoint listens. + address: string; + ## The port where the endpoint is bound to. + bound_port: port; + }; + + ## Information about a WebSocket endpoint. + type EndpointInfo: record { + id: string; + network: NetworkInfo; + ## The value of the X-Application-Name HTTP header, if any. + application_name: string &optional; + }; +} diff --git a/scripts/base/frameworks/supervisor/main.zeek b/scripts/base/frameworks/supervisor/main.zeek index fbc7cf1009..2b6f0ed86a 100644 --- a/scripts/base/frameworks/supervisor/main.zeek +++ b/scripts/base/frameworks/supervisor/main.zeek @@ -1,6 +1,8 @@ ##! Implements Zeek process supervision API and default behavior for its ##! associated (remote) control events. +@load base/frameworks/cluster/pubsub + @load ./api @load ./control @@ -49,7 +51,7 @@ event zeek_init() &priority=10 Broker::listen(); } - Broker::subscribe(SupervisorControl::topic_prefix); + Cluster::subscribe(SupervisorControl::topic_prefix); } event SupervisorControl::stop_request() @@ -67,7 +69,7 @@ event SupervisorControl::status_request(reqid: string, node: string) local res = Supervisor::status(node); local topic = SupervisorControl::topic_prefix + fmt("/status_response/%s", reqid); - Broker::publish(topic, SupervisorControl::status_response, reqid, res); + Cluster::publish(topic, SupervisorControl::status_response, reqid, res); } event SupervisorControl::create_request(reqid: string, node: Supervisor::NodeConfig) @@ -77,7 +79,7 @@ event SupervisorControl::create_request(reqid: string, node: Supervisor::NodeCon local res = Supervisor::create(node); local topic = SupervisorControl::topic_prefix + fmt("/create_response/%s", reqid); - Broker::publish(topic, SupervisorControl::create_response, reqid, res); + Cluster::publish(topic, SupervisorControl::create_response, reqid, res); } event SupervisorControl::destroy_request(reqid: string, node: string) @@ -87,7 +89,7 @@ event SupervisorControl::destroy_request(reqid: string, node: string) local res = Supervisor::destroy(node); local topic = SupervisorControl::topic_prefix + fmt("/destroy_response/%s", reqid); - Broker::publish(topic, SupervisorControl::destroy_response, reqid, res); + Cluster::publish(topic, SupervisorControl::destroy_response, reqid, res); } event SupervisorControl::restart_request(reqid: string, node: string) @@ -97,7 +99,7 @@ event SupervisorControl::restart_request(reqid: string, node: string) local res = Supervisor::restart(node); local topic = SupervisorControl::topic_prefix + fmt("/restart_response/%s", reqid); - Broker::publish(topic, SupervisorControl::restart_response, reqid, res); + Cluster::publish(topic, SupervisorControl::restart_response, reqid, res); } event Supervisor::node_status(node: string, pid: count) @@ -106,5 +108,5 @@ event Supervisor::node_status(node: string, pid: count) return; local topic = SupervisorControl::topic_prefix + "/node_status"; - Broker::publish(topic, SupervisorControl::node_status, node, pid); + Cluster::publish(topic, SupervisorControl::node_status, node, pid); } diff --git a/scripts/base/init-bare.zeek b/scripts/base/init-bare.zeek index 07aeece67d..01f899777e 100644 --- a/scripts/base/init-bare.zeek +++ b/scripts/base/init-bare.zeek @@ -6011,8 +6011,8 @@ module Cluster; export { type Cluster::Pool: record {}; - ## Cluster backend to use. Default is the broker backend. - const backend = Cluster::CLUSTER_BACKEND_BROKER &redef; + ## Cluster backend to use. Default is the None backend. + const backend = Cluster::CLUSTER_BACKEND_NONE &redef; ## The event serializer to use by the cluster backend. ## diff --git a/scripts/policy/frameworks/cluster/backend/broker/__load__.zeek b/scripts/policy/frameworks/cluster/backend/broker/__load__.zeek new file mode 100644 index 0000000000..afbc86f258 --- /dev/null +++ b/scripts/policy/frameworks/cluster/backend/broker/__load__.zeek @@ -0,0 +1,4 @@ +@load ./main + +@load ./backpressure +@load ./telemetry diff --git a/scripts/base/frameworks/cluster/broker-backpressure.zeek b/scripts/policy/frameworks/cluster/backend/broker/backpressure.zeek similarity index 100% rename from scripts/base/frameworks/cluster/broker-backpressure.zeek rename to scripts/policy/frameworks/cluster/backend/broker/backpressure.zeek diff --git a/scripts/policy/frameworks/cluster/backend/broker/main.zeek b/scripts/policy/frameworks/cluster/backend/broker/main.zeek new file mode 100644 index 0000000000..91da149856 --- /dev/null +++ b/scripts/policy/frameworks/cluster/backend/broker/main.zeek @@ -0,0 +1,152 @@ +##! Broker cluster backend support. +##! +##! The Broker cluster backend is a peer-to-peer backend that has been +##! in use since Bro 2.6 until Zeek 8.1. Individual Zeek cluster nodes +##! peer with each other using a fixed connection strategy using the information +##! stored in :zeek:see:`Cluster::nodes` populated by the cluster-layout.zeek +##! file or via supervisor internal functionality. +##! +##! Conceptually: +##! +##! * All nodes peer with all logger nodes +##! * All worker nodes peer with all proxy nodes and the manager node +##! * All proxy nodes peer with the manager +##! +##! This implies that logger, manager and proxy nodes are all listening +##! on the ports defined in the cluster layout. +##! +##! Note that publish-subscribe visibility with Broker is limited to nodes +##! that are directly peered. A worker publishing a message to a topic another +##! worker node is subscribed to will not be visible by the other worker. + +module Cluster; + +redef Cluster::backend = Cluster::CLUSTER_BACKEND_BROKER; + +function connect_peer(node_type: NodeType, node_name: string) + { + local nn = nodes_with_type(node_type); + + for ( i in nn ) + { + local n = nn[i]; + + if ( n$name != node_name ) + next; + if ( ! hook connect_node_hook(n) ) + return; + + local status = Broker::peer(cat(n$node$ip), n$node$p, + Cluster::retry_interval); + Cluster::log(fmt("initiate peering with %s:%s, retry=%s, status=%s", + n$node$ip, n$node$p, Cluster::retry_interval, + status)); + return; + } + + Reporter::warning(fmt("connect_peer: node '%s' (%s) not found", node_name, node_type)); + } + +function connect_peers_with_type(node_type: NodeType) + { + local nn = nodes_with_type(node_type); + + for ( i in nn ) + { + local n = nn[i]; + + if ( ! hook connect_node_hook(n) ) + next; + + local status = Broker::peer(cat(n$node$ip), n$node$p, + Cluster::retry_interval); + Cluster::log(fmt("initiate peering with %s:%s, retry=%s, status=%s", + n$node$ip, n$node$p, Cluster::retry_interval, + status)); + } + } + +# Whenever a node adds a Broker peer, it sends Cluster::hello() identifying +# itself to the peer. The other peer then raises Cluster::node_up(), upon +# seeing the Cluster::hello() +event Broker::peer_added(endpoint: Broker::EndpointInfo, msg: string) &priority=10 + { + if ( ! Cluster::is_enabled() ) + return; + + local e = Broker::make_event(Cluster::hello, node, Cluster::node_id()); + Broker::publish(nodeid_topic(endpoint$id), e); + } + +event Broker::peer_lost(endpoint: Broker::EndpointInfo, msg: string) &priority=10 + { + for ( node_name, n in nodes ) + { + if ( n?$id && n$id == endpoint$id ) + { + event Cluster::node_down(node_name, endpoint$id); + break; + } + } + } + +# The event handler setting up subscriptions has priority -5. It runs +# before this handler. Priority -10 also means that a user can fiddle +# with the cluster-layout in zeek_init() for testing. +event zeek_init() &priority=-10 + { + if ( getenv("ZEEKCTL_CHECK_CONFIG") != "" ) + return; + + if ( ! Cluster::is_enabled() ) + return; + + if ( Cluster::backend != Cluster::CLUSTER_BACKEND_BROKER ) + return; + + local self = Cluster::nodes[Cluster::node]; + + # Logging setup: Anything handling logging additionally subscribes + # to Broker::default_log_topic_prefix. + switch ( self$node_type ) { + case LOGGER: + Cluster::subscribe(Broker::default_log_topic_prefix); + break; + case MANAGER: + if ( Cluster::manager_is_logger ) + Cluster::subscribe(Broker::default_log_topic_prefix); + break; + } + + if ( self$p != 0/unknown ) + { + Broker::listen(Broker::default_listen_address, + self$p, + Broker::default_listen_retry); + + Cluster::log(fmt("listening on %s:%s", Broker::default_listen_address, self$p)); + } + + + switch ( self$node_type ) { + case MANAGER: + connect_peers_with_type(LOGGER); + + break; + case PROXY: + connect_peers_with_type(LOGGER); + + if ( self?$manager ) + connect_peer(MANAGER, self$manager); + + break; + case WORKER: + connect_peers_with_type(LOGGER); + connect_peers_with_type(PROXY); + + if ( self?$manager ) + connect_peer(MANAGER, self$manager); + + break; + } + } diff --git a/scripts/base/frameworks/cluster/broker-telemetry.zeek b/scripts/policy/frameworks/cluster/backend/broker/telemetry.zeek similarity index 100% rename from scripts/base/frameworks/cluster/broker-telemetry.zeek rename to scripts/policy/frameworks/cluster/backend/broker/telemetry.zeek diff --git a/scripts/test-all-policy.zeek b/scripts/test-all-policy.zeek index 3292921e86..ba016a0de4 100644 --- a/scripts/test-all-policy.zeek +++ b/scripts/test-all-policy.zeek @@ -14,6 +14,10 @@ @load frameworks/analyzer/packet-segment-logging.zeek # @load frameworks/control/controllee.zeek # @load frameworks/control/controller.zeek +@load frameworks/cluster/backend/broker/__load__.zeek +@load frameworks/cluster/backend/broker/backpressure.zeek +@load frameworks/cluster/backend/broker/main.zeek +@load frameworks/cluster/backend/broker/telemetry.zeek @ifdef ( Cluster::CLUSTER_BACKEND_ZEROMQ ) @load frameworks/cluster/backend/zeromq/__load__.zeek # @load frameworks/cluster/backend/zeromq/connect.zeek diff --git a/src/cluster/backend/CMakeLists.txt b/src/cluster/backend/CMakeLists.txt index c5a6d1b4a2..c312395c16 100644 --- a/src/cluster/backend/CMakeLists.txt +++ b/src/cluster/backend/CMakeLists.txt @@ -17,3 +17,5 @@ if (ENABLE_CLUSTER_BACKEND_ZEROMQ) add_subdirectory(zeromq) endif () + +add_subdirectory(none) diff --git a/src/cluster/backend/none/CMakeLists.txt b/src/cluster/backend/none/CMakeLists.txt new file mode 100644 index 0000000000..60b3dff0cb --- /dev/null +++ b/src/cluster/backend/none/CMakeLists.txt @@ -0,0 +1,3 @@ +zeek_add_plugin( + Zeek Cluster_Backend_None + SOURCES Plugin.cc) diff --git a/src/cluster/backend/none/Plugin.cc b/src/cluster/backend/none/Plugin.cc new file mode 100644 index 0000000000..2ee9d1d051 --- /dev/null +++ b/src/cluster/backend/none/Plugin.cc @@ -0,0 +1,57 @@ +// See the file "COPYING" in the main distribution directory for copyright. + +#include "zeek/plugin/Plugin.h" + +#include "zeek/cluster/Backend.h" +#include "zeek/cluster/Component.h" +#include "zeek/cluster/Serializer.h" + +namespace { + +using namespace zeek::cluster; + +/** + * A backend that does nothing. + */ +class NoneBackend : public Backend { +private: + bool DoInit() override { return true; }; + void DoTerminate() override {}; + void DoInitPostScript() override {}; + bool DoPublishLogWrites(const zeek::logging::detail::LogWriteHeader& header, const std::string& format, + zeek::byte_buffer& buf) override { + return true; + } + bool DoPublishEvent(const std::string& topic, const std::string& format, const zeek::byte_buffer& buf) override { + return true; + }; + bool DoSubscribe(const std::string& topic_prefix, SubscribeCallback cb) override { return true; }; + bool DoUnsubscribe(const std::string& topic_prefix) override { return true; }; + +public: + NoneBackend(std::unique_ptr es, std::unique_ptr ls, + std::unique_ptr ehs) + : Backend("None", std::move(es), std::move(ls), std::move(ehs)) {} + + static std::unique_ptr Instantiate(std::unique_ptr es, std::unique_ptr ls, + std::unique_ptr ehs) { + return std::make_unique(std::move(es), std::move(ls), std::move(ehs)); + } +}; +} // namespace + + +namespace zeek::plugin::Zeek_Cluster_None { + +class Plugin : public zeek::plugin::Plugin { + zeek::plugin::Configuration Configure() override { + AddComponent(new cluster::BackendComponent("None", NoneBackend::Instantiate)); + + zeek::plugin::Configuration config; + config.name = "Zeek::Cluster_Backend_None"; + config.description = "Cluster backend none"; + return config; + } +} plugin; + +} // namespace zeek::plugin::Zeek_Cluster_None diff --git a/testing/btest/Baseline/broker.remote_event/recv.recv.out b/testing/btest/Baseline/broker.remote_event/recv.recv.out index c13ab4ea41..fcd1b41ad2 100644 --- a/testing/btest/Baseline/broker.remote_event/recv.recv.out +++ b/testing/btest/Baseline/broker.remote_event/recv.recv.out @@ -10,4 +10,3 @@ is_remote should be T, and is, T receiver got ping: my-message, 4 is_remote should be T, and is, T receiver got ping: my-message, 5 -[num_peers=1, num_stores=0, num_pending_queries=0, num_events_incoming=5, num_events_outgoing=4, num_logs_incoming=0, num_logs_outgoing=1, num_ids_incoming=0, num_ids_outgoing=0] diff --git a/testing/btest/Baseline/broker.remote_event_any/recv.recv.out b/testing/btest/Baseline/broker.remote_event_any/recv.recv.out index c13ab4ea41..fcd1b41ad2 100644 --- a/testing/btest/Baseline/broker.remote_event_any/recv.recv.out +++ b/testing/btest/Baseline/broker.remote_event_any/recv.recv.out @@ -10,4 +10,3 @@ is_remote should be T, and is, T receiver got ping: my-message, 4 is_remote should be T, and is, T receiver got ping: my-message, 5 -[num_peers=1, num_stores=0, num_pending_queries=0, num_events_incoming=5, num_events_outgoing=4, num_logs_incoming=0, num_logs_outgoing=1, num_ids_incoming=0, num_ids_outgoing=0] diff --git a/testing/btest/Baseline/broker.remote_event_ssl_auth/recv.recv.out b/testing/btest/Baseline/broker.remote_event_ssl_auth/recv.recv.out index 7a0dc495f2..24ee373fe9 100644 --- a/testing/btest/Baseline/broker.remote_event_ssl_auth/recv.recv.out +++ b/testing/btest/Baseline/broker.remote_event_ssl_auth/recv.recv.out @@ -5,4 +5,3 @@ receiver got ping: my-message, 2 receiver got ping: my-message, 3 receiver got ping: my-message, 4 receiver got ping: my-message, 5 -[num_peers=1, num_stores=0, num_pending_queries=0, num_events_incoming=5, num_events_outgoing=4, num_logs_incoming=0, num_logs_outgoing=1, num_ids_incoming=0, num_ids_outgoing=0] diff --git a/testing/btest/Baseline/coverage.bare-load-baseline/canonified_loaded_scripts.log b/testing/btest/Baseline/coverage.bare-load-baseline/canonified_loaded_scripts.log index 92ae304ef1..bb8a7653d4 100644 --- a/testing/btest/Baseline/coverage.bare-load-baseline/canonified_loaded_scripts.log +++ b/testing/btest/Baseline/coverage.bare-load-baseline/canonified_loaded_scripts.log @@ -123,6 +123,9 @@ scripts/base/init-frameworks-and-bifs.zeek scripts/base/frameworks/supervisor/__load__.zeek scripts/base/frameworks/supervisor/control.zeek scripts/base/frameworks/supervisor/main.zeek + scripts/base/frameworks/cluster/pubsub.zeek + scripts/base/frameworks/cluster/types.zeek + build/scripts/base/bif/cluster.bif.zeek scripts/base/frameworks/input/__load__.zeek scripts/base/frameworks/input/main.zeek build/scripts/base/bif/input.bif.zeek @@ -136,7 +139,6 @@ scripts/base/init-frameworks-and-bifs.zeek scripts/base/frameworks/cluster/main.zeek scripts/base/frameworks/control/__load__.zeek scripts/base/frameworks/control/main.zeek - build/scripts/base/bif/cluster.bif.zeek build/scripts/base/bif/plugins/Zeek_Cluster_WebSocket.events.bif.zeek scripts/base/frameworks/cluster/pools.zeek scripts/base/utils/hash_hrw.zeek diff --git a/testing/btest/Baseline/coverage.bare-mode-errors/errors b/testing/btest/Baseline/coverage.bare-mode-errors/errors index b1bb951e92..5ed62ac0af 100644 --- a/testing/btest/Baseline/coverage.bare-mode-errors/errors +++ b/testing/btest/Baseline/coverage.bare-mode-errors/errors @@ -1,2 +1,3 @@ ### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. ### NOTE: This file has been sorted with diff-sort. +warning in <...>/setup-connections.zeek, line 1: deprecated script loaded from command line arguments "Remove in v9.1: Load base<...>/setup-subscriptions instead" diff --git a/testing/btest/Baseline/coverage.default-load-baseline/canonified_loaded_scripts.log b/testing/btest/Baseline/coverage.default-load-baseline/canonified_loaded_scripts.log index 8d2b58f0e1..bdc4807cdf 100644 --- a/testing/btest/Baseline/coverage.default-load-baseline/canonified_loaded_scripts.log +++ b/testing/btest/Baseline/coverage.default-load-baseline/canonified_loaded_scripts.log @@ -123,6 +123,9 @@ scripts/base/init-frameworks-and-bifs.zeek scripts/base/frameworks/supervisor/__load__.zeek scripts/base/frameworks/supervisor/control.zeek scripts/base/frameworks/supervisor/main.zeek + scripts/base/frameworks/cluster/pubsub.zeek + scripts/base/frameworks/cluster/types.zeek + build/scripts/base/bif/cluster.bif.zeek scripts/base/frameworks/input/__load__.zeek scripts/base/frameworks/input/main.zeek build/scripts/base/bif/input.bif.zeek @@ -136,7 +139,6 @@ scripts/base/init-frameworks-and-bifs.zeek scripts/base/frameworks/cluster/main.zeek scripts/base/frameworks/control/__load__.zeek scripts/base/frameworks/control/main.zeek - build/scripts/base/bif/cluster.bif.zeek build/scripts/base/bif/plugins/Zeek_Cluster_WebSocket.events.bif.zeek scripts/base/frameworks/cluster/pools.zeek scripts/base/utils/hash_hrw.zeek diff --git a/testing/btest/Baseline/coverage.init-default/missing_loads b/testing/btest/Baseline/coverage.init-default/missing_loads index 93af34614e..711e65bea1 100644 --- a/testing/btest/Baseline/coverage.init-default/missing_loads +++ b/testing/btest/Baseline/coverage.init-default/missing_loads @@ -1,12 +1,11 @@ ### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. --./frameworks/cluster/broker-backpressure.zeek -./frameworks/cluster/broker-stores.zeek --./frameworks/cluster/broker-telemetry.zeek -./frameworks/cluster/nodes/logger.zeek -./frameworks/cluster/nodes/manager.zeek -./frameworks/cluster/nodes/proxy.zeek -./frameworks/cluster/nodes/worker.zeek -./frameworks/cluster/setup-connections.zeek +-./frameworks/cluster/setup-subscriptions.zeek -./frameworks/cluster/supervisor.zeek -./frameworks/intel/cluster.zeek -./frameworks/netcontrol/cluster.zeek diff --git a/testing/btest/Baseline/plugins.hooks/output b/testing/btest/Baseline/plugins.hooks/output index 8697db08b7..560b182567 100644 --- a/testing/btest/Baseline/plugins.hooks/output +++ b/testing/btest/Baseline/plugins.hooks/output @@ -18,13 +18,13 @@ 0.000000 MetaHookPost CallFunction(Analyzer::register_for_port, , (Analyzer::ANALYZER_HTTP, 81/tcp)) -> 0.000000 MetaHookPost CallFunction(Analyzer::register_for_port, , (Analyzer::ANALYZER_HTTP, 8888/tcp)) -> 0.000000 MetaHookPost CallFunction(Analyzer::register_for_ports, , (Analyzer::ANALYZER_HTTP, {80<...>/tcp})) -> -0.000000 MetaHookPost CallFunction(Broker::__subscribe, , (zeek/supervisor)) -> -0.000000 MetaHookPost CallFunction(Broker::subscribe, , (zeek/supervisor)) -> +0.000000 MetaHookPost CallFunction(Cluster::__subscribe, , (zeek/supervisor)) -> 0.000000 MetaHookPost CallFunction(Cluster::is_enabled, , ()) -> 0.000000 MetaHookPost CallFunction(Cluster::is_enabled, , ()) -> 0.000000 MetaHookPost CallFunction(Cluster::register_pool, , ([topic=zeek<...>/logger, node_type=Cluster::LOGGER, max_nodes=, exclusive=F])) -> 0.000000 MetaHookPost CallFunction(Cluster::register_pool, , ([topic=zeek<...>/proxy, node_type=Cluster::PROXY, max_nodes=, exclusive=F])) -> 0.000000 MetaHookPost CallFunction(Cluster::register_pool, , ([topic=zeek<...>/worker, node_type=Cluster::WORKER, max_nodes=, exclusive=F])) -> +0.000000 MetaHookPost CallFunction(Cluster::subscribe, , (zeek/supervisor)) -> 0.000000 MetaHookPost CallFunction(Config::config_option_changed, , (Site::local_nets, {64:ff9b:1::<...>/15,fc00::<...>/10,::/128,2002:ffff:ffff::/48,::1/128,fec0::/10,2002:cb00:7100::/40,2002:c633:6400::<...>/4,2002:a00::/24,100::<...>/8,2001:2::<...>/12,2002:c000:200::/40,2002:f000::/20,2002:7f00::/24,2001::/23,2002:6440::/26,2002:c000::<...>/16,2002:ac10::/28,2002:a9fe::<...>/16,2002:c612::/31,2002::/24,fe80::/10,2001:db8::/32,2002:ef00::<...>/24,2002:e000::/40,2002:c0a8::<...>/24}, )) -> 0.000000 MetaHookPost CallFunction(Files::register_protocol, , (Analyzer::ANALYZER_HTTP, [get_file_handle=HTTP::get_file_handle: function(c:connection, is_orig:bool) : string{ if (!HTTP::c?$http) return ()if (HTTP::c$http$range_request && !HTTP::is_orig) { return (cat(Analyzer::ANALYZER_HTTP, HTTP::is_orig, HTTP::c$id$orig_h, HTTP::build_url(HTTP::c$http)))}else{ HTTP::mime_depth = HTTP::is_orig ? HTTP::c$http$orig_mime_depth : HTTP::c$http$resp_mime_depthreturn (cat(Analyzer::ANALYZER_HTTP, HTTP::c$start_time, HTTP::is_orig, HTTP::c$http$trans_depth, HTTP::mime_depth, id_string(HTTP::c$id)))}}, describe=HTTP::describe_file: function(f:fa_file) : string{ if (HTTP::f$source != HTTP) return ()for ([HTTP::_], HTTP::c in HTTP::f$conns) { if (HTTP::c?$http) return (HTTP::build_url_http(HTTP::c$http))}return ()}])) -> 0.000000 MetaHookPost CallFunction(Log::__add_filter, , (Analyzer::Logging::LOG, [name=default, writer=Log::WRITER_ASCII, path=analyzer, path_func=, include=, exclude=, log_local=T, log_remote=T, field_name_map={}, scope_sep=., ext_prefix=_, ext_func=lambda_<4692973652431675528>: function(path:string) : void, interv=0 secs, postprocessor=, config={}, policy=])) -> @@ -477,6 +477,7 @@ 0.000000 MetaHookPost LoadFile(0, ./polling, <...>/polling.zeek) -> -1 0.000000 MetaHookPost LoadFile(0, ./pools, <...>/pools.zeek) -> -1 0.000000 MetaHookPost LoadFile(0, ./postprocessors, <...>/postprocessors) -> -1 +0.000000 MetaHookPost LoadFile(0, ./pubsub, <...>/pubsub.zeek) -> -1 0.000000 MetaHookPost LoadFile(0, ./removal-hooks, <...>/removal-hooks.zeek) -> -1 0.000000 MetaHookPost LoadFile(0, ./reporter.bif.zeek, <...>/reporter.bif.zeek) -> -1 0.000000 MetaHookPost LoadFile(0, ./scp, <...>/scp.zeek) -> -1 @@ -582,6 +583,7 @@ 0.000000 MetaHookPost LoadFile(0, base<...>/ppp, <...>/ppp) -> -1 0.000000 MetaHookPost LoadFile(0, base<...>/ppp_serial, <...>/ppp_serial) -> -1 0.000000 MetaHookPost LoadFile(0, base<...>/pppoe, <...>/pppoe) -> -1 +0.000000 MetaHookPost LoadFile(0, base<...>/pubsub, <...>/pubsub.zeek) -> -1 0.000000 MetaHookPost LoadFile(0, base<...>/removal-hooks, <...>/removal-hooks.zeek) -> -1 0.000000 MetaHookPost LoadFile(0, base<...>/reporter.bif, <...>/reporter.bif.zeek) -> -1 0.000000 MetaHookPost LoadFile(0, base<...>/root, <...>/root) -> -1 @@ -793,6 +795,7 @@ 0.000000 MetaHookPost LoadFileExtended(0, ./polling, <...>/polling.zeek) -> (-1, ) 0.000000 MetaHookPost LoadFileExtended(0, ./pools, <...>/pools.zeek) -> (-1, ) 0.000000 MetaHookPost LoadFileExtended(0, ./postprocessors, <...>/postprocessors) -> (-1, ) +0.000000 MetaHookPost LoadFileExtended(0, ./pubsub, <...>/pubsub.zeek) -> (-1, ) 0.000000 MetaHookPost LoadFileExtended(0, ./removal-hooks, <...>/removal-hooks.zeek) -> (-1, ) 0.000000 MetaHookPost LoadFileExtended(0, ./reporter.bif.zeek, <...>/reporter.bif.zeek) -> (-1, ) 0.000000 MetaHookPost LoadFileExtended(0, ./scp, <...>/scp.zeek) -> (-1, ) @@ -898,6 +901,7 @@ 0.000000 MetaHookPost LoadFileExtended(0, base<...>/ppp, <...>/ppp) -> (-1, ) 0.000000 MetaHookPost LoadFileExtended(0, base<...>/ppp_serial, <...>/ppp_serial) -> (-1, ) 0.000000 MetaHookPost LoadFileExtended(0, base<...>/pppoe, <...>/pppoe) -> (-1, ) +0.000000 MetaHookPost LoadFileExtended(0, base<...>/pubsub, <...>/pubsub.zeek) -> (-1, ) 0.000000 MetaHookPost LoadFileExtended(0, base<...>/removal-hooks, <...>/removal-hooks.zeek) -> (-1, ) 0.000000 MetaHookPost LoadFileExtended(0, base<...>/reporter.bif, <...>/reporter.bif.zeek) -> (-1, ) 0.000000 MetaHookPost LoadFileExtended(0, base<...>/root, <...>/root) -> (-1, ) @@ -960,13 +964,13 @@ 0.000000 MetaHookPre CallFunction(Analyzer::register_for_port, , (Analyzer::ANALYZER_HTTP, 81/tcp)) 0.000000 MetaHookPre CallFunction(Analyzer::register_for_port, , (Analyzer::ANALYZER_HTTP, 8888/tcp)) 0.000000 MetaHookPre CallFunction(Analyzer::register_for_ports, , (Analyzer::ANALYZER_HTTP, {80<...>/tcp})) -0.000000 MetaHookPre CallFunction(Broker::__subscribe, , (zeek/supervisor)) -0.000000 MetaHookPre CallFunction(Broker::subscribe, , (zeek/supervisor)) +0.000000 MetaHookPre CallFunction(Cluster::__subscribe, , (zeek/supervisor)) 0.000000 MetaHookPre CallFunction(Cluster::is_enabled, , ()) 0.000000 MetaHookPre CallFunction(Cluster::is_enabled, , ()) 0.000000 MetaHookPre CallFunction(Cluster::register_pool, , ([topic=zeek<...>/logger, node_type=Cluster::LOGGER, max_nodes=, exclusive=F])) 0.000000 MetaHookPre CallFunction(Cluster::register_pool, , ([topic=zeek<...>/proxy, node_type=Cluster::PROXY, max_nodes=, exclusive=F])) 0.000000 MetaHookPre CallFunction(Cluster::register_pool, , ([topic=zeek<...>/worker, node_type=Cluster::WORKER, max_nodes=, exclusive=F])) +0.000000 MetaHookPre CallFunction(Cluster::subscribe, , (zeek/supervisor)) 0.000000 MetaHookPre CallFunction(Config::config_option_changed, , (Site::local_nets, {64:ff9b:1::<...>/15,fc00::<...>/10,::/128,2002:ffff:ffff::/48,::1/128,fec0::/10,2002:cb00:7100::/40,2002:c633:6400::<...>/4,2002:a00::/24,100::<...>/8,2001:2::<...>/12,2002:c000:200::/40,2002:f000::/20,2002:7f00::/24,2001::/23,2002:6440::/26,2002:c000::<...>/16,2002:ac10::/28,2002:a9fe::<...>/16,2002:c612::/31,2002::/24,fe80::/10,2001:db8::/32,2002:ef00::<...>/24,2002:e000::/40,2002:c0a8::<...>/24}, )) 0.000000 MetaHookPre CallFunction(Files::register_protocol, , (Analyzer::ANALYZER_HTTP, [get_file_handle=HTTP::get_file_handle: function(c:connection, is_orig:bool) : string{ if (!HTTP::c?$http) return ()if (HTTP::c$http$range_request && !HTTP::is_orig) { return (cat(Analyzer::ANALYZER_HTTP, HTTP::is_orig, HTTP::c$id$orig_h, HTTP::build_url(HTTP::c$http)))}else{ HTTP::mime_depth = HTTP::is_orig ? HTTP::c$http$orig_mime_depth : HTTP::c$http$resp_mime_depthreturn (cat(Analyzer::ANALYZER_HTTP, HTTP::c$start_time, HTTP::is_orig, HTTP::c$http$trans_depth, HTTP::mime_depth, id_string(HTTP::c$id)))}}, describe=HTTP::describe_file: function(f:fa_file) : string{ if (HTTP::f$source != HTTP) return ()for ([HTTP::_], HTTP::c in HTTP::f$conns) { if (HTTP::c?$http) return (HTTP::build_url_http(HTTP::c$http))}return ()}])) 0.000000 MetaHookPre CallFunction(Log::__add_filter, , (Analyzer::Logging::LOG, [name=default, writer=Log::WRITER_ASCII, path=analyzer, path_func=, include=, exclude=, log_local=T, log_remote=T, field_name_map={}, scope_sep=., ext_prefix=_, ext_func=lambda_<4692973652431675528>: function(path:string) : void, interv=0 secs, postprocessor=, config={}, policy=])) @@ -1419,6 +1423,7 @@ 0.000000 MetaHookPre LoadFile(0, ./polling, <...>/polling.zeek) 0.000000 MetaHookPre LoadFile(0, ./pools, <...>/pools.zeek) 0.000000 MetaHookPre LoadFile(0, ./postprocessors, <...>/postprocessors) +0.000000 MetaHookPre LoadFile(0, ./pubsub, <...>/pubsub.zeek) 0.000000 MetaHookPre LoadFile(0, ./removal-hooks, <...>/removal-hooks.zeek) 0.000000 MetaHookPre LoadFile(0, ./reporter.bif.zeek, <...>/reporter.bif.zeek) 0.000000 MetaHookPre LoadFile(0, ./scp, <...>/scp.zeek) @@ -1524,6 +1529,7 @@ 0.000000 MetaHookPre LoadFile(0, base<...>/ppp, <...>/ppp) 0.000000 MetaHookPre LoadFile(0, base<...>/ppp_serial, <...>/ppp_serial) 0.000000 MetaHookPre LoadFile(0, base<...>/pppoe, <...>/pppoe) +0.000000 MetaHookPre LoadFile(0, base<...>/pubsub, <...>/pubsub.zeek) 0.000000 MetaHookPre LoadFile(0, base<...>/removal-hooks, <...>/removal-hooks.zeek) 0.000000 MetaHookPre LoadFile(0, base<...>/reporter.bif, <...>/reporter.bif.zeek) 0.000000 MetaHookPre LoadFile(0, base<...>/root, <...>/root) @@ -1735,6 +1741,7 @@ 0.000000 MetaHookPre LoadFileExtended(0, ./polling, <...>/polling.zeek) 0.000000 MetaHookPre LoadFileExtended(0, ./pools, <...>/pools.zeek) 0.000000 MetaHookPre LoadFileExtended(0, ./postprocessors, <...>/postprocessors) +0.000000 MetaHookPre LoadFileExtended(0, ./pubsub, <...>/pubsub.zeek) 0.000000 MetaHookPre LoadFileExtended(0, ./removal-hooks, <...>/removal-hooks.zeek) 0.000000 MetaHookPre LoadFileExtended(0, ./reporter.bif.zeek, <...>/reporter.bif.zeek) 0.000000 MetaHookPre LoadFileExtended(0, ./scp, <...>/scp.zeek) @@ -1840,6 +1847,7 @@ 0.000000 MetaHookPre LoadFileExtended(0, base<...>/ppp, <...>/ppp) 0.000000 MetaHookPre LoadFileExtended(0, base<...>/ppp_serial, <...>/ppp_serial) 0.000000 MetaHookPre LoadFileExtended(0, base<...>/pppoe, <...>/pppoe) +0.000000 MetaHookPre LoadFileExtended(0, base<...>/pubsub, <...>/pubsub.zeek) 0.000000 MetaHookPre LoadFileExtended(0, base<...>/removal-hooks, <...>/removal-hooks.zeek) 0.000000 MetaHookPre LoadFileExtended(0, base<...>/reporter.bif, <...>/reporter.bif.zeek) 0.000000 MetaHookPre LoadFileExtended(0, base<...>/root, <...>/root) @@ -1902,12 +1910,12 @@ 0.000000 | HookCallFunction Analyzer::register_for_port(Analyzer::ANALYZER_HTTP, 81/tcp) 0.000000 | HookCallFunction Analyzer::register_for_port(Analyzer::ANALYZER_HTTP, 8888/tcp) 0.000000 | HookCallFunction Analyzer::register_for_ports(Analyzer::ANALYZER_HTTP, {80<...>/tcp}) -0.000000 | HookCallFunction Broker::__subscribe(zeek/supervisor) -0.000000 | HookCallFunction Broker::subscribe(zeek/supervisor) +0.000000 | HookCallFunction Cluster::__subscribe(zeek/supervisor) 0.000000 | HookCallFunction Cluster::is_enabled() 0.000000 | HookCallFunction Cluster::register_pool([topic=zeek<...>/logger, node_type=Cluster::LOGGER, max_nodes=, exclusive=F]) 0.000000 | HookCallFunction Cluster::register_pool([topic=zeek<...>/proxy, node_type=Cluster::PROXY, max_nodes=, exclusive=F]) 0.000000 | HookCallFunction Cluster::register_pool([topic=zeek<...>/worker, node_type=Cluster::WORKER, max_nodes=, exclusive=F]) +0.000000 | HookCallFunction Cluster::subscribe(zeek/supervisor) 0.000000 | HookCallFunction Config::config_option_changed(Site::local_nets, {64:ff9b:1::<...>/15,fc00::<...>/10,::/128,2002:ffff:ffff::/48,::1/128,fec0::/10,2002:cb00:7100::/40,2002:c633:6400::<...>/4,2002:a00::/24,100::<...>/8,2001:2::<...>/12,2002:c000:200::/40,2002:f000::/20,2002:7f00::/24,2001::/23,2002:6440::/26,2002:c000::<...>/16,2002:ac10::/28,2002:a9fe::<...>/16,2002:c612::/31,2002::/24,fe80::/10,2001:db8::/32,2002:ef00::<...>/24,2002:e000::/40,2002:c0a8::<...>/24}, ) 0.000000 | HookCallFunction Files::register_protocol(Analyzer::ANALYZER_HTTP, [get_file_handle=HTTP::get_file_handle: function(c:connection, is_orig:bool) : string{ if (!HTTP::c?$http) return ()if (HTTP::c$http$range_request && !HTTP::is_orig) { return (cat(Analyzer::ANALYZER_HTTP, HTTP::is_orig, HTTP::c$id$orig_h, HTTP::build_url(HTTP::c$http)))}else{ HTTP::mime_depth = HTTP::is_orig ? HTTP::c$http$orig_mime_depth : HTTP::c$http$resp_mime_depthreturn (cat(Analyzer::ANALYZER_HTTP, HTTP::c$start_time, HTTP::is_orig, HTTP::c$http$trans_depth, HTTP::mime_depth, id_string(HTTP::c$id)))}}, describe=HTTP::describe_file: function(f:fa_file) : string{ if (HTTP::f$source != HTTP) return ()for ([HTTP::_], HTTP::c in HTTP::f$conns) { if (HTTP::c?$http) return (HTTP::build_url_http(HTTP::c$http))}return ()}]) 0.000000 | HookCallFunction Log::__add_filter(Analyzer::Logging::LOG, [name=default, writer=Log::WRITER_ASCII, path=analyzer, path_func=, include=, exclude=, log_local=T, log_remote=T, field_name_map={}, scope_sep=., ext_prefix=_, ext_func=lambda_<4692973652431675528>: function(path:string) : void, interv=0 secs, postprocessor=, config={}, policy=]) @@ -2371,6 +2379,7 @@ 0.000000 | HookLoadFile ./pools <...>/pools.zeek 0.000000 | HookLoadFile ./postprocessors <...>/postprocessors 0.000000 | HookLoadFile ./programming <...>/programming.sig +0.000000 | HookLoadFile ./pubsub <...>/pubsub.zeek 0.000000 | HookLoadFile ./python <...>/python.sig 0.000000 | HookLoadFile ./removal-hooks <...>/removal-hooks.zeek 0.000000 | HookLoadFile ./reporter.bif.zeek <...>/reporter.bif.zeek @@ -2478,6 +2487,7 @@ 0.000000 | HookLoadFile base<...>/ppp <...>/ppp 0.000000 | HookLoadFile base<...>/ppp_serial <...>/ppp_serial 0.000000 | HookLoadFile base<...>/pppoe <...>/pppoe +0.000000 | HookLoadFile base<...>/pubsub <...>/pubsub.zeek 0.000000 | HookLoadFile base<...>/removal-hooks <...>/removal-hooks.zeek 0.000000 | HookLoadFile base<...>/reporter.bif <...>/reporter.bif.zeek 0.000000 | HookLoadFile base<...>/root <...>/root @@ -2687,6 +2697,7 @@ 0.000000 | HookLoadFileExtended ./pools <...>/pools.zeek 0.000000 | HookLoadFileExtended ./postprocessors <...>/postprocessors 0.000000 | HookLoadFileExtended ./programming <...>/programming.sig +0.000000 | HookLoadFileExtended ./pubsub <...>/pubsub.zeek 0.000000 | HookLoadFileExtended ./python <...>/python.sig 0.000000 | HookLoadFileExtended ./removal-hooks <...>/removal-hooks.zeek 0.000000 | HookLoadFileExtended ./reporter.bif.zeek <...>/reporter.bif.zeek @@ -2794,6 +2805,7 @@ 0.000000 | HookLoadFileExtended base<...>/ppp <...>/ppp 0.000000 | HookLoadFileExtended base<...>/ppp_serial <...>/ppp_serial 0.000000 | HookLoadFileExtended base<...>/pppoe <...>/pppoe +0.000000 | HookLoadFileExtended base<...>/pubsub <...>/pubsub.zeek 0.000000 | HookLoadFileExtended base<...>/removal-hooks <...>/removal-hooks.zeek 0.000000 | HookLoadFileExtended base<...>/reporter.bif <...>/reporter.bif.zeek 0.000000 | HookLoadFileExtended base<...>/root <...>/root diff --git a/testing/btest/Files/broker/cluster-layout.zeek b/testing/btest/Files/broker/cluster-layout.zeek index f495e8da29..d8cc306386 100644 --- a/testing/btest/Files/broker/cluster-layout.zeek +++ b/testing/btest/Files/broker/cluster-layout.zeek @@ -19,6 +19,12 @@ # to Cluster::nodes. +# Using this testing cluster-layout.zeek switches Zeek to use Broker. +# +# This is side-loaded here to keep all existing tests that rely on +# this Broker testing cluster setup to continue working. +@load frameworks/cluster/backend/broker + # Redef'ed to F if logger-1 or logger-2 are active. redef Cluster::manager_is_logger = T; diff --git a/testing/btest/Files/zeromq/metrics.zeek b/testing/btest/Files/zeromq/metrics.zeek index 0812e4659e..40c26d69f1 100644 --- a/testing/btest/Files/zeromq/metrics.zeek +++ b/testing/btest/Files/zeromq/metrics.zeek @@ -1,3 +1,5 @@ +@load base/frameworks/telemetry + module Cluster::Backend::ZeroMQ; export { diff --git a/testing/btest/broker/remote_event.zeek b/testing/btest/broker/remote_event.zeek index de44ae87cd..823c9b06dc 100644 --- a/testing/btest/broker/remote_event.zeek +++ b/testing/btest/broker/remote_event.zeek @@ -1,7 +1,5 @@ # @TEST-GROUP: broker # -# @TEST-REQUIRES: $SCRIPTS/have-spicy # The logging of Broker's `num_logs_outgoing` depends on whether the Spicy plugin is loaded or not. -# # @TEST-PORT: BROKER_PORT # # @TEST-EXEC: btest-bg-run recv "zeek -b ../recv.zeek >recv.out" @@ -98,10 +96,4 @@ event ping(msg: string, n: count) local e = Broker::make_event(pong, msg, n); Broker::publish("zeek/event/my_topic", e); } - -event zeek_done() - { - print get_broker_stats(); - } - # @TEST-END-FILE diff --git a/testing/btest/broker/remote_event_any.zeek b/testing/btest/broker/remote_event_any.zeek index 8bd37a36a5..c578cb7073 100644 --- a/testing/btest/broker/remote_event_any.zeek +++ b/testing/btest/broker/remote_event_any.zeek @@ -1,7 +1,5 @@ # @TEST-GROUP: broker # -# @TEST-REQUIRES: $SCRIPTS/have-spicy # The logging of Broker's `num_logs_outgoing` depends on whether the Spicy plugin is loaded or not. -# # @TEST-PORT: BROKER_PORT # # @TEST-EXEC: btest-bg-run recv "zeek -b ../recv.zeek >recv.out" @@ -106,10 +104,4 @@ event ping(msg: string, n: any) # internals should not wrap n into another Broker::Data record Broker::publish("zeek/event/my_topic", pong, msg, n); } - -event zeek_done() - { - print get_broker_stats(); - } - # @TEST-END-FILE diff --git a/testing/btest/broker/remote_event_ssl_auth.zeek b/testing/btest/broker/remote_event_ssl_auth.zeek index cc4b052607..b8cd97a806 100644 --- a/testing/btest/broker/remote_event_ssl_auth.zeek +++ b/testing/btest/broker/remote_event_ssl_auth.zeek @@ -1,7 +1,5 @@ # @TEST-GROUP: broker # -# @TEST-REQUIRES: $SCRIPTS/have-spicy # The logging of Broker's `num_logs_outgoing` depends on whether the Spicy plugin is loaded or not. -# # @TEST-PORT: BROKER_PORT # # @TEST-EXEC: btest-bg-run recv "zeek -b ../recv.zeek >recv.out" @@ -235,10 +233,4 @@ event ping(msg: string, n: count) local e = Broker::make_event(pong, msg, n); Broker::publish("zeek/event/my_topic", e); } - -event zeek_done() - { - print get_broker_stats(); - } - # @TEST-END-FILE diff --git a/testing/btest/broker/remote_event_ts_compat.zeek b/testing/btest/broker/remote_event_ts_compat.zeek index 36a00a74c9..ec71d07272 100644 --- a/testing/btest/broker/remote_event_ts_compat.zeek +++ b/testing/btest/broker/remote_event_ts_compat.zeek @@ -7,8 +7,7 @@ # @TEST-REQUIRES: TOPIC=/zeek/my_topic python3 client.py check # # @TEST-EXEC: TOPIC=/zeek/my_topic btest-bg-run server "zeek %INPUT >output" -# Leave room for Zeek to start up, which can be slow when using -O ZAM -# @TEST-EXEC: sleep 15 +# @TEST-EXEC: wait-for-file server/ready 20 # @TEST-EXEC: TOPIC=/zeek/my_topic btest-bg-run client "python3 ../client.py >output" # # @TEST-EXEC: btest-bg-wait 45 @@ -29,6 +28,8 @@ event zeek_init() Broker::subscribe(getenv("TOPIC")); Broker::listen("127.0.0.1", to_port(getenv("BROKER_PORT"))); set_network_time(double_to_time(42.0)); + + system("touch ready"); } event Broker::peer_added(endpoint: Broker::EndpointInfo, msg: string) diff --git a/testing/btest/broker/remote_log.zeek b/testing/btest/broker/remote_log.zeek index 9a030b2d9e..c3f0cc9303 100644 --- a/testing/btest/broker/remote_log.zeek +++ b/testing/btest/broker/remote_log.zeek @@ -13,6 +13,9 @@ # @TEST-START-FILE common.zeek +# Relies on Cluster::backend being properly set for logging. +@load frameworks/cluster/backend/broker + redef exit_only_after_terminate = T; module Test; diff --git a/testing/btest/broker/remote_log_batch.zeek b/testing/btest/broker/remote_log_batch.zeek index d0f3b44ce8..f8b2694bb2 100644 --- a/testing/btest/broker/remote_log_batch.zeek +++ b/testing/btest/broker/remote_log_batch.zeek @@ -13,6 +13,9 @@ # @TEST-START-FILE common.zeek +# Relies on Cluster::backend being properly set for logging. +@load frameworks/cluster/backend/broker + redef exit_only_after_terminate = T; module Test; diff --git a/testing/btest/broker/remote_log_late_join.zeek b/testing/btest/broker/remote_log_late_join.zeek index bc5660f424..294605385e 100644 --- a/testing/btest/broker/remote_log_late_join.zeek +++ b/testing/btest/broker/remote_log_late_join.zeek @@ -13,6 +13,9 @@ # @TEST-START-FILE common.zeek +# Relies on Cluster::backend being properly set for logging. +@load frameworks/cluster/backend/broker + redef exit_only_after_terminate = T; module Test; diff --git a/testing/btest/broker/remote_log_types.zeek b/testing/btest/broker/remote_log_types.zeek index cb4e74251c..1817fe4cf6 100644 --- a/testing/btest/broker/remote_log_types.zeek +++ b/testing/btest/broker/remote_log_types.zeek @@ -16,6 +16,9 @@ # @TEST-START-FILE common.zeek +# Relies on Cluster::backend being properly set for logging. +@load frameworks/cluster/backend/broker + redef exit_only_after_terminate = T; global quit_receiver: event(); diff --git a/testing/btest/cluster/generic/cluster-publish-errors.zeek b/testing/btest/cluster/generic/cluster-publish-errors.zeek index 5e106012a0..25cc5cd733 100644 --- a/testing/btest/cluster/generic/cluster-publish-errors.zeek +++ b/testing/btest/cluster/generic/cluster-publish-errors.zeek @@ -1,7 +1,7 @@ # @TEST-DOC: Test errors of cluster bifs # # @TEST-EXEC: zeek --parse-only -b %INPUT -# @TEST-EXEC: zeek -b %INPUT +# @TEST-EXEC: zeek -b %INPUT frameworks/cluster/backend/broker # @TEST-EXEC: TEST_DIFF_CANONIFIER=$SCRIPTS/diff-remove-abspath btest-diff .stderr # @TEST-EXEC: TEST_DIFF_CANONIFIER=$SCRIPTS/diff-remove-abspath btest-diff .stdout diff --git a/testing/btest/cluster/websocket/terminate-while-queuing.zeek b/testing/btest/cluster/websocket/terminate-while-queuing.zeek index 787f02671a..1a057ec983 100644 --- a/testing/btest/cluster/websocket/terminate-while-queuing.zeek +++ b/testing/btest/cluster/websocket/terminate-while-queuing.zeek @@ -19,6 +19,9 @@ # @TEST-EXEC: btest-diff ./client/.stderr # @TEST-START-FILE manager.zeek + +@load frameworks/cluster/backend/broker + redef exit_only_after_terminate = T; # Force dispatcher queue being full quickly! diff --git a/testing/btest/core/suspend_processing/websocket.zeek b/testing/btest/core/suspend_processing/websocket.zeek index 631d57dd95..4201014673 100644 --- a/testing/btest/core/suspend_processing/websocket.zeek +++ b/testing/btest/core/suspend_processing/websocket.zeek @@ -15,6 +15,8 @@ # @TEST-EXEC: TEST_DIFF_CANONIFIER=$SCRIPTS/diff-remove-abspath btest-diff worker/.stdout # @TEST-EXEC: TEST_DIFF_CANONIFIER=$SCRIPTS/diff-remove-abspath btest-diff client/.stdout +@load frameworks/cluster/backend/broker + global my_new_connection: event(uid: string, c: count); event zeek_init() diff --git a/testing/btest/plugins/publish-event-hook.zeek b/testing/btest/plugins/publish-event-hook.zeek index 704a1ccc59..c5efda7caa 100644 --- a/testing/btest/plugins/publish-event-hook.zeek +++ b/testing/btest/plugins/publish-event-hook.zeek @@ -4,8 +4,9 @@ # @TEST-EXEC: ZEEK_PLUGIN_PATH=`pwd` zeek -b Demo::PublishEvent %INPUT > output # @TEST-EXEC: btest-diff output -redef allow_network_time_forward = F; +@load frameworks/cluster/backend/broker +redef allow_network_time_forward = F; module App; diff --git a/testing/btest/scripts/base/frameworks/control/configuration_update.zeek b/testing/btest/scripts/base/frameworks/control/configuration_update.zeek index d2507f7ee3..2afe60c0a8 100644 --- a/testing/btest/scripts/base/frameworks/control/configuration_update.zeek +++ b/testing/btest/scripts/base/frameworks/control/configuration_update.zeek @@ -5,6 +5,8 @@ # @TEST-EXEC: btest-bg-wait 30 # @TEST-EXEC: btest-diff controllee/.stdout +@load frameworks/cluster/backend/broker + @load base/frameworks/control const test_var = "ORIGINAL VALUE (this should be printed out first)" &redef; diff --git a/testing/btest/scripts/base/frameworks/control/id_value.zeek b/testing/btest/scripts/base/frameworks/control/id_value.zeek index c12d137c6a..33e8b525cf 100644 --- a/testing/btest/scripts/base/frameworks/control/id_value.zeek +++ b/testing/btest/scripts/base/frameworks/control/id_value.zeek @@ -9,6 +9,8 @@ # @TEST-EXEC: btest-bg-wait 30 # @TEST-EXEC: btest-diff controller/.stdout +@load frameworks/cluster/backend/broker + @load base/frameworks/control # This value shouldn't ever be printed to the controllers stdout. diff --git a/testing/btest/scripts/base/frameworks/control/shutdown.zeek b/testing/btest/scripts/base/frameworks/control/shutdown.zeek index 832ca8a591..3741cf98df 100644 --- a/testing/btest/scripts/base/frameworks/control/shutdown.zeek +++ b/testing/btest/scripts/base/frameworks/control/shutdown.zeek @@ -4,3 +4,5 @@ # @TEST-EXEC: btest-bg-run controller ZEEKPATH=$ZEEKPATH:.. zeek -b %INPUT frameworks/control/controller Control::host=127.0.0.1 Control::host_port=$BROKER_PORT Control::cmd=shutdown # @TEST-EXEC: btest-bg-wait 20 +@load frameworks/cluster/backend/broker + diff --git a/testing/btest/scripts/base/frameworks/logging/length-checking.zeek b/testing/btest/scripts/base/frameworks/logging/length-checking.zeek index 95e966c3d6..cee6e8cc25 100644 --- a/testing/btest/scripts/base/frameworks/logging/length-checking.zeek +++ b/testing/btest/scripts/base/frameworks/logging/length-checking.zeek @@ -17,6 +17,9 @@ # @TEST-START-FILE common.zeek @load base/frameworks/notice/weird +# Ensure logging is done via Broker +@load frameworks/cluster/backend/broker + module Test; # Disable the string and container length filtering. diff --git a/testing/btest/scripts/policy/frameworks/cluster/cluster_started_restart_manager.zeek b/testing/btest/scripts/policy/frameworks/cluster/cluster_started_restart_manager.zeek index fcc9f7e3e9..a762121483 100644 --- a/testing/btest/scripts/policy/frameworks/cluster/cluster_started_restart_manager.zeek +++ b/testing/btest/scripts/policy/frameworks/cluster/cluster_started_restart_manager.zeek @@ -12,6 +12,7 @@ # @TEST-EXEC: btest-diff zeek/worker-1/stdout # @TEST-EXEC: btest-diff zeek/proxy-1/stdout +@load frameworks/cluster/backend/broker @load policy/frameworks/cluster/experimental # So the supervised node doesn't terminate right away. diff --git a/testing/btest/scripts/policy/frameworks/cluster/cluster_started_restart_worker.zeek b/testing/btest/scripts/policy/frameworks/cluster/cluster_started_restart_worker.zeek index 457d34db11..112559c10c 100644 --- a/testing/btest/scripts/policy/frameworks/cluster/cluster_started_restart_worker.zeek +++ b/testing/btest/scripts/policy/frameworks/cluster/cluster_started_restart_worker.zeek @@ -13,6 +13,7 @@ # @TEST-EXEC: btest-diff zeek/worker-2/stdout # @TEST-EXEC: btest-diff zeek/proxy-1/stdout +@load frameworks/cluster/backend/broker @load policy/frameworks/cluster/experimental # So the supervised node doesn't terminate right away. diff --git a/testing/btest/scripts/policy/frameworks/management/controller/agent-checkin.zeek b/testing/btest/scripts/policy/frameworks/management/controller/agent-checkin.zeek index 26f049a6e6..c441f0a9e5 100644 --- a/testing/btest/scripts/policy/frameworks/management/controller/agent-checkin.zeek +++ b/testing/btest/scripts/policy/frameworks/management/controller/agent-checkin.zeek @@ -11,6 +11,7 @@ # @TEST-EXEC: btest-bg-wait 10 # @TEST-EXEC: btest-diff zeek/nodes/controller/stdout +@load policy/frameworks/cluster/backend/broker @load policy/frameworks/management/agent @load policy/frameworks/management/controller diff --git a/testing/btest/scripts/policy/frameworks/telemetry/prometheus.zeek b/testing/btest/scripts/policy/frameworks/telemetry/prometheus.zeek index 9b09784e1f..aeef3df959 100644 --- a/testing/btest/scripts/policy/frameworks/telemetry/prometheus.zeek +++ b/testing/btest/scripts/policy/frameworks/telemetry/prometheus.zeek @@ -55,6 +55,7 @@ for host in $(echo ${services_data} | jq -r '.[0].targets[]' | sort); do done # @TEST-END-FILE +@load policy/frameworks/cluster/backend/broker @load policy/frameworks/cluster/experimental @load base/frameworks/telemetry diff --git a/testing/btest/supervisor/config-cluster.zeek b/testing/btest/supervisor/config-cluster.zeek index f551ec5044..ebdaaf7019 100644 --- a/testing/btest/supervisor/config-cluster.zeek +++ b/testing/btest/supervisor/config-cluster.zeek @@ -11,6 +11,7 @@ # @TEST-EXEC: btest-diff zeek/worker-1/stdout # @TEST-EXEC: btest-diff zeek/proxy-1/stdout +@load policy/frameworks/cluster/backend/broker @load policy/frameworks/cluster/experimental # So the supervised node doesn't terminate right away.