Merge branch 'topic/christian/management-deploy'

* topic/christian/management-deploy: (21 commits)
  Management framework: bump external cluster testsuite
  Management framework: bump zeek-client
  Management framework: rename set_configuration events to stage_configuration
  Management framework: trigger deployment when instances are ready
  Management framework: more resilient node shutdown upon deployment
  Management framework: re-trigger deployment upon controller launch
  Management framework: move most deployment handling to internal function
  Management framework: distinguish internally and externally requested deployments
  Management framework: track instances by their Broker IDs
  Management framework: tweak Supervisor event logging
  Management framework: make helper function a local
  Management framework: rename "log_level" to "level"
  Management framework: add "finish" callback to requests
  Management framework: add a helper for rendering result vectors to a string
  Management framework: agents now skip re-deployment of current config
  Management framework: suppress notify_agent_hello upon Supervisor peering
  Management framework: introduce state machine for configs and persist them
  Management framework: introduce deployment API in controller
  Management framework: rename agent "set_configuration" to "deploy"
  Management framework: consistency fixes to the Result record
  ...
Christian Kreibich committed 2022-06-22 22:38:43 -07:00
commit 54f2f28047
12 changed files with 641 additions and 350 deletions

CHANGES

@@ -1,3 +1,28 @@
5.1.0-dev.98 | 2022-06-22 22:39:32 -0700
* Management framework: separate config staging and deployment (Christian Kreibich, Corelight)
- bump external cluster testsuite
- bump zeek-client
- rename set_configuration events to stage_configuration
    - trigger deployment when instances are ready
- more resilient node shutdown upon deployment
- re-trigger deployment upon controller launch
- move most deployment handling to internal function
- distinguish internally and externally requested deployments
- track instances by their Broker IDs
- tweak Supervisor event logging
- make helper function a local
- rename "log_level" to "level"
- add "finish" callback to requests
- add a helper for rendering result vectors to a string
- agents now skip re-deployment of current config
- suppress notify_agent_hello upon Supervisor peering
- introduce state machine for configs and persist them
- introduce deployment API in controller
- rename agent "set_configuration" to "deploy"
- consistency fixes to the Result record
5.1.0-dev.75 | 2022-06-22 12:06:00 -0700

  * Provide zeek-client by default (Christian Kreibich, Corelight)

@@ -1 +1 @@
-5.1.0-dev.75
+5.1.0-dev.98

@@ -1 +1 @@
-Subproject commit 0f1ee6489ecc07767cb1f37beb312eae69e5a6d8
+Subproject commit d92f17cfd882dd1edbd3f560181d84f69fbc8037


@@ -15,29 +15,32 @@ export {
# Agent API events

-## The controller sends this event to convey a new cluster configuration
-## to the agent. Once processed, the agent responds with the response
-## event.
+## The controller sends this event to deploy a cluster configuration to
+## this instance. Once processed, the agent responds with a
+## :zeek:see:`Management::Agent::API::deploy_response` event.
##
## reqid: a request identifier string, echoed in the response event.
##
-## config: a :zeek:see:`Management::Configuration` record
-## describing the cluster topology. Note that this contains the full
-## topology, not just the part pertaining to this agent. That's because
-## the cluster framework requires full cluster visibility to establish
-## the needed peerings.
+## config: a :zeek:see:`Management::Configuration` record describing the
+## cluster topology. This contains the full topology, not just the
+## part pertaining to this instance: the cluster framework requires
+## full cluster visibility to establish needed peerings.
##
-global set_configuration_request: event(reqid: string,
-config: Management::Configuration);
+## force: whether to re-deploy (i.e., restart its Zeek cluster nodes)
+## when the agent already runs this configuration. This relies on
+## the config ID to determine config equality.
+##
+global deploy_request: event(reqid: string,
+config: Management::Configuration, force: bool &default=F);

-## Response to a set_configuration_request event. The agent sends
+## Response to a deploy_request event. The agent sends
## this back to the controller.
##
## reqid: the request identifier used in the request event.
##
## result: the result record.
##
-global set_configuration_response: event(reqid: string,
+global deploy_response: event(reqid: string,
result: Management::ResultVec);

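For orientation, here is a minimal sketch (not part of the patch) of how a controller-side script could drive this agent API. The topic string, request ID, and configuration contents are illustrative assumptions; real controllers derive the topic from the agent's name:

    @load policy/frameworks/management/agent/api

    event Management::Agent::API::deploy_response(reqid: string, result: Management::ResultVec)
        {
        # Each Result covers one launched node, or an agent-wide error.
        print fmt("deploy request %s finished with %d result(s)", reqid, |result|);
        }

    event zeek_init()
        {
        # Hypothetical config ID and agent topic, for illustration only.
        local config = Management::Configuration($id="example-config");
        Broker::publish("zeek/management/agent/agent-example",
            Management::Agent::API::deploy_request, "req-0001", config, F);
        }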

@@ -23,11 +23,14 @@ module Management::Agent::Runtime;
export {
## Request state specific to the agent's Supervisor interactions.
type SupervisorState: record {
-node: string; ##< Name of the node the Supervisor is acting on.
+## Name of the node the Supervisor is acting on, if applicable.
+node: string &default="";
+## The result of a status request.
+status: Supervisor::Status &optional;
};

-## Request state for set_configuration requests.
-type SetConfigurationState: record {
+## Request state for deploy requests.
+type DeployState: record {
## Zeek cluster nodes the provided configuration requested
## and which have not yet checked in with the agent.
nodes_pending: set[string];
@@ -62,7 +65,7 @@ export {
# members with _agent to disambiguate.
redef record Management::Request::Request += {
supervisor_state_agent: SupervisorState &optional;
-set_configuration_state_agent: SetConfigurationState &optional;
+deploy_state_agent: DeployState &optional;
node_dispatch_state_agent: NodeDispatchState &optional;
};
@@ -80,11 +83,22 @@ redef Management::Request::timeout_interval = 5 sec;
# Returns the effective agent topic for this agent.
global agent_topic: function(): string;

-# Finalizes a set_configuration_request transaction: cleans up remaining state
-# and sends response event.
-global send_set_configuration_response: function(req: Management::Request::Request);
+# Returns the effective supervisor's address and port, to peer with
+global supervisor_network_info: function(): Broker::NetworkInfo;

-# The global configuration as passed to us by the controller
+# Finalizes a deploy_request transaction: cleans up remaining state
+# and sends response event.
+global send_deploy_response: function(req: Management::Request::Request);
+
+# Callback completing a deploy_request after the Supervisor has delivered
+# a status response.
+global deploy_request_finish: function(req: Management::Request::Request);
+
+# Callback completing a get_nodes_request after the Supervisor has delivered
+# a status response.
+global get_nodes_request_finish: function(req: Management::Request::Request);
+
+# The global configuration, as deployed by the controller.
global g_config: Management::Configuration;

# A map to make other instance info accessible
@@ -93,10 +107,9 @@ global g_instances: table[string] of Management::Instance;
# A map for the nodes we run on this instance, via this agent.
global g_nodes: table[string] of Management::Node;

-# The request ID of the most recent configuration update from the controller.
-# We track it here until the nodes_pending set in the corresponding request's
-# SetConfigurationState is cleared out, or the corresponding request state hits
-# a timeout.
+# The request ID of the most recent config deployment from the controller. We
+# track it until the nodes_pending set in the corresponding request's
+# DeployState is cleared out, or the corresponding request state hits a timeout.
global g_config_reqid_pending: string = "";

# The complete node map employed by the supervisor to describe the cluster
@@ -115,7 +128,20 @@ function agent_topic(): string
return Management::Agent::topic_prefix + "/" + epi$id;
}

-function send_set_configuration_response(req: Management::Request::Request)
+function supervisor_network_info(): Broker::NetworkInfo
+{
+# The Supervisor's address defaults to Broker's default, which
+# relies on ZEEK_DEFAULT_LISTEN_ADDR and so might just be "". Broker
+# internally falls back to listening on any; we pick 127.0.0.1.
+local address = Broker::default_listen_address;
+
+if ( address == "" )
+address = "127.0.0.1";
+
+return Broker::NetworkInfo($address=address, $bound_port=Broker::default_port);
+}
+
+function send_deploy_response(req: Management::Request::Request)
{
local node: string;
local res: Management::Result;
@@ -128,7 +154,7 @@ function send_set_configuration_response(req: Management::Request::Request)
$instance = Management::Agent::get_name(),
$node = node);

-if ( node in req$set_configuration_state_agent$nodes_pending )
+if ( node in req$deploy_state_agent$nodes_pending )
{
# This node failed.
res$success = F;
@@ -142,10 +168,10 @@ function send_set_configuration_response(req: Management::Request::Request)
req$results[|req$results|] = res;
}

-Management::Log::info(fmt("tx Management::Agent::API::set_configuration_response %s",
+Management::Log::info(fmt("tx Management::Agent::API::deploy_response %s",
Management::result_to_string(res)));
Broker::publish(agent_topic(),
-Management::Agent::API::set_configuration_response, req$id, req$results);
+Management::Agent::API::deploy_response, req$id, req$results);

Management::Request::finish(req$id);
@@ -207,6 +233,8 @@ event Management::Supervisor::API::notify_node_exit(node: string, outputs: Manag
event SupervisorControl::create_response(reqid: string, result: string)
{
+Management::Log::info(fmt("rx SupervisorControl::create_response %s %s", reqid, result));
+
local req = Management::Request::lookup(reqid);
if ( Management::Request::is_null(req) )
return;
@@ -227,6 +255,8 @@ event SupervisorControl::create_response(reqid: string, result: string)
event SupervisorControl::destroy_response(reqid: string, result: bool)
{
+Management::Log::info(fmt("rx SupervisorControl::destroy_response %s %s", reqid, result));
+
local req = Management::Request::lookup(reqid);
if ( Management::Request::is_null(req) )
return;
@@ -249,28 +279,48 @@ function supervisor_create(nc: Supervisor::NodeConfig)
{
local req = Management::Request::create();
req$supervisor_state_agent = SupervisorState($node = nc$name);
+Management::Log::info(fmt("tx SupervisorControl::create_request %s %s", req$id, nc$name));
Broker::publish(SupervisorControl::topic_prefix,
SupervisorControl::create_request, req$id, nc);
-Management::Log::info(fmt("issued supervisor create for %s, %s", nc$name, req$id));
}

function supervisor_destroy(node: string)
{
local req = Management::Request::create();
req$supervisor_state_agent = SupervisorState($node = node);
+Management::Log::info(fmt("tx SupervisorControl::destroy_request %s %s", req$id, node));
Broker::publish(SupervisorControl::topic_prefix,
SupervisorControl::destroy_request, req$id, node);
-Management::Log::info(fmt("issued supervisor destroy for %s, %s", node, req$id));
}

-event Management::Agent::API::set_configuration_request(reqid: string, config: Management::Configuration)
+event Management::Agent::API::deploy_request(reqid: string, config: Management::Configuration, force: bool)
{
-Management::Log::info(fmt("rx Management::Agent::API::set_configuration_request %s", reqid));
+Management::Log::info(fmt("rx Management::Agent::API::deploy_request %s %s", reqid, config$id));

local nodename: string;
local node: Management::Node;
local nc: Supervisor::NodeConfig;
-local msg: string;
+local res: Management::Result;
+
+# Special case: we're already running this configuration.
+if ( g_config$id == config$id && ! force )
+{
+res = Management::Result(
+$reqid = reqid,
+$instance = Management::Agent::get_name());
+
+Management::Log::info(fmt("already running config %s", config$id));
+Management::Log::info(fmt("tx Management::Agent::API::deploy_response %s",
+Management::result_to_string(res)));
+Broker::publish(agent_topic(),
+Management::Agent::API::deploy_response, reqid, vector(res));
+return;
+}
+
+local req = Management::Request::create(reqid);
+req$deploy_state_agent = DeployState();

# Adopt the global configuration provided. The act of trying to launch
# the requested nodes perturbs any existing ones one way or another, so
@@ -282,43 +332,65 @@ event Management::Agent::API::set_configuration_request(reqid: string, config: M
for ( inst in config$instances )
g_instances[inst$name] = inst;

-# Terminate existing nodes
-for ( nodename in g_nodes )
-supervisor_destroy(nodename);
+local areq = Management::Request::create();
+areq$parent_id = req$id;
+areq$finish = deploy_request_finish;
+areq$supervisor_state_agent = SupervisorState();
+
+Management::Log::info(fmt("tx SupervisorControl::status_request %s, all nodes", areq$id));
+Broker::publish(SupervisorControl::topic_prefix,
+SupervisorControl::status_request, areq$id, "");
+}
+
+function deploy_request_finish(areq: Management::Request::Request)
+{
+local status = areq$supervisor_state_agent$status;
+
+for ( nodename in status$nodes )
+{
+if ( "ZEEK_MANAGEMENT_NODE" in status$nodes[nodename]$node$env )
+next;
+supervisor_destroy(status$nodes[nodename]$node$name);
+}
+
+local req = Management::Request::lookup(areq$parent_id);
+if ( Management::Request::is_null(req) )
+return;
+
+local res: Management::Result;
+local nc: Supervisor::NodeConfig;
+local node: Management::Node;

# Refresh the cluster and nodes tables
g_nodes = table();
g_cluster = table();

# Special case: the config contains no nodes. We can respond right away.
-if ( |config$nodes| == 0 )
+if ( |g_config$nodes| == 0 )
{
g_config_reqid_pending = "";

-local res = Management::Result(
-$reqid = reqid,
+res = Management::Result(
+$reqid = req$id,
$instance = Management::Agent::get_name());

-Management::Log::info(fmt("tx Management::Agent::API::set_configuration_response %s",
+Management::Log::info(fmt("tx Management::Agent::API::deploy_response %s",
Management::result_to_string(res)));
Broker::publish(agent_topic(),
-Management::Agent::API::set_configuration_response, reqid, vector(res));
+Management::Agent::API::deploy_response, req$id, vector(res));
return;
}

-local req = Management::Request::create(reqid);
-req$set_configuration_state_agent = SetConfigurationState();
-
# Establish this request as the pending one:
-g_config_reqid_pending = reqid;
+g_config_reqid_pending = req$id;

-for ( node in config$nodes )
+for ( node in g_config$nodes )
{
# Filter the node set down to the ones this agent manages.
if ( node$instance == Management::Agent::get_name() )
{
g_nodes[node$name] = node;
-add req$set_configuration_state_agent$nodes_pending[node$name];
+add req$deploy_state_agent$nodes_pending[node$name];
}

# The cluster and supervisor frameworks require a port for every
@@ -399,25 +471,54 @@ event Management::Agent::API::set_configuration_request(reqid: string, config: M
# At this point we await Management::Node::API::notify_node_hello events
# from the new nodes, or a timeout, whichever happens first. These
-# trigger the set_configuration_response event back to the controller.
+# update the pending nodes in the request state, and eventually trigger
+# the deploy_response event back to the controller.
}

event SupervisorControl::status_response(reqid: string, result: Supervisor::Status)
{
+Management::Log::info(fmt("rx SupervisorControl::status_response %s", reqid));
+
local req = Management::Request::lookup(reqid);
+if ( Management::Request::is_null(req) )
+return;
+if ( ! req?$supervisor_state_agent )
+return;
+
+req$supervisor_state_agent$status = result;
+Management::Request::finish(reqid);
+}
+
+event Management::Agent::API::get_nodes_request(reqid: string)
+{
+Management::Log::info(fmt("rx Management::Agent::API::get_nodes_request %s", reqid));
+
+local req = Management::Request::create(reqid);
+
+local areq = Management::Request::create();
+areq$parent_id = req$id;
+areq$finish = get_nodes_request_finish;
+areq$supervisor_state_agent = SupervisorState();
+
+Broker::publish(SupervisorControl::topic_prefix,
+SupervisorControl::status_request, areq$id, "");
+Management::Log::info(fmt("issued supervisor status, %s", areq$id));
+}
+
+function get_nodes_request_finish(areq: Management::Request::Request)
+{
+local req = Management::Request::lookup(areq$parent_id);
if ( Management::Request::is_null(req) )
return;

-Management::Request::finish(reqid);
-
-local res = Management::Result(
-$reqid = req$parent_id, $instance = Management::Agent::get_name());
+local res = Management::Result($reqid=req$id,
+$instance=Management::Agent::get_name());

local node_statuses: Management::NodeStatusVec;

-for ( node in result$nodes )
+for ( node in areq$supervisor_state_agent$status$nodes )
{
-local sns = result$nodes[node]; # Supervisor node status
+local sns = areq$supervisor_state_agent$status$nodes[node]; # Supervisor node status
local cns = Management::NodeStatus(
$node=node, $state=Management::PENDING);
@@ -488,19 +589,8 @@ event SupervisorControl::status_response(reqid: string, result: Supervisor::Stat
Management::Log::info(fmt("tx Management::Agent::API::get_nodes_response %s",
Management::result_to_string(res)));
Broker::publish(agent_topic(),
-Management::Agent::API::get_nodes_response, req$parent_id, res);
-}
-
-event Management::Agent::API::get_nodes_request(reqid: string)
-{
-Management::Log::info(fmt("rx Management::Agent::API::get_nodes_request %s", reqid));
-
-local req = Management::Request::create();
-req$parent_id = reqid;
-
-Broker::publish(SupervisorControl::topic_prefix,
-SupervisorControl::status_request, req$id, "");
-Management::Log::info(fmt("issued supervisor status, %s", req$id));
+Management::Agent::API::get_nodes_response, req$id, res);
+Management::Request::finish(req$id);
}

event Management::Node::API::node_dispatch_response(reqid: string, result: Management::Result)
@@ -637,9 +727,11 @@ event Management::Agent::API::node_dispatch_request(reqid: string, action: vecto
add req$node_dispatch_state_agent$requests[node];
else
{
-res = Management::Result($reqid=reqid, $node=node);
-res$success = F;
-res$error = fmt("cluster node %s not in runnning state", node);
+res = Management::Result($reqid=reqid,
+$instance = Management::Agent::get_name(),
+$success = F,
+$error = fmt("cluster node %s not in runnning state", node),
+$node=node);
req$results += res;
}
}
@@ -690,7 +782,7 @@ event Management::Agent::API::agent_standby_request(reqid: string)
# peered/connected -- otherwise there's nothing we can do here via
# Broker anyway), mainly to keep open the possibility of running
# cluster nodes again later.
-event Management::Agent::API::set_configuration_request("", Management::Configuration());
+event Management::Agent::API::deploy_request("", Management::Configuration());

local res = Management::Result(
$reqid = reqid,
@@ -710,32 +802,33 @@ event Management::Node::API::notify_node_hello(node: string)
if ( node in g_nodes )
g_nodes[node]$state = Management::RUNNING;

-# Look up the set_configuration request this node launch was part of (if
+# Look up the deploy request this node launch was part of (if
# any), and check it off. If it was the last node we expected to launch,
# finalize the request and respond to the controller.
local req = Management::Request::lookup(g_config_reqid_pending);

-if ( Management::Request::is_null(req) || ! req?$set_configuration_state_agent )
+if ( Management::Request::is_null(req) || ! req?$deploy_state_agent )
return;

-if ( node in req$set_configuration_state_agent$nodes_pending )
+if ( node in req$deploy_state_agent$nodes_pending )
{
-delete req$set_configuration_state_agent$nodes_pending[node];
-if ( |req$set_configuration_state_agent$nodes_pending| == 0 )
-send_set_configuration_response(req);
+delete req$deploy_state_agent$nodes_pending[node];
+if ( |req$deploy_state_agent$nodes_pending| == 0 )
+send_deploy_response(req);
}
}

event Management::Request::request_expired(req: Management::Request::Request)
{
local res = Management::Result($reqid=req$id,
+$instance = Management::Agent::get_name(),
$success = F,
$error = "request timed out");

-if ( req?$set_configuration_state_agent )
+if ( req?$deploy_state_agent )
{
-send_set_configuration_response(req);
+send_deploy_response(req);
# This timeout means we no longer have a pending request.
g_config_reqid_pending = "";
}
@@ -743,10 +836,16 @@ event Management::Request::request_expired(req: Management::Request::Request)
event Broker::peer_added(peer: Broker::EndpointInfo, msg: string)
{
-# This does not (cannot?) immediately verify that the new peer
-# is in fact a controller, so we might send this in vain.
-# Controllers register the agent upon receipt of the event.
+Management::Log::debug(fmt("broker peer %s added: %s", peer, msg));
+
+local sni = supervisor_network_info();
+
+if ( peer$network$address == sni$address && peer$network$bound_port == sni$bound_port )
+return;
+
+# Supervisor aside, this does not (cannot?) immediately verify that the
+# new peer is in fact a controller, so we might send this in vain.
+# Controllers register the agent upon receipt of the event.
local epi = Management::Agent::endpoint_info();

Broker::publish(agent_topic(),
@@ -765,14 +864,9 @@ event zeek_init()
local epi = Management::Agent::endpoint_info();

# The agent needs to peer with the supervisor -- this doesn't currently
-# happen automatically. The address defaults to Broker's default, which
-# relies on ZEEK_DEFAULT_LISTEN_ADDR and so might just be "". Broker
-# internally falls back to listening on any; we pick 127.0.0.1.
-local supervisor_addr = Broker::default_listen_address;
-
-if ( supervisor_addr == "" )
-supervisor_addr = "127.0.0.1";
-
-Broker::peer(supervisor_addr, Broker::default_port, Broker::default_listen_retry);
+# happen automatically.
+local sni = supervisor_network_info();
+Broker::peer(sni$address, sni$bound_port, Broker::default_listen_retry);

# Agents need receive communication targeted at it, any responses
# from the supervisor, and any responses from cluster nodes.

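The recurring pattern in the agent changes above is a parent/child request: the agent creates a child request toward the Supervisor, stores a finish callback on it, and resumes the parent request once the Supervisor's status response arrives. A minimal sketch of that idiom, reusing the request-state names from this patch (the example_* function names are illustrative, not from the code):

    function example_finish(areq: Management::Request::Request)
        {
        # Runs when the child request finishes; resume the parent.
        local req = Management::Request::lookup(areq$parent_id);
        if ( Management::Request::is_null(req) )
            return;
        # ... continue with areq$supervisor_state_agent$status ...
        }

    function example_kickoff(req: Management::Request::Request)
        {
        local areq = Management::Request::create();
        areq$parent_id = req$id;
        areq$finish = example_finish;
        areq$supervisor_state_agent = SupervisorState();
        Broker::publish(SupervisorControl::topic_prefix,
            SupervisorControl::status_request, areq$id, "");
        }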

@@ -1,7 +1,7 @@
##! The event API of cluster controllers. Most endpoints consist of event pairs,
-##! where the controller answers a zeek-client request event with a
-##! corresponding response event. Such event pairs share the same name prefix
-##! and end in "_request" and "_response", respectively.
+##! where the controller answers the client's request event with a corresponding
+##! response event. Such event pairs share the same name prefix and end in
+##! "_request" and "_response", respectively.

@load policy/frameworks/management/types
@@ -9,11 +9,11 @@ module Management::Controller::API;
export {
## A simple versioning scheme, used to track basic compatibility of
-## controller, agents, and zeek-client.
+## controller, agents, and the client.
const version = 1;

-## zeek-client sends this event to request a list of the currently
+## The client sends this event to request a list of the currently
## peered agents/instances.
##
## reqid: a request identifier string, echoed in the response event.
@@ -32,37 +32,44 @@ export {
result: Management::Result);

-## zeek-client sends this event to establish a new cluster configuration,
-## including the full cluster topology. The controller processes the update
-## and relays it to the agents. Once each has responded (or a timeout occurs)
-## the controller sends a corresponding response event back to the client.
+## Upload a configuration to the controller for later deployment.
+## The client sends this event to the controller, which validates the
+## configuration and indicates the outcome in its response event. No
+## deployment takes place yet, and existing deployed configurations and
+## the running Zeek cluster remain intact. To trigger deployment of an uploaded
+## configuration, use :zeek:see:`Management::Controller::API::deploy_request`.
##
## reqid: a request identifier string, echoed in the response event.
##
## config: a :zeek:see:`Management::Configuration` record
## specifying the cluster configuration.
##
-global set_configuration_request: event(reqid: string,
+global stage_configuration_request: event(reqid: string,
config: Management::Configuration);

-## Response to a set_configuration_request event. The controller sends
-## this back to the client.
+## Response to a stage_configuration_request event. The controller sends
+## this back to the client, conveying validation results.
##
## reqid: the request identifier used in the request event.
##
-## result: a vector of :zeek:see:`Management::Result` records.
-## Each member captures one agent's response.
+## result: a :zeek:see:`Management::Result` vector, indicating whether
+## the controller accepts the configuration. In case of a success,
+## a single result record indicates so. Otherwise, the sequence is
+## all errors, each indicating a configuration validation error.
##
-global set_configuration_response: event(reqid: string,
+global stage_configuration_response: event(reqid: string,
result: Management::ResultVec);

-## zeek-client sends this event to retrieve the currently deployed
-## cluster configuration.
+## The client sends this event to retrieve the controller's current
+## cluster configuration(s).
##
## reqid: a request identifier string, echoed in the response event.
##
-global get_configuration_request: event(reqid: string);
+## deployed: when true, returns the deployed configuration (if any),
+## otherwise the staged one (if any).
+##
+global get_configuration_request: event(reqid: string, deployed: bool);

## Response to a get_configuration_request event. The controller sends
## this back to the client.
@@ -78,7 +85,36 @@ export {
result: Management::Result);

-## zeek-client sends this event to request a list of
+## Trigger deployment of a previously staged configuration. The client
+## sends this event to the controller, which deploys the configuration
+## to the agents. Agents then terminate any previously running cluster
+## nodes and (re-)launch those defined in the new configuration. Once
+## each agent has responded (or a timeout occurs), the controller sends
+## a response event back to the client, aggregating the results from the
+## agents. The controller keeps the staged configuration available for
+## download, or re-deployment. In addition, the deployed configuration
+## becomes available for download as well, with any augmentations
+## (e.g. node ports filled in by auto-assignment) reflected.
+##
+## reqid: a request identifier string, echoed in the response event.
+##
+global deploy_request: event(reqid: string);
+
+## Response to a deploy_request event. The controller sends this
+## back to the client.
+##
+## reqid: the request identifier used in the request event.
+##
+## result: a vector of :zeek:see:`Management::Result` records.
+## Each member captures the result of launching one cluster
+## node captured in the configuration, or an agent-wide error
+## when the result does not indicate a particular node.
+##
+global deploy_response: event(reqid: string,
+result: Management::ResultVec);
+
+## The client sends this event to request a list of
## :zeek:see:`Management::NodeStatus` records that capture
## the status of Supervisor-managed nodes running on the cluster's
## instances.
@@ -102,7 +138,7 @@ export {
result: Management::ResultVec);

-## zeek-client sends this event to retrieve the current value of a
+## The client sends this event to retrieve the current value of a
## variable in Zeek's global namespace, referenced by the given
## identifier (i.e., variable name). The controller asks all agents
## to retrieve this value from each cluster node, accumulates the

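Taken together, the staging/deployment split means a client now makes two round-trips. A minimal sketch (not part of the patch; request IDs and config contents are illustrative assumptions) of a Zeek-script client driving the new controller API:

    @load policy/frameworks/management/controller/api

    event Management::Controller::API::stage_configuration_response(reqid: string, result: Management::ResultVec)
        {
        # On success, a single result carries the config ID in $data;
        # follow up by triggering deployment of the staged config.
        if ( |result| == 1 && result[0]$success )
            Broker::publish(Management::Controller::topic,
                Management::Controller::API::deploy_request, "deploy-0001");
        }

    event zeek_init()
        {
        local config = Management::Configuration($id="example-config");
        Broker::publish(Management::Controller::topic,
            Management::Controller::API::stage_configuration_request,
            "stage-0001", config);
        }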

@@ -59,6 +59,10 @@ export {
## output gets garbled.
const directory = "" &redef;

+## The name of the Broker store the controller uses to persist internal
+## state to disk.
+const store_name = "controller";
+
## Returns the effective name of the controller.
global get_name: function(): string;

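The new store_name constant ties into the persistence the controller gains below: its staged/ready/deployed configurations live in a table backed by a Broker SQLite store, so they survive controller restarts. A minimal sketch of that Zeek idiom (module, type, and field names here are illustrative, not from the patch):

    module Example;

    type Stage: enum { EX_STAGED, EX_DEPLOYED };

    type Cfg: record { id: string; };

    # Contents of this table transparently persist to, and reload from,
    # a SQLite-backed Broker store across Zeek restarts.
    global configs: table[Stage] of Cfg
        &broker_allow_complex_type &backend=Broker::SQLITE;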

@@ -18,12 +18,23 @@ module Management::Controller::Runtime;
# Request record below. Without it, it fails to establish link targets for the
# tucked-on types.
export {
+## A cluster configuration uploaded by the client goes through multiple
+## states on its way to deployment.
+type ConfigState: enum {
+STAGED, ##< As provided by the client.
+READY, ##< Necessary updates made, e.g. ports filled in.
+DEPLOYED, ##< Sent off to the agents for deployment.
+};
+
## Request state specific to
-## :zeek:see:`Management::Controller::API::set_configuration_request` and
-## :zeek:see:`Management::Controller::API::set_configuration_response`.
-type SetConfigurationState: record {
-## The cluster configuration established with this request
+## :zeek:see:`Management::Controller::API::deploy_request` and
+## :zeek:see:`Management::Controller::API::deploy_response`.
+type DeployState: record {
+## The cluster configuration the controller is deploying.
config: Management::Configuration;
+## Whether this is a controller-internal deployment, or
+## triggered via a request by a remote peer/client.
+is_internal: bool &default=F;
## Request state for every controller/agent transaction.
requests: set[string] &default=set();
};
@@ -64,7 +75,7 @@ export {
}

redef record Management::Request::Request += {
-set_configuration_state: SetConfigurationState &optional;
+deploy_state: DeployState &optional;
get_nodes_state: GetNodesState &optional;
node_dispatch_state: NodeDispatchState &optional;
test_state: TestState &optional;
@@ -89,9 +100,11 @@ global add_instance: function(inst: Management::Instance);
# agent_standby_request, so it drops its current cluster nodes (if any).
global drop_instance: function(inst: Management::Instance);

-# Helpers to simplify handling of config records.
-global null_config: function(): Management::Configuration;
-global is_null_config: function(config: Management::Configuration): bool;
+# Sends the given configuration to all g_instances members via a
+# Management::Agent::API::deploy_request event. To track responses, it builds up
+# deployment state in the given request for each recipient.
+global config_deploy_to_agents: function(config: Management::Configuration,
+req: Management::Request::Request);

# Returns list of names of nodes in the given configuration that require a
# listening port. Returns empty list if the config has no such nodes.
@@ -110,11 +123,10 @@ global config_assign_ports: function(config: Management::Configuration);
global config_validate: function(config: Management::Configuration,
req: Management::Request::Request): bool;

-# Rejects the given configuration with the given error message. The function
-# adds a non-success result record to the given request and send the
-# set_configuration_response event back to the client. It does not call finish()
-# on the request.
-global send_set_configuration_response_error: function(req: Management::Request::Request, error: string);
+# Returns the set of node names in the given configuration that overlaps with
+# the set provided.
+global config_filter_nodes_by_name: function(config: Management::Configuration,
+nodes: set[string]): set[string];

# Given a Broker ID, this returns the endpoint info associated with it.
# On error, returns a dummy record with an empty ID string.
@@ -145,16 +157,21 @@ global g_instances_known: table[string] of Management::Instance = table();
# instance, and store that in g_instances.)
global g_instances_ready: set[string] = set();

-# The request ID of the most recent configuration update that's come in from
-# a client. We track it here until we know we are ready to communicate with all
-# agents required by the update.
+# A map from Broker ID values to instance names. When we lose a peering, this
+# helps us understand whether it was an instance, and if so, update its state
+# accordingly.
+global g_instances_by_id: table[string] of string;
+
+# The request ID of the most recent deployment request from a client. We track
+# it here until we know we are ready to communicate with all agents required for
+# the update.
global g_config_reqid_pending: string = "";

-# The most recent configuration we have successfully deployed. This is also
-# the one we send whenever the client requests it.
-global g_config_current: Management::Configuration;
+# This table tracks a cluster configuration through its states to deployment.
+global g_configs: table[ConfigState] of Management::Configuration
+&broker_allow_complex_type &backend=Broker::SQLITE;

-function send_config_to_agents(req: Management::Request::Request, config: Management::Configuration)
+function config_deploy_to_agents(config: Management::Configuration, req: Management::Request::Request)
{
for ( name in g_instances )
{
@@ -168,12 +185,12 @@ function send_config_to_agents(req: Management::Request::Request, config: Manage
# We track the requests sent off to each agent. As the
# responses come in, we delete them. Once the requests
# set is empty, we respond back to the client.
-add req$set_configuration_state$requests[areq$id];
+add req$deploy_state$requests[areq$id];

# We could also broadcast just once on the agent prefix, but
# explicit request/response pairs for each agent seems cleaner.
-Management::Log::info(fmt("tx Management::Agent::API::set_configuration_request %s to %s", areq$id, name));
-Broker::publish(agent_topic, Management::Agent::API::set_configuration_request, areq$id, config);
+Management::Log::info(fmt("tx Management::Agent::API::deploy_request %s to %s", areq$id, name));
+Broker::publish(agent_topic, Management::Agent::API::deploy_request, areq$id, config, F);
}
}
@@ -239,11 +256,6 @@ function drop_instance(inst: Management::Instance)
Management::Log::info(fmt("dropped instance %s", inst$name));
}

-function null_config(): Management::Configuration
-{
-return Management::Configuration($id="");
-}
-
function config_nodes_lacking_ports(config: Management::Configuration): vector of string
{
local res: vector of string;
@@ -491,6 +503,18 @@ function config_validate(config: Management::Configuration,
return F;
}

+function config_filter_nodes_by_name(config: Management::Configuration, nodes: set[string])
+: set[string]
+{
+local res: set[string];
+local cluster_nodes: set[string];
+
+for ( node in config$nodes )
+add cluster_nodes[node$name];
+
+return nodes & cluster_nodes;
+}
+
function find_endpoint(id: string): Broker::EndpointInfo
{
local peers = Broker::peers();
@@ -505,11 +529,6 @@ function find_endpoint(id: string): Broker::EndpointInfo
return Broker::EndpointInfo($id="");
}

-function is_null_config(config: Management::Configuration): bool
-{
-return config$id == "";
-}
-
function is_instance_connectivity_change(inst: Management::Instance): bool
{
# If we're not tracking this instance as part of a cluster config, it's
@@ -536,48 +555,148 @@ function is_instance_connectivity_change(inst: Management::Instance): bool
return F;
}

-function filter_config_nodes_by_name(nodes: set[string]): set[string]
+function deploy(req: Management::Request::Request)
{
-local res: set[string];
-local cluster_nodes: set[string];
+# This deployment is now pending. It clears when all agents have
+# processed their config updates successfully, or any responses time
+# out.
+g_config_reqid_pending = req$id;

-for ( node in g_config_current$nodes )
-add cluster_nodes[node$name];
+# Compare the instance configuration to our current one. If it matches,
+# we can proceed to deploying the new cluster topology. If it does not,
+# we need to establish connectivity with agents we connect to, or wait
+# until all instances that connect to us have done so. Either case
+# triggers a notify_agents_ready event, upon which we deploy the config.

-return nodes & cluster_nodes;
-}
+# The current & new set of instance names.
+local insts_current: set[string];
+local insts_new: set[string];

-function send_set_configuration_response_error(req: Management::Request::Request, error: string)
-{
-local res = Management::Result($reqid=req$id);
+# A set of current instances not contained in the new config.
+# Those will need to get dropped.
+local insts_to_drop: set[string];

-res$success = F;
-res$error = error;
-req$results += res;
+# The opposite: new instances not yet in our current set. Those we will need
+# to establish contact with (or they with us).
+local insts_to_add: set[string];

-Broker::publish(Management::Controller::topic,
-Management::Controller::API::set_configuration_response, req$id, req$results);
-}
+# The overlap: instances in both the current and new set. For those we verify
+# that we're actually dealign with the same entities, and might need to re-
+# connect if not.
+local insts_to_keep: set[string];
+
+# Alternative representation of insts_to_add, directly providing the instances.
+local insts_to_peer: table[string] of Management::Instance;
+
+# Helpful locals.
+local inst_name: string;
+local inst: Management::Instance;
+
+for ( inst_name in g_instances )
+add insts_current[inst_name];
+for ( inst in g_configs[READY]$instances )
+add insts_new[inst$name];
+
+# Populate TODO lists for instances we need to drop, check, or add.
+insts_to_drop = insts_current - insts_new;
+insts_to_add = insts_new - insts_current;
+insts_to_keep = insts_new & insts_current;
+
+for ( inst in g_configs[READY]$instances )
+{
+if ( inst$name in insts_to_add )
+{
+insts_to_peer[inst$name] = inst;
+next;
+}
+
+# Focus on the keepers: check for change in identity/location.
+if ( inst$name !in insts_to_keep )
+next;
+
+if ( is_instance_connectivity_change(inst) )
+{
+# The endpoint looks different. We drop the current one
+# and need to re-establish connectivity with the new
+# one.
+add insts_to_drop[inst$name];
+add insts_to_add[inst$name];
+}
+}
+
+# Process our TODO lists. Handle drops first, then additions, in
+# case we need to re-establish connectivity with an agent.
+for ( inst_name in insts_to_drop )
+{
+Management::Log::debug(fmt("dropping instance %s", inst_name));
+drop_instance(g_instances[inst_name]);
+}
+for ( inst_name in insts_to_peer )
+{
+Management::Log::debug(fmt("adding instance %s", inst_name));
+add_instance(insts_to_peer[inst_name]);
+}
+
+# Updates to instance tables are complete. As a corner case, if the
+# config contained no instances (and thus no nodes), we're now done
+# since there are no agent interactions to wait for (any agents that
+# need to tear down nodes are doing so asynchronously as part of the
+# drop_instance() calls above).
+if ( |insts_new| == 0 )
+{
+local config = req$deploy_state$config;
+g_configs[DEPLOYED] = config;
+g_config_reqid_pending = "";
+
+local res = Management::Result($reqid=req$id, $data=config$id);
+req$results += res;
+
+if ( ! req$deploy_state$is_internal )
+{
+Management::Log::info(fmt("tx Management::Controller::API::deploy_response %s",
+Management::Request::to_string(req)));
+Broker::publish(Management::Controller::topic,
+Management::Controller::API::deploy_response, req$id, req$results);
+}
+
+Management::Request::finish(req$id);
+return;
+}
+}

event Management::Controller::API::notify_agents_ready(instances: set[string])
{
local insts = Management::Util::set_to_vector(instances);
+local req: Management::Request::Request;

Management::Log::info(fmt("rx Management::Controller::API:notify_agents_ready %s",
join_string_vec(insts, ", ")));

-local req = Management::Request::lookup(g_config_reqid_pending);
+# If we're not currently deploying a configuration, but have a deployed
+# configuration, trigger a deployment at this point. Some of our agents
+# might have restarted, and need to get in sync with us. Agents already
+# running this configuration will do nothing.
+if ( g_config_reqid_pending == "" && DEPLOYED in g_configs )
+{
+req = Management::Request::create();
+req$deploy_state = DeployState($config=g_configs[DEPLOYED], $is_internal=T);
+Management::Log::info(fmt("no deployment in progress, triggering via %s", req$id));
+deploy(req);
+}
+
+req = Management::Request::lookup(g_config_reqid_pending);

# If there's no pending request, when it's no longer available, or it
# doesn't have config state, don't do anything else.
-if ( Management::Request::is_null(req) || ! req?$set_configuration_state )
+if ( Management::Request::is_null(req) || ! req?$deploy_state )
return;

# All instances requested in the pending configuration update are now
# known to us. Send them the config. As they send their response events
# we update the client's request state and eventually send the response
# event to the it.
-send_config_to_agents(req, req$set_configuration_state$config);
+config_deploy_to_agents(req$deploy_state$config, req);
}

event Management::Agent::API::notify_agent_hello(instance: string, id: string, connecting: bool, api_version: count)
@@ -617,6 +736,12 @@ event Management::Agent::API::notify_agent_hello(instance: string, id: string, c
if ( ei$id != "" && ei?$network )
{
+if ( instance !in g_instances_known )
+Management::Log::debug(fmt("instance %s newly checked in", instance));
+else
+Management::Log::debug(fmt("instance %s checked in again", instance));
+
+g_instances_by_id[id] = instance;
g_instances_known[instance] = Management::Instance(
$name=instance, $host=to_addr(ei$network$address));
@@ -625,8 +750,6 @@ event Management::Agent::API::notify_agent_hello(instance: string, id: string, c
# We connected to this agent, note down its port.
g_instances_known[instance]$listen_port = ei$network$bound_port;
}
-
-Management::Log::debug(fmt("instance %s now known to us", instance));
}

if ( instance in g_instances && instance !in g_instances_ready )
@@ -654,7 +777,6 @@ event Management::Agent::API::agent_welcome_response(reqid: string, result: Mana
# An agent we've been waiting to hear back from is ready for cluster
# work. Double-check we still want it, otherwise drop it.
-
if ( ! result$success || result$instance !in g_instances )
{
Management::Log::info(fmt(
@@ -686,9 +808,10 @@ event Management::Agent::API::notify_log(instance: string, msg: string, node: st
# XXX TODO
}

-event Management::Agent::API::set_configuration_response(reqid: string, results: Management::ResultVec)
+event Management::Agent::API::deploy_response(reqid: string, results: Management::ResultVec)
{
-Management::Log::info(fmt("rx Management::Agent::API::set_configuration_response %s", reqid));
+Management::Log::info(fmt("rx Management::Agent::API::deploy_response %s %s",
+reqid, Management::result_vec_to_string(results)));

# Retrieve state for the request we just got a response to
local areq = Management::Request::lookup(reqid);
@ -715,197 +838,109 @@ event Management::Agent::API::set_configuration_response(reqid: string, results:
# Mark this request as done by removing it from the table of pending # Mark this request as done by removing it from the table of pending
# ones. The following if-check should always be true. # ones. The following if-check should always be true.
if ( areq$id in req$set_configuration_state$requests ) if ( areq$id in req$deploy_state$requests )
delete req$set_configuration_state$requests[areq$id]; delete req$deploy_state$requests[areq$id];
# If there are any pending requests to the agents, we're # If there are any pending requests to the agents, we're
# done: we respond once every agent has responed (or we time out). # done: we respond once every agent has responed (or we time out).
if ( |req$set_configuration_state$requests| > 0 ) if ( |req$deploy_state$requests| > 0 )
return; return;
# All set_configuration requests to instances are done, so adopt the # All deploy requests to instances are done, so adopt the config
# client's requested configuration as the new one and respond back to # as the new deployed one and respond back to client.
# client. local config = req$deploy_state$config;
g_config_current = req$set_configuration_state$config; g_configs[DEPLOYED] = config;
g_config_reqid_pending = ""; g_config_reqid_pending = "";
Management::Log::info(fmt("tx Management::Controller::API::set_configuration_response %s", local res = Management::Result($reqid=req$id, $data=config$id);
Management::Request::to_string(req))); req$results += res;
Broker::publish(Management::Controller::topic,
Management::Controller::API::set_configuration_response, req$id, req$results); if ( ! req$deploy_state$is_internal )
{
Management::Log::info(fmt("tx Management::Controller::API::deploy_response %s",
Management::Request::to_string(req)));
Broker::publish(Management::Controller::topic,
Management::Controller::API::deploy_response, req$id, req$results);
}
Management::Request::finish(req$id); Management::Request::finish(req$id);
} }
-event Management::Controller::API::set_configuration_request(reqid: string, config: Management::Configuration)
+event Management::Controller::API::stage_configuration_request(reqid: string, config: Management::Configuration)
 	{
-	Management::Log::info(fmt("rx Management::Controller::API::set_configuration_request %s", reqid));
+	Management::Log::info(fmt("rx Management::Controller::API::stage_configuration_request %s", reqid));
 
-	local res: Management::Result;
 	local req = Management::Request::create(reqid);
+	local res = Management::Result($reqid=req$id);
+	local config_copy: Management::Configuration;
 
-	req$set_configuration_state = SetConfigurationState($config = config);
-
-	# At the moment there can only be one pending request.
-	if ( g_config_reqid_pending != "" )
-		{
-		send_set_configuration_response_error(req,
-		    fmt("request %s still pending", g_config_reqid_pending));
-		Management::Request::finish(req$id);
-		Management::Log::info(fmt("tx Management::Controller::API::set_configuration_response %s",
-		    Management::Request::to_string(req)));
-		return;
-		}
-
+	# If the config has problems, reject it:
 	if ( ! config_validate(config, req) )
 		{
 		Management::Request::finish(req$id);
-		Management::Log::info(fmt("tx Management::Controller::API::set_configuration_response %s",
+		Management::Log::info(fmt("tx Management::Controller::API::stage_configuration_response %s",
 		    Management::Request::to_string(req)));
 		Broker::publish(Management::Controller::topic,
-		    Management::Controller::API::set_configuration_response, req$id, req$results);
+		    Management::Controller::API::stage_configuration_response, req$id, req$results);
 		return;
 		}
 
-	if ( Management::Controller::auto_assign_ports )
-		config_assign_ports(config);
-	else
+	if ( ! Management::Controller::auto_assign_ports )
 		{
 		local nodes = config_nodes_lacking_ports(config);
+
 		if ( |nodes| > 0 )
 			{
 			local nodes_str = join_string_vec(nodes, ", ");
-			send_set_configuration_response_error(req,
-			    fmt("port auto-assignment disabled but nodes %s lack ports", nodes_str));
-			Management::Request::finish(req$id);
-			Management::Log::info(fmt("tx Management::Controller::API::set_configuration_response %s",
+			res$success = F;
+			res$error = fmt("port auto-assignment disabled but nodes %s lack ports", nodes_str);
+			req$results += res;
+			Management::Log::info(fmt("tx Management::Controller::API::stage_configuration_response %s",
 			    Management::Request::to_string(req)));
+			Broker::publish(Management::Controller::topic,
+			    Management::Controller::API::stage_configuration_response, req$id, req$results);
+			Management::Request::finish(req$id);
 			return;
 			}
 		}
 
-	# The incoming request is now the pending one. It gets cleared when all
-	# agents have processed their config updates successfully, or their
-	# responses time out.
-	g_config_reqid_pending = req$id;
-
-	# Compare the instance configuration to our current one. If it matches,
-	# we can proceed to deploying the new cluster topology. If it does
-	# not, we need to establish connectivity with agents we connect to, or
-	# wait until all instances that connect to us have done so. Either triggers
-	# a notify_agents_ready event, upon which we then deploy the topology.
-
-	# The current & new set of instance names.
-	local insts_current: set[string];
-	local insts_new: set[string];
-
-	# A set of current instances not contained in the new config.
-	# Those will need to get dropped.
-	local insts_to_drop: set[string];
-
-	# The opposite: new instances not yet in our current set. Those we will need
-	# to establish contact with (or they with us).
-	local insts_to_add: set[string];
-
-	# The overlap: instances in both the current and new set. For those we verify
-	# that we're actually dealign with the same entities, and might need to re-
-	# connect if not.
-	local insts_to_keep: set[string];
-
-	# Alternative representation of insts_to_add, directly providing the instances.
-	local insts_to_peer: table[string] of Management::Instance;
-
-	# Helpful locals.
-	local inst_name: string;
-	local inst: Management::Instance;
-
-	for ( inst_name in g_instances )
-		add insts_current[inst_name];
-	for ( inst in config$instances )
-		add insts_new[inst$name];
-
-	# Populate TODO lists for instances we need to drop, check, or add.
-	insts_to_drop = insts_current - insts_new;
-	insts_to_add = insts_new - insts_current;
-	insts_to_keep = insts_new & insts_current;
-
-	for ( inst in config$instances )
-		{
-		if ( inst$name in insts_to_add )
-			{
-			insts_to_peer[inst$name] = inst;
-			next;
-			}
-
-		# Focus on the keepers: check for change in identity/location.
-		if ( inst$name !in insts_to_keep )
-			next;
-
-		if ( is_instance_connectivity_change(inst) )
-			{
-			# The endpoint looks different. We drop the current one
-			# and need to re-establish connectivity with the new
-			# one.
-			add insts_to_drop[inst$name];
-			add insts_to_add[inst$name];
-			}
-		}
-
-	# Process our TODO lists. Handle drops first, then additions, in
-	# case we need to re-establish connectivity with an agent.
-	for ( inst_name in insts_to_drop )
-		{
-		Management::Log::debug(fmt("dropping instance %s", inst_name));
-		drop_instance(g_instances[inst_name]);
-		}
-	for ( inst_name in insts_to_peer )
-		{
-		Management::Log::debug(fmt("adding instance %s", inst_name));
-		add_instance(insts_to_peer[inst_name]);
-		}
-
-	# Updates to instance tables are complete. As a corner case, if the
-	# config contained no instances (and thus no nodes), we're now done
-	# since there are no agent interactions to wait for:
-	if ( |insts_new| == 0 )
-		{
-		g_config_current = req$set_configuration_state$config;
-		g_config_reqid_pending = "";
-		Management::Log::info(fmt("tx Management::Controller::API::set_configuration_response %s",
-		    Management::Request::to_string(req)));
-		Broker::publish(Management::Controller::topic,
-		    Management::Controller::API::set_configuration_response, req$id, req$results);
-		Management::Request::finish(req$id);
-		return;
-		}
-
-	# Otherwise, check if we're able to send the config to all agents
-	# involved. If that's the case, this will trigger a
-	# Management::Controller::API::notify_agents_ready event that implements
-	# the distribution in the controller's own event handler, above.
-	check_instances_ready();
+	g_configs[STAGED] = config;
+	config_copy = copy(config);
+
+	if ( Management::Controller::auto_assign_ports )
+		config_assign_ports(config_copy);
+
+	g_configs[READY] = config_copy;
+
+	# We return the ID of the new configuration in the response.
+	res$data = config$id;
+	req$results += res;
+
+	Management::Log::info(fmt(
+	    "tx Management::Controller::API::stage_configuration_response %s",
+	    Management::result_to_string(res)));
+	Broker::publish(Management::Controller::topic,
+	    Management::Controller::API::stage_configuration_response, reqid, req$results);
+	Management::Request::finish(req$id);
 	}
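
For context: a client drives this staging step over Broker (in practice zeek-client does it). Below is a minimal, hypothetical client-side sketch, not part of this commit. It assumes the controller listens on its default 2150/tcp, that the shown @load path pulls in the controller API and types, and that my_config is a Management::Configuration assembled elsewhere:

@load policy/frameworks/management/controller/api  # assumed load path

# Assumption: populated elsewhere with instances and nodes.
global my_config: Management::Configuration;

event zeek_init()
	{
	# Subscribe to the topic the controller publishes its responses to.
	Broker::subscribe(Management::Controller::topic);
	Broker::peer("127.0.0.1", 2150/tcp);
	}

event Broker::peer_added(ep: Broker::EndpointInfo, msg: string)
	{
	Broker::publish(Management::Controller::topic,
	    Management::Controller::API::stage_configuration_request, "req-001", my_config);
	}

event Management::Controller::API::stage_configuration_response(reqid: string, results: Management::ResultVec)
	{
	print fmt("staging %s: %s", reqid, Management::result_vec_to_string(results));
	}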
-event Management::Controller::API::get_configuration_request(reqid: string)
+event Management::Controller::API::get_configuration_request(reqid: string, deployed: bool)
 	{
 	Management::Log::info(fmt("rx Management::Controller::API::get_configuration_request %s", reqid));
 
 	local res = Management::Result($reqid=reqid);
+	local key = deployed ? DEPLOYED : STAGED;
 
-	if ( is_null_config(g_config_current) )
+	if ( key !in g_configs )
 		{
-		# We don't have a live configuration yet.
+		# Cut off enum namespacing prefix and turn rest lower-case, for readability:
+		res$error = fmt("no %s configuration available", to_lower(sub(cat(key), /.+::/, "")));
 		res$success = F;
-		res$error = "no configuration deployed";
 		}
 	else
 		{
-		res$data = g_config_current;
+		res$data = g_configs[key];
 		}
 
 	Management::Log::info(fmt(
@@ -915,6 +950,48 @@ event Management::Controller::API::get_configuration_request(reqid: string)
 	    Management::Controller::API::get_configuration_response, reqid, res);
 	}
 
+event Management::Controller::API::deploy_request(reqid: string)
+	{
+	local send_error_response = function(req: Management::Request::Request, error: string)
+		{
+		local res = Management::Result($reqid=req$id, $success=F, $error=error);
+		req$results += res;
+		Management::Log::info(fmt("tx Management::Controller::API::deploy_response %s",
+		    Management::Request::to_string(req)));
+		Broker::publish(Management::Controller::topic,
+		    Management::Controller::API::deploy_response, req$id, req$results);
+		};
+
+	Management::Log::info(fmt("rx Management::Controller::API::deploy_request %s", reqid));
+
+	local req = Management::Request::create(reqid);
+
+	if ( READY !in g_configs )
+		{
+		send_error_response(req, "no configuration available to deploy");
+		Management::Request::finish(req$id);
+		return;
+		}
+
+	# At the moment there can only be one pending deployment.
+	if ( g_config_reqid_pending != "" )
+		{
+		send_error_response(req,
+		    fmt("earlier deployment %s still pending", g_config_reqid_pending));
+		Management::Request::finish(req$id);
+		return;
+		}
+
+	req$deploy_state = DeployState($config = g_configs[READY]);
+	deploy(req);
+
+	# Check if we're already able to send the config to all agents involved:
+	# If so, this triggers Management::Controller::API::notify_agents_ready,
+	# which we handle above by distributing the config.
+	check_instances_ready();
+	}
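
The deployment step can be driven the same way. Continuing the hypothetical client sketch from the staging handler above (same connectivity assumptions), a follow-up request deploys whatever configuration is in READY state:

event Management::Controller::API::stage_configuration_response(reqid: string, results: Management::ResultVec)
	{
	# Error handling omitted: assume staging succeeded.
	Broker::publish(Management::Controller::topic,
	    Management::Controller::API::deploy_request, "req-002");
	}

event Management::Controller::API::deploy_response(reqid: string, results: Management::ResultVec)
	{
	print fmt("deployment %s: %s", reqid, Management::result_vec_to_string(results));
	}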
 event Management::Controller::API::get_instances_request(reqid: string)
 	{
 	Management::Log::info(fmt("rx Management::Controller::API::get_instances_request %s", reqid));
@@ -1111,8 +1188,8 @@ event Management::Controller::API::get_id_value_request(reqid: string, id: strin
 	# don't propagate them to the agents.
 	if ( |nodes| > 0 )
 		{
-		# Requested nodes that are in the current configuration:
-		nodes_final = filter_config_nodes_by_name(nodes);
+		# Requested nodes that are in the deployed configuration:
+		nodes_final = config_filter_nodes_by_name(g_configs[DEPLOYED], nodes);
 		# Requested nodes that are not in current configuration:
 		local nodes_invalid = nodes - nodes_final;
@@ -1171,16 +1248,21 @@ event Management::Request::request_expired(req: Management::Request::Request)
 	    $success = F,
 	    $error = "request timed out");
 
-	if ( req?$set_configuration_state )
+	Management::Log::info(fmt("request %s timed out", req$id));
+
+	if ( req?$deploy_state )
 		{
 		# This timeout means we no longer have a pending request.
 		g_config_reqid_pending = "";
 		req$results += res;
 
-		Management::Log::info(fmt("tx Management::Controller::API::set_configuration_response %s",
-		    Management::Request::to_string(req)));
-		Broker::publish(Management::Controller::topic,
-		    Management::Controller::API::set_configuration_response, req$id, req$results);
+		if ( ! req$deploy_state$is_internal )
+			{
+			Management::Log::info(fmt("tx Management::Controller::API::deploy_response %s",
+			    Management::Request::to_string(req)));
+			Broker::publish(Management::Controller::topic,
+			    Management::Controller::API::deploy_response, req$id, req$results);
+			}
 		}
 
 	if ( req?$get_nodes_state )
@@ -1240,18 +1322,32 @@ event Broker::peer_added(peer: Broker::EndpointInfo, msg: string)
 	Management::Log::debug(fmt("broker peer %s added: %s", peer, msg));
 	}
 
+event Broker::peer_lost(peer: Broker::EndpointInfo, msg: string)
+	{
+	Management::Log::debug(fmt("broker peer %s lost: %s", peer, msg));
+
+	if ( peer$id in g_instances_by_id )
+		{
+		local instance = g_instances_by_id[peer$id];
+
+		if ( instance in g_instances_known )
+			delete g_instances_known[instance];
+		if ( instance in g_instances_ready )
+			delete g_instances_ready[instance];
+
+		Management::Log::info(fmt("dropped state for instance %s", instance));
+		delete g_instances_by_id[peer$id];
+		}
+	}
+
 event zeek_init()
 	{
-	# Initialize null config at startup. We will replace it once we have
-	# persistence, and again whenever we complete a client's
-	# set_configuration request.
-	g_config_current = null_config();
-
-	# The controller always listens -- it needs to be able to respond to the
-	# Zeek client. This port is also used by the agents if they connect to
-	# the client. The client doesn't automatically establish or accept
-	# connectivity to agents: agents are defined and communicated with as
-	# defined via configurations defined by the client.
+	# The controller always listens: it needs to be able to respond to
+	# clients connecting to it, as well as agents if they connect to the
+	# controller. The controller does not automatically connect to any
+	# agents; instances with listening agents are conveyed to the controller
+	# via configurations uploaded by a client, with connections established
+	# upon deployment.
 	local cni = Management::Controller::network_info();
@@ -1261,4 +1357,15 @@ event zeek_init()
 	Broker::subscribe(Management::Controller::topic);
 
 	Management::Log::info(fmt("controller is live, Broker ID %s", Broker::node_id()));
+
+	# If we have a persisted deployed configuration, we need to make sure
+	# it's actually running. The agents involved might be gone, running a
+	# different config, etc. We simply run a deployment: agents already
+	# running this configuration will do nothing.
+	if ( DEPLOYED in g_configs )
+		{
+		local req = Management::Request::create();
+		req$deploy_state = DeployState($config=g_configs[DEPLOYED], $is_internal=T);
+		deploy(req);
+		}
 	}
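
The DEPLOYED entry can survive a controller restart because g_configs is persisted. The following is only a sketch of how such persistence can look in Zeek, using a Broker-backed table; the declaration in this commit may differ in store name and setup:

# Assumption: store name and exact attribute usage are illustrative.
global g_configs: table[ConfigState] of Management::Configuration
    &broker_store="zeek/management/controller/configs";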


@@ -38,7 +38,7 @@ export {
 	## The log level in use for this node. This is the minimum
 	## log level required to produce output.
-	global log_level = INFO &redef;
+	global level = INFO &redef;
 
 	## A debug-level log message writer.
 	##
@@ -84,7 +84,7 @@ global r2s: table[Management::Role] of string = {
 function debug(message: string)
 	{
-	if ( enum_to_int(log_level) > enum_to_int(DEBUG) )
+	if ( enum_to_int(level) > enum_to_int(DEBUG) )
 		return;
 
 	local node = Supervisor::node();
@@ -94,7 +94,7 @@ function debug(message: string)
 function info(message: string)
 	{
-	if ( enum_to_int(log_level) > enum_to_int(INFO) )
+	if ( enum_to_int(level) > enum_to_int(INFO) )
 		return;
 
 	local node = Supervisor::node();
@@ -104,7 +104,7 @@ function info(message: string)
 function warning(message: string)
 	{
-	if ( enum_to_int(log_level) > enum_to_int(WARNING) )
+	if ( enum_to_int(level) > enum_to_int(WARNING) )
 		return;
 
 	local node = Supervisor::node();
@@ -114,7 +114,7 @@ function warning(message: string)
 function error(message: string)
 	{
-	if ( enum_to_int(log_level) > enum_to_int(ERROR) )
+	if ( enum_to_int(level) > enum_to_int(ERROR) )
 		return;
 
 	local node = Supervisor::node();
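
Site policies that tuned the framework's verbosity need to follow the rename; for example (a sketch, placed in local site policy):

# Previously: redef Management::Log::log_level = Management::Log::WARNING;
redef Management::Log::level = Management::Log::WARNING;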


@@ -32,6 +32,14 @@ export {
 		finished: bool &default=F;
 	};
 
+	# To allow a callback to refer to Requests, the Request type must
+	# exist. So redef to add it:
+	redef record Request += {
+		## A callback to invoke when this request is finished via
+		## :zeek:see:`Management::Request::finish`.
+		finish: function(req: Management::Request::Request) &optional;
+	};
+
 	## The timeout interval for request state. Such state (see the
 	## :zeek:see:`Management::Request` module) ties together request and
 	## response event pairs. A timeout causes cleanup of request state if
@@ -131,6 +139,9 @@ function finish(reqid: string): bool
 	local req = g_requests[reqid];
 	delete g_requests[reqid];
 
+	if ( req?$finish )
+		req$finish(req);
+
 	req$finished = T;
 
 	return T;
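
A hypothetical use of the new callback, from code that creates requests: attach wrap-up logic that runs once the request is finished via Management::Request::finish:

event zeek_init()
	{
	local req = Management::Request::create();

	req$finish = function(req: Management::Request::Request)
		{
		print fmt("request %s finished, %d result(s)", req$id, |req$results|);
		};

	# ... drive request/response events keyed on req$id, then eventually:
	Management::Request::finish(req$id);
	}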
@@ -146,20 +157,12 @@ function is_null(request: Request): bool
 function to_string(request: Request): string
 	{
-	local results: string_vec;
-	local res: Management::Result;
 	local parent_id = "";
 
 	if ( request?$parent_id )
 		parent_id = fmt(" (via %s)", request$parent_id);
 
-	for ( idx in request$results )
-		{
-		res = request$results[idx];
-		results[|results|] = Management::result_to_string(res);
-		}
-
 	return fmt("[request %s%s %s, results: %s]", request$id, parent_id,
 	    request$finished ? "finished" : "pending",
-	    join_string_vec(results, ","));
+	    Management::result_vec_to_string(request$results));
 	}


@@ -92,22 +92,27 @@ export {
 	type NodeStatusVec: vector of NodeStatus;
 
-	## Return value for request-response API event pairs
+	## Return value for request-response API event pairs. Some responses
+	## contain one, others multiple of these. The request ID allows clients
+	## to string requests and responses together. Agents and the controller
+	## fill in the instance and node fields whenever there's sufficient
+	## context to define them. Any result produced by an agent will carry an
+	## instance value, for example.
 	type Result: record {
 		reqid: string; ##< Request ID of operation this result refers to
-		instance: string &default=""; ##< Name of associated instance (for context)
 		success: bool &default=T; ##< True if successful
+		instance: string &optional; ##< Name of associated instance (for context)
 		data: any &optional; ##< Addl data returned for successful operation
-		error: string &default=""; ##< Descriptive error on failure
+		error: string &optional; ##< Descriptive error on failure
 		node: string &optional; ##< Name of associated node (for context)
 	};
 
 	type ResultVec: vector of Result;
 
-	## In :zeek:see:`Management::Controller::API::set_configuration_response`,
-	## events, each :zeek:see:`Management::Result` indicates the outcome of a
-	## requested cluster node. If a node does not launch properly (meaning
-	## it doesn't check in with the agent on thee machine it's running on),
+	## In :zeek:see:`Management::Controller::API::deploy_response` events,
+	## each :zeek:see:`Management::Result` indicates the outcome of a
+	## launched cluster node. If a node does not launch properly (meaning
+	## it doesn't check in with the agent on the machine it's running on),
 	## the result will indicate failure, and its data field will be an
 	## instance of this record, capturing the stdout and stderr output of
 	## the failing node.
@@ -119,6 +124,10 @@ export {
 	## Given a :zeek:see:`Management::Result` record,
 	## this function returns a string summarizing it.
 	global result_to_string: function(res: Result): string;
+
+	## Given a vector of :zeek:see:`Management::Result` records,
+	## this function returns a string summarizing them.
+	global result_vec_to_string: function(res: ResultVec): string;
 }
 
 function result_to_string(res: Result): string
@@ -127,7 +136,7 @@ function result_to_string(res: Result): string
 	if ( res$success )
 		result = "success";
-	else if ( res$error != "" )
+	else if ( res?$error )
 		result = fmt("error (%s)", res$error);
 	else
 		result = "error";
@@ -136,7 +145,7 @@ function result_to_string(res: Result): string
 	if ( res$reqid != "" )
 		details[|details|] = fmt("reqid %s", res$reqid);
-	if ( res$instance != "" )
+	if ( res?$instance )
 		details[|details|] = fmt("instance %s", res$instance);
 	if ( res?$node && res$node != "" )
 		details[|details|] = fmt("node %s", res$node);
@@ -146,3 +155,13 @@ function result_to_string(res: Result): string
 	return result;
 	}
+
+function result_vec_to_string(res: ResultVec): string
+	{
+	local ret: vector of string;
+
+	for ( idx in res )
+		ret += result_to_string(res[idx]);
+
+	return join_string_vec(ret, ", ");
+	}
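
For illustration, the new helper renders each element via result_to_string and comma-joins the pieces; output shape below is approximate:

event zeek_init()
	{
	local results: Management::ResultVec;

	results += Management::Result($reqid="req-001");
	results += Management::Result($reqid="req-001", $success=F, $error="agent timed out");

	# Prints roughly: success (reqid req-001), error (agent timed out) (reqid req-001)
	print Management::result_vec_to_string(results);
	}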


@@ -1 +1 @@
-6cb24ebe9ad0725b28c4e4d5afa3ec1f6d04eed5
+e01ffdcd799d3ca2851225994108d500c540fbe2