From b9879a50a07a0544c93846e3f1a4c110ffa15949 Mon Sep 17 00:00:00 2001
From: Christian Kreibich
Date: Wed, 22 Jun 2022 17:11:58 -0700
Subject: [PATCH] Management framework: node restart support

This adds restart request/response event pairs that restart nodes in the
running Zeek cluster. The implementation is very similar to get_id_value,
which also involves distributing a list of nodes to agents and aggregating
the responses.
---
 .../frameworks/management/agent/api.zeek      |  29 ++++
 .../frameworks/management/agent/main.zeek     | 140 ++++++++++++++++
 .../frameworks/management/controller/api.zeek |  30 ++++
 .../management/controller/main.zeek           | 155 ++++++++++++++++++
 4 files changed, 354 insertions(+)
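
As a usage sketch (not part of the diffs below): a minimal standalone
client script that drives the new controller API over Broker. The event
names, types, and Management::Controller::topic come from this patch and
the framework; the controller address, the 2150/tcp port, and the request
ID are assumptions for illustration.

    @load policy/frameworks/management/controller/api

    event Management::Controller::API::restart_response(reqid: string,
        results: Management::ResultVec)
        {
        # Print one line per result the controller aggregated.
        for ( i in results )
            print fmt("%s/%s success=%s", results[i]$instance,
                results[i]$node, results[i]$success);
        }

    event Broker::peer_added(ep: Broker::EndpointInfo, msg: string)
        {
        # Once peered with the controller, request a restart of worker-01.
        Broker::publish(Management::Controller::topic,
            Management::Controller::API::restart_request, "req-001",
            set("worker-01"));
        }

    event zeek_init()
        {
        # The controller publishes its responses to its own topic.
        Broker::subscribe(Management::Controller::topic);
        Broker::peer("127.0.0.1", 2150/tcp);
        }
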
diff --git a/scripts/policy/frameworks/management/agent/api.zeek b/scripts/policy/frameworks/management/agent/api.zeek
index 880a075194..fab289a8e5 100644
--- a/scripts/policy/frameworks/management/agent/api.zeek
+++ b/scripts/policy/frameworks/management/agent/api.zeek
@@ -146,6 +146,35 @@ export {
 	    result: Management::Result);
 
+	## The controller sends this event to ask the agent to restart currently
+	## running Zeek cluster nodes. The agent places these nodes into PENDING
+	## state and sends restart events to the Supervisor, rendering its
+	## responses into a list of :zeek:see:`Management::Result` records
+	## summarizing each node restart. When restarted nodes check in with the
+	## agent, they switch back to RUNNING state. The agent ignores nodes not
+	## currently running.
+	##
+	## reqid: a request identifier string, echoed in the response event.
+	##
+	## nodes: a set of cluster node names (e.g. "worker-01") to restart. An
+	##     empty set, supplied by default, means a restart of all of the
+	##     agent's current cluster nodes.
+	##
+	global restart_request: event(reqid: string, nodes: set[string] &default=set());
+
+	## Response to a :zeek:see:`Management::Agent::API::restart_request`
+	## event. The agent sends this back to the controller when the
+	## Supervisor has restarted all affected nodes, or a timeout occurs.
+	##
+	## reqid: the request identifier used in the request event.
+	##
+	## results: a :zeek:type:`vector` of :zeek:see:`Management::Result`, one
+	##     for each Supervisor transaction. Each such result identifies both
+	##     the instance and node.
+	##
+	global restart_response: event(reqid: string, results: Management::ResultVec);
+
 	# Notification events, agent -> controller
 
 	## The agent sends this event upon peering as a "check-in", informing
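
The agent-level pair above can also be exercised directly, though in
practice only the controller does so. A sketch, assuming an agent named
"instance-1" listening on 2151/tcp (both assumptions); the per-agent topic
construction mirrors the controller code later in this patch:

    @load policy/frameworks/management/agent/api

    event Management::Agent::API::restart_response(reqid: string,
        results: Management::ResultVec)
        {
        print fmt("agent finished restart request %s, %d result(s)",
            reqid, |results|);
        }

    event Broker::peer_added(ep: Broker::EndpointInfo, msg: string)
        {
        # An empty node set asks the agent to restart all of its nodes.
        local nodes: set[string];
        Broker::publish(Management::Agent::topic_prefix + "/" + "instance-1",
            Management::Agent::API::restart_request, "req-002", nodes);
        }

    event zeek_init()
        {
        # The agent publishes its response on its own topic.
        Broker::subscribe(Management::Agent::topic_prefix + "/" + "instance-1");
        Broker::peer("127.0.0.1", 2151/tcp);
        }
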
diff --git a/scripts/policy/frameworks/management/agent/main.zeek b/scripts/policy/frameworks/management/agent/main.zeek
index 63ca551c20..4111828770 100644
--- a/scripts/policy/frameworks/management/agent/main.zeek
+++ b/scripts/policy/frameworks/management/agent/main.zeek
@@ -27,6 +27,8 @@ export {
 		node: string &default="";
 		## The result of a status request.
 		status: Supervisor::Status &optional;
+		## The result of a restart request.
+		restart_result: bool &optional;
 	};
 
 	## Request state for deploy requests.
@@ -47,6 +49,13 @@ export {
 		requests: set[string] &default=set();
 	};
 
+	## Request state for restart requests, tracking received responses.
+	type RestartState: record {
+		## Request state for every node the agent asks the Supervisor
+		## to restart.
+		requests: set[string] &default=set();
+	};
+
 	# When Management::Agent::archive_logs is T (the default) and the
 	# logging configuration doesn't permanently prevent archival
 	# (e.g. because log rotation isn't configured), the agent triggers this
@@ -67,6 +76,7 @@ redef record Management::Request::Request += {
 	supervisor_state_agent: SupervisorState &optional;
 	deploy_state_agent: DeployState &optional;
 	node_dispatch_state_agent: NodeDispatchState &optional;
+	restart_state_agent: RestartState &optional;
 };
 
 # Tag our logs correctly
@@ -110,6 +120,10 @@ global send_deploy_response: function(req: Management::Request::Request);
 # a status response.
 global deploy_request_finish: function(req: Management::Request::Request);
 
+# Callback completing a restart_request after the Supervisor has delivered
+# a restart response.
+global restart_request_finish: function(req: Management::Request::Request);
+
 # Callback completing a get_nodes_request after the Supervisor has delivered
 # a status response.
 global get_nodes_request_finish: function(req: Management::Request::Request);
@@ -858,6 +872,124 @@ event Management::Agent::API::agent_standby_request(reqid: string)
 	    Management::Agent::API::agent_standby_response, reqid, res);
 	}
 
+function restart_request_finish(sreq: Management::Request::Request)
+	{
+	# This is the finish callback we set on requests to the Supervisor to
+	# restart a node. We look up the parent request (the one sent to us by
+	# the controller), mark the node in question as done, and respond to the
+	# controller if we've handled all required nodes.
+
+	local req = Management::Request::lookup(sreq$parent_id);
+	if ( Management::Request::is_null(req) )
+		return;
+
+	local node = sreq$supervisor_state_agent$node;
+
+	local res = Management::Result(
+	    $reqid = req$id,
+	    $instance = Management::Agent::get_name(),
+	    $node = node);
+
+	if ( ! sreq$supervisor_state_agent$restart_result )
+		{
+		res$success = F;
+		res$error = fmt("could not restart node %s", node);
+		}
+
+	req$results += res;
+
+	if ( node in req$restart_state_agent$requests )
+		{
+		delete req$restart_state_agent$requests[node];
+		if ( |req$restart_state_agent$requests| > 0 )
+			return;
+		}
+
+	Management::Log::info(fmt(
+	    "tx Management::Agent::API::restart_response %s",
+	    Management::Request::to_string(req)));
+	Broker::publish(agent_topic(),
+	    Management::Agent::API::restart_response,
+	    req$id, req$results);
+	Management::Request::finish(req$id);
+	}
+
+event Management::Agent::API::restart_request(reqid: string, nodes: set[string])
+	{
+	# This is very similar to node_dispatch_request, because it too works
+	# with a list of nodes to dispatch, in this case to the Supervisor.
+
+	Management::Log::info(fmt("rx Management::Agent::API::restart_request %s %s",
+	    reqid, Management::Util::set_to_vector(nodes)));
+
+	local node: string;
+	local cluster_nodes: set[string];
+	local nodes_final: set[string];
+
+	for ( node in g_nodes )
+		add cluster_nodes[node];
+
+	# If this request names specific cluster nodes to restart, check
+	# whether this agent manages any of them. If it doesn't, respond with
+	# an empty results vector immediately. Note that any globally unknown
+	# nodes that the client might have requested already got filtered by
+	# the controller, so we don't need to worry about them here.
+
+	if ( |nodes| > 0 )
+		{
+		nodes_final = nodes & cluster_nodes;
+
+		if ( |nodes_final| == 0 )
+			{
+			Management::Log::info(fmt(
+			    "tx Management::Agent::API::restart_response %s, no node overlap",
+			    reqid));
+			Broker::publish(agent_topic(),
+			    Management::Agent::API::restart_response, reqid, vector());
+			return;
+			}
+		}
+	else if ( |g_nodes| == 0 )
+		{
+		# Special case: the client did not request specific nodes. If
+		# we aren't running any nodes, respond right away, since there's
+		# nothing to restart.
+		Management::Log::info(fmt(
+		    "tx Management::Agent::API::restart_response %s, no nodes registered",
+		    reqid));
+		Broker::publish(agent_topic(),
+		    Management::Agent::API::restart_response, reqid, vector());
+		return;
+		}
+	else
+		{
+		# We restart all nodes.
+		nodes_final = cluster_nodes;
+		}
+
+	local res: Management::Result;
+	local req = Management::Request::create(reqid);
+
+	req$restart_state_agent = RestartState();
+
+	# Build up state for tracking responses.
+	for ( node in nodes_final )
+		add req$restart_state_agent$requests[node];
+
+	# Ask the Supervisor to restart nodes. We need to enumerate the nodes
+	# because restarting all (via "") would hit the agent (and the
+	# controller, if co-located).
+	for ( node in nodes_final )
+		{
+		local sreq = supervisor_restart(node);
+		sreq$parent_id = reqid;
+		sreq$finish = restart_request_finish;
+
+		if ( node in g_nodes )
+			g_nodes[node]$state = Management::PENDING;
+		}
+	}
+
 event Management::Node::API::notify_node_hello(node: string)
 	{
 	Management::Log::info(fmt("rx Management::Node::API::notify_node_hello %s", node));
@@ -900,6 +1032,14 @@ event Management::Request::request_expired(req: Management::Request::Request)
 		# This timeout means we no longer have a pending request.
 		g_config_reqid_pending = "";
 		}
+
+	if ( req?$restart_state_agent )
+		{
+		Management::Log::info(fmt("tx Management::Agent::API::restart_response %s",
+		    Management::Request::to_string(req)));
+		Broker::publish(agent_topic(),
+		    Management::Agent::API::restart_response, req$id, req$results);
+		}
 	}
 
 event Broker::peer_added(peer: Broker::EndpointInfo, msg: string)
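
The supervisor_restart() helper called above, and the SupervisorControl
response handler that completes each child request, are not part of this
excerpt. A plausible sketch of their shape, modeled on the agent's other
Supervisor round trips and on Zeek's SupervisorControl API; treat the
details as assumptions rather than the patch's actual code:

    function supervisor_restart(node: string): Management::Request::Request
        {
        # Create a child request for this single Supervisor transaction
        # and record the node it concerns, so restart_request_finish()
        # can produce a per-node result.
        local req = Management::Request::create();
        req$supervisor_state_agent = SupervisorState($node = node);

        Broker::publish(SupervisorControl::topic_prefix,
            SupervisorControl::restart_request, req$id, node);

        return req;
        }

    event SupervisorControl::restart_response(reqid: string, result: bool)
        {
        local req = Management::Request::lookup(reqid);
        if ( Management::Request::is_null(req) )
            return;

        # Stash the Supervisor's verdict; finishing the request triggers
        # the restart_request_finish() callback set in restart_request.
        req$supervisor_state_agent$restart_result = result;
        Management::Request::finish(reqid);
        }
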
diff --git a/scripts/policy/frameworks/management/controller/api.zeek b/scripts/policy/frameworks/management/controller/api.zeek
index bb6a2dec30..d97773b75d 100644
--- a/scripts/policy/frameworks/management/controller/api.zeek
+++ b/scripts/policy/frameworks/management/controller/api.zeek
@@ -170,6 +170,36 @@ export {
 	global get_id_value_response: event(reqid: string,
 	    results: Management::ResultVec);
 
+	## The client sends this event to restart currently running Zeek cluster
+	## nodes. The controller relays the request to its agents, which respond
+	## with a list of :zeek:see:`Management::Result` records summarizing
+	## each node restart. The controller combines these lists and sends a
+	## :zeek:see:`Management::Controller::API::restart_response` event with
+	## the result.
+	##
+	## reqid: a request identifier string, echoed in the response event.
+	##
+	## nodes: a set of cluster node names (e.g. "worker-01") to restart. An
+	##     empty set, supplied by default, means a restart of all current
+	##     cluster nodes.
+	##
+	global restart_request: event(reqid: string, nodes: set[string] &default=set());
+
+	## Response to a :zeek:see:`Management::Controller::API::restart_request`
+	## event. The controller sends this back to the client when it has received
+	## responses from all agents involved, or a timeout occurs.
+	##
+	## reqid: the request identifier used in the request event.
+	##
+	## results: a :zeek:type:`vector` of :zeek:see:`Management::Result`,
+	##     combining the restart results from all agents. Each such result
+	##     identifies both the instance and node in question. Results that
+	##     do not identify an instance are generated by the controller,
+	##     flagging corner cases, including absence of a deployed cluster
+	##     or unknown nodes.
+	##
+	global restart_response: event(reqid: string, results: Management::ResultVec);
+
 	# Testing events. These don't provide operational value but expose
 	# internal functionality, triggered by test cases.
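
As the docstring above notes, results lacking an instance name originate
at the controller itself. A client-side handler might separate the two
kinds as sketched below, assuming Management::Result's instance, node,
and error fields default to empty strings:

    event Management::Controller::API::restart_response(reqid: string,
        results: Management::ResultVec)
        {
        for ( i in results )
            {
            local res = results[i];

            # Results without an instance are controller-generated, e.g.
            # "no active cluster deployment" or "unknown cluster node".
            if ( res$instance == "" )
                print fmt("controller: %s %s", res$node, res$error);
            else if ( ! res$success )
                print fmt("restart failed on %s/%s: %s",
                    res$instance, res$node, res$error);
            else
                print fmt("restarted %s/%s", res$instance, res$node);
            }
        }
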
diff --git a/scripts/policy/frameworks/management/controller/main.zeek b/scripts/policy/frameworks/management/controller/main.zeek
index d2063e3496..c88e3127d0 100644
--- a/scripts/policy/frameworks/management/controller/main.zeek
+++ b/scripts/policy/frameworks/management/controller/main.zeek
@@ -70,6 +70,14 @@ export {
 		requests: set[string] &default=set();
 	};
 
+	## Request state specific to
+	## :zeek:see:`Management::Controller::API::restart_request` and
+	## :zeek:see:`Management::Controller::API::restart_response`.
+	type RestartState: record {
+		## Request state for every controller/agent transaction.
+		requests: set[string] &default=set();
+	};
+
 	## Dummy state for internal state-keeping test cases.
 	type TestState: record { };
 }
 
@@ -78,6 +86,7 @@ redef record Management::Request::Request += {
 	deploy_state: DeployState &optional;
 	get_nodes_state: GetNodesState &optional;
 	node_dispatch_state: NodeDispatchState &optional;
+	restart_state: RestartState &optional;
 	test_state: TestState &optional;
 };
 
@@ -1238,6 +1247,141 @@ event Management::Controller::API::get_id_value_request(reqid: string, id: strin
 		}
 	}
 
+event Management::Agent::API::restart_response(reqid: string, results: Management::ResultVec)
+	{
+	Management::Log::info(fmt("rx Management::Agent::API::restart_response %s", reqid));
+
+	# Retrieve state for the request we just got a response to
+	local areq = Management::Request::lookup(reqid);
+	if ( Management::Request::is_null(areq) )
+		return;
+
+	# Release the request, since this agent is now done.
+	Management::Request::finish(areq$id);
+
+	# Find the original request from the client
+	local req = Management::Request::lookup(areq$parent_id);
+	if ( Management::Request::is_null(req) )
+		return;
+
+	# Add this agent's results to the overall response
+	for ( i in results )
+		req$results += results[i];
+
+	# Mark this request as done
+	if ( areq$id in req$restart_state$requests )
+		delete req$restart_state$requests[areq$id];
+
+	# If we still have pending queries out to the agents, do nothing: we
+	# will handle this soon, or our request will time out and we respond
+	# with an error.
+	if ( |req$restart_state$requests| > 0 )
+		return;
+
+	Management::Log::info(fmt(
+	    "tx Management::Controller::API::restart_response %s",
+	    Management::Request::to_string(req)));
+	Broker::publish(Management::Controller::topic,
+	    Management::Controller::API::restart_response,
+	    req$id, req$results);
+	Management::Request::finish(req$id);
+	}
+
+event Management::Controller::API::restart_request(reqid: string, nodes: set[string])
+	{
+	# This works almost exactly like get_id_value_request, because it too
+	# works with a list of nodes that needs to be dispatched to agents.
+
+	local send_error_response = function(req: Management::Request::Request, error: string)
+		{
+		local res = Management::Result($reqid=req$id, $success=F, $error=error);
+		req$results += res;
+
+		Management::Log::info(fmt("tx Management::Controller::API::restart_response %s",
+		    Management::Request::to_string(req)));
+		Broker::publish(Management::Controller::topic,
+		    Management::Controller::API::restart_response, req$id, req$results);
+		};
+
+	Management::Log::info(fmt("rx Management::Controller::API::restart_request %s %s",
+	    reqid, Management::Util::set_to_vector(nodes)));
+
+	local res: Management::Result;
+	local req = Management::Request::create(reqid);
+	req$restart_state = RestartState();
+
+	# Special cases: if we have no instances or no deployment, respond right away.
+	if ( |g_instances_known| == 0 )
+		{
+		send_error_response(req, "no instances connected");
+		Management::Request::finish(reqid);
+		return;
+		}
+
+	if ( DEPLOYED !in g_configs )
+		{
+		send_error_response(req, "no active cluster deployment");
+		Management::Request::finish(reqid);
+		return;
+		}
+
+	local nodes_final: set[string];
+	local node: string;
+
+	# Input sanitization: check for any requested nodes that aren't part of
+	# the deployed configuration. We send back error results for those and
+	# don't propagate them to the agents.
+	if ( |nodes| > 0 )
+		{
+		# Requested nodes that are in the deployed configuration:
+		nodes_final = config_filter_nodes_by_name(g_configs[DEPLOYED], nodes);
+		# Requested nodes that are not in the current configuration:
+		local nodes_invalid = nodes - nodes_final;
+
+		# Assemble error results for all invalid nodes
+		for ( node in nodes_invalid )
+			{
+			res = Management::Result($reqid=reqid, $node=node);
+			res$success = F;
+			res$error = "unknown cluster node";
+			req$results += res;
+			}
+
+		# If only invalid nodes got requested, we're now done.
+		if ( |nodes_final| == 0 )
+			{
+			Management::Log::info(fmt(
+			    "tx Management::Controller::API::restart_response %s",
+			    Management::Request::to_string(req)));
+			Broker::publish(Management::Controller::topic,
+			    Management::Controller::API::restart_response,
+			    req$id, req$results);
+			Management::Request::finish(req$id);
+			return;
+			}
+		}
+
+	for ( name in g_instances )
+		{
+		if ( name !in g_instances_ready )
+			next;
+
+		local agent_topic = Management::Agent::topic_prefix + "/" + name;
+		local areq = Management::Request::create();
+
+		areq$parent_id = req$id;
+		add req$restart_state$requests[areq$id];
+
+		Management::Log::info(fmt(
+		    "tx Management::Agent::API::restart_request %s to %s",
+		    areq$id, name));
+
+		Broker::publish(agent_topic,
+		    Management::Agent::API::restart_request,
+		    areq$id, nodes);
+		}
+	}
+
 event Management::Request::request_expired(req: Management::Request::Request)
 	{
 	# Various handlers for timed-out request state. We use the state members
@@ -1296,6 +1440,17 @@ event Management::Request::request_expired(req: Management::Request::Request)
 		}
 	}
 
+	if ( req?$restart_state )
+		{
+		req$results += res;
+
+		Management::Log::info(fmt("tx Management::Controller::API::restart_response %s",
+		    Management::Request::to_string(req)));
+		Broker::publish(Management::Controller::topic,
+		    Management::Controller::API::restart_response,
+		    req$id, req$results);
+		}
+
 	if ( req?$test_state )
 		{
 		Management::Log::info(fmt("tx Management::Controller::API::test_timeout_response %s", req$id));