From f8ed62cae5c549c0d7d60fe0f4830c98a2991632 Mon Sep 17 00:00:00 2001 From: Arne Welzel Date: Thu, 13 Mar 2025 13:47:01 +0100 Subject: [PATCH] experimental/cluster_started: Support fully connected clusters For ZeroMQ or a fully connected Broker, await node_up events from all nodes in a cluster instead of relying on broker specific connection establishment. Also switch from Broker:: to Cluster::. --- .../frameworks/cluster/experimental.zeek | 19 ++++++++++++++++--- .../cluster/nodes-experimental/manager.zeek | 4 ++-- 2 files changed, 18 insertions(+), 5 deletions(-) diff --git a/scripts/policy/frameworks/cluster/experimental.zeek b/scripts/policy/frameworks/cluster/experimental.zeek index 83e6febdca..ba50199e8b 100644 --- a/scripts/policy/frameworks/cluster/experimental.zeek +++ b/scripts/policy/frameworks/cluster/experimental.zeek @@ -53,6 +53,19 @@ hook Cluster::connect_node_hook(connectee: Cluster::NamedNode) add connectees_pending[connectee$name]; } +event zeek_init() + { + ## If global publish subscribe is enabled, every cluster node + ## can expect a Cluster::node_up() from other nodes. + if ( Cluster::enable_global_pub_sub ) + { + for ( name, _ in Cluster::nodes ) + if ( name != Cluster::node ) + add connectees_pending[name]; + + } + } + event Cluster::node_up(name: string, id: string) &priority=-10 { # Track pending connectees to trigger node_fully_connected, which will be @@ -67,9 +80,9 @@ event Cluster::node_up(name: string, id: string) &priority=-10 delete connectees_pending[name]; if ( |connectees_pending| == 0 ) { - event node_fully_connected(Cluster::node, Broker::node_id(), is_cluster_started); - Broker::publish(Cluster::manager_topic, node_fully_connected, - Cluster::node, Broker::node_id(), is_cluster_started); + event node_fully_connected(Cluster::node, Cluster::node_id(), is_cluster_started); + Cluster::publish(Cluster::manager_topic, node_fully_connected, + Cluster::node, Cluster::node_id(), is_cluster_started); } } diff --git a/scripts/policy/frameworks/cluster/nodes-experimental/manager.zeek b/scripts/policy/frameworks/cluster/nodes-experimental/manager.zeek index 274a15aeea..9b4a33c640 100644 --- a/scripts/policy/frameworks/cluster/nodes-experimental/manager.zeek +++ b/scripts/policy/frameworks/cluster/nodes-experimental/manager.zeek @@ -26,7 +26,7 @@ event node_fully_connected(name: string, id: string, resending: bool) event cluster_started(); for ( topic in Cluster::broadcast_topics ) - Broker::publish(topic, Cluster::Experimental::cluster_started); + Cluster::publish(topic, Cluster::Experimental::cluster_started); } } @@ -42,7 +42,7 @@ event zeek_init() &priority=-15 # Make sure the manager recognizes itself as ready if no # connections have to be initiated. if ( |connectees_pending| == 0 ) - event node_fully_connected(Cluster::node, Broker::node_id(), F); + event node_fully_connected(Cluster::node, Cluster::node_id(), F); } event Cluster::node_up(name: string, id: string)