From 788348f9d68b43c8d115f129395953156e57b21e Mon Sep 17 00:00:00 2001 From: Christian Kreibich Date: Tue, 12 Apr 2022 21:56:55 -0700 Subject: [PATCH] Management framework: allow dispatching "actions" on cluster nodes. This adds request/response event pairs to enable the controller to dispatch "actions" (pre-implemented Zeek script actions) on subsets of Zeek cluster nodes and collect the results. Using generic events to carry multiple such "run X on the nodes" scenarios simplifies adding these in the future. --- .../frameworks/management/agent/api.zeek | 28 +++- .../frameworks/management/agent/main.zeek | 123 ++++++++++++++++-- .../frameworks/management/controller/api.zeek | 2 +- .../management/controller/main.zeek | 105 +++++++++++++-- .../frameworks/management/node/api.zeek | 21 +++ .../frameworks/management/node/main.zeek | 50 ++++++- 6 files changed, 297 insertions(+), 32 deletions(-) diff --git a/scripts/policy/frameworks/management/agent/api.zeek b/scripts/policy/frameworks/management/agent/api.zeek index 876f121500..6607c27144 100644 --- a/scripts/policy/frameworks/management/agent/api.zeek +++ b/scripts/policy/frameworks/management/agent/api.zeek @@ -60,8 +60,32 @@ export { ## records, covering the nodes at this instance. The result may also ## indicate failure, with error messages indicating what went wrong. ## - global get_nodes_response: event(reqid: string, - result: Management::Result); + global get_nodes_response: event(reqid: string, result: Management::Result); + + + ## The controller sends this to every agent to request a dispatch (the + ## execution of a pre-implemented activity) to all cluster nodes. This + ## is the generic controller-agent "back-end" implementation of explicit + ## client-controller "front-end" interactions. + ## + ## reqid: a request identifier string, echoed in the response event. + ## + ## action: the requested dispatch command, with any arguments. 
+ global node_dispatch_request: event(reqid: string, action: vector of string); + + ## Response to a node_dispatch_request event. Each agent sends this back + ## to the controller to report the dispatch outcomes on all nodes managed + ## by that agent. + ## + ## reqid: the request identifier used in the request event. + ## + ## result: a :zeek:type:`vector` of :zeek:see:`Management::Result` + ## records. Each record covers one Zeek cluster node managed by this + ## agent. Upon success, each :zeek:see:`Management::Result` record's + ## data member contains the dispatches' response in a data type + ## appropriate for the respective dispatch. + global node_dispatch_response: event(reqid: string, result: Management::ResultVec); + ## The controller sends this event to confirm to the agent that it is ## part of the current cluster topology. The agent acknowledges with the diff --git a/scripts/policy/frameworks/management/agent/main.zeek b/scripts/policy/frameworks/management/agent/main.zeek index d57fd7b166..2dafd0414d 100644 --- a/scripts/policy/frameworks/management/agent/main.zeek +++ b/scripts/policy/frameworks/management/agent/main.zeek @@ -21,10 +21,22 @@ export { type SupervisorState: record { node: string; ##< Name of the node the Supervisor is acting on. }; + + ## Request state for node dispatches, tracking the requested action + ## as well as received responses. + type NodeDispatchState: record { + ## The dispatched action. The first string is a command, + ## any remaining strings its arguments. + action: vector of string; + + ## Request state for every node managed by this agent. 
+ requests: set[string] &default=set(); + }; } redef record Management::Request::Request += { supervisor_state: SupervisorState &optional; + node_dispatch_state: NodeDispatchState &optional; }; # Tag our logs correctly @@ -285,6 +297,81 @@ event Management::Agent::API::get_nodes_request(reqid: string) Management::Log::info(fmt("issued supervisor status, %s", req$id)); } +event Management::Node::API::node_dispatch_response(reqid: string, result: Management::Result) + { + Management::Log::info(fmt("rx Management::Node::API::node_dispatch_response %s", reqid)); + + # Retrieve state for the request we just got a response to + local nreq = Management::Request::lookup(reqid); + if ( Management::Request::is_null(nreq) ) + return; + + # Find the original request from the controller + local req = Management::Request::lookup(nreq$parent_id); + if ( Management::Request::is_null(req) ) + return; + + # Mark the responding node as done. Nodes normally fill their own name + # into the result; we only double-check for resilience. Nodes failing to + # report themselves would eventually lead to request timeout. + if ( result?$node && result$node in req$node_dispatch_state$requests ) + delete req$node_dispatch_state$requests[result$node]; + + # The usual special treatment for Broker values that are of type "any": + # confirm their type here based on the requested dispatch command. + switch req$node_dispatch_state$action[0] + { + default: + Management::Log::error(fmt("unexpected dispatch command %s", + req$node_dispatch_state$action[0])); + break; + } + + # The result has the reporting node filled in but not the agent/instance + # (which the node doesn't know about), so add it now. + result$instance = Management::Agent::instance()$name; + + # Add this result to the overall response + req$results[|req$results|] = result; + + # If we still have pending queries out to the nodes, do nothing: we'll + # handle this soon, or our request will time out and we respond with + # error.
+ if ( |req$node_dispatch_state$requests| > 0 ) + return; + + # Release the agent-nodes request state, since we now have all responses. + Management::Request::finish(nreq$id); + + # Send response event back to controller and clean up main request state. + Management::Log::info(fmt("tx Management::Agent::API::node_dispatch_response %s", + Management::Request::to_string(req))); + event Management::Agent::API::node_dispatch_response(req$id, req$results); + Management::Request::finish(req$id); + } + +event Management::Agent::API::node_dispatch_request(reqid: string, action: vector of string) + { + Management::Log::info(fmt("rx Management::Agent::API::node_dispatch_request %s %s", reqid, action)); + + local res: Management::Result; + local req = Management::Request::create(reqid); + + req$node_dispatch_state = NodeDispatchState($action=action); + + for ( node in g_nodes ) + add req$node_dispatch_state$requests[node]; + + # We use a single request record to track all node responses. We know + # when all nodes have responded via the requests set we built up above. + local nreq = Management::Request::create(); + nreq$parent_id = reqid; + + Management::Log::info(fmt("tx Management::Node::API::node_dispatch_request %s %s", nreq$id, action)); + Broker::publish(Management::Node::node_topic, + Management::Node::API::node_dispatch_request, nreq$id, action); + } + event Management::Agent::API::agent_welcome_request(reqid: string) { Management::Log::info(fmt("rx Management::Agent::API::agent_welcome_request %s", reqid)); @@ -357,21 +444,31 @@ event zeek_init() # Auto-publish a bunch of events. Glob patterns or module-level # auto-publish would be helpful here. 
- Broker::auto_publish(agent_topic, Management::Agent::API::get_nodes_response); - Broker::auto_publish(agent_topic, Management::Agent::API::set_configuration_response); - Broker::auto_publish(agent_topic, Management::Agent::API::agent_welcome_response); - Broker::auto_publish(agent_topic, Management::Agent::API::agent_standby_response); + local agent_to_controller_events: vector of any = [ + Management::Agent::API::get_nodes_response, + Management::Agent::API::set_configuration_response, + Management::Agent::API::agent_welcome_response, + Management::Agent::API::agent_standby_response, + Management::Agent::API::node_dispatch_response, + Management::Agent::API::notify_agent_hello, + Management::Agent::API::notify_change, + Management::Agent::API::notify_error, + Management::Agent::API::notify_log + ]; - Broker::auto_publish(agent_topic, Management::Agent::API::notify_agent_hello); - Broker::auto_publish(agent_topic, Management::Agent::API::notify_change); - Broker::auto_publish(agent_topic, Management::Agent::API::notify_error); - Broker::auto_publish(agent_topic, Management::Agent::API::notify_log); + for ( i in agent_to_controller_events ) + Broker::auto_publish(agent_topic, agent_to_controller_events[i]); - Broker::auto_publish(SupervisorControl::topic_prefix, SupervisorControl::create_request); - Broker::auto_publish(SupervisorControl::topic_prefix, SupervisorControl::status_request); - Broker::auto_publish(SupervisorControl::topic_prefix, SupervisorControl::destroy_request); - Broker::auto_publish(SupervisorControl::topic_prefix, SupervisorControl::restart_request); - Broker::auto_publish(SupervisorControl::topic_prefix, SupervisorControl::stop_request); + local agent_to_sup_events: vector of any = [ + SupervisorControl::create_request, + SupervisorControl::status_request, + SupervisorControl::destroy_request, + SupervisorControl::restart_request, + SupervisorControl::stop_request + ]; + + for ( i in agent_to_sup_events ) + 
Broker::auto_publish(SupervisorControl::topic_prefix, agent_to_sup_events[i]); # Establish connectivity with the controller. if ( Management::Agent::controller$address != "0.0.0.0" ) diff --git a/scripts/policy/frameworks/management/controller/api.zeek b/scripts/policy/frameworks/management/controller/api.zeek index 575f90009a..9b374d54f4 100644 --- a/scripts/policy/frameworks/management/controller/api.zeek +++ b/scripts/policy/frameworks/management/controller/api.zeek @@ -71,7 +71,7 @@ export { ## ## reqid: the request identifier used in the request event. ## - ## result: a :zeek:type`vector` of :zeek:see:`Management::Result` + ## result: a :zeek:type:`vector` of :zeek:see:`Management::Result` ## records. Each record covers one cluster instance. Each record's data ## member is a vector of :zeek:see:`Management::NodeStatus` ## records, covering the nodes at that instance. Results may also indicate diff --git a/scripts/policy/frameworks/management/controller/main.zeek b/scripts/policy/frameworks/management/controller/main.zeek index cac27ac814..d7a292d7f4 100644 --- a/scripts/policy/frameworks/management/controller/main.zeek +++ b/scripts/policy/frameworks/management/controller/main.zeek @@ -36,6 +36,25 @@ export { requests: set[string] &default=set(); }; + ## Request state for node dispatch requests, to track the requested + ## action and received responses. Node dispatches are requests to + ## execute pre-implemented actions on every node in the cluster, + ## and report their outcomes. See + ## :zeek:see:`Management::Agent::API::node_dispatch_request` and + ## :zeek:see:`Management::Agent::API::node_dispatch_response` for the + ## agent/controller interaction. + type NodeDispatchState: record { + ## The dispatched action. The first string is a command, + ## any remaining strings its arguments. + action: vector of string; + + ## Request state for every controller/agent transaction. 
+ ## The set of strings tracks the IDs of the agent requests for which + ## we still expect responses, before we can respond back + ## to the client. + requests: set[string] &default=set(); + }; + ## Dummy state for internal state-keeping test cases. type TestState: record { }; } @@ -43,6 +62,7 @@ export { redef record Management::Request::Request += { set_configuration_state: SetConfigurationState &optional; get_nodes_state: GetNodesState &optional; + node_dispatch_state: NodeDispatchState &optional; test_state: TestState &optional; }; @@ -485,7 +505,7 @@ event Management::Agent::API::get_nodes_response(reqid: string, result: Manageme if ( Management::Request::is_null(areq) ) return; - # Release the request, which is now done. + # Release the request, since this agent is now done. Management::Request::finish(areq$id); # Find the original request from the client @@ -554,22 +574,75 @@ event Management::Controller::API::get_nodes_request(reqid: string) } } +event Management::Agent::API::node_dispatch_response(reqid: string, results: Management::ResultVec) + { + Management::Log::info(fmt("rx Management::Agent::API::node_dispatch_response %s", reqid)); + + # Retrieve state for the request we just got a response to + local areq = Management::Request::lookup(reqid); + if ( Management::Request::is_null(areq) ) + return; + + # Release the request, since this agent is now done. + Management::Request::finish(areq$id); + + # Find the original request from the client + local req = Management::Request::lookup(areq$parent_id); + if ( Management::Request::is_null(req) ) + return; + + # Add this agent's results to the overall response + for ( i in results ) + { + # Same special treatment for Broker values that are of + # type "any": confirm their (known) type here.
+ switch req$node_dispatch_state$action[0] + { + default: + Management::Log::error(fmt("unexpected dispatch command %s", + req$node_dispatch_state$action[0])); + break; + } + + req$results[|req$results|] = results[i]; + } + + # Mark this request as done + if ( areq$id in req$node_dispatch_state$requests ) + delete req$node_dispatch_state$requests[areq$id]; + + # If we still have pending queries out to the agents, do nothing: we'll + # handle this soon, or our request will time out and we respond with + # error. + if ( |req$node_dispatch_state$requests| > 0 ) + return; + + # Send response event to the client based upon the dispatch type. + switch req$node_dispatch_state$action[0] + { + default: + Management::Log::error(fmt("unexpected dispatch command %s", + req$node_dispatch_state$action[0])); + break; + } + + Management::Request::finish(req$id); + } + event Management::Request::request_expired(req: Management::Request::Request) { # Various handlers for timed-out request state. We use the state members # to identify how to respond. No need to clean up the request itself, # since we're getting here via the request module's expiration # mechanism that handles the cleanup. - local res: Management::Result; + local res = Management::Result($reqid=req$id, + $success = F, + $error = "request timed out"); if ( req?$set_configuration_state ) { # This timeout means we no longer have a pending request. 
g_config_reqid_pending = ""; - - res = Management::Result($reqid=req$id); - res$success = F; - res$error = "request timed out"; req$results += res; Management::Log::info(fmt("tx Management::Controller::API::set_configuration_response %s", @@ -579,9 +652,6 @@ event Management::Request::request_expired(req: Management::Request::Request) if ( req?$get_nodes_state ) { - res = Management::Result($reqid=req$id); - res$success = F; - res$error = "request timed out"; req$results += res; Management::Log::info(fmt("tx Management::Controller::API::get_nodes_response %s", @@ -589,12 +659,21 @@ event Management::Request::request_expired(req: Management::Request::Request) event Management::Controller::API::get_nodes_response(req$id, req$results); } + if ( req?$node_dispatch_state ) + { + req$results += res; + + switch req$node_dispatch_state$action[0] + { + default: + Management::Log::error(fmt("unexpected dispatch command %s", + req$node_dispatch_state$action[0])); + break; + } + } + if ( req?$test_state ) { - res = Management::Result($reqid=req$id); - res$success = F; - res$error = "request timed out"; - Management::Log::info(fmt("tx Management::Controller::API::test_timeout_response %s", req$id)); event Management::Controller::API::test_timeout_response(req$id, res); } diff --git a/scripts/policy/frameworks/management/node/api.zeek b/scripts/policy/frameworks/management/node/api.zeek index 4eac9d0771..85cb010463 100644 --- a/scripts/policy/frameworks/management/node/api.zeek +++ b/scripts/policy/frameworks/management/node/api.zeek @@ -7,6 +7,27 @@ module Management::Node::API; export { + ## Management agents send this event to every Zeek cluster node to run a + ## "dispatch" -- a particular, pre-implemented action. This is the agent-node + ## complement to :zeek:see:`Management::Agent::API::node_dispatch_request`. + ## + ## reqid: a request identifier string, echoed in the response event. + ## + ## action: the requested dispatch command, with any arguments. 
+ global node_dispatch_request: event(reqid: string, action: vector of string); + + ## Response to a node_dispatch_request event. The nodes send this back + ## to the agent. This is the agent-node equivalent of + ## :zeek:see:`Management::Agent::API::node_dispatch_response`. + ## + ## reqid: the request identifier used in the request event. + ## + ## result: a :zeek:see:`Management::Result` record covering one Zeek + ## cluster node managed by the agent. Upon success, the data field + ## contains a value appropriate for the requested dispatch. + global node_dispatch_response: event(reqid: string, result: Management::Result); + + # Notification events, node -> agent ## The cluster nodes send this event upon peering as a "check-in" to diff --git a/scripts/policy/frameworks/management/node/main.zeek b/scripts/policy/frameworks/management/node/main.zeek index 92d41acfbe..68749c11b5 100644 --- a/scripts/policy/frameworks/management/node/main.zeek +++ b/scripts/policy/frameworks/management/node/main.zeek @@ -1,10 +1,12 @@ -##! This module provides Management framework functionality that needs to be -##! present in every cluster node to allow Management agents to interact with -##! the cluster nodes they manage. +##! This module provides Management framework functionality present in every +##! cluster node, to allow Management agents to interact with the nodes. + +@load base/frameworks/cluster @load policy/frameworks/management/agent/config @load policy/frameworks/management/log +@load ./api @load ./config module Management::Node; @@ -12,6 +14,47 @@ module Management::Node; # Tag our logs correctly redef Management::Log::role = Management::NODE; +## The type of dispatch callbacks. These implement a particular dispatch action, +## using the provided string vector as arguments, filling results into the +## provided result record.
+type DispatchCallback: function(args: vector of string, res: Management::Result); + +global g_dispatch_table: table[string] of DispatchCallback = { +}; + +event Management::Node::API::node_dispatch_request(reqid: string, action: vector of string) + { + Management::Log::info(fmt("rx Management::Node::API::node_dispatch_request %s %s", reqid, action)); + + local res = Management::Result( + $reqid = reqid, $node = Cluster::node); + + if ( |action| == 0 ) + { + res$success = F; + res$error = "no dispatch arguments provided"; + } + else if ( action[0] !in g_dispatch_table ) + { + res$success = F; + res$error = fmt("dispatch %s unknown", action[0]); + } + + if ( ! res$success ) + { + Management::Log::info(fmt("tx Management::Node::API::node_dispatch_response %s", + Management::result_to_string(res))); + event Management::Node::API::node_dispatch_response(reqid, res); + return; + } + + g_dispatch_table[action[0]](action[1:], res); + + Management::Log::info(fmt("tx Management::Node::API::node_dispatch_response %s", + Management::result_to_string(res))); + event Management::Node::API::node_dispatch_response(reqid, res); + } + event Broker::peer_added(peer: Broker::EndpointInfo, msg: string) { local epi = Management::Agent::endpoint_info(); @@ -31,6 +74,7 @@ event zeek_init() # Events automatically sent to the Management agent. local events: vector of any = [ + Management::Node::API::node_dispatch_response, Management::Node::API::notify_node_hello ];