Mirror of https://github.com/zeek/zeek.git, synced 2025-10-02 14:48:21 +00:00
Management framework: node restart support
This adds restart request/response event pairs that restart nodes in the running Zeek cluster. The implementation is very similar to get_id_value, which also involves distributing a list of nodes to agents and aggregating the responses.
This commit is contained in:
parent bd39207772
commit b9879a50a0
4 changed files with 354 additions and 0 deletions
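The new events can be driven directly over Broker. Below is a minimal, illustrative client-side sketch exercising the controller API added in this commit; it is not part of the change itself. It assumes the management framework's controller API is loaded and that this script has already peered with the controller; the request identifier "req-001" and the worker names are placeholder values.

# Hypothetical client sketch: ask the controller to restart two workers and
# print the aggregated per-node results when the response arrives.
event Management::Controller::API::restart_response(reqid: string, results: Management::ResultVec)
    {
    # Each result summarizes one node restart (instance, node, success/error).
    for ( i in results )
        print fmt("restart result for request %s: %s", reqid, results[i]);
    }

event zeek_init()
    {
    # The controller publishes its responses to its own topic, so subscribe first.
    Broker::subscribe(Management::Controller::topic);

    # Passing an empty set instead of the two names would restart every node.
    Broker::publish(Management::Controller::topic,
        Management::Controller::API::restart_request,
        "req-001", set("worker-01", "worker-02"));
    }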
@@ -146,6 +146,35 @@ export {
        result: Management::Result);

    ## The controller sends this event to ask the agent to restart currently
    ## running Zeek cluster nodes. For nodes currently running, the agent
    ## places these nodes into PENDING state and sends restart events to the
    ## Supervisor, rendering its responses into a list of
    ## :zeek:see:`Management::Result` records summarizing each node restart.
    ## When restarted nodes check in with the agent, they switch back to
    ## RUNNING state. The agent ignores nodes not currently running.
    ##
    ## reqid: a request identifier string, echoed in the response event.
    ##
    ## nodes: a set of cluster node names (e.g. "worker-01") to restart. An
    ##     empty set, supplied by default, means restart of all of the
    ##     agent's current cluster nodes.
    ##
    global restart_request: event(reqid: string, nodes: set[string] &default=set());

    ## Response to a :zeek:see:`Management::Agent::API::restart_request`
    ## event. The agent sends this back to the controller when the
    ## Supervisor has restarted all nodes affected, or a timeout occurs.
    ##
    ## reqid: the request identifier used in the request event.
    ##
    ## results: a :zeek:type:`vector` of :zeek:see:`Management::Result`, one
    ##     for each Supervisor transaction. Each such result identifies both
    ##     the instance and node.
    ##
    global restart_response: event(reqid: string, results: Management::ResultVec);

    # Notification events, agent -> controller

    ## The agent sends this event upon peering as a "check-in", informing
@@ -27,6 +27,8 @@ export {
        node: string &default="";
        ## The result of a status request.
        status: Supervisor::Status &optional;
        ## The result of a restart request.
        restart_result: bool &optional;
    };

    ## Request state for deploy requests.
@@ -47,6 +49,13 @@ export {
        requests: set[string] &default=set();
    };

    ## Request state for restart requests, tracking received responses.
    type RestartState: record {
        ## Request state for every node the agent asks the Supervisor
        ## to restart.
        requests: set[string] &default=set();
    };

    # When Management::Agent::archive_logs is T (the default) and the
    # logging configuration doesn't permanently prevent archival
    # (e.g. because log rotation isn't configured), the agent triggers this
@@ -67,6 +76,7 @@ redef record Management::Request::Request += {
    supervisor_state_agent: SupervisorState &optional;
    deploy_state_agent: DeployState &optional;
    node_dispatch_state_agent: NodeDispatchState &optional;
    restart_state_agent: RestartState &optional;
};

# Tag our logs correctly
@@ -110,6 +120,10 @@ global send_deploy_response: function(req: Management::Request::Request);
# a status response.
global deploy_request_finish: function(req: Management::Request::Request);

# Callback completing a restart_request after the Supervisor has delivered
# a restart response.
global restart_request_finish: function(req: Management::Request::Request);

# Callback completing a get_nodes_request after the Supervisor has delivered
# a status response.
global get_nodes_request_finish: function(req: Management::Request::Request);
@@ -858,6 +872,124 @@ event Management::Agent::API::agent_standby_request(reqid: string)
        Management::Agent::API::agent_standby_response, reqid, res);
    }

function restart_request_finish(sreq: Management::Request::Request)
    {
    # This is the finish callback we set on requests to the Supervisor to
    # restart a node. We look up the parent request (the one sent to us by
    # the controller), mark the node in question as done, and respond to the
    # controller if we've handled all required nodes.

    local req = Management::Request::lookup(sreq$parent_id);
    if ( Management::Request::is_null(req) )
        return;

    local node = sreq$supervisor_state_agent$node;

    local res = Management::Result(
        $reqid = req$id,
        $instance = Management::Agent::get_name(),
        $node = node);

    if ( ! sreq$supervisor_state_agent$restart_result )
        {
        res$success = F;
        res$error = fmt("could not restart node %s", node);
        }

    req$results += res;

    if ( node in req$restart_state_agent$requests )
        {
        delete req$restart_state_agent$requests[node];
        if ( |req$restart_state_agent$requests| > 0 )
            return;
        }

    Management::Log::info(fmt(
        "tx Management::Agent::API::restart_response %s",
        Management::Request::to_string(req)));
    Broker::publish(agent_topic(),
        Management::Agent::API::restart_response,
        req$id, req$results);
    Management::Request::finish(req$id);
    }

event Management::Agent::API::restart_request(reqid: string, nodes: set[string])
    {
    # This is very similar to node_dispatch_request, because it too works
    # with a list of nodes that needs to be dispatched to agents.

    Management::Log::info(fmt("rx Management::Agent::API::restart_request %s %s",
        reqid, Management::Util::set_to_vector(nodes)));

    local node: string;
    local cluster_nodes: set[string];
    local nodes_final: set[string];

    for ( node in g_nodes )
        add cluster_nodes[node];

    # If this request includes cluster nodes to restart, check if this agent
    # manages any of those nodes. If it doesn't, respond with an empty
    # results vector immediately. Note that any globally unknown nodes
    # that the client might have requested already got filtered by the
    # controller, so we don't need to worry about them here.

    if ( |nodes| > 0 )
        {
        nodes_final = nodes & cluster_nodes;

        if ( |nodes_final| == 0 )
            {
            Management::Log::info(fmt(
                "tx Management::Agent::API::restart_response %s, no node overlap",
                reqid));
            Broker::publish(agent_topic(),
                Management::Agent::API::restart_response, reqid, vector());
            return;
            }
        }
    else if ( |g_nodes| == 0 )
        {
        # Special case: the client did not request specific nodes. If
        # we aren't running any nodes, respond right away, since there's
        # nothing to restart.
        Management::Log::info(fmt(
            "tx Management::Agent::API::restart_response %s, no nodes registered",
            reqid));
        Broker::publish(agent_topic(),
            Management::Agent::API::restart_response, reqid, vector());
        return;
        }
    else
        {
        # We restart all nodes.
        nodes_final = cluster_nodes;
        }

    local res: Management::Result;
    local req = Management::Request::create(reqid);

    req$restart_state_agent = RestartState();

    # Build up state for tracking responses.
    for ( node in nodes_final )
        add req$restart_state_agent$requests[node];

    # Ask the Supervisor to restart nodes. We need to enumerate the nodes
    # because restarting all (via "") would hit the agent (and the
    # controller, if co-located).
    for ( node in nodes_final )
        {
        local sreq = supervisor_restart(node);
        sreq$parent_id = reqid;
        sreq$finish = restart_request_finish;

        if ( node in g_nodes )
            g_nodes[node]$state = Management::PENDING;
        }
    }

event Management::Node::API::notify_node_hello(node: string)
    {
    Management::Log::info(fmt("rx Management::Node::API::notify_node_hello %s", node));
@@ -900,6 +1032,14 @@ event Management::Request::request_expired(req: Management::Request::Request)
        # This timeout means we no longer have a pending request.
        g_config_reqid_pending = "";
        }

    if ( req?$restart_state_agent )
        {
        Management::Log::info(fmt("tx Management::Agent::API::restart_response %s",
            Management::Request::to_string(req)));
        Broker::publish(agent_topic(),
            Management::Agent::API::restart_response, req$id, req$results);
        }
    }

event Broker::peer_added(peer: Broker::EndpointInfo, msg: string)
@@ -170,6 +170,36 @@ export {
    global get_id_value_response: event(reqid: string, results: Management::ResultVec);

    ## The client sends this event to restart currently running Zeek cluster
    ## nodes. The controller relays the request to its agents, which respond
    ## with a list of :zeek:see:`Management::Result` records summarizing
    ## each node restart. The controller combines these lists, and sends a
    ## :zeek:see:`Management::Controller::API::restart_response` event with
    ## the result.
    ##
    ## reqid: a request identifier string, echoed in the response event.
    ##
    ## nodes: a set of cluster node names (e.g. "worker-01") to restart. An
    ##     empty set, supplied by default, means restart of all current
    ##     cluster nodes.
    ##
    global restart_request: event(reqid: string, nodes: set[string] &default=set());

    ## Response to a :zeek:see:`Management::Controller::API::restart_request`
    ## event. The controller sends this back to the client when it has received
    ## responses from all agents involved, or a timeout occurs.
    ##
    ## reqid: the request identifier used in the request event.
    ##
    ## results: a :zeek:type:`vector` of :zeek:see:`Management::Result`,
    ##     combining the restart results from all agents. Each such result
    ##     identifies both the instance and node in question. Results that
    ##     do not identify an instance are generated by the controller,
    ##     flagging corner cases, including absence of a deployed cluster
    ##     or unknown nodes.
    ##
    global restart_response: event(reqid: string, results: Management::ResultVec);

    # Testing events. These don't provide operational value but expose
    # internal functionality, triggered by test cases.
@@ -70,6 +70,14 @@ export {
        requests: set[string] &default=set();
    };

    ## Request state specific to
    ## :zeek:see:`Management::Controller::API::restart_request` and
    ## :zeek:see:`Management::Controller::API::restart_response`.
    type RestartState: record {
        ## Request state for every controller/agent transaction.
        requests: set[string] &default=set();
    };

    ## Dummy state for internal state-keeping test cases.
    type TestState: record { };
}
@@ -78,6 +86,7 @@ redef record Management::Request::Request += {
    deploy_state: DeployState &optional;
    get_nodes_state: GetNodesState &optional;
    node_dispatch_state: NodeDispatchState &optional;
    restart_state: RestartState &optional;
    test_state: TestState &optional;
};
@@ -1238,6 +1247,141 @@ event Management::Controller::API::get_id_value_request(reqid: string, id: strin
        }
    }

event Management::Agent::API::restart_response(reqid: string, results: Management::ResultVec)
    {
    Management::Log::info(fmt("rx Management::Agent::API::restart_response %s", reqid));

    # Retrieve state for the request we just got a response to
    local areq = Management::Request::lookup(reqid);
    if ( Management::Request::is_null(areq) )
        return;

    # Release the request, since this agent is now done.
    Management::Request::finish(areq$id);

    # Find the original request from the client
    local req = Management::Request::lookup(areq$parent_id);
    if ( Management::Request::is_null(req) )
        return;

    # Add this agent's results to the overall response
    for ( i in results )
        req$results += results[i];

    # Mark this request as done:
    if ( areq$id in req$restart_state$requests )
        delete req$restart_state$requests[areq$id];

    # If we still have pending queries out to the agents, do nothing: we'll
    # handle this soon, or our request will time out and we respond with
    # an error.
    if ( |req$restart_state$requests| > 0 )
        return;

    Management::Log::info(fmt(
        "tx Management::Controller::API::restart_response %s",
        Management::Request::to_string(req)));
    Broker::publish(Management::Controller::topic,
        Management::Controller::API::restart_response,
        req$id, req$results);
    Management::Request::finish(req$id);
    }

event Management::Controller::API::restart_request(reqid: string, nodes: set[string])
    {
    # This works almost exactly like get_id_value_request, because it too
    # works with a list of nodes that needs to be dispatched to agents.

    local send_error_response = function(req: Management::Request::Request, error: string)
        {
        local res = Management::Result($reqid=req$id, $success=F, $error=error);
        req$results += res;

        Management::Log::info(fmt("tx Management::Controller::API::restart_response %s",
            Management::Request::to_string(req)));
        Broker::publish(Management::Controller::topic,
            Management::Controller::API::restart_response, req$id, req$results);
        };

    Management::Log::info(fmt("rx Management::Controller::API::restart_request %s %s",
        reqid, Management::Util::set_to_vector(nodes)));

    local res: Management::Result;
    local req = Management::Request::create(reqid);
    req$restart_state = RestartState();

    # Special cases: if we have no instances or no deployment, respond right away.
    if ( |g_instances_known| == 0 )
        {
        send_error_response(req, "no instances connected");
        Management::Request::finish(reqid);
        return;
        }

    if ( DEPLOYED !in g_configs )
        {
        send_error_response(req, "no active cluster deployment");
        Management::Request::finish(reqid);
        return;
        }

    local nodes_final: set[string];
    local node: string;

    # Input sanitization: check for any requested nodes that aren't part of
    # the deployed configuration. We send back error results for those and
    # don't propagate them to the agents.
    if ( |nodes| > 0 )
        {
        # Requested nodes that are in the deployed configuration:
        nodes_final = config_filter_nodes_by_name(g_configs[DEPLOYED], nodes);
        # Requested nodes that are not in current configuration:
        local nodes_invalid = nodes - nodes_final;

        # Assemble error results for all invalid nodes
        for ( node in nodes_invalid )
            {
            res = Management::Result($reqid=reqid, $node=node);
            res$success = F;
            res$error = "unknown cluster node";
            req$results += res;
            }

        # If only invalid nodes got requested, we're now done.
        if ( |nodes_final| == 0 )
            {
            Management::Log::info(fmt(
                "tx Management::Controller::API::restart_response %s",
                Management::Request::to_string(req)));
            Broker::publish(Management::Controller::topic,
                Management::Controller::API::restart_response,
                req$id, req$results);
            Management::Request::finish(req$id);
            return;
            }
        }

    for ( name in g_instances )
        {
        if ( name !in g_instances_ready )
            next;

        local agent_topic = Management::Agent::topic_prefix + "/" + name;
        local areq = Management::Request::create();

        areq$parent_id = req$id;
        add req$restart_state$requests[areq$id];

        Management::Log::info(fmt(
            "tx Management::Agent::API::restart_request %s to %s",
            areq$id, name));

        Broker::publish(agent_topic,
            Management::Agent::API::restart_request,
            areq$id, nodes);
        }
    }

event Management::Request::request_expired(req: Management::Request::Request)
    {
    # Various handlers for timed-out request state. We use the state members
@@ -1296,6 +1440,17 @@ event Management::Request::request_expired(req: Management::Request::Request)
            }
        }

    if ( req?$restart_state )
        {
        req$results += res;

        Management::Log::info(fmt("tx Management::Controller::API::restart_response %s",
            Management::Request::to_string(req)));
        Broker::publish(Management::Controller::topic,
            Management::Controller::API::restart_response,
            req$id, req$results);
        }

    if ( req?$test_state )
        {
        Management::Log::info(fmt("tx Management::Controller::API::test_timeout_response %s", req$id));