Management framework: consistency fixes around event() vs Broker::publish()

Switch to using Broker::publish() for any event we only send to a peered entity,
and not to drive local processing.

Also minor indentation cleanup.
This commit is contained in:
Christian Kreibich 2022-04-18 17:52:56 -07:00
parent 4d24b9d9b8
commit b23d292410
4 changed files with 97 additions and 111 deletions

View file

@ -57,6 +57,12 @@ global g_nodes: table[string] of Management::Node;
global g_cluster: table[string] of Supervisor::ClusterEndpoint; global g_cluster: table[string] of Supervisor::ClusterEndpoint;
function agent_topic(): string
{
local epi = Management::Agent::endpoint_info();
return Management::Agent::topic_prefix + "/" + epi$id;
}
event SupervisorControl::create_response(reqid: string, result: string) event SupervisorControl::create_response(reqid: string, result: string)
{ {
local req = Management::Request::lookup(reqid); local req = Management::Request::lookup(reqid);
@ -69,7 +75,9 @@ event SupervisorControl::create_response(reqid: string, result: string)
{ {
local msg = fmt("failed to create node %s: %s", name, result); local msg = fmt("failed to create node %s: %s", name, result);
Management::Log::error(msg); Management::Log::error(msg);
event Management::Agent::API::notify_error(Management::Agent::name, msg, name); Broker::publish(agent_topic(),
Management::Agent::API::notify_error,
Management::Agent::name, msg, name);
} }
Management::Request::finish(reqid); Management::Request::finish(reqid);
@ -87,7 +95,9 @@ event SupervisorControl::destroy_response(reqid: string, result: bool)
{ {
local msg = fmt("failed to destroy node %s, %s", name, reqid); local msg = fmt("failed to destroy node %s, %s", name, reqid);
Management::Log::error(msg); Management::Log::error(msg);
event Management::Agent::API::notify_error(Management::Agent::name, msg, name); Broker::publish(agent_topic(),
Management::Agent::API::notify_error,
Management::Agent::name, msg, name);
} }
Management::Request::finish(reqid); Management::Request::finish(reqid);
@ -97,7 +107,8 @@ function supervisor_create(nc: Supervisor::NodeConfig)
{ {
local req = Management::Request::create(); local req = Management::Request::create();
req$supervisor_state = SupervisorState($node = nc$name); req$supervisor_state = SupervisorState($node = nc$name);
event SupervisorControl::create_request(req$id, nc); Broker::publish(SupervisorControl::topic_prefix,
SupervisorControl::create_request, req$id, nc);
Management::Log::info(fmt("issued supervisor create for %s, %s", nc$name, req$id)); Management::Log::info(fmt("issued supervisor create for %s, %s", nc$name, req$id));
} }
@ -105,7 +116,8 @@ function supervisor_destroy(node: string)
{ {
local req = Management::Request::create(); local req = Management::Request::create();
req$supervisor_state = SupervisorState($node = node); req$supervisor_state = SupervisorState($node = node);
event SupervisorControl::destroy_request(req$id, node); Broker::publish(SupervisorControl::topic_prefix,
SupervisorControl::destroy_request, req$id, node);
Management::Log::info(fmt("issued supervisor destroy for %s, %s", node, req$id)); Management::Log::info(fmt("issued supervisor destroy for %s, %s", node, req$id));
} }
@ -205,8 +217,9 @@ event Management::Agent::API::set_configuration_request(reqid: string, config: M
$instance = Management::Agent::name); $instance = Management::Agent::name);
Management::Log::info(fmt("tx Management::Agent::API::set_configuration_response %s", Management::Log::info(fmt("tx Management::Agent::API::set_configuration_response %s",
Management::result_to_string(res))); Management::result_to_string(res)));
event Management::Agent::API::set_configuration_response(reqid, res); Broker::publish(agent_topic(),
Management::Agent::API::set_configuration_response, reqid, res);
} }
} }
@ -294,8 +307,9 @@ event SupervisorControl::status_response(reqid: string, result: Supervisor::Stat
res$data = node_statuses; res$data = node_statuses;
Management::Log::info(fmt("tx Management::Agent::API::get_nodes_response %s", Management::Log::info(fmt("tx Management::Agent::API::get_nodes_response %s",
Management::result_to_string(res))); Management::result_to_string(res)));
event Management::Agent::API::get_nodes_response(req$parent_id, res); Broker::publish(agent_topic(),
Management::Agent::API::get_nodes_response, req$parent_id, res);
} }
event Management::Agent::API::get_nodes_request(reqid: string) event Management::Agent::API::get_nodes_request(reqid: string)
@ -305,7 +319,8 @@ event Management::Agent::API::get_nodes_request(reqid: string)
local req = Management::Request::create(); local req = Management::Request::create();
req$parent_id = reqid; req$parent_id = reqid;
event SupervisorControl::status_request(req$id, ""); Broker::publish(SupervisorControl::topic_prefix,
SupervisorControl::status_request, req$id, "");
Management::Log::info(fmt("issued supervisor status, %s", req$id)); Management::Log::info(fmt("issued supervisor status, %s", req$id));
} }
@ -375,7 +390,8 @@ event Management::Node::API::node_dispatch_response(reqid: string, result: Manag
# Send response event back to controller and clean up main request state. # Send response event back to controller and clean up main request state.
Management::Log::info(fmt("tx Management::Agent::API::node_dispatch_response %s", Management::Log::info(fmt("tx Management::Agent::API::node_dispatch_response %s",
Management::Request::to_string(req))); Management::Request::to_string(req)));
event Management::Agent::API::node_dispatch_response(req$id, req$results); Broker::publish(agent_topic(),
Management::Agent::API::node_dispatch_response, req$id, req$results);
Management::Request::finish(req$id); Management::Request::finish(req$id);
} }
@ -405,7 +421,8 @@ event Management::Agent::API::node_dispatch_request(reqid: string, action: vecto
Management::Log::info(fmt( Management::Log::info(fmt(
"tx Management::Agent::API::node_dispatch_response %s, no node overlap", "tx Management::Agent::API::node_dispatch_response %s, no node overlap",
reqid)); reqid));
event Management::Agent::API::node_dispatch_response(reqid, vector()); Broker::publish(agent_topic(),
Management::Agent::API::node_dispatch_response, reqid, vector());
return; return;
} }
} }
@ -417,7 +434,8 @@ event Management::Agent::API::node_dispatch_request(reqid: string, action: vecto
Management::Log::info(fmt( Management::Log::info(fmt(
"tx Management::Agent::API::node_dispatch_response %s, no nodes registered", "tx Management::Agent::API::node_dispatch_response %s, no nodes registered",
reqid)); reqid));
event Management::Agent::API::node_dispatch_response(reqid, vector()); Broker::publish(agent_topic(),
Management::Agent::API::node_dispatch_response, reqid, vector());
return; return;
} }
else else
@ -453,7 +471,8 @@ event Management::Agent::API::node_dispatch_request(reqid: string, action: vecto
Management::Log::info(fmt( Management::Log::info(fmt(
"tx Management::Agent::API::node_dispatch_response %s, no nodes running", "tx Management::Agent::API::node_dispatch_response %s, no nodes running",
reqid)); reqid));
event Management::Agent::API::node_dispatch_response(reqid, req$results); Broker::publish(agent_topic(),
Management::Agent::API::node_dispatch_response, reqid, req$results);
Management::Request::finish(req$id); Management::Request::finish(req$id);
return; return;
} }
@ -478,8 +497,9 @@ event Management::Agent::API::agent_welcome_request(reqid: string)
$instance = Management::Agent::name); $instance = Management::Agent::name);
Management::Log::info(fmt("tx Management::Agent::API::agent_welcome_response %s", Management::Log::info(fmt("tx Management::Agent::API::agent_welcome_response %s",
Management::result_to_string(res))); Management::result_to_string(res)));
event Management::Agent::API::agent_welcome_response(reqid, res); Broker::publish(agent_topic(),
Management::Agent::API::agent_welcome_response, reqid, res);
} }
event Management::Agent::API::agent_standby_request(reqid: string) event Management::Agent::API::agent_standby_request(reqid: string)
@ -498,8 +518,9 @@ event Management::Agent::API::agent_standby_request(reqid: string)
$instance = Management::Agent::name); $instance = Management::Agent::name);
Management::Log::info(fmt("tx Management::Agent::API::agent_standby_response %s", Management::Log::info(fmt("tx Management::Agent::API::agent_standby_response %s",
Management::result_to_string(res))); Management::result_to_string(res)));
event Management::Agent::API::agent_standby_response(reqid, res); Broker::publish(agent_topic(),
Management::Agent::API::agent_standby_response, reqid, res);
} }
event Management::Node::API::notify_node_hello(node: string) event Management::Node::API::notify_node_hello(node: string)
@ -518,8 +539,10 @@ event Broker::peer_added(peer: Broker::EndpointInfo, msg: string)
local epi = Management::Agent::endpoint_info(); local epi = Management::Agent::endpoint_info();
event Management::Agent::API::notify_agent_hello(epi$id, Broker::publish(agent_topic(),
to_addr(epi$network$address), Management::Agent::API::version); Management::Agent::API::notify_agent_hello,
epi$id, to_addr(epi$network$address),
Management::Agent::API::version);
} }
# XXX We may want a request timeout event handler here. It's arguably cleaner to # XXX We may want a request timeout event handler here. It's arguably cleaner to
@ -529,7 +552,6 @@ event Broker::peer_added(peer: Broker::EndpointInfo, msg: string)
event zeek_init() event zeek_init()
{ {
local epi = Management::Agent::endpoint_info(); local epi = Management::Agent::endpoint_info();
local agent_topic = Management::Agent::topic_prefix + "/" + epi$id;
# The agent needs to peer with the supervisor -- this doesn't currently # The agent needs to peer with the supervisor -- this doesn't currently
# happen automatically. The address defaults to Broker's default, which # happen automatically. The address defaults to Broker's default, which
@ -543,45 +565,17 @@ event zeek_init()
# Agents need receive communication targeted at it, any responses # Agents need receive communication targeted at it, any responses
# from the supervisor, and any responses from cluster nodes. # from the supervisor, and any responses from cluster nodes.
Broker::subscribe(agent_topic); Broker::subscribe(agent_topic());
Broker::subscribe(SupervisorControl::topic_prefix); Broker::subscribe(SupervisorControl::topic_prefix);
Broker::subscribe(Management::Node::node_topic); Broker::subscribe(Management::Node::node_topic);
# Auto-publish a bunch of events. Glob patterns or module-level
# auto-publish would be helpful here.
local agent_to_controller_events: vector of any = [
Management::Agent::API::get_nodes_response,
Management::Agent::API::set_configuration_response,
Management::Agent::API::agent_welcome_response,
Management::Agent::API::agent_standby_response,
Management::Agent::API::node_dispatch_response,
Management::Agent::API::notify_agent_hello,
Management::Agent::API::notify_change,
Management::Agent::API::notify_error,
Management::Agent::API::notify_log
];
for ( i in agent_to_controller_events )
Broker::auto_publish(agent_topic, agent_to_controller_events[i]);
local agent_to_sup_events: vector of any = [
SupervisorControl::create_request,
SupervisorControl::status_request,
SupervisorControl::destroy_request,
SupervisorControl::restart_request,
SupervisorControl::stop_request
];
for ( i in agent_to_sup_events )
Broker::auto_publish(SupervisorControl::topic_prefix, agent_to_sup_events[i]);
# Establish connectivity with the controller. # Establish connectivity with the controller.
if ( Management::Agent::controller$address != "0.0.0.0" ) if ( Management::Agent::controller$address != "0.0.0.0" )
{ {
# We connect to the controller. # We connect to the controller.
Broker::peer(Management::Agent::controller$address, Broker::peer(Management::Agent::controller$address,
Management::Agent::controller$bound_port, Management::Agent::controller$bound_port,
Management::connect_retry); Management::connect_retry);
} }
# The agent always listens, to allow cluster nodes to peer with it. # The agent always listens, to allow cluster nodes to peer with it.

View file

@ -135,7 +135,7 @@ export {
result: Management::Result); result: Management::Result);
# Notification events, agent -> controller # Notification events
## The controller triggers this event when the operational cluster ## The controller triggers this event when the operational cluster
## instances align with the ones desired by the cluster ## instances align with the ones desired by the cluster

View file

@ -113,8 +113,7 @@ global g_config_reqid_pending: string = "";
# the one we send whenever the client requests it. # the one we send whenever the client requests it.
global g_config_current: Management::Configuration; global g_config_current: Management::Configuration;
function send_config_to_agents(req: Management::Request::Request, function send_config_to_agents(req: Management::Request::Request, config: Management::Configuration)
config: Management::Configuration)
{ {
for ( name in g_instances ) for ( name in g_instances )
{ {
@ -156,7 +155,7 @@ function add_instance(inst: Management::Instance)
if ( inst?$listen_port ) if ( inst?$listen_port )
Broker::peer(cat(inst$host), inst$listen_port, Broker::peer(cat(inst$host), inst$listen_port,
Management::connect_retry); Management::connect_retry);
if ( inst$name in g_instances_known ) if ( inst$name in g_instances_known )
{ {
@ -169,7 +168,7 @@ function add_instance(inst: Management::Instance)
Management::Log::info(fmt("tx Management::Agent::API::agent_welcome_request to %s", inst$name)); Management::Log::info(fmt("tx Management::Agent::API::agent_welcome_request to %s", inst$name));
Broker::publish(Management::Agent::topic_prefix + "/" + inst$name, Broker::publish(Management::Agent::topic_prefix + "/" + inst$name,
Management::Agent::API::agent_welcome_request, req$id); Management::Agent::API::agent_welcome_request, req$id);
} }
} }
@ -181,7 +180,7 @@ function drop_instance(inst: Management::Instance)
# Send the agent a standby so it shuts down its cluster nodes & state # Send the agent a standby so it shuts down its cluster nodes & state
Management::Log::info(fmt("tx Management::Agent::API::agent_standby_request to %s", inst$name)); Management::Log::info(fmt("tx Management::Agent::API::agent_standby_request to %s", inst$name));
Broker::publish(Management::Agent::topic_prefix + "/" + inst$name, Broker::publish(Management::Agent::topic_prefix + "/" + inst$name,
Management::Agent::API::agent_standby_request, ""); Management::Agent::API::agent_standby_request, "");
delete g_instances[inst$name]; delete g_instances[inst$name];
@ -292,7 +291,7 @@ event Management::Agent::API::notify_agent_hello(instance: string, host: addr, a
Management::Log::info(fmt("tx Management::Agent::API::agent_welcome_request to %s", instance)); Management::Log::info(fmt("tx Management::Agent::API::agent_welcome_request to %s", instance));
Broker::publish(Management::Agent::topic_prefix + "/" + instance, Broker::publish(Management::Agent::topic_prefix + "/" + instance,
Management::Agent::API::agent_welcome_request, req$id); Management::Agent::API::agent_welcome_request, req$id);
} }
} }
@ -315,7 +314,7 @@ event Management::Agent::API::agent_welcome_response(reqid: string, result: Mana
Management::Log::info(fmt( Management::Log::info(fmt(
"tx Management::Agent::API::agent_standby_request to %s", result$instance)); "tx Management::Agent::API::agent_standby_request to %s", result$instance));
Broker::publish(Management::Agent::topic_prefix + "/" + result$instance, Broker::publish(Management::Agent::topic_prefix + "/" + result$instance,
Management::Agent::API::agent_standby_request, ""); Management::Agent::API::agent_standby_request, "");
return; return;
} }
@ -326,8 +325,7 @@ event Management::Agent::API::agent_welcome_response(reqid: string, result: Mana
} }
event Management::Agent::API::notify_change(instance: string, n: Management::Node, event Management::Agent::API::notify_change(instance: string, n: Management::Node,
old: Management::State, old: Management::State, new: Management::State)
new: Management::State)
{ {
# XXX TODO # XXX TODO
} }
@ -379,8 +377,9 @@ event Management::Agent::API::set_configuration_response(reqid: string, result:
g_config_reqid_pending = ""; g_config_reqid_pending = "";
Management::Log::info(fmt("tx Management::Controller::API::set_configuration_response %s", Management::Log::info(fmt("tx Management::Controller::API::set_configuration_response %s",
Management::Request::to_string(req))); Management::Request::to_string(req)));
event Management::Controller::API::set_configuration_response(req$id, req$results); Broker::publish(Management::Controller::topic,
Management::Controller::API::set_configuration_response, req$id, req$results);
Management::Request::finish(req$id); Management::Request::finish(req$id);
} }
@ -402,8 +401,9 @@ event Management::Controller::API::set_configuration_request(reqid: string, conf
req$results += res; req$results += res;
Management::Log::info(fmt("tx Management::Controller::API::set_configuration_response %s", Management::Log::info(fmt("tx Management::Controller::API::set_configuration_response %s",
Management::Request::to_string(req))); Management::Request::to_string(req)));
event Management::Controller::API::set_configuration_response(req$id, req$results); Broker::publish(Management::Controller::topic,
Management::Controller::API::set_configuration_response, req$id, req$results);
Management::Request::finish(req$id); Management::Request::finish(req$id);
return; return;
} }
@ -508,7 +508,8 @@ event Management::Controller::API::get_instances_request(reqid: string)
res$data = insts; res$data = insts;
Management::Log::info(fmt("tx Management::Controller::API::get_instances_response %s", reqid)); Management::Log::info(fmt("tx Management::Controller::API::get_instances_response %s", reqid));
event Management::Controller::API::get_instances_response(reqid, res); Broker::publish(Management::Controller::topic,
Management::Controller::API::get_instances_response, reqid, res);
} }
event Management::Agent::API::get_nodes_response(reqid: string, result: Management::Result) event Management::Agent::API::get_nodes_response(reqid: string, result: Management::Result)
@ -551,8 +552,9 @@ event Management::Agent::API::get_nodes_response(reqid: string, result: Manageme
return; return;
Management::Log::info(fmt("tx Management::Controller::API::get_nodes_response %s", Management::Log::info(fmt("tx Management::Controller::API::get_nodes_response %s",
Management::Request::to_string(req))); Management::Request::to_string(req)));
event Management::Controller::API::get_nodes_response(req$id, req$results); Broker::publish(Management::Controller::topic,
Management::Controller::API::get_nodes_response, req$id, req$results);
Management::Request::finish(req$id); Management::Request::finish(req$id);
} }
@ -564,9 +566,10 @@ event Management::Controller::API::get_nodes_request(reqid: string)
if ( |g_instances| == 0 ) if ( |g_instances| == 0 )
{ {
Management::Log::info(fmt("tx Management::Controller::API::get_nodes_response %s", reqid)); Management::Log::info(fmt("tx Management::Controller::API::get_nodes_response %s", reqid));
event Management::Controller::API::get_nodes_response(reqid, vector( local res = Management::Result($reqid=reqid, $success=F,
Management::Result($reqid=reqid, $success=F, $error="no instances connected");
$error="no instances connected"))); Broker::publish(Management::Controller::topic,
Management::Controller::API::get_nodes_response, reqid, vector(res));
return; return;
} }
@ -643,7 +646,9 @@ event Management::Agent::API::node_dispatch_response(reqid: string, results: Man
Management::Log::info(fmt( Management::Log::info(fmt(
"tx Management::Controller::API::get_id_value_response %s", "tx Management::Controller::API::get_id_value_response %s",
Management::Request::to_string(req))); Management::Request::to_string(req)));
event Management::Controller::API::get_id_value_response(req$id, req$results); Broker::publish(Management::Controller::topic,
Management::Controller::API::get_id_value_response,
req$id, req$results);
break; break;
default: default:
Management::Log::error(fmt("unexpected dispatch command %s", Management::Log::error(fmt("unexpected dispatch command %s",
@ -658,12 +663,16 @@ event Management::Controller::API::get_id_value_request(reqid: string, id: strin
{ {
Management::Log::info(fmt("rx Management::Controller::API::get_id_value_request %s %s", reqid, id)); Management::Log::info(fmt("rx Management::Controller::API::get_id_value_request %s %s", reqid, id));
local res: Management::Result;
# Special case: if we have no instances, respond right away. # Special case: if we have no instances, respond right away.
if ( |g_instances| == 0 ) if ( |g_instances| == 0 )
{ {
Management::Log::info(fmt("tx Management::Controller::API::get_id_value_response %s", reqid)); Management::Log::info(fmt("tx Management::Controller::API::get_id_value_response %s", reqid));
event Management::Controller::API::get_id_value_response(reqid, vector( res = Management::Result($reqid=reqid, $success=F, $error="no instances connected");
Management::Result($reqid=reqid, $success=F, $error="no instances connected"))); Broker::publish(Management::Controller::topic,
Management::Controller::API::get_id_value_response,
reqid, vector(res));
return; return;
} }
@ -673,7 +682,6 @@ event Management::Controller::API::get_id_value_request(reqid: string, id: strin
local nodes_final: set[string]; local nodes_final: set[string];
local node: string; local node: string;
local res: Management::Result;
# Input sanitization: check for any requested nodes that aren't part of # Input sanitization: check for any requested nodes that aren't part of
# the current configuration. We send back error results for those and # the current configuration. We send back error results for those and
@ -700,7 +708,9 @@ event Management::Controller::API::get_id_value_request(reqid: string, id: strin
Management::Log::info(fmt( Management::Log::info(fmt(
"tx Management::Controller::API::get_id_value_response %s", "tx Management::Controller::API::get_id_value_response %s",
Management::Request::to_string(req))); Management::Request::to_string(req)));
event Management::Controller::API::get_id_value_response(req$id, req$results); Broker::publish(Management::Controller::topic,
Management::Controller::API::get_id_value_response,
req$id, req$results);
Management::Request::finish(req$id); Management::Request::finish(req$id);
return; return;
} }
@ -745,8 +755,9 @@ event Management::Request::request_expired(req: Management::Request::Request)
req$results += res; req$results += res;
Management::Log::info(fmt("tx Management::Controller::API::set_configuration_response %s", Management::Log::info(fmt("tx Management::Controller::API::set_configuration_response %s",
Management::Request::to_string(req))); Management::Request::to_string(req)));
event Management::Controller::API::set_configuration_response(req$id, req$results); Broker::publish(Management::Controller::topic,
Management::Controller::API::set_configuration_response, req$id, req$results);
} }
if ( req?$get_nodes_state ) if ( req?$get_nodes_state )
@ -754,8 +765,9 @@ event Management::Request::request_expired(req: Management::Request::Request)
req$results += res; req$results += res;
Management::Log::info(fmt("tx Management::Controller::API::get_nodes_response %s", Management::Log::info(fmt("tx Management::Controller::API::get_nodes_response %s",
Management::Request::to_string(req))); Management::Request::to_string(req)));
event Management::Controller::API::get_nodes_response(req$id, req$results); Broker::publish(Management::Controller::topic,
Management::Controller::API::get_nodes_response, req$id, req$results);
} }
if ( req?$node_dispatch_state ) if ( req?$node_dispatch_state )
@ -768,7 +780,9 @@ event Management::Request::request_expired(req: Management::Request::Request)
Management::Log::info(fmt( Management::Log::info(fmt(
"tx Management::Controller::API::get_id_value_response %s", "tx Management::Controller::API::get_id_value_response %s",
Management::Request::to_string(req))); Management::Request::to_string(req)));
event Management::Controller::API::get_id_value_response(req$id, req$results); Broker::publish(Management::Controller::topic,
Management::Controller::API::get_id_value_response,
req$id, req$results);
break; break;
default: default:
Management::Log::error(fmt("unexpected dispatch command %s", Management::Log::error(fmt("unexpected dispatch command %s",
@ -780,7 +794,8 @@ event Management::Request::request_expired(req: Management::Request::Request)
if ( req?$test_state ) if ( req?$test_state )
{ {
Management::Log::info(fmt("tx Management::Controller::API::test_timeout_response %s", req$id)); Management::Log::info(fmt("tx Management::Controller::API::test_timeout_response %s", req$id));
event Management::Controller::API::test_timeout_response(req$id, res); Broker::publish(Management::Controller::topic,
Management::Controller::API::test_timeout_response, req$id, res);
} }
} }
@ -817,17 +832,5 @@ event zeek_init()
Broker::subscribe(Management::Agent::topic_prefix); Broker::subscribe(Management::Agent::topic_prefix);
Broker::subscribe(Management::Controller::topic); Broker::subscribe(Management::Controller::topic);
# Events sent to the client:
local events: vector of any = [
Management::Controller::API::get_instances_response,
Management::Controller::API::set_configuration_response,
Management::Controller::API::get_nodes_response,
Management::Controller::API::get_id_value_response,
Management::Controller::API::test_timeout_response
];
for ( i in events )
Broker::auto_publish(Management::Controller::topic, events[i]);
Management::Log::info("controller is live"); Management::Log::info("controller is live");
} }

View file

@ -51,8 +51,7 @@ global g_dispatch_table: table[string] of DispatchCallback = {
["get_id_value"] = dispatch_get_id_value, ["get_id_value"] = dispatch_get_id_value,
}; };
event Management::Node::API::node_dispatch_request(reqid: string, action: vector of string, event Management::Node::API::node_dispatch_request(reqid: string, action: vector of string, nodes: set[string])
nodes: set[string])
{ {
Management::Log::info(fmt("rx Management::Node::API::node_dispatch_request %s %s %s", reqid, action, nodes)); Management::Log::info(fmt("rx Management::Node::API::node_dispatch_request %s %s %s", reqid, action, nodes));
@ -64,8 +63,7 @@ event Management::Node::API::node_dispatch_request(reqid: string, action: vector
return; return;
} }
local res = Management::Result( local res = Management::Result($reqid = reqid, $node = Cluster::node);
$reqid = reqid, $node = Cluster::node);
if ( |action| == 0 ) if ( |action| == 0 )
{ {
@ -82,15 +80,15 @@ event Management::Node::API::node_dispatch_request(reqid: string, action: vector
{ {
Management::Log::info(fmt("tx Management::Node::API::node_dispatch_response %s", Management::Log::info(fmt("tx Management::Node::API::node_dispatch_response %s",
Management::result_to_string(res))); Management::result_to_string(res)));
event Management::Node::API::node_dispatch_response(reqid, res); Broker::publish(node_topic, Management::Node::API::node_dispatch_response, reqid, res);
return; return;
} }
g_dispatch_table[action[0]](action[1:], res); g_dispatch_table[action[0]](action[1:], res);
Management::Log::info(fmt("tx Management::Node::API::node_dispatch_response %s", Management::Log::info(fmt("tx Management::Node::API::node_dispatch_response %s",
Management::result_to_string(res))); Management::result_to_string(res)));
event Management::Node::API::node_dispatch_response(reqid, res); Broker::publish(node_topic, Management::Node::API::node_dispatch_response, reqid, res);
} }
event Broker::peer_added(peer: Broker::EndpointInfo, msg: string) event Broker::peer_added(peer: Broker::EndpointInfo, msg: string)
@ -100,7 +98,7 @@ event Broker::peer_added(peer: Broker::EndpointInfo, msg: string)
# If this is the agent peering, notify it that we're ready # If this is the agent peering, notify it that we're ready
if ( peer$network$address == epi$network$address && if ( peer$network$address == epi$network$address &&
peer$network$bound_port == epi$network$bound_port ) peer$network$bound_port == epi$network$bound_port )
event Management::Node::API::notify_node_hello(Cluster::node); Broker::publish(node_topic, Management::Node::API::notify_node_hello, Cluster::node);
} }
event zeek_init() event zeek_init()
@ -109,13 +107,4 @@ event zeek_init()
Broker::peer(epi$network$address, epi$network$bound_port, Management::connect_retry); Broker::peer(epi$network$address, epi$network$bound_port, Management::connect_retry);
Broker::subscribe(node_topic); Broker::subscribe(node_topic);
# Events automatically sent to the Management agent.
local events: vector of any = [
Management::Node::API::node_dispatch_response,
Management::Node::API::notify_node_hello
];
for ( i in events )
Broker::auto_publish(node_topic, events[i]);
} }