zeek/scripts/base/frameworks/cluster/setup-connections.zeek
2025-03-13 16:34:02 +01:00

189 lines
4.5 KiB
Text

##! This script establishes communication among all nodes in a cluster
##! as defined by :zeek:id:`Cluster::nodes`.
@load ./main
@load ./pools
@load base/frameworks/broker
module Cluster;
export {
## This hook is called when the local node connects to other nodes based on
## the given cluster layout. Breaking from the hook will prevent connection
## establishment.
##
## connectee: The node to connect to.
global connect_node_hook: hook(connectee: NamedNode);
}
function connect_peer(node_type: NodeType, node_name: string)
{
local nn = nodes_with_type(node_type);
for ( i in nn )
{
local n = nn[i];
if ( n$name != node_name )
next;
if ( ! hook connect_node_hook(n) )
return;
local status = Broker::peer(cat(n$node$ip), n$node$p,
Cluster::retry_interval);
Cluster::log(fmt("initiate peering with %s:%s, retry=%s, status=%s",
n$node$ip, n$node$p, Cluster::retry_interval,
status));
return;
}
Reporter::warning(fmt("connect_peer: node '%s' (%s) not found", node_name, node_type));
}
function connect_peers_with_type(node_type: NodeType)
{
local nn = nodes_with_type(node_type);
for ( i in nn )
{
local n = nn[i];
if ( ! hook connect_node_hook(n) )
next;
local status = Broker::peer(cat(n$node$ip), n$node$p,
Cluster::retry_interval);
Cluster::log(fmt("initiate peering with %s:%s, retry=%s, status=%s",
n$node$ip, n$node$p, Cluster::retry_interval,
status));
}
}
# Connect to all nodes that have the same type.
#
# To limit the number of connections within a cluster, the logic is to
# connect to all nodes of the same type with a name sorted higher than
# this nodes name itself.
#
# With 3 workers, worker-1 connects to worker-2 and worker-3, worker-2
# connects to worker-3 and worker-3 establishes no extra connections.
function connect_peers_same_type(self_name: string, self_type: Cluster::NodeType)
{
# nnodes is already sorted by name.
local nnodes = nodes_with_type(self_type);
local idx = -1;
# nnodes.indexOf(self_name)
for ( i, nn in nnodes )
if ( nn$name == self_name )
idx = i;
assert idx >= 0, fmt("%s not in %s", self_name, nnodes);
# Establish a connection to all nodes at higher indices.
idx += 1;
while ( idx < |nnodes| )
{
connect_peer(self_type, nnodes[idx]$name);
idx += 1;
}
}
event zeek_init() &priority=-10
{
if ( getenv("ZEEKCTL_CHECK_CONFIG") != "" )
return;
local self = nodes[node];
for ( i in registered_pools )
{
local pool = registered_pools[i];
if ( node in pool$nodes )
Cluster::subscribe(pool$spec$topic);
}
switch ( self$node_type ) {
case NONE:
return;
case CONTROL:
break;
case LOGGER:
Cluster::subscribe(Cluster::logger_topic);
break;
case MANAGER:
Cluster::subscribe(Cluster::manager_topic);
break;
case PROXY:
Cluster::subscribe(Cluster::proxy_topic);
break;
case WORKER:
Cluster::subscribe(Cluster::worker_topic);
break;
default:
Reporter::error(fmt("Unhandled cluster node type: %s", self$node_type));
return;
}
Cluster::subscribe(nodeid_topic(Cluster::node_id()));
Cluster::subscribe(node_topic(node));
# Listening and connecting to other peers is broker specific,
# short circuit if Zeek is configured with a different
# cluster backend.
#
# In the future, this could move into a policy script, but
# for the time being it's easier for backwards compatibility
# to keep this here.
if ( Cluster::backend != Cluster::CLUSTER_BACKEND_BROKER )
return;
# Logging setup: Anything handling logging additionally subscribes
# to Broker::default_log_topic_prefix.
switch ( self$node_type ) {
case LOGGER:
Cluster::subscribe(Broker::default_log_topic_prefix);
break;
case MANAGER:
if ( Cluster::manager_is_logger )
Cluster::subscribe(Broker::default_log_topic_prefix);
break;
}
if ( self$p != 0/unknown )
{
Broker::listen(Broker::default_listen_address,
self$p,
Broker::default_listen_retry);
Cluster::log(fmt("listening on %s:%s", Broker::default_listen_address, self$p));
}
switch ( self$node_type ) {
case MANAGER:
connect_peers_with_type(LOGGER);
break;
case PROXY:
connect_peers_with_type(LOGGER);
if ( self?$manager )
connect_peer(MANAGER, self$manager);
break;
case WORKER:
connect_peers_with_type(LOGGER);
connect_peers_with_type(PROXY);
if ( self?$manager )
connect_peer(MANAGER, self$manager);
break;
}
if ( Cluster::enable_global_pub_sub )
connect_peers_same_type(node, self$node_type);
}