From 337c7267e003083e6677b60be603f7da340ecf90 Mon Sep 17 00:00:00 2001 From: Christian Kreibich Date: Wed, 23 Mar 2022 16:27:28 -0700 Subject: [PATCH 01/10] Management framework: allow agents to communicate with cluster nodes This provides Broker-level plumbing that allows agents to reach out to their managed Zeek nodes and collect responses. As a first event, it establishes Management::Node::API::notify_agent_hello, to notify the agent when the cluster node is ready to communicate. Also a bit of comment rewording to replace use of "data cluster" with simply "cluster", to avoid ambiguity with data nodes in SumStats, and expansion of test-all-policy.zeek and related/dependent tests, since we're introducing new scripts. --- .../frameworks/management/agent/main.zeek | 25 +++++++----- .../management/controller/main.zeek | 4 +- scripts/policy/frameworks/management/log.zeek | 1 + .../frameworks/management/node/__load__.zeek | 1 + .../frameworks/management/node/api.zeek | 21 ++++++++++ .../frameworks/management/node/config.zeek | 9 +++++ .../frameworks/management/node/main.zeek | 39 +++++++++++++++++++ .../policy/frameworks/management/types.zeek | 3 +- scripts/test-all-policy.zeek | 4 ++ scripts/zeekygen/__load__.zeek | 2 + .../Baseline/coverage.bare-mode-errors/errors | 6 +-- 11 files changed, 100 insertions(+), 15 deletions(-) create mode 100644 scripts/policy/frameworks/management/node/__load__.zeek create mode 100644 scripts/policy/frameworks/management/node/api.zeek create mode 100644 scripts/policy/frameworks/management/node/config.zeek create mode 100644 scripts/policy/frameworks/management/node/main.zeek diff --git a/scripts/policy/frameworks/management/agent/main.zeek b/scripts/policy/frameworks/management/agent/main.zeek index 75d0247a36..f5e033db2e 100644 --- a/scripts/policy/frameworks/management/agent/main.zeek +++ b/scripts/policy/frameworks/management/agent/main.zeek @@ -5,6 +5,8 @@ @load base/frameworks/broker @load policy/frameworks/management +@load policy/frameworks/management/node/api +@load policy/frameworks/management/node/config @load ./api @load ./config @@ -120,7 +122,7 @@ event Management::Agent::API::set_configuration_request(reqid: string, config: M g_nodes = table(); - # Refresh the data cluster and nodes tables + # Refresh the cluster and nodes tables g_data_cluster = table(); for ( node in config$nodes ) @@ -166,6 +168,11 @@ event Management::Agent::API::set_configuration_request(reqid: string, config: M if ( node?$env ) nc$env = node$env; + # Always add the policy/management/node scripts to any cluster + # node, since we require it to be able to communicate with the + # node. + nc$scripts[|nc$scripts|] = "policy/frameworks/management/node"; + # XXX could use options to enable per-node overrides for # directory, stdout, stderr, others? @@ -209,7 +216,7 @@ event SupervisorControl::status_response(reqid: string, result: Supervisor::Stat local cns = Management::NodeStatus( $node=node, $state=Management::PENDING); - # Identify the role of the node. For data cluster roles (worker, + # Identify the role of the node. For cluster roles (worker, # manager, etc) we derive this from the cluster node table. For # agent and controller, we identify via environment variables # that the controller framework establishes upon creation (see @@ -342,10 +349,11 @@ event zeek_init() Broker::peer(supervisor_addr, Broker::default_port, Broker::default_listen_retry); - # Agents need receive communication targeted at it, and any responses - # from the supervisor. + # Agents need receive communication targeted at it, any responses + # from the supervisor, and any responses from cluster nodes. Broker::subscribe(agent_topic); Broker::subscribe(SupervisorControl::topic_prefix); + Broker::subscribe(Management::Node::node_topic); # Auto-publish a bunch of events. Glob patterns or module-level # auto-publish would be helpful here. @@ -373,11 +381,10 @@ event zeek_init() Management::Agent::controller$bound_port, Management::connect_retry); } - else - { - # Controller connects to us; listen for it. - Broker::listen(cat(epi$network$address), epi$network$bound_port); - } + + # The agent always listens, to allow cluster nodes to peer with it. + # If the controller connects to us, it also uses this port. + Broker::listen(cat(epi$network$address), epi$network$bound_port); Management::Log::info("agent is live"); } diff --git a/scripts/policy/frameworks/management/controller/main.zeek b/scripts/policy/frameworks/management/controller/main.zeek index 5aa5292b04..cac27ac814 100644 --- a/scripts/policy/frameworks/management/controller/main.zeek +++ b/scripts/policy/frameworks/management/controller/main.zeek @@ -386,10 +386,10 @@ event Management::Controller::API::set_configuration_request(reqid: string, conf g_config_reqid_pending = req$id; # Compare the instance configuration to our current one. If it matches, - # we can proceed to deploying the new data cluster topology. If it does + # we can proceed to deploying the new cluster topology. If it does # not, we need to establish connectivity with agents we connect to, or # wait until all instances that connect to us have done so. Either triggers - # a notify_agents_ready event, upon which we then deploy the data cluster. + # a notify_agents_ready event, upon which we then deploy the topology. # The current & new set of instance names. local insts_current: set[string]; diff --git a/scripts/policy/frameworks/management/log.zeek b/scripts/policy/frameworks/management/log.zeek index e69c55b122..7d3b565b32 100644 --- a/scripts/policy/frameworks/management/log.zeek +++ b/scripts/policy/frameworks/management/log.zeek @@ -82,6 +82,7 @@ global l2s: table[Level] of string = { global r2s: table[Management::Role] of string = { [Management::AGENT] = "AGENT", [Management::CONTROLLER] = "CONTROLLER", + [Management::NODE] = "NODE", }; function debug(message: string) diff --git a/scripts/policy/frameworks/management/node/__load__.zeek b/scripts/policy/frameworks/management/node/__load__.zeek new file mode 100644 index 0000000000..a10fe855df --- /dev/null +++ b/scripts/policy/frameworks/management/node/__load__.zeek @@ -0,0 +1 @@ +@load ./main diff --git a/scripts/policy/frameworks/management/node/api.zeek b/scripts/policy/frameworks/management/node/api.zeek new file mode 100644 index 0000000000..4eac9d0771 --- /dev/null +++ b/scripts/policy/frameworks/management/node/api.zeek @@ -0,0 +1,21 @@ +##! The Management event API of cluster nodes. The API consists of request/ +##! response event pairs, like elsewhere in the Management, Supervisor, and +##! Control frameworks. + +@load policy/frameworks/management/types + +module Management::Node::API; + +export { + # Notification events, node -> agent + + ## The cluster nodes send this event upon peering as a "check-in" to + ## the agent, to indicate the node is now available to communicate + ## with. It is an agent-level equivalent of :zeek:see:`Broker::peer_added`, + ## and similar to :zeek:see:`Management::Agent::API::notify_agent_hello` + ## for agents. + ## + ## node: the name of the node, as given in :zeek:see:`Cluster::node`. + ## + global notify_node_hello: event(node: string); +} diff --git a/scripts/policy/frameworks/management/node/config.zeek b/scripts/policy/frameworks/management/node/config.zeek new file mode 100644 index 0000000000..d17fd663a1 --- /dev/null +++ b/scripts/policy/frameworks/management/node/config.zeek @@ -0,0 +1,9 @@ +##! Configuration settings for nodes controlled by the Management framework. + +module Management::Node; + +export { + ## The nodes' Broker topic. Cluster nodes automatically subscribe + ## to it, to receive request events from the Management framework. + const node_topic = "zeek/management/node" &redef; +} diff --git a/scripts/policy/frameworks/management/node/main.zeek b/scripts/policy/frameworks/management/node/main.zeek new file mode 100644 index 0000000000..92d41acfbe --- /dev/null +++ b/scripts/policy/frameworks/management/node/main.zeek @@ -0,0 +1,39 @@ +##! This module provides Management framework functionality that needs to be +##! present in every cluster node to allow Management agents to interact with +##! the cluster nodes they manage. + +@load policy/frameworks/management/agent/config +@load policy/frameworks/management/log + +@load ./config + +module Management::Node; + +# Tag our logs correctly +redef Management::Log::role = Management::NODE; + +event Broker::peer_added(peer: Broker::EndpointInfo, msg: string) + { + local epi = Management::Agent::endpoint_info(); + + # If this is the agent peering, notify it that we're ready + if ( peer$network$address == epi$network$address && + peer$network$bound_port == epi$network$bound_port ) + event Management::Node::API::notify_node_hello(Cluster::node); + } + +event zeek_init() + { + local epi = Management::Agent::endpoint_info(); + + Broker::peer(epi$network$address, epi$network$bound_port, Management::connect_retry); + Broker::subscribe(node_topic); + + # Events automatically sent to the Management agent. + local events: vector of any = [ + Management::Node::API::notify_node_hello + ]; + + for ( i in events ) + Broker::auto_publish(node_topic, events[i]); + } diff --git a/scripts/policy/frameworks/management/types.zeek b/scripts/policy/frameworks/management/types.zeek index 824ea7dfb4..6d89fbda1a 100644 --- a/scripts/policy/frameworks/management/types.zeek +++ b/scripts/policy/frameworks/management/types.zeek @@ -6,12 +6,13 @@ module Management; export { ## Management infrastructure node type. This intentionally does not - ## include the data cluster node types (worker, logger, etc) -- those + ## include the managed cluster node types (worker, logger, etc) -- those ## continue to be managed by the cluster framework. type Role: enum { NONE, ##< No active role in cluster management AGENT, ##< A cluster management agent. CONTROLLER, ##< The cluster's controller. + NODE, ##< A managed cluster node (worker, manager, etc). }; ## A Zeek-side option with value. diff --git a/scripts/test-all-policy.zeek b/scripts/test-all-policy.zeek index 16bc4f3ede..0f2de90609 100644 --- a/scripts/test-all-policy.zeek +++ b/scripts/test-all-policy.zeek @@ -24,6 +24,10 @@ @load frameworks/management/__load__.zeek @load frameworks/management/config.zeek @load frameworks/management/log.zeek +# @load frameworks/management/node/__load__.zeek +@load frameworks/management/node/api.zeek +@load frameworks/management/node/config.zeek +# @load frameworks/management/node/main.zeek @load frameworks/management/request.zeek @load frameworks/management/types.zeek @load frameworks/management/util.zeek diff --git a/scripts/zeekygen/__load__.zeek b/scripts/zeekygen/__load__.zeek index ad28277176..39314a04ac 100644 --- a/scripts/zeekygen/__load__.zeek +++ b/scripts/zeekygen/__load__.zeek @@ -7,6 +7,8 @@ @load frameworks/control/controller.zeek @load frameworks/management/agent/main.zeek @load frameworks/management/controller/main.zeek +@load frameworks/management/node/__load__.zeek +@load frameworks/management/node/main.zeek @load frameworks/files/extract-all-files.zeek @load policy/misc/dump-events.zeek @load policy/protocols/conn/speculative-service.zeek diff --git a/testing/btest/Baseline/coverage.bare-mode-errors/errors b/testing/btest/Baseline/coverage.bare-mode-errors/errors index a72d64757e..bc9bd28f83 100644 --- a/testing/btest/Baseline/coverage.bare-mode-errors/errors +++ b/testing/btest/Baseline/coverage.bare-mode-errors/errors @@ -1,9 +1,9 @@ ### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. ### NOTE: This file has been sorted with diff-sort. -warning in <...>/extract-certs-pem.zeek, line 1: deprecated script loaded from <...>/__load__.zeek:13 "Remove in v5.1. Use log-certs-base64.zeek instead." +warning in <...>/extract-certs-pem.zeek, line 1: deprecated script loaded from <...>/__load__.zeek:15 "Remove in v5.1. Use log-certs-base64.zeek instead." warning in <...>/extract-certs-pem.zeek, line 1: deprecated script loaded from command line arguments "Remove in v5.1. Use log-certs-base64.zeek instead." -warning in <...>/log-ocsp.zeek, line 1: deprecated script loaded from <...>/test-all-policy.zeek:61 ("Remove in v5.1. OCSP logging is now enabled by default") -warning in <...>/log-ocsp.zeek, line 1: deprecated script loaded from <...>/test-all-policy.zeek:61 ("Remove in v5.1. OCSP logging is now enabled by default") +warning in <...>/log-ocsp.zeek, line 1: deprecated script loaded from <...>/test-all-policy.zeek:65 ("Remove in v5.1. OCSP logging is now enabled by default") +warning in <...>/log-ocsp.zeek, line 1: deprecated script loaded from <...>/test-all-policy.zeek:65 ("Remove in v5.1. OCSP logging is now enabled by default") warning in <...>/log-ocsp.zeek, line 1: deprecated script loaded from command line arguments ("Remove in v5.1. OCSP logging is now enabled by default") warning in <...>/notary.zeek, line 1: deprecated script loaded from <...>/__load__.zeek:5 ("Remove in v5.1. Please switch to other more modern approaches like SCT validation (validate-sct.zeek).") warning in <...>/notary.zeek, line 1: deprecated script loaded from command line arguments ("Remove in v5.1. Please switch to other more modern approaches like SCT validation (validate-sct.zeek).") From 0020cc4af09d0f9eaf380a34e3c1b7dcd091cbfe Mon Sep 17 00:00:00 2001 From: Christian Kreibich Date: Mon, 11 Apr 2022 21:06:29 -0700 Subject: [PATCH 02/10] Management framework: some renaming to avoid the term "data cluster" --- scripts/policy/frameworks/management/agent/main.zeek | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/scripts/policy/frameworks/management/agent/main.zeek b/scripts/policy/frameworks/management/agent/main.zeek index f5e033db2e..d57fd7b166 100644 --- a/scripts/policy/frameworks/management/agent/main.zeek +++ b/scripts/policy/frameworks/management/agent/main.zeek @@ -42,7 +42,7 @@ global g_nodes: table[string] of Management::Node; # The node map employed by the supervisor to describe the cluster # topology to newly forked nodes. We refresh it when we receive # new configurations. -global g_data_cluster: table[string] of Supervisor::ClusterEndpoint; +global g_cluster: table[string] of Supervisor::ClusterEndpoint; event SupervisorControl::create_response(reqid: string, result: string) @@ -124,7 +124,7 @@ event Management::Agent::API::set_configuration_request(reqid: string, config: M # Refresh the cluster and nodes tables - g_data_cluster = table(); + g_cluster = table(); for ( node in config$nodes ) { if ( node$instance == Management::Agent::name ) @@ -146,7 +146,7 @@ event Management::Agent::API::set_configuration_request(reqid: string, config: M if ( node?$interface ) cep$interface = node$interface; - g_data_cluster[node$name] = cep; + g_cluster[node$name] = cep; } # Apply the new configuration via the supervisor @@ -176,7 +176,7 @@ event Management::Agent::API::set_configuration_request(reqid: string, config: M # XXX could use options to enable per-node overrides for # directory, stdout, stderr, others? - nc$cluster = g_data_cluster; + nc$cluster = g_cluster; supervisor_create(nc); } From 788348f9d68b43c8d115f129395953156e57b21e Mon Sep 17 00:00:00 2001 From: Christian Kreibich Date: Tue, 12 Apr 2022 21:56:55 -0700 Subject: [PATCH 03/10] Management framework: allow dispatching "actions" on cluster nodes. This adds request/response event pairs to enable the controller to dispatch "actions" (pre-implemented Zeek script actions) on subsets of Zeek cluster nodes and collect the results. Using generic events to carry multiple such "run X on the nodes" scenarios simplifies adding these in the future. --- .../frameworks/management/agent/api.zeek | 28 +++- .../frameworks/management/agent/main.zeek | 123 ++++++++++++++++-- .../frameworks/management/controller/api.zeek | 2 +- .../management/controller/main.zeek | 105 +++++++++++++-- .../frameworks/management/node/api.zeek | 21 +++ .../frameworks/management/node/main.zeek | 50 ++++++- 6 files changed, 297 insertions(+), 32 deletions(-) diff --git a/scripts/policy/frameworks/management/agent/api.zeek b/scripts/policy/frameworks/management/agent/api.zeek index 876f121500..6607c27144 100644 --- a/scripts/policy/frameworks/management/agent/api.zeek +++ b/scripts/policy/frameworks/management/agent/api.zeek @@ -60,8 +60,32 @@ export { ## records, covering the nodes at this instance. The result may also ## indicate failure, with error messages indicating what went wrong. ## - global get_nodes_response: event(reqid: string, - result: Management::Result); + global get_nodes_response: event(reqid: string, result: Management::Result); + + + ## The controller sends this to every agent to request a dispatch (the + ## execution of a pre-implemented activity) to all cluster nodes. This + ## is the generic controller-agent "back-end" implementation of explicit + ## client-controller "front-end" interactions. + ## + ## reqid: a request identifier string, echoed in the response event. + ## + ## action: the requested dispatch command, with any arguments. + global node_dispatch_request: event(reqid: string, action: vector of string); + + ## Response to a node_dispatch_request event. Each agent sends this back + ## to the controller to report the dispatch outcomes on all nodes managed + ## by that agent. + ## + ## reqid: the request identifier used in the request event. + ## + ## result: a :zeek:type:`vector` of :zeek:see:`Management::Result` + ## records. Each record covers one Zeek cluster node managed by this + ## agent. Upon success, each :zeek:see:`Management::Result` record's + ## data member contains the dispatches' response in a data type + ## appropriate for the respective dispatch. + global node_dispatch_response: event(reqid: string, result: Management::ResultVec); + ## The controller sends this event to confirm to the agent that it is ## part of the current cluster topology. The agent acknowledges with the diff --git a/scripts/policy/frameworks/management/agent/main.zeek b/scripts/policy/frameworks/management/agent/main.zeek index d57fd7b166..2dafd0414d 100644 --- a/scripts/policy/frameworks/management/agent/main.zeek +++ b/scripts/policy/frameworks/management/agent/main.zeek @@ -21,10 +21,22 @@ export { type SupervisorState: record { node: string; ##< Name of the node the Supervisor is acting on. }; + + ## Request state for node dispatches, tracking the requested action + ## as well as received responses. + type NodeDispatchState: record { + ## The dispatched action. The first string is a command, + ## any remaining strings its arguments. + action: vector of string; + + ## Request state for every node managed by this agent. + requests: set[string] &default=set(); + }; } redef record Management::Request::Request += { supervisor_state: SupervisorState &optional; + node_dispatch_state: NodeDispatchState &optional; }; # Tag our logs correctly @@ -285,6 +297,81 @@ event Management::Agent::API::get_nodes_request(reqid: string) Management::Log::info(fmt("issued supervisor status, %s", req$id)); } +event Management::Node::API::node_dispatch_response(reqid: string, result: Management::Result) + { + Management::Log::info(fmt("rx Management::Node::API::node_dispatch_response %s", reqid)); + + # Retrieve state for the request we just got a response to + local nreq = Management::Request::lookup(reqid); + if ( Management::Request::is_null(nreq) ) + return; + + # Find the original request from the controller + local req = Management::Request::lookup(nreq$parent_id); + if ( Management::Request::is_null(req) ) + return; + + # Mark the responding node as done. Nodes normally fill their own name + # into the result; we only double-check for resilience. Nodes failing to + # report themselves would eventually lead to request timeout. + if ( result?$node && result$node in req$node_dispatch_state$requests ) + delete req$node_dispatch_state$requests[result$node]; + + # The usual special treatment for Broker values that are of type "any": + # confirm their type here based on the requested dispatch command. + switch req$node_dispatch_state$action[0] + { + default: + Management::Log::error(fmt("unexpected dispatch command %s", + req$node_dispatch_state$action[0])); + break; + } + + # The result has the reporting node filled in but not the agent/instance + # (which the node doesn't know about), so add it now. + result$instance = Management::Agent::instance()$name; + + # Add this result to the overall response + req$results[|req$results|] = result; + + # If we still have pending queries out to the agents, do nothing: we'll + # handle this soon, or our request will time out and we respond with + # error. + if ( |req$node_dispatch_state$requests| > 0 ) + return; + + # Release the agent-nodes request state, since we now have all responses. + Management::Request::finish(nreq$id); + + # Send response event back to controller and clean up main request state. + Management::Log::info(fmt("tx Management::Agent::API::node_dispatch_response %s", + Management::Request::to_string(req))); + event Management::Agent::API::node_dispatch_response(req$id, req$results); + Management::Request::finish(req$id); + } + +event Management::Agent::API::node_dispatch_request(reqid: string, action: vector of string) + { + Management::Log::info(fmt("rx Management::Agent::API::node_dispatch_request %s %s", reqid, action)); + + local res: Management::Result; + local req = Management::Request::create(reqid); + + req$node_dispatch_state = NodeDispatchState($action=action); + + for ( node in g_nodes ) + add req$node_dispatch_state$requests[node]; + + # We use a single request record to track all node responses. We know + # when all nodes have responded via the requests set we built up above. + local nreq = Management::Request::create(); + nreq$parent_id = reqid; + + Management::Log::info(fmt("tx Management::Node::API::node_dispatch_request %s %s", nreq$id, action)); + Broker::publish(Management::Node::node_topic, + Management::Node::API::node_dispatch_request, nreq$id, action); + } + event Management::Agent::API::agent_welcome_request(reqid: string) { Management::Log::info(fmt("rx Management::Agent::API::agent_welcome_request %s", reqid)); @@ -357,21 +444,31 @@ event zeek_init() # Auto-publish a bunch of events. Glob patterns or module-level # auto-publish would be helpful here. - Broker::auto_publish(agent_topic, Management::Agent::API::get_nodes_response); - Broker::auto_publish(agent_topic, Management::Agent::API::set_configuration_response); - Broker::auto_publish(agent_topic, Management::Agent::API::agent_welcome_response); - Broker::auto_publish(agent_topic, Management::Agent::API::agent_standby_response); + local agent_to_controller_events: vector of any = [ + Management::Agent::API::get_nodes_response, + Management::Agent::API::set_configuration_response, + Management::Agent::API::agent_welcome_response, + Management::Agent::API::agent_standby_response, + Management::Agent::API::node_dispatch_response, + Management::Agent::API::notify_agent_hello, + Management::Agent::API::notify_change, + Management::Agent::API::notify_error, + Management::Agent::API::notify_log + ]; - Broker::auto_publish(agent_topic, Management::Agent::API::notify_agent_hello); - Broker::auto_publish(agent_topic, Management::Agent::API::notify_change); - Broker::auto_publish(agent_topic, Management::Agent::API::notify_error); - Broker::auto_publish(agent_topic, Management::Agent::API::notify_log); + for ( i in agent_to_controller_events ) + Broker::auto_publish(agent_topic, agent_to_controller_events[i]); - Broker::auto_publish(SupervisorControl::topic_prefix, SupervisorControl::create_request); - Broker::auto_publish(SupervisorControl::topic_prefix, SupervisorControl::status_request); - Broker::auto_publish(SupervisorControl::topic_prefix, SupervisorControl::destroy_request); - Broker::auto_publish(SupervisorControl::topic_prefix, SupervisorControl::restart_request); - Broker::auto_publish(SupervisorControl::topic_prefix, SupervisorControl::stop_request); + local agent_to_sup_events: vector of any = [ + SupervisorControl::create_request, + SupervisorControl::status_request, + SupervisorControl::destroy_request, + SupervisorControl::restart_request, + SupervisorControl::stop_request + ]; + + for ( i in agent_to_sup_events ) + Broker::auto_publish(SupervisorControl::topic_prefix, agent_to_sup_events[i]); # Establish connectivity with the controller. if ( Management::Agent::controller$address != "0.0.0.0" ) diff --git a/scripts/policy/frameworks/management/controller/api.zeek b/scripts/policy/frameworks/management/controller/api.zeek index 575f90009a..9b374d54f4 100644 --- a/scripts/policy/frameworks/management/controller/api.zeek +++ b/scripts/policy/frameworks/management/controller/api.zeek @@ -71,7 +71,7 @@ export { ## ## reqid: the request identifier used in the request event. ## - ## result: a :zeek:type`vector` of :zeek:see:`Management::Result` + ## result: a :zeek:type:`vector` of :zeek:see:`Management::Result` ## records. Each record covers one cluster instance. Each record's data ## member is a vector of :zeek:see:`Management::NodeStatus` ## records, covering the nodes at that instance. Results may also indicate diff --git a/scripts/policy/frameworks/management/controller/main.zeek b/scripts/policy/frameworks/management/controller/main.zeek index cac27ac814..d7a292d7f4 100644 --- a/scripts/policy/frameworks/management/controller/main.zeek +++ b/scripts/policy/frameworks/management/controller/main.zeek @@ -36,6 +36,25 @@ export { requests: set[string] &default=set(); }; + ## Request state for node dispatch requests, to track the requested + ## action and received responses. Node dispatches are requests to + ## execute pre-implemented actions on every node in the cluster, + ## and report their outcomes. See + ## :zeek:see:`Management::Agent::API::node_dispatch_request` and + ## :zeek:see:`Management::Agent::API::node_dispatch_response` for the + ## agent/controller interaction. + type NodeDispatchState: record { + ## The dispatched action. The first string is a command, + ## any remaining strings its arguments. + action: vector of string; + + ## Request state for every controller/agent transaction. + ## The set of strings tracks the node names from which + ## we still expect responses, before we can respond back + ## to the client. + requests: set[string] &default=set(); + }; + ## Dummy state for internal state-keeping test cases. type TestState: record { }; } @@ -43,6 +62,7 @@ export { redef record Management::Request::Request += { set_configuration_state: SetConfigurationState &optional; get_nodes_state: GetNodesState &optional; + node_dispatch_state: NodeDispatchState &optional; test_state: TestState &optional; }; @@ -485,7 +505,7 @@ event Management::Agent::API::get_nodes_response(reqid: string, result: Manageme if ( Management::Request::is_null(areq) ) return; - # Release the request, which is now done. + # Release the request, since this agent is now done. Management::Request::finish(areq$id); # Find the original request from the client @@ -554,22 +574,75 @@ event Management::Controller::API::get_nodes_request(reqid: string) } } +event Management::Agent::API::node_dispatch_response(reqid: string, results: Management::ResultVec) + { + Management::Log::info(fmt("rx Management::Agent::API::node_dispatch_response %s", reqid)); + + # Retrieve state for the request we just got a response to + local areq = Management::Request::lookup(reqid); + if ( Management::Request::is_null(areq) ) + return; + + # Release the request, since this agent is now done. + Management::Request::finish(areq$id); + + # Find the original request from the client + local req = Management::Request::lookup(areq$parent_id); + if ( Management::Request::is_null(req) ) + return; + + # Add this agent's results to the overall response + for ( i in results ) + { + # Same special treatment for Broker values that are of + # type "any": confirm their (known) type here. + switch req$node_dispatch_state$action[0] + { + default: + Management::Log::error(fmt("unexpected dispatch command %s", + req$node_dispatch_state$action[0])); + break; + } + + req$results[|req$results|] = results[i]; + } + + # Mark this request as done + if ( areq$id in req$node_dispatch_state$requests ) + delete req$node_dispatch_state$requests[areq$id]; + + # If we still have pending queries out to the agents, do nothing: we'll + # handle this soon, or our request will time out and we respond with + # error. + if ( |req$node_dispatch_state$requests| > 0 ) + return; + + # Send response event to the client based upon the dispatch type. + switch req$node_dispatch_state$action[0] + { + default: + Management::Log::error(fmt("unexpected dispatch command %s", + req$node_dispatch_state$action[0])); + break; + } + + Management::Request::finish(req$id); + } + event Management::Request::request_expired(req: Management::Request::Request) { # Various handlers for timed-out request state. We use the state members # to identify how to respond. No need to clean up the request itself, # since we're getting here via the request module's expiration # mechanism that handles the cleanup. - local res: Management::Result; + local res = Management::Result($reqid=req$id, + $success = F, + $error = "request timed out"); if ( req?$set_configuration_state ) { # This timeout means we no longer have a pending request. g_config_reqid_pending = ""; - - res = Management::Result($reqid=req$id); - res$success = F; - res$error = "request timed out"; req$results += res; Management::Log::info(fmt("tx Management::Controller::API::set_configuration_response %s", @@ -579,9 +652,6 @@ event Management::Request::request_expired(req: Management::Request::Request) if ( req?$get_nodes_state ) { - res = Management::Result($reqid=req$id); - res$success = F; - res$error = "request timed out"; req$results += res; Management::Log::info(fmt("tx Management::Controller::API::get_nodes_response %s", @@ -589,12 +659,21 @@ event Management::Request::request_expired(req: Management::Request::Request) event Management::Controller::API::get_nodes_response(req$id, req$results); } + if ( req?$node_dispatch_state ) + { + req$results += res; + + switch req$node_dispatch_state$action[0] + { + default: + Management::Log::error(fmt("unexpected dispatch command %s", + req$node_dispatch_state$action[0])); + break; + } + } + if ( req?$test_state ) { - res = Management::Result($reqid=req$id); - res$success = F; - res$error = "request timed out"; - Management::Log::info(fmt("tx Management::Controller::API::test_timeout_response %s", req$id)); event Management::Controller::API::test_timeout_response(req$id, res); } diff --git a/scripts/policy/frameworks/management/node/api.zeek b/scripts/policy/frameworks/management/node/api.zeek index 4eac9d0771..85cb010463 100644 --- a/scripts/policy/frameworks/management/node/api.zeek +++ b/scripts/policy/frameworks/management/node/api.zeek @@ -7,6 +7,27 @@ module Management::Node::API; export { + ## Management agents send this event to every Zeek cluster node to run a + ## "dispatch" -- a particular, pre-implemented action. This is the agent-node + ## complement to :zeek:see:`Management::Agent::API::node_dispatch_request`. + ## + ## reqid: a request identifier string, echoed in the response event. + ## + ## action: the requested dispatch command, with any arguments. + global node_dispatch_request: event(reqid: string, action: vector of string); + + ## Response to a node_dispatch_request event. The nodes send this back + ## to the agent. This is the agent-node equivalent of + ## :zeek:see:`Management::Agent::API::node_dispatch_response`. + ## + ## reqid: the request identifier used in the request event. + ## + ## result: a :zeek:see:`Management::Result` record covering one Zeek + ## cluster node managed by the agent. Upon success, the data field + ## contains a value appropriate for the requested dispatch. + global node_dispatch_response: event(reqid: string, result: Management::Result); + + # Notification events, node -> agent ## The cluster nodes send this event upon peering as a "check-in" to diff --git a/scripts/policy/frameworks/management/node/main.zeek b/scripts/policy/frameworks/management/node/main.zeek index 92d41acfbe..68749c11b5 100644 --- a/scripts/policy/frameworks/management/node/main.zeek +++ b/scripts/policy/frameworks/management/node/main.zeek @@ -1,10 +1,12 @@ -##! This module provides Management framework functionality that needs to be -##! present in every cluster node to allow Management agents to interact with -##! the cluster nodes they manage. +##! This module provides Management framework functionality present in every +##! cluster node, to allowing Management agents to interact with the nodes. + +@load base/frameworks/cluster @load policy/frameworks/management/agent/config @load policy/frameworks/management/log +@load ./api @load ./config module Management::Node; @@ -12,6 +14,47 @@ module Management::Node; # Tag our logs correctly redef Management::Log::role = Management::NODE; +## The type of dispatch callbacks. These implement a particular dispatch action, +## using the provided string vector as arguments, filling results into the +## provided result record. +type DispatchCallback: function(args: vector of string, res: Management::Result); + +global g_dispatch_table: table[string] of DispatchCallback = { +}; + +event Management::Node::API::node_dispatch_request(reqid: string, action: vector of string) + { + Management::Log::info(fmt("rx Management::Node::API::node_dispatch_request %s %s", reqid, action)); + + local res = Management::Result( + $reqid = reqid, $node = Cluster::node); + + if ( |action| == 0 ) + { + res$success = F; + res$error = "no dispatch arguments provided"; + } + else if ( action[0] !in g_dispatch_table ) + { + res$success = F; + res$error = fmt("dispatch %s unknown", action[0]); + } + + if ( ! res$success ) + { + Management::Log::info(fmt("tx Management::Node::API::node_dispatch_response %s", + Management::result_to_string(res))); + event Management::Node::API::node_dispatch_response(reqid, res); + return; + } + + g_dispatch_table[action[0]](action[1:], res); + + Management::Log::info(fmt("tx Management::Node::API::node_dispatch_response %s", + Management::result_to_string(res))); + event Management::Node::API::node_dispatch_response(reqid, res); + } + event Broker::peer_added(peer: Broker::EndpointInfo, msg: string) { local epi = Management::Agent::endpoint_info(); @@ -31,6 +74,7 @@ event zeek_init() # Events automatically sent to the Management agent. local events: vector of any = [ + Management::Node::API::node_dispatch_response, Management::Node::API::notify_node_hello ]; From 497b2723d780bf89aa2340f9beed3880ad6589c8 Mon Sep 17 00:00:00 2001 From: Christian Kreibich Date: Tue, 12 Apr 2022 22:01:02 -0700 Subject: [PATCH 04/10] Management framework: add get_id_value dispatch This adds support for retrieving the value of a global identifier from any subset of cluster nodes. It relies on the lookup_ID() BiF to retrieve the val, and to_json() to render the value to an easily parsed string. Ideally we'd send the val directly, but this hits several roadblocks, including the fact that Broker won't serialize arbitrary values. --- .../frameworks/management/agent/api.zeek | 6 +- .../frameworks/management/agent/main.zeek | 4 ++ .../frameworks/management/controller/api.zeek | 25 ++++++++ .../management/controller/main.zeek | 59 ++++++++++++++++++- .../frameworks/management/node/main.zeek | 29 +++++++++ 5 files changed, 121 insertions(+), 2 deletions(-) diff --git a/scripts/policy/frameworks/management/agent/api.zeek b/scripts/policy/frameworks/management/agent/api.zeek index 6607c27144..f29b05e7cc 100644 --- a/scripts/policy/frameworks/management/agent/api.zeek +++ b/scripts/policy/frameworks/management/agent/api.zeek @@ -66,7 +66,11 @@ export { ## The controller sends this to every agent to request a dispatch (the ## execution of a pre-implemented activity) to all cluster nodes. This ## is the generic controller-agent "back-end" implementation of explicit - ## client-controller "front-end" interactions. + ## client-controller "front-end" interactions, including: + ## + ## - :zeek:see:`Management::Controller::API::get_id_value_request`: two + ## arguments, the first being "get_id_value" and the second the name + ## of the ID to look up. ## ## reqid: a request identifier string, echoed in the response event. ## diff --git a/scripts/policy/frameworks/management/agent/main.zeek b/scripts/policy/frameworks/management/agent/main.zeek index 2dafd0414d..004b874888 100644 --- a/scripts/policy/frameworks/management/agent/main.zeek +++ b/scripts/policy/frameworks/management/agent/main.zeek @@ -321,6 +321,10 @@ event Management::Node::API::node_dispatch_response(reqid: string, result: Manag # confirm their type here based on the requested dispatch command. switch req$node_dispatch_state$action[0] { + case "get_id_value": + if ( result?$data ) + result$data = result$data as string; + break; default: Management::Log::error(fmt("unexpected dispatch command %s", req$node_dispatch_state$action[0])); diff --git a/scripts/policy/frameworks/management/controller/api.zeek b/scripts/policy/frameworks/management/controller/api.zeek index 9b374d54f4..e9bdff1893 100644 --- a/scripts/policy/frameworks/management/controller/api.zeek +++ b/scripts/policy/frameworks/management/controller/api.zeek @@ -80,6 +80,31 @@ export { result: Management::ResultVec); + ## zeek-client sends this event to retrieve the current value of a + ## variable in Zeek's global namespace, referenced by the given + ## identifier (i.e., variable name). The controller asks all agents + ## to retrieve this value from each cluster node, accumulates the + ## returned responses, and responds with a get_id_value_response + ## event back to the client. + ## + ## reqid: a request identifier string, echoed in the response event. + ## + ## id: the name of the variable whose value to retrieve. + global get_id_value_request: event(reqid: string, id: string); + + ## Response to a get_id_value_request event. The controller sends this + ## back to the client. + ## + ## reqid: the request identifier used in the request event. + ## + ## result: a :zeek:type:`vector` of :zeek:see:`Management::Result` + ## records. Each record covers one Zeek cluster node. Each record's + ## data field contains a string with the JSON rendering (as produced + ## by :zeek:id:`to_json`, including the error strings it potentially + ## returns). + global get_id_value_response: event(reqid: string, result: Management::ResultVec); + + # Testing events. These don't provide operational value but expose # internal functionality, triggered by test cases. diff --git a/scripts/policy/frameworks/management/controller/main.zeek b/scripts/policy/frameworks/management/controller/main.zeek index d7a292d7f4..2f2ad42ff4 100644 --- a/scripts/policy/frameworks/management/controller/main.zeek +++ b/scripts/policy/frameworks/management/controller/main.zeek @@ -42,7 +42,11 @@ export { ## and report their outcomes. See ## :zeek:see:`Management::Agent::API::node_dispatch_request` and ## :zeek:see:`Management::Agent::API::node_dispatch_response` for the - ## agent/controller interaction. + ## agent/controller interaction, and + ## :zeek:see:`Management::Controller::API::get_id_value_request` and + ## :zeek:see:`Management::Controller::API::get_id_value_response` + ## for an example of a specific API the controller generalizes into + ## a dispatch. type NodeDispatchState: record { ## The dispatched action. The first string is a command, ## any remaining strings its arguments. @@ -598,6 +602,10 @@ event Management::Agent::API::node_dispatch_response(reqid: string, results: Man # type "any": confirm their (known) type here. switch req$node_dispatch_state$action[0] { + case "get_id_value": + if ( results[i]?$data ) + results[i]$data = results[i]$data as string; + break; default: Management::Log::error(fmt("unexpected dispatch command %s", req$node_dispatch_state$action[0])); @@ -620,6 +628,12 @@ event Management::Agent::API::node_dispatch_response(reqid: string, results: Man # Send response event to the client based upon the dispatch type. switch req$node_dispatch_state$action[0] { + case "get_id_value": + Management::Log::info(fmt( + "tx Management::Controller::API::get_id_value_response %s", + Management::Request::to_string(req))); + event Management::Controller::API::get_id_value_response(req$id, req$results); + break; default: Management::Log::error(fmt("unexpected dispatch command %s", req$node_dispatch_state$action[0])); @@ -629,6 +643,42 @@ event Management::Agent::API::node_dispatch_response(reqid: string, results: Man Management::Request::finish(req$id); } +event Management::Controller::API::get_id_value_request(reqid: string, id: string) + { + Management::Log::info(fmt("rx Management::Controller::API::get_id_value_request %s %s", reqid, id)); + + # Special case: if we have no instances, respond right away. + if ( |g_instances| == 0 ) + { + Management::Log::info(fmt("tx Management::Controller::API::get_id_value_response %s", reqid)); + event Management::Controller::API::get_id_value_response(reqid, vector( + Management::Result($reqid=reqid, $success=F, $error="no instances connected"))); + return; + } + + local action = vector("get_id_value", id); + local req = Management::Request::create(reqid); + req$node_dispatch_state = NodeDispatchState($action=action); + + for ( name in g_instances ) + { + if ( name !in g_instances_ready ) + next; + + local agent_topic = Management::Agent::topic_prefix + "/" + name; + local areq = Management::Request::create(); + + areq$parent_id = req$id; + add req$node_dispatch_state$requests[areq$id]; + + Management::Log::info(fmt( + "tx Management::Agent::API::node_dispatch_request %s %s to %s", + areq$id, action, name)); + + Broker::publish(agent_topic, Management::Agent::API::node_dispatch_request, areq$id, action); + } + } + event Management::Request::request_expired(req: Management::Request::Request) { # Various handlers for timed-out request state. We use the state members @@ -665,6 +715,12 @@ event Management::Request::request_expired(req: Management::Request::Request) switch req$node_dispatch_state$action[0] { + case "get_id_value": + Management::Log::info(fmt( + "tx Management::Controller::API::get_id_value_response %s", + Management::Request::to_string(req))); + event Management::Controller::API::get_id_value_response(req$id, req$results); + break; default: Management::Log::error(fmt("unexpected dispatch command %s", req$node_dispatch_state$action[0])); @@ -717,6 +773,7 @@ event zeek_init() Management::Controller::API::get_instances_response, Management::Controller::API::set_configuration_response, Management::Controller::API::get_nodes_response, + Management::Controller::API::get_id_value_response, Management::Controller::API::test_timeout_response ]; diff --git a/scripts/policy/frameworks/management/node/main.zeek b/scripts/policy/frameworks/management/node/main.zeek index 68749c11b5..7bc321c869 100644 --- a/scripts/policy/frameworks/management/node/main.zeek +++ b/scripts/policy/frameworks/management/node/main.zeek @@ -19,7 +19,36 @@ redef Management::Log::role = Management::NODE; ## provided result record. type DispatchCallback: function(args: vector of string, res: Management::Result); +## Implementation of the "get_id_value" dispatch. Its only argument is the name +## of the ID to look up. +function dispatch_get_id_value(args: vector of string, res: Management::Result) + { + if ( |args| == 0 ) + { + res$success = F; + res$error = "get_id_value expects name of global identifier"; + return; + } + + local val = lookup_ID(args[0]); + + # The following lookup_ID() result strings indicate errors: + if ( type_name(val) == "string" ) + { + local valstr: string = val; + if ( valstr == "" || valstr == "" ) + { + res$success = F; + res$error = valstr[1:-1]; + } + } + + if ( res$success ) + res$data = to_json(val); + } + global g_dispatch_table: table[string] of DispatchCallback = { + ["get_id_value"] = dispatch_get_id_value, }; event Management::Node::API::node_dispatch_request(reqid: string, action: vector of string) From fcef7f4925c0c6eb893af3bba7b9e9ac11d7a9f3 Mon Sep 17 00:00:00 2001 From: Christian Kreibich Date: Fri, 15 Apr 2022 18:27:26 -0700 Subject: [PATCH 05/10] Management framework: improve handling of node run states When agents receive a configuration, we don't currently honor requested run states (there's no such thing as registering a node but not running it, for example). To reflect this, we now start off nodes in state PENDING as we launch them via the Supervisor, and move them to RUNNING when they check in with us via Management::Node::API::notify_node_hello. --- .../frameworks/management/agent/main.zeek | 36 ++++++++++++++----- 1 file changed, 28 insertions(+), 8 deletions(-) diff --git a/scripts/policy/frameworks/management/agent/main.zeek b/scripts/policy/frameworks/management/agent/main.zeek index 004b874888..7771e7bdf5 100644 --- a/scripts/policy/frameworks/management/agent/main.zeek +++ b/scripts/policy/frameworks/management/agent/main.zeek @@ -132,11 +132,10 @@ event Management::Agent::API::set_configuration_request(reqid: string, config: M for ( nodename in g_nodes ) supervisor_destroy(nodename); - g_nodes = table(); - # Refresh the cluster and nodes tables - + g_nodes = table(); g_cluster = table(); + for ( node in config$nodes ) { if ( node$instance == Management::Agent::name ) @@ -166,6 +165,8 @@ event Management::Agent::API::set_configuration_request(reqid: string, config: M for ( nodename in g_nodes ) { node = g_nodes[nodename]; + node$state = Management::PENDING; + nc = Supervisor::NodeConfig($name=nodename); if ( Management::Agent::cluster_directory != "" ) @@ -237,6 +238,11 @@ event SupervisorControl::status_response(reqid: string, result: Supervisor::Stat { cns$cluster_role = sns$node$cluster[node]$role; + # For cluster nodes, copy run state from g_nodes, our + # live node status table. + if ( node in g_nodes ) + cns$state = g_nodes[node]$state; + # The supervisor's responses use 0/tcp (not 0/unknown) # when indicating an unused port because its internal # serialization always assumes TCP. @@ -251,12 +257,22 @@ event SupervisorControl::status_response(reqid: string, result: Supervisor::Stat if ( role == "CONTROLLER" ) { cns$mgmt_role = Management::CONTROLLER; + + # Automatically declare the controller in running state + # here -- we'd not have received a request that brought + # us here otherwise. + cns$state = Management::RUNNING; + # The controller always listens, so the Zeek client can connect. cns$p = Management::Agent::endpoint_info()$network$bound_port; } else if ( role == "AGENT" ) { cns$mgmt_role = Management::AGENT; + + # Similarly to above, always declare agent running. We are. :) + cns$state = Management::RUNNING; + # If we have a controller address, the agent connects to it # and does not listen. See zeek_init() below for similar logic. if ( Management::Agent::controller$address == "0.0.0.0" ) @@ -268,13 +284,9 @@ event SupervisorControl::status_response(reqid: string, result: Supervisor::Stat } } - # A PID is available if a supervised node has fully launched - # and is therefore running. + # A PID is available if a supervised node has fully launched. if ( sns?$pid ) - { cns$pid = sns$pid; - cns$state = Management::RUNNING; - } node_statuses += cns; } @@ -409,6 +421,14 @@ event Management::Agent::API::agent_standby_request(reqid: string) event Management::Agent::API::agent_standby_response(reqid, res); } +event Management::Node::API::notify_node_hello(node: string) + { + Management::Log::info(fmt("rx Management::Node::API::notify_node_hello %s", node)); + + if ( node in g_nodes ) + g_nodes[node]$state = Management::RUNNING; + } + event Broker::peer_added(peer: Broker::EndpointInfo, msg: string) { # This does not (cannot?) immediately verify that the new peer From 76ff976e83ba270146b9c806dccace72cc3bdad6 Mon Sep 17 00:00:00 2001 From: Christian Kreibich Date: Mon, 11 Apr 2022 22:09:04 -0700 Subject: [PATCH 06/10] Avoid whitespace around function type strings in JSON rendering Callable types were rendered with a trailing "\n" in to_json() output. Tweaking the Describe() calls to stop producing the newline is prone to test failures, so this focuses on the JSON string production to suppress it, which doesn't affect any tests. --- src/Val.cc | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/src/Val.cc b/src/Val.cc index 3d3290a200..1d0055f71b 100644 --- a/src/Val.cc +++ b/src/Val.cc @@ -420,8 +420,9 @@ static void BuildJSON(threading::formatter::JSON::NullDoubleWriter& writer, Val* } rapidjson::Value j; + auto tag = val->GetType()->Tag(); - switch ( val->GetType()->Tag() ) + switch ( tag ) { case TYPE_BOOL: writer.Bool(val->AsBool()); @@ -475,8 +476,15 @@ static void BuildJSON(threading::formatter::JSON::NullDoubleWriter& writer, Val* ODesc d; d.SetStyle(RAW_STYLE); val->Describe(&d); - writer.String(util::json_escape_utf8( - std::string(reinterpret_cast(d.Bytes()), d.Len()))); + std::string desc(reinterpret_cast(d.Bytes()), d.Len()); + + // None of our function types should have surrounding + // whitespace, but ODesc might produce it due to its + // many output modes and flags. Strip it. + if ( tag == TYPE_FUNC ) + desc = util::strstrip(desc); + + writer.String(util::json_escape_utf8(desc)); break; } From 748db5cf735b84b2b79a27d31b28064cc32add5b Mon Sep 17 00:00:00 2001 From: Christian Kreibich Date: Wed, 23 Mar 2022 16:55:21 -0700 Subject: [PATCH 07/10] Management framework: bump zeek-client to pull in get-id-value command --- auxil/zeek-client | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/auxil/zeek-client b/auxil/zeek-client index c2af7381c5..cef49bc607 160000 --- a/auxil/zeek-client +++ b/auxil/zeek-client @@ -1 +1 @@ -Subproject commit c2af7381c584b6545517843872747598bb0e25d5 +Subproject commit cef49bc607025306f4017816b350b8c833aaa753 From 438cd9b9f7bf8aedfa0d179c721ce9d5af63eefe Mon Sep 17 00:00:00 2001 From: Christian Kreibich Date: Mon, 11 Apr 2022 22:18:56 -0700 Subject: [PATCH 08/10] Management framework: minor tweaks to logging component Use an enum with explicitly assigned values since we rely on enum_to_int() to reason about log levels, and bump the default level from DEBUG to INFO. --- scripts/policy/frameworks/management/log.zeek | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/scripts/policy/frameworks/management/log.zeek b/scripts/policy/frameworks/management/log.zeek index 7d3b565b32..e8732df1bc 100644 --- a/scripts/policy/frameworks/management/log.zeek +++ b/scripts/policy/frameworks/management/log.zeek @@ -16,10 +16,10 @@ export { ## The controller/agent log supports four different log levels. type Level: enum { - DEBUG, - INFO, - WARNING, - ERROR, + DEBUG = 10, + INFO = 20, + WARNING = 30, + ERROR = 40, }; ## The record type containing the column fields of the agent/controller log. @@ -36,8 +36,9 @@ export { message: string; } &log; - ## The log level in use for this node. - global log_level = DEBUG &redef; + ## The log level in use for this node. This is the minimum + ## log level required to produce output. + global log_level = INFO &redef; ## A debug-level log message writer. ## From 7edd1a2651c06d6742774132d8a2a5916e02b873 Mon Sep 17 00:00:00 2001 From: Christian Kreibich Date: Tue, 12 Apr 2022 19:41:03 -0700 Subject: [PATCH 09/10] Management framework: allow selecting cluster nodes in get_id_value This adds an optional set of cluster node names to narrow the querying to. It similarly expands the dispatch mechanism, since it likely most sense for any such request to apply only to a subset of nodes. Requests for invalid nodes trigger Response records in error state. --- .../frameworks/management/agent/api.zeek | 7 +- .../frameworks/management/agent/main.zeek | 107 +++++++++++++++--- .../frameworks/management/controller/api.zeek | 7 +- .../management/controller/main.zeek | 53 ++++++++- .../frameworks/management/node/api.zeek | 8 +- .../frameworks/management/node/main.zeek | 13 ++- 6 files changed, 175 insertions(+), 20 deletions(-) diff --git a/scripts/policy/frameworks/management/agent/api.zeek b/scripts/policy/frameworks/management/agent/api.zeek index f29b05e7cc..8ba47ee67d 100644 --- a/scripts/policy/frameworks/management/agent/api.zeek +++ b/scripts/policy/frameworks/management/agent/api.zeek @@ -75,7 +75,12 @@ export { ## reqid: a request identifier string, echoed in the response event. ## ## action: the requested dispatch command, with any arguments. - global node_dispatch_request: event(reqid: string, action: vector of string); + ## + ## nodes: a set of cluster node names (e.g. "worker-01") to retrieve + ## the values from. An empty set, supplied by default, means + ## retrieval from all nodes managed by the agent. + global node_dispatch_request: event(reqid: string, action: vector of string, + nodes: set[string] &default=set()); ## Response to a node_dispatch_request event. Each agent sends this back ## to the controller to report the dispatch outcomes on all nodes managed diff --git a/scripts/policy/frameworks/management/agent/main.zeek b/scripts/policy/frameworks/management/agent/main.zeek index 7771e7bdf5..f3a8bd2809 100644 --- a/scripts/policy/frameworks/management/agent/main.zeek +++ b/scripts/policy/frameworks/management/agent/main.zeek @@ -51,9 +51,9 @@ global g_instances: table[string] of Management::Instance; # A map for the nodes we run on this instance, via this agent. global g_nodes: table[string] of Management::Node; -# The node map employed by the supervisor to describe the cluster -# topology to newly forked nodes. We refresh it when we receive -# new configurations. +# The complete node map employed by the supervisor to describe the cluster +# topology to newly forked nodes. We refresh it when we receive new +# configurations. global g_cluster: table[string] of Supervisor::ClusterEndpoint; @@ -311,7 +311,11 @@ event Management::Agent::API::get_nodes_request(reqid: string) event Management::Node::API::node_dispatch_response(reqid: string, result: Management::Result) { - Management::Log::info(fmt("rx Management::Node::API::node_dispatch_response %s", reqid)); + local node = "unknown node"; + if ( result?$node ) + node = result$node; + + Management::Log::info(fmt("rx Management::Node::API::node_dispatch_response %s from %s", reqid, node)); # Retrieve state for the request we just got a response to local nreq = Management::Request::lookup(reqid); @@ -326,8 +330,17 @@ event Management::Node::API::node_dispatch_response(reqid: string, result: Manag # Mark the responding node as done. Nodes normally fill their own name # into the result; we only double-check for resilience. Nodes failing to # report themselves would eventually lead to request timeout. - if ( result?$node && result$node in req$node_dispatch_state$requests ) - delete req$node_dispatch_state$requests[result$node]; + if ( result?$node ) + { + if ( result$node in req$node_dispatch_state$requests ) + delete req$node_dispatch_state$requests[result$node]; + else + { + # An unknown or duplicate response -- do nothing. + Management::Log::debug(fmt("response %s not expected, ignoring", reqid)); + return; + } + } # The usual special treatment for Broker values that are of type "any": # confirm their type here based on the requested dispatch command. @@ -366,26 +379,94 @@ event Management::Node::API::node_dispatch_response(reqid: string, result: Manag Management::Request::finish(req$id); } -event Management::Agent::API::node_dispatch_request(reqid: string, action: vector of string) +event Management::Agent::API::node_dispatch_request(reqid: string, action: vector of string, nodes: set[string]) { - Management::Log::info(fmt("rx Management::Agent::API::node_dispatch_request %s %s", reqid, action)); + Management::Log::info(fmt("rx Management::Agent::API::node_dispatch_request %s %s %s", reqid, action, nodes)); + + local node: string; + local cluster_nodes: set[string]; + local nodes_final: set[string]; + + for ( node in g_nodes ) + add cluster_nodes[node]; + + # If this request includes cluster nodes to query, check if this agent + # manages any of those nodes. If it doesn't, respond with an empty + # results vector immediately. Note that any globally unknown nodes + # that the client might have requested already got filtered by the + # controller, so we don't need to worry about them here. + + if ( |nodes| > 0 ) + { + nodes_final = nodes & cluster_nodes; + + if ( |nodes_final| == 0 ) + { + Management::Log::info(fmt( + "tx Management::Agent::API::node_dispatch_response %s, no node overlap", + reqid)); + event Management::Agent::API::node_dispatch_response(reqid, vector()); + return; + } + } + else if ( |g_nodes| == 0 ) + { + # Special case: the client did not request specific nodes. If + # we aren't running any nodes, respond right away, since there's + # nothing to dispatch to. + Management::Log::info(fmt( + "tx Management::Agent::API::node_dispatch_response %s, no nodes registered", + reqid)); + event Management::Agent::API::node_dispatch_response(reqid, vector()); + return; + } + else + { + # We send to all known nodes. + nodes_final = cluster_nodes; + } local res: Management::Result; local req = Management::Request::create(reqid); req$node_dispatch_state = NodeDispatchState($action=action); - for ( node in g_nodes ) - add req$node_dispatch_state$requests[node]; + # Build up dispatch state for tracking responses. We only dispatch to + # nodes that are in state RUNNING, as those have confirmed they're ready + # to communicate. For others, establish error state in now. + for ( node in nodes_final ) + { + if ( g_nodes[node]$state == Management::RUNNING ) + add req$node_dispatch_state$requests[node]; + else + { + res = Management::Result($reqid=reqid, $node=node); + res$success = F; + res$error = fmt("cluster node %s not in runnning state", node); + req$results += res; + } + } - # We use a single request record to track all node responses. We know - # when all nodes have responded via the requests set we built up above. + # Corner case: nothing is in state RUNNING. + if ( |req$node_dispatch_state$requests| == 0 ) + { + Management::Log::info(fmt( + "tx Management::Agent::API::node_dispatch_response %s, no nodes running", + reqid)); + event Management::Agent::API::node_dispatch_response(reqid, req$results); + Management::Request::finish(req$id); + return; + } + + # We use a single request record to track all node responses, and a + # single event that Broker publishes to all nodes. We know when all + # nodes have responded by checking the requests set we built up above. local nreq = Management::Request::create(); nreq$parent_id = reqid; Management::Log::info(fmt("tx Management::Node::API::node_dispatch_request %s %s", nreq$id, action)); Broker::publish(Management::Node::node_topic, - Management::Node::API::node_dispatch_request, nreq$id, action); + Management::Node::API::node_dispatch_request, nreq$id, action, nodes); } event Management::Agent::API::agent_welcome_request(reqid: string) diff --git a/scripts/policy/frameworks/management/controller/api.zeek b/scripts/policy/frameworks/management/controller/api.zeek index e9bdff1893..0ce3259a4c 100644 --- a/scripts/policy/frameworks/management/controller/api.zeek +++ b/scripts/policy/frameworks/management/controller/api.zeek @@ -90,7 +90,12 @@ export { ## reqid: a request identifier string, echoed in the response event. ## ## id: the name of the variable whose value to retrieve. - global get_id_value_request: event(reqid: string, id: string); + ## + ## nodes: a set of cluster node names (e.g. "worker-01") to retrieve + ## the values from. An empty set, supplied by default, means + ## retrieval from all current cluster nodes. + global get_id_value_request: event(reqid: string, id: string, + nodes: set[string] &default=set()); ## Response to a get_id_value_request event. The controller sends this ## back to the client. diff --git a/scripts/policy/frameworks/management/controller/main.zeek b/scripts/policy/frameworks/management/controller/main.zeek index 2f2ad42ff4..441e03517c 100644 --- a/scripts/policy/frameworks/management/controller/main.zeek +++ b/scripts/policy/frameworks/management/controller/main.zeek @@ -230,6 +230,17 @@ function is_instance_connectivity_change(inst: Management::Instance): bool return F; } +function filter_config_nodes_by_name(nodes: set[string]): set[string] + { + local res: set[string]; + local cluster_nodes: set[string]; + + for ( node in g_config_current$nodes ) + add cluster_nodes[node$name]; + + return nodes & cluster_nodes; + } + event Management::Controller::API::notify_agents_ready(instances: set[string]) { local insts = Management::Util::set_to_vector(instances); @@ -643,7 +654,7 @@ event Management::Agent::API::node_dispatch_response(reqid: string, results: Man Management::Request::finish(req$id); } -event Management::Controller::API::get_id_value_request(reqid: string, id: string) +event Management::Controller::API::get_id_value_request(reqid: string, id: string, nodes: set[string]) { Management::Log::info(fmt("rx Management::Controller::API::get_id_value_request %s %s", reqid, id)); @@ -660,6 +671,42 @@ event Management::Controller::API::get_id_value_request(reqid: string, id: strin local req = Management::Request::create(reqid); req$node_dispatch_state = NodeDispatchState($action=action); + local nodes_final: set[string]; + local node: string; + local res: Management::Result; + + # Input sanitization: check for any requested nodes that aren't part of + # the current configuration. We send back error results for those and + # don't propagate them to the agents. + if ( |nodes| > 0 ) + { + # Requested nodes that are in the current configuration: + nodes_final = filter_config_nodes_by_name(nodes); + # Requested nodes that are not in current configuration: + local nodes_invalid = nodes - nodes_final; + + # Assemble error results for all invalid nodes + for ( node in nodes_invalid ) + { + res = Management::Result($reqid=reqid, $node=node); + res$success = F; + res$error = "unknown cluster node"; + req$results += res; + } + + # If only invalid nodes got requested, we're now done. + if ( |nodes_final| == 0 ) + { + Management::Log::info(fmt( + "tx Management::Controller::API::get_id_value_response %s", + Management::Request::to_string(req))); + event Management::Controller::API::get_id_value_response(req$id, req$results); + Management::Request::finish(req$id); + return; + } + } + + # Send dispatch requests to all agents, with the final set of nodes for ( name in g_instances ) { if ( name !in g_instances_ready ) @@ -675,7 +722,9 @@ event Management::Controller::API::get_id_value_request(reqid: string, id: strin "tx Management::Agent::API::node_dispatch_request %s %s to %s", areq$id, action, name)); - Broker::publish(agent_topic, Management::Agent::API::node_dispatch_request, areq$id, action); + Broker::publish(agent_topic, + Management::Agent::API::node_dispatch_request, + areq$id, action, nodes_final); } } diff --git a/scripts/policy/frameworks/management/node/api.zeek b/scripts/policy/frameworks/management/node/api.zeek index 85cb010463..f952f312ef 100644 --- a/scripts/policy/frameworks/management/node/api.zeek +++ b/scripts/policy/frameworks/management/node/api.zeek @@ -14,7 +14,13 @@ export { ## reqid: a request identifier string, echoed in the response event. ## ## action: the requested dispatch command, with any arguments. - global node_dispatch_request: event(reqid: string, action: vector of string); + ## + ## nodes: the cluster node names this dispatch targets. An empty set, + ## supplied by default, means it applies to all nodes. Since nodes + ## receive all dispatch requests, they can use any node names provided + ## here to filter themselves out of responding. + global node_dispatch_request: event(reqid: string, action: vector of string, + nodes: set[string] &default=set()); ## Response to a node_dispatch_request event. The nodes send this back ## to the agent. This is the agent-node equivalent of diff --git a/scripts/policy/frameworks/management/node/main.zeek b/scripts/policy/frameworks/management/node/main.zeek index 7bc321c869..cd9819ff3d 100644 --- a/scripts/policy/frameworks/management/node/main.zeek +++ b/scripts/policy/frameworks/management/node/main.zeek @@ -51,9 +51,18 @@ global g_dispatch_table: table[string] of DispatchCallback = { ["get_id_value"] = dispatch_get_id_value, }; -event Management::Node::API::node_dispatch_request(reqid: string, action: vector of string) +event Management::Node::API::node_dispatch_request(reqid: string, action: vector of string, + nodes: set[string]) { - Management::Log::info(fmt("rx Management::Node::API::node_dispatch_request %s %s", reqid, action)); + Management::Log::info(fmt("rx Management::Node::API::node_dispatch_request %s %s %s", reqid, action, nodes)); + + if ( |nodes| > 0 && Cluster::node !in nodes ) + { + Management::Log::debug(fmt( + "dispatch %s not targeting this node (%s !in %s), skipping", + reqid, Cluster::node, nodes)); + return; + } local res = Management::Result( $reqid = reqid, $node = Cluster::node); From e2d0db73a5dc9e3e0a1e5c800cc40146786711e7 Mon Sep 17 00:00:00 2001 From: Christian Kreibich Date: Fri, 15 Apr 2022 18:45:22 -0700 Subject: [PATCH 10/10] Management framework: bump external testsuite --- testing/external/commit-hash.zeek-testing-cluster | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/testing/external/commit-hash.zeek-testing-cluster b/testing/external/commit-hash.zeek-testing-cluster index 407428c53f..fd5057b5a2 100644 --- a/testing/external/commit-hash.zeek-testing-cluster +++ b/testing/external/commit-hash.zeek-testing-cluster @@ -1 +1 @@ -3528e248d7d35e102c39c8e8050fc1a6dfa477bf +ed820f196ac5e44e1af4cb35a043337db9898458