diff --git a/scripts/policy/frameworks/control/controllee.zeek b/scripts/policy/frameworks/control/controllee.zeek index 0fa7a4495f..4b32996d0c 100644 --- a/scripts/policy/frameworks/control/controllee.zeek +++ b/scripts/policy/frameworks/control/controllee.zeek @@ -14,17 +14,20 @@ module Control; event zeek_init() &priority=-10 { - Broker::subscribe(Control::topic_prefix + "/" + Broker::node_id()); + if ( Cluster::backend == Cluster::CLUSTER_BACKEND_BROKER ) + { + Broker::subscribe(Control::topic_prefix + "/" + Broker::node_id()); - if ( Control::controllee_listen ) - Broker::listen(); + if ( Control::controllee_listen ) + Broker::listen(); + } } event Control::id_value_request(id: string) { local val = lookup_ID(id); - local reply_topic = Control::topic_prefix + "/id_value_response"; - Broker::publish(reply_topic, Control::id_value_response, id, fmt("%s", val)); + local reply_topic = Control::topic_prefix + "/id_value_response/" + Cluster::node; + Cluster::publish(reply_topic, Control::id_value_response, id, fmt("%s", val)); } event Control::peer_status_request() @@ -44,8 +47,8 @@ event Control::peer_status_request() bpeer$status); } - local topic = Control::topic_prefix + "/peer_status_response"; - Broker::publish(topic, Control::peer_status_response, status); + local topic = Control::topic_prefix + "/peer_status_response/" + Cluster::node; + Cluster::publish(topic, Control::peer_status_response, status); } event Control::net_stats_request() @@ -53,8 +56,8 @@ event Control::net_stats_request() local ns = get_net_stats(); local reply = fmt("%.6f recvd=%d dropped=%d link=%d\n", network_time(), ns$pkts_recvd, ns$pkts_dropped, ns$pkts_link); - local topic = Control::topic_prefix + "/net_stats_response"; - Broker::publish(topic, Control::net_stats_response, reply); + local topic = Control::topic_prefix + "/net_stats_response/" + Cluster::node; + Cluster::publish(topic, Control::net_stats_response, reply); } event Control::configuration_update_request() @@ -66,15 +69,15 @@ event Control::configuration_update_request() # the configuration is going to be updated. This event could be handled # by other scripts if they need to do some ancillary processing if # redef-able consts are modified at runtime. - local topic = Control::topic_prefix + "/configuration_update_response"; - Broker::publish(topic, Control::configuration_update_response); + local topic = Control::topic_prefix + "/configuration_update_response/" + Cluster::node; + Cluster::publish(topic, Control::configuration_update_response); } event Control::shutdown_request() { # Send the acknowledgement event. - local topic = Control::topic_prefix + "/shutdown_response"; - Broker::publish(topic, Control::shutdown_response); + local topic = Control::topic_prefix + "/shutdown_response/" + Cluster::node; + Cluster::publish(topic, Control::shutdown_response); # Schedule the shutdown to let the current event queue flush itself first. schedule 1sec { terminate_event() }; }