diff --git a/CHANGES b/CHANGES
index 0fc8cf42dd..4b0e8923ac 100644
--- a/CHANGES
+++ b/CHANGES
@@ -1,3 +1,17 @@
+5.0.0-dev.535 | 2022-05-31 12:58:32 -0700
+
+  * Management framework updates (Christian Kreibich, Corelight)
+
+    - bump external cluster testsuite
+    - bump zeek-client to pull in set-config rendering
+    - enable stdout/stderr reporting
+    - Supervisor extensions for stdout/stderr handling
+    - disambiguate redef field names in agent and controller
+    - move to ResultVec in agent's set_configuration response
+    - tune request timeout granularity and interval
+    - verify node starts when deploying a configuration
+    - a bit of debug-level logging for troubleshooting
+
 5.0.0-dev.525 | 2022-05-31 12:53:01 -0700
 
   * Add Supervisor::node_status notification event (Christian Kreibich, Corelight)
diff --git a/VERSION b/VERSION
index 2ea2357a14..bcad617922 100644
--- a/VERSION
+++ b/VERSION
@@ -1 +1 @@
-5.0.0-dev.525
+5.0.0-dev.535
diff --git a/auxil/zeek-client b/auxil/zeek-client
index 6a3d1b5516..2b00ec50c0 160000
--- a/auxil/zeek-client
+++ b/auxil/zeek-client
@@ -1 +1 @@
-Subproject commit 6a3d1b5516e5c9343072466e3c627aa13324f2d0
+Subproject commit 2b00ec50c0e79e9f3ad18bad58b92ba32b405274
diff --git a/scripts/policy/frameworks/management/agent/__load__.zeek b/scripts/policy/frameworks/management/agent/__load__.zeek
index 590325ed9a..4c8ed9224f 100644
--- a/scripts/policy/frameworks/management/agent/__load__.zeek
+++ b/scripts/policy/frameworks/management/agent/__load__.zeek
@@ -12,5 +12,6 @@
 @endif
 
 @if ( Supervisor::is_supervisor() )
+@load policy/frameworks/management/supervisor
 @load ./boot
 @endif
diff --git a/scripts/policy/frameworks/management/agent/api.zeek b/scripts/policy/frameworks/management/agent/api.zeek
index f8dc1239a7..9fddaa44f1 100644
--- a/scripts/policy/frameworks/management/agent/api.zeek
+++ b/scripts/policy/frameworks/management/agent/api.zeek
@@ -38,7 +38,7 @@ export {
     ## result: the result record.
     ##
     global set_configuration_response: event(reqid: string,
-        result: Management::Result);
+        result: Management::ResultVec);
 
     ## The controller sends this event to request a list of
diff --git a/scripts/policy/frameworks/management/agent/boot.zeek b/scripts/policy/frameworks/management/agent/boot.zeek
index ead12665f2..cd1e302c76 100644
--- a/scripts/policy/frameworks/management/agent/boot.zeek
+++ b/scripts/policy/frameworks/management/agent/boot.zeek
@@ -39,10 +39,9 @@ event zeek_init()
     if ( ! mkdir(sn$directory) )
         print(fmt("warning: could not create agent state dir '%s'", sn$directory));
 
-    if ( Management::Agent::stdout_file != "" )
-        sn$stdout_file = Management::Agent::stdout_file;
-    if ( Management::Agent::stderr_file != "" )
-        sn$stderr_file = Management::Agent::stderr_file;
+    # We don't set sn$stdout_file/stderr_file here because the Management
+    # framework's Supervisor shim manages those output files itself. See
+    # frameworks/management/supervisor/main.zeek for details.
 
     # This helps identify Management framework nodes reliably.
sn$env["ZEEK_MANAGEMENT_NODE"] = "AGENT"; diff --git a/scripts/policy/frameworks/management/agent/main.zeek b/scripts/policy/frameworks/management/agent/main.zeek index 574d2a7674..efeb680539 100644 --- a/scripts/policy/frameworks/management/agent/main.zeek +++ b/scripts/policy/frameworks/management/agent/main.zeek @@ -9,6 +9,8 @@ @load policy/frameworks/management @load policy/frameworks/management/node/api @load policy/frameworks/management/node/config +@load policy/frameworks/management/supervisor/api +@load policy/frameworks/management/supervisor/config @load ./api @load ./config @@ -24,6 +26,13 @@ export { node: string; ##< Name of the node the Supervisor is acting on. }; + ## Request state for set_configuration requests. + type SetConfigurationState: record { + ## Zeek cluster nodes the provided configuration requested + ## and which have not yet checked in with the agent. + nodes_pending: set[string]; + }; + ## Request state for node dispatches, tracking the requested action ## as well as received responses. type NodeDispatchState: record { @@ -36,14 +45,34 @@ export { }; } +# We need to go out of our way here to avoid colliding record field names with +# the similar redef in the controller -- not because of real-world use, but +# because Zeekygen loads it both during documentation extraction. Suffix all +# members with _agent to disambiguate. redef record Management::Request::Request += { - supervisor_state: SupervisorState &optional; - node_dispatch_state: NodeDispatchState &optional; + supervisor_state_agent: SupervisorState &optional; + set_configuration_state_agent: SetConfigurationState &optional; + node_dispatch_state_agent: NodeDispatchState &optional; }; # Tag our logs correctly redef Management::role = Management::AGENT; +# Conduct more frequent table expiration checks. This helps get more predictable +# timing for request timeouts and only affects the controller, which is mostly idle. +redef table_expire_interval = 2 sec; + +# Tweak the request timeout so it's relatively quick, and quick enough always to +# time out strictly before the controller's request state (at 10 sec). +redef Management::Request::timeout_interval = 5 sec; + +# Returns the effective agent topic for this agent. +global agent_topic: function(): string; + +# Finalizes a set_configuration_request transaction: cleans up remaining state +# and sends response event. +global send_set_configuration_response: function(req: Management::Request::Request); + # The global configuration as passed to us by the controller global g_config: Management::Configuration; @@ -53,11 +82,21 @@ global g_instances: table[string] of Management::Instance; # A map for the nodes we run on this instance, via this agent. global g_nodes: table[string] of Management::Node; +# The request ID of the most recent configuration update from the controller. +# We track it here until the nodes_pending set in the corresponding request's +# SetConfigurationState is cleared out, or the corresponding request state hits +# a timeout. +global g_config_reqid_pending: string = ""; + # The complete node map employed by the supervisor to describe the cluster # topology to newly forked nodes. We refresh it when we receive new # configurations. global g_cluster: table[string] of Supervisor::ClusterEndpoint; +# The most recent output contexts we've received from the Supervisor, for +# any of our nodes. 
+global g_outputs: table[string] of Management::NodeOutputs;
+
 function agent_topic(): string
     {
@@ -65,13 +104,57 @@ function agent_topic(): string
     return Management::Agent::topic_prefix + "/" + epi$id;
     }
 
+function send_set_configuration_response(req: Management::Request::Request)
+    {
+    local node: string;
+    local res: Management::Result;
+
+    # Put together the results vector for the response event.
+    for ( node in g_nodes )
+        {
+        res = Management::Result(
+            $reqid = req$id,
+            $instance = Management::Agent::get_name(),
+            $node = node);
+
+        if ( node in req$set_configuration_state_agent$nodes_pending )
+            {
+            # This node failed.
+            res$success = F;
+
+            # Pull in any stdout/stderr context we might have.
+            if ( node in g_outputs )
+                res$data = g_outputs[node];
+            }
+
+        # Add this result to the overall response
+        req$results[|req$results|] = res;
+        }
+
+    Management::Log::info(fmt("tx Management::Agent::API::set_configuration_response %s",
+        Management::result_to_string(res)));
+    Broker::publish(agent_topic(),
+        Management::Agent::API::set_configuration_response, req$id, req$results);
+
+    Management::Request::finish(req$id);
+
+    if ( req$id == g_config_reqid_pending )
+        g_config_reqid_pending = "";
+    }
+
+event Management::Supervisor::API::notify_node_exit(node: string, outputs: Management::NodeOutputs)
+    {
+    if ( node in g_nodes )
+        g_outputs[node] = outputs;
+    }
+
 event SupervisorControl::create_response(reqid: string, result: string)
     {
     local req = Management::Request::lookup(reqid);
     if ( Management::Request::is_null(req) )
         return;
 
-    local name = req$supervisor_state$node;
+    local name = req$supervisor_state_agent$node;
 
     if ( |result| > 0 )
         {
@@ -91,7 +174,7 @@ event SupervisorControl::destroy_response(reqid: string, result: bool)
     if ( Management::Request::is_null(req) )
         return;
 
-    local name = req$supervisor_state$node;
+    local name = req$supervisor_state_agent$node;
 
     if ( ! result )
         {
@@ -108,7 +191,7 @@ event SupervisorControl::destroy_response(reqid: string, result: bool)
 function supervisor_create(nc: Supervisor::NodeConfig)
     {
     local req = Management::Request::create();
-    req$supervisor_state = SupervisorState($node = nc$name);
+    req$supervisor_state_agent = SupervisorState($node = nc$name);
     Broker::publish(SupervisorControl::topic_prefix,
         SupervisorControl::create_request, req$id, nc);
     Management::Log::info(fmt("issued supervisor create for %s, %s", nc$name, req$id));
@@ -117,7 +200,7 @@ function supervisor_create(nc: Supervisor::NodeConfig)
 function supervisor_destroy(node: string)
     {
     local req = Management::Request::create();
-    req$supervisor_state = SupervisorState($node = node);
+    req$supervisor_state_agent = SupervisorState($node = node);
     Broker::publish(SupervisorControl::topic_prefix,
         SupervisorControl::destroy_request, req$id, node);
     Management::Log::info(fmt("issued supervisor destroy for %s, %s", node, req$id));
@@ -132,9 +215,9 @@ event Management::Agent::API::set_configuration_request(reqid: string, config: M
     local nc: Supervisor::NodeConfig;
     local msg: string;
 
-    # Adopt the global configuration provided.
-    # XXX this can later handle validation and persistence
-    # XXX should do this transactionally, only set when all else worked
+    # Adopt the global configuration provided. The act of trying to launch
+    # the requested nodes perturbs any existing ones one way or another, so
+    # even if the launch fails it effectively is our new configuration.
     g_config = config;
 
     # Refresh the instances table:
@@ -150,19 +233,49 @@ event Management::Agent::API::set_configuration_request(reqid: string, config: M
     g_nodes = table();
     g_cluster = table();
 
+    # Special case: the config contains no nodes. We can respond right away.
+    if ( |config$nodes| == 0 )
+        {
+        g_config_reqid_pending = "";
+
+        local res = Management::Result(
+            $reqid = reqid,
+            $instance = Management::Agent::get_name());
+
+        Management::Log::info(fmt("tx Management::Agent::API::set_configuration_response %s",
+            Management::result_to_string(res)));
+        Broker::publish(agent_topic(),
+            Management::Agent::API::set_configuration_response, reqid, vector(res));
+        return;
+        }
+
+    local req = Management::Request::create(reqid);
+    req$set_configuration_state_agent = SetConfigurationState();
+
+    # Establish this request as the pending one:
+    g_config_reqid_pending = reqid;
+
     for ( node in config$nodes )
         {
+        # Filter the node set down to the ones this agent manages.
         if ( node$instance == Management::Agent::get_name() )
+            {
             g_nodes[node$name] = node;
+            add req$set_configuration_state_agent$nodes_pending[node$name];
+            }
 
         # The cluster and supervisor frameworks require a port for every
-        # node, using 0/unknown to signify "don't listen". We use
-        # optional values and map an absent value to 0/unknown.
+        # node, using 0/unknown to signify "don't listen". The management
+        # framework uses optional values, so here we map absent values
+        # to 0/unknown.
         local p = 0/unknown;
 
         if ( node?$p )
             p = node$p;
 
+        # Register the node in the g_cluster table. We use it below to
+        # ship the cluster topology with node configs launched via the
+        # Supervisor.
         local cep = Supervisor::ClusterEndpoint(
             $role = node$role,
             $host = g_instances[node$instance]$host,
@@ -215,10 +328,10 @@ event Management::Agent::API::set_configuration_request(reqid: string, config: M
         # node.
         nc$scripts[|nc$scripts|] = "policy/frameworks/management/node";
 
-        if ( Management::Node::stdout_file != "" )
-            nc$stdout_file = Management::Node::stdout_file;
-        if ( Management::Node::stderr_file != "" )
-            nc$stderr_file = Management::Node::stderr_file;
+        # We don't set nc$stdout_file/stderr_file here because the
+        # Management framework's Supervisor shim manages those output
+        # files itself. See frameworks/management/supervisor/main.zeek
+        # for details.
 
         # XXX could use options to enable per-node overrides for
         # directory, stdout, stderr, others?
@@ -227,22 +340,9 @@ event Management::Agent::API::set_configuration_request(reqid: string, config: M
         supervisor_create(nc);
         }
 
-    # XXX this currently doesn not fail if any of above problems occurred,
-    # mainly due to the tediousness of handling the supervisor's response
-    # events asynchonously. The only indication of error will be
-    # notification events to the controller.
-
-    if ( reqid != "" )
-        {
-        local res = Management::Result(
-            $reqid = reqid,
-            $instance = Management::Agent::get_name());
-
-        Management::Log::info(fmt("tx Management::Agent::API::set_configuration_response %s",
-            Management::result_to_string(res)));
-        Broker::publish(agent_topic(),
-            Management::Agent::API::set_configuration_response, reqid, res);
-        }
+    # At this point we await Management::Node::API::notify_node_hello events
+    # from the new nodes, or a timeout, whichever happens first. These
+    # trigger the set_configuration_response event back to the controller.
     }
 
 event SupervisorControl::status_response(reqid: string, result: Supervisor::Status)
@@ -369,8 +469,8 @@
     # report themselves would eventually lead to request timeout.
     if ( result?$node )
         {
-        if ( result$node in req$node_dispatch_state$requests )
-            delete req$node_dispatch_state$requests[result$node];
+        if ( result$node in req$node_dispatch_state_agent$requests )
+            delete req$node_dispatch_state_agent$requests[result$node];
         else
             {
             # An unknown or duplicate response -- do nothing.
@@ -381,7 +481,7 @@
 
     # The usual special treatment for Broker values that are of type "any":
     # confirm their type here based on the requested dispatch command.
-    switch req$node_dispatch_state$action[0]
+    switch req$node_dispatch_state_agent$action[0]
         {
         case "get_id_value":
             if ( result?$data )
@@ -389,7 +489,7 @@
             break;
         default:
             Management::Log::error(fmt("unexpected dispatch command %s",
-                req$node_dispatch_state$action[0]));
+                req$node_dispatch_state_agent$action[0]));
             break;
         }
 
@@ -403,7 +503,7 @@
     # If we still have pending queries out to the agents, do nothing: we'll
     # handle this soon, or our request will time out and we respond with
     # error.
-    if ( |req$node_dispatch_state$requests| > 0 )
+    if ( |req$node_dispatch_state_agent$requests| > 0 )
         return;
 
     # Release the agent-nodes request state, since we now have all responses.
@@ -469,7 +569,7 @@ event Management::Agent::API::node_dispatch_request(reqid: string, action: vecto
     local res: Management::Result;
 
     local req = Management::Request::create(reqid);
-    req$node_dispatch_state = NodeDispatchState($action=action);
+    req$node_dispatch_state_agent = NodeDispatchState($action=action);
 
     # Build up dispatch state for tracking responses. We only dispatch to
     # nodes that are in state RUNNING, as those have confirmed they're ready
@@ -477,7 +577,7 @@ event Management::Agent::API::node_dispatch_request(reqid: string, action: vecto
     for ( node in nodes_final )
         {
         if ( g_nodes[node]$state == Management::RUNNING )
-            add req$node_dispatch_state$requests[node];
+            add req$node_dispatch_state_agent$requests[node];
         else
             {
             res = Management::Result($reqid=reqid, $node=node);
@@ -488,7 +588,7 @@ event Management::Agent::API::node_dispatch_request(reqid: string, action: vecto
         }
 
     # Corner case: nothing is in state RUNNING.
-    if ( |req$node_dispatch_state$requests| == 0 )
+    if ( |req$node_dispatch_state_agent$requests| == 0 )
         {
         Management::Log::info(fmt(
             "tx Management::Agent::API::node_dispatch_response %s, no nodes running",
@@ -549,8 +649,39 @@ event Management::Node::API::notify_node_hello(node: string)
     {
     Management::Log::info(fmt("rx Management::Node::API::notify_node_hello %s", node));
 
+    # This node is now running; update its state:
     if ( node in g_nodes )
         g_nodes[node]$state = Management::RUNNING;
+
+    # Look up the set_configuration request this node launch was part of (if
+    # any), and check it off. If it was the last node we expected to launch,
+    # finalize the request and respond to the controller.
+
+    local req = Management::Request::lookup(g_config_reqid_pending);
+
+    if ( Management::Request::is_null(req) || ! req?$set_configuration_state_agent )
+        return;
+
+    if ( node in req$set_configuration_state_agent$nodes_pending )
+        {
+        delete req$set_configuration_state_agent$nodes_pending[node];
+        if ( |req$set_configuration_state_agent$nodes_pending| == 0 )
+            send_set_configuration_response(req);
+        }
+    }
+
+event Management::Request::request_expired(req: Management::Request::Request)
+    {
+    local res = Management::Result($reqid=req$id,
+        $success = F,
+        $error = "request timed out");
+
+    if ( req?$set_configuration_state_agent )
+        {
+        send_set_configuration_response(req);
+        # This timeout means we no longer have a pending request.
+        g_config_reqid_pending = "";
+        }
+    }
 
 event Broker::peer_added(peer: Broker::EndpointInfo, msg: string)
@@ -590,6 +721,7 @@ event zeek_init()
     Broker::subscribe(agent_topic());
     Broker::subscribe(SupervisorControl::topic_prefix);
     Broker::subscribe(Management::Node::node_topic);
+    Broker::subscribe(Management::Supervisor::topic_prefix);
 
     # Establish connectivity with the controller.
     if ( Management::Agent::controller$address != "0.0.0.0" )
diff --git a/scripts/policy/frameworks/management/controller/boot.zeek b/scripts/policy/frameworks/management/controller/boot.zeek
index 6470385d94..e117583e1e 100644
--- a/scripts/policy/frameworks/management/controller/boot.zeek
+++ b/scripts/policy/frameworks/management/controller/boot.zeek
@@ -34,10 +34,9 @@ event zeek_init()
     if ( ! mkdir(sn$directory) )
         print(fmt("warning: could not create controller state dir '%s'", sn$directory));
 
-    if ( Management::Controller::stdout_file != "" )
-        sn$stdout_file = Management::Controller::stdout_file;
-    if ( Management::Controller::stderr_file != "" )
-        sn$stderr_file = Management::Controller::stderr_file;
+    # We don't set sn$stdout_file/stderr_file here because the Management
+    # framework's Supervisor shim manages those output files itself. See
+    # frameworks/management/supervisor/main.zeek for details.
 
     # This helps identify Management framework nodes reliably.
     sn$env["ZEEK_MANAGEMENT_NODE"] = "CONTROLLER";
diff --git a/scripts/policy/frameworks/management/controller/main.zeek b/scripts/policy/frameworks/management/controller/main.zeek
index f9d3c2d0a3..ba35619a6c 100644
--- a/scripts/policy/frameworks/management/controller/main.zeek
+++ b/scripts/policy/frameworks/management/controller/main.zeek
@@ -73,6 +73,10 @@ redef record Management::Request::Request += {
 # Tag our logs correctly
 redef Management::role = Management::CONTROLLER;
 
+# Conduct more frequent table expiration checks. This helps get more predictable
+# timing for request timeouts and only affects the controller, which is mostly idle.
+redef table_expire_interval = 2 sec;
+
 global check_instances_ready: function();
 global add_instance: function(inst: Management::Instance);
 global drop_instance: function(inst: Management::Instance);
@@ -169,6 +173,8 @@ function add_instance(inst: Management::Instance)
         Broker::publish(Management::Agent::topic_prefix + "/" + inst$name,
             Management::Agent::API::agent_welcome_request, req$id);
         }
+    else
+        Management::Log::debug(fmt("instance %s not known to us, skipping", inst$name));
     }
 
 function drop_instance(inst: Management::Instance)
@@ -281,6 +287,7 @@ event Management::Agent::API::notify_agent_hello(instance: string, host: addr, a
         }
 
     add g_instances_known[instance];
+    Management::Log::debug(fmt("instance %s now known to us", instance));
 
     if ( instance in g_instances && instance !in g_instances_ready )
         {
@@ -339,7 +346,7 @@ event Management::Agent::API::notify_log(instance: string, msg: string, node: st
     # XXX TODO
     }
 
-event Management::Agent::API::set_configuration_response(reqid: string, result: Management::Result)
+event Management::Agent::API::set_configuration_response(reqid: string, results: Management::ResultVec)
     {
     Management::Log::info(fmt("rx Management::Agent::API::set_configuration_response %s", reqid));
 
@@ -356,8 +363,15 @@ event Management::Agent::API::set_configuration_response(reqid: string, result:
     if ( Management::Request::is_null(req) )
         return;
 
-    # Add this result to the overall response
-    req$results[|req$results|] = result;
+    # Add this agent's results to the overall response
+    for ( i in results )
+        {
+        # The usual "any" treatment to keep access predictable
+        if ( results[i]?$data )
+            results[i]$data = results[i]$data as Management::NodeOutputs;
+
+        req$results[|req$results|] = results[i];
+        }
 
     # Mark this request as done by removing it from the table of pending
     # ones. The following if-check should always be true.
@@ -485,9 +499,15 @@ event Management::Controller::API::set_configuration_request(reqid: string, conf
     # case we need to re-establish connectivity with an agent.
 
     for ( inst_name in insts_to_drop )
+        {
+        Management::Log::debug(fmt("dropping instance %s", inst_name));
         drop_instance(g_instances[inst_name]);
+        }
 
     for ( inst_name in insts_to_peer )
+        {
+        Management::Log::debug(fmt("adding instance %s", inst_name));
         add_instance(insts_to_peer[inst_name]);
+        }
 
     # Updates to our instance tables are complete, now check if we're already
     # able to send the config to the agents:
diff --git a/scripts/policy/frameworks/management/request.zeek b/scripts/policy/frameworks/management/request.zeek
index 82a4de3648..86de3ea0fd 100644
--- a/scripts/policy/frameworks/management/request.zeek
+++ b/scripts/policy/frameworks/management/request.zeek
@@ -32,11 +32,15 @@ export {
         finished: bool &default=F;
     };
 
-    ## The timeout for request state. Such state (see the :zeek:see:`Management::Request`
-    ## module) ties together request and response event pairs. The timeout causes
-    ## its cleanup in the absence of a timely response. It applies both to
-    ## state kept for client requests, as well as state in the agents for
-    ## requests to the supervisor.
+    ## The timeout interval for request state. Such state (see the
+    ## :zeek:see:`Management::Request` module) ties together request and
+    ## response event pairs. A timeout causes cleanup of request state if
+    ## regular request/response processing hasn't already done so. It
+    ## applies both to request state kept in the controller and the agent,
+    ## though the two use different timeout values: agent-side requests time out more quickly.
+    ## This allows agents to send more meaningful error messages, while the
+    ## controller's timeouts serve as a last resort to ensure response to
+    ## the client.
     const timeout_interval = 10 sec &redef;
 
     ## A token request that serves as a null/nonexistent request.
diff --git a/scripts/policy/frameworks/management/supervisor/__load__.zeek b/scripts/policy/frameworks/management/supervisor/__load__.zeek
new file mode 100644
index 0000000000..a10fe855df
--- /dev/null
+++ b/scripts/policy/frameworks/management/supervisor/__load__.zeek
@@ -0,0 +1 @@
+@load ./main
diff --git a/scripts/policy/frameworks/management/supervisor/api.zeek b/scripts/policy/frameworks/management/supervisor/api.zeek
new file mode 100644
index 0000000000..f02b30d7c7
--- /dev/null
+++ b/scripts/policy/frameworks/management/supervisor/api.zeek
@@ -0,0 +1,18 @@
+@load policy/frameworks/management/types
+
+module Management::Supervisor::API;
+
+export {
+    ## The Supervisor generates this event whenever it has received a status
+    ## update from the stem, indicating that a node exited.
+    ##
+    ## node: the name of a node previously created via
+    ##     :zeek:see:`Supervisor::create`.
+    ##
+    ## outputs: stdout/stderr context for the node. The contained strings
+    ##     span up to the 100 most recent lines in the corresponding
+    ##     stream. See :zeek:see:`Management::Supervisor::output_max_lines`
+    ##     to adjust the line limit.
+    ##
+    global notify_node_exit: event(node: string, outputs: Management::NodeOutputs);
+}
diff --git a/scripts/policy/frameworks/management/supervisor/config.zeek b/scripts/policy/frameworks/management/supervisor/config.zeek
new file mode 100644
index 0000000000..f910ee8a7d
--- /dev/null
+++ b/scripts/policy/frameworks/management/supervisor/config.zeek
@@ -0,0 +1,13 @@
+##! Configuration settings for the Management framework's supervisor extension.
+
+module Management::Supervisor;
+
+export {
+    ## The Broker topic for Management framework communication with the
+    ## Supervisor. The agent subscribes to this.
+    const topic_prefix = "zeek/management/supervisor" &redef;
+
+    ## The maximum number of stdout/stderr output lines to convey in
+    ## :zeek:see:`Management::Supervisor::API::notify_node_exit` events.
+    const output_max_lines: count = 100 &redef;
+}
diff --git a/scripts/policy/frameworks/management/supervisor/main.zeek b/scripts/policy/frameworks/management/supervisor/main.zeek
new file mode 100644
index 0000000000..f68413a5d3
--- /dev/null
+++ b/scripts/policy/frameworks/management/supervisor/main.zeek
@@ -0,0 +1,122 @@
+##! This module provides functionality the Management framework places directly
+##! in the Supervisor.
+
+@load base/utils/paths
+@load base/utils/queue
+
+@load policy/frameworks/management/types
+@load policy/frameworks/management/node/config
+
+@load ./api
+@load ./config
+
+module Management::Supervisor;
+
+# stdout/stderr state for a given node.
+type NodeOutputStreams: record {
+    # Line buffers for stdout and stderr. Their length is capped
+    # to the most recent Management::Supervisor::output_max_lines.
+    stdout: Queue::Queue;
+    stderr: Queue::Queue;
+
+    # The files we also write these streams to, if configured via
+    # Management::Node::stdout_file/stderr_file.
+    stdout_file: file &optional;
+    stderr_file: file &optional;
+};
+
+# This tracks output state for the current nodes.
+global g_outputs: table[string] of NodeOutputStreams;
+
+function make_node_output_streams(node: string): NodeOutputStreams
+    {
+    local stdout = Queue::init([$max_len = Management::Supervisor::output_max_lines]);
+    local stderr = Queue::init([$max_len = Management::Supervisor::output_max_lines]);
+
+    local res = NodeOutputStreams($stdout=stdout, $stderr=stderr);
+    local status = Supervisor::status(node);
+
+    if ( node !in status$nodes )
+        return res;
+
+    local ns = status$nodes[node];
+    local directory = ".";
+
+    if ( ns$node?$directory )
+        directory = ns$node$directory;
+
+    if ( Management::Node::stdout_file != "" )
+        res$stdout_file = open(build_path(directory, Management::Node::stdout_file));
+    if ( Management::Node::stderr_file != "" )
+        res$stderr_file = open(build_path(directory, Management::Node::stderr_file));
+
+    return res;
+    }
+
+hook Supervisor::stdout_hook(node: string, msg: string)
+    {
+    if ( node !in g_outputs )
+        g_outputs[node] = make_node_output_streams(node);
+
+    # Write to the stdout file if we have one. The flush is clunky, but
+    # seems worth it: it's too confusing for errors to have happened and not
+    # yet shown up in the file. (The Supervisor's built-in file redirection
+    # does this too.)
+    if ( g_outputs[node]?$stdout_file )
+        {
+        print g_outputs[node]$stdout_file, msg;
+        flush_all();
+        }
+
+    # Update the sliding window of recent output lines.
+    Queue::put(g_outputs[node]$stdout, msg);
+
+    # Don't print this message in the Supervisor's own stdout
+    break;
+    }
+
+hook Supervisor::stderr_hook(node: string, msg: string)
+    {
+    if ( node !in g_outputs )
+        g_outputs[node] = make_node_output_streams(node);
+
+    if ( g_outputs[node]?$stderr_file )
+        {
+        print g_outputs[node]$stderr_file, msg;
+        flush_all();
+        }
+
+    Queue::put(g_outputs[node]$stderr, msg);
+
+    # Don't print this message in the Supervisor's own stdout
+    break;
+    }
+
+event Supervisor::node_status(node: string, pid: count)
+    {
+    # The node just started or restarted. If we have collected any output
+    # for its past life, send it via a notify_node_exit event.
+    if ( node in g_outputs )
+        {
+        local stdout_lines: vector of string;
+        local stderr_lines: vector of string;
+
+        Queue::get_vector(g_outputs[node]$stdout, stdout_lines);
+        Queue::get_vector(g_outputs[node]$stderr, stderr_lines);
+
+        if ( |stdout_lines| > 0 || |stderr_lines| > 0 )
+            {
+            local outputs = Management::NodeOutputs(
+                $stdout = join_string_vec(stdout_lines, "\n"),
+                $stderr = join_string_vec(stderr_lines, "\n"));
+
+            Broker::publish(topic_prefix, Management::Supervisor::API::notify_node_exit, node, outputs);
+            }
+
+        if ( g_outputs[node]?$stdout_file )
+            close(g_outputs[node]$stdout_file);
+        if ( g_outputs[node]?$stderr_file )
+            close(g_outputs[node]$stderr_file);
+        }
+
+    g_outputs[node] = make_node_output_streams(node);
+    }
diff --git a/scripts/policy/frameworks/management/types.zeek b/scripts/policy/frameworks/management/types.zeek
index db3ac8da55..c4076cd8f7 100644
--- a/scripts/policy/frameworks/management/types.zeek
+++ b/scripts/policy/frameworks/management/types.zeek
@@ -104,6 +104,18 @@ export {
 
     type ResultVec: vector of Result;
 
+    ## In :zeek:see:`Management::Controller::API::set_configuration_response`
+    ## events, each :zeek:see:`Management::Result` indicates the outcome of a
+    ## requested cluster node.
+    ## If a node does not launch properly (meaning it doesn't check in with
+    ## the agent on the machine it's running on), the result will indicate
+    ## failure, and its data field will be an instance of this record,
+    ## capturing the stdout and stderr output of the failing node.
+    type NodeOutputs: record {
+        stdout: string; ##< The stdout stream of a Zeek process
+        stderr: string; ##< The stderr stream of a Zeek process
+    };
+
     ## Given a :zeek:see:`Management::Result` record,
     ## this function returns a string summarizing it.
     global result_to_string: function(res: Result): string;
diff --git a/scripts/test-all-policy.zeek b/scripts/test-all-policy.zeek
index 2dc50cd8af..eb089a9aa2 100644
--- a/scripts/test-all-policy.zeek
+++ b/scripts/test-all-policy.zeek
@@ -29,6 +29,10 @@
 @load frameworks/management/node/api.zeek
 @load frameworks/management/node/config.zeek
 # @load frameworks/management/node/main.zeek
+@load frameworks/management/supervisor/__load__.zeek
+@load frameworks/management/supervisor/api.zeek
+@load frameworks/management/supervisor/config.zeek
+@load frameworks/management/supervisor/main.zeek
 @load frameworks/management/request.zeek
 @load frameworks/management/types.zeek
 @load frameworks/management/util.zeek
diff --git a/testing/btest/Baseline/coverage.bare-mode-errors/errors b/testing/btest/Baseline/coverage.bare-mode-errors/errors
index 6e8c09e1bd..d179d2c6ff 100644
--- a/testing/btest/Baseline/coverage.bare-mode-errors/errors
+++ b/testing/btest/Baseline/coverage.bare-mode-errors/errors
@@ -2,8 +2,8 @@
 ### NOTE: This file has been sorted with diff-sort.
 warning in <...>/extract-certs-pem.zeek, line 1: deprecated script loaded from <...>/__load__.zeek:15 "Remove in v5.1. Use log-certs-base64.zeek instead."
 warning in <...>/extract-certs-pem.zeek, line 1: deprecated script loaded from command line arguments "Remove in v5.1. Use log-certs-base64.zeek instead."
-warning in <...>/log-ocsp.zeek, line 1: deprecated script loaded from <...>/test-all-policy.zeek:66 ("Remove in v5.1. OCSP logging is now enabled by default")
-warning in <...>/log-ocsp.zeek, line 1: deprecated script loaded from <...>/test-all-policy.zeek:66 ("Remove in v5.1. OCSP logging is now enabled by default")
+warning in <...>/log-ocsp.zeek, line 1: deprecated script loaded from <...>/test-all-policy.zeek:70 ("Remove in v5.1. OCSP logging is now enabled by default")
+warning in <...>/log-ocsp.zeek, line 1: deprecated script loaded from <...>/test-all-policy.zeek:70 ("Remove in v5.1. OCSP logging is now enabled by default")
 warning in <...>/log-ocsp.zeek, line 1: deprecated script loaded from command line arguments ("Remove in v5.1. OCSP logging is now enabled by default")
 warning in <...>/notary.zeek, line 1: deprecated script loaded from <...>/__load__.zeek:5 ("Remove in v5.1. Please switch to other more modern approaches like SCT validation (validate-sct.zeek).")
 warning in <...>/notary.zeek, line 1: deprecated script loaded from command line arguments ("Remove in v5.1. Please switch to other more modern approaches like SCT validation (validate-sct.zeek).")
diff --git a/testing/external/commit-hash.zeek-testing-cluster b/testing/external/commit-hash.zeek-testing-cluster
index 461b2d5856..e96b72d873 100644
--- a/testing/external/commit-hash.zeek-testing-cluster
+++ b/testing/external/commit-hash.zeek-testing-cluster
@@ -1 +1 @@
-01e1d1ad94cea81091c74e829d86815fdef0dd62
+de28fe5db9a733f8b4b53f84127bab888a184f6a
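
A note on consuming the new Management::Supervisor::API::notify_node_exit event: the agent is its intended consumer, but any Broker peer subscribed to Management::Supervisor::topic_prefix can receive it. Below is a minimal sketch of such a consumer, assuming a standalone script running in a process that peers with the Supervisor's Broker endpoint; the print-based logging is illustrative and not part of this change:

    # Hypothetical consumer of the node-exit notifications (sketch only).
    @load policy/frameworks/management/supervisor/api
    @load policy/frameworks/management/supervisor/config

    event zeek_init()
        {
        # The agent does the equivalent of this in its own zeek_init().
        Broker::subscribe(Management::Supervisor::topic_prefix);
        }

    event Management::Supervisor::API::notify_node_exit(node: string, outputs: Management::NodeOutputs)
        {
        # Each field spans up to output_max_lines (default 100) recent
        # lines, newline-joined by the Supervisor shim.
        print fmt("node %s exited; recent stdout:\n%s", node, outputs$stdout);
        print fmt("node %s exited; recent stderr:\n%s", node, outputs$stderr);
        }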
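
On the "usual 'any' treatment" in the controller's set_configuration_response handler: the Result record's data field holds type "any", so a value arriving via Broker needs its concrete type re-asserted (here with "as") before callers can rely on field access. When several payload types are possible, a type-based switch does the same job. A toy sketch of that pattern, independent of this commit (the describe function is purely illustrative):

    @load policy/frameworks/management/types

    # Illustrative helper: recover a concrete type from an "any" payload.
    function describe(data: any): string
        {
        switch ( data )
            {
            case type Management::NodeOutputs as outputs:
                return fmt("stdout: %s, stderr: %s", outputs$stdout, outputs$stderr);
            }

        return "unrecognized payload type";
        }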
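
The table_expire_interval and Management::Request::timeout_interval redefs above interact through Zeek's table-expiration machinery: request state lives in a table whose entries expire a fixed interval after creation, with an expiration function raising Management::Request::request_expired. Zeek only scans such tables every table_expire_interval, so the scan interval bounds how precisely the agent's 5 sec timeout fires ahead of the controller's 10 sec one. A minimal sketch of the pattern, using illustrative names (g_requests, on_expire) rather than the actual request.zeek internals:

    module Management::Request;

    global on_expire: function(reqs: table[string] of Request, reqid: string): interval;

    # Entries expire timeout_interval after creation; expiry is detected
    # during Zeek's periodic table scans, every table_expire_interval.
    global g_requests: table[string] of Request
        &create_expire=timeout_interval &expire_func=on_expire;

    function on_expire(reqs: table[string] of Request, reqid: string): interval
        {
        # Let handlers react to the timeout (the agent finalizes any
        # pending set_configuration request this way) ...
        event Management::Request::request_expired(reqs[reqid]);

        # ... and drop the entry by granting no extension.
        return 0 secs;
        }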