From b2f9e29bae43c54ae3b9e92c206e52b14e188bcd Mon Sep 17 00:00:00 2001 From: Christian Kreibich Date: Wed, 22 Jun 2022 16:23:42 -0700 Subject: [PATCH 1/9] Management framework: make "result" argument plural in multi-result response events No functional change, just a consistency tweak. Since agent and controller send response events via Broker::publish(), the arguments aren't named and so this only affects the API definition. --- .../frameworks/management/agent/api.zeek | 11 +++++---- .../frameworks/management/controller/api.zeek | 24 +++++++++---------- 2 files changed, 19 insertions(+), 16 deletions(-) diff --git a/scripts/policy/frameworks/management/agent/api.zeek b/scripts/policy/frameworks/management/agent/api.zeek index 857f79c232..880a075194 100644 --- a/scripts/policy/frameworks/management/agent/api.zeek +++ b/scripts/policy/frameworks/management/agent/api.zeek @@ -38,10 +38,13 @@ export { ## ## reqid: the request identifier used in the request event. ## - ## result: the result record. + ## results: A vector of :zeek:see:`Management::Result` records, each + ## capturing the outcome of a single launched node. For failing + ## nodes, the result's data field is a + ## :zeek:see:`Management::NodeOutputs` record. ## global deploy_response: event(reqid: string, - result: Management::ResultVec); + results: Management::ResultVec); ## The controller sends this event to request a list of @@ -92,13 +95,13 @@ export { ## ## reqid: the request identifier used in the request event. ## - ## result: a :zeek:type:`vector` of :zeek:see:`Management::Result` + ## results: a :zeek:type:`vector` of :zeek:see:`Management::Result` ## records. Each record covers one Zeek cluster node managed by this ## agent. Upon success, each :zeek:see:`Management::Result` record's ## data member contains the dispatches' response in a data type ## appropriate for the respective dispatch. ## - global node_dispatch_response: event(reqid: string, result: Management::ResultVec); + global node_dispatch_response: event(reqid: string, results: Management::ResultVec); ## The controller sends this event to confirm to the agent that it is diff --git a/scripts/policy/frameworks/management/controller/api.zeek b/scripts/policy/frameworks/management/controller/api.zeek index 9b5de9087c..bb6a2dec30 100644 --- a/scripts/policy/frameworks/management/controller/api.zeek +++ b/scripts/policy/frameworks/management/controller/api.zeek @@ -52,13 +52,13 @@ export { ## ## reqid: the request identifier used in the request event. ## - ## result: a :zeek:see:`Management::Result` vector, indicating whether + ## results: a :zeek:see:`Management::Result` vector, indicating whether ## the controller accepts the configuration. In case of a success, ## a single result record indicates so. Otherwise, the sequence is ## all errors, each indicating a configuration validation error. ## global stage_configuration_response: event(reqid: string, - result: Management::ResultVec); + results: Management::ResultVec); ## The client sends this event to retrieve the controller's current @@ -105,13 +105,13 @@ export { ## ## reqid: the request identifier used in the request event. ## - ## result: a vector of :zeek:see:`Management::Result` records. + ## results: a vector of :zeek:see:`Management::Result` records. ## Each member captures the result of launching one cluster ## node captured in the configuration, or an agent-wide error ## when the result does not indicate a particular node. ## global deploy_response: event(reqid: string, - result: Management::ResultVec); + results: Management::ResultVec); ## The client sends this event to request a list of @@ -128,14 +128,14 @@ export { ## ## reqid: the request identifier used in the request event. ## - ## result: a :zeek:type:`vector` of :zeek:see:`Management::Result` - ## records. Each record covers one cluster instance. Each record's data - ## member is a vector of :zeek:see:`Management::NodeStatus` - ## records, covering the nodes at that instance. Results may also indicate - ## failure, with error messages indicating what went wrong. + ## results: a :zeek:type:`vector` of :zeek:see:`Management::Result` + ## records. Each record covers one cluster instance. Each record's + ## data member is a vector of :zeek:see:`Management::NodeStatus` + ## records, covering the nodes at that instance. Results may also + ## indicate failure, with error messages indicating what went wrong. ## global get_nodes_response: event(reqid: string, - result: Management::ResultVec); + results: Management::ResultVec); ## The client sends this event to retrieve the current value of a @@ -161,13 +161,13 @@ export { ## ## reqid: the request identifier used in the request event. ## - ## result: a :zeek:type:`vector` of :zeek:see:`Management::Result` + ## results: a :zeek:type:`vector` of :zeek:see:`Management::Result` ## records. Each record covers one Zeek cluster node. Each record's ## data field contains a string with the JSON rendering (as produced ## by :zeek:id:`to_json`, including the error strings it potentially ## returns). ## - global get_id_value_response: event(reqid: string, result: Management::ResultVec); + global get_id_value_response: event(reqid: string, results: Management::ResultVec); # Testing events. These don't provide operational value but expose From 1af9bba76e0bda9c8c3379c8e0631e73a66b3747 Mon Sep 17 00:00:00 2001 From: Christian Kreibich Date: Wed, 22 Jun 2022 16:26:53 -0700 Subject: [PATCH 2/9] Management framework: minor timeout bugfix The timeout result wasn't actually stored in requests timing out in the agent. (So far that's for deployment requests.) Also log the timing out of any request state, similar to the controller. --- scripts/policy/frameworks/management/agent/main.zeek | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/scripts/policy/frameworks/management/agent/main.zeek b/scripts/policy/frameworks/management/agent/main.zeek index 3ec546f76e..e5d777e8c4 100644 --- a/scripts/policy/frameworks/management/agent/main.zeek +++ b/scripts/policy/frameworks/management/agent/main.zeek @@ -821,11 +821,15 @@ event Management::Node::API::notify_node_hello(node: string) event Management::Request::request_expired(req: Management::Request::Request) { + Management::Log::info(fmt("request %s timed out", req$id)); + local res = Management::Result($reqid=req$id, $instance = Management::Agent::get_name(), $success = F, $error = "request timed out"); + req$results += res; + if ( req?$deploy_state_agent ) { send_deploy_response(req); From 05447c413f3dcd634611703b410b87bfbe2b2638 Mon Sep 17 00:00:00 2001 From: Christian Kreibich Date: Wed, 22 Jun 2022 16:31:38 -0700 Subject: [PATCH 3/9] Management framework: bugfix for a get_id_value corner case For the case of a running cluster with no connected agents, use the g_instances_known table instead of g_instances. The latter reflects the contents of the last deployed config, not the live scenario of actually attached agents. --- scripts/policy/frameworks/management/controller/main.zeek | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/scripts/policy/frameworks/management/controller/main.zeek b/scripts/policy/frameworks/management/controller/main.zeek index a5c1840b18..2696ecc929 100644 --- a/scripts/policy/frameworks/management/controller/main.zeek +++ b/scripts/policy/frameworks/management/controller/main.zeek @@ -1166,7 +1166,7 @@ event Management::Controller::API::get_id_value_request(reqid: string, id: strin local res: Management::Result; # Special case: if we have no instances, respond right away. - if ( |g_instances| == 0 ) + if ( |g_instances_known| == 0 ) { Management::Log::info(fmt("tx Management::Controller::API::get_id_value_response %s", reqid)); res = Management::Result($reqid=reqid, $success=F, $error="no instances connected"); From d994f33636ea279611db49e0deb88093b818a7d1 Mon Sep 17 00:00:00 2001 From: Christian Kreibich Date: Wed, 22 Jun 2022 16:32:52 -0700 Subject: [PATCH 4/9] Management framework: log the controller's startup deployment attempt The controller now logs its deployment attempt of a persisted configuration at startup. This is generally helpful to see recorded, and also explains timeout of the underlying request in case of failure (which triggers a timeout message). --- scripts/policy/frameworks/management/controller/main.zeek | 2 ++ 1 file changed, 2 insertions(+) diff --git a/scripts/policy/frameworks/management/controller/main.zeek b/scripts/policy/frameworks/management/controller/main.zeek index 2696ecc929..d2063e3496 100644 --- a/scripts/policy/frameworks/management/controller/main.zeek +++ b/scripts/policy/frameworks/management/controller/main.zeek @@ -1366,6 +1366,8 @@ event zeek_init() { local req = Management::Request::create(); req$deploy_state = DeployState($config=g_configs[DEPLOYED], $is_internal=T); + Management::Log::info(fmt("deploying persisted configuration %s, request %s", + g_configs[DEPLOYED]$id, req$id)); deploy(req); } } From bd39207772bfe64b8219660910da6d2abbfce149 Mon Sep 17 00:00:00 2001 From: Christian Kreibich Date: Wed, 22 Jun 2022 16:39:45 -0700 Subject: [PATCH 5/9] Management framework: more consistent Supervisor interaction in the agent This declares our helper functions for sending events to the Supervisor, and makes them return the created request objects to enable the caller to modify them. It also adds a helper for restart and status requests, uses the helpers throughout the module, and makes all handlers more resilient in case Supervisor events other than the agent's arrive. --- .../frameworks/management/agent/main.zeek | 128 +++++++++++++----- 1 file changed, 96 insertions(+), 32 deletions(-) diff --git a/scripts/policy/frameworks/management/agent/main.zeek b/scripts/policy/frameworks/management/agent/main.zeek index e5d777e8c4..63ca551c20 100644 --- a/scripts/policy/frameworks/management/agent/main.zeek +++ b/scripts/policy/frameworks/management/agent/main.zeek @@ -86,6 +86,22 @@ global agent_topic: function(): string; # Returns the effective supervisor's address and port, to peer with global supervisor_network_info: function(): Broker::NetworkInfo; +# Wrapper for sending a SupervisorControl::status_request to the Supervisor. +# Establishes a request object for the transaction, and returns it. +global supervisor_status: function(node: string): Management::Request::Request; + +# Wrapper for sending a SupervisorControl::create_request to the Supervisor. +# Establishes a request object for the transaction, and returns it. +global supervisor_create: function(nc: Supervisor::NodeConfig): Management::Request::Request; + +# Wrapper for sending a SupervisorControl::destroy_request to the Supervisor. +# Establishes a request object for the transaction, and returns it. +global supervisor_destroy: function(node: string): Management::Request::Request; + +# Wrapper for sending a SupervisorControl::restart_request to the Supervisor. +# Establishes a request object for the transaction, and returns it. +global supervisor_restart: function(node: string): Management::Request::Request; + # Finalizes a deploy_request transaction: cleans up remaining state # and sends response event. global send_deploy_response: function(req: Management::Request::Request); @@ -231,6 +247,21 @@ event Management::Supervisor::API::notify_node_exit(node: string, outputs: Manag g_outputs[node] = outputs; } +event SupervisorControl::status_response(reqid: string, result: Supervisor::Status) + { + Management::Log::info(fmt("rx SupervisorControl::status_response %s", reqid)); + + local req = Management::Request::lookup(reqid); + if ( Management::Request::is_null(req) ) + return; + if ( ! req?$supervisor_state_agent ) + return; + + req$supervisor_state_agent$status = result; + + Management::Request::finish(reqid); + } + event SupervisorControl::create_response(reqid: string, result: string) { Management::Log::info(fmt("rx SupervisorControl::create_response %s %s", reqid, result)); @@ -238,6 +269,8 @@ event SupervisorControl::create_response(reqid: string, result: string) local req = Management::Request::lookup(reqid); if ( Management::Request::is_null(req) ) return; + if ( ! req?$supervisor_state_agent ) + return; local name = req$supervisor_state_agent$node; @@ -260,6 +293,8 @@ event SupervisorControl::destroy_response(reqid: string, result: bool) local req = Management::Request::lookup(reqid); if ( Management::Request::is_null(req) ) return; + if ( ! req?$supervisor_state_agent ) + return; local name = req$supervisor_state_agent$node; @@ -275,7 +310,44 @@ event SupervisorControl::destroy_response(reqid: string, result: bool) Management::Request::finish(reqid); } -function supervisor_create(nc: Supervisor::NodeConfig) +event SupervisorControl::restart_response(reqid: string, result: bool) + { + Management::Log::info(fmt("rx SupervisorControl::restart_response %s %s", reqid, result)); + + local req = Management::Request::lookup(reqid); + if ( Management::Request::is_null(req) ) + return; + if ( ! req?$supervisor_state_agent ) + return; + + local name = req$supervisor_state_agent$node; + req$supervisor_state_agent$restart_result = result; + + if ( ! result ) + { + local msg = fmt("failed to restart node %s", name); + Management::Log::error(msg); + Broker::publish(agent_topic(), + Management::Agent::API::notify_error, + Management::Agent::get_name(), msg, name); + } + + Management::Request::finish(reqid); + } + +function supervisor_status(node: string): Management::Request::Request + { + local req = Management::Request::create(); + req$supervisor_state_agent = SupervisorState($node = node); + + Management::Log::info(fmt("tx SupervisorControl::status_request %s %s", req$id, node)); + Broker::publish(SupervisorControl::topic_prefix, + SupervisorControl::status_request, req$id, node); + + return req; + } + +function supervisor_create(nc: Supervisor::NodeConfig): Management::Request::Request { local req = Management::Request::create(); req$supervisor_state_agent = SupervisorState($node = nc$name); @@ -283,9 +355,11 @@ function supervisor_create(nc: Supervisor::NodeConfig) Management::Log::info(fmt("tx SupervisorControl::create_request %s %s", req$id, nc$name)); Broker::publish(SupervisorControl::topic_prefix, SupervisorControl::create_request, req$id, nc); + + return req; } -function supervisor_destroy(node: string) +function supervisor_destroy(node: string): Management::Request::Request { local req = Management::Request::create(); req$supervisor_state_agent = SupervisorState($node = node); @@ -293,6 +367,20 @@ function supervisor_destroy(node: string) Management::Log::info(fmt("tx SupervisorControl::destroy_request %s %s", req$id, node)); Broker::publish(SupervisorControl::topic_prefix, SupervisorControl::destroy_request, req$id, node); + + return req; + } + +function supervisor_restart(node: string): Management::Request::Request + { + local req = Management::Request::create(); + req$supervisor_state_agent = SupervisorState($node = node); + + Management::Log::info(fmt("tx SupervisorControl::restart_request %s %s", req$id, node)); + Broker::publish(SupervisorControl::topic_prefix, + SupervisorControl::restart_request, req$id, node); + + return req; } event Management::Agent::API::deploy_request(reqid: string, config: Management::Configuration, force: bool) @@ -332,14 +420,9 @@ event Management::Agent::API::deploy_request(reqid: string, config: Management:: for ( inst in config$instances ) g_instances[inst$name] = inst; - local areq = Management::Request::create(); - areq$parent_id = req$id; - areq$finish = deploy_request_finish; - areq$supervisor_state_agent = SupervisorState(); - - Management::Log::info(fmt("tx SupervisorControl::status_request %s, all nodes", areq$id)); - Broker::publish(SupervisorControl::topic_prefix, - SupervisorControl::status_request, areq$id, ""); + local sreq = supervisor_status(""); + sreq$parent_id = reqid; + sreq$finish = deploy_request_finish; } function deploy_request_finish(areq: Management::Request::Request) @@ -475,34 +558,15 @@ function deploy_request_finish(areq: Management::Request::Request) # the deploy_response event back to the controller. } -event SupervisorControl::status_response(reqid: string, result: Supervisor::Status) - { - Management::Log::info(fmt("rx SupervisorControl::status_response %s", reqid)); - local req = Management::Request::lookup(reqid); - - if ( Management::Request::is_null(req) ) - return; - if ( ! req?$supervisor_state_agent ) - return; - - req$supervisor_state_agent$status = result; - Management::Request::finish(reqid); - } - event Management::Agent::API::get_nodes_request(reqid: string) { Management::Log::info(fmt("rx Management::Agent::API::get_nodes_request %s", reqid)); local req = Management::Request::create(reqid); - local areq = Management::Request::create(); - areq$parent_id = req$id; - areq$finish = get_nodes_request_finish; - areq$supervisor_state_agent = SupervisorState(); - - Broker::publish(SupervisorControl::topic_prefix, - SupervisorControl::status_request, areq$id, ""); - Management::Log::info(fmt("issued supervisor status, %s", areq$id)); + local sreq = supervisor_status(""); + sreq$parent_id = reqid; + sreq$finish = get_nodes_request_finish; } function get_nodes_request_finish(areq: Management::Request::Request) From b9879a50a07a0544c93846e3f1a4c110ffa15949 Mon Sep 17 00:00:00 2001 From: Christian Kreibich Date: Wed, 22 Jun 2022 17:11:58 -0700 Subject: [PATCH 6/9] Management framework: node restart support This adds restart request/response event pairs that restart nodes in the running Zeek cluster. The implementation is very similar to get_id_value, which also involves distributing a list of nodes to agents and aggregating the responses. --- .../frameworks/management/agent/api.zeek | 29 ++++ .../frameworks/management/agent/main.zeek | 140 ++++++++++++++++ .../frameworks/management/controller/api.zeek | 30 ++++ .../management/controller/main.zeek | 155 ++++++++++++++++++ 4 files changed, 354 insertions(+) diff --git a/scripts/policy/frameworks/management/agent/api.zeek b/scripts/policy/frameworks/management/agent/api.zeek index 880a075194..fab289a8e5 100644 --- a/scripts/policy/frameworks/management/agent/api.zeek +++ b/scripts/policy/frameworks/management/agent/api.zeek @@ -146,6 +146,35 @@ export { result: Management::Result); + ## The controller sends this event to ask the agent to restart currently + ## running Zeek cluster nodes. For nodes currently running, the agent + ## places these nodes into PENDING state and sends restart events to the + ## Supervisor, rendering its responses into a list of + ## :zeek:see:`Management::Result` records summarizing each node restart. + ## When restarted nodes check in with the agent, they switch back to + ## RUNNING state. The agent ignores nodes not currently running. + ## + ## reqid: a request identifier string, echoed in the response event. + ## + ## nodes: a set of cluster node names (e.g. "worker-01") to restart. An + ## empty set, supplied by default, means restart of all of the + ## agent's current cluster nodes. + ## + global restart_request: event(reqid: string, nodes: set[string] &default=set()); + + ## Response to a :zeek:see:`Management::Agent::API::restart_request` + ## event. The agent sends this back to the controller when the + ## Supervisor has restarted all nodes affected, or a timoeut occurs. + ## + ## reqid: the request identifier used in the request event. + ## + ## results: a :zeek:type:`vector` of :zeek:see:`Management::Result`, one + ## for each Supervisor transaction. Each such result identifies both + ## the instance and node. + ## + global restart_response: event(reqid: string, results: Management::ResultVec); + + # Notification events, agent -> controller ## The agent sends this event upon peering as a "check-in", informing diff --git a/scripts/policy/frameworks/management/agent/main.zeek b/scripts/policy/frameworks/management/agent/main.zeek index 63ca551c20..4111828770 100644 --- a/scripts/policy/frameworks/management/agent/main.zeek +++ b/scripts/policy/frameworks/management/agent/main.zeek @@ -27,6 +27,8 @@ export { node: string &default=""; ## The result of a status request. status: Supervisor::Status &optional; + ## The result of a restart request. + restart_result: bool &optional; }; ## Request state for deploy requests. @@ -47,6 +49,13 @@ export { requests: set[string] &default=set(); }; + ## Request state for restart requests, tracking eceived responses. + type RestartState: record { + ## Request state for every node the agent asks the Supervisor + ## to restart. + requests: set[string] &default=set(); + }; + # When Management::Agent::archive_logs is T (the default) and the # logging configuration doesn't permanently prevent archival # (e.g. because log rotation isn't configured), the agent triggers this @@ -67,6 +76,7 @@ redef record Management::Request::Request += { supervisor_state_agent: SupervisorState &optional; deploy_state_agent: DeployState &optional; node_dispatch_state_agent: NodeDispatchState &optional; + restart_state_agent: RestartState &optional; }; # Tag our logs correctly @@ -110,6 +120,10 @@ global send_deploy_response: function(req: Management::Request::Request); # a status response. global deploy_request_finish: function(req: Management::Request::Request); +# Callback completing a restart_request after the Supervisor has delivered +# a restart response. +global restart_request_finish: function(req: Management::Request::Request); + # Callback completing a get_nodes_request after the Supervisor has delivered # a status response. global get_nodes_request_finish: function(req: Management::Request::Request); @@ -858,6 +872,124 @@ event Management::Agent::API::agent_standby_request(reqid: string) Management::Agent::API::agent_standby_response, reqid, res); } +function restart_request_finish(sreq: Management::Request::Request) + { + # This is the finish callback we set on requests to the Supervisor to + # restart a node. We look up the parent request (the one sent to us by + # the controller), mark the node in question as done, and respond to the + # controller if we've handled all required nodes. + + local req = Management::Request::lookup(sreq$parent_id); + if ( Management::Request::is_null(req) ) + return; + + local node = sreq$supervisor_state_agent$node; + + local res = Management::Result( + $reqid = req$id, + $instance = Management::Agent::get_name(), + $node = node); + + if ( ! sreq$supervisor_state_agent$restart_result ) + { + res$success = F; + res$error = fmt("could not restart node %s", node); + } + + req$results += res; + + if ( node in req$restart_state_agent$requests ) + { + delete req$restart_state_agent$requests[node]; + if ( |req$restart_state_agent$requests| > 0 ) + return; + } + + Management::Log::info(fmt( + "tx Management::Agent::API::restart_response %s", + Management::Request::to_string(req))); + Broker::publish(agent_topic(), + Management::Agent::API::restart_response, + req$id, req$results); + Management::Request::finish(req$id); + } + +event Management::Agent::API::restart_request(reqid: string, nodes: set[string]) + { + # This is very similar to node_dispatch_request, because it too works + # with a list of nodes that needs to be dispatched to agents. + + Management::Log::info(fmt("rx Management::Agent::API::restart_request %s %s", + reqid, Management::Util::set_to_vector(nodes))); + + local node: string; + local cluster_nodes: set[string]; + local nodes_final: set[string]; + + for ( node in g_nodes ) + add cluster_nodes[node]; + + # If this request includes cluster nodes to query, check if this agent + # manages any of those nodes. If it doesn't, respond with an empty + # results vector immediately. Note that any globally unknown nodes + # that the client might have requested already got filtered by the + # controller, so we don't need to worry about them here. + + if ( |nodes| > 0 ) + { + nodes_final = nodes & cluster_nodes; + + if ( |nodes_final| == 0 ) + { + Management::Log::info(fmt( + "tx Management::Agent::API::restart_response %s, no node overlap", + reqid)); + Broker::publish(agent_topic(), + Management::Agent::API::restart_response, reqid, vector()); + return; + } + } + else if ( |g_nodes| == 0 ) + { + # Special case: the client did not request specific nodes. If + # we aren't running any nodes, respond right away, since there's + # nothing to restart. + Management::Log::info(fmt( + "tx Management::Agent::API::restart_response %s, no nodes registered", + reqid)); + Broker::publish(agent_topic(), + Management::Agent::API::restart_response, reqid, vector()); + return; + } + else + { + # We restart all nodes. + nodes_final = cluster_nodes; + } + + local res: Management::Result; + local req = Management::Request::create(reqid); + + req$restart_state_agent = RestartState(); + + # Build up state for tracking responses. + for ( node in nodes_final ) + add req$restart_state_agent$requests[node]; + + # Ask the Supervisor to restart nodes. We need to enumerate the nodes + # because restarting all (via "") would hit the agent (and the + # controller, if co-located). + for ( node in nodes_final ) + { + local sreq = supervisor_restart(node); + sreq$parent_id = reqid; + sreq$finish = restart_request_finish; + + if ( node in g_nodes ) + g_nodes[node]$state = Management::PENDING; + } + } + event Management::Node::API::notify_node_hello(node: string) { Management::Log::info(fmt("rx Management::Node::API::notify_node_hello %s", node)); @@ -900,6 +1032,14 @@ event Management::Request::request_expired(req: Management::Request::Request) # This timeout means we no longer have a pending request. g_config_reqid_pending = ""; } + + if ( req?$restart_state_agent ) + { + Management::Log::info(fmt("tx Management::Agent::API::restart_response %s", + Management::Request::to_string(req))); + Broker::publish(agent_topic(), + Management::Agent::API::restart_response, req$id, req$results); + } } event Broker::peer_added(peer: Broker::EndpointInfo, msg: string) diff --git a/scripts/policy/frameworks/management/controller/api.zeek b/scripts/policy/frameworks/management/controller/api.zeek index bb6a2dec30..d97773b75d 100644 --- a/scripts/policy/frameworks/management/controller/api.zeek +++ b/scripts/policy/frameworks/management/controller/api.zeek @@ -170,6 +170,36 @@ export { global get_id_value_response: event(reqid: string, results: Management::ResultVec); + ## The client sends this event to restart currently running Zeek cluster + ## nodes. The controller relays the request to its agents, which respond + ## with a list of :zeek:see:`Management::Result` records summarizing + ## each node restart. The controller combines these lists, and sends a + ## :zeek:see:`Management::Controller::API::restart_response` event with + ## the result. + ## + ## reqid: a request identifier string, echoed in the response event. + ## + ## nodes: a set of cluster node names (e.g. "worker-01") to restart. An + ## empty set, supplied by default, means restart of all current + ## cluster nodes. + ## + global restart_request: event(reqid: string, nodes: set[string] &default=set()); + + ## Response to a :zeek:see:`Management::Controller::API::restart_request` + ## event. The controller sends this back to the client when it has received + ## responses from all agents involved, or a timeout occurs. + ## + ## reqid: the request identifier used in the request event. + ## + ## results: a :zeek:type:`vector` of :zeek:see:`Management::Result`, + ## combining the restart results from all agents. Each such result + ## identifies both the instance and node in question. Results that + ## do not identify an instance are generated by the controller, + ## flagging corner cases, including absence of a deployed cluster + ## or unknown nodes. + ## + global restart_response: event(reqid: string, results: Management::ResultVec); + # Testing events. These don't provide operational value but expose # internal functionality, triggered by test cases. diff --git a/scripts/policy/frameworks/management/controller/main.zeek b/scripts/policy/frameworks/management/controller/main.zeek index d2063e3496..c88e3127d0 100644 --- a/scripts/policy/frameworks/management/controller/main.zeek +++ b/scripts/policy/frameworks/management/controller/main.zeek @@ -70,6 +70,14 @@ export { requests: set[string] &default=set(); }; + ## Request state specific to + ## :zeek:see:`Management::Controller::API::restart_request` and + ## :zeek:see:`Management::Controller::API::restart_response`. + type RestartState: record { + ## Request state for every controller/agent transaction. + requests: set[string] &default=set(); + }; + ## Dummy state for internal state-keeping test cases. type TestState: record { }; } @@ -78,6 +86,7 @@ redef record Management::Request::Request += { deploy_state: DeployState &optional; get_nodes_state: GetNodesState &optional; node_dispatch_state: NodeDispatchState &optional; + restart_state: RestartState &optional; test_state: TestState &optional; }; @@ -1238,6 +1247,141 @@ event Management::Controller::API::get_id_value_request(reqid: string, id: strin } } +event Management::Agent::API::restart_response(reqid: string, results: Management::ResultVec) + { + Management::Log::info(fmt("rx Management::Agent::API::restart_response %s", reqid)); + + # Retrieve state for the request we just got a response to + local areq = Management::Request::lookup(reqid); + if ( Management::Request::is_null(areq) ) + return; + + # Release the request, since this agent is now done. + Management::Request::finish(areq$id); + + # Find the original request from the client + local req = Management::Request::lookup(areq$parent_id); + if ( Management::Request::is_null(req) ) + return; + + # Add this agent's results to the overall response + for ( i in results ) + req$results += results[i]; + + # Mark this request as done: + if ( areq$id in req$restart_state$requests ) + delete req$restart_state$requests[areq$id]; + + # If we still have pending queries out to the agents, do nothing: we'll + # handle this soon, or our request will time out and we respond with + # error. + if ( |req$restart_state$requests| > 0 ) + return; + + Management::Log::info(fmt( + "tx Management::Controller::API::restart_response %s", + Management::Request::to_string(req))); + Broker::publish(Management::Controller::topic, + Management::Controller::API::restart_response, + req$id, req$results); + Management::Request::finish(req$id); + } + +event Management::Controller::API::restart_request(reqid: string, nodes: set[string]) + { + # This works almost exactly like get_id_value_request, because it too + # works with a list of nodes that needs to be dispatched to agents. + + local send_error_response = function(req: Management::Request::Request, error: string) + { + local res = Management::Result($reqid=req$id, $success=F, $error=error); + req$results += res; + + Management::Log::info(fmt("tx Management::Controller::API::restart_response %s", + Management::Request::to_string(req))); + Broker::publish(Management::Controller::topic, + Management::Controller::API::restart_response, req$id, req$results); + }; + + Management::Log::info(fmt("rx Management::Controller::API::restart_request %s %s", + reqid, Management::Util::set_to_vector(nodes))); + + local res: Management::Result; + local req = Management::Request::create(reqid); + req$restart_state = RestartState(); + + # Special cases: if we have no instances or no deployment, respond right away. + if ( |g_instances_known| == 0 ) + { + send_error_response(req, "no instances connected"); + Management::Request::finish(reqid); + return; + } + + if ( DEPLOYED !in g_configs ) + { + send_error_response(req, "no active cluster deployment"); + Management::Request::finish(reqid); + return; + } + + local nodes_final: set[string]; + local node: string; + + # Input sanitization: check for any requested nodes that aren't part of + # the deployed configuration. We send back error results for those and + # don't propagate them to the agents. + if ( |nodes| > 0 ) + { + # Requested nodes that are in the deployed configuration: + nodes_final = config_filter_nodes_by_name(g_configs[DEPLOYED], nodes); + # Requested nodes that are not in current configuration: + local nodes_invalid = nodes - nodes_final; + + # Assemble error results for all invalid nodes + for ( node in nodes_invalid ) + { + res = Management::Result($reqid=reqid, $node=node); + res$success = F; + res$error = "unknown cluster node"; + req$results += res; + } + + # If only invalid nodes got requested, we're now done. + if ( |nodes_final| == 0 ) + { + Management::Log::info(fmt( + "tx Management::Controller::API::restart_response %s", + Management::Request::to_string(req))); + Broker::publish(Management::Controller::topic, + Management::Controller::API::restart_response, + req$id, req$results); + Management::Request::finish(req$id); + return; + } + } + + for ( name in g_instances ) + { + if ( name !in g_instances_ready ) + next; + + local agent_topic = Management::Agent::topic_prefix + "/" + name; + local areq = Management::Request::create(); + + areq$parent_id = req$id; + add req$restart_state$requests[areq$id]; + + Management::Log::info(fmt( + "tx Management::Agent::API::restart_request %s to %s", + areq$id, name)); + + Broker::publish(agent_topic, + Management::Agent::API::restart_request, + areq$id, nodes); + } + } + event Management::Request::request_expired(req: Management::Request::Request) { # Various handlers for timed-out request state. We use the state members @@ -1296,6 +1440,17 @@ event Management::Request::request_expired(req: Management::Request::Request) } } + if ( req?$restart_state ) + { + req$results += res; + + Management::Log::info(fmt("tx Management::Controller::API::restart_response %s", + Management::Request::to_string(req))); + Broker::publish(Management::Controller::topic, + Management::Controller::API::restart_response, + req$id, req$results); + } + if ( req?$test_state ) { Management::Log::info(fmt("tx Management::Controller::API::test_timeout_response %s", req$id)); From 3aa0409792828c512be49044e93af986938de650 Mon Sep 17 00:00:00 2001 From: Christian Kreibich Date: Wed, 22 Jun 2022 17:33:11 -0700 Subject: [PATCH 7/9] Management framework: edit pass over docstrings This expands cross-referencing in the doc strings and adds a bit more explanation. --- .../frameworks/management/agent/api.zeek | 33 +++++++------ .../frameworks/management/controller/api.zeek | 46 ++++++++++++------- 2 files changed, 49 insertions(+), 30 deletions(-) diff --git a/scripts/policy/frameworks/management/agent/api.zeek b/scripts/policy/frameworks/management/agent/api.zeek index fab289a8e5..d1440e2068 100644 --- a/scripts/policy/frameworks/management/agent/api.zeek +++ b/scripts/policy/frameworks/management/agent/api.zeek @@ -33,8 +33,8 @@ export { global deploy_request: event(reqid: string, config: Management::Configuration, force: bool &default=F); - ## Response to a deploy_request event. The agent sends - ## this back to the controller. + ## Response to a :zeek:see:`Management::Agent::API::deploy_request` + ## event. The agent sends this back to the controller. ## ## reqid: the request identifier used in the request event. ## @@ -56,8 +56,8 @@ export { ## global get_nodes_request: event(reqid: string); - ## Response to a get_nodes_request event. The agent sends this back to the - ## controller. + ## Response to a :zeek:see:`Management::Agent::API::get_nodes_request` + ## event. The agent sends this back to the controller. ## ## reqid: the request identifier used in the request event. ## @@ -89,9 +89,10 @@ export { global node_dispatch_request: event(reqid: string, action: vector of string, nodes: set[string] &default=set()); - ## Response to a node_dispatch_request event. Each agent sends this back - ## to the controller to report the dispatch outcomes on all nodes managed - ## by that agent. + ## Response to a + ## :zeek:see:`Management::Agent::API::node_dispatch_request` event. Each + ## agent sends this back to the controller to report the dispatch + ## outcomes on all nodes managed by that agent. ## ## reqid: the request identifier used in the request event. ## @@ -105,15 +106,18 @@ export { ## The controller sends this event to confirm to the agent that it is - ## part of the current cluster topology. The agent acknowledges with the - ## corresponding response event. + ## part of the current cluster topology. The agent acknowledges with a + ## :zeek:see:`Management::Agent::API::agent_welcome_response` event, + ## upon which the controller may proceed with a cluster deployment to + ## this agent. ## ## reqid: a request identifier string, echoed in the response event. ## global agent_welcome_request: event(reqid: string); - ## Response to an agent_welcome_request event. The agent sends this - ## back to the controller. + ## Response to a + ## :zeek:see:`Management::Agent::API::agent_welcome_request` event. The + ## agent sends this back to the controller. ## ## reqid: the request identifier used in the request event. ## @@ -135,8 +139,9 @@ export { ## global agent_standby_request: event(reqid: string); - ## Response to an agent_standby_request event. The agent sends this - ## back to the controller. + ## Response to a + ## :zeek:see:`Management::Agent::API::agent_standby_request` event. The + ## agent sends this back to the controller. ## ## reqid: the request identifier used in the request event. ## @@ -196,7 +201,7 @@ export { connecting: bool, api_version: count); - # The following are not yet implemented. + # The following are not yet meaningfully implemented. # Report node state changes. global notify_change: event(instance: string, diff --git a/scripts/policy/frameworks/management/controller/api.zeek b/scripts/policy/frameworks/management/controller/api.zeek index d97773b75d..d87b6a3a61 100644 --- a/scripts/policy/frameworks/management/controller/api.zeek +++ b/scripts/policy/frameworks/management/controller/api.zeek @@ -20,13 +20,14 @@ export { ## global get_instances_request: event(reqid: string); - ## Response to a get_instances_request event. The controller sends - ## this back to the client. + ## Response to a + ## :zeek:see:`Management::Controller::API::get_instances_request` + ## event. The controller sends this back to the client. ## ## reqid: the request identifier used in the request event. ## - ## result: the result record. Its data member is a - ## :zeek:see:`Management::Instance` record. + ## result: a :zeek:see:`Management::Result`. Its data member is a vector + ## of :zeek:see:`Management::Instance` records. ## global get_instances_response: event(reqid: string, result: Management::Result); @@ -47,8 +48,10 @@ export { global stage_configuration_request: event(reqid: string, config: Management::Configuration); - ## Response to a stage_configuration_request event. The controller sends - ## this back to the client, conveying validation results. + ## Response to a + ## :zeek:see:`Management::Controller::API::stage_configuration_request` + ## event. The controller sends this back to the client, conveying + ## validation results. ## ## reqid: the request identifier used in the request event. ## @@ -71,8 +74,10 @@ export { ## global get_configuration_request: event(reqid: string, deployed: bool); - ## Response to a get_configuration_request event. The controller sends - ## this back to the client. + ## Response to a + ## :zeek:see:`Management::Controller::API::get_configuration_request` + ## event. The controller sends this back to the client, with the + ## requested configuration. ## ## reqid: the request identifier used in the request event. ## @@ -100,8 +105,9 @@ export { ## global deploy_request: event(reqid: string); - ## Response to a deploy_request event. The controller sends this - ## back to the client. + ## Response to a :zeek:see:`Management::Controller::API::deploy_request` + ## event. The controller sends this back to the client, conveying the + ## outcome of the deployment. ## ## reqid: the request identifier used in the request event. ## @@ -123,8 +129,12 @@ export { ## global get_nodes_request: event(reqid: string); - ## Response to a get_nodes_request event. The controller sends this - ## back to the client. + ## Response to a + ## :zeek:see:`Management::Controller::API::get_nodes_request` event. The + ## controller sends this back to the client, with a description of the + ## nodes currently managed by the Supervisors on all connected + ## instances. This includes agents and possibly the controller, if it + ## runs jointly with an agent. ## ## reqid: the request identifier used in the request event. ## @@ -156,8 +166,10 @@ export { global get_id_value_request: event(reqid: string, id: string, nodes: set[string] &default=set()); - ## Response to a get_id_value_request event. The controller sends this - ## back to the client. + ## Response to a + ## :zeek:see:`Management::Controller::API::get_id_value_request` + ## event. The controller sends this back to the client, with a JSON + ## representation of the requested global ID on all relevant instances. ## ## reqid: the request identifier used in the request event. ## @@ -216,8 +228,10 @@ export { ## global test_timeout_request: event(reqid: string, with_state: bool); - ## Response to a test_timeout_request event. The controller sends this - ## back to the client if the original request had the with_state flag. + ## Response to a + ## :zeek:see:`Management::Controller::API::test_timeout_request` + ## event. The controller sends this back to the client if the original + ## request had the with_state flag. ## ## reqid: the request identifier used in the request event. ## From 99cd416552a21f76984e9262785e1833e9e85aa8 Mon Sep 17 00:00:00 2001 From: Christian Kreibich Date: Wed, 22 Jun 2022 17:18:10 -0700 Subject: [PATCH 8/9] Management framework: bump zeek-client --- auxil/zeek-client | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/auxil/zeek-client b/auxil/zeek-client index d92f17cfd8..6daa3f24d9 160000 --- a/auxil/zeek-client +++ b/auxil/zeek-client @@ -1 +1 @@ -Subproject commit d92f17cfd882dd1edbd3f560181d84f69fbc8037 +Subproject commit 6daa3f24d95c176ec8e504542a8c91aa5d23c3ad From ec2572328371db1055130586788062efdfe2e869 Mon Sep 17 00:00:00 2001 From: Christian Kreibich Date: Wed, 22 Jun 2022 21:20:28 -0700 Subject: [PATCH 9/9] Management framework: bump external cluster testsuite --- testing/external/commit-hash.zeek-testing-cluster | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/testing/external/commit-hash.zeek-testing-cluster b/testing/external/commit-hash.zeek-testing-cluster index 888db55f2c..7e05fda8d3 100644 --- a/testing/external/commit-hash.zeek-testing-cluster +++ b/testing/external/commit-hash.zeek-testing-cluster @@ -1 +1 @@ -e01ffdcd799d3ca2851225994108d500c540fbe2 +340bc6191a447008d7cbc4f4f6f6522ff0c70447