diff --git a/CHANGES b/CHANGES index 44fcd72914..708368571a 100644 --- a/CHANGES +++ b/CHANGES @@ -1,3 +1,21 @@ +5.0.0-dev.277 | 2022-04-18 16:38:27 -0700 + + * Management framework updates (Christian Kreibich, Corelight) + + - bump external testsuite + - allow selecting cluster nodes in get_id_value + - minor tweaks to logging component + - bump zeek-client to pull in get-id-value command + - improve handling of node run states + - add get_id_value dispatch + - allow dispatching "actions" on cluster nodes. + - some renaming to avoid the term "data cluster" + - allow agents to communicate with cluster nodes + + * Avoid whitespace around function type strings in JSON rendering (Christian Kreibich, Corelight) + + * Disable TSan CI task temporarily while we sort out some intermittent test failures (Tim Wojtulewicz, Corelight) + 5.0.0-dev.265 | 2022-04-18 12:45:08 -0700 * state-holding fix: track unique identifiers for Func's in CompHash's, not Func's themselves (Vern Paxson, Corelight) diff --git a/VERSION b/VERSION index b1682deded..f22dc5df41 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -5.0.0-dev.265 +5.0.0-dev.277 diff --git a/auxil/zeek-client b/auxil/zeek-client index c2af7381c5..a08d9978ac 160000 --- a/auxil/zeek-client +++ b/auxil/zeek-client @@ -1 +1 @@ -Subproject commit c2af7381c584b6545517843872747598bb0e25d5 +Subproject commit a08d9978ac6ff6481ad1e6b18f0376568c08f8c1 diff --git a/scripts/policy/frameworks/management/agent/api.zeek b/scripts/policy/frameworks/management/agent/api.zeek index 876f121500..8ba47ee67d 100644 --- a/scripts/policy/frameworks/management/agent/api.zeek +++ b/scripts/policy/frameworks/management/agent/api.zeek @@ -60,8 +60,41 @@ export { ## records, covering the nodes at this instance. The result may also ## indicate failure, with error messages indicating what went wrong. ## - global get_nodes_response: event(reqid: string, - result: Management::Result); + global get_nodes_response: event(reqid: string, result: Management::Result); + + + ## The controller sends this to every agent to request a dispatch (the + ## execution of a pre-implemented activity) to all cluster nodes. This + ## is the generic controller-agent "back-end" implementation of explicit + ## client-controller "front-end" interactions, including: + ## + ## - :zeek:see:`Management::Controller::API::get_id_value_request`: two + ## arguments, the first being "get_id_value" and the second the name + ## of the ID to look up. + ## + ## reqid: a request identifier string, echoed in the response event. + ## + ## action: the requested dispatch command, with any arguments. + ## + ## nodes: a set of cluster node names (e.g. "worker-01") to retrieve + ## the values from. An empty set, supplied by default, means + ## retrieval from all nodes managed by the agent. + global node_dispatch_request: event(reqid: string, action: vector of string, + nodes: set[string] &default=set()); + + ## Response to a node_dispatch_request event. Each agent sends this back + ## to the controller to report the dispatch outcomes on all nodes managed + ## by that agent. + ## + ## reqid: the request identifier used in the request event. + ## + ## result: a :zeek:type:`vector` of :zeek:see:`Management::Result` + ## records. Each record covers one Zeek cluster node managed by this + ## agent. Upon success, each :zeek:see:`Management::Result` record's + ## data member contains the dispatches' response in a data type + ## appropriate for the respective dispatch. 
+ global node_dispatch_response: event(reqid: string, result: Management::ResultVec); + ## The controller sends this event to confirm to the agent that it is ## part of the current cluster topology. The agent acknowledges with the diff --git a/scripts/policy/frameworks/management/agent/main.zeek b/scripts/policy/frameworks/management/agent/main.zeek index 75d0247a36..f3a8bd2809 100644 --- a/scripts/policy/frameworks/management/agent/main.zeek +++ b/scripts/policy/frameworks/management/agent/main.zeek @@ -5,6 +5,8 @@ @load base/frameworks/broker @load policy/frameworks/management +@load policy/frameworks/management/node/api +@load policy/frameworks/management/node/config @load ./api @load ./config @@ -19,10 +21,22 @@ export { type SupervisorState: record { node: string; ##< Name of the node the Supervisor is acting on. }; + + ## Request state for node dispatches, tracking the requested action + ## as well as received responses. + type NodeDispatchState: record { + ## The dispatched action. The first string is a command, + ## any remaining strings its arguments. + action: vector of string; + + ## Request state for every node managed by this agent. + requests: set[string] &default=set(); + }; } redef record Management::Request::Request += { supervisor_state: SupervisorState &optional; + node_dispatch_state: NodeDispatchState &optional; }; # Tag our logs correctly @@ -37,10 +51,10 @@ global g_instances: table[string] of Management::Instance; # A map for the nodes we run on this instance, via this agent. global g_nodes: table[string] of Management::Node; -# The node map employed by the supervisor to describe the cluster -# topology to newly forked nodes. We refresh it when we receive -# new configurations. -global g_data_cluster: table[string] of Supervisor::ClusterEndpoint; +# The complete node map employed by the supervisor to describe the cluster +# topology to newly forked nodes. We refresh it when we receive new +# configurations. +global g_cluster: table[string] of Supervisor::ClusterEndpoint; event SupervisorControl::create_response(reqid: string, result: string) @@ -118,11 +132,10 @@ event Management::Agent::API::set_configuration_request(reqid: string, config: M for ( nodename in g_nodes ) supervisor_destroy(nodename); + # Refresh the cluster and nodes tables g_nodes = table(); + g_cluster = table(); - # Refresh the data cluster and nodes tables - - g_data_cluster = table(); for ( node in config$nodes ) { if ( node$instance == Management::Agent::name ) @@ -144,7 +157,7 @@ event Management::Agent::API::set_configuration_request(reqid: string, config: M if ( node?$interface ) cep$interface = node$interface; - g_data_cluster[node$name] = cep; + g_cluster[node$name] = cep; } # Apply the new configuration via the supervisor @@ -152,6 +165,8 @@ event Management::Agent::API::set_configuration_request(reqid: string, config: M for ( nodename in g_nodes ) { node = g_nodes[nodename]; + node$state = Management::PENDING; + nc = Supervisor::NodeConfig($name=nodename); if ( Management::Agent::cluster_directory != "" ) @@ -166,10 +181,15 @@ event Management::Agent::API::set_configuration_request(reqid: string, config: M if ( node?$env ) nc$env = node$env; + # Always add the policy/management/node scripts to any cluster + # node, since we require it to be able to communicate with the + # node. + nc$scripts[|nc$scripts|] = "policy/frameworks/management/node"; + # XXX could use options to enable per-node overrides for # directory, stdout, stderr, others? 
- nc$cluster = g_data_cluster; + nc$cluster = g_cluster; supervisor_create(nc); } @@ -209,7 +229,7 @@ event SupervisorControl::status_response(reqid: string, result: Supervisor::Stat local cns = Management::NodeStatus( $node=node, $state=Management::PENDING); - # Identify the role of the node. For data cluster roles (worker, + # Identify the role of the node. For cluster roles (worker, # manager, etc) we derive this from the cluster node table. For # agent and controller, we identify via environment variables # that the controller framework establishes upon creation (see @@ -218,6 +238,11 @@ event SupervisorControl::status_response(reqid: string, result: Supervisor::Stat { cns$cluster_role = sns$node$cluster[node]$role; + # For cluster nodes, copy run state from g_nodes, our + # live node status table. + if ( node in g_nodes ) + cns$state = g_nodes[node]$state; + # The supervisor's responses use 0/tcp (not 0/unknown) # when indicating an unused port because its internal # serialization always assumes TCP. @@ -232,12 +257,22 @@ event SupervisorControl::status_response(reqid: string, result: Supervisor::Stat if ( role == "CONTROLLER" ) { cns$mgmt_role = Management::CONTROLLER; + + # Automatically declare the controller in running state + # here -- we'd not have received a request that brought + # us here otherwise. + cns$state = Management::RUNNING; + # The controller always listens, so the Zeek client can connect. cns$p = Management::Agent::endpoint_info()$network$bound_port; } else if ( role == "AGENT" ) { cns$mgmt_role = Management::AGENT; + + # Similarly to above, always declare agent running. We are. :) + cns$state = Management::RUNNING; + # If we have a controller address, the agent connects to it # and does not listen. See zeek_init() below for similar logic. if ( Management::Agent::controller$address == "0.0.0.0" ) @@ -249,13 +284,9 @@ event SupervisorControl::status_response(reqid: string, result: Supervisor::Stat } } - # A PID is available if a supervised node has fully launched - # and is therefore running. + # A PID is available if a supervised node has fully launched. if ( sns?$pid ) - { cns$pid = sns$pid; - cns$state = Management::RUNNING; - } node_statuses += cns; } @@ -278,6 +309,166 @@ event Management::Agent::API::get_nodes_request(reqid: string) Management::Log::info(fmt("issued supervisor status, %s", req$id)); } +event Management::Node::API::node_dispatch_response(reqid: string, result: Management::Result) + { + local node = "unknown node"; + if ( result?$node ) + node = result$node; + + Management::Log::info(fmt("rx Management::Node::API::node_dispatch_response %s from %s", reqid, node)); + + # Retrieve state for the request we just got a response to + local nreq = Management::Request::lookup(reqid); + if ( Management::Request::is_null(nreq) ) + return; + + # Find the original request from the controller + local req = Management::Request::lookup(nreq$parent_id); + if ( Management::Request::is_null(req) ) + return; + + # Mark the responding node as done. Nodes normally fill their own name + # into the result; we only double-check for resilience. Nodes failing to + # report themselves would eventually lead to request timeout. + if ( result?$node ) + { + if ( result$node in req$node_dispatch_state$requests ) + delete req$node_dispatch_state$requests[result$node]; + else + { + # An unknown or duplicate response -- do nothing. 
+ Management::Log::debug(fmt("response %s not expected, ignoring", reqid)); + return; + } + } + + # The usual special treatment for Broker values that are of type "any": + # confirm their type here based on the requested dispatch command. + switch req$node_dispatch_state$action[0] + { + case "get_id_value": + if ( result?$data ) + result$data = result$data as string; + break; + default: + Management::Log::error(fmt("unexpected dispatch command %s", + req$node_dispatch_state$action[0])); + break; + } + + # The result has the reporting node filled in but not the agent/instance + # (which the node doesn't know about), so add it now. + result$instance = Management::Agent::instance()$name; + + # Add this result to the overall response + req$results[|req$results|] = result; + + # If we still have pending queries out to the agents, do nothing: we'll + # handle this soon, or our request will time out and we respond with + # error. + if ( |req$node_dispatch_state$requests| > 0 ) + return; + + # Release the agent-nodes request state, since we now have all responses. + Management::Request::finish(nreq$id); + + # Send response event back to controller and clean up main request state. + Management::Log::info(fmt("tx Management::Agent::API::node_dispatch_response %s", + Management::Request::to_string(req))); + event Management::Agent::API::node_dispatch_response(req$id, req$results); + Management::Request::finish(req$id); + } + +event Management::Agent::API::node_dispatch_request(reqid: string, action: vector of string, nodes: set[string]) + { + Management::Log::info(fmt("rx Management::Agent::API::node_dispatch_request %s %s %s", reqid, action, nodes)); + + local node: string; + local cluster_nodes: set[string]; + local nodes_final: set[string]; + + for ( node in g_nodes ) + add cluster_nodes[node]; + + # If this request includes cluster nodes to query, check if this agent + # manages any of those nodes. If it doesn't, respond with an empty + # results vector immediately. Note that any globally unknown nodes + # that the client might have requested already got filtered by the + # controller, so we don't need to worry about them here. + + if ( |nodes| > 0 ) + { + nodes_final = nodes & cluster_nodes; + + if ( |nodes_final| == 0 ) + { + Management::Log::info(fmt( + "tx Management::Agent::API::node_dispatch_response %s, no node overlap", + reqid)); + event Management::Agent::API::node_dispatch_response(reqid, vector()); + return; + } + } + else if ( |g_nodes| == 0 ) + { + # Special case: the client did not request specific nodes. If + # we aren't running any nodes, respond right away, since there's + # nothing to dispatch to. + Management::Log::info(fmt( + "tx Management::Agent::API::node_dispatch_response %s, no nodes registered", + reqid)); + event Management::Agent::API::node_dispatch_response(reqid, vector()); + return; + } + else + { + # We send to all known nodes. + nodes_final = cluster_nodes; + } + + local res: Management::Result; + local req = Management::Request::create(reqid); + + req$node_dispatch_state = NodeDispatchState($action=action); + + # Build up dispatch state for tracking responses. We only dispatch to + # nodes that are in state RUNNING, as those have confirmed they're ready + # to communicate. For others, establish error state in now. 
+ for ( node in nodes_final ) + { + if ( g_nodes[node]$state == Management::RUNNING ) + add req$node_dispatch_state$requests[node]; + else + { + res = Management::Result($reqid=reqid, $node=node); + res$success = F; + res$error = fmt("cluster node %s not in running state", node); + req$results += res; + } + } + + # Corner case: nothing is in state RUNNING. + if ( |req$node_dispatch_state$requests| == 0 ) + { + Management::Log::info(fmt( + "tx Management::Agent::API::node_dispatch_response %s, no nodes running", + reqid)); + event Management::Agent::API::node_dispatch_response(reqid, req$results); + Management::Request::finish(req$id); + return; + } + + # We use a single request record to track all node responses, and a + # single event that Broker publishes to all nodes. We know when all + # nodes have responded by checking the requests set we built up above. + local nreq = Management::Request::create(); + nreq$parent_id = reqid; + + Management::Log::info(fmt("tx Management::Node::API::node_dispatch_request %s %s", nreq$id, action)); + Broker::publish(Management::Node::node_topic, + Management::Node::API::node_dispatch_request, nreq$id, action, nodes); + } + event Management::Agent::API::agent_welcome_request(reqid: string) { Management::Log::info(fmt("rx Management::Agent::API::agent_welcome_request %s", reqid)); @@ -311,6 +502,14 @@ event Management::Agent::API::agent_standby_request(reqid: string) event Management::Agent::API::agent_standby_response(reqid, res); } +event Management::Node::API::notify_node_hello(node: string) + { + Management::Log::info(fmt("rx Management::Node::API::notify_node_hello %s", node)); + + if ( node in g_nodes ) + g_nodes[node]$state = Management::RUNNING; + } + event Broker::peer_added(peer: Broker::EndpointInfo, msg: string) { # This does not (cannot?) immediately verify that the new peer @@ -342,28 +541,39 @@ event zeek_init() Broker::peer(supervisor_addr, Broker::default_port, Broker::default_listen_retry); - # Agents need to receive communication targeted at them, and any responses - # from the supervisor. + # Agents need to receive communication targeted at them, any responses + # from the supervisor, and any responses from cluster nodes. Broker::subscribe(agent_topic); Broker::subscribe(SupervisorControl::topic_prefix); + Broker::subscribe(Management::Node::node_topic); # Auto-publish a bunch of events. Glob patterns or module-level # auto-publish would be helpful here. 
- Broker::auto_publish(agent_topic, Management::Agent::API::get_nodes_response); - Broker::auto_publish(agent_topic, Management::Agent::API::set_configuration_response); - Broker::auto_publish(agent_topic, Management::Agent::API::agent_welcome_response); - Broker::auto_publish(agent_topic, Management::Agent::API::agent_standby_response); + local agent_to_controller_events: vector of any = [ + Management::Agent::API::get_nodes_response, + Management::Agent::API::set_configuration_response, + Management::Agent::API::agent_welcome_response, + Management::Agent::API::agent_standby_response, + Management::Agent::API::node_dispatch_response, + Management::Agent::API::notify_agent_hello, + Management::Agent::API::notify_change, + Management::Agent::API::notify_error, + Management::Agent::API::notify_log + ]; - Broker::auto_publish(agent_topic, Management::Agent::API::notify_agent_hello); - Broker::auto_publish(agent_topic, Management::Agent::API::notify_change); - Broker::auto_publish(agent_topic, Management::Agent::API::notify_error); - Broker::auto_publish(agent_topic, Management::Agent::API::notify_log); + for ( i in agent_to_controller_events ) + Broker::auto_publish(agent_topic, agent_to_controller_events[i]); - Broker::auto_publish(SupervisorControl::topic_prefix, SupervisorControl::create_request); - Broker::auto_publish(SupervisorControl::topic_prefix, SupervisorControl::status_request); - Broker::auto_publish(SupervisorControl::topic_prefix, SupervisorControl::destroy_request); - Broker::auto_publish(SupervisorControl::topic_prefix, SupervisorControl::restart_request); - Broker::auto_publish(SupervisorControl::topic_prefix, SupervisorControl::stop_request); + local agent_to_sup_events: vector of any = [ + SupervisorControl::create_request, + SupervisorControl::status_request, + SupervisorControl::destroy_request, + SupervisorControl::restart_request, + SupervisorControl::stop_request + ]; + + for ( i in agent_to_sup_events ) + Broker::auto_publish(SupervisorControl::topic_prefix, agent_to_sup_events[i]); # Establish connectivity with the controller. if ( Management::Agent::controller$address != "0.0.0.0" ) @@ -373,11 +583,10 @@ event zeek_init() Management::Agent::controller$bound_port, Management::connect_retry); } - else - { - # Controller connects to us; listen for it. - Broker::listen(cat(epi$network$address), epi$network$bound_port); - } + + # The agent always listens, to allow cluster nodes to peer with it. + # If the controller connects to us, it also uses this port. + Broker::listen(cat(epi$network$address), epi$network$bound_port); Management::Log::info("agent is live"); } diff --git a/scripts/policy/frameworks/management/controller/api.zeek b/scripts/policy/frameworks/management/controller/api.zeek index 575f90009a..0ce3259a4c 100644 --- a/scripts/policy/frameworks/management/controller/api.zeek +++ b/scripts/policy/frameworks/management/controller/api.zeek @@ -71,7 +71,7 @@ export { ## ## reqid: the request identifier used in the request event. ## - ## result: a :zeek:type`vector` of :zeek:see:`Management::Result` + ## result: a :zeek:type:`vector` of :zeek:see:`Management::Result` ## records. Each record covers one cluster instance. Each record's data ## member is a vector of :zeek:see:`Management::NodeStatus` ## records, covering the nodes at that instance. 
Results may also indicate @@ -80,6 +80,36 @@ export { result: Management::ResultVec); + ## zeek-client sends this event to retrieve the current value of a + ## variable in Zeek's global namespace, referenced by the given + ## identifier (i.e., variable name). The controller asks all agents + ## to retrieve this value from each cluster node, accumulates the + ## returned responses, and responds with a get_id_value_response + ## event back to the client. + ## + ## reqid: a request identifier string, echoed in the response event. + ## + ## id: the name of the variable whose value to retrieve. + ## + ## nodes: a set of cluster node names (e.g. "worker-01") to retrieve + ## the values from. An empty set, supplied by default, means + ## retrieval from all current cluster nodes. + global get_id_value_request: event(reqid: string, id: string, + nodes: set[string] &default=set()); + + ## Response to a get_id_value_request event. The controller sends this + ## back to the client. + ## + ## reqid: the request identifier used in the request event. + ## + ## result: a :zeek:type:`vector` of :zeek:see:`Management::Result` + ## records. Each record covers one Zeek cluster node. Each record's + ## data field contains a string with the JSON rendering (as produced + ## by :zeek:id:`to_json`, including the error strings it potentially + ## returns). + global get_id_value_response: event(reqid: string, result: Management::ResultVec); + + # Testing events. These don't provide operational value but expose # internal functionality, triggered by test cases. diff --git a/scripts/policy/frameworks/management/controller/main.zeek b/scripts/policy/frameworks/management/controller/main.zeek index 5aa5292b04..441e03517c 100644 --- a/scripts/policy/frameworks/management/controller/main.zeek +++ b/scripts/policy/frameworks/management/controller/main.zeek @@ -36,6 +36,29 @@ export { requests: set[string] &default=set(); }; + ## Request state for node dispatch requests, to track the requested + ## action and received responses. Node dispatches are requests to + ## execute pre-implemented actions on every node in the cluster, + ## and report their outcomes. See + ## :zeek:see:`Management::Agent::API::node_dispatch_request` and + ## :zeek:see:`Management::Agent::API::node_dispatch_response` for the + ## agent/controller interaction, and + ## :zeek:see:`Management::Controller::API::get_id_value_request` and + ## :zeek:see:`Management::Controller::API::get_id_value_response` + ## for an example of a specific API the controller generalizes into + ## a dispatch. + type NodeDispatchState: record { + ## The dispatched action. The first string is a command, + ## any remaining strings its arguments. + action: vector of string; + + ## Request state for every controller/agent transaction. + ## The set of strings tracks the node names from which + ## we still expect responses, before we can respond back + ## to the client. + requests: set[string] &default=set(); + }; + ## Dummy state for internal state-keeping test cases. 
type TestState: record { }; } @@ -43,6 +66,7 @@ export { redef record Management::Request::Request += { set_configuration_state: SetConfigurationState &optional; get_nodes_state: GetNodesState &optional; + node_dispatch_state: NodeDispatchState &optional; test_state: TestState &optional; }; @@ -206,6 +230,17 @@ function is_instance_connectivity_change(inst: Management::Instance): bool return F; } +function filter_config_nodes_by_name(nodes: set[string]): set[string] + { + local res: set[string]; + local cluster_nodes: set[string]; + + for ( node in g_config_current$nodes ) + add cluster_nodes[node$name]; + + return nodes & cluster_nodes; + } + event Management::Controller::API::notify_agents_ready(instances: set[string]) { local insts = Management::Util::set_to_vector(instances); @@ -386,10 +421,10 @@ event Management::Controller::API::set_configuration_request(reqid: string, conf g_config_reqid_pending = req$id; # Compare the instance configuration to our current one. If it matches, - # we can proceed to deploying the new data cluster topology. If it does + # we can proceed to deploying the new cluster topology. If it does # not, we need to establish connectivity with agents we connect to, or # wait until all instances that connect to us have done so. Either triggers - # a notify_agents_ready event, upon which we then deploy the data cluster. + # a notify_agents_ready event, upon which we then deploy the topology. # The current & new set of instance names. local insts_current: set[string]; @@ -485,7 +520,7 @@ event Management::Agent::API::get_nodes_response(reqid: string, result: Manageme if ( Management::Request::is_null(areq) ) return; - # Release the request, which is now done. + # Release the request, since this agent is now done. Management::Request::finish(areq$id); # Find the original request from the client @@ -554,22 +589,159 @@ event Management::Controller::API::get_nodes_request(reqid: string) } } +event Management::Agent::API::node_dispatch_response(reqid: string, results: Management::ResultVec) + { + Management::Log::info(fmt("rx Management::Agent::API::node_dispatch_response %s", reqid)); + + # Retrieve state for the request we just got a response to + local areq = Management::Request::lookup(reqid); + if ( Management::Request::is_null(areq) ) + return; + + # Release the request, since this agent is now done. + Management::Request::finish(areq$id); + + # Find the original request from the client + local req = Management::Request::lookup(areq$parent_id); + if ( Management::Request::is_null(req) ) + return; + + # Add this agent's results to the overall response + for ( i in results ) + { + # Same special treatment for Broker values that are of + # type "any": confirm their (known) type here. + switch req$node_dispatch_state$action[0] + { + case "get_id_value": + if ( results[i]?$data ) + results[i]$data = results[i]$data as string; + break; + default: + Management::Log::error(fmt("unexpected dispatch command %s", + req$node_dispatch_state$action[0])); + break; + } + + req$results[|req$results|] = results[i]; + } + + # Mark this request as done + if ( areq$id in req$node_dispatch_state$requests ) + delete req$node_dispatch_state$requests[areq$id]; + + # If we still have pending queries out to the agents, do nothing: we'll + # handle this soon, or our request will time out and we respond with + # error. + if ( |req$node_dispatch_state$requests| > 0 ) + return; + + # Send response event to the client based upon the dispatch type. 
+ switch req$node_dispatch_state$action[0] + { + case "get_id_value": + Management::Log::info(fmt( + "tx Management::Controller::API::get_id_value_response %s", + Management::Request::to_string(req))); + event Management::Controller::API::get_id_value_response(req$id, req$results); + break; + default: + Management::Log::error(fmt("unexpected dispatch command %s", + req$node_dispatch_state$action[0])); + break; + } + + Management::Request::finish(req$id); + } + +event Management::Controller::API::get_id_value_request(reqid: string, id: string, nodes: set[string]) + { + Management::Log::info(fmt("rx Management::Controller::API::get_id_value_request %s %s", reqid, id)); + + # Special case: if we have no instances, respond right away. + if ( |g_instances| == 0 ) + { + Management::Log::info(fmt("tx Management::Controller::API::get_id_value_response %s", reqid)); + event Management::Controller::API::get_id_value_response(reqid, vector( + Management::Result($reqid=reqid, $success=F, $error="no instances connected"))); + return; + } + + local action = vector("get_id_value", id); + local req = Management::Request::create(reqid); + req$node_dispatch_state = NodeDispatchState($action=action); + + local nodes_final: set[string]; + local node: string; + local res: Management::Result; + + # Input sanitization: check for any requested nodes that aren't part of + # the current configuration. We send back error results for those and + # don't propagate them to the agents. + if ( |nodes| > 0 ) + { + # Requested nodes that are in the current configuration: + nodes_final = filter_config_nodes_by_name(nodes); + # Requested nodes that are not in current configuration: + local nodes_invalid = nodes - nodes_final; + + # Assemble error results for all invalid nodes + for ( node in nodes_invalid ) + { + res = Management::Result($reqid=reqid, $node=node); + res$success = F; + res$error = "unknown cluster node"; + req$results += res; + } + + # If only invalid nodes got requested, we're now done. + if ( |nodes_final| == 0 ) + { + Management::Log::info(fmt( + "tx Management::Controller::API::get_id_value_response %s", + Management::Request::to_string(req))); + event Management::Controller::API::get_id_value_response(req$id, req$results); + Management::Request::finish(req$id); + return; + } + } + + # Send dispatch requests to all agents, with the final set of nodes + for ( name in g_instances ) + { + if ( name !in g_instances_ready ) + next; + + local agent_topic = Management::Agent::topic_prefix + "/" + name; + local areq = Management::Request::create(); + + areq$parent_id = req$id; + add req$node_dispatch_state$requests[areq$id]; + + Management::Log::info(fmt( + "tx Management::Agent::API::node_dispatch_request %s %s to %s", + areq$id, action, name)); + + Broker::publish(agent_topic, + Management::Agent::API::node_dispatch_request, + areq$id, action, nodes_final); + } + } + event Management::Request::request_expired(req: Management::Request::Request) { # Various handlers for timed-out request state. We use the state members # to identify how to respond. No need to clean up the request itself, # since we're getting here via the request module's expiration # mechanism that handles the cleanup. - local res: Management::Result; + local res = Management::Result($reqid=req$id, + $success = F, + $error = "request timed out"); if ( req?$set_configuration_state ) { # This timeout means we no longer have a pending request. 
g_config_reqid_pending = ""; - - res = Management::Result($reqid=req$id); - res$success = F; - res$error = "request timed out"; req$results += res; Management::Log::info(fmt("tx Management::Controller::API::set_configuration_response %s", @@ -579,9 +751,6 @@ event Management::Request::request_expired(req: Management::Request::Request) if ( req?$get_nodes_state ) { - res = Management::Result($reqid=req$id); - res$success = F; - res$error = "request timed out"; req$results += res; Management::Log::info(fmt("tx Management::Controller::API::get_nodes_response %s", @@ -589,12 +758,27 @@ event Management::Request::request_expired(req: Management::Request::Request) event Management::Controller::API::get_nodes_response(req$id, req$results); } + if ( req?$node_dispatch_state ) + { + req$results += res; + + switch req$node_dispatch_state$action[0] + { + case "get_id_value": + Management::Log::info(fmt( + "tx Management::Controller::API::get_id_value_response %s", + Management::Request::to_string(req))); + event Management::Controller::API::get_id_value_response(req$id, req$results); + break; + default: + Management::Log::error(fmt("unexpected dispatch command %s", + req$node_dispatch_state$action[0])); + break; + } + } + if ( req?$test_state ) { - res = Management::Result($reqid=req$id); - res$success = F; - res$error = "request timed out"; - Management::Log::info(fmt("tx Management::Controller::API::test_timeout_response %s", req$id)); event Management::Controller::API::test_timeout_response(req$id, res); } @@ -638,6 +822,7 @@ event zeek_init() Management::Controller::API::get_instances_response, Management::Controller::API::set_configuration_response, Management::Controller::API::get_nodes_response, + Management::Controller::API::get_id_value_response, Management::Controller::API::test_timeout_response ]; diff --git a/scripts/policy/frameworks/management/log.zeek b/scripts/policy/frameworks/management/log.zeek index e69c55b122..e8732df1bc 100644 --- a/scripts/policy/frameworks/management/log.zeek +++ b/scripts/policy/frameworks/management/log.zeek @@ -16,10 +16,10 @@ export { ## The controller/agent log supports four different log levels. type Level: enum { - DEBUG, - INFO, - WARNING, - ERROR, + DEBUG = 10, + INFO = 20, + WARNING = 30, + ERROR = 40, }; ## The record type containing the column fields of the agent/controller log. @@ -36,8 +36,9 @@ export { message: string; } &log; - ## The log level in use for this node. - global log_level = DEBUG &redef; + ## The log level in use for this node. This is the minimum + ## log level required to produce output. + global log_level = INFO &redef; ## A debug-level log message writer. ## @@ -82,6 +83,7 @@ global l2s: table[Level] of string = { global r2s: table[Management::Role] of string = { [Management::AGENT] = "AGENT", [Management::CONTROLLER] = "CONTROLLER", + [Management::NODE] = "NODE", }; function debug(message: string) diff --git a/scripts/policy/frameworks/management/node/__load__.zeek b/scripts/policy/frameworks/management/node/__load__.zeek new file mode 100644 index 0000000000..a10fe855df --- /dev/null +++ b/scripts/policy/frameworks/management/node/__load__.zeek @@ -0,0 +1 @@ +@load ./main diff --git a/scripts/policy/frameworks/management/node/api.zeek b/scripts/policy/frameworks/management/node/api.zeek new file mode 100644 index 0000000000..f952f312ef --- /dev/null +++ b/scripts/policy/frameworks/management/node/api.zeek @@ -0,0 +1,48 @@ +##! The Management event API of cluster nodes. The API consists of request/ +##! 
response event pairs, like elsewhere in the Management, Supervisor, and +##! Control frameworks. + +@load policy/frameworks/management/types + +module Management::Node::API; + +export { + ## Management agents send this event to every Zeek cluster node to run a + ## "dispatch" -- a particular, pre-implemented action. This is the agent-node + ## complement to :zeek:see:`Management::Agent::API::node_dispatch_request`. + ## + ## reqid: a request identifier string, echoed in the response event. + ## + ## action: the requested dispatch command, with any arguments. + ## + ## nodes: the cluster node names this dispatch targets. An empty set, + ## supplied by default, means it applies to all nodes. Since nodes + ## receive all dispatch requests, they can use any node names provided + ## here to filter themselves out of responding. + global node_dispatch_request: event(reqid: string, action: vector of string, + nodes: set[string] &default=set()); + + ## Response to a node_dispatch_request event. The nodes send this back + ## to the agent. This is the agent-node equivalent of + ## :zeek:see:`Management::Agent::API::node_dispatch_response`. + ## + ## reqid: the request identifier used in the request event. + ## + ## result: a :zeek:see:`Management::Result` record covering one Zeek + ## cluster node managed by the agent. Upon success, the data field + ## contains a value appropriate for the requested dispatch. + global node_dispatch_response: event(reqid: string, result: Management::Result); + + + # Notification events, node -> agent + + ## The cluster nodes send this event upon peering as a "check-in" to + ## the agent, to indicate the node is now available to communicate + ## with. It is an agent-level equivalent of :zeek:see:`Broker::peer_added`, + ## and similar to :zeek:see:`Management::Agent::API::notify_agent_hello` + ## for agents. + ## + ## node: the name of the node, as given in :zeek:see:`Cluster::node`. + ## + global notify_node_hello: event(node: string); +} diff --git a/scripts/policy/frameworks/management/node/config.zeek new file mode 100644 index 0000000000..d17fd663a1 --- /dev/null +++ b/scripts/policy/frameworks/management/node/config.zeek @@ -0,0 +1,9 @@ +##! Configuration settings for nodes controlled by the Management framework. + +module Management::Node; + +export { + ## The nodes' Broker topic. Cluster nodes automatically subscribe + ## to it, to receive request events from the Management framework. + const node_topic = "zeek/management/node" &redef; +} diff --git a/scripts/policy/frameworks/management/node/main.zeek new file mode 100644 index 0000000000..cd9819ff3d --- /dev/null +++ b/scripts/policy/frameworks/management/node/main.zeek @@ -0,0 +1,121 @@ +##! This module provides Management framework functionality present in every +##! cluster node, allowing Management agents to interact with the nodes. + +@load base/frameworks/cluster + +@load policy/frameworks/management/agent/config +@load policy/frameworks/management/log + +@load ./api +@load ./config + +module Management::Node; + +# Tag our logs correctly +redef Management::Log::role = Management::NODE; + +## The type of dispatch callbacks. These implement a particular dispatch action, +## using the provided string vector as arguments, filling results into the +## provided result record. 
+type DispatchCallback: function(args: vector of string, res: Management::Result); + +## Implementation of the "get_id_value" dispatch. Its only argument is the name +## of the ID to look up. +function dispatch_get_id_value(args: vector of string, res: Management::Result) + { + if ( |args| == 0 ) + { + res$success = F; + res$error = "get_id_value expects name of global identifier"; + return; + } + + local val = lookup_ID(args[0]); + + # The following lookup_ID() result strings indicate errors: + if ( type_name(val) == "string" ) + { + local valstr: string = val; + if ( valstr == "<unknown id>" || valstr == "<no ID value>" ) + { + res$success = F; + res$error = valstr[1:-1]; + } + } + + if ( res$success ) + res$data = to_json(val); + } + +global g_dispatch_table: table[string] of DispatchCallback = { + ["get_id_value"] = dispatch_get_id_value, +}; + +event Management::Node::API::node_dispatch_request(reqid: string, action: vector of string, + nodes: set[string]) + { + Management::Log::info(fmt("rx Management::Node::API::node_dispatch_request %s %s %s", reqid, action, nodes)); + + if ( |nodes| > 0 && Cluster::node !in nodes ) + { + Management::Log::debug(fmt( + "dispatch %s not targeting this node (%s !in %s), skipping", + reqid, Cluster::node, nodes)); + return; + } + + local res = Management::Result( + $reqid = reqid, $node = Cluster::node); + + if ( |action| == 0 ) + { + res$success = F; + res$error = "no dispatch arguments provided"; + } + else if ( action[0] !in g_dispatch_table ) + { + res$success = F; + res$error = fmt("dispatch %s unknown", action[0]); + } + + if ( ! res$success ) + { + Management::Log::info(fmt("tx Management::Node::API::node_dispatch_response %s", + Management::result_to_string(res))); + event Management::Node::API::node_dispatch_response(reqid, res); + return; + } + + g_dispatch_table[action[0]](action[1:], res); + + Management::Log::info(fmt("tx Management::Node::API::node_dispatch_response %s", + Management::result_to_string(res))); + event Management::Node::API::node_dispatch_response(reqid, res); + } + +event Broker::peer_added(peer: Broker::EndpointInfo, msg: string) + { + local epi = Management::Agent::endpoint_info(); + + # If this is the agent peering, notify it that we're ready + if ( peer$network$address == epi$network$address && + peer$network$bound_port == epi$network$bound_port ) + event Management::Node::API::notify_node_hello(Cluster::node); + } + +event zeek_init() + { + local epi = Management::Agent::endpoint_info(); + + Broker::peer(epi$network$address, epi$network$bound_port, Management::connect_retry); + Broker::subscribe(node_topic); + + # Events automatically sent to the Management agent. + local events: vector of any = [ + Management::Node::API::node_dispatch_response, + Management::Node::API::notify_node_hello + ]; + + for ( i in events ) + Broker::auto_publish(node_topic, events[i]); + } diff --git a/scripts/policy/frameworks/management/types.zeek index 824ea7dfb4..6d89fbda1a 100644 --- a/scripts/policy/frameworks/management/types.zeek +++ b/scripts/policy/frameworks/management/types.zeek @@ -6,12 +6,13 @@ module Management; export { ## Management infrastructure node type. This intentionally does not - ## include the data cluster node types (worker, logger, etc) -- those + ## include the managed cluster node types (worker, logger, etc) -- those ## continue to be managed by the cluster framework. type Role: enum { NONE, ##< No active role in cluster management AGENT, ##< A cluster management agent. 
CONTROLLER, ##< The cluster's controller. + NODE, ##< A managed cluster node (worker, manager, etc). }; ## A Zeek-side option with value. diff --git a/scripts/test-all-policy.zeek index 16bc4f3ede..0f2de90609 100644 --- a/scripts/test-all-policy.zeek +++ b/scripts/test-all-policy.zeek @@ -24,6 +24,10 @@ @load frameworks/management/__load__.zeek @load frameworks/management/config.zeek @load frameworks/management/log.zeek +# @load frameworks/management/node/__load__.zeek +@load frameworks/management/node/api.zeek +@load frameworks/management/node/config.zeek +# @load frameworks/management/node/main.zeek @load frameworks/management/request.zeek @load frameworks/management/types.zeek @load frameworks/management/util.zeek diff --git a/scripts/zeekygen/__load__.zeek index ad28277176..39314a04ac 100644 --- a/scripts/zeekygen/__load__.zeek +++ b/scripts/zeekygen/__load__.zeek @@ -7,6 +7,8 @@ @load frameworks/control/controller.zeek @load frameworks/management/agent/main.zeek @load frameworks/management/controller/main.zeek +@load frameworks/management/node/__load__.zeek +@load frameworks/management/node/main.zeek @load frameworks/files/extract-all-files.zeek @load policy/misc/dump-events.zeek @load policy/protocols/conn/speculative-service.zeek diff --git a/src/Val.cc index 3d3290a200..1d0055f71b 100644 --- a/src/Val.cc +++ b/src/Val.cc @@ -420,8 +420,9 @@ static void BuildJSON(threading::formatter::JSON::NullDoubleWriter& writer, Val* } rapidjson::Value j; + auto tag = val->GetType()->Tag(); - switch ( val->GetType()->Tag() ) + switch ( tag ) { case TYPE_BOOL: writer.Bool(val->AsBool()); @@ -475,8 +476,15 @@ static void BuildJSON(threading::formatter::JSON::NullDoubleWriter& writer, Val* ODesc d; d.SetStyle(RAW_STYLE); val->Describe(&d); - writer.String(util::json_escape_utf8( - std::string(reinterpret_cast<const char*>(d.Bytes()), d.Len()))); + std::string desc(reinterpret_cast<const char*>(d.Bytes()), d.Len()); + + // None of our function types should have surrounding + // whitespace, but ODesc might produce it due to its + // many output modes and flags. Strip it. + if ( tag == TYPE_FUNC ) + desc = util::strstrip(desc); + + writer.String(util::json_escape_utf8(desc)); break; } diff --git a/testing/btest/Baseline/coverage.bare-mode-errors/errors index a72d64757e..bc9bd28f83 100644 --- a/testing/btest/Baseline/coverage.bare-mode-errors/errors +++ b/testing/btest/Baseline/coverage.bare-mode-errors/errors @@ -1,9 +1,9 @@ ### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. ### NOTE: This file has been sorted with diff-sort. -warning in <...>/extract-certs-pem.zeek, line 1: deprecated script loaded from <...>/__load__.zeek:13 "Remove in v5.1. Use log-certs-base64.zeek instead." +warning in <...>/extract-certs-pem.zeek, line 1: deprecated script loaded from <...>/__load__.zeek:15 "Remove in v5.1. Use log-certs-base64.zeek instead." warning in <...>/extract-certs-pem.zeek, line 1: deprecated script loaded from command line arguments "Remove in v5.1. Use log-certs-base64.zeek instead." -warning in <...>/log-ocsp.zeek, line 1: deprecated script loaded from <...>/test-all-policy.zeek:61 ("Remove in v5.1. OCSP logging is now enabled by default") -warning in <...>/log-ocsp.zeek, line 1: deprecated script loaded from <...>/test-all-policy.zeek:61 ("Remove in v5.1. 
OCSP logging is now enabled by default") +warning in <...>/log-ocsp.zeek, line 1: deprecated script loaded from <...>/test-all-policy.zeek:65 ("Remove in v5.1. OCSP logging is now enabled by default") +warning in <...>/log-ocsp.zeek, line 1: deprecated script loaded from <...>/test-all-policy.zeek:65 ("Remove in v5.1. OCSP logging is now enabled by default") warning in <...>/log-ocsp.zeek, line 1: deprecated script loaded from command line arguments ("Remove in v5.1. OCSP logging is now enabled by default") warning in <...>/notary.zeek, line 1: deprecated script loaded from <...>/__load__.zeek:5 ("Remove in v5.1. Please switch to other more modern approaches like SCT validation (validate-sct.zeek).") warning in <...>/notary.zeek, line 1: deprecated script loaded from command line arguments ("Remove in v5.1. Please switch to other more modern approaches like SCT validation (validate-sct.zeek).") diff --git a/testing/external/commit-hash.zeek-testing-cluster b/testing/external/commit-hash.zeek-testing-cluster index 407428c53f..f8de34839f 100644 --- a/testing/external/commit-hash.zeek-testing-cluster +++ b/testing/external/commit-hash.zeek-testing-cluster @@ -1 +1 @@ -3528e248d7d35e102c39c8e8050fc1a6dfa477bf +1b515f3f60abed5c505a970cae380560ce6304c1
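Not part of the commit above: a minimal sketch of how the node-side dispatch mechanism introduced in scripts/policy/frameworks/management/node/main.zeek could be extended from a separately loaded script. The "get_net_stats" action name is hypothetical; get_net_stats() is Zeek's built-in packet-counter function, and g_dispatch_table is the module-scoped table established in node/main.zeek, so the sketch re-enters that module to reach it.

# A minimal sketch, assuming policy/frameworks/management/node is loaded on
# the cluster node. The "get_net_stats" dispatch name is hypothetical.
@load policy/frameworks/management/node

# Re-enter the module so the non-exported g_dispatch_table is in scope.
module Management::Node;

function dispatch_get_net_stats(args: vector of string, res: Management::Result)
	{
	# No arguments expected; render the packet counters as JSON,
	# mirroring how dispatch_get_id_value fills res$data.
	res$data = to_json(get_net_stats());
	}

event zeek_init()
	{
	# Register under the command name agents would send as action[0].
	g_dispatch_table["get_net_stats"] = dispatch_get_net_stats;
	}

A client-facing command would additionally need corresponding controller and agent handling (as get_id_value_request provides for "get_id_value"), since the switch statements in the agent and controller route responses by action name and treat unknown actions as errors.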