Merge branch 'topic/christian/management-restart'

* topic/christian/management-restart:
  Management framework: bump external cluster testsuite
  Management framework: bump zeek-client
  Management framework: edit pass over docstrings
  Management framework: node restart support
  Management framework: more consistent Supervisor interaction in the agent
  Management framework: log the controller's startup deployment attempt
  Management framework: bugfix for a get_id_value corner case
  Management framework: minor timeout bugfix
  Management framework: make "result" argument plural in multi-result response events
This commit is contained in:
Christian Kreibich 2022-06-23 12:25:38 -07:00
commit 3287b8b793
8 changed files with 542 additions and 82 deletions

14
CHANGES
View file

@ -1,3 +1,17 @@
5.1.0-dev.108 | 2022-06-23 12:26:26 -0700
* Management framework: support for cluster node restarts (Christian Kreibich, Corelight)
- bump external cluster testsuite
- bump zeek-client
- edit pass over docstrings
- node restart support
- more consistent Supervisor interaction in the agent
- log the controller's startup deployment attempt
- bugfix for a get_id_value corner case
- minor timeout bugfix
- make "result" argument plural in multi-result response events
5.1.0-dev.98 | 2022-06-22 22:39:32 -0700 5.1.0-dev.98 | 2022-06-22 22:39:32 -0700
* Management framework: separate config staging and deployment (Christian Kreibich, Corelight) * Management framework: separate config staging and deployment (Christian Kreibich, Corelight)

View file

@ -1 +1 @@
5.1.0-dev.98 5.1.0-dev.108

@ -1 +1 @@
Subproject commit d92f17cfd882dd1edbd3f560181d84f69fbc8037 Subproject commit c1f09a084668cfc8d58d4be91e006d5a14386c0e

View file

@ -33,15 +33,18 @@ export {
global deploy_request: event(reqid: string, global deploy_request: event(reqid: string,
config: Management::Configuration, force: bool &default=F); config: Management::Configuration, force: bool &default=F);
## Response to a deploy_request event. The agent sends ## Response to a :zeek:see:`Management::Agent::API::deploy_request`
## this back to the controller. ## event. The agent sends this back to the controller.
## ##
## reqid: the request identifier used in the request event. ## reqid: the request identifier used in the request event.
## ##
## result: the result record. ## results: A vector of :zeek:see:`Management::Result` records, each
## capturing the outcome of a single launched node. For failing
## nodes, the result's data field is a
## :zeek:see:`Management::NodeOutputs` record.
## ##
global deploy_response: event(reqid: string, global deploy_response: event(reqid: string,
result: Management::ResultVec); results: Management::ResultVec);
## The controller sends this event to request a list of ## The controller sends this event to request a list of
@ -53,8 +56,8 @@ export {
## ##
global get_nodes_request: event(reqid: string); global get_nodes_request: event(reqid: string);
## Response to a get_nodes_request event. The agent sends this back to the ## Response to a :zeek:see:`Management::Agent::API::get_nodes_request`
## controller. ## event. The agent sends this back to the controller.
## ##
## reqid: the request identifier used in the request event. ## reqid: the request identifier used in the request event.
## ##
@ -86,31 +89,35 @@ export {
global node_dispatch_request: event(reqid: string, action: vector of string, global node_dispatch_request: event(reqid: string, action: vector of string,
nodes: set[string] &default=set()); nodes: set[string] &default=set());
## Response to a node_dispatch_request event. Each agent sends this back ## Response to a
## to the controller to report the dispatch outcomes on all nodes managed ## :zeek:see:`Management::Agent::API::node_dispatch_request` event. Each
## by that agent. ## agent sends this back to the controller to report the dispatch
## outcomes on all nodes managed by that agent.
## ##
## reqid: the request identifier used in the request event. ## reqid: the request identifier used in the request event.
## ##
## result: a :zeek:type:`vector` of :zeek:see:`Management::Result` ## results: a :zeek:type:`vector` of :zeek:see:`Management::Result`
## records. Each record covers one Zeek cluster node managed by this ## records. Each record covers one Zeek cluster node managed by this
## agent. Upon success, each :zeek:see:`Management::Result` record's ## agent. Upon success, each :zeek:see:`Management::Result` record's
## data member contains the dispatches' response in a data type ## data member contains the dispatches' response in a data type
## appropriate for the respective dispatch. ## appropriate for the respective dispatch.
## ##
global node_dispatch_response: event(reqid: string, result: Management::ResultVec); global node_dispatch_response: event(reqid: string, results: Management::ResultVec);
## The controller sends this event to confirm to the agent that it is ## The controller sends this event to confirm to the agent that it is
## part of the current cluster topology. The agent acknowledges with the ## part of the current cluster topology. The agent acknowledges with a
## corresponding response event. ## :zeek:see:`Management::Agent::API::agent_welcome_response` event,
## upon which the controller may proceed with a cluster deployment to
## this agent.
## ##
## reqid: a request identifier string, echoed in the response event. ## reqid: a request identifier string, echoed in the response event.
## ##
global agent_welcome_request: event(reqid: string); global agent_welcome_request: event(reqid: string);
## Response to an agent_welcome_request event. The agent sends this ## Response to a
## back to the controller. ## :zeek:see:`Management::Agent::API::agent_welcome_request` event. The
## agent sends this back to the controller.
## ##
## reqid: the request identifier used in the request event. ## reqid: the request identifier used in the request event.
## ##
@ -132,8 +139,9 @@ export {
## ##
global agent_standby_request: event(reqid: string); global agent_standby_request: event(reqid: string);
## Response to an agent_standby_request event. The agent sends this ## Response to a
## back to the controller. ## :zeek:see:`Management::Agent::API::agent_standby_request` event. The
## agent sends this back to the controller.
## ##
## reqid: the request identifier used in the request event. ## reqid: the request identifier used in the request event.
## ##
@ -143,6 +151,35 @@ export {
result: Management::Result); result: Management::Result);
## The controller sends this event to ask the agent to restart currently
## running Zeek cluster nodes. For nodes currently running, the agent
## places these nodes into PENDING state and sends restart events to the
## Supervisor, rendering its responses into a list of
## :zeek:see:`Management::Result` records summarizing each node restart.
## When restarted nodes check in with the agent, they switch back to
## RUNNING state. The agent ignores nodes not currently running.
##
## reqid: a request identifier string, echoed in the response event.
##
## nodes: a set of cluster node names (e.g. "worker-01") to restart. An
## empty set, supplied by default, means restart of all of the
## agent's current cluster nodes.
##
global restart_request: event(reqid: string, nodes: set[string] &default=set());
## Response to a :zeek:see:`Management::Agent::API::restart_request`
## event. The agent sends this back to the controller when the
## Supervisor has restarted all nodes affected, or a timeout occurs.
##
## reqid: the request identifier used in the request event.
##
## results: a :zeek:type:`vector` of :zeek:see:`Management::Result`, one
## for each Supervisor transaction. Each such result identifies both
## the instance and node.
##
global restart_response: event(reqid: string, results: Management::ResultVec);
# Notification events, agent -> controller # Notification events, agent -> controller
## The agent sends this event upon peering as a "check-in", informing ## The agent sends this event upon peering as a "check-in", informing
@ -164,7 +201,7 @@ export {
connecting: bool, api_version: count); connecting: bool, api_version: count);
# The following are not yet implemented. # The following are not yet meaningfully implemented.
# Report node state changes. # Report node state changes.
global notify_change: event(instance: string, global notify_change: event(instance: string,

View file

@ -27,6 +27,8 @@ export {
node: string &default=""; node: string &default="";
## The result of a status request. ## The result of a status request.
status: Supervisor::Status &optional; status: Supervisor::Status &optional;
## The result of a restart request.
restart_result: bool &optional;
}; };
## Request state for deploy requests. ## Request state for deploy requests.
@ -47,6 +49,13 @@ export {
requests: set[string] &default=set(); requests: set[string] &default=set();
}; };
## Request state for restart requests, tracking received responses.
type RestartState: record {
## Request state for every node the agent asks the Supervisor
## to restart.
requests: set[string] &default=set();
};
# When Management::Agent::archive_logs is T (the default) and the # When Management::Agent::archive_logs is T (the default) and the
# logging configuration doesn't permanently prevent archival # logging configuration doesn't permanently prevent archival
# (e.g. because log rotation isn't configured), the agent triggers this # (e.g. because log rotation isn't configured), the agent triggers this
@ -67,6 +76,7 @@ redef record Management::Request::Request += {
supervisor_state_agent: SupervisorState &optional; supervisor_state_agent: SupervisorState &optional;
deploy_state_agent: DeployState &optional; deploy_state_agent: DeployState &optional;
node_dispatch_state_agent: NodeDispatchState &optional; node_dispatch_state_agent: NodeDispatchState &optional;
restart_state_agent: RestartState &optional;
}; };
# Tag our logs correctly # Tag our logs correctly
@ -86,6 +96,22 @@ global agent_topic: function(): string;
# Returns the effective supervisor's address and port, to peer with # Returns the effective supervisor's address and port, to peer with
global supervisor_network_info: function(): Broker::NetworkInfo; global supervisor_network_info: function(): Broker::NetworkInfo;
# Wrapper for sending a SupervisorControl::status_request to the Supervisor.
# Establishes a request object for the transaction, and returns it.
global supervisor_status: function(node: string): Management::Request::Request;
# Wrapper for sending a SupervisorControl::create_request to the Supervisor.
# Establishes a request object for the transaction, and returns it.
global supervisor_create: function(nc: Supervisor::NodeConfig): Management::Request::Request;
# Wrapper for sending a SupervisorControl::destroy_request to the Supervisor.
# Establishes a request object for the transaction, and returns it.
global supervisor_destroy: function(node: string): Management::Request::Request;
# Wrapper for sending a SupervisorControl::restart_request to the Supervisor.
# Establishes a request object for the transaction, and returns it.
global supervisor_restart: function(node: string): Management::Request::Request;
# Finalizes a deploy_request transaction: cleans up remaining state # Finalizes a deploy_request transaction: cleans up remaining state
# and sends response event. # and sends response event.
global send_deploy_response: function(req: Management::Request::Request); global send_deploy_response: function(req: Management::Request::Request);
@ -94,6 +120,10 @@ global send_deploy_response: function(req: Management::Request::Request);
# a status response. # a status response.
global deploy_request_finish: function(req: Management::Request::Request); global deploy_request_finish: function(req: Management::Request::Request);
# Callback completing a restart_request after the Supervisor has delivered
# a restart response.
global restart_request_finish: function(req: Management::Request::Request);
# Callback completing a get_nodes_request after the Supervisor has delivered # Callback completing a get_nodes_request after the Supervisor has delivered
# a status response. # a status response.
global get_nodes_request_finish: function(req: Management::Request::Request); global get_nodes_request_finish: function(req: Management::Request::Request);
@ -231,6 +261,21 @@ event Management::Supervisor::API::notify_node_exit(node: string, outputs: Manag
g_outputs[node] = outputs; g_outputs[node] = outputs;
} }
# Handles the Supervisor's response to a status request. Stores the returned
# status record on the originating request's Supervisor state and finishes the
# request; the request framework's finish() presumably invokes the request's
# $finish callback (e.g. deploy_request_finish or get_nodes_request_finish set
# by the callers of supervisor_status) -- confirm in the Request framework.
event SupervisorControl::status_response(reqid: string, result: Supervisor::Status)
{
Management::Log::info(fmt("rx SupervisorControl::status_response %s", reqid));
local req = Management::Request::lookup(reqid);
# Ignore responses for requests we don't (or no longer) track.
if ( Management::Request::is_null(req) )
return;
# Ignore requests that aren't Supervisor transactions of this agent.
if ( ! req?$supervisor_state_agent )
return;
req$supervisor_state_agent$status = result;
Management::Request::finish(reqid);
}
event SupervisorControl::create_response(reqid: string, result: string) event SupervisorControl::create_response(reqid: string, result: string)
{ {
Management::Log::info(fmt("rx SupervisorControl::create_response %s %s", reqid, result)); Management::Log::info(fmt("rx SupervisorControl::create_response %s %s", reqid, result));
@ -238,6 +283,8 @@ event SupervisorControl::create_response(reqid: string, result: string)
local req = Management::Request::lookup(reqid); local req = Management::Request::lookup(reqid);
if ( Management::Request::is_null(req) ) if ( Management::Request::is_null(req) )
return; return;
if ( ! req?$supervisor_state_agent )
return;
local name = req$supervisor_state_agent$node; local name = req$supervisor_state_agent$node;
@ -260,6 +307,8 @@ event SupervisorControl::destroy_response(reqid: string, result: bool)
local req = Management::Request::lookup(reqid); local req = Management::Request::lookup(reqid);
if ( Management::Request::is_null(req) ) if ( Management::Request::is_null(req) )
return; return;
if ( ! req?$supervisor_state_agent )
return;
local name = req$supervisor_state_agent$node; local name = req$supervisor_state_agent$node;
@ -275,7 +324,44 @@ event SupervisorControl::destroy_response(reqid: string, result: bool)
Management::Request::finish(reqid); Management::Request::finish(reqid);
} }
function supervisor_create(nc: Supervisor::NodeConfig) event SupervisorControl::restart_response(reqid: string, result: bool)
{
Management::Log::info(fmt("rx SupervisorControl::restart_response %s %s", reqid, result));
local req = Management::Request::lookup(reqid);
if ( Management::Request::is_null(req) )
return;
if ( ! req?$supervisor_state_agent )
return;
local name = req$supervisor_state_agent$node;
req$supervisor_state_agent$restart_result = result;
if ( ! result )
{
local msg = fmt("failed to restart node %s", name);
Management::Log::error(msg);
Broker::publish(agent_topic(),
Management::Agent::API::notify_error,
Management::Agent::get_name(), msg, name);
}
Management::Request::finish(reqid);
}
# Wrapper for a SupervisorControl::status_request transaction with the
# Supervisor. Creates a request object tagged with Supervisor state for the
# given node name (an empty string requests status for all nodes), publishes
# the status request to the Supervisor's topic, and returns the request so
# callers can attach a parent_id and a finish callback.
function supervisor_status(node: string): Management::Request::Request
{
local req = Management::Request::create();
req$supervisor_state_agent = SupervisorState($node = node);
Management::Log::info(fmt("tx SupervisorControl::status_request %s %s", req$id, node));
Broker::publish(SupervisorControl::topic_prefix,
SupervisorControl::status_request, req$id, node);
return req;
}
function supervisor_create(nc: Supervisor::NodeConfig): Management::Request::Request
{ {
local req = Management::Request::create(); local req = Management::Request::create();
req$supervisor_state_agent = SupervisorState($node = nc$name); req$supervisor_state_agent = SupervisorState($node = nc$name);
@ -283,9 +369,11 @@ function supervisor_create(nc: Supervisor::NodeConfig)
Management::Log::info(fmt("tx SupervisorControl::create_request %s %s", req$id, nc$name)); Management::Log::info(fmt("tx SupervisorControl::create_request %s %s", req$id, nc$name));
Broker::publish(SupervisorControl::topic_prefix, Broker::publish(SupervisorControl::topic_prefix,
SupervisorControl::create_request, req$id, nc); SupervisorControl::create_request, req$id, nc);
return req;
} }
function supervisor_destroy(node: string) function supervisor_destroy(node: string): Management::Request::Request
{ {
local req = Management::Request::create(); local req = Management::Request::create();
req$supervisor_state_agent = SupervisorState($node = node); req$supervisor_state_agent = SupervisorState($node = node);
@ -293,6 +381,20 @@ function supervisor_destroy(node: string)
Management::Log::info(fmt("tx SupervisorControl::destroy_request %s %s", req$id, node)); Management::Log::info(fmt("tx SupervisorControl::destroy_request %s %s", req$id, node));
Broker::publish(SupervisorControl::topic_prefix, Broker::publish(SupervisorControl::topic_prefix,
SupervisorControl::destroy_request, req$id, node); SupervisorControl::destroy_request, req$id, node);
return req;
}
function supervisor_restart(node: string): Management::Request::Request
{
local req = Management::Request::create();
req$supervisor_state_agent = SupervisorState($node = node);
Management::Log::info(fmt("tx SupervisorControl::restart_request %s %s", req$id, node));
Broker::publish(SupervisorControl::topic_prefix,
SupervisorControl::restart_request, req$id, node);
return req;
} }
event Management::Agent::API::deploy_request(reqid: string, config: Management::Configuration, force: bool) event Management::Agent::API::deploy_request(reqid: string, config: Management::Configuration, force: bool)
@ -332,14 +434,9 @@ event Management::Agent::API::deploy_request(reqid: string, config: Management::
for ( inst in config$instances ) for ( inst in config$instances )
g_instances[inst$name] = inst; g_instances[inst$name] = inst;
local areq = Management::Request::create(); local sreq = supervisor_status("");
areq$parent_id = req$id; sreq$parent_id = reqid;
areq$finish = deploy_request_finish; sreq$finish = deploy_request_finish;
areq$supervisor_state_agent = SupervisorState();
Management::Log::info(fmt("tx SupervisorControl::status_request %s, all nodes", areq$id));
Broker::publish(SupervisorControl::topic_prefix,
SupervisorControl::status_request, areq$id, "");
} }
function deploy_request_finish(areq: Management::Request::Request) function deploy_request_finish(areq: Management::Request::Request)
@ -475,34 +572,15 @@ function deploy_request_finish(areq: Management::Request::Request)
# the deploy_response event back to the controller. # the deploy_response event back to the controller.
} }
event SupervisorControl::status_response(reqid: string, result: Supervisor::Status)
{
Management::Log::info(fmt("rx SupervisorControl::status_response %s", reqid));
local req = Management::Request::lookup(reqid);
if ( Management::Request::is_null(req) )
return;
if ( ! req?$supervisor_state_agent )
return;
req$supervisor_state_agent$status = result;
Management::Request::finish(reqid);
}
event Management::Agent::API::get_nodes_request(reqid: string) event Management::Agent::API::get_nodes_request(reqid: string)
{ {
Management::Log::info(fmt("rx Management::Agent::API::get_nodes_request %s", reqid)); Management::Log::info(fmt("rx Management::Agent::API::get_nodes_request %s", reqid));
local req = Management::Request::create(reqid); local req = Management::Request::create(reqid);
local areq = Management::Request::create(); local sreq = supervisor_status("");
areq$parent_id = req$id; sreq$parent_id = reqid;
areq$finish = get_nodes_request_finish; sreq$finish = get_nodes_request_finish;
areq$supervisor_state_agent = SupervisorState();
Broker::publish(SupervisorControl::topic_prefix,
SupervisorControl::status_request, areq$id, "");
Management::Log::info(fmt("issued supervisor status, %s", areq$id));
} }
function get_nodes_request_finish(areq: Management::Request::Request) function get_nodes_request_finish(areq: Management::Request::Request)
@ -794,6 +872,124 @@ event Management::Agent::API::agent_standby_request(reqid: string)
Management::Agent::API::agent_standby_response, reqid, res); Management::Agent::API::agent_standby_response, reqid, res);
} }
function restart_request_finish(sreq: Management::Request::Request)
{
# This is the finish callback we set on requests to the Supervisor to
# restart a node. We look up the parent request (the one sent to us by
# the controller), mark the node in question as done, and respond to the
# controller if we've handled all required nodes.
local req = Management::Request::lookup(sreq$parent_id);
# The parent request may have expired or been finished already.
if ( Management::Request::is_null(req) )
return;
local node = sreq$supervisor_state_agent$node;
# One result record per node, identifying both this agent and the node.
local res = Management::Result(
$reqid = req$id,
$instance = Management::Agent::get_name(),
$node = node);
# restart_result was filled in by the SupervisorControl::restart_response
# handler; F means the Supervisor could not restart this node.
if ( ! sreq$supervisor_state_agent$restart_result )
{
res$success = F;
res$error = fmt("could not restart node %s", node);
}
req$results += res;
if ( node in req$restart_state_agent$requests )
{
delete req$restart_state_agent$requests[node];
# Other nodes' restarts are still outstanding: keep waiting.
if ( |req$restart_state_agent$requests| > 0 )
return;
}
# NOTE(review): when the node is absent from the pending set we fall
# through and respond immediately -- verify a stray/duplicate Supervisor
# response cannot trigger a premature or duplicate restart_response.
Management::Log::info(fmt(
"tx Management::Agent::API::restart_response %s",
Management::Request::to_string(req)));
Broker::publish(agent_topic(),
Management::Agent::API::restart_response,
req$id, req$results);
Management::Request::finish(req$id);
}
# Handles a restart request from the controller: intersects the requested node
# set with the nodes this agent manages, fires one Supervisor restart
# transaction per node, and tracks the outstanding transactions in
# restart_state_agent so restart_request_finish can respond once all are done.
event Management::Agent::API::restart_request(reqid: string, nodes: set[string])
{
# This is very similar to node_dispatch_request, because it too works
# with a list of nodes that needs to be dispatched to agents.
Management::Log::info(fmt("rx Management::Agent::API::restart_request %s %s",
reqid, Management::Util::set_to_vector(nodes)));
local node: string;
local cluster_nodes: set[string];
local nodes_final: set[string];
for ( node in g_nodes )
add cluster_nodes[node];
# If this request includes cluster nodes to query, check if this agent
# manages any of those nodes. If it doesn't, respond with an empty
# results vector immediately. Note that any globally unknown nodes
# that the client might have requested already got filtered by the
# controller, so we don't need to worry about them here.
if ( |nodes| > 0 )
{
nodes_final = nodes & cluster_nodes;
if ( |nodes_final| == 0 )
{
Management::Log::info(fmt(
"tx Management::Agent::API::restart_response %s, no node overlap",
reqid));
Broker::publish(agent_topic(),
Management::Agent::API::restart_response, reqid, vector());
return;
}
}
else if ( |g_nodes| == 0 )
{
# Special case: the client did not request specific nodes. If
# we aren't running any nodes, respond right away, since there's
# nothing to restart.
Management::Log::info(fmt(
"tx Management::Agent::API::restart_response %s, no nodes registered",
reqid));
Broker::publish(agent_topic(),
Management::Agent::API::restart_response, reqid, vector());
return;
}
else
{
# We restart all nodes.
nodes_final = cluster_nodes;
}
local res: Management::Result;
local req = Management::Request::create(reqid);
req$restart_state_agent = RestartState();
# Build up state for tracking responses.
for ( node in nodes_final )
add req$restart_state_agent$requests[node];
# Ask the Supervisor to restart nodes. We need to enumerate the nodes
# because restarting all (via "") would hit the agent (and the
# controller, if co-located).
for ( node in nodes_final )
{
# Each node gets its own Supervisor transaction; the finish
# callback aggregates results on the parent request (reqid).
local sreq = supervisor_restart(node);
sreq$parent_id = reqid;
sreq$finish = restart_request_finish;
# Mark the node PENDING; it returns to RUNNING when it checks
# back in with the agent after the restart.
if ( node in g_nodes )
g_nodes[node]$state = Management::PENDING;
}
}
event Management::Node::API::notify_node_hello(node: string) event Management::Node::API::notify_node_hello(node: string)
{ {
Management::Log::info(fmt("rx Management::Node::API::notify_node_hello %s", node)); Management::Log::info(fmt("rx Management::Node::API::notify_node_hello %s", node));
@ -821,17 +1017,29 @@ event Management::Node::API::notify_node_hello(node: string)
event Management::Request::request_expired(req: Management::Request::Request) event Management::Request::request_expired(req: Management::Request::Request)
{ {
Management::Log::info(fmt("request %s timed out", req$id));
local res = Management::Result($reqid=req$id, local res = Management::Result($reqid=req$id,
$instance = Management::Agent::get_name(), $instance = Management::Agent::get_name(),
$success = F, $success = F,
$error = "request timed out"); $error = "request timed out");
req$results += res;
if ( req?$deploy_state_agent ) if ( req?$deploy_state_agent )
{ {
send_deploy_response(req); send_deploy_response(req);
# This timeout means we no longer have a pending request. # This timeout means we no longer have a pending request.
g_config_reqid_pending = ""; g_config_reqid_pending = "";
} }
if ( req?$restart_state_agent )
{
Management::Log::info(fmt("tx Management::Agent::API::restart_response %s",
Management::Request::to_string(req)));
Broker::publish(agent_topic(),
Management::Agent::API::restart_response, req$id, req$results);
}
} }
event Broker::peer_added(peer: Broker::EndpointInfo, msg: string) event Broker::peer_added(peer: Broker::EndpointInfo, msg: string)

View file

@ -20,13 +20,14 @@ export {
## ##
global get_instances_request: event(reqid: string); global get_instances_request: event(reqid: string);
## Response to a get_instances_request event. The controller sends ## Response to a
## this back to the client. ## :zeek:see:`Management::Controller::API::get_instances_request`
## event. The controller sends this back to the client.
## ##
## reqid: the request identifier used in the request event. ## reqid: the request identifier used in the request event.
## ##
## result: the result record. Its data member is a ## result: a :zeek:see:`Management::Result`. Its data member is a vector
## :zeek:see:`Management::Instance` record. ## of :zeek:see:`Management::Instance` records.
## ##
global get_instances_response: event(reqid: string, global get_instances_response: event(reqid: string,
result: Management::Result); result: Management::Result);
@ -47,18 +48,20 @@ export {
global stage_configuration_request: event(reqid: string, global stage_configuration_request: event(reqid: string,
config: Management::Configuration); config: Management::Configuration);
## Response to a stage_configuration_request event. The controller sends ## Response to a
## this back to the client, conveying validation results. ## :zeek:see:`Management::Controller::API::stage_configuration_request`
## event. The controller sends this back to the client, conveying
## validation results.
## ##
## reqid: the request identifier used in the request event. ## reqid: the request identifier used in the request event.
## ##
## result: a :zeek:see:`Management::Result` vector, indicating whether ## results: a :zeek:see:`Management::Result` vector, indicating whether
## the controller accepts the configuration. In case of a success, ## the controller accepts the configuration. In case of a success,
## a single result record indicates so. Otherwise, the sequence is ## a single result record indicates so. Otherwise, the sequence is
## all errors, each indicating a configuration validation error. ## all errors, each indicating a configuration validation error.
## ##
global stage_configuration_response: event(reqid: string, global stage_configuration_response: event(reqid: string,
result: Management::ResultVec); results: Management::ResultVec);
## The client sends this event to retrieve the controller's current ## The client sends this event to retrieve the controller's current
@ -71,8 +74,10 @@ export {
## ##
global get_configuration_request: event(reqid: string, deployed: bool); global get_configuration_request: event(reqid: string, deployed: bool);
## Response to a get_configuration_request event. The controller sends ## Response to a
## this back to the client. ## :zeek:see:`Management::Controller::API::get_configuration_request`
## event. The controller sends this back to the client, with the
## requested configuration.
## ##
## reqid: the request identifier used in the request event. ## reqid: the request identifier used in the request event.
## ##
@ -100,18 +105,19 @@ export {
## ##
global deploy_request: event(reqid: string); global deploy_request: event(reqid: string);
## Response to a deploy_request event. The controller sends this ## Response to a :zeek:see:`Management::Controller::API::deploy_request`
## back to the client. ## event. The controller sends this back to the client, conveying the
## outcome of the deployment.
## ##
## reqid: the request identifier used in the request event. ## reqid: the request identifier used in the request event.
## ##
## result: a vector of :zeek:see:`Management::Result` records. ## results: a vector of :zeek:see:`Management::Result` records.
## Each member captures the result of launching one cluster ## Each member captures the result of launching one cluster
## node captured in the configuration, or an agent-wide error ## node captured in the configuration, or an agent-wide error
## when the result does not indicate a particular node. ## when the result does not indicate a particular node.
## ##
global deploy_response: event(reqid: string, global deploy_response: event(reqid: string,
result: Management::ResultVec); results: Management::ResultVec);
## The client sends this event to request a list of ## The client sends this event to request a list of
@ -123,19 +129,23 @@ export {
## ##
global get_nodes_request: event(reqid: string); global get_nodes_request: event(reqid: string);
## Response to a get_nodes_request event. The controller sends this ## Response to a
## back to the client. ## :zeek:see:`Management::Controller::API::get_nodes_request` event. The
## controller sends this back to the client, with a description of the
## nodes currently managed by the Supervisors on all connected
## instances. This includes agents and possibly the controller, if it
## runs jointly with an agent.
## ##
## reqid: the request identifier used in the request event. ## reqid: the request identifier used in the request event.
## ##
## result: a :zeek:type:`vector` of :zeek:see:`Management::Result` ## results: a :zeek:type:`vector` of :zeek:see:`Management::Result`
## records. Each record covers one cluster instance. Each record's data ## records. Each record covers one cluster instance. Each record's
## member is a vector of :zeek:see:`Management::NodeStatus` ## data member is a vector of :zeek:see:`Management::NodeStatus`
## records, covering the nodes at that instance. Results may also indicate ## records, covering the nodes at that instance. Results may also
## failure, with error messages indicating what went wrong. ## indicate failure, with error messages indicating what went wrong.
## ##
global get_nodes_response: event(reqid: string, global get_nodes_response: event(reqid: string,
result: Management::ResultVec); results: Management::ResultVec);
## The client sends this event to retrieve the current value of a ## The client sends this event to retrieve the current value of a
@ -156,20 +166,52 @@ export {
global get_id_value_request: event(reqid: string, id: string, global get_id_value_request: event(reqid: string, id: string,
nodes: set[string] &default=set()); nodes: set[string] &default=set());
## Response to a get_id_value_request event. The controller sends this ## Response to a
## back to the client. ## :zeek:see:`Management::Controller::API::get_id_value_request`
## event. The controller sends this back to the client, with a JSON
## representation of the requested global ID on all relevant instances.
## ##
## reqid: the request identifier used in the request event. ## reqid: the request identifier used in the request event.
## ##
## result: a :zeek:type:`vector` of :zeek:see:`Management::Result` ## results: a :zeek:type:`vector` of :zeek:see:`Management::Result`
## records. Each record covers one Zeek cluster node. Each record's ## records. Each record covers one Zeek cluster node. Each record's
## data field contains a string with the JSON rendering (as produced ## data field contains a string with the JSON rendering (as produced
## by :zeek:id:`to_json`, including the error strings it potentially ## by :zeek:id:`to_json`, including the error strings it potentially
## returns). ## returns).
## ##
global get_id_value_response: event(reqid: string, result: Management::ResultVec); global get_id_value_response: event(reqid: string, results: Management::ResultVec);
## The client sends this event to restart currently running Zeek cluster
## nodes. The controller relays the request to its agents, which respond
## with a list of :zeek:see:`Management::Result` records summarizing
## each node restart. The controller combines these lists, and sends a
## :zeek:see:`Management::Controller::API::restart_response` event with
## the result.
##
## reqid: a request identifier string, echoed in the response event.
##
## nodes: a set of cluster node names (e.g. "worker-01") to restart. An
## empty set, supplied by default, means restart of all current
## cluster nodes.
##
global restart_request: event(reqid: string, nodes: set[string] &default=set());
## Response to a :zeek:see:`Management::Controller::API::restart_request`
## event. The controller sends this back to the client when it has received
## responses from all agents involved, or a timeout occurs.
##
## reqid: the request identifier used in the request event.
##
## results: a :zeek:type:`vector` of :zeek:see:`Management::Result`,
## combining the restart results from all agents. Each such result
## identifies both the instance and node in question. Results that
## do not identify an instance are generated by the controller,
## flagging corner cases, including absence of a deployed cluster
## or unknown nodes.
##
global restart_response: event(reqid: string, results: Management::ResultVec);
# Testing events. These don't provide operational value but expose # Testing events. These don't provide operational value but expose
# internal functionality, triggered by test cases. # internal functionality, triggered by test cases.
@ -186,8 +228,10 @@ export {
## ##
global test_timeout_request: event(reqid: string, with_state: bool); global test_timeout_request: event(reqid: string, with_state: bool);
## Response to a test_timeout_request event. The controller sends this ## Response to a
## back to the client if the original request had the with_state flag. ## :zeek:see:`Management::Controller::API::test_timeout_request`
## event. The controller sends this back to the client if the original
## request had the with_state flag.
## ##
## reqid: the request identifier used in the request event. ## reqid: the request identifier used in the request event.
## ##

View file

@ -70,6 +70,14 @@ export {
requests: set[string] &default=set(); requests: set[string] &default=set();
}; };
## Request state specific to
## :zeek:see:`Management::Controller::API::restart_request` and
## :zeek:see:`Management::Controller::API::restart_response`.
type RestartState: record {
## Request state for every controller/agent transaction. Holds the IDs
## of the per-agent sub-requests still outstanding; the aggregate
## response goes out once this set drains to empty (or the parent
## request times out).
requests: set[string] &default=set();
};
## Dummy state for internal state-keeping test cases. ## Dummy state for internal state-keeping test cases.
type TestState: record { }; type TestState: record { };
} }
@ -78,6 +86,7 @@ redef record Management::Request::Request += {
deploy_state: DeployState &optional; deploy_state: DeployState &optional;
get_nodes_state: GetNodesState &optional; get_nodes_state: GetNodesState &optional;
node_dispatch_state: NodeDispatchState &optional; node_dispatch_state: NodeDispatchState &optional;
restart_state: RestartState &optional;
test_state: TestState &optional; test_state: TestState &optional;
}; };
@ -1166,7 +1175,7 @@ event Management::Controller::API::get_id_value_request(reqid: string, id: strin
local res: Management::Result; local res: Management::Result;
# Special case: if we have no instances, respond right away. # Special case: if we have no instances, respond right away.
if ( |g_instances| == 0 ) if ( |g_instances_known| == 0 )
{ {
Management::Log::info(fmt("tx Management::Controller::API::get_id_value_response %s", reqid)); Management::Log::info(fmt("tx Management::Controller::API::get_id_value_response %s", reqid));
res = Management::Result($reqid=reqid, $success=F, $error="no instances connected"); res = Management::Result($reqid=reqid, $success=F, $error="no instances connected");
@ -1238,6 +1247,141 @@ event Management::Controller::API::get_id_value_request(reqid: string, id: strin
} }
} }
# Handles a single agent's response to a restart sub-request: folds that
# agent's per-node results into the parent (client-facing) request, and once
# every agent sub-request has completed, publishes the aggregated
# Management::Controller::API::restart_response back to the client.
event Management::Agent::API::restart_response(reqid: string, results: Management::ResultVec)
{
Management::Log::info(fmt("rx Management::Agent::API::restart_response %s", reqid));
# Retrieve state for the request we just got a response to.
# A null lookup means the sub-request already expired or is unknown;
# nothing to do in that case.
local areq = Management::Request::lookup(reqid);
if ( Management::Request::is_null(areq) )
return;
# Release the request, since this agent is now done.
Management::Request::finish(areq$id);
# Find the original request from the client. If it has already been
# finished (e.g. via timeout), drop this late response.
local req = Management::Request::lookup(areq$parent_id);
if ( Management::Request::is_null(req) )
return;
# Add this agent's results to the overall response
for ( i in results )
req$results += results[i];
# Mark this request as done:
if ( areq$id in req$restart_state$requests )
delete req$restart_state$requests[areq$id];
# If we still have pending queries out to the agents, do nothing: we'll
# handle this soon, or our request will time out and we respond with
# error.
if ( |req$restart_state$requests| > 0 )
return;
# All agents have reported back: send the combined results to the
# client and close out the parent request.
Management::Log::info(fmt(
"tx Management::Controller::API::restart_response %s",
Management::Request::to_string(req)));
Broker::publish(Management::Controller::topic,
Management::Controller::API::restart_response,
req$id, req$results);
Management::Request::finish(req$id);
}
# Handles a client's restart request: validates the requested node names
# against the currently deployed configuration, reports errors for unknown
# nodes, and fans out per-agent restart_request events to all ready
# instances. Responses are aggregated in the restart_response handler.
#
# reqid: the client's request identifier, echoed in the response.
#
# nodes: the set of cluster node names to restart; an empty set means all
#        nodes of the deployed cluster.
event Management::Controller::API::restart_request(reqid: string, nodes: set[string])
{
# This works almost exactly like get_id_value_request, because it too
# works with a list of nodes that needs to be dispatched to agents.
# Local helper that sends an error result to the client for this
# request. Note: it does not finish the request; callers do that.
local send_error_response = function(req: Management::Request::Request, error: string)
{
local res = Management::Result($reqid=req$id, $success=F, $error=error);
req$results += res;
Management::Log::info(fmt("tx Management::Controller::API::restart_response %s",
Management::Request::to_string(req)));
Broker::publish(Management::Controller::topic,
Management::Controller::API::restart_response, req$id, req$results);
};
Management::Log::info(fmt("rx Management::Controller::API::restart_request %s %s",
reqid, Management::Util::set_to_vector(nodes)));
local res: Management::Result;
local req = Management::Request::create(reqid);
req$restart_state = RestartState();
# Special cases: if we have no instances or no deployment, respond right away.
if ( |g_instances_known| == 0 )
{
send_error_response(req, "no instances connected");
Management::Request::finish(reqid);
return;
}
if ( DEPLOYED !in g_configs )
{
send_error_response(req, "no active cluster deployment");
Management::Request::finish(reqid);
return;
}
local nodes_final: set[string];
local node: string;
# Input sanitization: check for any requested nodes that aren't part of
# the deployed configuration. We send back error results for those and
# don't propagate them to the agents.
if ( |nodes| > 0 )
{
# Requested nodes that are in the deployed configuration:
nodes_final = config_filter_nodes_by_name(g_configs[DEPLOYED], nodes);
# Requested nodes that are not in current configuration:
local nodes_invalid = nodes - nodes_final;
# Assemble error results for all invalid nodes
for ( node in nodes_invalid )
{
res = Management::Result($reqid=reqid, $node=node);
res$success = F;
res$error = "unknown cluster node";
req$results += res;
}
# If only invalid nodes got requested, we're now done.
if ( |nodes_final| == 0 )
{
Management::Log::info(fmt(
"tx Management::Controller::API::restart_response %s",
Management::Request::to_string(req)));
Broker::publish(Management::Controller::topic,
Management::Controller::API::restart_response,
req$id, req$results);
Management::Request::finish(req$id);
return;
}
}
# Fan out one sub-request per connected-and-ready instance. Each
# sub-request ID is tracked in restart_state so the response handler
# knows when all agents have reported back. NOTE(review): the original
# `nodes` set (not `nodes_final`) is forwarded to the agents — confirm
# this is intentional, since agents presumably ignore nodes they don't
# manage.
for ( name in g_instances )
{
if ( name !in g_instances_ready )
next;
local agent_topic = Management::Agent::topic_prefix + "/" + name;
local areq = Management::Request::create();
areq$parent_id = req$id;
add req$restart_state$requests[areq$id];
Management::Log::info(fmt(
"tx Management::Agent::API::restart_request %s to %s",
areq$id, name));
Broker::publish(agent_topic,
Management::Agent::API::restart_request,
areq$id, nodes);
}
}
event Management::Request::request_expired(req: Management::Request::Request) event Management::Request::request_expired(req: Management::Request::Request)
{ {
# Various handlers for timed-out request state. We use the state members # Various handlers for timed-out request state. We use the state members
@ -1296,6 +1440,17 @@ event Management::Request::request_expired(req: Management::Request::Request)
} }
} }
if ( req?$restart_state )
{
req$results += res;
Management::Log::info(fmt("tx Management::Controller::API::restart_response %s",
Management::Request::to_string(req)));
Broker::publish(Management::Controller::topic,
Management::Controller::API::restart_response,
req$id, req$results);
}
if ( req?$test_state ) if ( req?$test_state )
{ {
Management::Log::info(fmt("tx Management::Controller::API::test_timeout_response %s", req$id)); Management::Log::info(fmt("tx Management::Controller::API::test_timeout_response %s", req$id));
@ -1366,6 +1521,8 @@ event zeek_init()
{ {
local req = Management::Request::create(); local req = Management::Request::create();
req$deploy_state = DeployState($config=g_configs[DEPLOYED], $is_internal=T); req$deploy_state = DeployState($config=g_configs[DEPLOYED], $is_internal=T);
Management::Log::info(fmt("deploying persisted configuration %s, request %s",
g_configs[DEPLOYED]$id, req$id));
deploy(req); deploy(req);
} }
} }

View file

@ -1 +1 @@
e01ffdcd799d3ca2851225994108d500c540fbe2 566b3325abec336c7540feac22a9d543f2629b82