From 7db8634c8ba8bc8b6ca77d67d1f70ac40192880a Mon Sep 17 00:00:00 2001 From: Christian Kreibich Date: Mon, 31 Jan 2022 17:51:33 -0800 Subject: [PATCH] Add ClusterController::API::get_nodes_request/response event pair This allows querying the status of Zeek nodes currently running in a cluster. The controller relays the request to all instances and accumulates their responses. The response back to the client contains one Result record per instance response, each of which carrying a ClusterController::Types::NodeState vector in its $data member to convey the state of each node at that instance. The NodeState record tracks the name of the node, its role in the controller (if any), its role in the data cluster (if any), as well as PID and listening port, if any. --- .../policy/frameworks/cluster/agent/api.zeek | 23 +++- .../policy/frameworks/cluster/agent/main.zeek | 94 +++++++++++++++ .../frameworks/cluster/controller/api.zeek | 23 ++++ .../frameworks/cluster/controller/main.zeek | 110 +++++++++++++++++- .../frameworks/cluster/controller/types.zeek | 34 +++++- 5 files changed, 272 insertions(+), 12 deletions(-) diff --git a/scripts/policy/frameworks/cluster/agent/api.zeek b/scripts/policy/frameworks/cluster/agent/api.zeek index 7957677457..02b3b28ea9 100644 --- a/scripts/policy/frameworks/cluster/agent/api.zeek +++ b/scripts/policy/frameworks/cluster/agent/api.zeek @@ -13,7 +13,6 @@ export { ## controller and agent. const version = 1; - # Agent API events ## The controller sends this event to convey a new cluster configuration @@ -42,6 +41,28 @@ export { result: ClusterController::Types::Result); + ## The controller sends this event to request a list of + ## :zeek:see:`ClusterController::Types::NodeStatus` records that capture + ## the status of Supervisor-managed nodes running on this instance. + ## instances. + ## + ## reqid: a request identifier string, echoed in the response event. + ## + global get_nodes_request: event(reqid: string); + + ## Response to a get_nodes_request event. The agent sends this back to the + ## controller. + ## + ## reqid: the request identifier used in the request event. + ## + ## result: a :zeek:see:`ClusterController::Types::Result` record. Its data + ## member is a vector of :zeek:see:`ClusterController::Types::NodeState` + ## records, covering the nodes at this instance. The result may also + ## indicate failure, with error messages indicating what went wrong. + ## + global get_nodes_response: event(reqid: string, + result: ClusterController::Types::Result); + ## The controller sends this event to confirm to the agent that it is ## part of the current cluster topology. The agent acknowledges with the ## corresponding response event. diff --git a/scripts/policy/frameworks/cluster/agent/main.zeek b/scripts/policy/frameworks/cluster/agent/main.zeek index e28d516520..9376ae967c 100644 --- a/scripts/policy/frameworks/cluster/agent/main.zeek +++ b/scripts/policy/frameworks/cluster/agent/main.zeek @@ -186,6 +186,94 @@ event ClusterAgent::API::set_configuration_request(reqid: string, config: Cluste } } +event SupervisorControl::status_response(reqid: string, result: Supervisor::Status) + { + local req = ClusterController::Request::lookup(reqid); + if ( ClusterController::Request::is_null(req) ) + return; + + ClusterController::Request::finish(reqid); + + local res = ClusterController::Types::Result( + $reqid = req$parent_id, $instance = ClusterAgent::name); + + local node_statuses: ClusterController::Types::NodeStatusVec; + + for ( node in result$nodes ) + { + local sns = result$nodes[node]; # Supervisor node status + local cns = ClusterController::Types::NodeStatus( + $node=node, $state=ClusterController::Types::PENDING); + + # Identify the role of the node. For data cluster roles (worker, + # manager, etc) we derive this from the cluster node table. For + # agent and controller, we identify via environment variables + # that the controller framework establishes upon creation (see + # the respective boot.zeek scripts). + if ( node in sns$node$cluster ) + { + cns$cluster_role = sns$node$cluster[node]$role; + + # The supervisor's responses use 0/tcp (not 0/unknown) + # when indicating an unused port because its internal + # serialization always assumes TCP. + if ( sns$node$cluster[node]$p != 0/tcp ) + cns$p = sns$node$cluster[node]$p; + } + else + { + if ( "ZEEK_CLUSTER_MGMT_NODE" in sns$node$env ) + { + local role = sns$node$env["ZEEK_CLUSTER_MGMT_NODE"]; + if ( role == "CONTROLLER" ) + { + cns$mgmt_role = ClusterController::Types::CONTROLLER; + # The controller always listens, so the Zeek client can connect. + cns$p = ClusterController::endpoint_info()$network$bound_port; + } + else if ( role == "AGENT" ) + { + cns$mgmt_role = ClusterController::Types::AGENT; + # If we have a controller address, the agent connects to it + # and does not listen. See zeek_init() below for similar logic. + if ( ClusterAgent::controller$address == "0.0.0.0" ) + cns$p = ClusterAgent::endpoint_info()$network$bound_port; + } + else + ClusterController::Log::warning(fmt( + "unexpected cluster management node type '%'", role)); + } + } + + # A PID is available if a supervised node has fully launched + # and is therefore running. + if ( sns?$pid ) + { + cns$pid = sns$pid; + cns$state = ClusterController::Types::RUNNING; + } + + node_statuses += cns; + } + + res$data = node_statuses; + + ClusterController::Log::info(fmt("tx ClusterAgent::API::get_nodes_response %s", + ClusterController::Types::result_to_string(res))); + event ClusterAgent::API::get_nodes_response(req$parent_id, res); + } + +event ClusterAgent::API::get_nodes_request(reqid: string) + { + ClusterController::Log::info(fmt("rx ClusterAgent::API::get_nodes_request %s", reqid)); + + local req = ClusterController::Request::create(); + req$parent_id = reqid; + + event SupervisorControl::status_request(req$id, ""); + ClusterController::Log::info(fmt("issued supervisor status, %s", req$id)); + } + event ClusterAgent::API::agent_welcome_request(reqid: string) { ClusterController::Log::info(fmt("rx ClusterAgent::API::agent_welcome_request %s", reqid)); @@ -231,6 +319,10 @@ event Broker::peer_added(peer: Broker::EndpointInfo, msg: string) to_addr(epi$network$address), ClusterAgent::API::version); } +# XXX We may want a request timeout event handler here. It's arguably cleaner to +# send supervisor failure events back to the controller than to rely on its +# controller-agent request timeout to kick in. + event zeek_init() { local epi = ClusterAgent::endpoint_info(); @@ -253,6 +345,7 @@ event zeek_init() # Auto-publish a bunch of events. Glob patterns or module-level # auto-publish would be helpful here. + Broker::auto_publish(agent_topic, ClusterAgent::API::get_nodes_response); Broker::auto_publish(agent_topic, ClusterAgent::API::set_configuration_response); Broker::auto_publish(agent_topic, ClusterAgent::API::agent_welcome_response); Broker::auto_publish(agent_topic, ClusterAgent::API::agent_standby_response); @@ -263,6 +356,7 @@ event zeek_init() Broker::auto_publish(agent_topic, ClusterAgent::API::notify_log); Broker::auto_publish(SupervisorControl::topic_prefix, SupervisorControl::create_request); + Broker::auto_publish(SupervisorControl::topic_prefix, SupervisorControl::status_request); Broker::auto_publish(SupervisorControl::topic_prefix, SupervisorControl::destroy_request); Broker::auto_publish(SupervisorControl::topic_prefix, SupervisorControl::restart_request); Broker::auto_publish(SupervisorControl::topic_prefix, SupervisorControl::stop_request); diff --git a/scripts/policy/frameworks/cluster/controller/api.zeek b/scripts/policy/frameworks/cluster/controller/api.zeek index 27c41d33ff..4ed77ffdb2 100644 --- a/scripts/policy/frameworks/cluster/controller/api.zeek +++ b/scripts/policy/frameworks/cluster/controller/api.zeek @@ -57,6 +57,29 @@ export { result: ClusterController::Types::ResultVec); + ## zeek-client sends this event to request a list of + ## :zeek:see:`ClusterController::Types::NodeStatus` records that capture + ## the status of Supervisor-managed nodes running on the cluster's + ## instances. + ## + ## reqid: a request identifier string, echoed in the response event. + ## + global get_nodes_request: event(reqid: string); + + ## Response to a get_nodes_request event. The controller sends this + ## back to the client. + ## + ## reqid: the request identifier used in the request event. + ## + ## result: a :zeek:type`vector` of :zeek:see:`ClusterController::Types::Result` + ## records. Each record covers one cluster instance. Each record's data + ## member is a vector of :zeek:see:`ClusterController::Types::NodeState` + ## records, covering the nodes at that instance. Results may also indicate + ## failure, with error messages indicating what went wrong. + global get_nodes_response: event(reqid: string, + result: ClusterController::Types::ResultVec); + + # Testing events. These don't provide operational value but expose # internal functionality, triggered by test cases. diff --git a/scripts/policy/frameworks/cluster/controller/main.zeek b/scripts/policy/frameworks/cluster/controller/main.zeek index b0fe565553..3e93d715fd 100644 --- a/scripts/policy/frameworks/cluster/controller/main.zeek +++ b/scripts/policy/frameworks/cluster/controller/main.zeek @@ -21,11 +21,17 @@ type SetConfigurationState: record { requests: set[string] &default=set(); }; +# Request state specific to the get_nodes request/response events +type GetNodesState: record { + requests: set[string] &default=set(); +}; + # Dummy state for testing events. type TestState: record { }; redef record ClusterController::Request::Request += { set_configuration_state: SetConfigurationState &optional; + get_nodes_state: GetNodesState &optional; test_state: TestState &optional; }; @@ -457,6 +463,84 @@ event ClusterController::API::get_instances_request(reqid: string) event ClusterController::API::get_instances_response(reqid, res); } +event ClusterAgent::API::get_nodes_response(reqid: string, result: ClusterController::Types::Result) + { + ClusterController::Log::info(fmt("rx ClusterAgent::API::get_nodes_response %s", reqid)); + + # Retrieve state for the request we just got a response to + local areq = ClusterController::Request::lookup(reqid); + if ( ClusterController::Request::is_null(areq) ) + return; + + # Release the request, which is now done. + ClusterController::Request::finish(areq$id); + + # Find the original request from the client + local req = ClusterController::Request::lookup(areq$parent_id); + if ( ClusterController::Request::is_null(req) ) + return; + + # Zeek's ingestion of an any-typed val via Broker yields an opaque + # Broker DataVal. When Zeek forwards this val via another event it stays + # in this opaque form. To avoid forcing recipients to distinguish + # whether the val is of the actual, intended (any-)type or a Broker + # DataVal wrapper, we explicitly cast it back to our intended Zeek + # type. This test case demonstrates: broker.remote_event_vector_any + result$data = result$data as ClusterController::Types::NodeStatusVec; + + # Add this result to the overall response + req$results[|req$results|] = result; + + # Mark this request as done by removing it from the table of pending + # ones. The following if-check should always be true. + if ( areq$id in req$get_nodes_state$requests ) + delete req$get_nodes_state$requests[areq$id]; + + # If we still have pending queries out to the agents, do nothing: we'll + # handle this soon, or our request will time out and we respond with + # error. + if ( |req$get_nodes_state$requests| > 0 ) + return; + + ClusterController::Log::info(fmt("tx ClusterController::API::get_nodes_response %s", + ClusterController::Request::to_string(req))); + event ClusterController::API::get_nodes_response(req$id, req$results); + ClusterController::Request::finish(req$id); + } + +event ClusterController::API::get_nodes_request(reqid: string) + { + ClusterController::Log::info(fmt("rx ClusterController::API::get_nodes_request %s", reqid)); + + # Special case: if we have no instances, respond right away. + if ( |g_instances| == 0 ) + { + ClusterController::Log::info(fmt("tx ClusterController::API::get_nodes_response %s", reqid)); + event ClusterController::API::get_nodes_response(reqid, vector( + ClusterController::Types::Result($reqid=reqid, $success=F, + $error="no instances connected"))); + return; + } + + local req = ClusterController::Request::create(reqid); + req$get_nodes_state = GetNodesState(); + + for ( name in g_instances ) + { + if ( name !in g_instances_ready ) + next; + + local agent_topic = ClusterAgent::topic_prefix + "/" + name; + local areq = ClusterController::Request::create(); + + areq$parent_id = req$id; + add req$get_nodes_state$requests[areq$id]; + + ClusterController::Log::info(fmt("tx ClusterAgent::API::get_nodes_request %s to %s", areq$id, name)); + Broker::publish(agent_topic, ClusterAgent::API::get_nodes_request, areq$id); + } + } + event ClusterController::Request::request_expired(req: ClusterController::Request::Request) { # Various handlers for timed-out request state. We use the state members @@ -480,6 +564,18 @@ event ClusterController::Request::request_expired(req: ClusterController::Reques event ClusterController::API::set_configuration_response(req$id, req$results); } + if ( req?$get_nodes_state ) + { + res = ClusterController::Types::Result($reqid=req$id); + res$success = F; + res$error = "request timed out"; + req$results += res; + + ClusterController::Log::info(fmt("tx ClusterController::API::get_nodes_response %s", + ClusterController::Request::to_string(req))); + event ClusterController::API::get_nodes_response(req$id, req$results); + } + if ( req?$test_state ) { res = ClusterController::Types::Result($reqid=req$id); @@ -525,13 +621,15 @@ event zeek_init() Broker::subscribe(ClusterController::topic); # Events sent to the client: + local events: vector of any = [ + ClusterController::API::get_instances_response, + ClusterController::API::set_configuration_response, + ClusterController::API::get_nodes_response, + ClusterController::API::test_timeout_response + ]; - Broker::auto_publish(ClusterController::topic, - ClusterController::API::get_instances_response); - Broker::auto_publish(ClusterController::topic, - ClusterController::API::set_configuration_response); - Broker::auto_publish(ClusterController::topic, - ClusterController::API::test_timeout_response); + for ( i in events ) + Broker::auto_publish(ClusterController::topic, events[i]); ClusterController::Log::info("controller is live"); } diff --git a/scripts/policy/frameworks/cluster/controller/types.zeek b/scripts/policy/frameworks/cluster/controller/types.zeek index e1021aa951..ad7fce9c1e 100644 --- a/scripts/policy/frameworks/cluster/controller/types.zeek +++ b/scripts/policy/frameworks/cluster/controller/types.zeek @@ -9,9 +9,9 @@ export { ## include the data cluster node types (worker, logger, etc) -- those ## continue to be managed by the cluster framework. type Role: enum { - NONE, - AGENT, - CONTROLLER, + NONE, ##< No active role in cluster management + AGENT, ##< A cluster management agent. + CONTROLLER, ##< The cluster's controller. }; ## A Zeek-side option with value. @@ -35,8 +35,11 @@ export { type InstanceVec: vector of Instance; ## State that a Cluster Node can be in. State changes trigger an - ## API notification (see notify_change()). + ## API notification (see notify_change()). The Pending state corresponds + ## to the Supervisor not yet reporting a PID for a node when it has not + ## yet fully launched. type State: enum { + PENDING, ##< Not yet running RUNNING, ##< Running and operating normally STOPPED, ##< Explicitly stopped FAILED, ##< Failed to start; and permanently halted @@ -61,7 +64,6 @@ export { ## Data structure capturing a cluster's complete configuration. type Configuration: record { id: string &default=unique_id(""); ##< Unique identifier for a particular configuration - ## The instances in the cluster. instances: set[Instance] &default=set(); @@ -69,6 +71,26 @@ export { nodes: set[Node] &default=set(); }; + ## The status of a Supervisor-managed node, as reported to the client in + ## a get_nodes_request/get_nodes_response transaction. + type NodeStatus: record { + ## Cluster-unique, human-readable node name + node: string; + ## Current run state of the node. + state: State; + ## Role the node plays in cluster management. + mgmt_role: Role &default=NONE; + ## Role the node plays in the data cluster. + cluster_role: Supervisor::ClusterRole &default=Supervisor::NONE; + ## Process ID of the node. This is optional because the Supervisor may not have + ## a PID when a node is still bootstrapping. + pid: int &optional; + ## The node's Broker peering listening port, if any. + p: port &optional; + }; + + type NodeStatusVec: vector of NodeStatus; + ## Return value for request-response API event pairs type Result: record { reqid: string; ##< Request ID of operation this result refers to @@ -81,6 +103,8 @@ export { type ResultVec: vector of Result; + ## Given a :zeek:see:`ClusterController::Types::Result` record, + ## this function returns a string summarizing it. global result_to_string: function(res: Result): string; }