diff --git a/scripts/policy/frameworks/management/controller/api.zeek b/scripts/policy/frameworks/management/controller/api.zeek index 3235de8598..42b870b4e5 100644 --- a/scripts/policy/frameworks/management/controller/api.zeek +++ b/scripts/policy/frameworks/management/controller/api.zeek @@ -32,10 +32,12 @@ export { result: Management::Result); - ## The client sends this event to establish a new cluster configuration, - ## including the full cluster topology. The controller processes the update - ## and relays it to the agents. Once each has responded (or a timeout occurs) - ## the controller sends a corresponding response event back to the client. + ## The client sends this event to upload a new cluster configuration, + ## including the full cluster topology. The controller validates the + ## configuration and indicates the outcome in its response event. No + ## deployment takes place yet, and existing deployed configurations and + ## clusters remain intact. To trigger deployment of an uploaded + ## configuration, use :zeek:see:`Management::Controller::API::deploy_request`. ## ## reqid: a request identifier string, echoed in the response event. ## @@ -50,8 +52,10 @@ export { ## ## reqid: the request identifier used in the request event. ## - ## result: a vector of :zeek:see:`Management::Result` records. - ## Each member captures one agent's response. + ## result: a :zeek:see:`Management::Result` vector, indicating whether + ## the controller accepts the configuration. In case of a success, + ## a single result record indicates so. Otherwise, the sequence is + ## all errors, each indicating a configuration validation error. ## global set_configuration_response: event(reqid: string, result: Management::ResultVec); @@ -62,7 +66,10 @@ export { ## ## reqid: a request identifier string, echoed in the response event. 
## - global get_configuration_request: event(reqid: string); + ## deployed: when true, returns the deployed configuration (if any), + ## otherwise the staged one (if any). + ## + global get_configuration_request: event(reqid: string, deployed: bool); ## Response to a get_configuration_request event. The controller sends ## this back to the client. @@ -78,6 +85,32 @@ export { result: Management::Result); + ## The client sends this event to trigger deployment of a previously + ## uploaded configuration. The controller deploys the uploaded + ## configuration to all agents involved in running the former + ## configuration or the new one. The agents terminate any previously + ## running cluster nodes and (re-)launch those defined in the new + ## configuration. Once each agent has responded (or a timeout occurs), + ## the controller sends a response event back to the client. + ## + ## reqid: a request identifier string, echoed in the response event. + ## + global deploy_request: event(reqid: string); + + ## Response to a deploy_request event. The controller sends this + ## back to the client. + ## + ## reqid: the request identifier used in the request event. + ## + ## result: a vector of :zeek:see:`Management::Result` records. + ## Each member captures the result of launching one cluster + ## node captured in the configuration, or an agent-wide error + ## when the result does not indicate a particular node. 
+ ## + global deploy_response: event(reqid: string, + result: Management::ResultVec); + + ## The client sends this event to request a list of ## :zeek:see:`Management::NodeStatus` records that capture ## the status of Supervisor-managed nodes running on the cluster's diff --git a/scripts/policy/frameworks/management/controller/main.zeek b/scripts/policy/frameworks/management/controller/main.zeek index 5487e85606..4d9d11705d 100644 --- a/scripts/policy/frameworks/management/controller/main.zeek +++ b/scripts/policy/frameworks/management/controller/main.zeek @@ -19,10 +19,10 @@ module Management::Controller::Runtime; # tucked-on types. export { ## Request state specific to - ## :zeek:see:`Management::Controller::API::set_configuration_request` and - ## :zeek:see:`Management::Controller::API::set_configuration_response`. - type SetConfigurationState: record { - ## The cluster configuration established with this request + ## :zeek:see:`Management::Controller::API::deploy_request` and + ## :zeek:see:`Management::Controller::API::deploy_response`. + type DeployState: record { + ## The cluster configuration the controller is deploying. config: Management::Configuration; ## Request state for every controller/agent transaction. requests: set[string] &default=set(); @@ -64,7 +64,7 @@ export { } redef record Management::Request::Request += { - set_configuration_state: SetConfigurationState &optional; + deploy_state: DeployState &optional; get_nodes_state: GetNodesState &optional; node_dispatch_state: NodeDispatchState &optional; test_state: TestState &optional; @@ -93,6 +93,12 @@ global drop_instance: function(inst: Management::Instance); global null_config: function(): Management::Configuration; global config_is_null: function(config: Management::Configuration): bool; +# Sends the given configuration to all g_instances members via a +# Management::Agent::API::deploy_request event. To track responses, it builds up +# deployment state in the given request for each recipient. 
+global config_deploy_to_agents: function(config: Management::Configuration, + req: Management::Request::Request); + # Returns list of names of nodes in the given configuration that require a # listening port. Returns empty list if the config has no such nodes. global config_nodes_lacking_ports: function(config: Management::Configuration): vector of string; @@ -110,11 +116,16 @@ global config_assign_ports: function(config: Management::Configuration); global config_validate: function(config: Management::Configuration, req: Management::Request::Request): bool; -# Rejects the given configuration with the given error message. The function -# adds a non-success result record to the given request and send the -# set_configuration_response event back to the client. It does not call finish() -# on the request. -global send_set_configuration_response_error: function(req: Management::Request::Request, error: string); +# Returns the set of node names in the given configuration that overlaps with +# the set provided. +global config_filter_nodes_by_name: function(config: Management::Configuration, + nodes: set[string]): set[string]; + +# Fails the given deployment request with the given error message. The function +# adds a non-success result record to the request and sends a deploy_response +# event back to the client. It does not call finish() on the request. +global send_deploy_response_error: function( + req: Management::Request::Request, error: string); # Given a Broker ID, this returns the endpoint info associated with it. # On error, returns a dummy record with an empty ID string. @@ -145,16 +156,22 @@ global g_instances_known: table[string] of Management::Instance = table(); # instance, and store that in g_instances.) global g_instances_ready: set[string] = set(); -# The request ID of the most recent configuration update that's come in from -# a client. We track it here until we know we are ready to communicate with all -# agents required by the update. 
+# The request ID of the most recent deployment request from a client. We track +# it here until we know we are ready to communicate with all agents required for +# the update. global g_config_reqid_pending: string = ""; -# The most recent configuration we have successfully deployed. This is also -# the one we send whenever the client requests it. -global g_config_current: Management::Configuration; +# The most recent configuration we have successfully deployed. When no +# deployment has happened yet, this is a "null config" as per null_config(). +global g_config_deployed: Management::Configuration; -function send_config_to_agents(req: Management::Request::Request, config: Management::Configuration) +# The configuration most recently provided by the client. When the controller +# hasn't yet received a configuration, this is a "null config" as per +# null_config(). Successful deployment doesn't invalidate or clear this +# configuration; it remains set. +global g_config_staged: Management::Configuration; + +function config_deploy_to_agents(config: Management::Configuration, req: Management::Request::Request) { for ( name in g_instances ) { @@ -168,7 +185,7 @@ function send_config_to_agents(req: Management::Request::Request, config: Manage # We track the requests sent off to each agent. As the # responses come in, we delete them. Once the requests # set is empty, we respond back to the client. - add req$set_configuration_state$requests[areq$id]; + add req$deploy_state$requests[areq$id]; # We could also broadcast just once on the agent prefix, but # explicit request/response pairs for each agent seems cleaner. 
@@ -244,6 +261,11 @@ function null_config(): Management::Configuration return Management::Configuration($id=""); } +function config_is_null(config: Management::Configuration): bool + { + return config$id == ""; + } + function config_nodes_lacking_ports(config: Management::Configuration): vector of string { local res: vector of string; @@ -491,6 +513,18 @@ function config_validate(config: Management::Configuration, return F; } +function config_filter_nodes_by_name(config: Management::Configuration, nodes: set[string]) + : set[string] + { + local res: set[string]; + local cluster_nodes: set[string]; + + for ( node in config$nodes ) + add cluster_nodes[node$name]; + + return nodes & cluster_nodes; + } + function find_endpoint(id: string): Broker::EndpointInfo { local peers = Broker::peers(); @@ -505,11 +539,6 @@ function find_endpoint(id: string): Broker::EndpointInfo return Broker::EndpointInfo($id=""); } -function config_is_null(config: Management::Configuration): bool - { - return config$id == ""; - } - function is_instance_connectivity_change(inst: Management::Instance): bool { # If we're not tracking this instance as part of a cluster config, it's @@ -536,18 +565,7 @@ function is_instance_connectivity_change(inst: Management::Instance): bool return F; } -function filter_config_nodes_by_name(nodes: set[string]): set[string] - { - local res: set[string]; - local cluster_nodes: set[string]; - - for ( node in g_config_current$nodes ) - add cluster_nodes[node$name]; - - return nodes & cluster_nodes; - } - -function send_set_configuration_response_error(req: Management::Request::Request, error: string) +function send_deploy_response_error(req: Management::Request::Request, error: string) { local res = Management::Result($reqid=req$id); @@ -555,8 +573,10 @@ function send_set_configuration_response_error(req: Management::Request::Request res$error = error; req$results += res; + Management::Log::info(fmt("tx Management::Controller::API::deploy_response %s", + 
Management::Request::to_string(req))); Broker::publish(Management::Controller::topic, - Management::Controller::API::set_configuration_response, req$id, req$results); + Management::Controller::API::deploy_response, req$id, req$results); } event Management::Controller::API::notify_agents_ready(instances: set[string]) @@ -570,14 +590,14 @@ event Management::Controller::API::notify_agents_ready(instances: set[string]) # If there's no pending request, when it's no longer available, or it # doesn't have config state, don't do anything else. - if ( Management::Request::is_null(req) || ! req?$set_configuration_state ) + if ( Management::Request::is_null(req) || ! req?$deploy_state ) return; # All instances requested in the pending configuration update are now # known to us. Send them the config. As they send their response events # we update the client's request state and eventually send the response # event to the it. - send_config_to_agents(req, req$set_configuration_state$config); + config_deploy_to_agents(req$deploy_state$config, req); } event Management::Agent::API::notify_agent_hello(instance: string, id: string, connecting: bool, api_version: count) @@ -714,24 +734,26 @@ event Management::Agent::API::deploy_response(reqid: string, results: Management # Mark this request as done by removing it from the table of pending # ones. The following if-check should always be true. - if ( areq$id in req$set_configuration_state$requests ) - delete req$set_configuration_state$requests[areq$id]; + if ( areq$id in req$deploy_state$requests ) + delete req$deploy_state$requests[areq$id]; # If there are any pending requests to the agents, we're # done: we respond once every agent has responed (or we time out). - if ( |req$set_configuration_state$requests| > 0 ) + if ( |req$deploy_state$requests| > 0 ) return; - # All deploy requests to instances are done, so adopt the - # client's requested configuration as the new one and respond back to - # client. 
- g_config_current = req$set_configuration_state$config; + # All deploy requests to instances are done, so adopt the config + # as the new deployed one and respond back to client. + g_config_deployed = req$deploy_state$config; g_config_reqid_pending = ""; - Management::Log::info(fmt("tx Management::Controller::API::set_configuration_response %s", + local res = Management::Result($reqid=req$id, $data=g_config_deployed$id); + req$results += res; + + Management::Log::info(fmt("tx Management::Controller::API::deploy_response %s", Management::Request::to_string(req))); Broker::publish(Management::Controller::topic, - Management::Controller::API::set_configuration_response, req$id, req$results); + Management::Controller::API::deploy_response, req$id, req$results); Management::Request::finish(req$id); } @@ -739,24 +761,9 @@ event Management::Controller::API::set_configuration_request(reqid: string, conf { Management::Log::info(fmt("rx Management::Controller::API::set_configuration_request %s", reqid)); - local res: Management::Result; local req = Management::Request::create(reqid); + local res = Management::Result($reqid=req$id); - req$set_configuration_state = SetConfigurationState($config = config); - - # At the moment there can only be one pending request. - if ( g_config_reqid_pending != "" ) - { - send_set_configuration_response_error(req, - fmt("request %s still pending", g_config_reqid_pending)); - - Management::Request::finish(req$id); - Management::Log::info(fmt("tx Management::Controller::API::set_configuration_response %s", - Management::Request::to_string(req))); - return; - } - - # If the config has problems, reject it: if ( ! 
config_validate(config, req) ) { Management::Request::finish(req$id); @@ -776,26 +783,99 @@ event Management::Controller::API::set_configuration_request(reqid: string, conf if ( |nodes| > 0 ) { local nodes_str = join_string_vec(nodes, ", "); - send_set_configuration_response_error(req, - fmt("port auto-assignment disabled but nodes %s lack ports", nodes_str)); - Management::Request::finish(req$id); + res$success = F; + res$error = fmt("port auto-assignment disabled but nodes %s lack ports", nodes_str); + req$results += res; + Management::Log::info(fmt("tx Management::Controller::API::set_configuration_response %s", Management::Request::to_string(req))); + Broker::publish(Management::Controller::topic, + Management::Controller::API::set_configuration_response, req$id, req$results); + Management::Request::finish(req$id); return; } } - # The incoming request is now the pending one. It gets cleared when all - # agents have processed their config updates successfully, or their - # responses time out. + g_config_staged = config; + + # We return the ID of the new configuration in the response. + res$data = config$id; + req$results += res; + + Management::Log::info(fmt( + "tx Management::Controller::API::set_configuration_response %s", + Management::result_to_string(res))); + Broker::publish(Management::Controller::topic, + Management::Controller::API::set_configuration_response, reqid, req$results); + Management::Request::finish(req$id); + } + +event Management::Controller::API::get_configuration_request(reqid: string, deployed: bool) + { + Management::Log::info(fmt("rx Management::Controller::API::get_configuration_request %s", reqid)); + + local res = Management::Result($reqid=reqid); + local config: Management::Configuration; + + if ( deployed ) + config = g_config_deployed; + else + config = g_config_staged; + + if ( config_is_null(config) ) + { + res$success = F; + res$error = fmt("no %s configuration available", + deployed ? 
"deployed" : "staged"); + } + else + { + res$data = config; + } + + Management::Log::info(fmt( + "tx Management::Controller::API::get_configuration_response %s", + Management::result_to_string(res))); + Broker::publish(Management::Controller::topic, + Management::Controller::API::get_configuration_response, reqid, res); + } + +event Management::Controller::API::deploy_request(reqid: string) + { + Management::Log::info(fmt("rx Management::Controller::API::deploy_request %s", reqid)); + + local req = Management::Request::create(reqid); + + # If there's no staged configuration, there's nothing to deploy. + if ( config_is_null(g_config_staged) ) + { + send_deploy_response_error(req, "no configuration available to deploy"); + Management::Request::finish(req$id); + return; + } + + # At the moment there can only be one pending deployment. + if ( g_config_reqid_pending != "" ) + { + send_deploy_response_error(req, + fmt("earlier deployment %s still pending", g_config_reqid_pending)); + Management::Request::finish(req$id); + return; + } + + req$deploy_state = DeployState($config = g_config_staged); + + # This deployment is now pending. It clears when all agents have + # processed their config updates successfully, or any responses time + # out. g_config_reqid_pending = req$id; # Compare the instance configuration to our current one. If it matches, - # we can proceed to deploying the new cluster topology. If it does - # not, we need to establish connectivity with agents we connect to, or - # wait until all instances that connect to us have done so. Either triggers - # a notify_agents_ready event, upon which we then deploy the topology. + # we can proceed to deploying the new cluster topology. If it does not, + # we need to establish connectivity with agents we connect to, or wait + # until all instances that connect to us have done so. Either case + # triggers a notify_agents_ready event, upon which we deploy the config. # The current & new set of instance names. 
local insts_current: set[string]; @@ -823,7 +903,7 @@ event Management::Controller::API::set_configuration_request(reqid: string, conf for ( inst_name in g_instances ) add insts_current[inst_name]; - for ( inst in config$instances ) + for ( inst in g_config_staged$instances ) add insts_new[inst$name]; # Populate TODO lists for instances we need to drop, check, or add. @@ -831,7 +911,7 @@ event Management::Controller::API::set_configuration_request(reqid: string, conf insts_to_add = insts_new - insts_current; insts_to_keep = insts_new & insts_current; - for ( inst in config$instances ) + for ( inst in g_config_staged$instances ) { if ( inst$name in insts_to_add ) { @@ -869,16 +949,21 @@ event Management::Controller::API::set_configuration_request(reqid: string, conf # Updates to instance tables are complete. As a corner case, if the # config contained no instances (and thus no nodes), we're now done - # since there are no agent interactions to wait for: + # since there are no agent interactions to wait for (any agents that + # need to tear down nodes are doing so asynchronously as part of the + # drop_instance() calls above). 
if ( |insts_new| == 0 ) { - g_config_current = req$set_configuration_state$config; + g_config_deployed = req$deploy_state$config; g_config_reqid_pending = ""; - Management::Log::info(fmt("tx Management::Controller::API::set_configuration_response %s", + local res = Management::Result($reqid=req$id, $data=g_config_deployed$id); + req$results += res; + + Management::Log::info(fmt("tx Management::Controller::API::deploy_response %s", Management::Request::to_string(req))); Broker::publish(Management::Controller::topic, - Management::Controller::API::set_configuration_response, req$id, req$results); + Management::Controller::API::deploy_response, req$id, req$results); Management::Request::finish(req$id); return; } @@ -890,30 +975,6 @@ event Management::Controller::API::set_configuration_request(reqid: string, conf check_instances_ready(); } -event Management::Controller::API::get_configuration_request(reqid: string) - { - Management::Log::info(fmt("rx Management::Controller::API::get_configuration_request %s", reqid)); - - local res = Management::Result($reqid=reqid); - - if ( config_is_null(g_config_current) ) - { - # We don't have a live configuration yet. - res$success = F; - res$error = "no configuration deployed"; - } - else - { - res$data = g_config_current; - } - - Management::Log::info(fmt( - "tx Management::Controller::API::get_configuration_response %s", - Management::result_to_string(res))); - Broker::publish(Management::Controller::topic, - Management::Controller::API::get_configuration_response, reqid, res); - } - event Management::Controller::API::get_instances_request(reqid: string) { Management::Log::info(fmt("rx Management::Controller::API::get_instances_request %s", reqid)); @@ -1110,8 +1171,8 @@ event Management::Controller::API::get_id_value_request(reqid: string, id: strin # don't propagate them to the agents. 
if ( |nodes| > 0 ) { - # Requested nodes that are in the current configuration: - nodes_final = filter_config_nodes_by_name(nodes); + # Requested nodes that are in the deployed configuration: + nodes_final = config_filter_nodes_by_name(g_config_deployed, nodes); # Requested nodes that are not in current configuration: local nodes_invalid = nodes - nodes_final; @@ -1170,16 +1231,16 @@ event Management::Request::request_expired(req: Management::Request::Request) $success = F, $error = "request timed out"); - if ( req?$set_configuration_state ) + if ( req?$deploy_state ) { # This timeout means we no longer have a pending request. g_config_reqid_pending = ""; req$results += res; - Management::Log::info(fmt("tx Management::Controller::API::set_configuration_response %s", + Management::Log::info(fmt("tx Management::Controller::API::deploy_response %s", Management::Request::to_string(req))); Broker::publish(Management::Controller::topic, - Management::Controller::API::set_configuration_response, req$id, req$results); + Management::Controller::API::deploy_response, req$id, req$results); } if ( req?$get_nodes_state ) @@ -1241,10 +1302,8 @@ event Broker::peer_added(peer: Broker::EndpointInfo, msg: string) event zeek_init() { - # Initialize null config at startup. We will replace it once we have - # persistence, and again whenever we complete a client's - # set_configuration request. - g_config_current = null_config(); + g_config_deployed = null_config(); + g_config_staged = null_config(); # The controller always listens: it needs to be able to respond to # clients connecting to it, as well as agents if they connect to the