diff --git a/CHANGES b/CHANGES index be99ad6da3..784f3cb6fc 100644 --- a/CHANGES +++ b/CHANGES @@ -1,3 +1,28 @@ +5.1.0-dev.98 | 2022-06-22 22:39:32 -0700 + + * Management framework: separate config staging and deployment (Christian Kreibich, Corelight) + + - bump external cluster testsuite + - bump zeek-client + - rename set_configuration events to stage_configuration + - trigger deployment when instances are ready + - more resilient node shutdown upon deployment + - re-trigger deployment upon controller launch + - move most deployment handling to internal function + - distinguish internally and externally requested deployments + - track instances by their Broker IDs + - tweak Supervisor event logging + - make helper function a local + - rename "log_level" to "level" + - add "finish" callback to requests + - add a helper for rendering result vectors to a string + - agents now skip re-deployment of current config + - suppress notify_agent_hello upon Supervisor peering + - introduce state machine for configs and persist them + - introduce deployment API in controller + - rename agent "set_configuration" to "deploy" + - consistency fixes to the Result record + 5.1.0-dev.75 | 2022-06-22 12:06:00 -0700 * Provide zeek-client by default (Christian Kreibich, Corelight) diff --git a/VERSION b/VERSION index 376ad6ea7f..dd7091e91f 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -5.1.0-dev.75 +5.1.0-dev.98 diff --git a/auxil/zeek-client b/auxil/zeek-client index 0f1ee6489e..d92f17cfd8 160000 --- a/auxil/zeek-client +++ b/auxil/zeek-client @@ -1 +1 @@ -Subproject commit 0f1ee6489ecc07767cb1f37beb312eae69e5a6d8 +Subproject commit d92f17cfd882dd1edbd3f560181d84f69fbc8037 diff --git a/scripts/policy/frameworks/management/agent/api.zeek b/scripts/policy/frameworks/management/agent/api.zeek index ba9c0b22bd..857f79c232 100644 --- a/scripts/policy/frameworks/management/agent/api.zeek +++ b/scripts/policy/frameworks/management/agent/api.zeek @@ -15,29 +15,32 @@ export { # Agent API events - ## The controller sends this event to convey a new cluster configuration - ## to the agent. Once processed, the agent responds with the response - ## event. + ## The controller sends this event to deploy a cluster configuration to + ## this instance. Once processed, the agent responds with a + ## :zeek:see:`Management::Agent::API::deploy_response` event. ## ## reqid: a request identifier string, echoed in the response event. ## - ## config: a :zeek:see:`Management::Configuration` record - ## describing the cluster topology. Note that this contains the full - ## topology, not just the part pertaining to this agent. That's because - ## the cluster framework requires full cluster visibility to establish - ## the needed peerings. + ## config: a :zeek:see:`Management::Configuration` record describing the + ## cluster topology. This contains the full topology, not just the + ## part pertaining to this instance: the cluster framework requires + ## full cluster visibility to establish needed peerings. ## - global set_configuration_request: event(reqid: string, - config: Management::Configuration); + ## force: whether to re-deploy (i.e., restart its Zeek cluster nodes) + ## when the agent already runs this configuration. This relies on + ## the config ID to determine config equality. + ## + global deploy_request: event(reqid: string, + config: Management::Configuration, force: bool &default=F); - ## Response to a set_configuration_request event. The agent sends + ## Response to a deploy_request event.
The agent sends ## this back to the controller. ## ## reqid: the request identifier used in the request event. ## ## result: the result record. ## - global set_configuration_response: event(reqid: string, + global deploy_response: event(reqid: string, result: Management::ResultVec); diff --git a/scripts/policy/frameworks/management/agent/main.zeek b/scripts/policy/frameworks/management/agent/main.zeek index 4757f5a3ec..3ec546f76e 100644 --- a/scripts/policy/frameworks/management/agent/main.zeek +++ b/scripts/policy/frameworks/management/agent/main.zeek @@ -23,11 +23,14 @@ module Management::Agent::Runtime; export { ## Request state specific to the agent's Supervisor interactions. type SupervisorState: record { - node: string; ##< Name of the node the Supervisor is acting on. + ## Name of the node the Supervisor is acting on, if applicable. + node: string &default=""; + ## The result of a status request. + status: Supervisor::Status &optional; }; - ## Request state for set_configuration requests. - type SetConfigurationState: record { + ## Request state for deploy requests. + type DeployState: record { ## Zeek cluster nodes the provided configuration requested ## and which have not yet checked in with the agent. nodes_pending: set[string]; @@ -62,7 +65,7 @@ export { # members with _agent to disambiguate. redef record Management::Request::Request += { supervisor_state_agent: SupervisorState &optional; - set_configuration_state_agent: SetConfigurationState &optional; + deploy_state_agent: DeployState &optional; node_dispatch_state_agent: NodeDispatchState &optional; }; @@ -80,11 +83,22 @@ redef Management::Request::timeout_interval = 5 sec; # Returns the effective agent topic for this agent. global agent_topic: function(): string; -# Finalizes a set_configuration_request transaction: cleans up remaining state -# and sends response event. -global send_set_configuration_response: function(req: Management::Request::Request); +# Returns the effective supervisor's address and port, to peer with +global supervisor_network_info: function(): Broker::NetworkInfo; -# The global configuration as passed to us by the controller +# Finalizes a deploy_request transaction: cleans up remaining state +# and sends response event. +global send_deploy_response: function(req: Management::Request::Request); + +# Callback completing a deploy_request after the Supervisor has delivered +# a status response. +global deploy_request_finish: function(req: Management::Request::Request); + +# Callback completing a get_nodes_request after the Supervisor has delivered +# a status response. +global get_nodes_request_finish: function(req: Management::Request::Request); + +# The global configuration, as deployed by the controller. global g_config: Management::Configuration; # A map to make other instance info accessible @@ -93,10 +107,9 @@ global g_instances: table[string] of Management::Instance; # A map for the nodes we run on this instance, via this agent. global g_nodes: table[string] of Management::Node; -# The request ID of the most recent configuration update from the controller. -# We track it here until the nodes_pending set in the corresponding request's -# SetConfigurationState is cleared out, or the corresponding request state hits -# a timeout. +# The request ID of the most recent config deployment from the controller. We +# track it until the nodes_pending set in the corresponding request's +# DeployState is cleared out, or the corresponding request state hits a timeout. 
global g_config_reqid_pending: string = ""; # The complete node map employed by the supervisor to describe the cluster @@ -115,7 +128,20 @@ function agent_topic(): string return Management::Agent::topic_prefix + "/" + epi$id; } -function send_set_configuration_response(req: Management::Request::Request) +function supervisor_network_info(): Broker::NetworkInfo + { + # The Supervisor's address defaults to Broker's default, which + # relies on ZEEK_DEFAULT_LISTEN_ADDR and so might just be "". Broker + # internally falls back to listening on any; we pick 127.0.0.1. + local address = Broker::default_listen_address; + + if ( address == "" ) + address = "127.0.0.1"; + + return Broker::NetworkInfo($address=address, $bound_port=Broker::default_port); + } + +function send_deploy_response(req: Management::Request::Request) { local node: string; local res: Management::Result; @@ -128,7 +154,7 @@ function send_set_configuration_response(req: Management::Request::Request) $instance = Management::Agent::get_name(), $node = node); - if ( node in req$set_configuration_state_agent$nodes_pending ) + if ( node in req$deploy_state_agent$nodes_pending ) { # This node failed. res$success = F; @@ -142,10 +168,10 @@ function send_set_configuration_response(req: Management::Request::Request) req$results[|req$results|] = res; } - Management::Log::info(fmt("tx Management::Agent::API::set_configuration_response %s", + Management::Log::info(fmt("tx Management::Agent::API::deploy_response %s", Management::result_to_string(res))); Broker::publish(agent_topic(), - Management::Agent::API::set_configuration_response, req$id, req$results); + Management::Agent::API::deploy_response, req$id, req$results); Management::Request::finish(req$id); @@ -207,6 +233,8 @@ event Management::Supervisor::API::notify_node_exit(node: string, outputs: Manag event SupervisorControl::create_response(reqid: string, result: string) { + Management::Log::info(fmt("rx SupervisorControl::create_response %s %s", reqid, result)); + local req = Management::Request::lookup(reqid); if ( Management::Request::is_null(req) ) return; @@ -227,6 +255,8 @@ event SupervisorControl::create_response(reqid: string, result: string) event SupervisorControl::destroy_response(reqid: string, result: bool) { + Management::Log::info(fmt("rx SupervisorControl::destroy_response %s %s", reqid, result)); + local req = Management::Request::lookup(reqid); if ( Management::Request::is_null(req) ) return; @@ -249,28 +279,48 @@ function supervisor_create(nc: Supervisor::NodeConfig) { local req = Management::Request::create(); req$supervisor_state_agent = SupervisorState($node = nc$name); + + Management::Log::info(fmt("tx SupervisorControl::create_request %s %s", req$id, nc$name)); Broker::publish(SupervisorControl::topic_prefix, SupervisorControl::create_request, req$id, nc); - Management::Log::info(fmt("issued supervisor create for %s, %s", nc$name, req$id)); } function supervisor_destroy(node: string) { local req = Management::Request::create(); req$supervisor_state_agent = SupervisorState($node = node); + + Management::Log::info(fmt("tx SupervisorControl::destroy_request %s %s", req$id, node)); Broker::publish(SupervisorControl::topic_prefix, SupervisorControl::destroy_request, req$id, node); - Management::Log::info(fmt("issued supervisor destroy for %s, %s", node, req$id)); } -event Management::Agent::API::set_configuration_request(reqid: string, config: Management::Configuration) +event Management::Agent::API::deploy_request(reqid: string, config: Management::Configuration, 
force: bool) { - Management::Log::info(fmt("rx Management::Agent::API::set_configuration_request %s", reqid)); + Management::Log::info(fmt("rx Management::Agent::API::deploy_request %s %s", reqid, config$id)); local nodename: string; local node: Management::Node; local nc: Supervisor::NodeConfig; - local msg: string; + local res: Management::Result; + + # Special case: we're already running this configuration. + if ( g_config$id == config$id && ! force ) + { + res = Management::Result( + $reqid = reqid, + $instance = Management::Agent::get_name()); + + Management::Log::info(fmt("already running config %s", config$id)); + Management::Log::info(fmt("tx Management::Agent::API::deploy_response %s", + Management::result_to_string(res))); + Broker::publish(agent_topic(), + Management::Agent::API::deploy_response, reqid, vector(res)); + return; + } + + local req = Management::Request::create(reqid); + req$deploy_state_agent = DeployState(); # Adopt the global configuration provided. The act of trying to launch # the requested nodes perturbs any existing ones one way or another, so @@ -282,43 +332,65 @@ event Management::Agent::API::set_configuration_request(reqid: string, config: M for ( inst in config$instances ) g_instances[inst$name] = inst; - # Terminate existing nodes - for ( nodename in g_nodes ) - supervisor_destroy(nodename); + local areq = Management::Request::create(); + areq$parent_id = req$id; + areq$finish = deploy_request_finish; + areq$supervisor_state_agent = SupervisorState(); + + Management::Log::info(fmt("tx SupervisorControl::status_request %s, all nodes", areq$id)); + Broker::publish(SupervisorControl::topic_prefix, + SupervisorControl::status_request, areq$id, ""); + } + +function deploy_request_finish(areq: Management::Request::Request) + { + local status = areq$supervisor_state_agent$status; + + for ( nodename in status$nodes ) + { + if ( "ZEEK_MANAGEMENT_NODE" in status$nodes[nodename]$node$env ) + next; + supervisor_destroy(status$nodes[nodename]$node$name); + } + + local req = Management::Request::lookup(areq$parent_id); + if ( Management::Request::is_null(req) ) + return; + + local res: Management::Result; + local nc: Supervisor::NodeConfig; + local node: Management::Node; # Refresh the cluster and nodes tables g_nodes = table(); g_cluster = table(); # Special case: the config contains no nodes. We can respond right away. - if ( |config$nodes| == 0 ) + if ( |g_config$nodes| == 0 ) { g_config_reqid_pending = ""; - local res = Management::Result( - $reqid = reqid, + res = Management::Result( + $reqid = req$id, $instance = Management::Agent::get_name()); - Management::Log::info(fmt("tx Management::Agent::API::set_configuration_response %s", + Management::Log::info(fmt("tx Management::Agent::API::deploy_response %s", Management::result_to_string(res))); Broker::publish(agent_topic(), - Management::Agent::API::set_configuration_response, reqid, vector(res)); + Management::Agent::API::deploy_response, req$id, vector(res)); return; } - local req = Management::Request::create(reqid); - req$set_configuration_state_agent = SetConfigurationState(); - # Establish this request as the pending one: - g_config_reqid_pending = reqid; + g_config_reqid_pending = req$id; - for ( node in config$nodes ) + for ( node in g_config$nodes ) { # Filter the node set down to the ones this agent manages. 
if ( node$instance == Management::Agent::get_name() ) { g_nodes[node$name] = node; - add req$set_configuration_state_agent$nodes_pending[node$name]; + add req$deploy_state_agent$nodes_pending[node$name]; } # The cluster and supervisor frameworks require a port for every @@ -399,25 +471,54 @@ event Management::Agent::API::set_configuration_request(reqid: string, config: M # At this point we await Management::Node::API::notify_node_hello events # from the new nodes, or a timeout, whichever happens first. These - # trigger the set_configuration_response event back to the controller. + # update the pending nodes in the request state, and eventually trigger + # the deploy_response event back to the controller. } event SupervisorControl::status_response(reqid: string, result: Supervisor::Status) { + Management::Log::info(fmt("rx SupervisorControl::status_response %s", reqid)); local req = Management::Request::lookup(reqid); + + if ( Management::Request::is_null(req) ) + return; + if ( ! req?$supervisor_state_agent ) + return; + + req$supervisor_state_agent$status = result; + Management::Request::finish(reqid); + } + +event Management::Agent::API::get_nodes_request(reqid: string) + { + Management::Log::info(fmt("rx Management::Agent::API::get_nodes_request %s", reqid)); + + local req = Management::Request::create(reqid); + + local areq = Management::Request::create(); + areq$parent_id = req$id; + areq$finish = get_nodes_request_finish; + areq$supervisor_state_agent = SupervisorState(); + + Broker::publish(SupervisorControl::topic_prefix, + SupervisorControl::status_request, areq$id, ""); + Management::Log::info(fmt("issued supervisor status, %s", areq$id)); + } + +function get_nodes_request_finish(areq: Management::Request::Request) + { + local req = Management::Request::lookup(areq$parent_id); if ( Management::Request::is_null(req) ) return; - Management::Request::finish(reqid); - - local res = Management::Result( - $reqid = req$parent_id, $instance = Management::Agent::get_name()); + local res = Management::Result($reqid=req$id, + $instance=Management::Agent::get_name()); local node_statuses: Management::NodeStatusVec; - for ( node in result$nodes ) + for ( node in areq$supervisor_state_agent$status$nodes ) { - local sns = result$nodes[node]; # Supervisor node status + local sns = areq$supervisor_state_agent$status$nodes[node]; # Supervisor node status local cns = Management::NodeStatus( $node=node, $state=Management::PENDING); @@ -488,19 +589,8 @@ event SupervisorControl::status_response(reqid: string, result: Supervisor::Stat Management::Log::info(fmt("tx Management::Agent::API::get_nodes_response %s", Management::result_to_string(res))); Broker::publish(agent_topic(), - Management::Agent::API::get_nodes_response, req$parent_id, res); - } - -event Management::Agent::API::get_nodes_request(reqid: string) - { - Management::Log::info(fmt("rx Management::Agent::API::get_nodes_request %s", reqid)); - - local req = Management::Request::create(); - req$parent_id = reqid; - - Broker::publish(SupervisorControl::topic_prefix, - SupervisorControl::status_request, req$id, ""); - Management::Log::info(fmt("issued supervisor status, %s", req$id)); + Management::Agent::API::get_nodes_response, req$id, res); + Management::Request::finish(req$id); } event Management::Node::API::node_dispatch_response(reqid: string, result: Management::Result) @@ -637,9 +727,11 @@ event Management::Agent::API::node_dispatch_request(reqid: string, action: vecto add req$node_dispatch_state_agent$requests[node]; else { - res = 
Management::Result($reqid=reqid, $node=node); - res$success = F; - res$error = fmt("cluster node %s not in runnning state", node); + res = Management::Result($reqid=reqid, + $instance = Management::Agent::get_name(), + $success = F, + $error = fmt("cluster node %s not in runnning state", node), + $node=node); req$results += res; } } @@ -690,7 +782,7 @@ event Management::Agent::API::agent_standby_request(reqid: string) # peered/connected -- otherwise there's nothing we can do here via # Broker anyway), mainly to keep open the possibility of running # cluster nodes again later. - event Management::Agent::API::set_configuration_request("", Management::Configuration()); + event Management::Agent::API::deploy_request("", Management::Configuration()); local res = Management::Result( $reqid = reqid, @@ -710,32 +802,33 @@ event Management::Node::API::notify_node_hello(node: string) if ( node in g_nodes ) g_nodes[node]$state = Management::RUNNING; - # Look up the set_configuration request this node launch was part of (if + # Look up the deploy request this node launch was part of (if # any), and check it off. If it was the last node we expected to launch, # finalize the request and respond to the controller. local req = Management::Request::lookup(g_config_reqid_pending); - if ( Management::Request::is_null(req) || ! req?$set_configuration_state_agent ) + if ( Management::Request::is_null(req) || ! req?$deploy_state_agent ) return; - if ( node in req$set_configuration_state_agent$nodes_pending ) + if ( node in req$deploy_state_agent$nodes_pending ) { - delete req$set_configuration_state_agent$nodes_pending[node]; - if ( |req$set_configuration_state_agent$nodes_pending| == 0 ) - send_set_configuration_response(req); + delete req$deploy_state_agent$nodes_pending[node]; + if ( |req$deploy_state_agent$nodes_pending| == 0 ) + send_deploy_response(req); } } event Management::Request::request_expired(req: Management::Request::Request) { local res = Management::Result($reqid=req$id, + $instance = Management::Agent::get_name(), $success = F, $error = "request timed out"); - if ( req?$set_configuration_state_agent ) + if ( req?$deploy_state_agent ) { - send_set_configuration_response(req); + send_deploy_response(req); # This timeout means we no longer have a pending request. g_config_reqid_pending = ""; } @@ -743,10 +836,16 @@ event Management::Request::request_expired(req: Management::Request::Request) event Broker::peer_added(peer: Broker::EndpointInfo, msg: string) { - # This does not (cannot?) immediately verify that the new peer - # is in fact a controller, so we might send this in vain. - # Controllers register the agent upon receipt of the event. + Management::Log::debug(fmt("broker peer %s added: %s", peer, msg)); + local sni = supervisor_network_info(); + + if ( peer$network$address == sni$address && peer$network$bound_port == sni$bound_port ) + return; + + # Supervisor aside, this does not (cannot?) immediately verify that the + # new peer is in fact a controller, so we might send this in vain. + # Controllers register the agent upon receipt of the event. local epi = Management::Agent::endpoint_info(); Broker::publish(agent_topic(), @@ -765,14 +864,9 @@ event zeek_init() local epi = Management::Agent::endpoint_info(); # The agent needs to peer with the supervisor -- this doesn't currently - # happen automatically. The address defaults to Broker's default, which - # relies on ZEEK_DEFAULT_LISTEN_ADDR and so might just be "". Broker - # internally falls back to listening on any; we pick 127.0.0.1. 
- local supervisor_addr = Broker::default_listen_address; - if ( supervisor_addr == "" ) - supervisor_addr = "127.0.0.1"; - - Broker::peer(supervisor_addr, Broker::default_port, Broker::default_listen_retry); + # happen automatically. + local sni = supervisor_network_info(); + Broker::peer(sni$address, sni$bound_port, Broker::default_listen_retry); # Agents need receive communication targeted at it, any responses # from the supervisor, and any responses from cluster nodes. diff --git a/scripts/policy/frameworks/management/controller/api.zeek b/scripts/policy/frameworks/management/controller/api.zeek index b0897ed20a..9b5de9087c 100644 --- a/scripts/policy/frameworks/management/controller/api.zeek +++ b/scripts/policy/frameworks/management/controller/api.zeek @@ -1,7 +1,7 @@ ##! The event API of cluster controllers. Most endpoints consist of event pairs, -##! where the controller answers a zeek-client request event with a -##! corresponding response event. Such event pairs share the same name prefix -##! and end in "_request" and "_response", respectively. +##! where the controller answers the client's request event with a corresponding +##! response event. Such event pairs share the same name prefix and end in +##! "_request" and "_response", respectively. @load policy/frameworks/management/types @@ -9,11 +9,11 @@ module Management::Controller::API; export { ## A simple versioning scheme, used to track basic compatibility of - ## controller, agents, and zeek-client. + ## controller, agents, and the client. const version = 1; - ## zeek-client sends this event to request a list of the currently + ## The client sends this event to request a list of the currently ## peered agents/instances. ## ## reqid: a request identifier string, echoed in the response event. @@ -32,37 +32,44 @@ export { result: Management::Result); - ## zeek-client sends this event to establish a new cluster configuration, - ## including the full cluster topology. The controller processes the update - ## and relays it to the agents. Once each has responded (or a timeout occurs) - ## the controller sends a corresponding response event back to the client. + ## Upload a configuration to the controller for later deployment. + ## The client sends this event to the controller, which validates the + ## configuration and indicates the outcome in its response event. No + ## deployment takes place yet, and existing deployed configurations and + ## the running Zeek cluster remain intact. To trigger deployment of an uploaded + ## configuration, use :zeek:see:`Management::Controller::API::deploy_request`. ## ## reqid: a request identifier string, echoed in the response event. ## ## config: a :zeek:see:`Management::Configuration` record ## specifying the cluster configuration. ## - global set_configuration_request: event(reqid: string, + global stage_configuration_request: event(reqid: string, config: Management::Configuration); - ## Response to a set_configuration_request event. The controller sends - ## this back to the client. + ## Response to a stage_configuration_request event. The controller sends + ## this back to the client, conveying validation results. ## ## reqid: the request identifier used in the request event. ## - ## result: a vector of :zeek:see:`Management::Result` records. - ## Each member captures one agent's response. + ## result: a :zeek:see:`Management::Result` vector, indicating whether + ## the controller accepts the configuration. In case of a success, + ## a single result record indicates so. 
Otherwise, the sequence is + ## all errors, each indicating a configuration validation error. ## - global set_configuration_response: event(reqid: string, + global stage_configuration_response: event(reqid: string, result: Management::ResultVec); - ## zeek-client sends this event to retrieve the currently deployed - ## cluster configuration. + ## The client sends this event to retrieve the controller's current + ## cluster configuration(s). ## ## reqid: a request identifier string, echoed in the response event. ## - global get_configuration_request: event(reqid: string); + ## deployed: when true, returns the deployed configuration (if any), + ## otherwise the staged one (if any). + ## + global get_configuration_request: event(reqid: string, deployed: bool); ## Response to a get_configuration_request event. The controller sends ## this back to the client. @@ -78,7 +85,36 @@ export { result: Management::Result); - ## zeek-client sends this event to request a list of + ## Trigger deployment of a previously staged configuration. The client + ## sends this event to the controller, which deploys the configuration + ## to the agents. Agents then terminate any previously running cluster + ## nodes and (re-)launch those defined in the new configuration. Once + ## each agent has responded (or a timeout occurs), the controller sends + ## a response event back to the client, aggregating the results from the + ## agents. The controller keeps the staged configuration available for + ## download, or re-deployment. In addition, the deployed configuration + ## becomes available for download as well, with any augmentations + ## (e.g. node ports filled in by auto-assignment) reflected. + ## + ## reqid: a request identifier string, echoed in the response event. + ## + global deploy_request: event(reqid: string); + + ## Response to a deploy_request event. The controller sends this + ## back to the client. + ## + ## reqid: the request identifier used in the request event. + ## + ## result: a vector of :zeek:see:`Management::Result` records. + ## Each member captures the result of launching one cluster + ## node captured in the configuration, or an agent-wide error + ## when the result does not indicate a particular node. + ## + global deploy_response: event(reqid: string, + result: Management::ResultVec); + + + ## The client sends this event to request a list of ## :zeek:see:`Management::NodeStatus` records that capture ## the status of Supervisor-managed nodes running on the cluster's ## instances. @@ -102,7 +138,7 @@ export { result: Management::ResultVec); - ## zeek-client sends this event to retrieve the current value of a + ## The client sends this event to retrieve the current value of a ## variable in Zeek's global namespace, referenced by the given ## identifier (i.e., variable name). The controller asks all agents ## to retrieve this value from each cluster node, accumulates the diff --git a/scripts/policy/frameworks/management/controller/config.zeek b/scripts/policy/frameworks/management/controller/config.zeek index 01b8445a2b..eb1e0ca2d6 100644 --- a/scripts/policy/frameworks/management/controller/config.zeek +++ b/scripts/policy/frameworks/management/controller/config.zeek @@ -59,6 +59,10 @@ export { ## output gets garbled. const directory = "" &redef; + ## The name of the Broker store the controller uses to persist internal + ## state to disk. + const store_name = "controller"; + ## Returns the effective name of the controller. 
global get_name: function(): string; diff --git a/scripts/policy/frameworks/management/controller/main.zeek b/scripts/policy/frameworks/management/controller/main.zeek index ef6047e3bf..a5c1840b18 100644 --- a/scripts/policy/frameworks/management/controller/main.zeek +++ b/scripts/policy/frameworks/management/controller/main.zeek @@ -18,12 +18,23 @@ module Management::Controller::Runtime; # Request record below. Without it, it fails to establish link targets for the # tucked-on types. export { + ## A cluster configuration uploaded by the client goes through multiple + ## states on its way to deployment. + type ConfigState: enum { + STAGED, ##< As provided by the client. + READY, ##< Necessary updates made, e.g. ports filled in. + DEPLOYED, ##< Sent off to the agents for deployment. + }; + ## Request state specific to - ## :zeek:see:`Management::Controller::API::set_configuration_request` and - ## :zeek:see:`Management::Controller::API::set_configuration_response`. - type SetConfigurationState: record { - ## The cluster configuration established with this request + ## :zeek:see:`Management::Controller::API::deploy_request` and + ## :zeek:see:`Management::Controller::API::deploy_response`. + type DeployState: record { + ## The cluster configuration the controller is deploying. config: Management::Configuration; + ## Whether this is a controller-internal deployment, or + ## triggered via a request by a remote peer/client. + is_internal: bool &default=F; ## Request state for every controller/agent transaction. requests: set[string] &default=set(); }; @@ -64,7 +75,7 @@ export { } redef record Management::Request::Request += { - set_configuration_state: SetConfigurationState &optional; + deploy_state: DeployState &optional; get_nodes_state: GetNodesState &optional; node_dispatch_state: NodeDispatchState &optional; test_state: TestState &optional; @@ -89,9 +100,11 @@ global add_instance: function(inst: Management::Instance); # agent_standby_request, so it drops its current cluster nodes (if any). global drop_instance: function(inst: Management::Instance); -# Helpers to simplify handling of config records. -global null_config: function(): Management::Configuration; -global is_null_config: function(config: Management::Configuration): bool; +# Sends the given configuration to all g_instances members via a +# Management::Agent::API::deploy_request event. To track responses, it builds up +# deployment state in the given request for each recipient. +global config_deploy_to_agents: function(config: Management::Configuration, + req: Management::Request::Request); # Returns list of names of nodes in the given configuration that require a # listening port. Returns empty list if the config has no such nodes. @@ -110,11 +123,10 @@ global config_assign_ports: function(config: Management::Configuration); global config_validate: function(config: Management::Configuration, req: Management::Request::Request): bool; -# Rejects the given configuration with the given error message. The function -# adds a non-success result record to the given request and send the -# set_configuration_response event back to the client. It does not call finish() -# on the request. -global send_set_configuration_response_error: function(req: Management::Request::Request, error: string); +# Returns the set of node names in the given configuration that overlaps with +# the set provided. 
+global config_filter_nodes_by_name: function(config: Management::Configuration, + nodes: set[string]): set[string]; # Given a Broker ID, this returns the endpoint info associated with it. # On error, returns a dummy record with an empty ID string. @@ -145,16 +157,21 @@ global g_instances_known: table[string] of Management::Instance = table(); # instance, and store that in g_instances.) global g_instances_ready: set[string] = set(); -# The request ID of the most recent configuration update that's come in from -# a client. We track it here until we know we are ready to communicate with all -# agents required by the update. +# A map from Broker ID values to instance names. When we lose a peering, this +# helps us understand whether it was an instance, and if so, update its state +# accordingly. +global g_instances_by_id: table[string] of string; + +# The request ID of the most recent deployment request from a client. We track +# it here until we know we are ready to communicate with all agents required for +# the update. global g_config_reqid_pending: string = ""; -# The most recent configuration we have successfully deployed. This is also -# the one we send whenever the client requests it. -global g_config_current: Management::Configuration; +# This table tracks a cluster configuration through its states to deployment. +global g_configs: table[ConfigState] of Management::Configuration + &broker_allow_complex_type &backend=Broker::SQLITE; -function send_config_to_agents(req: Management::Request::Request, config: Management::Configuration) +function config_deploy_to_agents(config: Management::Configuration, req: Management::Request::Request) { for ( name in g_instances ) { @@ -168,12 +185,12 @@ function send_config_to_agents(req: Management::Request::Request, config: Manage # We track the requests sent off to each agent. As the # responses come in, we delete them. Once the requests # set is empty, we respond back to the client. - add req$set_configuration_state$requests[areq$id]; + add req$deploy_state$requests[areq$id]; # We could also broadcast just once on the agent prefix, but # explicit request/response pairs for each agent seems cleaner. 
- Management::Log::info(fmt("tx Management::Agent::API::set_configuration_request %s to %s", areq$id, name)); - Broker::publish(agent_topic, Management::Agent::API::set_configuration_request, areq$id, config); + Management::Log::info(fmt("tx Management::Agent::API::deploy_request %s to %s", areq$id, name)); + Broker::publish(agent_topic, Management::Agent::API::deploy_request, areq$id, config, F); } } @@ -239,11 +256,6 @@ function drop_instance(inst: Management::Instance) Management::Log::info(fmt("dropped instance %s", inst$name)); } -function null_config(): Management::Configuration - { - return Management::Configuration($id=""); - } - function config_nodes_lacking_ports(config: Management::Configuration): vector of string { local res: vector of string; @@ -491,6 +503,18 @@ function config_validate(config: Management::Configuration, return F; } +function config_filter_nodes_by_name(config: Management::Configuration, nodes: set[string]) + : set[string] + { + local res: set[string]; + local cluster_nodes: set[string]; + + for ( node in config$nodes ) + add cluster_nodes[node$name]; + + return nodes & cluster_nodes; + } + function find_endpoint(id: string): Broker::EndpointInfo { local peers = Broker::peers(); @@ -505,11 +529,6 @@ function find_endpoint(id: string): Broker::EndpointInfo return Broker::EndpointInfo($id=""); } -function is_null_config(config: Management::Configuration): bool - { - return config$id == ""; - } - function is_instance_connectivity_change(inst: Management::Instance): bool { # If we're not tracking this instance as part of a cluster config, it's @@ -536,48 +555,148 @@ function is_instance_connectivity_change(inst: Management::Instance): bool return F; } -function filter_config_nodes_by_name(nodes: set[string]): set[string] +function deploy(req: Management::Request::Request) { - local res: set[string]; - local cluster_nodes: set[string]; + # This deployment is now pending. It clears when all agents have + # processed their config updates successfully, or any responses time + # out. + g_config_reqid_pending = req$id; - for ( node in g_config_current$nodes ) - add cluster_nodes[node$name]; + # Compare the instance configuration to our current one. If it matches, + # we can proceed to deploying the new cluster topology. If it does not, + # we need to establish connectivity with agents we connect to, or wait + # until all instances that connect to us have done so. Either case + # triggers a notify_agents_ready event, upon which we deploy the config. - return nodes & cluster_nodes; - } + # The current & new set of instance names. + local insts_current: set[string]; + local insts_new: set[string]; -function send_set_configuration_response_error(req: Management::Request::Request, error: string) - { - local res = Management::Result($reqid=req$id); + # A set of current instances not contained in the new config. + # Those will need to get dropped. + local insts_to_drop: set[string]; - res$success = F; - res$error = error; - req$results += res; + # The opposite: new instances not yet in our current set. Those we will need + # to establish contact with (or they with us). + local insts_to_add: set[string]; - Broker::publish(Management::Controller::topic, - Management::Controller::API::set_configuration_response, req$id, req$results); + # The overlap: instances in both the current and new set. For those we verify + # that we're actually dealign with the same entities, and might need to re- + # connect if not. 
+ local insts_to_keep: set[string]; + + # Alternative representation of insts_to_add, directly providing the instances. + local insts_to_peer: table[string] of Management::Instance; + + # Helpful locals. + local inst_name: string; + local inst: Management::Instance; + + for ( inst_name in g_instances ) + add insts_current[inst_name]; + for ( inst in g_configs[READY]$instances ) + add insts_new[inst$name]; + + # Populate TODO lists for instances we need to drop, check, or add. + insts_to_drop = insts_current - insts_new; + insts_to_add = insts_new - insts_current; + insts_to_keep = insts_new & insts_current; + + for ( inst in g_configs[READY]$instances ) + { + if ( inst$name in insts_to_add ) + { + insts_to_peer[inst$name] = inst; + next; + } + + # Focus on the keepers: check for change in identity/location. + if ( inst$name !in insts_to_keep ) + next; + + if ( is_instance_connectivity_change(inst) ) + { + # The endpoint looks different. We drop the current one + # and need to re-establish connectivity with the new + # one. + add insts_to_drop[inst$name]; + add insts_to_add[inst$name]; + } + } + + # Process our TODO lists. Handle drops first, then additions, in + # case we need to re-establish connectivity with an agent. + + for ( inst_name in insts_to_drop ) + { + Management::Log::debug(fmt("dropping instance %s", inst_name)); + drop_instance(g_instances[inst_name]); + } + for ( inst_name in insts_to_peer ) + { + Management::Log::debug(fmt("adding instance %s", inst_name)); + add_instance(insts_to_peer[inst_name]); + } + + # Updates to instance tables are complete. As a corner case, if the + # config contained no instances (and thus no nodes), we're now done + # since there are no agent interactions to wait for (any agents that + # need to tear down nodes are doing so asynchronously as part of the + # drop_instance() calls above). + if ( |insts_new| == 0 ) + { + local config = req$deploy_state$config; + g_configs[DEPLOYED] = config; + g_config_reqid_pending = ""; + + local res = Management::Result($reqid=req$id, $data=config$id); + req$results += res; + + if ( ! req$deploy_state$is_internal ) + { + Management::Log::info(fmt("tx Management::Controller::API::deploy_response %s", + Management::Request::to_string(req))); + Broker::publish(Management::Controller::topic, + Management::Controller::API::deploy_response, req$id, req$results); + } + + Management::Request::finish(req$id); + return; + } } event Management::Controller::API::notify_agents_ready(instances: set[string]) { local insts = Management::Util::set_to_vector(instances); + local req: Management::Request::Request; Management::Log::info(fmt("rx Management::Controller::API:notify_agents_ready %s", join_string_vec(insts, ", "))); - local req = Management::Request::lookup(g_config_reqid_pending); + # If we're not currently deploying a configuration, but have a deployed + # configuration, trigger a deployment at this point. Some of our agents + # might have restarted, and need to get in sync with us. Agents already + # running this configuration will do nothing. + if ( g_config_reqid_pending == "" && DEPLOYED in g_configs ) + { + req = Management::Request::create(); + req$deploy_state = DeployState($config=g_configs[DEPLOYED], $is_internal=T); + Management::Log::info(fmt("no deployment in progress, triggering via %s", req$id)); + deploy(req); + } + + req = Management::Request::lookup(g_config_reqid_pending); # If there's no pending request, when it's no longer available, or it # doesn't have config state, don't do anything else. 
- if ( Management::Request::is_null(req) || ! req?$set_configuration_state ) + if ( Management::Request::is_null(req) || ! req?$deploy_state ) return; # All instances requested in the pending configuration update are now # known to us. Send them the config. As they send their response events # we update the client's request state and eventually send the response # event to the it. - send_config_to_agents(req, req$set_configuration_state$config); + config_deploy_to_agents(req$deploy_state$config, req); } event Management::Agent::API::notify_agent_hello(instance: string, id: string, connecting: bool, api_version: count) @@ -617,6 +736,12 @@ event Management::Agent::API::notify_agent_hello(instance: string, id: string, c if ( ei$id != "" && ei?$network ) { + if ( instance !in g_instances_known ) + Management::Log::debug(fmt("instance %s newly checked in", instance)); + else + Management::Log::debug(fmt("instance %s checked in again", instance)); + + g_instances_by_id[id] = instance; g_instances_known[instance] = Management::Instance( $name=instance, $host=to_addr(ei$network$address)); @@ -625,8 +750,6 @@ event Management::Agent::API::notify_agent_hello(instance: string, id: string, c # We connected to this agent, note down its port. g_instances_known[instance]$listen_port = ei$network$bound_port; } - - Management::Log::debug(fmt("instance %s now known to us", instance)); } if ( instance in g_instances && instance !in g_instances_ready ) @@ -654,7 +777,6 @@ event Management::Agent::API::agent_welcome_response(reqid: string, result: Mana # An agent we've been waiting to hear back from is ready for cluster # work. Double-check we still want it, otherwise drop it. - if ( ! result$success || result$instance !in g_instances ) { Management::Log::info(fmt( @@ -686,9 +808,10 @@ event Management::Agent::API::notify_log(instance: string, msg: string, node: st # XXX TODO } -event Management::Agent::API::set_configuration_response(reqid: string, results: Management::ResultVec) +event Management::Agent::API::deploy_response(reqid: string, results: Management::ResultVec) { - Management::Log::info(fmt("rx Management::Agent::API::set_configuration_response %s", reqid)); + Management::Log::info(fmt("rx Management::Agent::API::deploy_response %s %s", + reqid, Management::result_vec_to_string(results))); # Retrieve state for the request we just got a response to local areq = Management::Request::lookup(reqid); @@ -715,197 +838,109 @@ event Management::Agent::API::set_configuration_response(reqid: string, results: # Mark this request as done by removing it from the table of pending # ones. The following if-check should always be true. - if ( areq$id in req$set_configuration_state$requests ) - delete req$set_configuration_state$requests[areq$id]; + if ( areq$id in req$deploy_state$requests ) + delete req$deploy_state$requests[areq$id]; # If there are any pending requests to the agents, we're # done: we respond once every agent has responed (or we time out). - if ( |req$set_configuration_state$requests| > 0 ) + if ( |req$deploy_state$requests| > 0 ) return; - # All set_configuration requests to instances are done, so adopt the - # client's requested configuration as the new one and respond back to - # client. - g_config_current = req$set_configuration_state$config; + # All deploy requests to instances are done, so adopt the config + # as the new deployed one and respond back to client. 
+ local config = req$deploy_state$config; + g_configs[DEPLOYED] = config; g_config_reqid_pending = ""; - Management::Log::info(fmt("tx Management::Controller::API::set_configuration_response %s", - Management::Request::to_string(req))); - Broker::publish(Management::Controller::topic, - Management::Controller::API::set_configuration_response, req$id, req$results); + local res = Management::Result($reqid=req$id, $data=config$id); + req$results += res; + + if ( ! req$deploy_state$is_internal ) + { + Management::Log::info(fmt("tx Management::Controller::API::deploy_response %s", + Management::Request::to_string(req))); + Broker::publish(Management::Controller::topic, + Management::Controller::API::deploy_response, req$id, req$results); + } + Management::Request::finish(req$id); } -event Management::Controller::API::set_configuration_request(reqid: string, config: Management::Configuration) +event Management::Controller::API::stage_configuration_request(reqid: string, config: Management::Configuration) { - Management::Log::info(fmt("rx Management::Controller::API::set_configuration_request %s", reqid)); + Management::Log::info(fmt("rx Management::Controller::API::stage_configuration_request %s", reqid)); - local res: Management::Result; local req = Management::Request::create(reqid); + local res = Management::Result($reqid=req$id); + local config_copy: Management::Configuration; - req$set_configuration_state = SetConfigurationState($config = config); - - # At the moment there can only be one pending request. - if ( g_config_reqid_pending != "" ) - { - send_set_configuration_response_error(req, - fmt("request %s still pending", g_config_reqid_pending)); - - Management::Request::finish(req$id); - Management::Log::info(fmt("tx Management::Controller::API::set_configuration_response %s", - Management::Request::to_string(req))); - return; - } - - # If the config has problems, reject it: if ( ! config_validate(config, req) ) { Management::Request::finish(req$id); - Management::Log::info(fmt("tx Management::Controller::API::set_configuration_response %s", + Management::Log::info(fmt("tx Management::Controller::API::stage_configuration_response %s", Management::Request::to_string(req))); Broker::publish(Management::Controller::topic, - Management::Controller::API::set_configuration_response, req$id, req$results); + Management::Controller::API::stage_configuration_response, req$id, req$results); return; } - if ( Management::Controller::auto_assign_ports ) - config_assign_ports(config); - else + if ( ! Management::Controller::auto_assign_ports ) { local nodes = config_nodes_lacking_ports(config); if ( |nodes| > 0 ) { local nodes_str = join_string_vec(nodes, ", "); - send_set_configuration_response_error(req, - fmt("port auto-assignment disabled but nodes %s lack ports", nodes_str)); - Management::Request::finish(req$id); - Management::Log::info(fmt("tx Management::Controller::API::set_configuration_response %s", + res$success = F; + res$error = fmt("port auto-assignment disabled but nodes %s lack ports", nodes_str); + req$results += res; + + Management::Log::info(fmt("tx Management::Controller::API::stage_configuration_response %s", Management::Request::to_string(req))); + Broker::publish(Management::Controller::topic, + Management::Controller::API::stage_configuration_response, req$id, req$results); + Management::Request::finish(req$id); return; } } - # The incoming request is now the pending one. 
It gets cleared when all - # agents have processed their config updates successfully, or their - # responses time out. - g_config_reqid_pending = req$id; + g_configs[STAGED] = config; + config_copy = copy(config); - # Compare the instance configuration to our current one. If it matches, - # we can proceed to deploying the new cluster topology. If it does - # not, we need to establish connectivity with agents we connect to, or - # wait until all instances that connect to us have done so. Either triggers - # a notify_agents_ready event, upon which we then deploy the topology. + if ( Management::Controller::auto_assign_ports ) + config_assign_ports(config_copy); - # The current & new set of instance names. - local insts_current: set[string]; - local insts_new: set[string]; + g_configs[READY] = config_copy; - # A set of current instances not contained in the new config. - # Those will need to get dropped. - local insts_to_drop: set[string]; + # We return the ID of the new configuration in the response. + res$data = config$id; + req$results += res; - # The opposite: new instances not yet in our current set. Those we will need - # to establish contact with (or they with us). - local insts_to_add: set[string]; - - # The overlap: instances in both the current and new set. For those we verify - # that we're actually dealign with the same entities, and might need to re- - # connect if not. - local insts_to_keep: set[string]; - - # Alternative representation of insts_to_add, directly providing the instances. - local insts_to_peer: table[string] of Management::Instance; - - # Helpful locals. - local inst_name: string; - local inst: Management::Instance; - - for ( inst_name in g_instances ) - add insts_current[inst_name]; - for ( inst in config$instances ) - add insts_new[inst$name]; - - # Populate TODO lists for instances we need to drop, check, or add. - insts_to_drop = insts_current - insts_new; - insts_to_add = insts_new - insts_current; - insts_to_keep = insts_new & insts_current; - - for ( inst in config$instances ) - { - if ( inst$name in insts_to_add ) - { - insts_to_peer[inst$name] = inst; - next; - } - - # Focus on the keepers: check for change in identity/location. - if ( inst$name !in insts_to_keep ) - next; - - if ( is_instance_connectivity_change(inst) ) - { - # The endpoint looks different. We drop the current one - # and need to re-establish connectivity with the new - # one. - add insts_to_drop[inst$name]; - add insts_to_add[inst$name]; - } - } - - # Process our TODO lists. Handle drops first, then additions, in - # case we need to re-establish connectivity with an agent. - - for ( inst_name in insts_to_drop ) - { - Management::Log::debug(fmt("dropping instance %s", inst_name)); - drop_instance(g_instances[inst_name]); - } - for ( inst_name in insts_to_peer ) - { - Management::Log::debug(fmt("adding instance %s", inst_name)); - add_instance(insts_to_peer[inst_name]); - } - - # Updates to instance tables are complete. 
As a corner case, if the - # config contained no instances (and thus no nodes), we're now done - # since there are no agent interactions to wait for: - if ( |insts_new| == 0 ) - { - g_config_current = req$set_configuration_state$config; - g_config_reqid_pending = ""; - - Management::Log::info(fmt("tx Management::Controller::API::set_configuration_response %s", - Management::Request::to_string(req))); - Broker::publish(Management::Controller::topic, - Management::Controller::API::set_configuration_response, req$id, req$results); - Management::Request::finish(req$id); - return; - } - - # Otherwise, check if we're able to send the config to all agents - # involved. If that's the case, this will trigger a - # Management::Controller::API::notify_agents_ready event that implements - # the distribution in the controller's own event handler, above. - check_instances_ready(); + Management::Log::info(fmt( + "tx Management::Controller::API::stage_configuration_response %s", + Management::result_to_string(res))); + Broker::publish(Management::Controller::topic, + Management::Controller::API::stage_configuration_response, reqid, req$results); + Management::Request::finish(req$id); } -event Management::Controller::API::get_configuration_request(reqid: string) +event Management::Controller::API::get_configuration_request(reqid: string, deployed: bool) { Management::Log::info(fmt("rx Management::Controller::API::get_configuration_request %s", reqid)); local res = Management::Result($reqid=reqid); + local key = deployed ? DEPLOYED : STAGED; - if ( is_null_config(g_config_current) ) + if ( key !in g_configs ) { - # We don't have a live configuration yet. + # Cut off enum namespacing prefix and turn rest lower-case, for readability: + res$error = fmt("no %s configuration available", to_lower(sub(cat(key), /.+::/, ""))); res$success = F; - res$error = "no configuration deployed"; } else { - res$data = g_config_current; + res$data = g_configs[key]; } Management::Log::info(fmt( @@ -915,6 +950,48 @@ event Management::Controller::API::get_configuration_request(reqid: string) Management::Controller::API::get_configuration_response, reqid, res); } +event Management::Controller::API::deploy_request(reqid: string) + { + local send_error_response = function(req: Management::Request::Request, error: string) + { + local res = Management::Result($reqid=req$id, $success=F, $error=error); + req$results += res; + + Management::Log::info(fmt("tx Management::Controller::API::deploy_response %s", + Management::Request::to_string(req))); + Broker::publish(Management::Controller::topic, + Management::Controller::API::deploy_response, req$id, req$results); + }; + + Management::Log::info(fmt("rx Management::Controller::API::deploy_request %s", reqid)); + + local req = Management::Request::create(reqid); + + if ( READY !in g_configs ) + { + send_error_response(req, "no configuration available to deploy"); + Management::Request::finish(req$id); + return; + } + + # At the moment there can only be one pending deployment. + if ( g_config_reqid_pending != "" ) + { + send_error_response(req, + fmt("earlier deployment %s still pending", g_config_reqid_pending)); + Management::Request::finish(req$id); + return; + } + + req$deploy_state = DeployState($config = g_configs[READY]); + deploy(req); + + # Check if we're already able to send the config to all agents involved: + # If so, this triggers Management::Controller::API::notify_agents_ready, + # which we handle above by distributing the config. 
+ check_instances_ready(); + } + event Management::Controller::API::get_instances_request(reqid: string) { Management::Log::info(fmt("rx Management::Controller::API::get_instances_request %s", reqid)); @@ -1111,8 +1188,8 @@ event Management::Controller::API::get_id_value_request(reqid: string, id: strin # don't propagate them to the agents. if ( |nodes| > 0 ) { - # Requested nodes that are in the current configuration: - nodes_final = filter_config_nodes_by_name(nodes); + # Requested nodes that are in the deployed configuration: + nodes_final = config_filter_nodes_by_name(g_configs[DEPLOYED], nodes); # Requested nodes that are not in current configuration: local nodes_invalid = nodes - nodes_final; @@ -1171,16 +1248,21 @@ event Management::Request::request_expired(req: Management::Request::Request) $success = F, $error = "request timed out"); - if ( req?$set_configuration_state ) + Management::Log::info(fmt("request %s timed out", req$id)); + + if ( req?$deploy_state ) { # This timeout means we no longer have a pending request. g_config_reqid_pending = ""; req$results += res; - Management::Log::info(fmt("tx Management::Controller::API::set_configuration_response %s", - Management::Request::to_string(req))); - Broker::publish(Management::Controller::topic, - Management::Controller::API::set_configuration_response, req$id, req$results); + if ( ! req$deploy_state$is_internal ) + { + Management::Log::info(fmt("tx Management::Controller::API::deploy_response %s", + Management::Request::to_string(req))); + Broker::publish(Management::Controller::topic, + Management::Controller::API::deploy_response, req$id, req$results); + } } if ( req?$get_nodes_state ) @@ -1240,18 +1322,32 @@ event Broker::peer_added(peer: Broker::EndpointInfo, msg: string) Management::Log::debug(fmt("broker peer %s added: %s", peer, msg)); } +event Broker::peer_lost(peer: Broker::EndpointInfo, msg: string) + { + Management::Log::debug(fmt("broker peer %s lost: %s", peer, msg)); + + if ( peer$id in g_instances_by_id ) + { + local instance = g_instances_by_id[peer$id]; + + if ( instance in g_instances_known ) + delete g_instances_known[instance]; + if ( instance in g_instances_ready ) + delete g_instances_ready[instance]; + + Management::Log::info(fmt("dropped state for instance %s", instance)); + delete g_instances_by_id[peer$id]; + } + } + event zeek_init() { - # Initialize null config at startup. We will replace it once we have - # persistence, and again whenever we complete a client's - # set_configuration request. - g_config_current = null_config(); - - # The controller always listens -- it needs to be able to respond to the - # Zeek client. This port is also used by the agents if they connect to - # the client. The client doesn't automatically establish or accept - # connectivity to agents: agents are defined and communicated with as - # defined via configurations defined by the client. + # The controller always listens: it needs to be able to respond to + # clients connecting to it, as well as agents if they connect to the + # controller. The controller does not automatically connect to any + # agents; instances with listening agents are conveyed to the controller + # via configurations uploaded by a client, with connections established + # upon deployment. 
local cni = Management::Controller::network_info(); @@ -1261,4 +1357,15 @@ event zeek_init() Broker::subscribe(Management::Controller::topic); Management::Log::info(fmt("controller is live, Broker ID %s", Broker::node_id())); + + # If we have a persisted deployed configuration, we need to make sure + # it's actually running. The agents involved might be gone, running a + # different config, etc. We simply run a deployment: agents already + # running this configuration will do nothing. + if ( DEPLOYED in g_configs ) + { + local req = Management::Request::create(); + req$deploy_state = DeployState($config=g_configs[DEPLOYED], $is_internal=T); + deploy(req); + } } diff --git a/scripts/policy/frameworks/management/log.zeek b/scripts/policy/frameworks/management/log.zeek index a6f8d37571..95798421a7 100644 --- a/scripts/policy/frameworks/management/log.zeek +++ b/scripts/policy/frameworks/management/log.zeek @@ -38,7 +38,7 @@ export { ## The log level in use for this node. This is the minimum ## log level required to produce output. - global log_level = INFO &redef; + global level = INFO &redef; ## A debug-level log message writer. ## @@ -84,7 +84,7 @@ global r2s: table[Management::Role] of string = { function debug(message: string) { - if ( enum_to_int(log_level) > enum_to_int(DEBUG) ) + if ( enum_to_int(level) > enum_to_int(DEBUG) ) return; local node = Supervisor::node(); @@ -94,7 +94,7 @@ function debug(message: string) function info(message: string) { - if ( enum_to_int(log_level) > enum_to_int(INFO) ) + if ( enum_to_int(level) > enum_to_int(INFO) ) return; local node = Supervisor::node(); @@ -104,7 +104,7 @@ function info(message: string) function warning(message: string) { - if ( enum_to_int(log_level) > enum_to_int(WARNING) ) + if ( enum_to_int(level) > enum_to_int(WARNING) ) return; local node = Supervisor::node(); @@ -114,7 +114,7 @@ function warning(message: string) function error(message: string) { - if ( enum_to_int(log_level) > enum_to_int(ERROR) ) + if ( enum_to_int(level) > enum_to_int(ERROR) ) return; local node = Supervisor::node(); diff --git a/scripts/policy/frameworks/management/request.zeek b/scripts/policy/frameworks/management/request.zeek index 86de3ea0fd..f7a798ace6 100644 --- a/scripts/policy/frameworks/management/request.zeek +++ b/scripts/policy/frameworks/management/request.zeek @@ -32,6 +32,14 @@ export { finished: bool &default=F; }; + # To allow a callback to refer to Requests, the Request type must + # exist. So redef to add it: + redef record Request += { + ## A callback to invoke when this request is finished via + ## :zeek:see:`Management::Request::finish`. + finish: function(req: Management::Request::Request) &optional; + }; + ## The timeout interval for request state. Such state (see the ## :zeek:see:`Management::Request` module) ties together request and ## response event pairs. 
A timeout causes cleanup of request state if @@ -131,6 +139,9 @@ function finish(reqid: string): bool local req = g_requests[reqid]; delete g_requests[reqid]; + if ( req?$finish ) + req$finish(req); + req$finished = T; return T; @@ -146,20 +157,12 @@ function is_null(request: Request): bool function to_string(request: Request): string { - local results: string_vec; - local res: Management::Result; local parent_id = ""; if ( request?$parent_id ) parent_id = fmt(" (via %s)", request$parent_id); - for ( idx in request$results ) - { - res = request$results[idx]; - results[|results|] = Management::result_to_string(res); - } - return fmt("[request %s%s %s, results: %s]", request$id, parent_id, request$finished ? "finished" : "pending", - join_string_vec(results, ",")); + Management::result_vec_to_string(request$results)); } diff --git a/scripts/policy/frameworks/management/types.zeek b/scripts/policy/frameworks/management/types.zeek index c4076cd8f7..796c943754 100644 --- a/scripts/policy/frameworks/management/types.zeek +++ b/scripts/policy/frameworks/management/types.zeek @@ -92,22 +92,27 @@ export { type NodeStatusVec: vector of NodeStatus; - ## Return value for request-response API event pairs + ## Return value for request-response API event pairs. Some responses + ## contain one, others multiple of these. The request ID allows clients + ## to string requests and responses together. Agents and the controller + ## fill in the instance and node fields whenever there's sufficient + ## context to define them. Any result produced by an agent will carry an + ## instance value, for example. type Result: record { reqid: string; ##< Request ID of operation this result refers to - instance: string &default=""; ##< Name of associated instance (for context) success: bool &default=T; ##< True if successful + instance: string &optional; ##< Name of associated instance (for context) data: any &optional; ##< Addl data returned for successful operation - error: string &default=""; ##< Descriptive error on failure + error: string &optional; ##< Descriptive error on failure node: string &optional; ##< Name of associated node (for context) }; type ResultVec: vector of Result; - ## In :zeek:see:`Management::Controller::API::set_configuration_response`, - ## events, each :zeek:see:`Management::Result` indicates the outcome of a - ## requested cluster node. If a node does not launch properly (meaning - ## it doesn't check in with the agent on thee machine it's running on), + ## In :zeek:see:`Management::Controller::API::deploy_response` events, + ## each :zeek:see:`Management::Result` indicates the outcome of a + ## launched cluster node. If a node does not launch properly (meaning + ## it doesn't check in with the agent on the machine it's running on), ## the result will indicate failure, and its data field will be an ## instance of this record, capturing the stdout and stderr output of ## the failing node. @@ -119,6 +124,10 @@ export { ## Given a :zeek:see:`Management::Result` record, ## this function returns a string summarizing it. global result_to_string: function(res: Result): string; + + ## Given a vector of :zeek:see:`Management::Result` records, + ## this function returns a string summarizing them. 
+	global result_vec_to_string: function(res: ResultVec): string;
 }
 
 function result_to_string(res: Result): string
@@ -127,7 +136,7 @@ function result_to_string(res: Result): string
 
 	if ( res$success )
 		result = "success";
-	else if ( res$error != "" )
+	else if ( res?$error )
 		result = fmt("error (%s)", res$error);
 	else
 		result = "error";
@@ -136,7 +145,7 @@ function result_to_string(res: Result): string
 
 	if ( res$reqid != "" )
 		details[|details|] = fmt("reqid %s", res$reqid);
-	if ( res$instance != "" )
+	if ( res?$instance )
 		details[|details|] = fmt("instance %s", res$instance);
 	if ( res?$node && res$node != "" )
 		details[|details|] = fmt("node %s", res$node);
@@ -146,3 +155,13 @@
 
 	return result;
 	}
+
+function result_vec_to_string(res: ResultVec): string
+	{
+	local ret: vector of string;
+
+	for ( idx in res )
+		ret += result_to_string(res[idx]);
+
+	return join_string_vec(ret, ", ");
+	}
diff --git a/testing/external/commit-hash.zeek-testing-cluster b/testing/external/commit-hash.zeek-testing-cluster
index c818a8cb3a..888db55f2c 100644
--- a/testing/external/commit-hash.zeek-testing-cluster
+++ b/testing/external/commit-hash.zeek-testing-cluster
@@ -1 +1 @@
-6cb24ebe9ad0725b28c4e4d5afa3ec1f6d04eed5
+e01ffdcd799d3ca2851225994108d500c540fbe2
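
Illustration (not part of the commit above): a minimal sketch of how a site-local script
might exercise the renamed Management::Log::level option, the new per-request "finish"
callback, and Management::result_vec_to_string(). This is hypothetical usage under the
assumption that the management framework policy scripts are available on the ZEEKPATH;
the request created here is purely local and not tied to any controller or agent
transaction.

@load policy/frameworks/management/log
@load policy/frameworks/management/request
@load policy/frameworks/management/types

# Raise log verbosity via the renamed option (formerly "log_level").
redef Management::Log::level = Management::Log::DEBUG;

event zeek_init()
	{
	# Create fresh request state (no explicit request ID).
	local req = Management::Request::create();

	# The new &optional "finish" field: Management::Request::finish()
	# invokes it after removing the request from its state table.
	req$finish = function(r: Management::Request::Request)
		{
		Management::Log::info(fmt("request %s done, results: %s",
		    r$id, Management::result_vec_to_string(r$results)));
		};

	# Attach a result and complete the request, triggering the callback.
	req$results += Management::Result($reqid=req$id);
	Management::Request::finish(req$id);
	}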