diff --git a/scripts/policy/frameworks/cluster/agent/api.zeek b/scripts/policy/frameworks/cluster/agent/api.zeek
index 7957677457..02b3b28ea9 100644
--- a/scripts/policy/frameworks/cluster/agent/api.zeek
+++ b/scripts/policy/frameworks/cluster/agent/api.zeek
@@ -13,7 +13,6 @@ export {
 	## controller and agent.
 	const version = 1;
 
-
 	# Agent API events
 
 	## The controller sends this event to convey a new cluster configuration
@@ -42,6 +41,27 @@ export {
 	    result: ClusterController::Types::Result);
 
+	## The controller sends this event to request a list of
+	## :zeek:see:`ClusterController::Types::NodeStatus` records that capture
+	## the status of Supervisor-managed nodes running on this instance.
+	##
+	## reqid: a request identifier string, echoed in the response event.
+	##
+	global get_nodes_request: event(reqid: string);
+
+	## Response to a get_nodes_request event. The agent sends this back to the
+	## controller.
+	##
+	## reqid: the request identifier used in the request event.
+	##
+	## result: a :zeek:see:`ClusterController::Types::Result` record. Its data
+	## member is a vector of :zeek:see:`ClusterController::Types::NodeStatus`
+	## records, covering the nodes at this instance. The result may also
+	## indicate failure, with error messages indicating what went wrong.
+	##
+	global get_nodes_response: event(reqid: string,
+	    result: ClusterController::Types::Result);
+
 	## The controller sends this event to confirm to the agent that it is
 	## part of the current cluster topology. The agent acknowledges with the
 	## corresponding response event.
 	##
diff --git a/scripts/policy/frameworks/cluster/agent/main.zeek b/scripts/policy/frameworks/cluster/agent/main.zeek
index e28d516520..9376ae967c 100644
--- a/scripts/policy/frameworks/cluster/agent/main.zeek
+++ b/scripts/policy/frameworks/cluster/agent/main.zeek
@@ -186,6 +186,94 @@ event ClusterAgent::API::set_configuration_request(reqid: string, config: Cluste
 		}
 	}
 
+event SupervisorControl::status_response(reqid: string, result: Supervisor::Status)
+	{
+	local req = ClusterController::Request::lookup(reqid);
+	if ( ClusterController::Request::is_null(req) )
+		return;
+
+	ClusterController::Request::finish(reqid);
+
+	local res = ClusterController::Types::Result(
+	    $reqid = req$parent_id, $instance = ClusterAgent::name);
+
+	local node_statuses: ClusterController::Types::NodeStatusVec;
+
+	for ( node in result$nodes )
+		{
+		local sns = result$nodes[node]; # Supervisor node status
+		local cns = ClusterController::Types::NodeStatus(
+		    $node=node, $state=ClusterController::Types::PENDING);
+
+		# Identify the role of the node. For data cluster roles (worker,
+		# manager, etc) we derive this from the cluster node table. For
+		# agent and controller, we identify via environment variables
+		# that the controller framework establishes upon creation (see
+		# the respective boot.zeek scripts).
+		if ( node in sns$node$cluster )
+			{
+			cns$cluster_role = sns$node$cluster[node]$role;
+
+			# The supervisor's responses use 0/tcp (not 0/unknown)
+			# when indicating an unused port because its internal
+			# serialization always assumes TCP.
+			if ( sns$node$cluster[node]$p != 0/tcp )
+				cns$p = sns$node$cluster[node]$p;
+			}
+		else
+			{
+			if ( "ZEEK_CLUSTER_MGMT_NODE" in sns$node$env )
+				{
+				local role = sns$node$env["ZEEK_CLUSTER_MGMT_NODE"];
+				if ( role == "CONTROLLER" )
+					{
+					cns$mgmt_role = ClusterController::Types::CONTROLLER;
+					# The controller always listens, so the Zeek client can connect.
+					cns$p = ClusterController::endpoint_info()$network$bound_port;
+					}
+				else if ( role == "AGENT" )
+					{
+					cns$mgmt_role = ClusterController::Types::AGENT;
+					# If we have a controller address, the agent connects to it
+					# and does not listen. See zeek_init() below for similar logic.
+					if ( ClusterAgent::controller$address == "0.0.0.0" )
+						cns$p = ClusterAgent::endpoint_info()$network$bound_port;
+					}
+				else
+					ClusterController::Log::warning(fmt(
+					    "unexpected cluster management node type '%s'", role));
+				}
+			}
+
+		# A PID is available if a supervised node has fully launched
+		# and is therefore running.
+		if ( sns?$pid )
+			{
+			cns$pid = sns$pid;
+			cns$state = ClusterController::Types::RUNNING;
+			}
+
+		node_statuses += cns;
+		}
+
+	res$data = node_statuses;
+
+	ClusterController::Log::info(fmt("tx ClusterAgent::API::get_nodes_response %s",
+	    ClusterController::Types::result_to_string(res)));
+	event ClusterAgent::API::get_nodes_response(req$parent_id, res);
+	}
+
+event ClusterAgent::API::get_nodes_request(reqid: string)
+	{
+	ClusterController::Log::info(fmt("rx ClusterAgent::API::get_nodes_request %s", reqid));
+
+	local req = ClusterController::Request::create();
+	req$parent_id = reqid;
+
+	event SupervisorControl::status_request(req$id, "");
+	ClusterController::Log::info(fmt("issued supervisor status, %s", req$id));
+	}
+
 event ClusterAgent::API::agent_welcome_request(reqid: string)
 	{
 	ClusterController::Log::info(fmt("rx ClusterAgent::API::agent_welcome_request %s", reqid));
@@ -231,6 +319,10 @@ event Broker::peer_added(peer: Broker::EndpointInfo, msg: string)
 	                  to_addr(epi$network$address), ClusterAgent::API::version);
 	}
 
+# XXX We may want a request timeout event handler here. It's arguably cleaner
+# to send supervisor failure events back to the controller than to rely on
+# the controller's request timeout to kick in.
+
 event zeek_init()
 	{
 	local epi = ClusterAgent::endpoint_info();
@@ -253,6 +345,7 @@ event zeek_init()
 
 	# Auto-publish a bunch of events. Glob patterns or module-level
 	# auto-publish would be helpful here.
+	Broker::auto_publish(agent_topic, ClusterAgent::API::get_nodes_response);
 	Broker::auto_publish(agent_topic, ClusterAgent::API::set_configuration_response);
 	Broker::auto_publish(agent_topic, ClusterAgent::API::agent_welcome_response);
 	Broker::auto_publish(agent_topic, ClusterAgent::API::agent_standby_response);
@@ -263,6 +356,7 @@ event zeek_init()
 	Broker::auto_publish(agent_topic, ClusterAgent::API::notify_log);
 
 	Broker::auto_publish(SupervisorControl::topic_prefix, SupervisorControl::create_request);
+	Broker::auto_publish(SupervisorControl::topic_prefix, SupervisorControl::status_request);
 	Broker::auto_publish(SupervisorControl::topic_prefix, SupervisorControl::destroy_request);
 	Broker::auto_publish(SupervisorControl::topic_prefix, SupervisorControl::restart_request);
 	Broker::auto_publish(SupervisorControl::topic_prefix, SupervisorControl::stop_request);
diff --git a/scripts/policy/frameworks/cluster/controller/api.zeek b/scripts/policy/frameworks/cluster/controller/api.zeek
index 27c41d33ff..4ed77ffdb2 100644
--- a/scripts/policy/frameworks/cluster/controller/api.zeek
+++ b/scripts/policy/frameworks/cluster/controller/api.zeek
@@ -57,6 +57,29 @@ export {
 	    result: ClusterController::Types::ResultVec);
 
+	## zeek-client sends this event to request a list of
+	## :zeek:see:`ClusterController::Types::NodeStatus` records that capture
+	## the status of Supervisor-managed nodes running on the cluster's
+	## instances.
+	##
+	## reqid: a request identifier string, echoed in the response event.
+	##
+	global get_nodes_request: event(reqid: string);
+
+	## Response to a get_nodes_request event. The controller sends this
+	## back to the client.
+	##
+	## reqid: the request identifier used in the request event.
+	##
+	## result: a :zeek:type:`vector` of :zeek:see:`ClusterController::Types::Result`
+	## records. Each record covers one cluster instance. Each record's data
+	## member is a vector of :zeek:see:`ClusterController::Types::NodeStatus`
+	## records, covering the nodes at that instance. Results may also indicate
+	## failure, with error messages indicating what went wrong.
+	global get_nodes_response: event(reqid: string,
+	    result: ClusterController::Types::ResultVec);
+
+
 	# Testing events. These don't provide operational value but expose
 	# internal functionality, triggered by test cases.
diff --git a/scripts/policy/frameworks/cluster/controller/main.zeek b/scripts/policy/frameworks/cluster/controller/main.zeek
index b0fe565553..3e93d715fd 100644
--- a/scripts/policy/frameworks/cluster/controller/main.zeek
+++ b/scripts/policy/frameworks/cluster/controller/main.zeek
@@ -21,11 +21,17 @@ type SetConfigurationState: record {
 	requests: set[string] &default=set();
 };
 
+# Request state specific to the get_nodes request/response events
+type GetNodesState: record {
+	requests: set[string] &default=set();
+};
+
 # Dummy state for testing events.
 type TestState: record {
 };
 
 redef record ClusterController::Request::Request += {
 	set_configuration_state: SetConfigurationState &optional;
+	get_nodes_state: GetNodesState &optional;
 	test_state: TestState &optional;
 };
 
@@ -457,6 +463,84 @@ event ClusterController::API::get_instances_request(reqid: string)
 	event ClusterController::API::get_instances_response(reqid, res);
 	}
 
+event ClusterAgent::API::get_nodes_response(reqid: string, result: ClusterController::Types::Result)
+	{
+	ClusterController::Log::info(fmt("rx ClusterAgent::API::get_nodes_response %s", reqid));
+
+	# Retrieve state for the request we just got a response to
+	local areq = ClusterController::Request::lookup(reqid);
+	if ( ClusterController::Request::is_null(areq) )
+		return;
+
+	# Release the request, which is now done.
+	ClusterController::Request::finish(areq$id);
+
+	# Find the original request from the client
+	local req = ClusterController::Request::lookup(areq$parent_id);
+	if ( ClusterController::Request::is_null(req) )
+		return;
+
+	# Zeek's ingestion of an any-typed val via Broker yields an opaque
+	# Broker DataVal. When Zeek forwards this val via another event, it
+	# stays in this opaque form. To avoid forcing recipients to distinguish
+	# whether the val is of the actual, intended (any-)type or a Broker
+	# DataVal wrapper, we explicitly cast it back to our intended Zeek
+	# type. The test case broker.remote_event_vector_any demonstrates this.
+	result$data = result$data as ClusterController::Types::NodeStatusVec;
+
+	# Add this result to the overall response
+	req$results[|req$results|] = result;
+
+	# Mark this request as done by removing it from the table of pending
+	# ones. The following if-check should always be true.
+	if ( areq$id in req$get_nodes_state$requests )
+		delete req$get_nodes_state$requests[areq$id];
+
+	# If we still have pending queries out to the agents, do nothing: we'll
+	# handle this soon, or our request will time out and we'll respond
+	# with an error.
+ if ( |req$get_nodes_state$requests| > 0 ) + return; + + ClusterController::Log::info(fmt("tx ClusterController::API::get_nodes_response %s", + ClusterController::Request::to_string(req))); + event ClusterController::API::get_nodes_response(req$id, req$results); + ClusterController::Request::finish(req$id); + } + +event ClusterController::API::get_nodes_request(reqid: string) + { + ClusterController::Log::info(fmt("rx ClusterController::API::get_nodes_request %s", reqid)); + + # Special case: if we have no instances, respond right away. + if ( |g_instances| == 0 ) + { + ClusterController::Log::info(fmt("tx ClusterController::API::get_nodes_response %s", reqid)); + event ClusterController::API::get_nodes_response(reqid, vector( + ClusterController::Types::Result($reqid=reqid, $success=F, + $error="no instances connected"))); + return; + } + + local req = ClusterController::Request::create(reqid); + req$get_nodes_state = GetNodesState(); + + for ( name in g_instances ) + { + if ( name !in g_instances_ready ) + next; + + local agent_topic = ClusterAgent::topic_prefix + "/" + name; + local areq = ClusterController::Request::create(); + + areq$parent_id = req$id; + add req$get_nodes_state$requests[areq$id]; + + ClusterController::Log::info(fmt("tx ClusterAgent::API::get_nodes_request %s to %s", areq$id, name)); + Broker::publish(agent_topic, ClusterAgent::API::get_nodes_request, areq$id); + } + } + event ClusterController::Request::request_expired(req: ClusterController::Request::Request) { # Various handlers for timed-out request state. We use the state members @@ -480,6 +564,18 @@ event ClusterController::Request::request_expired(req: ClusterController::Reques event ClusterController::API::set_configuration_response(req$id, req$results); } + if ( req?$get_nodes_state ) + { + res = ClusterController::Types::Result($reqid=req$id); + res$success = F; + res$error = "request timed out"; + req$results += res; + + ClusterController::Log::info(fmt("tx ClusterController::API::get_nodes_response %s", + ClusterController::Request::to_string(req))); + event ClusterController::API::get_nodes_response(req$id, req$results); + } + if ( req?$test_state ) { res = ClusterController::Types::Result($reqid=req$id); @@ -525,13 +621,15 @@ event zeek_init() Broker::subscribe(ClusterController::topic); # Events sent to the client: + local events: vector of any = [ + ClusterController::API::get_instances_response, + ClusterController::API::set_configuration_response, + ClusterController::API::get_nodes_response, + ClusterController::API::test_timeout_response + ]; - Broker::auto_publish(ClusterController::topic, - ClusterController::API::get_instances_response); - Broker::auto_publish(ClusterController::topic, - ClusterController::API::set_configuration_response); - Broker::auto_publish(ClusterController::topic, - ClusterController::API::test_timeout_response); + for ( i in events ) + Broker::auto_publish(ClusterController::topic, events[i]); ClusterController::Log::info("controller is live"); } diff --git a/scripts/policy/frameworks/cluster/controller/types.zeek b/scripts/policy/frameworks/cluster/controller/types.zeek index e1021aa951..ad7fce9c1e 100644 --- a/scripts/policy/frameworks/cluster/controller/types.zeek +++ b/scripts/policy/frameworks/cluster/controller/types.zeek @@ -9,9 +9,9 @@ export { ## include the data cluster node types (worker, logger, etc) -- those ## continue to be managed by the cluster framework. 
 	type Role: enum {
-		NONE,
-		AGENT,
-		CONTROLLER,
+		NONE,       ##< No active role in cluster management.
+		AGENT,      ##< A cluster management agent.
+		CONTROLLER, ##< The cluster's controller.
 	};
 
 	## A Zeek-side option with value.
@@ -35,8 +35,11 @@ export {
 	type InstanceVec: vector of Instance;
 
 	## State that a Cluster Node can be in. State changes trigger an
-	## API notification (see notify_change()).
+	## API notification (see notify_change()). A node is PENDING when the
+	## Supervisor has not yet reported a PID for it, i.e. the node has not
+	## yet fully launched.
 	type State: enum {
+		PENDING,	##< Not yet running
 		RUNNING,	##< Running and operating normally
 		STOPPED,	##< Explicitly stopped
 		FAILED,		##< Failed to start; and permanently halted
@@ -61,7 +64,6 @@ export {
 	## Data structure capturing a cluster's complete configuration.
 	type Configuration: record {
 		id: string &default=unique_id(""); ##< Unique identifier for a particular configuration
-
 		## The instances in the cluster.
 		instances: set[Instance] &default=set();
 
@@ -69,6 +71,26 @@ export {
 		nodes: set[Node] &default=set();
 	};
 
+	## The status of a Supervisor-managed node, as reported to the client in
+	## a get_nodes_request/get_nodes_response transaction.
+	type NodeStatus: record {
+		## Cluster-unique, human-readable node name.
+		node: string;
+		## Current run state of the node.
+		state: State;
+		## Role the node plays in cluster management.
+		mgmt_role: Role &default=NONE;
+		## Role the node plays in the data cluster.
+		cluster_role: Supervisor::ClusterRole &default=Supervisor::NONE;
+		## Process ID of the node. This is optional because the Supervisor
+		## may not have a PID yet while a node is still bootstrapping.
+		pid: int &optional;
+		## The port on which the node listens for Broker peerings, if any.
+		p: port &optional;
+	};
+
+	type NodeStatusVec: vector of NodeStatus;
+
 	## Return value for request-response API event pairs
 	type Result: record {
 		reqid: string;	##< Request ID of operation this result refers to
@@ -81,6 +103,8 @@ export {
 
 	type ResultVec: vector of Result;
 
+	## Given a :zeek:see:`ClusterController::Types::Result` record,
+	## this function returns a string summarizing it.
 	global result_to_string: function(res: Result): string;
 }
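
A minimal client-side sketch of the new transaction, for illustration only (not part of the patch): a standalone Zeek script that peers with the controller, publishes ClusterController::API::get_nodes_request, and prints the per-instance NodeStatus records from the response. The controller address and port and the "client_" request-ID prefix are assumptions; adjust them to the controller's actual listening endpoint. zeek-client implements the equivalent logic in Python. Note that the client needs the same cast as the controller-side handler above, since the any-typed data member once again arrives as an opaque Broker value after the Broker hop.

@load policy/frameworks/cluster/controller/api
@load policy/frameworks/cluster/controller/types

# Assumed controller endpoint; adjust as needed.
const controller_host = "127.0.0.1";
const controller_port = 2150/tcp;

event ClusterController::API::get_nodes_response(reqid: string,
    result: ClusterController::Types::ResultVec)
	{
	for ( i in result )
		{
		if ( ! result[i]$success )
			{
			print fmt("instance '%s' failed: %s",
			    result[i]$instance, result[i]$error);
			next;
			}

		if ( ! result[i]?$data )
			next;

		# As in the controller's handler, the any-typed data member
		# arrives as an opaque Broker value and needs an explicit cast
		# back to the intended vector type.
		local statuses = result[i]$data as ClusterController::Types::NodeStatusVec;

		for ( j in statuses )
			print fmt("%s/%s: %s", result[i]$instance,
			    statuses[j]$node, statuses[j]$state);
		}

	terminate();
	}

event Broker::peer_added(ep: Broker::EndpointInfo, msg: string)
	{
	# Once peered with the controller, issue the request. The response
	# arrives via the controller topic subscribed to in zeek_init().
	Broker::publish(ClusterController::topic,
	    ClusterController::API::get_nodes_request, unique_id("client_"));
	}

event zeek_init()
	{
	Broker::subscribe(ClusterController::topic);
	Broker::peer(controller_host, controller_port);
	}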