diff --git a/scripts/base/frameworks/cluster/setup-connections.zeek b/scripts/base/frameworks/cluster/setup-connections.zeek index ba3010129c..453658067f 100644 --- a/scripts/base/frameworks/cluster/setup-connections.zeek +++ b/scripts/base/frameworks/cluster/setup-connections.zeek @@ -69,7 +69,7 @@ event zeek_init() &priority=-10 local pool = registered_pools[i]; if ( node in pool$nodes ) - Broker::subscribe(pool$spec$topic); + Cluster::subscribe(pool$spec$topic); } switch ( self$node_type ) { @@ -78,29 +78,47 @@ event zeek_init() &priority=-10 case CONTROL: break; case LOGGER: - Broker::subscribe(Cluster::logger_topic); - Broker::subscribe(Broker::default_log_topic_prefix); + Cluster::subscribe(Cluster::logger_topic); break; case MANAGER: - Broker::subscribe(Cluster::manager_topic); - - if ( Cluster::manager_is_logger ) - Broker::subscribe(Broker::default_log_topic_prefix); - + Cluster::subscribe(Cluster::manager_topic); break; case PROXY: - Broker::subscribe(Cluster::proxy_topic); + Cluster::subscribe(Cluster::proxy_topic); break; case WORKER: - Broker::subscribe(Cluster::worker_topic); + Cluster::subscribe(Cluster::worker_topic); break; default: Reporter::error(fmt("Unhandled cluster node type: %s", self$node_type)); return; } - Broker::subscribe(nodeid_topic(Broker::node_id())); - Broker::subscribe(node_topic(node)); + Cluster::subscribe(nodeid_topic(Broker::node_id())); + Cluster::subscribe(node_topic(node)); + + + # Listening and connecting to other peers is broker specific, + # short circuit if Zeek is configured with a different + # cluster backend. + # + # In the future, this could move into a policy script, but + # for the time being it's easier for backwards compatibility + # to keep this here. + if ( Cluster::backend != Cluster::CLUSTER_BACKEND_BROKER ) + return; + + # Logging setup: Anything handling logging additionally subscribes + # to Broker::default_log_topic_prefix. + switch ( self$node_type ) { + case LOGGER: + Cluster::subscribe(Broker::default_log_topic_prefix); + break; + case MANAGER: + if ( Cluster::manager_is_logger ) + Cluster::subscribe(Broker::default_log_topic_prefix); + break; + } if ( self$p != 0/unknown ) {