diff --git a/scripts/base/frameworks/cluster/main.zeek b/scripts/base/frameworks/cluster/main.zeek index 36a19885d6..d73093a8cf 100644 --- a/scripts/base/frameworks/cluster/main.zeek +++ b/scripts/base/frameworks/cluster/main.zeek @@ -24,6 +24,18 @@ export { ## Whether to distribute log messages among available logging nodes. const enable_round_robin_logging = T &redef; + ## Enable global publish/subscribe functionality. + ## + ## With the Broker cluster backend, by default, nodes of the same + ## type do not peer with each other, resulting in limited publish + ## subscribe functionality. Setting this value to *T* changes the + ## behavior to enable global publish subscribe visibility, where all + ## nodes observe all messages published by other cluster nodes. + ## + ## Non-Broker backends should provide global publish subscribe behavior + ## by default and can redef this to *T* in their policy scripts. + const enable_global_pub_sub = F &redef; + ## The topic name used for exchanging messages that are relevant to ## logger nodes in a cluster. Used with broker-enabled cluster communication. const logger_topic = "zeek/cluster/logger" &redef; diff --git a/scripts/base/frameworks/cluster/setup-connections.zeek b/scripts/base/frameworks/cluster/setup-connections.zeek index 84c2a7d2fe..86313d90c5 100644 --- a/scripts/base/frameworks/cluster/setup-connections.zeek +++ b/scripts/base/frameworks/cluster/setup-connections.zeek @@ -59,6 +59,36 @@ function connect_peers_with_type(node_type: NodeType) } } +# Connect to all nodes that have the same type. +# +# To limit the number of connections within a cluster, the logic is to +# connect to all nodes of the same type with a name sorted higher than +# this nodes name itself. +# +# With 3 workers, worker-1 connects to worker-2 and worker-3, worker-2 +# connects to worker-3 and worker-3 establishes no extra connections. +function connect_peers_same_type(self_name: string, self_type: Cluster::NodeType) + { + # nnodes is already sorted by name. + local nnodes = nodes_with_type(self_type); + + local idx = -1; + # nnodes.indexOf(self_name) + for ( i, nn in nnodes ) + if ( nn$name == self_name ) + idx = i; + + assert idx >= 0, fmt("%s not in %s", self_name, nnodes); + + # Establish a connection to all nodes at higher indices. + idx += 1; + while ( idx < |nnodes| ) + { + connect_peer(self_type, nnodes[idx]$name); + idx += 1; + } + } + event zeek_init() &priority=-10 { if ( getenv("ZEEKCTL_CHECK_CONFIG") != "" ) @@ -153,4 +183,7 @@ event zeek_init() &priority=-10 break; } + + if ( Cluster::enable_global_pub_sub ) + connect_peers_same_type(node, self$node_type); }