diff --git a/CHANGES b/CHANGES
index 784f3cb6fc..be42243dda 100644
--- a/CHANGES
+++ b/CHANGES
@@ -1,3 +1,17 @@
+5.1.0-dev.108 | 2022-06-23 12:26:26 -0700
+
+  * Management framework: support for cluster node restarts (Christian Kreibich, Corelight)
+
+    - bump external cluster testsuite
+    - bump zeek-client
+    - edit pass over docstrings
+    - node restart support
+    - more consistent Supervisor interaction in the agent
+    - log the controller's startup deployment attempt
+    - bugfix for a get_id_value corner case
+    - minor timeout bugfix
+    - make "result" argument plural in multi-result response events
+
 5.1.0-dev.98 | 2022-06-22 22:39:32 -0700
 
   * Management framework: separate config staging and deployment (Christian Kreibich, Corelight)
diff --git a/VERSION b/VERSION
index dd7091e91f..8d5cada9fd 100644
--- a/VERSION
+++ b/VERSION
@@ -1 +1 @@
-5.1.0-dev.98
+5.1.0-dev.108
diff --git a/auxil/zeek-client b/auxil/zeek-client
index d92f17cfd8..c1f09a0846 160000
--- a/auxil/zeek-client
+++ b/auxil/zeek-client
@@ -1 +1 @@
-Subproject commit d92f17cfd882dd1edbd3f560181d84f69fbc8037
+Subproject commit c1f09a084668cfc8d58d4be91e006d5a14386c0e
diff --git a/scripts/policy/frameworks/management/agent/api.zeek b/scripts/policy/frameworks/management/agent/api.zeek
index 857f79c232..d1440e2068 100644
--- a/scripts/policy/frameworks/management/agent/api.zeek
+++ b/scripts/policy/frameworks/management/agent/api.zeek
@@ -33,15 +33,18 @@ export {
 	global deploy_request: event(reqid: string, config: Management::Configuration, force: bool &default=F);
 
-	## Response to a deploy_request event. The agent sends
-	## this back to the controller.
+	## Response to a :zeek:see:`Management::Agent::API::deploy_request`
+	## event. The agent sends this back to the controller.
 	##
 	## reqid: the request identifier used in the request event.
 	##
-	## result: the result record.
+	## results: A vector of :zeek:see:`Management::Result` records, each
+	## capturing the outcome of a single launched node. For failing
+	## nodes, the result's data field is a
+	## :zeek:see:`Management::NodeOutputs` record.
 	##
 	global deploy_response: event(reqid: string,
-	    result: Management::ResultVec);
+	    results: Management::ResultVec);
 
 
 	## The controller sends this event to request a list of
@@ -53,8 +56,8 @@ export {
 	##
 	global get_nodes_request: event(reqid: string);
 
-	## Response to a get_nodes_request event. The agent sends this back to the
-	## controller.
+	## Response to a :zeek:see:`Management::Agent::API::get_nodes_request`
+	## event. The agent sends this back to the controller.
 	##
 	## reqid: the request identifier used in the request event.
 	##
@@ -86,31 +89,35 @@ export {
 	global node_dispatch_request: event(reqid: string, action: vector of string, nodes: set[string] &default=set());
 
-	## Response to a node_dispatch_request event. Each agent sends this back
-	## to the controller to report the dispatch outcomes on all nodes managed
-	## by that agent.
+	## Response to a
+	## :zeek:see:`Management::Agent::API::node_dispatch_request` event. Each
+	## agent sends this back to the controller to report the dispatch
+	## outcomes on all nodes managed by that agent.
 	##
 	## reqid: the request identifier used in the request event.
 	##
-	## result: a :zeek:type:`vector` of :zeek:see:`Management::Result`
+	## results: a :zeek:type:`vector` of :zeek:see:`Management::Result`
 	## records. Each record covers one Zeek cluster node managed by this
 	## agent. Upon success, each :zeek:see:`Management::Result` record's
 	## data member contains the dispatches' response in a data type
 	## appropriate for the respective dispatch.
 	##
-	global node_dispatch_response: event(reqid: string, result: Management::ResultVec);
+	global node_dispatch_response: event(reqid: string, results: Management::ResultVec);
 
 
 	## The controller sends this event to confirm to the agent that it is
-	## part of the current cluster topology. The agent acknowledges with the
-	## corresponding response event.
+	## part of the current cluster topology. The agent acknowledges with a
+	## :zeek:see:`Management::Agent::API::agent_welcome_response` event,
+	## upon which the controller may proceed with a cluster deployment to
+	## this agent.
 	##
 	## reqid: a request identifier string, echoed in the response event.
 	##
 	global agent_welcome_request: event(reqid: string);
 
-	## Response to an agent_welcome_request event. The agent sends this
-	## back to the controller.
+	## Response to a
+	## :zeek:see:`Management::Agent::API::agent_welcome_request` event. The
+	## agent sends this back to the controller.
 	##
 	## reqid: the request identifier used in the request event.
 	##
@@ -132,8 +139,9 @@ export {
 	##
 	global agent_standby_request: event(reqid: string);
 
-	## Response to an agent_standby_request event. The agent sends this
-	## back to the controller.
+	## Response to a
+	## :zeek:see:`Management::Agent::API::agent_standby_request` event. The
+	## agent sends this back to the controller.
 	##
 	## reqid: the request identifier used in the request event.
 	##
@@ -143,6 +151,35 @@ export {
 	    result: Management::Result);
 
+	## The controller sends this event to ask the agent to restart currently
+	## running Zeek cluster nodes. For nodes currently running, the agent
+	## places these nodes into PENDING state and sends restart events to the
+	## Supervisor, rendering its responses into a list of
+	## :zeek:see:`Management::Result` records summarizing each node restart.
+	## When restarted nodes check in with the agent, they switch back to
+	## RUNNING state. The agent ignores nodes not currently running.
+	##
+	## reqid: a request identifier string, echoed in the response event.
+	##
+	## nodes: a set of cluster node names (e.g. "worker-01") to restart. An
+	## empty set, supplied by default, means restart of all of the
+	## agent's current cluster nodes.
+	##
+	global restart_request: event(reqid: string, nodes: set[string] &default=set());
+
+	## Response to a :zeek:see:`Management::Agent::API::restart_request`
+	## event. The agent sends this back to the controller when the
+	## Supervisor has restarted all nodes affected, or a timeout occurs.
+	##
+	## reqid: the request identifier used in the request event.
+	##
+	## results: a :zeek:type:`vector` of :zeek:see:`Management::Result`, one
+	## for each Supervisor transaction. Each such result identifies both
+	## the instance and node.
+	##
+	global restart_response: event(reqid: string, results: Management::ResultVec);
+
+
 	# Notification events, agent -> controller
 
 	## The agent sends this event upon peering as a "check-in", informing
@@ -164,7 +201,7 @@ export {
 	                                connecting: bool, api_version: count);
 
-	# The following are not yet implemented.
+	# The following are not yet meaningfully implemented.
 
 	# Report node state changes.
 	global notify_change: event(instance: string,
diff --git a/scripts/policy/frameworks/management/agent/main.zeek b/scripts/policy/frameworks/management/agent/main.zeek
index 3ec546f76e..4111828770 100644
--- a/scripts/policy/frameworks/management/agent/main.zeek
+++ b/scripts/policy/frameworks/management/agent/main.zeek
@@ -27,6 +27,8 @@ export {
 		node: string &default="";
 		## The result of a status request.
 		status: Supervisor::Status &optional;
+		## The result of a restart request.
+		restart_result: bool &optional;
 	};
 
 	## Request state for deploy requests.
@@ -47,6 +49,13 @@ export {
 		requests: set[string] &default=set();
 	};
 
+	## Request state for restart requests, tracking received responses.
+	type RestartState: record {
+		## Request state for every node the agent asks the Supervisor
+		## to restart.
+		requests: set[string] &default=set();
+	};
+
 	# When Management::Agent::archive_logs is T (the default) and the
 	# logging configuration doesn't permanently prevent archival
 	# (e.g. because log rotation isn't configured), the agent triggers this
@@ -67,6 +76,7 @@ redef record Management::Request::Request += {
 	supervisor_state_agent: SupervisorState &optional;
 	deploy_state_agent: DeployState &optional;
 	node_dispatch_state_agent: NodeDispatchState &optional;
+	restart_state_agent: RestartState &optional;
 };
 
 # Tag our logs correctly
@@ -86,6 +96,22 @@ global agent_topic: function(): string;
 # Returns the effective supervisor's address and port, to peer with
 global supervisor_network_info: function(): Broker::NetworkInfo;
 
+# Wrapper for sending a SupervisorControl::status_request to the Supervisor.
+# Establishes a request object for the transaction, and returns it.
+global supervisor_status: function(node: string): Management::Request::Request;
+
+# Wrapper for sending a SupervisorControl::create_request to the Supervisor.
+# Establishes a request object for the transaction, and returns it.
+global supervisor_create: function(nc: Supervisor::NodeConfig): Management::Request::Request;
+
+# Wrapper for sending a SupervisorControl::destroy_request to the Supervisor.
+# Establishes a request object for the transaction, and returns it.
+global supervisor_destroy: function(node: string): Management::Request::Request;
+
+# Wrapper for sending a SupervisorControl::restart_request to the Supervisor.
+# Establishes a request object for the transaction, and returns it.
+global supervisor_restart: function(node: string): Management::Request::Request;
+
 # Finalizes a deploy_request transaction: cleans up remaining state
 # and sends response event.
 global send_deploy_response: function(req: Management::Request::Request);
@@ -94,6 +120,10 @@ global send_deploy_response: function(req: Management::Request::Request);
 # a status response.
 global deploy_request_finish: function(req: Management::Request::Request);
 
+# Callback completing a restart_request after the Supervisor has delivered
+# a restart response.
+global restart_request_finish: function(req: Management::Request::Request);
+
 # Callback completing a get_nodes_request after the Supervisor has delivered
 # a status response.
 global get_nodes_request_finish: function(req: Management::Request::Request);
@@ -231,6 +261,21 @@ event Management::Supervisor::API::notify_node_exit(node: string, outputs: Manag
 	g_outputs[node] = outputs;
 	}
 
+event SupervisorControl::status_response(reqid: string, result: Supervisor::Status)
+	{
+	Management::Log::info(fmt("rx SupervisorControl::status_response %s", reqid));
+
+	local req = Management::Request::lookup(reqid);
+	if ( Management::Request::is_null(req) )
+		return;
+	if ( ! req?$supervisor_state_agent )
+		return;
+
+	req$supervisor_state_agent$status = result;
+
+	Management::Request::finish(reqid);
+	}
+
 event SupervisorControl::create_response(reqid: string, result: string)
 	{
 	Management::Log::info(fmt("rx SupervisorControl::create_response %s %s", reqid, result));
@@ -238,6 +283,8 @@ event SupervisorControl::create_response(reqid: string, result: string)
 	local req = Management::Request::lookup(reqid);
 	if ( Management::Request::is_null(req) )
 		return;
+	if ( ! req?$supervisor_state_agent )
+		return;
 
 	local name = req$supervisor_state_agent$node;
 
@@ -260,6 +307,8 @@ event SupervisorControl::destroy_response(reqid: string, result: bool)
 	local req = Management::Request::lookup(reqid);
 	if ( Management::Request::is_null(req) )
 		return;
+	if ( ! req?$supervisor_state_agent )
+		return;
 
 	local name = req$supervisor_state_agent$node;
 
@@ -275,7 +324,44 @@ event SupervisorControl::destroy_response(reqid: string, result: bool)
 	Management::Request::finish(reqid);
 	}
 
-function supervisor_create(nc: Supervisor::NodeConfig)
+event SupervisorControl::restart_response(reqid: string, result: bool)
+	{
+	Management::Log::info(fmt("rx SupervisorControl::restart_response %s %s", reqid, result));
+
+	local req = Management::Request::lookup(reqid);
+	if ( Management::Request::is_null(req) )
+		return;
+	if ( ! req?$supervisor_state_agent )
+		return;
+
+	local name = req$supervisor_state_agent$node;
+	req$supervisor_state_agent$restart_result = result;
+
+	if ( ! result )
+		{
+		local msg = fmt("failed to restart node %s", name);
+		Management::Log::error(msg);
+		Broker::publish(agent_topic(),
+		    Management::Agent::API::notify_error,
+		    Management::Agent::get_name(), msg, name);
+		}
+
+	Management::Request::finish(reqid);
+	}
+
+function supervisor_status(node: string): Management::Request::Request
+	{
+	local req = Management::Request::create();
+	req$supervisor_state_agent = SupervisorState($node = node);
+
+	Management::Log::info(fmt("tx SupervisorControl::status_request %s %s", req$id, node));
+	Broker::publish(SupervisorControl::topic_prefix,
+	    SupervisorControl::status_request, req$id, node);
+
+	return req;
+	}
+
+function supervisor_create(nc: Supervisor::NodeConfig): Management::Request::Request
 	{
 	local req = Management::Request::create();
 	req$supervisor_state_agent = SupervisorState($node = nc$name);
@@ -283,9 +369,11 @@ function supervisor_create(nc: Supervisor::NodeConfig)
 	Management::Log::info(fmt("tx SupervisorControl::create_request %s %s", req$id, nc$name));
 	Broker::publish(SupervisorControl::topic_prefix,
 	    SupervisorControl::create_request, req$id, nc);
+
+	return req;
 	}
 
-function supervisor_destroy(node: string)
+function supervisor_destroy(node: string): Management::Request::Request
 	{
 	local req = Management::Request::create();
 	req$supervisor_state_agent = SupervisorState($node = node);
@@ -293,6 +381,20 @@ function supervisor_destroy(node: string)
 	Management::Log::info(fmt("tx SupervisorControl::destroy_request %s %s", req$id, node));
 	Broker::publish(SupervisorControl::topic_prefix,
 	    SupervisorControl::destroy_request, req$id, node);
+
+	return req;
+	}
+
+function supervisor_restart(node: string): Management::Request::Request
+	{
+	local req = Management::Request::create();
+	req$supervisor_state_agent = SupervisorState($node = node);
+
+	Management::Log::info(fmt("tx SupervisorControl::restart_request %s %s", req$id, node));
+	Broker::publish(SupervisorControl::topic_prefix,
+	    SupervisorControl::restart_request, req$id, node);
+
+	return req;
 	}
 
 event Management::Agent::API::deploy_request(reqid: string, config: Management::Configuration, force: bool)
@@ -332,14 +434,9 @@ event Management::Agent::API::deploy_request(reqid: string, config: Management::
 	for ( inst in config$instances )
 		g_instances[inst$name] = inst;
 
-	local areq = Management::Request::create();
-	areq$parent_id = req$id;
-	areq$finish = deploy_request_finish;
-	areq$supervisor_state_agent = SupervisorState();
-
-	Management::Log::info(fmt("tx SupervisorControl::status_request %s, all nodes", areq$id));
-	Broker::publish(SupervisorControl::topic_prefix,
-	    SupervisorControl::status_request, areq$id, "");
+	local sreq = supervisor_status("");
+	sreq$parent_id = reqid;
+	sreq$finish = deploy_request_finish;
 	}
 
 function deploy_request_finish(areq: Management::Request::Request)
@@ -475,34 +572,15 @@ function deploy_request_finish(areq: Management::Request::Request)
 	# the deploy_response event back to the controller.
 	}
 
-event SupervisorControl::status_response(reqid: string, result: Supervisor::Status)
-	{
-	Management::Log::info(fmt("rx SupervisorControl::status_response %s", reqid));
-	local req = Management::Request::lookup(reqid);
-
-	if ( Management::Request::is_null(req) )
-		return;
-	if ( ! req?$supervisor_state_agent )
-		return;
-
-	req$supervisor_state_agent$status = result;
-	Management::Request::finish(reqid);
-	}
-
 event Management::Agent::API::get_nodes_request(reqid: string)
 	{
 	Management::Log::info(fmt("rx Management::Agent::API::get_nodes_request %s", reqid));
 
 	local req = Management::Request::create(reqid);
 
-	local areq = Management::Request::create();
-	areq$parent_id = req$id;
-	areq$finish = get_nodes_request_finish;
-	areq$supervisor_state_agent = SupervisorState();
-
-	Broker::publish(SupervisorControl::topic_prefix,
-	    SupervisorControl::status_request, areq$id, "");
-	Management::Log::info(fmt("issued supervisor status, %s", areq$id));
+	local sreq = supervisor_status("");
+	sreq$parent_id = reqid;
+	sreq$finish = get_nodes_request_finish;
 	}
 
 function get_nodes_request_finish(areq: Management::Request::Request)
@@ -794,6 +872,124 @@ event Management::Agent::API::agent_standby_request(reqid: string)
 	    Management::Agent::API::agent_standby_response, reqid, res);
 	}
 
+function restart_request_finish(sreq: Management::Request::Request)
+	{
+	# This is the finish callback we set on requests to the Supervisor to
+	# restart a node. We look up the parent request (the one sent to us by
+	# the controller), mark the node in question as done, and respond to the
+	# controller if we've handled all required nodes.
+
+	local req = Management::Request::lookup(sreq$parent_id);
+	if ( Management::Request::is_null(req) )
+		return;
+
+	local node = sreq$supervisor_state_agent$node;
+
+	local res = Management::Result(
+	    $reqid = req$id,
+	    $instance = Management::Agent::get_name(),
+	    $node = node);
+
+	if ( ! sreq$supervisor_state_agent$restart_result )
+		{
+		res$success = F;
+		res$error = fmt("could not restart node %s", node);
+		}
+
+	req$results += res;
+
+	if ( node in req$restart_state_agent$requests )
+		{
+		delete req$restart_state_agent$requests[node];
+		if ( |req$restart_state_agent$requests| > 0 )
+			return;
+		}
+
+	Management::Log::info(fmt(
+	    "tx Management::Agent::API::restart_response %s",
+	    Management::Request::to_string(req)));
+	Broker::publish(agent_topic(),
+	    Management::Agent::API::restart_response,
+	    req$id, req$results);
+	Management::Request::finish(req$id);
+	}
+
+event Management::Agent::API::restart_request(reqid: string, nodes: set[string])
+	{
+	# This is very similar to node_dispatch_request, because it too works
+	# with a list of requested nodes that we need to check against the
+	# nodes this agent manages.
+
+	Management::Log::info(fmt("rx Management::Agent::API::restart_request %s %s",
+	    reqid, Management::Util::set_to_vector(nodes)));
+
+	local node: string;
+	local cluster_nodes: set[string];
+	local nodes_final: set[string];
+
+	for ( node in g_nodes )
+		add cluster_nodes[node];
+
+	# If this request includes cluster nodes to restart, check if this agent
+	# manages any of those nodes. If it doesn't, respond with an empty
+	# results vector immediately. Note that any globally unknown nodes
+	# that the client might have requested already got filtered by the
+	# controller, so we don't need to worry about them here.
+
+	if ( |nodes| > 0 )
+		{
+		nodes_final = nodes & cluster_nodes;
+
+		if ( |nodes_final| == 0 )
+			{
+			Management::Log::info(fmt(
+			    "tx Management::Agent::API::restart_response %s, no node overlap",
+			    reqid));
+			Broker::publish(agent_topic(),
+			    Management::Agent::API::restart_response, reqid, vector());
+			return;
+			}
+		}
+	else if ( |g_nodes| == 0 )
+		{
+		# Special case: the client did not request specific nodes. If
If + # we aren't running any nodes, respond right away, since there's + # nothing to restart. + Management::Log::info(fmt( + "tx Management::Agent::API::restart_response %s, no nodes registered", + reqid)); + Broker::publish(agent_topic(), + Management::Agent::API::restart_response, reqid, vector()); + return; + } + else + { + # We restart all nodes. + nodes_final = cluster_nodes; + } + + local res: Management::Result; + local req = Management::Request::create(reqid); + + req$restart_state_agent = RestartState(); + + # Build up state for tracking responses. + for ( node in nodes_final ) + add req$restart_state_agent$requests[node]; + + # Ask the Supervisor to restart nodes. We need to enumerate the nodes + # because restarting all (via "") would hit the agent (and the + # controller, if co-located). + for ( node in nodes_final ) + { + local sreq = supervisor_restart(node); + sreq$parent_id = reqid; + sreq$finish = restart_request_finish; + + if ( node in g_nodes ) + g_nodes[node]$state = Management::PENDING; + } + } + event Management::Node::API::notify_node_hello(node: string) { Management::Log::info(fmt("rx Management::Node::API::notify_node_hello %s", node)); @@ -821,17 +1017,29 @@ event Management::Node::API::notify_node_hello(node: string) event Management::Request::request_expired(req: Management::Request::Request) { + Management::Log::info(fmt("request %s timed out", req$id)); + local res = Management::Result($reqid=req$id, $instance = Management::Agent::get_name(), $success = F, $error = "request timed out"); + req$results += res; + if ( req?$deploy_state_agent ) { send_deploy_response(req); # This timeout means we no longer have a pending request. g_config_reqid_pending = ""; } + + if ( req?$restart_state_agent ) + { + Management::Log::info(fmt("tx Management::Agent::API::restart_response %s", + Management::Request::to_string(req))); + Broker::publish(agent_topic(), + Management::Agent::API::restart_response, req$id, req$results); + } } event Broker::peer_added(peer: Broker::EndpointInfo, msg: string) diff --git a/scripts/policy/frameworks/management/controller/api.zeek b/scripts/policy/frameworks/management/controller/api.zeek index 9b5de9087c..d87b6a3a61 100644 --- a/scripts/policy/frameworks/management/controller/api.zeek +++ b/scripts/policy/frameworks/management/controller/api.zeek @@ -20,13 +20,14 @@ export { ## global get_instances_request: event(reqid: string); - ## Response to a get_instances_request event. The controller sends - ## this back to the client. + ## Response to a + ## :zeek:see:`Management::Controller::API::get_instances_request` + ## event. The controller sends this back to the client. ## ## reqid: the request identifier used in the request event. ## - ## result: the result record. Its data member is a - ## :zeek:see:`Management::Instance` record. + ## result: a :zeek:see:`Management::Result`. Its data member is a vector + ## of :zeek:see:`Management::Instance` records. ## global get_instances_response: event(reqid: string, result: Management::Result); @@ -47,18 +48,20 @@ export { global stage_configuration_request: event(reqid: string, config: Management::Configuration); - ## Response to a stage_configuration_request event. The controller sends - ## this back to the client, conveying validation results. + ## Response to a + ## :zeek:see:`Management::Controller::API::stage_configuration_request` + ## event. The controller sends this back to the client, conveying + ## validation results. ## ## reqid: the request identifier used in the request event. 
 	##
-	## result: a :zeek:see:`Management::Result` vector, indicating whether
+	## results: a :zeek:see:`Management::Result` vector, indicating whether
 	## the controller accepts the configuration. In case of a success,
 	## a single result record indicates so. Otherwise, the sequence is
 	## all errors, each indicating a configuration validation error.
 	##
 	global stage_configuration_response: event(reqid: string,
-	    result: Management::ResultVec);
+	    results: Management::ResultVec);
 
 
 	## The client sends this event to retrieve the controller's current
@@ -71,8 +74,10 @@ export {
 	##
 	global get_configuration_request: event(reqid: string, deployed: bool);
 
-	## Response to a get_configuration_request event. The controller sends
-	## this back to the client.
+	## Response to a
+	## :zeek:see:`Management::Controller::API::get_configuration_request`
+	## event. The controller sends this back to the client, with the
+	## requested configuration.
 	##
 	## reqid: the request identifier used in the request event.
 	##
@@ -100,18 +105,19 @@ export {
 	##
 	global deploy_request: event(reqid: string);
 
-	## Response to a deploy_request event. The controller sends this
-	## back to the client.
+	## Response to a :zeek:see:`Management::Controller::API::deploy_request`
+	## event. The controller sends this back to the client, conveying the
+	## outcome of the deployment.
 	##
 	## reqid: the request identifier used in the request event.
 	##
-	## result: a vector of :zeek:see:`Management::Result` records.
+	## results: a vector of :zeek:see:`Management::Result` records.
 	## Each member captures the result of launching one cluster
 	## node captured in the configuration, or an agent-wide error
 	## when the result does not indicate a particular node.
 	##
 	global deploy_response: event(reqid: string,
-	    result: Management::ResultVec);
+	    results: Management::ResultVec);
 
 
 	## The client sends this event to request a list of
@@ -123,19 +129,23 @@ export {
 	##
 	global get_nodes_request: event(reqid: string);
 
-	## Response to a get_nodes_request event. The controller sends this
-	## back to the client.
+	## Response to a
+	## :zeek:see:`Management::Controller::API::get_nodes_request` event. The
+	## controller sends this back to the client, with a description of the
+	## nodes currently managed by the Supervisors on all connected
+	## instances. This includes agents and possibly the controller, if it
+	## runs jointly with an agent.
 	##
 	## reqid: the request identifier used in the request event.
 	##
-	## result: a :zeek:type:`vector` of :zeek:see:`Management::Result`
-	## records. Each record covers one cluster instance. Each record's data
-	## member is a vector of :zeek:see:`Management::NodeStatus`
-	## records, covering the nodes at that instance. Results may also indicate
-	## failure, with error messages indicating what went wrong.
+	## results: a :zeek:type:`vector` of :zeek:see:`Management::Result`
+	## records. Each record covers one cluster instance. Each record's
+	## data member is a vector of :zeek:see:`Management::NodeStatus`
+	## records, covering the nodes at that instance. Results may also
+	## indicate failure, with error messages indicating what went wrong.
 	##
 	global get_nodes_response: event(reqid: string,
-	    result: Management::ResultVec);
+	    results: Management::ResultVec);
 
 
 	## The client sends this event to retrieve the current value of a
@@ -156,20 +166,52 @@ export {
 	global get_id_value_request: event(reqid: string, id: string, nodes: set[string] &default=set());
 
-	## Response to a get_id_value_request event. The controller sends this
-	## back to the client.
+	## Response to a
+	## :zeek:see:`Management::Controller::API::get_id_value_request`
+	## event. The controller sends this back to the client, with a JSON
+	## representation of the requested global ID on all relevant instances.
 	##
 	## reqid: the request identifier used in the request event.
 	##
-	## result: a :zeek:type:`vector` of :zeek:see:`Management::Result`
+	## results: a :zeek:type:`vector` of :zeek:see:`Management::Result`
 	## records. Each record covers one Zeek cluster node. Each record's
 	## data field contains a string with the JSON rendering (as produced
 	## by :zeek:id:`to_json`, including the error strings it potentially
 	## returns).
 	##
-	global get_id_value_response: event(reqid: string, result: Management::ResultVec);
+	global get_id_value_response: event(reqid: string, results: Management::ResultVec);
 
+	## The client sends this event to restart currently running Zeek cluster
+	## nodes. The controller relays the request to its agents, which respond
+	## with a list of :zeek:see:`Management::Result` records summarizing
+	## each node restart. The controller combines these lists, and sends a
+	## :zeek:see:`Management::Controller::API::restart_response` event with
+	## the result.
+	##
+	## reqid: a request identifier string, echoed in the response event.
+	##
+	## nodes: a set of cluster node names (e.g. "worker-01") to restart. An
+	## empty set, supplied by default, means restart of all current
+	## cluster nodes.
+	##
+	global restart_request: event(reqid: string, nodes: set[string] &default=set());
+
+	## Response to a :zeek:see:`Management::Controller::API::restart_request`
+	## event. The controller sends this back to the client when it has received
+	## responses from all agents involved, or a timeout occurs.
+	##
+	## reqid: the request identifier used in the request event.
+	##
+	## results: a :zeek:type:`vector` of :zeek:see:`Management::Result`,
+	## combining the restart results from all agents. Each such result
+	## identifies both the instance and node in question. Results that
+	## do not identify an instance are generated by the controller,
+	## flagging corner cases, including absence of a deployed cluster
+	## or unknown nodes.
+	##
+	global restart_response: event(reqid: string, results: Management::ResultVec);
+
 
 	# Testing events. These don't provide operational value but expose
 	# internal functionality, triggered by test cases.
@@ -186,8 +228,10 @@ export {
 	##
 	global test_timeout_request: event(reqid: string, with_state: bool);
 
-	## Response to a test_timeout_request event. The controller sends this
-	## back to the client if the original request had the with_state flag.
+	## Response to a
+	## :zeek:see:`Management::Controller::API::test_timeout_request`
+	## event. The controller sends this back to the client if the original
+	## request had the with_state flag.
 	##
 	## reqid: the request identifier used in the request event.
 	##
diff --git a/scripts/policy/frameworks/management/controller/main.zeek b/scripts/policy/frameworks/management/controller/main.zeek
index a5c1840b18..c88e3127d0 100644
--- a/scripts/policy/frameworks/management/controller/main.zeek
+++ b/scripts/policy/frameworks/management/controller/main.zeek
@@ -70,6 +70,14 @@ export {
 		requests: set[string] &default=set();
 	};
 
+	## Request state specific to
+	## :zeek:see:`Management::Controller::API::restart_request` and
+	## :zeek:see:`Management::Controller::API::restart_response`.
+	type RestartState: record {
+		## Request state for every controller/agent transaction.
+		requests: set[string] &default=set();
+	};
+
 	## Dummy state for internal state-keeping test cases.
 	type TestState: record { };
 }
 
@@ -78,6 +86,7 @@ redef record Management::Request::Request += {
 	deploy_state: DeployState &optional;
 	get_nodes_state: GetNodesState &optional;
 	node_dispatch_state: NodeDispatchState &optional;
+	restart_state: RestartState &optional;
 	test_state: TestState &optional;
 };
 
@@ -1166,7 +1175,7 @@ event Management::Controller::API::get_id_value_request(reqid: string, id: strin
 	local res: Management::Result;
 
 	# Special case: if we have no instances, respond right away.
-	if ( |g_instances| == 0 )
+	if ( |g_instances_known| == 0 )
 		{
 		Management::Log::info(fmt("tx Management::Controller::API::get_id_value_response %s", reqid));
 		res = Management::Result($reqid=reqid, $success=F, $error="no instances connected");
@@ -1238,6 +1247,141 @@ event Management::Controller::API::get_id_value_request(reqid: string, id: strin
 		}
 	}
 
+event Management::Agent::API::restart_response(reqid: string, results: Management::ResultVec)
+	{
+	Management::Log::info(fmt("rx Management::Agent::API::restart_response %s", reqid));
+
+	# Retrieve state for the request we just got a response to
+	local areq = Management::Request::lookup(reqid);
+	if ( Management::Request::is_null(areq) )
+		return;
+
+	# Release the request, since this agent is now done.
+	Management::Request::finish(areq$id);
+
+	# Find the original request from the client
+	local req = Management::Request::lookup(areq$parent_id);
+	if ( Management::Request::is_null(req) )
+		return;
+
+	# Add this agent's results to the overall response
+	for ( i in results )
+		req$results += results[i];
+
+	# Mark this request as done:
+	if ( areq$id in req$restart_state$requests )
+		delete req$restart_state$requests[areq$id];
+
+	# If we still have pending queries out to the agents, do nothing: we'll
+	# handle this soon, or our request will time out and we respond with
+	# error.
+	if ( |req$restart_state$requests| > 0 )
+		return;
+
+	Management::Log::info(fmt(
+	    "tx Management::Controller::API::restart_response %s",
+	    Management::Request::to_string(req)));
+	Broker::publish(Management::Controller::topic,
+	    Management::Controller::API::restart_response,
+	    req$id, req$results);
+	Management::Request::finish(req$id);
+	}
+
+event Management::Controller::API::restart_request(reqid: string, nodes: set[string])
+	{
+	# This works almost exactly like get_id_value_request, because it too
+	# works with a list of nodes that needs to be dispatched to agents.
+
+	local send_error_response = function(req: Management::Request::Request, error: string)
+		{
+		local res = Management::Result($reqid=req$id, $success=F, $error=error);
+		req$results += res;
+
+		Management::Log::info(fmt("tx Management::Controller::API::restart_response %s",
+		    Management::Request::to_string(req)));
+		Broker::publish(Management::Controller::topic,
+		    Management::Controller::API::restart_response, req$id, req$results);
+		};
+
+	Management::Log::info(fmt("rx Management::Controller::API::restart_request %s %s",
+	    reqid, Management::Util::set_to_vector(nodes)));
+
+	local res: Management::Result;
+	local req = Management::Request::create(reqid);
+	req$restart_state = RestartState();
+
+	# Special cases: if we have no instances or no deployment, respond right away.
+	if ( |g_instances_known| == 0 )
+		{
+		send_error_response(req, "no instances connected");
+		Management::Request::finish(reqid);
+		return;
+		}
+
+	if ( DEPLOYED !in g_configs )
+		{
+		send_error_response(req, "no active cluster deployment");
+		Management::Request::finish(reqid);
+		return;
+		}
+
+	local nodes_final: set[string];
+	local node: string;
+
+	# Input sanitization: check for any requested nodes that aren't part of
+	# the deployed configuration. We send back error results for those and
+	# don't propagate them to the agents.
+	if ( |nodes| > 0 )
+		{
+		# Requested nodes that are in the deployed configuration:
+		nodes_final = config_filter_nodes_by_name(g_configs[DEPLOYED], nodes);
+		# Requested nodes that are not in current configuration:
+		local nodes_invalid = nodes - nodes_final;
+
+		# Assemble error results for all invalid nodes
+		for ( node in nodes_invalid )
+			{
+			res = Management::Result($reqid=reqid, $node=node);
+			res$success = F;
+			res$error = "unknown cluster node";
+			req$results += res;
+			}
+
+		# If only invalid nodes got requested, we're now done.
+		if ( |nodes_final| == 0 )
+			{
+			Management::Log::info(fmt(
+			    "tx Management::Controller::API::restart_response %s",
+			    Management::Request::to_string(req)));
+			Broker::publish(Management::Controller::topic,
+			    Management::Controller::API::restart_response,
+			    req$id, req$results);
+			Management::Request::finish(req$id);
+			return;
+			}
+		}
+
+	for ( name in g_instances )
+		{
+		if ( name !in g_instances_ready )
+			next;
+
+		local agent_topic = Management::Agent::topic_prefix + "/" + name;
+		local areq = Management::Request::create();
+
+		areq$parent_id = req$id;
+		add req$restart_state$requests[areq$id];
+
+		Management::Log::info(fmt(
+		    "tx Management::Agent::API::restart_request %s to %s",
+		    areq$id, name));
+
+		Broker::publish(agent_topic,
+		    Management::Agent::API::restart_request,
+		    areq$id, nodes);
+		}
+	}
+
 event Management::Request::request_expired(req: Management::Request::Request)
 	{
 	# Various handlers for timed-out request state. We use the state members
@@ -1296,6 +1440,17 @@ event Management::Request::request_expired(req: Management::Request::Request)
 			}
 		}
 
+	if ( req?$restart_state )
+		{
+		req$results += res;
+
+		Management::Log::info(fmt("tx Management::Controller::API::restart_response %s",
+		    Management::Request::to_string(req)));
+		Broker::publish(Management::Controller::topic,
+		    Management::Controller::API::restart_response,
+		    req$id, req$results);
+		}
+
 	if ( req?$test_state )
 		{
 		Management::Log::info(fmt("tx Management::Controller::API::test_timeout_response %s", req$id));
@@ -1366,6 +1521,8 @@ event zeek_init()
 		{
 		local req = Management::Request::create();
 		req$deploy_state = DeployState($config=g_configs[DEPLOYED], $is_internal=T);
+		Management::Log::info(fmt("deploying persisted configuration %s, request %s",
+		    g_configs[DEPLOYED]$id, req$id));
 		deploy(req);
 		}
 	}
diff --git a/testing/external/commit-hash.zeek-testing-cluster b/testing/external/commit-hash.zeek-testing-cluster
index 888db55f2c..93029cac41 100644
--- a/testing/external/commit-hash.zeek-testing-cluster
+++ b/testing/external/commit-hash.zeek-testing-cluster
@@ -1 +1 @@
-e01ffdcd799d3ca2851225994108d500c540fbe2
+566b3325abec336c7540feac22a9d543f2629b82
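
Usage sketch (not part of the patch): a minimal, hypothetical Zeek script showing how a Broker-peered client could drive the restart API this change introduces. The topic, event names, and Management::Result fields come from the diff above; the controller endpoint (127.0.0.1, port 2150/tcp), the request ID, and the node name "worker-01" are illustrative assumptions, and the zeek-client bump above remains the intended way to issue restarts in practice.

@load policy/frameworks/management/controller/api

event Management::Controller::API::restart_response(reqid: string, results: Management::ResultVec)
	{
	# Each result identifies the instance and node it covers; results
	# without an instance flag controller-side corner cases, per the
	# restart_response docstring above.
	for ( i in results )
		{
		local res = results[i];
		if ( res$success )
			print fmt("restarted node %s on instance %s",
			    res?$node ? res$node : "(n/a)", res$instance);
		else
			print fmt("restart failed: %s", res$error);
		}
	terminate();
	}

event Broker::peer_added(ep: Broker::EndpointInfo, msg: string)
	{
	# Once peered with the controller, ask it to restart one worker.
	# An empty set would request a restart of all deployed nodes.
	Broker::publish(Management::Controller::topic,
	    Management::Controller::API::restart_request,
	    "restart-req-1", set("worker-01"));
	}

event zeek_init()
	{
	Broker::subscribe(Management::Controller::topic);
	Broker::peer("127.0.0.1", 2150/tcp); # assumed controller address/port
	}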