From 351bd7b81b1428d3afcf974032324aab7b444aac Mon Sep 17 00:00:00 2001 From: Arne Welzel Date: Thu, 25 Sep 2025 10:10:17 +0200 Subject: [PATCH] cluster: Move Broker connect logic into policy package Place a package into policy/frameworks/cluster/backend/broker and move Broker's connection specific logic there. --- scripts/base/frameworks/cluster/main.zeek | 30 ++-- .../frameworks/cluster/setup-connections.zeek | 110 +------------ .../cluster/backend/broker/__load__.zeek | 1 + .../cluster/backend/broker/main.zeek | 152 ++++++++++++++++++ scripts/test-all-policy.zeek | 2 + 5 files changed, 165 insertions(+), 130 deletions(-) create mode 100644 scripts/policy/frameworks/cluster/backend/broker/__load__.zeek create mode 100644 scripts/policy/frameworks/cluster/backend/broker/main.zeek diff --git a/scripts/base/frameworks/cluster/main.zeek b/scripts/base/frameworks/cluster/main.zeek index 8b86188996..f4e2a17808 100644 --- a/scripts/base/frameworks/cluster/main.zeek +++ b/scripts/base/frameworks/cluster/main.zeek @@ -263,6 +263,15 @@ export { ## ## Returns: T on success, else F. global listen_websocket: function(options: WebSocketServerOptions): bool; + + ## This hook is called when the local node connects to other nodes based on + ## the given cluster layout. Breaking from the hook will prevent connection + ## establishment. + ## + ## This hook only applies to the Broker cluster backend. + ## + ## connectee: The node to connect to. + global connect_node_hook: hook(connectee: NamedNode); } @load base/bif/cluster.bif @@ -383,27 +392,6 @@ event Cluster::hello(name: string, id: string) &priority=10 add active_node_ids[n$node_type][id]; } -event Broker::peer_added(endpoint: Broker::EndpointInfo, msg: string) &priority=10 - { - if ( ! Cluster::is_enabled() ) - return; - - local e = Broker::make_event(Cluster::hello, node, Cluster::node_id()); - Broker::publish(nodeid_topic(endpoint$id), e); - } - -event Broker::peer_lost(endpoint: Broker::EndpointInfo, msg: string) &priority=10 - { - for ( node_name, n in nodes ) - { - if ( n?$id && n$id == endpoint$id ) - { - event Cluster::node_down(node_name, endpoint$id); - break; - } - } - } - event node_down(name: string, id: string) &priority=10 { local found = F; diff --git a/scripts/base/frameworks/cluster/setup-connections.zeek b/scripts/base/frameworks/cluster/setup-connections.zeek index 84c2a7d2fe..ea67b5115a 100644 --- a/scripts/base/frameworks/cluster/setup-connections.zeek +++ b/scripts/base/frameworks/cluster/setup-connections.zeek @@ -3,63 +3,10 @@ @load ./main @load ./pools -@load base/frameworks/broker module Cluster; -export { - ## This hook is called when the local node connects to other nodes based on - ## the given cluster layout. Breaking from the hook will prevent connection - ## establishment. - ## - ## connectee: The node to connect to. - global connect_node_hook: hook(connectee: NamedNode); -} - -function connect_peer(node_type: NodeType, node_name: string) - { - local nn = nodes_with_type(node_type); - - for ( i in nn ) - { - local n = nn[i]; - - if ( n$name != node_name ) - next; - if ( ! hook connect_node_hook(n) ) - return; - - local status = Broker::peer(cat(n$node$ip), n$node$p, - Cluster::retry_interval); - Cluster::log(fmt("initiate peering with %s:%s, retry=%s, status=%s", - n$node$ip, n$node$p, Cluster::retry_interval, - status)); - return; - } - - Reporter::warning(fmt("connect_peer: node '%s' (%s) not found", node_name, node_type)); - } - -function connect_peers_with_type(node_type: NodeType) - { - local nn = nodes_with_type(node_type); - - for ( i in nn ) - { - local n = nn[i]; - - if ( ! hook connect_node_hook(n) ) - next; - - local status = Broker::peer(cat(n$node$ip), n$node$p, - Cluster::retry_interval); - Cluster::log(fmt("initiate peering with %s:%s, retry=%s, status=%s", - n$node$ip, n$node$p, Cluster::retry_interval, - status)); - } - } - -event zeek_init() &priority=-10 +event zeek_init() &priority=-5 { if ( getenv("ZEEKCTL_CHECK_CONFIG") != "" ) return; @@ -98,59 +45,4 @@ event zeek_init() &priority=-10 Cluster::subscribe(nodeid_topic(Cluster::node_id())); Cluster::subscribe(node_topic(node)); - - - # Listening and connecting to other peers is broker specific, - # short circuit if Zeek is configured with a different - # cluster backend. - # - # In the future, this could move into a policy script, but - # for the time being it's easier for backwards compatibility - # to keep this here. - if ( Cluster::backend != Cluster::CLUSTER_BACKEND_BROKER ) - return; - - # Logging setup: Anything handling logging additionally subscribes - # to Broker::default_log_topic_prefix. - switch ( self$node_type ) { - case LOGGER: - Cluster::subscribe(Broker::default_log_topic_prefix); - break; - case MANAGER: - if ( Cluster::manager_is_logger ) - Cluster::subscribe(Broker::default_log_topic_prefix); - break; - } - - if ( self$p != 0/unknown ) - { - Broker::listen(Broker::default_listen_address, - self$p, - Broker::default_listen_retry); - - Cluster::log(fmt("listening on %s:%s", Broker::default_listen_address, self$p)); - } - - - switch ( self$node_type ) { - case MANAGER: - connect_peers_with_type(LOGGER); - - break; - case PROXY: - connect_peers_with_type(LOGGER); - - if ( self?$manager ) - connect_peer(MANAGER, self$manager); - - break; - case WORKER: - connect_peers_with_type(LOGGER); - connect_peers_with_type(PROXY); - - if ( self?$manager ) - connect_peer(MANAGER, self$manager); - - break; - } } diff --git a/scripts/policy/frameworks/cluster/backend/broker/__load__.zeek b/scripts/policy/frameworks/cluster/backend/broker/__load__.zeek new file mode 100644 index 0000000000..a10fe855df --- /dev/null +++ b/scripts/policy/frameworks/cluster/backend/broker/__load__.zeek @@ -0,0 +1 @@ +@load ./main diff --git a/scripts/policy/frameworks/cluster/backend/broker/main.zeek b/scripts/policy/frameworks/cluster/backend/broker/main.zeek new file mode 100644 index 0000000000..91da149856 --- /dev/null +++ b/scripts/policy/frameworks/cluster/backend/broker/main.zeek @@ -0,0 +1,152 @@ +##! Broker cluster backend support. +##! +##! The Broker cluster backend is a peer-to-peer backend that has been +##! in use since Bro 2.6 until Zeek 8.1. Individual Zeek cluster nodes +##! peer with each other using a fixed connection strategy using the information +##! stored in :zeek:see:`Cluster::nodes` populated by the cluster-layout.zeek +##! file or via supervisor internal functionality. +##! +##! Conceptually: +##! +##! * All nodes peer with all logger nodes +##! * All worker nodes peer with all proxy nodes and the manager node +##! * All proxy nodes peer with the manager +##! +##! This implies that logger, manager and proxy nodes are all listening +##! on the ports defined in the cluster layout. +##! +##! Note that publish-subscribe visibility with Broker is limited to nodes +##! that are directly peered. A worker publishing a message to a topic another +##! worker node is subscribed to will not be visible by the other worker. + +module Cluster; + +redef Cluster::backend = Cluster::CLUSTER_BACKEND_BROKER; + +function connect_peer(node_type: NodeType, node_name: string) + { + local nn = nodes_with_type(node_type); + + for ( i in nn ) + { + local n = nn[i]; + + if ( n$name != node_name ) + next; + if ( ! hook connect_node_hook(n) ) + return; + + local status = Broker::peer(cat(n$node$ip), n$node$p, + Cluster::retry_interval); + Cluster::log(fmt("initiate peering with %s:%s, retry=%s, status=%s", + n$node$ip, n$node$p, Cluster::retry_interval, + status)); + return; + } + + Reporter::warning(fmt("connect_peer: node '%s' (%s) not found", node_name, node_type)); + } + +function connect_peers_with_type(node_type: NodeType) + { + local nn = nodes_with_type(node_type); + + for ( i in nn ) + { + local n = nn[i]; + + if ( ! hook connect_node_hook(n) ) + next; + + local status = Broker::peer(cat(n$node$ip), n$node$p, + Cluster::retry_interval); + Cluster::log(fmt("initiate peering with %s:%s, retry=%s, status=%s", + n$node$ip, n$node$p, Cluster::retry_interval, + status)); + } + } + +# Whenever a node adds a Broker peer, it sends Cluster::hello() identifying +# itself to the peer. The other peer then raises Cluster::node_up(), upon +# seeing the Cluster::hello() +event Broker::peer_added(endpoint: Broker::EndpointInfo, msg: string) &priority=10 + { + if ( ! Cluster::is_enabled() ) + return; + + local e = Broker::make_event(Cluster::hello, node, Cluster::node_id()); + Broker::publish(nodeid_topic(endpoint$id), e); + } + +event Broker::peer_lost(endpoint: Broker::EndpointInfo, msg: string) &priority=10 + { + for ( node_name, n in nodes ) + { + if ( n?$id && n$id == endpoint$id ) + { + event Cluster::node_down(node_name, endpoint$id); + break; + } + } + } + +# The event handler setting up subscriptions has priority -5. It runs +# before this handler. Priority -10 also means that a user can fiddle +# with the cluster-layout in zeek_init() for testing. +event zeek_init() &priority=-10 + { + if ( getenv("ZEEKCTL_CHECK_CONFIG") != "" ) + return; + + if ( ! Cluster::is_enabled() ) + return; + + if ( Cluster::backend != Cluster::CLUSTER_BACKEND_BROKER ) + return; + + local self = Cluster::nodes[Cluster::node]; + + # Logging setup: Anything handling logging additionally subscribes + # to Broker::default_log_topic_prefix. + switch ( self$node_type ) { + case LOGGER: + Cluster::subscribe(Broker::default_log_topic_prefix); + break; + case MANAGER: + if ( Cluster::manager_is_logger ) + Cluster::subscribe(Broker::default_log_topic_prefix); + break; + } + + if ( self$p != 0/unknown ) + { + Broker::listen(Broker::default_listen_address, + self$p, + Broker::default_listen_retry); + + Cluster::log(fmt("listening on %s:%s", Broker::default_listen_address, self$p)); + } + + + switch ( self$node_type ) { + case MANAGER: + connect_peers_with_type(LOGGER); + + break; + case PROXY: + connect_peers_with_type(LOGGER); + + if ( self?$manager ) + connect_peer(MANAGER, self$manager); + + break; + case WORKER: + connect_peers_with_type(LOGGER); + connect_peers_with_type(PROXY); + + if ( self?$manager ) + connect_peer(MANAGER, self$manager); + + break; + } + } diff --git a/scripts/test-all-policy.zeek b/scripts/test-all-policy.zeek index 3292921e86..6f1f5cd879 100644 --- a/scripts/test-all-policy.zeek +++ b/scripts/test-all-policy.zeek @@ -14,6 +14,8 @@ @load frameworks/analyzer/packet-segment-logging.zeek # @load frameworks/control/controllee.zeek # @load frameworks/control/controller.zeek +@load frameworks/cluster/backend/broker/__load__.zeek +@load frameworks/cluster/backend/broker/main.zeek @ifdef ( Cluster::CLUSTER_BACKEND_ZEROMQ ) @load frameworks/cluster/backend/zeromq/__load__.zeek # @load frameworks/cluster/backend/zeromq/connect.zeek