From c922f749c57eb3b459d09f5525b43c6cebafa103 Mon Sep 17 00:00:00 2001 From: Christian Kreibich Date: Mon, 30 May 2022 14:40:00 -0700 Subject: [PATCH 1/9] Management framework: a bit of debug-level logging for troubleshooting --- .../policy/frameworks/management/controller/main.zeek | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/scripts/policy/frameworks/management/controller/main.zeek b/scripts/policy/frameworks/management/controller/main.zeek index f9d3c2d0a3..4a0ec7d081 100644 --- a/scripts/policy/frameworks/management/controller/main.zeek +++ b/scripts/policy/frameworks/management/controller/main.zeek @@ -169,6 +169,8 @@ function add_instance(inst: Management::Instance) Broker::publish(Management::Agent::topic_prefix + "/" + inst$name, Management::Agent::API::agent_welcome_request, req$id); } + else + Management::Log::debug(fmt("instance %s not known to us, skipping", inst$name)); } function drop_instance(inst: Management::Instance) @@ -281,6 +283,7 @@ event Management::Agent::API::notify_agent_hello(instance: string, host: addr, a } add g_instances_known[instance]; + Management::Log::debug(fmt("instance %s now known to us", instance)); if ( instance in g_instances && instance !in g_instances_ready ) { @@ -485,9 +488,15 @@ event Management::Controller::API::set_configuration_request(reqid: string, conf # case we need to re-establish connectivity with an agent. for ( inst_name in insts_to_drop ) + { + Management::Log::debug(fmt("dropping instance %s", inst_name)); drop_instance(g_instances[inst_name]); + } for ( inst_name in insts_to_peer ) + { + Management::Log::debug(fmt("adding instance %s", inst_name)); add_instance(insts_to_peer[inst_name]); + } # Updates to out instance tables are complete, now check if we're already # able to send the config to the agents: From 4371c17d4cc6cb411aabf164cfd5a00dc315340a Mon Sep 17 00:00:00 2001 From: Christian Kreibich Date: Fri, 27 May 2022 17:03:31 -0700 Subject: [PATCH 2/9] Management framework: verify node starts when deploying a configuration We so far hoped for the best when an agent asked the Supervisor to launch a node. Since the Management::Node::API::notify_node_hello events arriving from new nodes signal when such nodes are up and running, we can use those events to track once/whether all launched nodes have checked in, and respond accordingly. This delays the set_configuration_response event until these checkins have occurred, or a timeout kicks in. In case of error, the agent's response to the controller is in error state and has the remaining, unresponsive/failed set of nodes as its data member. --- .../frameworks/management/agent/main.zeek | 135 +++++++++++++++--- 1 file changed, 114 insertions(+), 21 deletions(-) diff --git a/scripts/policy/frameworks/management/agent/main.zeek b/scripts/policy/frameworks/management/agent/main.zeek index 574d2a7674..40efd67606 100644 --- a/scripts/policy/frameworks/management/agent/main.zeek +++ b/scripts/policy/frameworks/management/agent/main.zeek @@ -24,6 +24,13 @@ export { node: string; ##< Name of the node the Supervisor is acting on. }; + ## Request state for set_configuration requests. + type SetConfigurationState: record { + ## Zeek cluster nodes the provided configuration requested + ## and which have not yet checked in with the agent. + nodes_pending: set[string]; + }; + ## Request state for node dispatches, tracking the requested action ## as well as received responses. type NodeDispatchState: record { @@ -38,12 +45,20 @@ export { redef record Management::Request::Request += { supervisor_state: SupervisorState &optional; + set_configuration_state: SetConfigurationState &optional; node_dispatch_state: NodeDispatchState &optional; }; # Tag our logs correctly redef Management::role = Management::AGENT; +# Returns the effective agent topic for this agent. +global agent_topic: function(): string; + +# Finalizes a set_configuration_request transaction: cleans up remaining state +# and sends response event. +global send_set_configuration_response: function(req: Management::Request::Request); + # The global configuration as passed to us by the controller global g_config: Management::Configuration; @@ -53,6 +68,12 @@ global g_instances: table[string] of Management::Instance; # A map for the nodes we run on this instance, via this agent. global g_nodes: table[string] of Management::Node; +# The request ID of the most recent configuration update from the controller. +# We track it here until the nodes_pending set in the corresponding request's +# SetConfigurationState is cleared out, or the corresponding request state hits +# a timeout. +global g_config_reqid_pending: string = ""; + # The complete node map employed by the supervisor to describe the cluster # topology to newly forked nodes. We refresh it when we receive new # configurations. @@ -65,6 +86,30 @@ function agent_topic(): string return Management::Agent::topic_prefix + "/" + epi$id; } +function send_set_configuration_response(req: Management::Request::Request) + { + local res = Management::Result( + $reqid = req$id, + $instance = Management::Agent::get_name()); + + if ( |req$set_configuration_state$nodes_pending| > 0 ) + { + res$success = F; + res$error = "some nodes failed to start"; + res$data = req$set_configuration_state$nodes_pending; + } + + Management::Log::info(fmt("tx Management::Agent::API::set_configuration_response %s", + Management::result_to_string(res))); + Broker::publish(agent_topic(), + Management::Agent::API::set_configuration_response, req$id, res); + + Management::Request::finish(req$id); + + if ( req$id == g_config_reqid_pending ) + g_config_reqid_pending = ""; + } + event SupervisorControl::create_response(reqid: string, result: string) { local req = Management::Request::lookup(reqid); @@ -132,9 +177,9 @@ event Management::Agent::API::set_configuration_request(reqid: string, config: M local nc: Supervisor::NodeConfig; local msg: string; - # Adopt the global configuration provided. - # XXX this can later handle validation and persistence - # XXX should do this transactionally, only set when all else worked + # Adopt the global configuration provided. The act of trying to launch + # the requested nodes perturbs any existing ones one way or another, so + # even if the launch fails it effectively is our new configuration. g_config = config; # Refresh the instances table: @@ -150,19 +195,49 @@ event Management::Agent::API::set_configuration_request(reqid: string, config: M g_nodes = table(); g_cluster = table(); + # Special case: the config contains no nodes. We can respond right away. + if ( |config$nodes| == 0 ) + { + g_config_reqid_pending = ""; + + local res = Management::Result( + $reqid = reqid, + $instance = Management::Agent::get_name()); + + Management::Log::info(fmt("tx Management::Agent::API::set_configuration_response %s", + Management::result_to_string(res))); + Broker::publish(agent_topic(), + Management::Agent::API::set_configuration_response, reqid, res); + return; + } + + local req = Management::Request::create(reqid); + req$set_configuration_state = SetConfigurationState(); + + # Establish this request as the pending one: + g_config_reqid_pending = reqid; + for ( node in config$nodes ) { + # Filter the node set down to the ones this agent manages. if ( node$instance == Management::Agent::get_name() ) + { g_nodes[node$name] = node; + add req$set_configuration_state$nodes_pending[node$name]; + } # The cluster and supervisor frameworks require a port for every - # node, using 0/unknown to signify "don't listen". We use - # optional values and map an absent value to 0/unknown. + # node, using 0/unknown to signify "don't listen". The management + # framework uses optional values, so here we map absent values + # to 0/unknown. local p = 0/unknown; if ( node?$p ) p = node$p; + # Register the node in the g_cluster table. We use it below to + # ship the cluster topology with node configs launched via the + # Supervisor. local cep = Supervisor::ClusterEndpoint( $role = node$role, $host = g_instances[node$instance]$host, @@ -227,22 +302,9 @@ event Management::Agent::API::set_configuration_request(reqid: string, config: M supervisor_create(nc); } - # XXX this currently doesn not fail if any of above problems occurred, - # mainly due to the tediousness of handling the supervisor's response - # events asynchonously. The only indication of error will be - # notification events to the controller. - - if ( reqid != "" ) - { - local res = Management::Result( - $reqid = reqid, - $instance = Management::Agent::get_name()); - - Management::Log::info(fmt("tx Management::Agent::API::set_configuration_response %s", - Management::result_to_string(res))); - Broker::publish(agent_topic(), - Management::Agent::API::set_configuration_response, reqid, res); - } + # At this point we await Management::Node::API::notify_node_hello events + # from the new nodes, or a timeout, whichever happens first. These + # trigger the set_configuration_response event back to the controller. } event SupervisorControl::status_response(reqid: string, result: Supervisor::Status) @@ -549,8 +611,39 @@ event Management::Node::API::notify_node_hello(node: string) { Management::Log::info(fmt("rx Management::Node::API::notify_node_hello %s", node)); + # This node is now running; update its state: if ( node in g_nodes ) g_nodes[node]$state = Management::RUNNING; + + # Look up the set_configuration request this node launch was part of (if + # any), and check it off. If it was the last node we expected to launch, + # finalize the request and respond to the controller. + + local req = Management::Request::lookup(g_config_reqid_pending); + + if ( Management::Request::is_null(req) || ! req?$set_configuration_state ) + return; + + if ( node in req$set_configuration_state$nodes_pending ) + { + delete req$set_configuration_state$nodes_pending[node]; + if ( |req$set_configuration_state$nodes_pending| == 0 ) + send_set_configuration_response(req); + } + } + +event Management::Request::request_expired(req: Management::Request::Request) + { + local res = Management::Result($reqid=req$id, + $success = F, + $error = "request timed out"); + + if ( req?$set_configuration_state ) + { + send_set_configuration_response(req); + # This timeout means we no longer have a pending request. + g_config_reqid_pending = ""; + } } event Broker::peer_added(peer: Broker::EndpointInfo, msg: string) From 83c60fd8ac9cd94703a0ce93c5c86864d4f5f310 Mon Sep 17 00:00:00 2001 From: Christian Kreibich Date: Sun, 29 May 2022 22:10:03 -0700 Subject: [PATCH 3/9] Management framework: tune request timeout granularity and interval When the controller relays requests to agents, we want agents to time out more quickly than the corresponding controller requests. This allows agents to respond with more meaningful errors, while the controller's timeout acts mostly as a last resort to ensure a response to the client actually happens. This dials down the table_expire_interval to 2 seconds in both agent and controller, for more predictable timeout behavior. It also dials the agent-side request expiration interval down to 5 seconds, compared to the agent's 10 seconds. We may have to revisit this to allow custom expiration intervals per request/response message type. --- .../policy/frameworks/management/agent/main.zeek | 8 ++++++++ .../frameworks/management/controller/main.zeek | 4 ++++ scripts/policy/frameworks/management/request.zeek | 14 +++++++++----- 3 files changed, 21 insertions(+), 5 deletions(-) diff --git a/scripts/policy/frameworks/management/agent/main.zeek b/scripts/policy/frameworks/management/agent/main.zeek index 40efd67606..5b6338f62a 100644 --- a/scripts/policy/frameworks/management/agent/main.zeek +++ b/scripts/policy/frameworks/management/agent/main.zeek @@ -52,6 +52,14 @@ redef record Management::Request::Request += { # Tag our logs correctly redef Management::role = Management::AGENT; +# Conduct more frequent table expiration checks. This helps get more predictable +# timing for request timeouts and only affects the controller, which is mostly idle. +redef table_expire_interval = 2 sec; + +# Tweak the request timeout so it's relatively quick, and quick enough always to +# time out strictly before the controller's request state (at 10 sec). +redef Management::Request::timeout_interval = 5 sec; + # Returns the effective agent topic for this agent. global agent_topic: function(): string; diff --git a/scripts/policy/frameworks/management/controller/main.zeek b/scripts/policy/frameworks/management/controller/main.zeek index 4a0ec7d081..f83644313e 100644 --- a/scripts/policy/frameworks/management/controller/main.zeek +++ b/scripts/policy/frameworks/management/controller/main.zeek @@ -73,6 +73,10 @@ redef record Management::Request::Request += { # Tag our logs correctly redef Management::role = Management::CONTROLLER; +# Conduct more frequent table expiration checks. This helps get more predictable +# timing for request timeouts and only affects the agent, which is mostly idle. +redef table_expire_interval = 2 sec; + global check_instances_ready: function(); global add_instance: function(inst: Management::Instance); global drop_instance: function(inst: Management::Instance); diff --git a/scripts/policy/frameworks/management/request.zeek b/scripts/policy/frameworks/management/request.zeek index 82a4de3648..86de3ea0fd 100644 --- a/scripts/policy/frameworks/management/request.zeek +++ b/scripts/policy/frameworks/management/request.zeek @@ -32,11 +32,15 @@ export { finished: bool &default=F; }; - ## The timeout for request state. Such state (see the :zeek:see:`Management::Request` - ## module) ties together request and response event pairs. The timeout causes - ## its cleanup in the absence of a timely response. It applies both to - ## state kept for client requests, as well as state in the agents for - ## requests to the supervisor. + ## The timeout interval for request state. Such state (see the + ## :zeek:see:`Management::Request` module) ties together request and + ## response event pairs. A timeout causes cleanup of request state if + ## regular request/response processing hasn't already done so. It + ## applies both to request state kept in the controller and the agent, + ## though the two use different timeout values: agent-side requests time + ## out more quickly. This allows agents to send more meaningful error + ## messages, while the controller's timeouts serve as a last resort to + ## ensure response to the client. const timeout_interval = 10sec &redef; ## A token request that serves as a null/nonexistant request. From 49b9f1669c4fb99ba25baf8aad81a59121e617dc Mon Sep 17 00:00:00 2001 From: Christian Kreibich Date: Mon, 30 May 2022 12:53:27 -0700 Subject: [PATCH 4/9] Management framework: move to ResultVec in agent's set_configuration response We so far reported one result record per agent, which made it hard to report per-node outcomes for the new configuration. Agents now report one result record per node they're responsible for. --- .../frameworks/management/agent/api.zeek | 2 +- .../frameworks/management/agent/main.zeek | 31 +++++++++++++------ .../management/controller/main.zeek | 9 ++++-- 3 files changed, 29 insertions(+), 13 deletions(-) diff --git a/scripts/policy/frameworks/management/agent/api.zeek b/scripts/policy/frameworks/management/agent/api.zeek index f8dc1239a7..9fddaa44f1 100644 --- a/scripts/policy/frameworks/management/agent/api.zeek +++ b/scripts/policy/frameworks/management/agent/api.zeek @@ -38,7 +38,7 @@ export { ## result: the result record. ## global set_configuration_response: event(reqid: string, - result: Management::Result); + result: Management::ResultVec); ## The controller sends this event to request a list of diff --git a/scripts/policy/frameworks/management/agent/main.zeek b/scripts/policy/frameworks/management/agent/main.zeek index 5b6338f62a..9a0cb72f57 100644 --- a/scripts/policy/frameworks/management/agent/main.zeek +++ b/scripts/policy/frameworks/management/agent/main.zeek @@ -96,21 +96,34 @@ function agent_topic(): string function send_set_configuration_response(req: Management::Request::Request) { - local res = Management::Result( - $reqid = req$id, - $instance = Management::Agent::get_name()); + local node: string; + local res: Management::Result; - if ( |req$set_configuration_state$nodes_pending| > 0 ) + # Put together the results vector for the response event. + for ( node in g_nodes ) { - res$success = F; - res$error = "some nodes failed to start"; - res$data = req$set_configuration_state$nodes_pending; + res = Management::Result( + $reqid = req$id, + $instance = Management::Agent::get_name(), + $node = node); + + if ( node in req$set_configuration_state$nodes_pending ) + { + # This node failed. Pull in any stdout/stderr context + # we might have. + res$success = F; + + # XXX fill in stdout/stderr here if possible + } + + # Add this result to the overall response + req$results[|req$results|] = res; } Management::Log::info(fmt("tx Management::Agent::API::set_configuration_response %s", Management::result_to_string(res))); Broker::publish(agent_topic(), - Management::Agent::API::set_configuration_response, req$id, res); + Management::Agent::API::set_configuration_response, req$id, req$results); Management::Request::finish(req$id); @@ -215,7 +228,7 @@ event Management::Agent::API::set_configuration_request(reqid: string, config: M Management::Log::info(fmt("tx Management::Agent::API::set_configuration_response %s", Management::result_to_string(res))); Broker::publish(agent_topic(), - Management::Agent::API::set_configuration_response, reqid, res); + Management::Agent::API::set_configuration_response, reqid, vector(res)); return; } diff --git a/scripts/policy/frameworks/management/controller/main.zeek b/scripts/policy/frameworks/management/controller/main.zeek index f83644313e..468cc71bab 100644 --- a/scripts/policy/frameworks/management/controller/main.zeek +++ b/scripts/policy/frameworks/management/controller/main.zeek @@ -346,7 +346,7 @@ event Management::Agent::API::notify_log(instance: string, msg: string, node: st # XXX TODO } -event Management::Agent::API::set_configuration_response(reqid: string, result: Management::Result) +event Management::Agent::API::set_configuration_response(reqid: string, results: Management::ResultVec) { Management::Log::info(fmt("rx Management::Agent::API::set_configuration_response %s", reqid)); @@ -363,8 +363,11 @@ event Management::Agent::API::set_configuration_response(reqid: string, result: if ( Management::Request::is_null(req) ) return; - # Add this result to the overall response - req$results[|req$results|] = result; + # XXX the usual "any" handling needs to happen here if data is filled in + + # Add this agent's results to the overall response + for ( i in results ) + req$results[|req$results|] = results[i]; # Mark this request as done by removing it from the table of pending # ones. The following if-check should always be true. From f74f21767adfb37e03fd87c27e71bbdb2734fd3a Mon Sep 17 00:00:00 2001 From: Christian Kreibich Date: Mon, 30 May 2022 13:23:55 -0700 Subject: [PATCH 5/9] Management framework: disambiguate redef field names in agent and controller During Zeekygen's doc generation both the agent's and controller's main.zeek get loaded. This just happened to not throw errors so far because the redefs either matched perfectly or used different field names. --- .../frameworks/management/agent/main.zeek | 50 ++++++++++--------- 1 file changed, 27 insertions(+), 23 deletions(-) diff --git a/scripts/policy/frameworks/management/agent/main.zeek b/scripts/policy/frameworks/management/agent/main.zeek index 9a0cb72f57..a7323eedbd 100644 --- a/scripts/policy/frameworks/management/agent/main.zeek +++ b/scripts/policy/frameworks/management/agent/main.zeek @@ -43,10 +43,14 @@ export { }; } +# We need to go out of our way here to avoid colliding record field names with +# the similar redef in the controller -- not because of real-world use, but +# because Zeekygen loads it both during documentation extraction. Suffix all +# members with _agent to disambiguate. redef record Management::Request::Request += { - supervisor_state: SupervisorState &optional; - set_configuration_state: SetConfigurationState &optional; - node_dispatch_state: NodeDispatchState &optional; + supervisor_state_agent: SupervisorState &optional; + set_configuration_state_agent: SetConfigurationState &optional; + node_dispatch_state_agent: NodeDispatchState &optional; }; # Tag our logs correctly @@ -107,7 +111,7 @@ function send_set_configuration_response(req: Management::Request::Request) $instance = Management::Agent::get_name(), $node = node); - if ( node in req$set_configuration_state$nodes_pending ) + if ( node in req$set_configuration_state_agent$nodes_pending ) { # This node failed. Pull in any stdout/stderr context # we might have. @@ -137,7 +141,7 @@ event SupervisorControl::create_response(reqid: string, result: string) if ( Management::Request::is_null(req) ) return; - local name = req$supervisor_state$node; + local name = req$supervisor_state_agent$node; if ( |result| > 0 ) { @@ -157,7 +161,7 @@ event SupervisorControl::destroy_response(reqid: string, result: bool) if ( Management::Request::is_null(req) ) return; - local name = req$supervisor_state$node; + local name = req$supervisor_state_agent$node; if ( ! result ) { @@ -174,7 +178,7 @@ event SupervisorControl::destroy_response(reqid: string, result: bool) function supervisor_create(nc: Supervisor::NodeConfig) { local req = Management::Request::create(); - req$supervisor_state = SupervisorState($node = nc$name); + req$supervisor_state_agent = SupervisorState($node = nc$name); Broker::publish(SupervisorControl::topic_prefix, SupervisorControl::create_request, req$id, nc); Management::Log::info(fmt("issued supervisor create for %s, %s", nc$name, req$id)); @@ -183,7 +187,7 @@ function supervisor_create(nc: Supervisor::NodeConfig) function supervisor_destroy(node: string) { local req = Management::Request::create(); - req$supervisor_state = SupervisorState($node = node); + req$supervisor_state_agent = SupervisorState($node = node); Broker::publish(SupervisorControl::topic_prefix, SupervisorControl::destroy_request, req$id, node); Management::Log::info(fmt("issued supervisor destroy for %s, %s", node, req$id)); @@ -233,7 +237,7 @@ event Management::Agent::API::set_configuration_request(reqid: string, config: M } local req = Management::Request::create(reqid); - req$set_configuration_state = SetConfigurationState(); + req$set_configuration_state_agent = SetConfigurationState(); # Establish this request as the pending one: g_config_reqid_pending = reqid; @@ -244,7 +248,7 @@ event Management::Agent::API::set_configuration_request(reqid: string, config: M if ( node$instance == Management::Agent::get_name() ) { g_nodes[node$name] = node; - add req$set_configuration_state$nodes_pending[node$name]; + add req$set_configuration_state_agent$nodes_pending[node$name]; } # The cluster and supervisor frameworks require a port for every @@ -452,8 +456,8 @@ event Management::Node::API::node_dispatch_response(reqid: string, result: Manag # report themselves would eventually lead to request timeout. if ( result?$node ) { - if ( result$node in req$node_dispatch_state$requests ) - delete req$node_dispatch_state$requests[result$node]; + if ( result$node in req$node_dispatch_state_agent$requests ) + delete req$node_dispatch_state_agent$requests[result$node]; else { # An unknown or duplicate response -- do nothing. @@ -464,7 +468,7 @@ event Management::Node::API::node_dispatch_response(reqid: string, result: Manag # The usual special treatment for Broker values that are of type "any": # confirm their type here based on the requested dispatch command. - switch req$node_dispatch_state$action[0] + switch req$node_dispatch_state_agent$action[0] { case "get_id_value": if ( result?$data ) @@ -472,7 +476,7 @@ event Management::Node::API::node_dispatch_response(reqid: string, result: Manag break; default: Management::Log::error(fmt("unexpected dispatch command %s", - req$node_dispatch_state$action[0])); + req$node_dispatch_state_agent$action[0])); break; } @@ -486,7 +490,7 @@ event Management::Node::API::node_dispatch_response(reqid: string, result: Manag # If we still have pending queries out to the agents, do nothing: we'll # handle this soon, or our request will time out and we respond with # error. - if ( |req$node_dispatch_state$requests| > 0 ) + if ( |req$node_dispatch_state_agent$requests| > 0 ) return; # Release the agent-nodes request state, since we now have all responses. @@ -552,7 +556,7 @@ event Management::Agent::API::node_dispatch_request(reqid: string, action: vecto local res: Management::Result; local req = Management::Request::create(reqid); - req$node_dispatch_state = NodeDispatchState($action=action); + req$node_dispatch_state_agent = NodeDispatchState($action=action); # Build up dispatch state for tracking responses. We only dispatch to # nodes that are in state RUNNING, as those have confirmed they're ready @@ -560,7 +564,7 @@ event Management::Agent::API::node_dispatch_request(reqid: string, action: vecto for ( node in nodes_final ) { if ( g_nodes[node]$state == Management::RUNNING ) - add req$node_dispatch_state$requests[node]; + add req$node_dispatch_state_agent$requests[node]; else { res = Management::Result($reqid=reqid, $node=node); @@ -571,7 +575,7 @@ event Management::Agent::API::node_dispatch_request(reqid: string, action: vecto } # Corner case: nothing is in state RUNNING. - if ( |req$node_dispatch_state$requests| == 0 ) + if ( |req$node_dispatch_state_agent$requests| == 0 ) { Management::Log::info(fmt( "tx Management::Agent::API::node_dispatch_response %s, no nodes running", @@ -642,13 +646,13 @@ event Management::Node::API::notify_node_hello(node: string) local req = Management::Request::lookup(g_config_reqid_pending); - if ( Management::Request::is_null(req) || ! req?$set_configuration_state ) + if ( Management::Request::is_null(req) || ! req?$set_configuration_state_agent ) return; - if ( node in req$set_configuration_state$nodes_pending ) + if ( node in req$set_configuration_state_agent$nodes_pending ) { - delete req$set_configuration_state$nodes_pending[node]; - if ( |req$set_configuration_state$nodes_pending| == 0 ) + delete req$set_configuration_state_agent$nodes_pending[node]; + if ( |req$set_configuration_state_agent$nodes_pending| == 0 ) send_set_configuration_response(req); } } @@ -659,7 +663,7 @@ event Management::Request::request_expired(req: Management::Request::Request) $success = F, $error = "request timed out"); - if ( req?$set_configuration_state ) + if ( req?$set_configuration_state_agent ) { send_set_configuration_response(req); # This timeout means we no longer have a pending request. From 24a495da423589927147cefe05476b283e3eec0c Mon Sep 17 00:00:00 2001 From: Christian Kreibich Date: Mon, 30 May 2022 12:54:58 -0700 Subject: [PATCH 6/9] Management framework: Supervisor extensions for stdout/stderr handling This improves the framework's handling of Zeek node stdout and stderr by extending the (script-layer) Supervisor functionality. - The Supervisor _either_ directs Zeek nodes' stdout/stderr to files _or_ lets you hook into it at the script level. We'd like both: files make sense to allow inspection outside of the framework, and the framework would benefit from tapping into the streams e.g. for error context. We now provide the file redirection functionality in the Supervisor, in addition to the hook mechanism. The hook mechanism also builds up rolling windows of up to 100 lines (configurable) into stdout/stderr. - The new Mangement::Supervisor::API::notify_node_exit event notifies subscribers (agents, really) that a particular node has exited (and is possibly being restarted by the Supervisor). The event includes the name of the node, plus its recent stdout/stderr context. --- .../management/supervisor/__load__.zeek | 1 + .../frameworks/management/supervisor/api.zeek | 18 +++ .../management/supervisor/config.zeek | 13 ++ .../management/supervisor/main.zeek | 122 ++++++++++++++++++ .../policy/frameworks/management/types.zeek | 12 ++ scripts/test-all-policy.zeek | 4 + .../Baseline/coverage.bare-mode-errors/errors | 4 +- 7 files changed, 172 insertions(+), 2 deletions(-) create mode 100644 scripts/policy/frameworks/management/supervisor/__load__.zeek create mode 100644 scripts/policy/frameworks/management/supervisor/api.zeek create mode 100644 scripts/policy/frameworks/management/supervisor/config.zeek create mode 100644 scripts/policy/frameworks/management/supervisor/main.zeek diff --git a/scripts/policy/frameworks/management/supervisor/__load__.zeek b/scripts/policy/frameworks/management/supervisor/__load__.zeek new file mode 100644 index 0000000000..a10fe855df --- /dev/null +++ b/scripts/policy/frameworks/management/supervisor/__load__.zeek @@ -0,0 +1 @@ +@load ./main diff --git a/scripts/policy/frameworks/management/supervisor/api.zeek b/scripts/policy/frameworks/management/supervisor/api.zeek new file mode 100644 index 0000000000..f02b30d7c7 --- /dev/null +++ b/scripts/policy/frameworks/management/supervisor/api.zeek @@ -0,0 +1,18 @@ +@load policy/frameworks/management/types + +module Management::Supervisor::API; + +export { + ## The Supervisor generates this event whenever it has received a status + ## update from the stem, indicating that a node exited. + ## + ## node: the name of a node previously created via + ## :zeek:see:`Supervisor::create`. + ## + ## outputs: stdout/stderr context for the node. The contained strings + ## span up to the 100 most recent lines in the corresponding + ## stream. See :zeek:see:`Management::Supervisor::output_max_lines` + ## to adjust the line limit. + ## + global notify_node_exit: event(node: string, outputs: Management::NodeOutputs); +} diff --git a/scripts/policy/frameworks/management/supervisor/config.zeek b/scripts/policy/frameworks/management/supervisor/config.zeek new file mode 100644 index 0000000000..f910ee8a7d --- /dev/null +++ b/scripts/policy/frameworks/management/supervisor/config.zeek @@ -0,0 +1,13 @@ +##! Configuration settings for the Management framework's supervisor extension. + +module Management::Supervisor; + +export { + ## The Broker topic for Management framework communication with the + ## Supervisor. The agent subscribes to this. + const topic_prefix = "zeek/management/supervisor" &redef; + + ## The maximum number of stdout/stderr output lines to convey in + ## :zeek:see:`Management::Supervisor::API::notify_node_exit` events. + const output_max_lines: count = 100 &redef; +} diff --git a/scripts/policy/frameworks/management/supervisor/main.zeek b/scripts/policy/frameworks/management/supervisor/main.zeek new file mode 100644 index 0000000000..f68413a5d3 --- /dev/null +++ b/scripts/policy/frameworks/management/supervisor/main.zeek @@ -0,0 +1,122 @@ +##! This module provides functionality the Management framework places directly +##! in the Supervisor. + +@load base/utils/paths +@load base/utils/queue + +@load policy/frameworks/management/types +@load policy/frameworks/management/node/config + +@load ./api +@load ./config + +module Management::Supervisor; + +# stdout/stderr state for a given node. +type NodeOutputStreams: record { + # Line buffers for stdout and stderr. Their length is capped + # to the most recent Management::Supervisor::output_max_lines. + stdout: Queue::Queue; + stderr: Queue::Queue; + + # + stdout_file: file &optional; + stderr_file: file &optional; +}; + +# This tracks output state for the current nodes. +global g_outputs: table[string] of NodeOutputStreams; + +function make_node_output_streams(node: string): NodeOutputStreams + { + local stdout = Queue::init([$max_len = Management::Supervisor::output_max_lines]); + local stderr = Queue::init([$max_len = Management::Supervisor::output_max_lines]); + + local res = NodeOutputStreams($stdout=stdout, $stderr=stderr); + local status = Supervisor::status(node); + + if ( node !in status$nodes ) + return res; + + local ns = status$nodes[node]; + local directory = "."; + + if ( ns$node?$directory ) + directory = ns$node$directory; + + if ( Management::Node::stdout_file != "" ) + res$stdout_file = open(build_path(directory, Management::Node::stdout_file)); + if ( Management::Node::stderr_file != "" ) + res$stderr_file = open(build_path(directory, Management::Node::stderr_file)); + + return res; + } + +hook Supervisor::stdout_hook(node: string, msg: string) + { + if ( node !in g_outputs ) + g_outputs[node] = make_node_output_streams(node); + + # Write to the stdout file if we have one. The flush is clunky, but + # seems worth it: it's too confusing for errors to have happened and not + # yet shown up in the file. (The Supervisor's built-in file redirection + # does this too.) + if ( g_outputs[node]?$stdout_file ) + { + print g_outputs[node]$stdout_file, msg; + flush_all(); + } + + # Update the sliding window of recent output lines. + Queue::put(g_outputs[node]$stdout, msg); + + # Don't print this message in the Supervisor's own stdout + break; + } + +hook Supervisor::stderr_hook(node: string, msg: string) + { + if ( node !in g_outputs ) + g_outputs[node] = make_node_output_streams(node); + + if ( g_outputs[node]?$stderr_file ) + { + print g_outputs[node]$stderr_file, msg; + flush_all(); + } + + Queue::put(g_outputs[node]$stderr, msg); + + # Don't print this message in the Supervisor's own stdout + break; + } + +event Supervisor::node_status(node: string, pid: count) + { + # The node just started or restarted. If we have collected any output + # for its past life, send it via a notify_node_exit event. + if ( node in g_outputs ) + { + local stdout_lines: vector of string; + local stderr_lines: vector of string; + + Queue::get_vector(g_outputs[node]$stdout, stdout_lines); + Queue::get_vector(g_outputs[node]$stderr, stderr_lines); + + if ( |stdout_lines| > 0 || |stderr_lines| > 0 ) + { + local outputs = Management::NodeOutputs( + $stdout = join_string_vec(stdout_lines, "\n"), + $stderr = join_string_vec(stderr_lines, "\n")); + + Broker::publish(topic_prefix, Management::Supervisor::API::notify_node_exit, node, outputs); + } + + if ( g_outputs[node]?$stdout_file ) + close(g_outputs[node]$stdout_file); + if ( g_outputs[node]?$stderr_file ) + close(g_outputs[node]$stderr_file); + } + + g_outputs[node] = make_node_output_streams(node); + } diff --git a/scripts/policy/frameworks/management/types.zeek b/scripts/policy/frameworks/management/types.zeek index db3ac8da55..c4076cd8f7 100644 --- a/scripts/policy/frameworks/management/types.zeek +++ b/scripts/policy/frameworks/management/types.zeek @@ -104,6 +104,18 @@ export { type ResultVec: vector of Result; + ## In :zeek:see:`Management::Controller::API::set_configuration_response`, + ## events, each :zeek:see:`Management::Result` indicates the outcome of a + ## requested cluster node. If a node does not launch properly (meaning + ## it doesn't check in with the agent on thee machine it's running on), + ## the result will indicate failure, and its data field will be an + ## instance of this record, capturing the stdout and stderr output of + ## the failing node. + type NodeOutputs: record { + stdout: string; ##< The stdout stream of a Zeek process + stderr: string; ##< The stderr stream of a Zeek process + }; + ## Given a :zeek:see:`Management::Result` record, ## this function returns a string summarizing it. global result_to_string: function(res: Result): string; diff --git a/scripts/test-all-policy.zeek b/scripts/test-all-policy.zeek index 2dc50cd8af..eb089a9aa2 100644 --- a/scripts/test-all-policy.zeek +++ b/scripts/test-all-policy.zeek @@ -29,6 +29,10 @@ @load frameworks/management/node/api.zeek @load frameworks/management/node/config.zeek # @load frameworks/management/node/main.zeek +@load frameworks/management/supervisor/__load__.zeek +@load frameworks/management/supervisor/api.zeek +@load frameworks/management/supervisor/config.zeek +@load frameworks/management/supervisor/main.zeek @load frameworks/management/request.zeek @load frameworks/management/types.zeek @load frameworks/management/util.zeek diff --git a/testing/btest/Baseline/coverage.bare-mode-errors/errors b/testing/btest/Baseline/coverage.bare-mode-errors/errors index 6e8c09e1bd..d179d2c6ff 100644 --- a/testing/btest/Baseline/coverage.bare-mode-errors/errors +++ b/testing/btest/Baseline/coverage.bare-mode-errors/errors @@ -2,8 +2,8 @@ ### NOTE: This file has been sorted with diff-sort. warning in <...>/extract-certs-pem.zeek, line 1: deprecated script loaded from <...>/__load__.zeek:15 "Remove in v5.1. Use log-certs-base64.zeek instead." warning in <...>/extract-certs-pem.zeek, line 1: deprecated script loaded from command line arguments "Remove in v5.1. Use log-certs-base64.zeek instead." -warning in <...>/log-ocsp.zeek, line 1: deprecated script loaded from <...>/test-all-policy.zeek:66 ("Remove in v5.1. OCSP logging is now enabled by default") -warning in <...>/log-ocsp.zeek, line 1: deprecated script loaded from <...>/test-all-policy.zeek:66 ("Remove in v5.1. OCSP logging is now enabled by default") +warning in <...>/log-ocsp.zeek, line 1: deprecated script loaded from <...>/test-all-policy.zeek:70 ("Remove in v5.1. OCSP logging is now enabled by default") +warning in <...>/log-ocsp.zeek, line 1: deprecated script loaded from <...>/test-all-policy.zeek:70 ("Remove in v5.1. OCSP logging is now enabled by default") warning in <...>/log-ocsp.zeek, line 1: deprecated script loaded from command line arguments ("Remove in v5.1. OCSP logging is now enabled by default") warning in <...>/notary.zeek, line 1: deprecated script loaded from <...>/__load__.zeek:5 ("Remove in v5.1. Please switch to other more modern approaches like SCT validation (validate-sct.zeek).") warning in <...>/notary.zeek, line 1: deprecated script loaded from command line arguments ("Remove in v5.1. Please switch to other more modern approaches like SCT validation (validate-sct.zeek).") From f10b94de39860243c9e99fec67708e14ba421ca9 Mon Sep 17 00:00:00 2001 From: Christian Kreibich Date: Mon, 30 May 2022 18:51:15 -0700 Subject: [PATCH 7/9] Management framework: enable stdout/stderr reporting This uses the new frameworks/management/supervisor functionality to maintain stdout/stderr files, and hooks output context into set_configuration error results. --- .../frameworks/management/agent/__load__.zeek | 1 + .../frameworks/management/agent/boot.zeek | 7 ++--- .../frameworks/management/agent/main.zeek | 28 ++++++++++++++----- .../management/controller/boot.zeek | 7 ++--- .../management/controller/main.zeek | 8 ++++-- 5 files changed, 34 insertions(+), 17 deletions(-) diff --git a/scripts/policy/frameworks/management/agent/__load__.zeek b/scripts/policy/frameworks/management/agent/__load__.zeek index 590325ed9a..4c8ed9224f 100644 --- a/scripts/policy/frameworks/management/agent/__load__.zeek +++ b/scripts/policy/frameworks/management/agent/__load__.zeek @@ -12,5 +12,6 @@ @endif @if ( Supervisor::is_supervisor() ) +@load policy/frameworks/management/supervisor @load ./boot @endif diff --git a/scripts/policy/frameworks/management/agent/boot.zeek b/scripts/policy/frameworks/management/agent/boot.zeek index ead12665f2..cd1e302c76 100644 --- a/scripts/policy/frameworks/management/agent/boot.zeek +++ b/scripts/policy/frameworks/management/agent/boot.zeek @@ -39,10 +39,9 @@ event zeek_init() if ( ! mkdir(sn$directory) ) print(fmt("warning: could not create agent state dir '%s'", sn$directory)); - if ( Management::Agent::stdout_file != "" ) - sn$stdout_file = Management::Agent::stdout_file; - if ( Management::Agent::stderr_file != "" ) - sn$stderr_file = Management::Agent::stderr_file; + # We don't set sn$stdout_file/stderr_file here because the Management + # framework's Supervisor shim manages those output files itself. See + # frameworks/management/supervisor/main.zeek for details. # This helps identify Management framework nodes reliably. sn$env["ZEEK_MANAGEMENT_NODE"] = "AGENT"; diff --git a/scripts/policy/frameworks/management/agent/main.zeek b/scripts/policy/frameworks/management/agent/main.zeek index a7323eedbd..efeb680539 100644 --- a/scripts/policy/frameworks/management/agent/main.zeek +++ b/scripts/policy/frameworks/management/agent/main.zeek @@ -9,6 +9,8 @@ @load policy/frameworks/management @load policy/frameworks/management/node/api @load policy/frameworks/management/node/config +@load policy/frameworks/management/supervisor/api +@load policy/frameworks/management/supervisor/config @load ./api @load ./config @@ -91,6 +93,10 @@ global g_config_reqid_pending: string = ""; # configurations. global g_cluster: table[string] of Supervisor::ClusterEndpoint; +# The most recent output contexts we've received from the Supervisor, for +# any of our nodes. +global g_outputs: table[string] of Management::NodeOutputs; + function agent_topic(): string { @@ -113,11 +119,12 @@ function send_set_configuration_response(req: Management::Request::Request) if ( node in req$set_configuration_state_agent$nodes_pending ) { - # This node failed. Pull in any stdout/stderr context - # we might have. + # This node failed. res$success = F; - # XXX fill in stdout/stderr here if possible + # Pull in any stdout/stderr context we might have. + if ( node in g_outputs ) + res$data = g_outputs[node]; } # Add this result to the overall response @@ -135,6 +142,12 @@ function send_set_configuration_response(req: Management::Request::Request) g_config_reqid_pending = ""; } +event Management::Supervisor::API::notify_node_exit(node: string, outputs: Management::NodeOutputs) + { + if ( node in g_nodes ) + g_outputs[node] = outputs; + } + event SupervisorControl::create_response(reqid: string, result: string) { local req = Management::Request::lookup(reqid); @@ -315,10 +328,10 @@ event Management::Agent::API::set_configuration_request(reqid: string, config: M # node. nc$scripts[|nc$scripts|] = "policy/frameworks/management/node"; - if ( Management::Node::stdout_file != "" ) - nc$stdout_file = Management::Node::stdout_file; - if ( Management::Node::stderr_file != "" ) - nc$stderr_file = Management::Node::stderr_file; + # We don't set nc$stdout_file/stderr_file here because the + # Management framework's Supervisor shim manages those output + # files itself. See frameworks/management/supervisor/main.zeek + # for details. # XXX could use options to enable per-node overrides for # directory, stdout, stderr, others? @@ -708,6 +721,7 @@ event zeek_init() Broker::subscribe(agent_topic()); Broker::subscribe(SupervisorControl::topic_prefix); Broker::subscribe(Management::Node::node_topic); + Broker::subscribe(Management::Supervisor::topic_prefix); # Establish connectivity with the controller. if ( Management::Agent::controller$address != "0.0.0.0" ) diff --git a/scripts/policy/frameworks/management/controller/boot.zeek b/scripts/policy/frameworks/management/controller/boot.zeek index 6470385d94..e117583e1e 100644 --- a/scripts/policy/frameworks/management/controller/boot.zeek +++ b/scripts/policy/frameworks/management/controller/boot.zeek @@ -34,10 +34,9 @@ event zeek_init() if ( ! mkdir(sn$directory) ) print(fmt("warning: could not create controller state dir '%s'", sn$directory)); - if ( Management::Controller::stdout_file != "" ) - sn$stdout_file = Management::Controller::stdout_file; - if ( Management::Controller::stderr_file != "" ) - sn$stderr_file = Management::Controller::stderr_file; + # We don't set sn$stdout_file/stderr_file here because the Management + # framework's Supervisor shim manages those output files itself. See + # frameworks/management/supervisor/main.zeek for details. # This helps identify Management framework nodes reliably. sn$env["ZEEK_MANAGEMENT_NODE"] = "CONTROLLER"; diff --git a/scripts/policy/frameworks/management/controller/main.zeek b/scripts/policy/frameworks/management/controller/main.zeek index 468cc71bab..ba35619a6c 100644 --- a/scripts/policy/frameworks/management/controller/main.zeek +++ b/scripts/policy/frameworks/management/controller/main.zeek @@ -363,11 +363,15 @@ event Management::Agent::API::set_configuration_response(reqid: string, results: if ( Management::Request::is_null(req) ) return; - # XXX the usual "any" handling needs to happen here if data is filled in - # Add this agent's results to the overall response for ( i in results ) + { + # The usual "any" treatment to keep access predictable + if ( results[i]?$data ) + results[i]$data = results[i]$data as Management::NodeOutputs; + req$results[|req$results|] = results[i]; + } # Mark this request as done by removing it from the table of pending # ones. The following if-check should always be true. From 3320e46132afbb6d20e64a0a7c17408a0c807dcb Mon Sep 17 00:00:00 2001 From: Christian Kreibich Date: Mon, 30 May 2022 12:51:15 -0700 Subject: [PATCH 8/9] Management framework: bump zeek-client to pull in set-config rendering --- auxil/zeek-client | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/auxil/zeek-client b/auxil/zeek-client index 6a3d1b5516..4b1ccaeb1f 160000 --- a/auxil/zeek-client +++ b/auxil/zeek-client @@ -1 +1 @@ -Subproject commit 6a3d1b5516e5c9343072466e3c627aa13324f2d0 +Subproject commit 4b1ccaeb1f64cc3401c6424923c69d9937fdefb4 From c13b367edec98a909707c7997863b92a21016c3a Mon Sep 17 00:00:00 2001 From: Christian Kreibich Date: Mon, 30 May 2022 21:40:26 -0700 Subject: [PATCH 9/9] Management framework: bump external cluster testsuite --- testing/external/commit-hash.zeek-testing-cluster | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/testing/external/commit-hash.zeek-testing-cluster b/testing/external/commit-hash.zeek-testing-cluster index 461b2d5856..e01174b3bc 100644 --- a/testing/external/commit-hash.zeek-testing-cluster +++ b/testing/external/commit-hash.zeek-testing-cluster @@ -1 +1 @@ -01e1d1ad94cea81091c74e829d86815fdef0dd62 +433c6dea879632b462bf0351e24ff8e4ac3274bb