mirror of
https://github.com/zeek/zeek.git
synced 2025-10-02 14:48:21 +00:00
Management framework: introduce deployment API in controller
This separates uploading a configuration from deploying it to the instances into separate event transactions. set_configuration_request/response remains, but now only conducts validation and storage of the new configuration (upon validation success, and not yet persisted to disk). The response event indicates success or the list of validation errors. Successful upload now returns the configuration's ID in the result record's data struct. The new deploy_request/response event takes a previously uploaded configuration and deploys it to the agents. The controller now tracks uploaded and deployed configurations separately. Uploading assigns g_config_staged; deployment assigns g_config_deployed. Deployment does not affect g_config_staged. The get_config_request/response event pair now allows selecting the configuration the caller would like to retrieve.
This commit is contained in:
parent
0480b5f39c
commit
77556e9f11
2 changed files with 211 additions and 119 deletions
|
@ -32,10 +32,12 @@ export {
|
||||||
result: Management::Result);
|
result: Management::Result);
|
||||||
|
|
||||||
|
|
||||||
## The client sends this event to establish a new cluster configuration,
|
## The client sends this event to upload a new cluster configuration,
|
||||||
## including the full cluster topology. The controller processes the update
|
## including the full cluster topology. The controller validates the
|
||||||
## and relays it to the agents. Once each has responded (or a timeout occurs)
|
## configuration and indicates the outcome in its response event. No
|
||||||
## the controller sends a corresponding response event back to the client.
|
## deployment takes place yet, and existing deployed configurations and
|
||||||
|
## clusters remain intact. To trigger deployment of an uploaded
|
||||||
|
## configuration, use :zeek:see:`Management::Controller::API::deploy_request`.
|
||||||
##
|
##
|
||||||
## reqid: a request identifier string, echoed in the response event.
|
## reqid: a request identifier string, echoed in the response event.
|
||||||
##
|
##
|
||||||
|
@ -50,8 +52,10 @@ export {
|
||||||
##
|
##
|
||||||
## reqid: the request identifier used in the request event.
|
## reqid: the request identifier used in the request event.
|
||||||
##
|
##
|
||||||
## result: a vector of :zeek:see:`Management::Result` records.
|
## result: a :zeek:see:`Management::Result` vector, indicating whether
|
||||||
## Each member captures one agent's response.
|
## the controller accepts the configuration. In case of a success,
|
||||||
|
## a single result record indicates so. Otherwise, the sequence is
|
||||||
|
## all errors, each indicating a configuration validation error.
|
||||||
##
|
##
|
||||||
global set_configuration_response: event(reqid: string,
|
global set_configuration_response: event(reqid: string,
|
||||||
result: Management::ResultVec);
|
result: Management::ResultVec);
|
||||||
|
@ -62,7 +66,10 @@ export {
|
||||||
##
|
##
|
||||||
## reqid: a request identifier string, echoed in the response event.
|
## reqid: a request identifier string, echoed in the response event.
|
||||||
##
|
##
|
||||||
global get_configuration_request: event(reqid: string);
|
## deployed: when true, returns the deployed configuration (if any),
|
||||||
|
## otherwise the staged one (if any).
|
||||||
|
##
|
||||||
|
global get_configuration_request: event(reqid: string, deployed: bool);
|
||||||
|
|
||||||
## Response to a get_configuration_request event. The controller sends
|
## Response to a get_configuration_request event. The controller sends
|
||||||
## this back to the client.
|
## this back to the client.
|
||||||
|
@ -78,6 +85,32 @@ export {
|
||||||
result: Management::Result);
|
result: Management::Result);
|
||||||
|
|
||||||
|
|
||||||
|
## The client sends this event to trigger deployment of a previously
|
||||||
|
## uploaded configuration. The controller deploys the uploaded
|
||||||
|
## configuration to all agents involved in running the former
|
||||||
|
## configuration or the new one. The agents terminate any previously
|
||||||
|
## running cluster nodes and (re-)launch those defined in the new
|
||||||
|
## configuration. Once each agent has responded (or a timeout occurs),
|
||||||
|
## the controller sends a response event back to the client.
|
||||||
|
##
|
||||||
|
## reqid: a request identifier string, echoed in the response event.
|
||||||
|
##
|
||||||
|
global deploy_request: event(reqid: string);
|
||||||
|
|
||||||
|
## Response to a deploy_request event. The controller sends this
|
||||||
|
## back to the client.
|
||||||
|
##
|
||||||
|
## reqid: the request identifier used in the request event.
|
||||||
|
##
|
||||||
|
## result: a vector of :zeek:see:`Management::Result` records.
|
||||||
|
## Each member captures the result of launching one cluster
|
||||||
|
## node captured in the configuration, or an agent-wide error
|
||||||
|
## when the result does not indicate a particular node.
|
||||||
|
##
|
||||||
|
global deploy_response: event(reqid: string,
|
||||||
|
result: Management::ResultVec);
|
||||||
|
|
||||||
|
|
||||||
## The client sends this event to request a list of
|
## The client sends this event to request a list of
|
||||||
## :zeek:see:`Management::NodeStatus` records that capture
|
## :zeek:see:`Management::NodeStatus` records that capture
|
||||||
## the status of Supervisor-managed nodes running on the cluster's
|
## the status of Supervisor-managed nodes running on the cluster's
|
||||||
|
|
|
@ -19,10 +19,10 @@ module Management::Controller::Runtime;
|
||||||
# tucked-on types.
|
# tucked-on types.
|
||||||
export {
|
export {
|
||||||
## Request state specific to
|
## Request state specific to
|
||||||
## :zeek:see:`Management::Controller::API::set_configuration_request` and
|
## :zeek:see:`Management::Controller::API::deploy_request` and
|
||||||
## :zeek:see:`Management::Controller::API::set_configuration_response`.
|
## :zeek:see:`Management::Controller::API::deploy_response`.
|
||||||
type SetConfigurationState: record {
|
type DeployState: record {
|
||||||
## The cluster configuration established with this request
|
## The cluster configuration the controller is deploying.
|
||||||
config: Management::Configuration;
|
config: Management::Configuration;
|
||||||
## Request state for every controller/agent transaction.
|
## Request state for every controller/agent transaction.
|
||||||
requests: set[string] &default=set();
|
requests: set[string] &default=set();
|
||||||
|
@ -64,7 +64,7 @@ export {
|
||||||
}
|
}
|
||||||
|
|
||||||
redef record Management::Request::Request += {
|
redef record Management::Request::Request += {
|
||||||
set_configuration_state: SetConfigurationState &optional;
|
deploy_state: DeployState &optional;
|
||||||
get_nodes_state: GetNodesState &optional;
|
get_nodes_state: GetNodesState &optional;
|
||||||
node_dispatch_state: NodeDispatchState &optional;
|
node_dispatch_state: NodeDispatchState &optional;
|
||||||
test_state: TestState &optional;
|
test_state: TestState &optional;
|
||||||
|
@ -93,6 +93,12 @@ global drop_instance: function(inst: Management::Instance);
|
||||||
global null_config: function(): Management::Configuration;
|
global null_config: function(): Management::Configuration;
|
||||||
global config_is_null: function(config: Management::Configuration): bool;
|
global config_is_null: function(config: Management::Configuration): bool;
|
||||||
|
|
||||||
|
# Sends the given configuration to all g_instances members via a
|
||||||
|
# Management::Agent::API::deploy_request event. To track responses, it builds up
|
||||||
|
# deployment state in the given request for each recipient.
|
||||||
|
global config_deploy_to_agents: function(config: Management::Configuration,
|
||||||
|
req: Management::Request::Request);
|
||||||
|
|
||||||
# Returns list of names of nodes in the given configuration that require a
|
# Returns list of names of nodes in the given configuration that require a
|
||||||
# listening port. Returns empty list if the config has no such nodes.
|
# listening port. Returns empty list if the config has no such nodes.
|
||||||
global config_nodes_lacking_ports: function(config: Management::Configuration): vector of string;
|
global config_nodes_lacking_ports: function(config: Management::Configuration): vector of string;
|
||||||
|
@ -110,11 +116,16 @@ global config_assign_ports: function(config: Management::Configuration);
|
||||||
global config_validate: function(config: Management::Configuration,
|
global config_validate: function(config: Management::Configuration,
|
||||||
req: Management::Request::Request): bool;
|
req: Management::Request::Request): bool;
|
||||||
|
|
||||||
# Rejects the given configuration with the given error message. The function
|
# Returns the set of node names in the given configuration that overlaps with
|
||||||
# adds a non-success result record to the given request and send the
|
# the set provided.
|
||||||
# set_configuration_response event back to the client. It does not call finish()
|
global config_filter_nodes_by_name: function(config: Management::Configuration,
|
||||||
# on the request.
|
nodes: set[string]): set[string];
|
||||||
global send_set_configuration_response_error: function(req: Management::Request::Request, error: string);
|
|
||||||
|
# Fails the given deployment request with the given error message. The function
|
||||||
|
# adds a non-success result record to the request and send a deploy_response
|
||||||
|
# event back to the client. It does not call finish() on the request.
|
||||||
|
global send_deploy_response_error: function(
|
||||||
|
req: Management::Request::Request, error: string);
|
||||||
|
|
||||||
# Given a Broker ID, this returns the endpoint info associated with it.
|
# Given a Broker ID, this returns the endpoint info associated with it.
|
||||||
# On error, returns a dummy record with an empty ID string.
|
# On error, returns a dummy record with an empty ID string.
|
||||||
|
@ -145,16 +156,22 @@ global g_instances_known: table[string] of Management::Instance = table();
|
||||||
# instance, and store that in g_instances.)
|
# instance, and store that in g_instances.)
|
||||||
global g_instances_ready: set[string] = set();
|
global g_instances_ready: set[string] = set();
|
||||||
|
|
||||||
# The request ID of the most recent configuration update that's come in from
|
# The request ID of the most recent deployment request from a client. We track
|
||||||
# a client. We track it here until we know we are ready to communicate with all
|
# it here until we know we are ready to communicate with all agents required for
|
||||||
# agents required by the update.
|
# the update.
|
||||||
global g_config_reqid_pending: string = "";
|
global g_config_reqid_pending: string = "";
|
||||||
|
|
||||||
# The most recent configuration we have successfully deployed. This is also
|
# The most recent configuration we have successfully deployed. When no
|
||||||
# the one we send whenever the client requests it.
|
# deployment has happened yet, this is a "null config" as per null_config().
|
||||||
global g_config_current: Management::Configuration;
|
global g_config_deployed: Management::Configuration;
|
||||||
|
|
||||||
function send_config_to_agents(req: Management::Request::Request, config: Management::Configuration)
|
# The most recently provided configuration by the client. When the controller
|
||||||
|
# hasn't yet received a configuration, this is a "null config" as per
|
||||||
|
# null_config(). Successful deployment doesn't invalidate or clear this
|
||||||
|
# configuration, it remains set.
|
||||||
|
global g_config_staged: Management::Configuration;
|
||||||
|
|
||||||
|
function config_deploy_to_agents(config: Management::Configuration, req: Management::Request::Request)
|
||||||
{
|
{
|
||||||
for ( name in g_instances )
|
for ( name in g_instances )
|
||||||
{
|
{
|
||||||
|
@ -168,7 +185,7 @@ function send_config_to_agents(req: Management::Request::Request, config: Manage
|
||||||
# We track the requests sent off to each agent. As the
|
# We track the requests sent off to each agent. As the
|
||||||
# responses come in, we delete them. Once the requests
|
# responses come in, we delete them. Once the requests
|
||||||
# set is empty, we respond back to the client.
|
# set is empty, we respond back to the client.
|
||||||
add req$set_configuration_state$requests[areq$id];
|
add req$deploy_state$requests[areq$id];
|
||||||
|
|
||||||
# We could also broadcast just once on the agent prefix, but
|
# We could also broadcast just once on the agent prefix, but
|
||||||
# explicit request/response pairs for each agent seems cleaner.
|
# explicit request/response pairs for each agent seems cleaner.
|
||||||
|
@ -244,6 +261,11 @@ function null_config(): Management::Configuration
|
||||||
return Management::Configuration($id="");
|
return Management::Configuration($id="");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
function config_is_null(config: Management::Configuration): bool
|
||||||
|
{
|
||||||
|
return config$id == "";
|
||||||
|
}
|
||||||
|
|
||||||
function config_nodes_lacking_ports(config: Management::Configuration): vector of string
|
function config_nodes_lacking_ports(config: Management::Configuration): vector of string
|
||||||
{
|
{
|
||||||
local res: vector of string;
|
local res: vector of string;
|
||||||
|
@ -491,6 +513,18 @@ function config_validate(config: Management::Configuration,
|
||||||
return F;
|
return F;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
function config_filter_nodes_by_name(config: Management::Configuration, nodes: set[string])
|
||||||
|
: set[string]
|
||||||
|
{
|
||||||
|
local res: set[string];
|
||||||
|
local cluster_nodes: set[string];
|
||||||
|
|
||||||
|
for ( node in config$nodes )
|
||||||
|
add cluster_nodes[node$name];
|
||||||
|
|
||||||
|
return nodes & cluster_nodes;
|
||||||
|
}
|
||||||
|
|
||||||
function find_endpoint(id: string): Broker::EndpointInfo
|
function find_endpoint(id: string): Broker::EndpointInfo
|
||||||
{
|
{
|
||||||
local peers = Broker::peers();
|
local peers = Broker::peers();
|
||||||
|
@ -505,11 +539,6 @@ function find_endpoint(id: string): Broker::EndpointInfo
|
||||||
return Broker::EndpointInfo($id="");
|
return Broker::EndpointInfo($id="");
|
||||||
}
|
}
|
||||||
|
|
||||||
function config_is_null(config: Management::Configuration): bool
|
|
||||||
{
|
|
||||||
return config$id == "";
|
|
||||||
}
|
|
||||||
|
|
||||||
function is_instance_connectivity_change(inst: Management::Instance): bool
|
function is_instance_connectivity_change(inst: Management::Instance): bool
|
||||||
{
|
{
|
||||||
# If we're not tracking this instance as part of a cluster config, it's
|
# If we're not tracking this instance as part of a cluster config, it's
|
||||||
|
@ -536,18 +565,7 @@ function is_instance_connectivity_change(inst: Management::Instance): bool
|
||||||
return F;
|
return F;
|
||||||
}
|
}
|
||||||
|
|
||||||
function filter_config_nodes_by_name(nodes: set[string]): set[string]
|
function send_deploy_response_error(req: Management::Request::Request, error: string)
|
||||||
{
|
|
||||||
local res: set[string];
|
|
||||||
local cluster_nodes: set[string];
|
|
||||||
|
|
||||||
for ( node in g_config_current$nodes )
|
|
||||||
add cluster_nodes[node$name];
|
|
||||||
|
|
||||||
return nodes & cluster_nodes;
|
|
||||||
}
|
|
||||||
|
|
||||||
function send_set_configuration_response_error(req: Management::Request::Request, error: string)
|
|
||||||
{
|
{
|
||||||
local res = Management::Result($reqid=req$id);
|
local res = Management::Result($reqid=req$id);
|
||||||
|
|
||||||
|
@ -555,8 +573,10 @@ function send_set_configuration_response_error(req: Management::Request::Request
|
||||||
res$error = error;
|
res$error = error;
|
||||||
req$results += res;
|
req$results += res;
|
||||||
|
|
||||||
|
Management::Log::info(fmt("tx Management::Controller::API::deploy_response %s",
|
||||||
|
Management::Request::to_string(req)));
|
||||||
Broker::publish(Management::Controller::topic,
|
Broker::publish(Management::Controller::topic,
|
||||||
Management::Controller::API::set_configuration_response, req$id, req$results);
|
Management::Controller::API::deploy_response, req$id, req$results);
|
||||||
}
|
}
|
||||||
|
|
||||||
event Management::Controller::API::notify_agents_ready(instances: set[string])
|
event Management::Controller::API::notify_agents_ready(instances: set[string])
|
||||||
|
@ -570,14 +590,14 @@ event Management::Controller::API::notify_agents_ready(instances: set[string])
|
||||||
|
|
||||||
# If there's no pending request, when it's no longer available, or it
|
# If there's no pending request, when it's no longer available, or it
|
||||||
# doesn't have config state, don't do anything else.
|
# doesn't have config state, don't do anything else.
|
||||||
if ( Management::Request::is_null(req) || ! req?$set_configuration_state )
|
if ( Management::Request::is_null(req) || ! req?$deploy_state )
|
||||||
return;
|
return;
|
||||||
|
|
||||||
# All instances requested in the pending configuration update are now
|
# All instances requested in the pending configuration update are now
|
||||||
# known to us. Send them the config. As they send their response events
|
# known to us. Send them the config. As they send their response events
|
||||||
# we update the client's request state and eventually send the response
|
# we update the client's request state and eventually send the response
|
||||||
# event to the it.
|
# event to the it.
|
||||||
send_config_to_agents(req, req$set_configuration_state$config);
|
config_deploy_to_agents(req$deploy_state$config, req);
|
||||||
}
|
}
|
||||||
|
|
||||||
event Management::Agent::API::notify_agent_hello(instance: string, id: string, connecting: bool, api_version: count)
|
event Management::Agent::API::notify_agent_hello(instance: string, id: string, connecting: bool, api_version: count)
|
||||||
|
@ -714,24 +734,26 @@ event Management::Agent::API::deploy_response(reqid: string, results: Management
|
||||||
|
|
||||||
# Mark this request as done by removing it from the table of pending
|
# Mark this request as done by removing it from the table of pending
|
||||||
# ones. The following if-check should always be true.
|
# ones. The following if-check should always be true.
|
||||||
if ( areq$id in req$set_configuration_state$requests )
|
if ( areq$id in req$deploy_state$requests )
|
||||||
delete req$set_configuration_state$requests[areq$id];
|
delete req$deploy_state$requests[areq$id];
|
||||||
|
|
||||||
# If there are any pending requests to the agents, we're
|
# If there are any pending requests to the agents, we're
|
||||||
# done: we respond once every agent has responed (or we time out).
|
# done: we respond once every agent has responed (or we time out).
|
||||||
if ( |req$set_configuration_state$requests| > 0 )
|
if ( |req$deploy_state$requests| > 0 )
|
||||||
return;
|
return;
|
||||||
|
|
||||||
# All deploy requests to instances are done, so adopt the
|
# All deploy requests to instances are done, so adopt the config
|
||||||
# client's requested configuration as the new one and respond back to
|
# as the new deployed one and respond back to client.
|
||||||
# client.
|
g_config_deployed = req$deploy_state$config;
|
||||||
g_config_current = req$set_configuration_state$config;
|
|
||||||
g_config_reqid_pending = "";
|
g_config_reqid_pending = "";
|
||||||
|
|
||||||
Management::Log::info(fmt("tx Management::Controller::API::set_configuration_response %s",
|
local res = Management::Result($reqid=req$id, $data=g_config_deployed$id);
|
||||||
|
req$results += res;
|
||||||
|
|
||||||
|
Management::Log::info(fmt("tx Management::Controller::API::deploy_response %s",
|
||||||
Management::Request::to_string(req)));
|
Management::Request::to_string(req)));
|
||||||
Broker::publish(Management::Controller::topic,
|
Broker::publish(Management::Controller::topic,
|
||||||
Management::Controller::API::set_configuration_response, req$id, req$results);
|
Management::Controller::API::deploy_response, req$id, req$results);
|
||||||
Management::Request::finish(req$id);
|
Management::Request::finish(req$id);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -739,24 +761,9 @@ event Management::Controller::API::set_configuration_request(reqid: string, conf
|
||||||
{
|
{
|
||||||
Management::Log::info(fmt("rx Management::Controller::API::set_configuration_request %s", reqid));
|
Management::Log::info(fmt("rx Management::Controller::API::set_configuration_request %s", reqid));
|
||||||
|
|
||||||
local res: Management::Result;
|
|
||||||
local req = Management::Request::create(reqid);
|
local req = Management::Request::create(reqid);
|
||||||
|
local res = Management::Result($reqid=req$id);
|
||||||
|
|
||||||
req$set_configuration_state = SetConfigurationState($config = config);
|
|
||||||
|
|
||||||
# At the moment there can only be one pending request.
|
|
||||||
if ( g_config_reqid_pending != "" )
|
|
||||||
{
|
|
||||||
send_set_configuration_response_error(req,
|
|
||||||
fmt("request %s still pending", g_config_reqid_pending));
|
|
||||||
|
|
||||||
Management::Request::finish(req$id);
|
|
||||||
Management::Log::info(fmt("tx Management::Controller::API::set_configuration_response %s",
|
|
||||||
Management::Request::to_string(req)));
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
# If the config has problems, reject it:
|
|
||||||
if ( ! config_validate(config, req) )
|
if ( ! config_validate(config, req) )
|
||||||
{
|
{
|
||||||
Management::Request::finish(req$id);
|
Management::Request::finish(req$id);
|
||||||
|
@ -776,26 +783,99 @@ event Management::Controller::API::set_configuration_request(reqid: string, conf
|
||||||
if ( |nodes| > 0 )
|
if ( |nodes| > 0 )
|
||||||
{
|
{
|
||||||
local nodes_str = join_string_vec(nodes, ", ");
|
local nodes_str = join_string_vec(nodes, ", ");
|
||||||
send_set_configuration_response_error(req,
|
|
||||||
fmt("port auto-assignment disabled but nodes %s lack ports", nodes_str));
|
|
||||||
|
|
||||||
Management::Request::finish(req$id);
|
res$success = F;
|
||||||
|
res$error = fmt("port auto-assignment disabled but nodes %s lack ports", nodes_str);
|
||||||
|
req$results += res;
|
||||||
|
|
||||||
Management::Log::info(fmt("tx Management::Controller::API::set_configuration_response %s",
|
Management::Log::info(fmt("tx Management::Controller::API::set_configuration_response %s",
|
||||||
Management::Request::to_string(req)));
|
Management::Request::to_string(req)));
|
||||||
|
Broker::publish(Management::Controller::topic,
|
||||||
|
Management::Controller::API::set_configuration_response, req$id, req$results);
|
||||||
|
Management::Request::finish(req$id);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
# The incoming request is now the pending one. It gets cleared when all
|
g_config_staged = config;
|
||||||
# agents have processed their config updates successfully, or their
|
|
||||||
# responses time out.
|
# We return the ID of the new configuration in the response.
|
||||||
|
res$data = config$id;
|
||||||
|
req$results += res;
|
||||||
|
|
||||||
|
Management::Log::info(fmt(
|
||||||
|
"tx Management::Controller::API::set_configuration_response %s",
|
||||||
|
Management::result_to_string(res)));
|
||||||
|
Broker::publish(Management::Controller::topic,
|
||||||
|
Management::Controller::API::set_configuration_response, reqid, req$results);
|
||||||
|
Management::Request::finish(req$id);
|
||||||
|
}
|
||||||
|
|
||||||
|
event Management::Controller::API::get_configuration_request(reqid: string, deployed: bool)
|
||||||
|
{
|
||||||
|
Management::Log::info(fmt("rx Management::Controller::API::get_configuration_request %s", reqid));
|
||||||
|
|
||||||
|
local res = Management::Result($reqid=reqid);
|
||||||
|
local config: Management::Configuration;
|
||||||
|
|
||||||
|
if ( deployed )
|
||||||
|
config = g_config_deployed;
|
||||||
|
else
|
||||||
|
config = g_config_staged;
|
||||||
|
|
||||||
|
if ( config_is_null(config) )
|
||||||
|
{
|
||||||
|
res$success = F;
|
||||||
|
res$error = fmt("no %s configuration available",
|
||||||
|
deployed ? "deployed" : "staged");
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
res$data = config;
|
||||||
|
}
|
||||||
|
|
||||||
|
Management::Log::info(fmt(
|
||||||
|
"tx Management::Controller::API::get_configuration_response %s",
|
||||||
|
Management::result_to_string(res)));
|
||||||
|
Broker::publish(Management::Controller::topic,
|
||||||
|
Management::Controller::API::get_configuration_response, reqid, res);
|
||||||
|
}
|
||||||
|
|
||||||
|
event Management::Controller::API::deploy_request(reqid: string)
|
||||||
|
{
|
||||||
|
Management::Log::info(fmt("rx Management::Controller::API::deploy_request %s", reqid));
|
||||||
|
|
||||||
|
local req = Management::Request::create(reqid);
|
||||||
|
|
||||||
|
# If there's no staged configuration, there's nothing to deploy.
|
||||||
|
if ( config_is_null(g_config_staged) )
|
||||||
|
{
|
||||||
|
send_deploy_response_error(req, "no configuration available to deploy");
|
||||||
|
Management::Request::finish(req$id);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
# At the moment there can only be one pending deployment.
|
||||||
|
if ( g_config_reqid_pending != "" )
|
||||||
|
{
|
||||||
|
send_deploy_response_error(req,
|
||||||
|
fmt("earlier deployment %s still pending", g_config_reqid_pending));
|
||||||
|
Management::Request::finish(req$id);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
req$deploy_state = DeployState($config = g_config_staged);
|
||||||
|
|
||||||
|
# This deployment is now pending. It clears when all agents have
|
||||||
|
# processed their config updates successfully, or any responses time
|
||||||
|
# out.
|
||||||
g_config_reqid_pending = req$id;
|
g_config_reqid_pending = req$id;
|
||||||
|
|
||||||
# Compare the instance configuration to our current one. If it matches,
|
# Compare the instance configuration to our current one. If it matches,
|
||||||
# we can proceed to deploying the new cluster topology. If it does
|
# we can proceed to deploying the new cluster topology. If it does not,
|
||||||
# not, we need to establish connectivity with agents we connect to, or
|
# we need to establish connectivity with agents we connect to, or wait
|
||||||
# wait until all instances that connect to us have done so. Either triggers
|
# until all instances that connect to us have done so. Either case
|
||||||
# a notify_agents_ready event, upon which we then deploy the topology.
|
# triggers a notify_agents_ready event, upon which we deploy the config.
|
||||||
|
|
||||||
# The current & new set of instance names.
|
# The current & new set of instance names.
|
||||||
local insts_current: set[string];
|
local insts_current: set[string];
|
||||||
|
@ -823,7 +903,7 @@ event Management::Controller::API::set_configuration_request(reqid: string, conf
|
||||||
|
|
||||||
for ( inst_name in g_instances )
|
for ( inst_name in g_instances )
|
||||||
add insts_current[inst_name];
|
add insts_current[inst_name];
|
||||||
for ( inst in config$instances )
|
for ( inst in g_config_staged$instances )
|
||||||
add insts_new[inst$name];
|
add insts_new[inst$name];
|
||||||
|
|
||||||
# Populate TODO lists for instances we need to drop, check, or add.
|
# Populate TODO lists for instances we need to drop, check, or add.
|
||||||
|
@ -831,7 +911,7 @@ event Management::Controller::API::set_configuration_request(reqid: string, conf
|
||||||
insts_to_add = insts_new - insts_current;
|
insts_to_add = insts_new - insts_current;
|
||||||
insts_to_keep = insts_new & insts_current;
|
insts_to_keep = insts_new & insts_current;
|
||||||
|
|
||||||
for ( inst in config$instances )
|
for ( inst in g_config_staged$instances )
|
||||||
{
|
{
|
||||||
if ( inst$name in insts_to_add )
|
if ( inst$name in insts_to_add )
|
||||||
{
|
{
|
||||||
|
@ -869,16 +949,21 @@ event Management::Controller::API::set_configuration_request(reqid: string, conf
|
||||||
|
|
||||||
# Updates to instance tables are complete. As a corner case, if the
|
# Updates to instance tables are complete. As a corner case, if the
|
||||||
# config contained no instances (and thus no nodes), we're now done
|
# config contained no instances (and thus no nodes), we're now done
|
||||||
# since there are no agent interactions to wait for:
|
# since there are no agent interactions to wait for (any agents that
|
||||||
|
# need to tear down nodes are doing so asynchronously as part of the
|
||||||
|
# drop_instance() calls above).
|
||||||
if ( |insts_new| == 0 )
|
if ( |insts_new| == 0 )
|
||||||
{
|
{
|
||||||
g_config_current = req$set_configuration_state$config;
|
g_config_deployed = req$deploy_state$config;
|
||||||
g_config_reqid_pending = "";
|
g_config_reqid_pending = "";
|
||||||
|
|
||||||
Management::Log::info(fmt("tx Management::Controller::API::set_configuration_response %s",
|
local res = Management::Result($reqid=req$id, $data=g_config_deployed$id);
|
||||||
|
req$results += res;
|
||||||
|
|
||||||
|
Management::Log::info(fmt("tx Management::Controller::API::deploy_response %s",
|
||||||
Management::Request::to_string(req)));
|
Management::Request::to_string(req)));
|
||||||
Broker::publish(Management::Controller::topic,
|
Broker::publish(Management::Controller::topic,
|
||||||
Management::Controller::API::set_configuration_response, req$id, req$results);
|
Management::Controller::API::deploy_response, req$id, req$results);
|
||||||
Management::Request::finish(req$id);
|
Management::Request::finish(req$id);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -890,30 +975,6 @@ event Management::Controller::API::set_configuration_request(reqid: string, conf
|
||||||
check_instances_ready();
|
check_instances_ready();
|
||||||
}
|
}
|
||||||
|
|
||||||
event Management::Controller::API::get_configuration_request(reqid: string)
|
|
||||||
{
|
|
||||||
Management::Log::info(fmt("rx Management::Controller::API::get_configuration_request %s", reqid));
|
|
||||||
|
|
||||||
local res = Management::Result($reqid=reqid);
|
|
||||||
|
|
||||||
if ( config_is_null(g_config_current) )
|
|
||||||
{
|
|
||||||
# We don't have a live configuration yet.
|
|
||||||
res$success = F;
|
|
||||||
res$error = "no configuration deployed";
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
res$data = g_config_current;
|
|
||||||
}
|
|
||||||
|
|
||||||
Management::Log::info(fmt(
|
|
||||||
"tx Management::Controller::API::get_configuration_response %s",
|
|
||||||
Management::result_to_string(res)));
|
|
||||||
Broker::publish(Management::Controller::topic,
|
|
||||||
Management::Controller::API::get_configuration_response, reqid, res);
|
|
||||||
}
|
|
||||||
|
|
||||||
event Management::Controller::API::get_instances_request(reqid: string)
|
event Management::Controller::API::get_instances_request(reqid: string)
|
||||||
{
|
{
|
||||||
Management::Log::info(fmt("rx Management::Controller::API::get_instances_request %s", reqid));
|
Management::Log::info(fmt("rx Management::Controller::API::get_instances_request %s", reqid));
|
||||||
|
@ -1110,8 +1171,8 @@ event Management::Controller::API::get_id_value_request(reqid: string, id: strin
|
||||||
# don't propagate them to the agents.
|
# don't propagate them to the agents.
|
||||||
if ( |nodes| > 0 )
|
if ( |nodes| > 0 )
|
||||||
{
|
{
|
||||||
# Requested nodes that are in the current configuration:
|
# Requested nodes that are in the deployed configuration:
|
||||||
nodes_final = filter_config_nodes_by_name(nodes);
|
nodes_final = config_filter_nodes_by_name(g_config_deployed, nodes);
|
||||||
# Requested nodes that are not in current configuration:
|
# Requested nodes that are not in current configuration:
|
||||||
local nodes_invalid = nodes - nodes_final;
|
local nodes_invalid = nodes - nodes_final;
|
||||||
|
|
||||||
|
@ -1170,16 +1231,16 @@ event Management::Request::request_expired(req: Management::Request::Request)
|
||||||
$success = F,
|
$success = F,
|
||||||
$error = "request timed out");
|
$error = "request timed out");
|
||||||
|
|
||||||
if ( req?$set_configuration_state )
|
if ( req?$deploy_state )
|
||||||
{
|
{
|
||||||
# This timeout means we no longer have a pending request.
|
# This timeout means we no longer have a pending request.
|
||||||
g_config_reqid_pending = "";
|
g_config_reqid_pending = "";
|
||||||
req$results += res;
|
req$results += res;
|
||||||
|
|
||||||
Management::Log::info(fmt("tx Management::Controller::API::set_configuration_response %s",
|
Management::Log::info(fmt("tx Management::Controller::API::deploy_response %s",
|
||||||
Management::Request::to_string(req)));
|
Management::Request::to_string(req)));
|
||||||
Broker::publish(Management::Controller::topic,
|
Broker::publish(Management::Controller::topic,
|
||||||
Management::Controller::API::set_configuration_response, req$id, req$results);
|
Management::Controller::API::deploy_response, req$id, req$results);
|
||||||
}
|
}
|
||||||
|
|
||||||
if ( req?$get_nodes_state )
|
if ( req?$get_nodes_state )
|
||||||
|
@ -1241,10 +1302,8 @@ event Broker::peer_added(peer: Broker::EndpointInfo, msg: string)
|
||||||
|
|
||||||
event zeek_init()
|
event zeek_init()
|
||||||
{
|
{
|
||||||
# Initialize null config at startup. We will replace it once we have
|
g_config_deployed = null_config();
|
||||||
# persistence, and again whenever we complete a client's
|
g_config_staged = null_config();
|
||||||
# set_configuration request.
|
|
||||||
g_config_current = null_config();
|
|
||||||
|
|
||||||
# The controller always listens: it needs to be able to respond to
|
# The controller always listens: it needs to be able to respond to
|
||||||
# clients connecting to it, as well as agents if they connect to the
|
# clients connecting to it, as well as agents if they connect to the
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue