control: Use Cluster::publish() for replying

Switching to ZeroMQ as cluster backend and dabbling with zeekctl
and WebSocket, replies didn't arrive due to the usage of
Broker::publish() rather than Cluster::publish(). Additionally,
add the node name to the topic on which we reply so that the
receiver can figure out which node sent the reply. It could've
been a separate event parameter, but the topic appears just fine.
This commit is contained in:
Arne Welzel 2025-07-17 17:53:39 +00:00
parent 55cdb707e9
commit 3f2fe6fc3d

View file

@ -13,18 +13,21 @@
module Control; module Control;
event zeek_init() &priority=-10 event zeek_init() &priority=-10
{
if ( Cluster::backend == Cluster::CLUSTER_BACKEND_BROKER )
{ {
Broker::subscribe(Control::topic_prefix + "/" + Broker::node_id()); Broker::subscribe(Control::topic_prefix + "/" + Broker::node_id());
if ( Control::controllee_listen ) if ( Control::controllee_listen )
Broker::listen(); Broker::listen();
} }
}
event Control::id_value_request(id: string) event Control::id_value_request(id: string)
{ {
local val = lookup_ID(id); local val = lookup_ID(id);
local reply_topic = Control::topic_prefix + "/id_value_response"; local reply_topic = Control::topic_prefix + "/id_value_response/" + Cluster::node;
Broker::publish(reply_topic, Control::id_value_response, id, fmt("%s", val)); Cluster::publish(reply_topic, Control::id_value_response, id, fmt("%s", val));
} }
event Control::peer_status_request() event Control::peer_status_request()
@ -44,8 +47,8 @@ event Control::peer_status_request()
bpeer$status); bpeer$status);
} }
local topic = Control::topic_prefix + "/peer_status_response"; local topic = Control::topic_prefix + "/peer_status_response/" + Cluster::node;
Broker::publish(topic, Control::peer_status_response, status); Cluster::publish(topic, Control::peer_status_response, status);
} }
event Control::net_stats_request() event Control::net_stats_request()
@ -53,8 +56,8 @@ event Control::net_stats_request()
local ns = get_net_stats(); local ns = get_net_stats();
local reply = fmt("%.6f recvd=%d dropped=%d link=%d\n", network_time(), local reply = fmt("%.6f recvd=%d dropped=%d link=%d\n", network_time(),
ns$pkts_recvd, ns$pkts_dropped, ns$pkts_link); ns$pkts_recvd, ns$pkts_dropped, ns$pkts_link);
local topic = Control::topic_prefix + "/net_stats_response"; local topic = Control::topic_prefix + "/net_stats_response/" + Cluster::node;
Broker::publish(topic, Control::net_stats_response, reply); Cluster::publish(topic, Control::net_stats_response, reply);
} }
event Control::configuration_update_request() event Control::configuration_update_request()
@ -66,15 +69,15 @@ event Control::configuration_update_request()
# the configuration is going to be updated. This event could be handled # the configuration is going to be updated. This event could be handled
# by other scripts if they need to do some ancillary processing if # by other scripts if they need to do some ancillary processing if
# redef-able consts are modified at runtime. # redef-able consts are modified at runtime.
local topic = Control::topic_prefix + "/configuration_update_response"; local topic = Control::topic_prefix + "/configuration_update_response/" + Cluster::node;
Broker::publish(topic, Control::configuration_update_response); Cluster::publish(topic, Control::configuration_update_response);
} }
event Control::shutdown_request() event Control::shutdown_request()
{ {
# Send the acknowledgement event. # Send the acknowledgement event.
local topic = Control::topic_prefix + "/shutdown_response"; local topic = Control::topic_prefix + "/shutdown_response/" + Cluster::node;
Broker::publish(topic, Control::shutdown_response); Cluster::publish(topic, Control::shutdown_response);
# Schedule the shutdown to let the current event queue flush itself first. # Schedule the shutdown to let the current event queue flush itself first.
schedule 1sec { terminate_event() }; schedule 1sec { terminate_event() };
} }