experimental/cluster_started: Support fully connected clusters

For ZeroMQ or a fully connected Broker, await node_up events from all
nodes in a cluster instead of relying on broker specific connection
establishment.

Also switch from Broker:: to Cluster::.
This commit is contained in:
Arne Welzel 2025-03-13 13:47:01 +01:00
parent 2fde2f18d6
commit f8ed62cae5
2 changed files with 18 additions and 5 deletions

View file

@ -53,6 +53,19 @@ hook Cluster::connect_node_hook(connectee: Cluster::NamedNode)
add connectees_pending[connectee$name]; add connectees_pending[connectee$name];
} }
event zeek_init()
{
## If global publish subscribe is enabled, every cluster node
## can expect a Cluster::node_up() from other nodes.
if ( Cluster::enable_global_pub_sub )
{
for ( name, _ in Cluster::nodes )
if ( name != Cluster::node )
add connectees_pending[name];
}
}
event Cluster::node_up(name: string, id: string) &priority=-10 event Cluster::node_up(name: string, id: string) &priority=-10
{ {
# Track pending connectees to trigger node_fully_connected, which will be # Track pending connectees to trigger node_fully_connected, which will be
@ -67,9 +80,9 @@ event Cluster::node_up(name: string, id: string) &priority=-10
delete connectees_pending[name]; delete connectees_pending[name];
if ( |connectees_pending| == 0 ) if ( |connectees_pending| == 0 )
{ {
event node_fully_connected(Cluster::node, Broker::node_id(), is_cluster_started); event node_fully_connected(Cluster::node, Cluster::node_id(), is_cluster_started);
Broker::publish(Cluster::manager_topic, node_fully_connected, Cluster::publish(Cluster::manager_topic, node_fully_connected,
Cluster::node, Broker::node_id(), is_cluster_started); Cluster::node, Cluster::node_id(), is_cluster_started);
} }
} }

View file

@ -26,7 +26,7 @@ event node_fully_connected(name: string, id: string, resending: bool)
event cluster_started(); event cluster_started();
for ( topic in Cluster::broadcast_topics ) for ( topic in Cluster::broadcast_topics )
Broker::publish(topic, Cluster::Experimental::cluster_started); Cluster::publish(topic, Cluster::Experimental::cluster_started);
} }
} }
@ -42,7 +42,7 @@ event zeek_init() &priority=-15
# Make sure the manager recognizes itself as ready if no # Make sure the manager recognizes itself as ready if no
# connections have to be initiated. # connections have to be initiated.
if ( |connectees_pending| == 0 ) if ( |connectees_pending| == 0 )
event node_fully_connected(Cluster::node, Broker::node_id(), F); event node_fully_connected(Cluster::node, Cluster::node_id(), F);
} }
event Cluster::node_up(name: string, id: string) event Cluster::node_up(name: string, id: string)