Management framework: more resilient node shutdown upon deployment

When agents had to terminate existing Zeek cluster nodes at the beginning of a
new deployment, they so far used their internal state to look up the nodes and
fired off requests to the Supervisor to shut these down. This has a problem:
when an agent restarts unexpectedly, it has no internal state, and when it then
tries to create nodes that already exist, the Supervisor complains with error
messages.

To avoid this, the agent now tears down all Supervised nodes other than agents
and controllers. In order to do so, it first needs to query the Supervisor for
the current node status, which means there are now two such status requests: one
upon deployment, and one during get_nodes requests. In order to disambiguate
these contexts in the SupervisorControl::status_request/response transactions,
we use the finish() callback in the corresponding request state to continue
execution as needed.
This commit is contained in:
Christian Kreibich 2022-06-19 17:29:50 -07:00
parent 1faf1ab8b7
commit a622e28eab

View file

@ -23,7 +23,10 @@ module Management::Agent::Runtime;
export { export {
## Request state specific to the agent's Supervisor interactions. ## Request state specific to the agent's Supervisor interactions.
type SupervisorState: record { type SupervisorState: record {
node: string; ##< Name of the node the Supervisor is acting on. ## Name of the node the Supervisor is acting on, if applicable.
node: string &default="";
## The result of a status request.
status: Supervisor::Status &optional;
}; };
## Request state for deploy requests. ## Request state for deploy requests.
@ -87,6 +90,14 @@ global supervisor_network_info: function(): Broker::NetworkInfo;
# and sends response event. # and sends response event.
global send_deploy_response: function(req: Management::Request::Request); global send_deploy_response: function(req: Management::Request::Request);
# Callback completing a deploy_request after the Supervisor has delivered
# a status response.
global deploy_request_finish: function(req: Management::Request::Request);
# Callback completing a get_nodes_request after the Supervisor has delivered
# a status response.
global get_nodes_request_finish: function(req: Management::Request::Request);
# The global configuration, as deployed by the controller. # The global configuration, as deployed by the controller.
global g_config: Management::Configuration; global g_config: Management::Configuration;
@ -308,6 +319,9 @@ event Management::Agent::API::deploy_request(reqid: string, config: Management::
return; return;
} }
local req = Management::Request::create(reqid);
req$deploy_state_agent = DeployState();
# Adopt the global configuration provided. The act of trying to launch # Adopt the global configuration provided. The act of trying to launch
# the requested nodes perturbs any existing ones one way or another, so # the requested nodes perturbs any existing ones one way or another, so
# even if the launch fails it effectively is our new configuration. # even if the launch fails it effectively is our new configuration.
@ -318,37 +332,59 @@ event Management::Agent::API::deploy_request(reqid: string, config: Management::
for ( inst in config$instances ) for ( inst in config$instances )
g_instances[inst$name] = inst; g_instances[inst$name] = inst;
# Terminate existing nodes local areq = Management::Request::create();
for ( nodename in g_nodes ) areq$parent_id = req$id;
supervisor_destroy(nodename); areq$finish = deploy_request_finish;
areq$supervisor_state_agent = SupervisorState();
Management::Log::info(fmt("tx SupervisorControl::status_request %s, all nodes", areq$id));
Broker::publish(SupervisorControl::topic_prefix,
SupervisorControl::status_request, areq$id, "");
}
function deploy_request_finish(areq: Management::Request::Request)
{
local status = areq$supervisor_state_agent$status;
for ( nodename in status$nodes )
{
if ( "ZEEK_MANAGEMENT_NODE" in status$nodes[nodename]$node$env )
next;
supervisor_destroy(status$nodes[nodename]$node$name);
}
local req = Management::Request::lookup(areq$parent_id);
if ( Management::Request::is_null(req) )
return;
local res: Management::Result;
local nc: Supervisor::NodeConfig;
local node: Management::Node;
# Refresh the cluster and nodes tables # Refresh the cluster and nodes tables
g_nodes = table(); g_nodes = table();
g_cluster = table(); g_cluster = table();
# Special case: the config contains no nodes. We can respond right away. # Special case: the config contains no nodes. We can respond right away.
if ( |config$nodes| == 0 ) if ( |g_config$nodes| == 0 )
{ {
g_config_reqid_pending = ""; g_config_reqid_pending = "";
res = Management::Result( res = Management::Result(
$reqid = reqid, $reqid = req$id,
$instance = Management::Agent::get_name()); $instance = Management::Agent::get_name());
Management::Log::info(fmt("tx Management::Agent::API::deploy_response %s", Management::Log::info(fmt("tx Management::Agent::API::deploy_response %s",
Management::result_to_string(res))); Management::result_to_string(res)));
Broker::publish(agent_topic(), Broker::publish(agent_topic(),
Management::Agent::API::deploy_response, reqid, vector(res)); Management::Agent::API::deploy_response, req$id, vector(res));
return; return;
} }
local req = Management::Request::create(reqid);
req$deploy_state_agent = DeployState();
# Establish this request as the pending one: # Establish this request as the pending one:
g_config_reqid_pending = reqid; g_config_reqid_pending = req$id;
for ( node in config$nodes ) for ( node in g_config$nodes )
{ {
# Filter the node set down to the ones this agent manages. # Filter the node set down to the ones this agent manages.
if ( node$instance == Management::Agent::get_name() ) if ( node$instance == Management::Agent::get_name() )
@ -446,17 +482,43 @@ event SupervisorControl::status_response(reqid: string, result: Supervisor::Stat
if ( Management::Request::is_null(req) ) if ( Management::Request::is_null(req) )
return; return;
if ( ! req?$supervisor_state_agent )
return;
req$supervisor_state_agent$status = result;
Management::Request::finish(reqid); Management::Request::finish(reqid);
}
local res = Management::Result( event Management::Agent::API::get_nodes_request(reqid: string)
$reqid = req$parent_id, $instance = Management::Agent::get_name()); {
Management::Log::info(fmt("rx Management::Agent::API::get_nodes_request %s", reqid));
local req = Management::Request::create(reqid);
local areq = Management::Request::create();
areq$parent_id = req$id;
areq$finish = get_nodes_request_finish;
areq$supervisor_state_agent = SupervisorState();
Broker::publish(SupervisorControl::topic_prefix,
SupervisorControl::status_request, areq$id, "");
Management::Log::info(fmt("issued supervisor status, %s", areq$id));
}
function get_nodes_request_finish(areq: Management::Request::Request)
{
local req = Management::Request::lookup(areq$parent_id);
if ( Management::Request::is_null(req) )
return;
local res = Management::Result($reqid=req$id,
$instance=Management::Agent::get_name());
local node_statuses: Management::NodeStatusVec; local node_statuses: Management::NodeStatusVec;
for ( node in result$nodes ) for ( node in areq$supervisor_state_agent$status$nodes )
{ {
local sns = result$nodes[node]; # Supervisor node status local sns = areq$supervisor_state_agent$status$nodes[node]; # Supervisor node status
local cns = Management::NodeStatus( local cns = Management::NodeStatus(
$node=node, $state=Management::PENDING); $node=node, $state=Management::PENDING);
@ -527,19 +589,8 @@ event SupervisorControl::status_response(reqid: string, result: Supervisor::Stat
Management::Log::info(fmt("tx Management::Agent::API::get_nodes_response %s", Management::Log::info(fmt("tx Management::Agent::API::get_nodes_response %s",
Management::result_to_string(res))); Management::result_to_string(res)));
Broker::publish(agent_topic(), Broker::publish(agent_topic(),
Management::Agent::API::get_nodes_response, req$parent_id, res); Management::Agent::API::get_nodes_response, req$id, res);
} Management::Request::finish(req$id);
event Management::Agent::API::get_nodes_request(reqid: string)
{
Management::Log::info(fmt("rx Management::Agent::API::get_nodes_request %s", reqid));
local req = Management::Request::create();
req$parent_id = reqid;
Broker::publish(SupervisorControl::topic_prefix,
SupervisorControl::status_request, req$id, "");
Management::Log::info(fmt("issued supervisor status, %s", req$id));
} }
event Management::Node::API::node_dispatch_response(reqid: string, result: Management::Result) event Management::Node::API::node_dispatch_response(reqid: string, result: Management::Result)