From 3f2fe6fc3d7078442ae507f177277b2224cfbad9 Mon Sep 17 00:00:00 2001 From: Arne Welzel Date: Thu, 17 Jul 2025 17:53:39 +0000 Subject: [PATCH] control: Use Cluster::publish() for replying Switching to ZeroMQ as cluster backend and dabbling with zeekctl and WebSocket, replies didn't arrive due to the usage of Broker::publish() rather than Cluster::publish(). Additionally, add the node name to the topic on which we reply so that the receiver can figure out which node sent the reply. It could've been a separate event parameter, but the topic appears just fine. --- .../policy/frameworks/control/controllee.zeek | 29 ++++++++++--------- 1 file changed, 16 insertions(+), 13 deletions(-) diff --git a/scripts/policy/frameworks/control/controllee.zeek b/scripts/policy/frameworks/control/controllee.zeek index 0fa7a4495f..4b32996d0c 100644 --- a/scripts/policy/frameworks/control/controllee.zeek +++ b/scripts/policy/frameworks/control/controllee.zeek @@ -14,17 +14,20 @@ module Control; event zeek_init() &priority=-10 { - Broker::subscribe(Control::topic_prefix + "/" + Broker::node_id()); + if ( Cluster::backend == Cluster::CLUSTER_BACKEND_BROKER ) + { + Broker::subscribe(Control::topic_prefix + "/" + Broker::node_id()); - if ( Control::controllee_listen ) - Broker::listen(); + if ( Control::controllee_listen ) + Broker::listen(); + } } event Control::id_value_request(id: string) { local val = lookup_ID(id); - local reply_topic = Control::topic_prefix + "/id_value_response"; - Broker::publish(reply_topic, Control::id_value_response, id, fmt("%s", val)); + local reply_topic = Control::topic_prefix + "/id_value_response/" + Cluster::node; + Cluster::publish(reply_topic, Control::id_value_response, id, fmt("%s", val)); } event Control::peer_status_request() @@ -44,8 +47,8 @@ event Control::peer_status_request() bpeer$status); } - local topic = Control::topic_prefix + "/peer_status_response"; - Broker::publish(topic, Control::peer_status_response, status); + local topic = Control::topic_prefix + "/peer_status_response/" + Cluster::node; + Cluster::publish(topic, Control::peer_status_response, status); } event Control::net_stats_request() @@ -53,8 +56,8 @@ event Control::net_stats_request() local ns = get_net_stats(); local reply = fmt("%.6f recvd=%d dropped=%d link=%d\n", network_time(), ns$pkts_recvd, ns$pkts_dropped, ns$pkts_link); - local topic = Control::topic_prefix + "/net_stats_response"; - Broker::publish(topic, Control::net_stats_response, reply); + local topic = Control::topic_prefix + "/net_stats_response/" + Cluster::node; + Cluster::publish(topic, Control::net_stats_response, reply); } event Control::configuration_update_request() @@ -66,15 +69,15 @@ event Control::configuration_update_request() # the configuration is going to be updated. This event could be handled # by other scripts if they need to do some ancillary processing if # redef-able consts are modified at runtime. - local topic = Control::topic_prefix + "/configuration_update_response"; - Broker::publish(topic, Control::configuration_update_response); + local topic = Control::topic_prefix + "/configuration_update_response/" + Cluster::node; + Cluster::publish(topic, Control::configuration_update_response); } event Control::shutdown_request() { # Send the acknowledgement event. - local topic = Control::topic_prefix + "/shutdown_response"; - Broker::publish(topic, Control::shutdown_response); + local topic = Control::topic_prefix + "/shutdown_response/" + Cluster::node; + Cluster::publish(topic, Control::shutdown_response); # Schedule the shutdown to let the current event queue flush itself first. schedule 1sec { terminate_event() }; }