From b23d29241086b133e3cba7d96f8fe7584cd6b7ee Mon Sep 17 00:00:00 2001 From: Christian Kreibich Date: Mon, 18 Apr 2022 17:52:56 -0700 Subject: [PATCH] Management framework: consistency fixes around event() vs Broker::publish() Switch to using Broker::publish() for any event we only send to a peered entity, and not to drive local processing. Also minor indentation cleanup. --- .../frameworks/management/agent/main.zeek | 96 +++++++++---------- .../frameworks/management/controller/api.zeek | 2 +- .../management/controller/main.zeek | 87 +++++++++-------- .../frameworks/management/node/main.zeek | 23 ++--- 4 files changed, 97 insertions(+), 111 deletions(-) diff --git a/scripts/policy/frameworks/management/agent/main.zeek b/scripts/policy/frameworks/management/agent/main.zeek index f3a8bd2809..45993b29c4 100644 --- a/scripts/policy/frameworks/management/agent/main.zeek +++ b/scripts/policy/frameworks/management/agent/main.zeek @@ -57,6 +57,12 @@ global g_nodes: table[string] of Management::Node; global g_cluster: table[string] of Supervisor::ClusterEndpoint; +function agent_topic(): string + { + local epi = Management::Agent::endpoint_info(); + return Management::Agent::topic_prefix + "/" + epi$id; + } + event SupervisorControl::create_response(reqid: string, result: string) { local req = Management::Request::lookup(reqid); @@ -69,7 +75,9 @@ event SupervisorControl::create_response(reqid: string, result: string) { local msg = fmt("failed to create node %s: %s", name, result); Management::Log::error(msg); - event Management::Agent::API::notify_error(Management::Agent::name, msg, name); + Broker::publish(agent_topic(), + Management::Agent::API::notify_error, + Management::Agent::name, msg, name); } Management::Request::finish(reqid); @@ -87,7 +95,9 @@ event SupervisorControl::destroy_response(reqid: string, result: bool) { local msg = fmt("failed to destroy node %s, %s", name, reqid); Management::Log::error(msg); - event Management::Agent::API::notify_error(Management::Agent::name, msg, name); + Broker::publish(agent_topic(), + Management::Agent::API::notify_error, + Management::Agent::name, msg, name); } Management::Request::finish(reqid); @@ -97,7 +107,8 @@ function supervisor_create(nc: Supervisor::NodeConfig) { local req = Management::Request::create(); req$supervisor_state = SupervisorState($node = nc$name); - event SupervisorControl::create_request(req$id, nc); + Broker::publish(SupervisorControl::topic_prefix, + SupervisorControl::create_request, req$id, nc); Management::Log::info(fmt("issued supervisor create for %s, %s", nc$name, req$id)); } @@ -105,7 +116,8 @@ function supervisor_destroy(node: string) { local req = Management::Request::create(); req$supervisor_state = SupervisorState($node = node); - event SupervisorControl::destroy_request(req$id, node); + Broker::publish(SupervisorControl::topic_prefix, + SupervisorControl::destroy_request, req$id, node); Management::Log::info(fmt("issued supervisor destroy for %s, %s", node, req$id)); } @@ -205,8 +217,9 @@ event Management::Agent::API::set_configuration_request(reqid: string, config: M $instance = Management::Agent::name); Management::Log::info(fmt("tx Management::Agent::API::set_configuration_response %s", - Management::result_to_string(res))); - event Management::Agent::API::set_configuration_response(reqid, res); + Management::result_to_string(res))); + Broker::publish(agent_topic(), + Management::Agent::API::set_configuration_response, reqid, res); } } @@ -294,8 +307,9 @@ event SupervisorControl::status_response(reqid: string, result: Supervisor::Stat res$data = node_statuses; Management::Log::info(fmt("tx Management::Agent::API::get_nodes_response %s", - Management::result_to_string(res))); - event Management::Agent::API::get_nodes_response(req$parent_id, res); + Management::result_to_string(res))); + Broker::publish(agent_topic(), + Management::Agent::API::get_nodes_response, req$parent_id, res); } event Management::Agent::API::get_nodes_request(reqid: string) @@ -305,7 +319,8 @@ event Management::Agent::API::get_nodes_request(reqid: string) local req = Management::Request::create(); req$parent_id = reqid; - event SupervisorControl::status_request(req$id, ""); + Broker::publish(SupervisorControl::topic_prefix, + SupervisorControl::status_request, req$id, ""); Management::Log::info(fmt("issued supervisor status, %s", req$id)); } @@ -375,7 +390,8 @@ event Management::Node::API::node_dispatch_response(reqid: string, result: Manag # Send response event back to controller and clean up main request state. Management::Log::info(fmt("tx Management::Agent::API::node_dispatch_response %s", Management::Request::to_string(req))); - event Management::Agent::API::node_dispatch_response(req$id, req$results); + Broker::publish(agent_topic(), + Management::Agent::API::node_dispatch_response, req$id, req$results); Management::Request::finish(req$id); } @@ -405,7 +421,8 @@ event Management::Agent::API::node_dispatch_request(reqid: string, action: vecto Management::Log::info(fmt( "tx Management::Agent::API::node_dispatch_response %s, no node overlap", reqid)); - event Management::Agent::API::node_dispatch_response(reqid, vector()); + Broker::publish(agent_topic(), + Management::Agent::API::node_dispatch_response, reqid, vector()); return; } } @@ -417,7 +434,8 @@ event Management::Agent::API::node_dispatch_request(reqid: string, action: vecto Management::Log::info(fmt( "tx Management::Agent::API::node_dispatch_response %s, no nodes registered", reqid)); - event Management::Agent::API::node_dispatch_response(reqid, vector()); + Broker::publish(agent_topic(), + Management::Agent::API::node_dispatch_response, reqid, vector()); return; } else @@ -453,7 +471,8 @@ event Management::Agent::API::node_dispatch_request(reqid: string, action: vecto Management::Log::info(fmt( "tx Management::Agent::API::node_dispatch_response %s, no nodes running", reqid)); - event Management::Agent::API::node_dispatch_response(reqid, req$results); + Broker::publish(agent_topic(), + Management::Agent::API::node_dispatch_response, reqid, req$results); Management::Request::finish(req$id); return; } @@ -478,8 +497,9 @@ event Management::Agent::API::agent_welcome_request(reqid: string) $instance = Management::Agent::name); Management::Log::info(fmt("tx Management::Agent::API::agent_welcome_response %s", - Management::result_to_string(res))); - event Management::Agent::API::agent_welcome_response(reqid, res); + Management::result_to_string(res))); + Broker::publish(agent_topic(), + Management::Agent::API::agent_welcome_response, reqid, res); } event Management::Agent::API::agent_standby_request(reqid: string) @@ -498,8 +518,9 @@ event Management::Agent::API::agent_standby_request(reqid: string) $instance = Management::Agent::name); Management::Log::info(fmt("tx Management::Agent::API::agent_standby_response %s", - Management::result_to_string(res))); - event Management::Agent::API::agent_standby_response(reqid, res); + Management::result_to_string(res))); + Broker::publish(agent_topic(), + Management::Agent::API::agent_standby_response, reqid, res); } event Management::Node::API::notify_node_hello(node: string) @@ -518,8 +539,10 @@ event Broker::peer_added(peer: Broker::EndpointInfo, msg: string) local epi = Management::Agent::endpoint_info(); - event Management::Agent::API::notify_agent_hello(epi$id, - to_addr(epi$network$address), Management::Agent::API::version); + Broker::publish(agent_topic(), + Management::Agent::API::notify_agent_hello, + epi$id, to_addr(epi$network$address), + Management::Agent::API::version); } # XXX We may want a request timeout event handler here. It's arguably cleaner to @@ -529,7 +552,6 @@ event Broker::peer_added(peer: Broker::EndpointInfo, msg: string) event zeek_init() { local epi = Management::Agent::endpoint_info(); - local agent_topic = Management::Agent::topic_prefix + "/" + epi$id; # The agent needs to peer with the supervisor -- this doesn't currently # happen automatically. The address defaults to Broker's default, which @@ -543,45 +565,17 @@ event zeek_init() # Agents need receive communication targeted at it, any responses # from the supervisor, and any responses from cluster nodes. - Broker::subscribe(agent_topic); + Broker::subscribe(agent_topic()); Broker::subscribe(SupervisorControl::topic_prefix); Broker::subscribe(Management::Node::node_topic); - # Auto-publish a bunch of events. Glob patterns or module-level - # auto-publish would be helpful here. - local agent_to_controller_events: vector of any = [ - Management::Agent::API::get_nodes_response, - Management::Agent::API::set_configuration_response, - Management::Agent::API::agent_welcome_response, - Management::Agent::API::agent_standby_response, - Management::Agent::API::node_dispatch_response, - Management::Agent::API::notify_agent_hello, - Management::Agent::API::notify_change, - Management::Agent::API::notify_error, - Management::Agent::API::notify_log - ]; - - for ( i in agent_to_controller_events ) - Broker::auto_publish(agent_topic, agent_to_controller_events[i]); - - local agent_to_sup_events: vector of any = [ - SupervisorControl::create_request, - SupervisorControl::status_request, - SupervisorControl::destroy_request, - SupervisorControl::restart_request, - SupervisorControl::stop_request - ]; - - for ( i in agent_to_sup_events ) - Broker::auto_publish(SupervisorControl::topic_prefix, agent_to_sup_events[i]); - # Establish connectivity with the controller. if ( Management::Agent::controller$address != "0.0.0.0" ) { # We connect to the controller. Broker::peer(Management::Agent::controller$address, - Management::Agent::controller$bound_port, - Management::connect_retry); + Management::Agent::controller$bound_port, + Management::connect_retry); } # The agent always listens, to allow cluster nodes to peer with it. diff --git a/scripts/policy/frameworks/management/controller/api.zeek b/scripts/policy/frameworks/management/controller/api.zeek index 0ce3259a4c..ad9b22cc26 100644 --- a/scripts/policy/frameworks/management/controller/api.zeek +++ b/scripts/policy/frameworks/management/controller/api.zeek @@ -135,7 +135,7 @@ export { result: Management::Result); - # Notification events, agent -> controller + # Notification events ## The controller triggers this event when the operational cluster ## instances align with the ones desired by the cluster diff --git a/scripts/policy/frameworks/management/controller/main.zeek b/scripts/policy/frameworks/management/controller/main.zeek index 441e03517c..72ecf2be30 100644 --- a/scripts/policy/frameworks/management/controller/main.zeek +++ b/scripts/policy/frameworks/management/controller/main.zeek @@ -113,8 +113,7 @@ global g_config_reqid_pending: string = ""; # the one we send whenever the client requests it. global g_config_current: Management::Configuration; -function send_config_to_agents(req: Management::Request::Request, - config: Management::Configuration) +function send_config_to_agents(req: Management::Request::Request, config: Management::Configuration) { for ( name in g_instances ) { @@ -156,7 +155,7 @@ function add_instance(inst: Management::Instance) if ( inst?$listen_port ) Broker::peer(cat(inst$host), inst$listen_port, - Management::connect_retry); + Management::connect_retry); if ( inst$name in g_instances_known ) { @@ -169,7 +168,7 @@ function add_instance(inst: Management::Instance) Management::Log::info(fmt("tx Management::Agent::API::agent_welcome_request to %s", inst$name)); Broker::publish(Management::Agent::topic_prefix + "/" + inst$name, - Management::Agent::API::agent_welcome_request, req$id); + Management::Agent::API::agent_welcome_request, req$id); } } @@ -181,7 +180,7 @@ function drop_instance(inst: Management::Instance) # Send the agent a standby so it shuts down its cluster nodes & state Management::Log::info(fmt("tx Management::Agent::API::agent_standby_request to %s", inst$name)); Broker::publish(Management::Agent::topic_prefix + "/" + inst$name, - Management::Agent::API::agent_standby_request, ""); + Management::Agent::API::agent_standby_request, ""); delete g_instances[inst$name]; @@ -292,7 +291,7 @@ event Management::Agent::API::notify_agent_hello(instance: string, host: addr, a Management::Log::info(fmt("tx Management::Agent::API::agent_welcome_request to %s", instance)); Broker::publish(Management::Agent::topic_prefix + "/" + instance, - Management::Agent::API::agent_welcome_request, req$id); + Management::Agent::API::agent_welcome_request, req$id); } } @@ -315,7 +314,7 @@ event Management::Agent::API::agent_welcome_response(reqid: string, result: Mana Management::Log::info(fmt( "tx Management::Agent::API::agent_standby_request to %s", result$instance)); Broker::publish(Management::Agent::topic_prefix + "/" + result$instance, - Management::Agent::API::agent_standby_request, ""); + Management::Agent::API::agent_standby_request, ""); return; } @@ -326,8 +325,7 @@ event Management::Agent::API::agent_welcome_response(reqid: string, result: Mana } event Management::Agent::API::notify_change(instance: string, n: Management::Node, - old: Management::State, - new: Management::State) + old: Management::State, new: Management::State) { # XXX TODO } @@ -379,8 +377,9 @@ event Management::Agent::API::set_configuration_response(reqid: string, result: g_config_reqid_pending = ""; Management::Log::info(fmt("tx Management::Controller::API::set_configuration_response %s", - Management::Request::to_string(req))); - event Management::Controller::API::set_configuration_response(req$id, req$results); + Management::Request::to_string(req))); + Broker::publish(Management::Controller::topic, + Management::Controller::API::set_configuration_response, req$id, req$results); Management::Request::finish(req$id); } @@ -402,8 +401,9 @@ event Management::Controller::API::set_configuration_request(reqid: string, conf req$results += res; Management::Log::info(fmt("tx Management::Controller::API::set_configuration_response %s", - Management::Request::to_string(req))); - event Management::Controller::API::set_configuration_response(req$id, req$results); + Management::Request::to_string(req))); + Broker::publish(Management::Controller::topic, + Management::Controller::API::set_configuration_response, req$id, req$results); Management::Request::finish(req$id); return; } @@ -508,7 +508,8 @@ event Management::Controller::API::get_instances_request(reqid: string) res$data = insts; Management::Log::info(fmt("tx Management::Controller::API::get_instances_response %s", reqid)); - event Management::Controller::API::get_instances_response(reqid, res); + Broker::publish(Management::Controller::topic, + Management::Controller::API::get_instances_response, reqid, res); } event Management::Agent::API::get_nodes_response(reqid: string, result: Management::Result) @@ -551,8 +552,9 @@ event Management::Agent::API::get_nodes_response(reqid: string, result: Manageme return; Management::Log::info(fmt("tx Management::Controller::API::get_nodes_response %s", - Management::Request::to_string(req))); - event Management::Controller::API::get_nodes_response(req$id, req$results); + Management::Request::to_string(req))); + Broker::publish(Management::Controller::topic, + Management::Controller::API::get_nodes_response, req$id, req$results); Management::Request::finish(req$id); } @@ -564,9 +566,10 @@ event Management::Controller::API::get_nodes_request(reqid: string) if ( |g_instances| == 0 ) { Management::Log::info(fmt("tx Management::Controller::API::get_nodes_response %s", reqid)); - event Management::Controller::API::get_nodes_response(reqid, vector( - Management::Result($reqid=reqid, $success=F, - $error="no instances connected"))); + local res = Management::Result($reqid=reqid, $success=F, + $error="no instances connected"); + Broker::publish(Management::Controller::topic, + Management::Controller::API::get_nodes_response, reqid, vector(res)); return; } @@ -643,7 +646,9 @@ event Management::Agent::API::node_dispatch_response(reqid: string, results: Man Management::Log::info(fmt( "tx Management::Controller::API::get_id_value_response %s", Management::Request::to_string(req))); - event Management::Controller::API::get_id_value_response(req$id, req$results); + Broker::publish(Management::Controller::topic, + Management::Controller::API::get_id_value_response, + req$id, req$results); break; default: Management::Log::error(fmt("unexpected dispatch command %s", @@ -658,12 +663,16 @@ event Management::Controller::API::get_id_value_request(reqid: string, id: strin { Management::Log::info(fmt("rx Management::Controller::API::get_id_value_request %s %s", reqid, id)); + local res: Management::Result; + # Special case: if we have no instances, respond right away. if ( |g_instances| == 0 ) { Management::Log::info(fmt("tx Management::Controller::API::get_id_value_response %s", reqid)); - event Management::Controller::API::get_id_value_response(reqid, vector( - Management::Result($reqid=reqid, $success=F, $error="no instances connected"))); + res = Management::Result($reqid=reqid, $success=F, $error="no instances connected"); + Broker::publish(Management::Controller::topic, + Management::Controller::API::get_id_value_response, + reqid, vector(res)); return; } @@ -673,7 +682,6 @@ event Management::Controller::API::get_id_value_request(reqid: string, id: strin local nodes_final: set[string]; local node: string; - local res: Management::Result; # Input sanitization: check for any requested nodes that aren't part of # the current configuration. We send back error results for those and @@ -700,7 +708,9 @@ event Management::Controller::API::get_id_value_request(reqid: string, id: strin Management::Log::info(fmt( "tx Management::Controller::API::get_id_value_response %s", Management::Request::to_string(req))); - event Management::Controller::API::get_id_value_response(req$id, req$results); + Broker::publish(Management::Controller::topic, + Management::Controller::API::get_id_value_response, + req$id, req$results); Management::Request::finish(req$id); return; } @@ -745,8 +755,9 @@ event Management::Request::request_expired(req: Management::Request::Request) req$results += res; Management::Log::info(fmt("tx Management::Controller::API::set_configuration_response %s", - Management::Request::to_string(req))); - event Management::Controller::API::set_configuration_response(req$id, req$results); + Management::Request::to_string(req))); + Broker::publish(Management::Controller::topic, + Management::Controller::API::set_configuration_response, req$id, req$results); } if ( req?$get_nodes_state ) @@ -754,8 +765,9 @@ event Management::Request::request_expired(req: Management::Request::Request) req$results += res; Management::Log::info(fmt("tx Management::Controller::API::get_nodes_response %s", - Management::Request::to_string(req))); - event Management::Controller::API::get_nodes_response(req$id, req$results); + Management::Request::to_string(req))); + Broker::publish(Management::Controller::topic, + Management::Controller::API::get_nodes_response, req$id, req$results); } if ( req?$node_dispatch_state ) @@ -768,7 +780,9 @@ event Management::Request::request_expired(req: Management::Request::Request) Management::Log::info(fmt( "tx Management::Controller::API::get_id_value_response %s", Management::Request::to_string(req))); - event Management::Controller::API::get_id_value_response(req$id, req$results); + Broker::publish(Management::Controller::topic, + Management::Controller::API::get_id_value_response, + req$id, req$results); break; default: Management::Log::error(fmt("unexpected dispatch command %s", @@ -780,7 +794,8 @@ event Management::Request::request_expired(req: Management::Request::Request) if ( req?$test_state ) { Management::Log::info(fmt("tx Management::Controller::API::test_timeout_response %s", req$id)); - event Management::Controller::API::test_timeout_response(req$id, res); + Broker::publish(Management::Controller::topic, + Management::Controller::API::test_timeout_response, req$id, res); } } @@ -817,17 +832,5 @@ event zeek_init() Broker::subscribe(Management::Agent::topic_prefix); Broker::subscribe(Management::Controller::topic); - # Events sent to the client: - local events: vector of any = [ - Management::Controller::API::get_instances_response, - Management::Controller::API::set_configuration_response, - Management::Controller::API::get_nodes_response, - Management::Controller::API::get_id_value_response, - Management::Controller::API::test_timeout_response - ]; - - for ( i in events ) - Broker::auto_publish(Management::Controller::topic, events[i]); - Management::Log::info("controller is live"); } diff --git a/scripts/policy/frameworks/management/node/main.zeek b/scripts/policy/frameworks/management/node/main.zeek index cd9819ff3d..52aa5f1e96 100644 --- a/scripts/policy/frameworks/management/node/main.zeek +++ b/scripts/policy/frameworks/management/node/main.zeek @@ -51,8 +51,7 @@ global g_dispatch_table: table[string] of DispatchCallback = { ["get_id_value"] = dispatch_get_id_value, }; -event Management::Node::API::node_dispatch_request(reqid: string, action: vector of string, - nodes: set[string]) +event Management::Node::API::node_dispatch_request(reqid: string, action: vector of string, nodes: set[string]) { Management::Log::info(fmt("rx Management::Node::API::node_dispatch_request %s %s %s", reqid, action, nodes)); @@ -64,8 +63,7 @@ event Management::Node::API::node_dispatch_request(reqid: string, action: vector return; } - local res = Management::Result( - $reqid = reqid, $node = Cluster::node); + local res = Management::Result($reqid = reqid, $node = Cluster::node); if ( |action| == 0 ) { @@ -82,15 +80,15 @@ event Management::Node::API::node_dispatch_request(reqid: string, action: vector { Management::Log::info(fmt("tx Management::Node::API::node_dispatch_response %s", Management::result_to_string(res))); - event Management::Node::API::node_dispatch_response(reqid, res); + Broker::publish(node_topic, Management::Node::API::node_dispatch_response, reqid, res); return; } g_dispatch_table[action[0]](action[1:], res); Management::Log::info(fmt("tx Management::Node::API::node_dispatch_response %s", - Management::result_to_string(res))); - event Management::Node::API::node_dispatch_response(reqid, res); + Management::result_to_string(res))); + Broker::publish(node_topic, Management::Node::API::node_dispatch_response, reqid, res); } event Broker::peer_added(peer: Broker::EndpointInfo, msg: string) @@ -100,7 +98,7 @@ event Broker::peer_added(peer: Broker::EndpointInfo, msg: string) # If this is the agent peering, notify it that we're ready if ( peer$network$address == epi$network$address && peer$network$bound_port == epi$network$bound_port ) - event Management::Node::API::notify_node_hello(Cluster::node); + Broker::publish(node_topic, Management::Node::API::notify_node_hello, Cluster::node); } event zeek_init() @@ -109,13 +107,4 @@ event zeek_init() Broker::peer(epi$network$address, epi$network$bound_port, Management::connect_retry); Broker::subscribe(node_topic); - - # Events automatically sent to the Management agent. - local events: vector of any = [ - Management::Node::API::node_dispatch_response, - Management::Node::API::notify_node_hello - ]; - - for ( i in events ) - Broker::auto_publish(node_topic, events[i]); }