mirror of
https://github.com/zeek/zeek.git
synced 2025-10-02 06:38:20 +00:00
Management framework: allow dispatching "actions" on cluster nodes.
This adds request/response event pairs to enable the controller to dispatch "actions" (pre-implemented Zeek script actions) on subsets of Zeek cluster nodes and collect the results. Using generic events to carry multiple such "run X on the nodes" scenarios simplifies adding these in the future.
This commit is contained in:
parent
0020cc4af0
commit
788348f9d6
6 changed files with 297 additions and 32 deletions
|
@ -36,6 +36,25 @@ export {
|
|||
requests: set[string] &default=set();
|
||||
};
|
||||
|
||||
## Request state for node dispatch requests, to track the requested
|
||||
## action and received responses. Node dispatches are requests to
|
||||
## execute pre-implemented actions on every node in the cluster,
|
||||
## and report their outcomes. See
|
||||
## :zeek:see:`Management::Agent::API::node_dispatch_request` and
|
||||
## :zeek:see:`Management::Agent::API::node_dispatch_response` for the
|
||||
## agent/controller interaction.
|
||||
type NodeDispatchState: record {
|
||||
## The dispatched action. The first string is a command,
|
||||
## any remaining strings its arguments.
|
||||
action: vector of string;
|
||||
|
||||
## Request state for every controller/agent transaction.
|
||||
## The set of strings tracks the node names from which
|
||||
## we still expect responses, before we can respond back
|
||||
## to the client.
|
||||
requests: set[string] &default=set();
|
||||
};
|
||||
|
||||
## Dummy state for internal state-keeping test cases.
|
||||
type TestState: record { };
|
||||
}
|
||||
|
@ -43,6 +62,7 @@ export {
|
|||
redef record Management::Request::Request += {
|
||||
set_configuration_state: SetConfigurationState &optional;
|
||||
get_nodes_state: GetNodesState &optional;
|
||||
node_dispatch_state: NodeDispatchState &optional;
|
||||
test_state: TestState &optional;
|
||||
};
|
||||
|
||||
|
@ -485,7 +505,7 @@ event Management::Agent::API::get_nodes_response(reqid: string, result: Manageme
|
|||
if ( Management::Request::is_null(areq) )
|
||||
return;
|
||||
|
||||
# Release the request, which is now done.
|
||||
# Release the request, since this agent is now done.
|
||||
Management::Request::finish(areq$id);
|
||||
|
||||
# Find the original request from the client
|
||||
|
@ -554,22 +574,75 @@ event Management::Controller::API::get_nodes_request(reqid: string)
|
|||
}
|
||||
}
|
||||
|
||||
event Management::Agent::API::node_dispatch_response(reqid: string, results: Management::ResultVec)
|
||||
{
|
||||
Management::Log::info(fmt("rx Management::Agent::API::node_dispatch_response %s", reqid));
|
||||
|
||||
# Retrieve state for the request we just got a response to
|
||||
local areq = Management::Request::lookup(reqid);
|
||||
if ( Management::Request::is_null(areq) )
|
||||
return;
|
||||
|
||||
# Release the request, since this agent is now done.
|
||||
Management::Request::finish(areq$id);
|
||||
|
||||
# Find the original request from the client
|
||||
local req = Management::Request::lookup(areq$parent_id);
|
||||
if ( Management::Request::is_null(req) )
|
||||
return;
|
||||
|
||||
# Add this agent's results to the overall response
|
||||
for ( i in results )
|
||||
{
|
||||
# Same special treatment for Broker values that are of
|
||||
# type "any": confirm their (known) type here.
|
||||
switch req$node_dispatch_state$action[0]
|
||||
{
|
||||
default:
|
||||
Management::Log::error(fmt("unexpected dispatch command %s",
|
||||
req$node_dispatch_state$action[0]));
|
||||
break;
|
||||
}
|
||||
|
||||
req$results[|req$results|] = results[i];
|
||||
}
|
||||
|
||||
# Mark this request as done
|
||||
if ( areq$id in req$node_dispatch_state$requests )
|
||||
delete req$node_dispatch_state$requests[areq$id];
|
||||
|
||||
# If we still have pending queries out to the agents, do nothing: we'll
|
||||
# handle this soon, or our request will time out and we respond with
|
||||
# error.
|
||||
if ( |req$node_dispatch_state$requests| > 0 )
|
||||
return;
|
||||
|
||||
# Send response event to the client based upon the dispatch type.
|
||||
switch req$node_dispatch_state$action[0]
|
||||
{
|
||||
default:
|
||||
Management::Log::error(fmt("unexpected dispatch command %s",
|
||||
req$node_dispatch_state$action[0]));
|
||||
break;
|
||||
}
|
||||
|
||||
Management::Request::finish(req$id);
|
||||
}
|
||||
|
||||
event Management::Request::request_expired(req: Management::Request::Request)
|
||||
{
|
||||
# Various handlers for timed-out request state. We use the state members
|
||||
# to identify how to respond. No need to clean up the request itself,
|
||||
# since we're getting here via the request module's expiration
|
||||
# mechanism that handles the cleanup.
|
||||
local res: Management::Result;
|
||||
local res = Management::Result($reqid=req$id,
|
||||
$success = F,
|
||||
$error = "request timed out");
|
||||
|
||||
if ( req?$set_configuration_state )
|
||||
{
|
||||
# This timeout means we no longer have a pending request.
|
||||
g_config_reqid_pending = "";
|
||||
|
||||
res = Management::Result($reqid=req$id);
|
||||
res$success = F;
|
||||
res$error = "request timed out";
|
||||
req$results += res;
|
||||
|
||||
Management::Log::info(fmt("tx Management::Controller::API::set_configuration_response %s",
|
||||
|
@ -579,9 +652,6 @@ event Management::Request::request_expired(req: Management::Request::Request)
|
|||
|
||||
if ( req?$get_nodes_state )
|
||||
{
|
||||
res = Management::Result($reqid=req$id);
|
||||
res$success = F;
|
||||
res$error = "request timed out";
|
||||
req$results += res;
|
||||
|
||||
Management::Log::info(fmt("tx Management::Controller::API::get_nodes_response %s",
|
||||
|
@ -589,12 +659,21 @@ event Management::Request::request_expired(req: Management::Request::Request)
|
|||
event Management::Controller::API::get_nodes_response(req$id, req$results);
|
||||
}
|
||||
|
||||
if ( req?$node_dispatch_state )
|
||||
{
|
||||
req$results += res;
|
||||
|
||||
switch req$node_dispatch_state$action[0]
|
||||
{
|
||||
default:
|
||||
Management::Log::error(fmt("unexpected dispatch command %s",
|
||||
req$node_dispatch_state$action[0]));
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if ( req?$test_state )
|
||||
{
|
||||
res = Management::Result($reqid=req$id);
|
||||
res$success = F;
|
||||
res$error = "request timed out";
|
||||
|
||||
Management::Log::info(fmt("tx Management::Controller::API::test_timeout_response %s", req$id));
|
||||
event Management::Controller::API::test_timeout_response(req$id, res);
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue