cluster: Move Broker connect logic into policy package

Place a package into policy/frameworks/cluster/backend/broker and move
Broker's connection specific logic there.
This commit is contained in:
Arne Welzel 2025-09-25 10:10:17 +02:00
parent 2f15f5ce6a
commit 351bd7b81b
5 changed files with 165 additions and 130 deletions

View file

@ -263,6 +263,15 @@ export {
## ##
## Returns: T on success, else F. ## Returns: T on success, else F.
global listen_websocket: function(options: WebSocketServerOptions): bool; global listen_websocket: function(options: WebSocketServerOptions): bool;
## This hook is called when the local node connects to other nodes based on
## the given cluster layout. Breaking from the hook will prevent connection
## establishment.
##
## This hook only applies to the Broker cluster backend.
##
## connectee: The node to connect to.
global connect_node_hook: hook(connectee: NamedNode);
} }
@load base/bif/cluster.bif @load base/bif/cluster.bif
@ -383,27 +392,6 @@ event Cluster::hello(name: string, id: string) &priority=10
add active_node_ids[n$node_type][id]; add active_node_ids[n$node_type][id];
} }
event Broker::peer_added(endpoint: Broker::EndpointInfo, msg: string) &priority=10
{
if ( ! Cluster::is_enabled() )
return;
local e = Broker::make_event(Cluster::hello, node, Cluster::node_id());
Broker::publish(nodeid_topic(endpoint$id), e);
}
event Broker::peer_lost(endpoint: Broker::EndpointInfo, msg: string) &priority=10
{
for ( node_name, n in nodes )
{
if ( n?$id && n$id == endpoint$id )
{
event Cluster::node_down(node_name, endpoint$id);
break;
}
}
}
event node_down(name: string, id: string) &priority=10 event node_down(name: string, id: string) &priority=10
{ {
local found = F; local found = F;

View file

@ -3,63 +3,10 @@
@load ./main @load ./main
@load ./pools @load ./pools
@load base/frameworks/broker
module Cluster; module Cluster;
export { event zeek_init() &priority=-5
## This hook is called when the local node connects to other nodes based on
## the given cluster layout. Breaking from the hook will prevent connection
## establishment.
##
## connectee: The node to connect to.
global connect_node_hook: hook(connectee: NamedNode);
}
function connect_peer(node_type: NodeType, node_name: string)
{
local nn = nodes_with_type(node_type);
for ( i in nn )
{
local n = nn[i];
if ( n$name != node_name )
next;
if ( ! hook connect_node_hook(n) )
return;
local status = Broker::peer(cat(n$node$ip), n$node$p,
Cluster::retry_interval);
Cluster::log(fmt("initiate peering with %s:%s, retry=%s, status=%s",
n$node$ip, n$node$p, Cluster::retry_interval,
status));
return;
}
Reporter::warning(fmt("connect_peer: node '%s' (%s) not found", node_name, node_type));
}
function connect_peers_with_type(node_type: NodeType)
{
local nn = nodes_with_type(node_type);
for ( i in nn )
{
local n = nn[i];
if ( ! hook connect_node_hook(n) )
next;
local status = Broker::peer(cat(n$node$ip), n$node$p,
Cluster::retry_interval);
Cluster::log(fmt("initiate peering with %s:%s, retry=%s, status=%s",
n$node$ip, n$node$p, Cluster::retry_interval,
status));
}
}
event zeek_init() &priority=-10
{ {
if ( getenv("ZEEKCTL_CHECK_CONFIG") != "" ) if ( getenv("ZEEKCTL_CHECK_CONFIG") != "" )
return; return;
@ -98,59 +45,4 @@ event zeek_init() &priority=-10
Cluster::subscribe(nodeid_topic(Cluster::node_id())); Cluster::subscribe(nodeid_topic(Cluster::node_id()));
Cluster::subscribe(node_topic(node)); Cluster::subscribe(node_topic(node));
# Listening and connecting to other peers is broker specific,
# short circuit if Zeek is configured with a different
# cluster backend.
#
# In the future, this could move into a policy script, but
# for the time being it's easier for backwards compatibility
# to keep this here.
if ( Cluster::backend != Cluster::CLUSTER_BACKEND_BROKER )
return;
# Logging setup: Anything handling logging additionally subscribes
# to Broker::default_log_topic_prefix.
switch ( self$node_type ) {
case LOGGER:
Cluster::subscribe(Broker::default_log_topic_prefix);
break;
case MANAGER:
if ( Cluster::manager_is_logger )
Cluster::subscribe(Broker::default_log_topic_prefix);
break;
}
if ( self$p != 0/unknown )
{
Broker::listen(Broker::default_listen_address,
self$p,
Broker::default_listen_retry);
Cluster::log(fmt("listening on %s:%s", Broker::default_listen_address, self$p));
}
switch ( self$node_type ) {
case MANAGER:
connect_peers_with_type(LOGGER);
break;
case PROXY:
connect_peers_with_type(LOGGER);
if ( self?$manager )
connect_peer(MANAGER, self$manager);
break;
case WORKER:
connect_peers_with_type(LOGGER);
connect_peers_with_type(PROXY);
if ( self?$manager )
connect_peer(MANAGER, self$manager);
break;
}
} }

View file

@ -0,0 +1 @@
@load ./main

View file

@ -0,0 +1,152 @@
##! Broker cluster backend support.
##!
##! The Broker cluster backend is a peer-to-peer backend that has been
##! in use since Bro 2.6 until Zeek 8.1. Individual Zeek cluster nodes
##! peer with each other using a fixed connection strategy using the information
##! stored in :zeek:see:`Cluster::nodes` populated by the cluster-layout.zeek
##! file or via supervisor internal functionality.
##!
##! Conceptually:
##!
##! * All nodes peer with all logger nodes
##! * All worker nodes peer with all proxy nodes and the manager node
##! * All proxy nodes peer with the manager
##!
##! This implies that logger, manager and proxy nodes are all listening
##! on the ports defined in the cluster layout.
##!
##! Note that publish-subscribe visibility with Broker is limited to nodes
##! that are directly peered. A worker publishing a message to a topic another
##! worker node is subscribed to will not be visible by the other worker.
module Cluster;
redef Cluster::backend = Cluster::CLUSTER_BACKEND_BROKER;
function connect_peer(node_type: NodeType, node_name: string)
{
local nn = nodes_with_type(node_type);
for ( i in nn )
{
local n = nn[i];
if ( n$name != node_name )
next;
if ( ! hook connect_node_hook(n) )
return;
local status = Broker::peer(cat(n$node$ip), n$node$p,
Cluster::retry_interval);
Cluster::log(fmt("initiate peering with %s:%s, retry=%s, status=%s",
n$node$ip, n$node$p, Cluster::retry_interval,
status));
return;
}
Reporter::warning(fmt("connect_peer: node '%s' (%s) not found", node_name, node_type));
}
function connect_peers_with_type(node_type: NodeType)
{
local nn = nodes_with_type(node_type);
for ( i in nn )
{
local n = nn[i];
if ( ! hook connect_node_hook(n) )
next;
local status = Broker::peer(cat(n$node$ip), n$node$p,
Cluster::retry_interval);
Cluster::log(fmt("initiate peering with %s:%s, retry=%s, status=%s",
n$node$ip, n$node$p, Cluster::retry_interval,
status));
}
}
# Whenever a node adds a Broker peer, it sends Cluster::hello() identifying
# itself to the peer. The other peer then raises Cluster::node_up(), upon
# seeing the Cluster::hello()
event Broker::peer_added(endpoint: Broker::EndpointInfo, msg: string) &priority=10
{
if ( ! Cluster::is_enabled() )
return;
local e = Broker::make_event(Cluster::hello, node, Cluster::node_id());
Broker::publish(nodeid_topic(endpoint$id), e);
}
event Broker::peer_lost(endpoint: Broker::EndpointInfo, msg: string) &priority=10
{
for ( node_name, n in nodes )
{
if ( n?$id && n$id == endpoint$id )
{
event Cluster::node_down(node_name, endpoint$id);
break;
}
}
}
# The event handler setting up subscriptions has priority -5. It runs
# before this handler. Priority -10 also means that a user can fiddle
# with the cluster-layout in zeek_init() for testing.
event zeek_init() &priority=-10
{
if ( getenv("ZEEKCTL_CHECK_CONFIG") != "" )
return;
if ( ! Cluster::is_enabled() )
return;
if ( Cluster::backend != Cluster::CLUSTER_BACKEND_BROKER )
return;
local self = Cluster::nodes[Cluster::node];
# Logging setup: Anything handling logging additionally subscribes
# to Broker::default_log_topic_prefix.
switch ( self$node_type ) {
case LOGGER:
Cluster::subscribe(Broker::default_log_topic_prefix);
break;
case MANAGER:
if ( Cluster::manager_is_logger )
Cluster::subscribe(Broker::default_log_topic_prefix);
break;
}
if ( self$p != 0/unknown )
{
Broker::listen(Broker::default_listen_address,
self$p,
Broker::default_listen_retry);
Cluster::log(fmt("listening on %s:%s", Broker::default_listen_address, self$p));
}
switch ( self$node_type ) {
case MANAGER:
connect_peers_with_type(LOGGER);
break;
case PROXY:
connect_peers_with_type(LOGGER);
if ( self?$manager )
connect_peer(MANAGER, self$manager);
break;
case WORKER:
connect_peers_with_type(LOGGER);
connect_peers_with_type(PROXY);
if ( self?$manager )
connect_peer(MANAGER, self$manager);
break;
}
}

View file

@ -14,6 +14,8 @@
@load frameworks/analyzer/packet-segment-logging.zeek @load frameworks/analyzer/packet-segment-logging.zeek
# @load frameworks/control/controllee.zeek # @load frameworks/control/controllee.zeek
# @load frameworks/control/controller.zeek # @load frameworks/control/controller.zeek
@load frameworks/cluster/backend/broker/__load__.zeek
@load frameworks/cluster/backend/broker/main.zeek
@ifdef ( Cluster::CLUSTER_BACKEND_ZEROMQ ) @ifdef ( Cluster::CLUSTER_BACKEND_ZEROMQ )
@load frameworks/cluster/backend/zeromq/__load__.zeek @load frameworks/cluster/backend/zeromq/__load__.zeek
# @load frameworks/cluster/backend/zeromq/connect.zeek # @load frameworks/cluster/backend/zeromq/connect.zeek