mirror of
https://github.com/zeek/zeek.git
synced 2025-10-02 06:38:20 +00:00
Management framework: allow selecting cluster nodes in get_id_value
This adds an optional set of cluster node names to narrow the querying to. It similarly expands the dispatch mechanism, since it likely most sense for any such request to apply only to a subset of nodes. Requests for invalid nodes trigger Response records in error state.
This commit is contained in:
parent
438cd9b9f7
commit
7edd1a2651
6 changed files with 175 additions and 20 deletions
|
@ -75,7 +75,12 @@ export {
|
|||
## reqid: a request identifier string, echoed in the response event.
|
||||
##
|
||||
## action: the requested dispatch command, with any arguments.
|
||||
global node_dispatch_request: event(reqid: string, action: vector of string);
|
||||
##
|
||||
## nodes: a set of cluster node names (e.g. "worker-01") to retrieve
|
||||
## the values from. An empty set, supplied by default, means
|
||||
## retrieval from all nodes managed by the agent.
|
||||
global node_dispatch_request: event(reqid: string, action: vector of string,
|
||||
nodes: set[string] &default=set());
|
||||
|
||||
## Response to a node_dispatch_request event. Each agent sends this back
|
||||
## to the controller to report the dispatch outcomes on all nodes managed
|
||||
|
|
|
@ -51,9 +51,9 @@ global g_instances: table[string] of Management::Instance;
|
|||
# A map for the nodes we run on this instance, via this agent.
|
||||
global g_nodes: table[string] of Management::Node;
|
||||
|
||||
# The node map employed by the supervisor to describe the cluster
|
||||
# topology to newly forked nodes. We refresh it when we receive
|
||||
# new configurations.
|
||||
# The complete node map employed by the supervisor to describe the cluster
|
||||
# topology to newly forked nodes. We refresh it when we receive new
|
||||
# configurations.
|
||||
global g_cluster: table[string] of Supervisor::ClusterEndpoint;
|
||||
|
||||
|
||||
|
@ -311,7 +311,11 @@ event Management::Agent::API::get_nodes_request(reqid: string)
|
|||
|
||||
event Management::Node::API::node_dispatch_response(reqid: string, result: Management::Result)
|
||||
{
|
||||
Management::Log::info(fmt("rx Management::Node::API::node_dispatch_response %s", reqid));
|
||||
local node = "unknown node";
|
||||
if ( result?$node )
|
||||
node = result$node;
|
||||
|
||||
Management::Log::info(fmt("rx Management::Node::API::node_dispatch_response %s from %s", reqid, node));
|
||||
|
||||
# Retrieve state for the request we just got a response to
|
||||
local nreq = Management::Request::lookup(reqid);
|
||||
|
@ -326,8 +330,17 @@ event Management::Node::API::node_dispatch_response(reqid: string, result: Manag
|
|||
# Mark the responding node as done. Nodes normally fill their own name
|
||||
# into the result; we only double-check for resilience. Nodes failing to
|
||||
# report themselves would eventually lead to request timeout.
|
||||
if ( result?$node && result$node in req$node_dispatch_state$requests )
|
||||
if ( result?$node )
|
||||
{
|
||||
if ( result$node in req$node_dispatch_state$requests )
|
||||
delete req$node_dispatch_state$requests[result$node];
|
||||
else
|
||||
{
|
||||
# An unknown or duplicate response -- do nothing.
|
||||
Management::Log::debug(fmt("response %s not expected, ignoring", reqid));
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
# The usual special treatment for Broker values that are of type "any":
|
||||
# confirm their type here based on the requested dispatch command.
|
||||
|
@ -366,26 +379,94 @@ event Management::Node::API::node_dispatch_response(reqid: string, result: Manag
|
|||
Management::Request::finish(req$id);
|
||||
}
|
||||
|
||||
event Management::Agent::API::node_dispatch_request(reqid: string, action: vector of string)
|
||||
event Management::Agent::API::node_dispatch_request(reqid: string, action: vector of string, nodes: set[string])
|
||||
{
|
||||
Management::Log::info(fmt("rx Management::Agent::API::node_dispatch_request %s %s", reqid, action));
|
||||
Management::Log::info(fmt("rx Management::Agent::API::node_dispatch_request %s %s %s", reqid, action, nodes));
|
||||
|
||||
local node: string;
|
||||
local cluster_nodes: set[string];
|
||||
local nodes_final: set[string];
|
||||
|
||||
for ( node in g_nodes )
|
||||
add cluster_nodes[node];
|
||||
|
||||
# If this request includes cluster nodes to query, check if this agent
|
||||
# manages any of those nodes. If it doesn't, respond with an empty
|
||||
# results vector immediately. Note that any globally unknown nodes
|
||||
# that the client might have requested already got filtered by the
|
||||
# controller, so we don't need to worry about them here.
|
||||
|
||||
if ( |nodes| > 0 )
|
||||
{
|
||||
nodes_final = nodes & cluster_nodes;
|
||||
|
||||
if ( |nodes_final| == 0 )
|
||||
{
|
||||
Management::Log::info(fmt(
|
||||
"tx Management::Agent::API::node_dispatch_response %s, no node overlap",
|
||||
reqid));
|
||||
event Management::Agent::API::node_dispatch_response(reqid, vector());
|
||||
return;
|
||||
}
|
||||
}
|
||||
else if ( |g_nodes| == 0 )
|
||||
{
|
||||
# Special case: the client did not request specific nodes. If
|
||||
# we aren't running any nodes, respond right away, since there's
|
||||
# nothing to dispatch to.
|
||||
Management::Log::info(fmt(
|
||||
"tx Management::Agent::API::node_dispatch_response %s, no nodes registered",
|
||||
reqid));
|
||||
event Management::Agent::API::node_dispatch_response(reqid, vector());
|
||||
return;
|
||||
}
|
||||
else
|
||||
{
|
||||
# We send to all known nodes.
|
||||
nodes_final = cluster_nodes;
|
||||
}
|
||||
|
||||
local res: Management::Result;
|
||||
local req = Management::Request::create(reqid);
|
||||
|
||||
req$node_dispatch_state = NodeDispatchState($action=action);
|
||||
|
||||
for ( node in g_nodes )
|
||||
# Build up dispatch state for tracking responses. We only dispatch to
|
||||
# nodes that are in state RUNNING, as those have confirmed they're ready
|
||||
# to communicate. For others, establish error state in now.
|
||||
for ( node in nodes_final )
|
||||
{
|
||||
if ( g_nodes[node]$state == Management::RUNNING )
|
||||
add req$node_dispatch_state$requests[node];
|
||||
else
|
||||
{
|
||||
res = Management::Result($reqid=reqid, $node=node);
|
||||
res$success = F;
|
||||
res$error = fmt("cluster node %s not in runnning state", node);
|
||||
req$results += res;
|
||||
}
|
||||
}
|
||||
|
||||
# We use a single request record to track all node responses. We know
|
||||
# when all nodes have responded via the requests set we built up above.
|
||||
# Corner case: nothing is in state RUNNING.
|
||||
if ( |req$node_dispatch_state$requests| == 0 )
|
||||
{
|
||||
Management::Log::info(fmt(
|
||||
"tx Management::Agent::API::node_dispatch_response %s, no nodes running",
|
||||
reqid));
|
||||
event Management::Agent::API::node_dispatch_response(reqid, req$results);
|
||||
Management::Request::finish(req$id);
|
||||
return;
|
||||
}
|
||||
|
||||
# We use a single request record to track all node responses, and a
|
||||
# single event that Broker publishes to all nodes. We know when all
|
||||
# nodes have responded by checking the requests set we built up above.
|
||||
local nreq = Management::Request::create();
|
||||
nreq$parent_id = reqid;
|
||||
|
||||
Management::Log::info(fmt("tx Management::Node::API::node_dispatch_request %s %s", nreq$id, action));
|
||||
Broker::publish(Management::Node::node_topic,
|
||||
Management::Node::API::node_dispatch_request, nreq$id, action);
|
||||
Management::Node::API::node_dispatch_request, nreq$id, action, nodes);
|
||||
}
|
||||
|
||||
event Management::Agent::API::agent_welcome_request(reqid: string)
|
||||
|
|
|
@ -90,7 +90,12 @@ export {
|
|||
## reqid: a request identifier string, echoed in the response event.
|
||||
##
|
||||
## id: the name of the variable whose value to retrieve.
|
||||
global get_id_value_request: event(reqid: string, id: string);
|
||||
##
|
||||
## nodes: a set of cluster node names (e.g. "worker-01") to retrieve
|
||||
## the values from. An empty set, supplied by default, means
|
||||
## retrieval from all current cluster nodes.
|
||||
global get_id_value_request: event(reqid: string, id: string,
|
||||
nodes: set[string] &default=set());
|
||||
|
||||
## Response to a get_id_value_request event. The controller sends this
|
||||
## back to the client.
|
||||
|
|
|
@ -230,6 +230,17 @@ function is_instance_connectivity_change(inst: Management::Instance): bool
|
|||
return F;
|
||||
}
|
||||
|
||||
function filter_config_nodes_by_name(nodes: set[string]): set[string]
|
||||
{
|
||||
local res: set[string];
|
||||
local cluster_nodes: set[string];
|
||||
|
||||
for ( node in g_config_current$nodes )
|
||||
add cluster_nodes[node$name];
|
||||
|
||||
return nodes & cluster_nodes;
|
||||
}
|
||||
|
||||
event Management::Controller::API::notify_agents_ready(instances: set[string])
|
||||
{
|
||||
local insts = Management::Util::set_to_vector(instances);
|
||||
|
@ -643,7 +654,7 @@ event Management::Agent::API::node_dispatch_response(reqid: string, results: Man
|
|||
Management::Request::finish(req$id);
|
||||
}
|
||||
|
||||
event Management::Controller::API::get_id_value_request(reqid: string, id: string)
|
||||
event Management::Controller::API::get_id_value_request(reqid: string, id: string, nodes: set[string])
|
||||
{
|
||||
Management::Log::info(fmt("rx Management::Controller::API::get_id_value_request %s %s", reqid, id));
|
||||
|
||||
|
@ -660,6 +671,42 @@ event Management::Controller::API::get_id_value_request(reqid: string, id: strin
|
|||
local req = Management::Request::create(reqid);
|
||||
req$node_dispatch_state = NodeDispatchState($action=action);
|
||||
|
||||
local nodes_final: set[string];
|
||||
local node: string;
|
||||
local res: Management::Result;
|
||||
|
||||
# Input sanitization: check for any requested nodes that aren't part of
|
||||
# the current configuration. We send back error results for those and
|
||||
# don't propagate them to the agents.
|
||||
if ( |nodes| > 0 )
|
||||
{
|
||||
# Requested nodes that are in the current configuration:
|
||||
nodes_final = filter_config_nodes_by_name(nodes);
|
||||
# Requested nodes that are not in current configuration:
|
||||
local nodes_invalid = nodes - nodes_final;
|
||||
|
||||
# Assemble error results for all invalid nodes
|
||||
for ( node in nodes_invalid )
|
||||
{
|
||||
res = Management::Result($reqid=reqid, $node=node);
|
||||
res$success = F;
|
||||
res$error = "unknown cluster node";
|
||||
req$results += res;
|
||||
}
|
||||
|
||||
# If only invalid nodes got requested, we're now done.
|
||||
if ( |nodes_final| == 0 )
|
||||
{
|
||||
Management::Log::info(fmt(
|
||||
"tx Management::Controller::API::get_id_value_response %s",
|
||||
Management::Request::to_string(req)));
|
||||
event Management::Controller::API::get_id_value_response(req$id, req$results);
|
||||
Management::Request::finish(req$id);
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
# Send dispatch requests to all agents, with the final set of nodes
|
||||
for ( name in g_instances )
|
||||
{
|
||||
if ( name !in g_instances_ready )
|
||||
|
@ -675,7 +722,9 @@ event Management::Controller::API::get_id_value_request(reqid: string, id: strin
|
|||
"tx Management::Agent::API::node_dispatch_request %s %s to %s",
|
||||
areq$id, action, name));
|
||||
|
||||
Broker::publish(agent_topic, Management::Agent::API::node_dispatch_request, areq$id, action);
|
||||
Broker::publish(agent_topic,
|
||||
Management::Agent::API::node_dispatch_request,
|
||||
areq$id, action, nodes_final);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -14,7 +14,13 @@ export {
|
|||
## reqid: a request identifier string, echoed in the response event.
|
||||
##
|
||||
## action: the requested dispatch command, with any arguments.
|
||||
global node_dispatch_request: event(reqid: string, action: vector of string);
|
||||
##
|
||||
## nodes: the cluster node names this dispatch targets. An empty set,
|
||||
## supplied by default, means it applies to all nodes. Since nodes
|
||||
## receive all dispatch requests, they can use any node names provided
|
||||
## here to filter themselves out of responding.
|
||||
global node_dispatch_request: event(reqid: string, action: vector of string,
|
||||
nodes: set[string] &default=set());
|
||||
|
||||
## Response to a node_dispatch_request event. The nodes send this back
|
||||
## to the agent. This is the agent-node equivalent of
|
||||
|
|
|
@ -51,9 +51,18 @@ global g_dispatch_table: table[string] of DispatchCallback = {
|
|||
["get_id_value"] = dispatch_get_id_value,
|
||||
};
|
||||
|
||||
event Management::Node::API::node_dispatch_request(reqid: string, action: vector of string)
|
||||
event Management::Node::API::node_dispatch_request(reqid: string, action: vector of string,
|
||||
nodes: set[string])
|
||||
{
|
||||
Management::Log::info(fmt("rx Management::Node::API::node_dispatch_request %s %s", reqid, action));
|
||||
Management::Log::info(fmt("rx Management::Node::API::node_dispatch_request %s %s %s", reqid, action, nodes));
|
||||
|
||||
if ( |nodes| > 0 && Cluster::node !in nodes )
|
||||
{
|
||||
Management::Log::debug(fmt(
|
||||
"dispatch %s not targeting this node (%s !in %s), skipping",
|
||||
reqid, Cluster::node, nodes));
|
||||
return;
|
||||
}
|
||||
|
||||
local res = Management::Result(
|
||||
$reqid = reqid, $node = Cluster::node);
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue