Mirror of https://github.com/zeek/zeek.git, synced 2025-10-02 14:48:21 +00:00

commit 2d73edb04c

Merge branch 'topic/christian/management-print-cmd'

* topic/christian/management-print-cmd:
  Management framework: bump external testsuite
  Management framework: allow selecting cluster nodes in get_id_value
  Management framework: minor tweaks to logging component
  Management framework: bump zeek-client to pull in get-id-value command
  Avoid whitespace around function type strings in JSON rendering
  Management framework: improve handling of node run states
  Management framework: add get_id_value dispatch
  Management framework: allow dispatching "actions" on cluster nodes
  Management framework: some renaming to avoid the term "data cluster"
  Management framework: allow agents to communicate with cluster nodes

18 changed files with 740 additions and 69 deletions
CHANGES (18 lines changed)

@@ -1,3 +1,21 @@
+5.0.0-dev.277 | 2022-04-18 16:38:27 -0700
+
+  * Management framework updates (Christian Kreibich, Corelight)
+
+    - bump external testsuite
+    - allow selecting cluster nodes in get_id_value
+    - minor tweaks to logging component
+    - bump zeek-client to pull in get-id-value command
+    - improve handling of node run states
+    - add get_id_value dispatch
+    - allow dispatching "actions" on cluster nodes.
+    - some renaming to avoid the term "data cluster"
+    - allow agents to communicate with cluster nodes
+
+  * Avoid whitespace around function type strings in JSON rendering (Christian Kreibich, Corelight)
+
+  * Disable TSan CI task temporarily while we sort out some intermittent test failures (Tim Wojtulewicz, Corelight)
+
 5.0.0-dev.265 | 2022-04-18 12:45:08 -0700

   * state-holding fix: track unique identifiers for Func's in CompHash's, not Func's themselves (Vern Paxson, Corelight)
VERSION (2 lines changed)

@@ -1 +1 @@
-5.0.0-dev.265
+5.0.0-dev.277
@@ -1 +1 @@
-Subproject commit c2af7381c584b6545517843872747598bb0e25d5
+Subproject commit a08d9978ac6ff6481ad1e6b18f0376568c08f8c1
@@ -60,8 +60,41 @@ export {
     ## records, covering the nodes at this instance. The result may also
     ## indicate failure, with error messages indicating what went wrong.
     ##
-    global get_nodes_response: event(reqid: string,
-        result: Management::Result);
+    global get_nodes_response: event(reqid: string, result: Management::Result);
+
+    ## The controller sends this to every agent to request a dispatch (the
+    ## execution of a pre-implemented activity) to all cluster nodes. This
+    ## is the generic controller-agent "back-end" implementation of explicit
+    ## client-controller "front-end" interactions, including:
+    ##
+    ## - :zeek:see:`Management::Controller::API::get_id_value_request`: two
+    ##   arguments, the first being "get_id_value" and the second the name
+    ##   of the ID to look up.
+    ##
+    ## reqid: a request identifier string, echoed in the response event.
+    ##
+    ## action: the requested dispatch command, with any arguments.
+    ##
+    ## nodes: a set of cluster node names (e.g. "worker-01") to retrieve
+    ##     the values from. An empty set, supplied by default, means
+    ##     retrieval from all nodes managed by the agent.
+    global node_dispatch_request: event(reqid: string, action: vector of string,
+        nodes: set[string] &default=set());
+
+    ## Response to a node_dispatch_request event. Each agent sends this back
+    ## to the controller to report the dispatch outcomes on all nodes managed
+    ## by that agent.
+    ##
+    ## reqid: the request identifier used in the request event.
+    ##
+    ## result: a :zeek:type:`vector` of :zeek:see:`Management::Result`
+    ##     records. Each record covers one Zeek cluster node managed by this
+    ##     agent. Upon success, each :zeek:see:`Management::Result` record's
+    ##     data member contains the dispatches' response in a data type
+    ##     appropriate for the respective dispatch.
+    global node_dispatch_response: event(reqid: string, result: Management::ResultVec);
+
+
     ## The controller sends this event to confirm to the agent that it is
     ## part of the current cluster topology. The agent acknowledges with the
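To make the new agent-level API concrete, here is a minimal sketch, not part of this commit: a controller-side caller phrases a get_id_value lookup as an action vector and publishes it to one agent's topic, following the topic-prefix convention used in the controller code later in this commit. The instance name "agent-testbox", the request ID, and the target node are placeholders, and the sketch assumes the agent's config script supplies Management::Agent::topic_prefix.

    @load policy/frameworks/management/agent/api
    @load policy/frameworks/management/agent/config

    event zeek_init()
        {
        # First element is the dispatch command, the rest are its arguments.
        local action = vector("get_id_value", "Cluster::node");

        # An empty node set would address all nodes managed by that agent.
        Broker::publish(Management::Agent::topic_prefix + "/" + "agent-testbox",
            Management::Agent::API::node_dispatch_request,
            "req-0001", action, set("worker-01"));
        }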
@@ -5,6 +5,8 @@
 @load base/frameworks/broker
 @load policy/frameworks/management
+@load policy/frameworks/management/node/api
+@load policy/frameworks/management/node/config

 @load ./api
 @load ./config
@@ -19,10 +21,22 @@ export {
     type SupervisorState: record {
         node: string; ##< Name of the node the Supervisor is acting on.
     };

+    ## Request state for node dispatches, tracking the requested action
+    ## as well as received responses.
+    type NodeDispatchState: record {
+        ## The dispatched action. The first string is a command,
+        ## any remaining strings its arguments.
+        action: vector of string;
+
+        ## Request state for every node managed by this agent.
+        requests: set[string] &default=set();
+    };
 }

 redef record Management::Request::Request += {
     supervisor_state: SupervisorState &optional;
+    node_dispatch_state: NodeDispatchState &optional;
 };

 # Tag our logs correctly
@@ -37,10 +51,10 @@ global g_instances: table[string] of Management::Instance;
 # A map for the nodes we run on this instance, via this agent.
 global g_nodes: table[string] of Management::Node;

-# The node map employed by the supervisor to describe the cluster
-# topology to newly forked nodes. We refresh it when we receive
-# new configurations.
-global g_data_cluster: table[string] of Supervisor::ClusterEndpoint;
+# The complete node map employed by the supervisor to describe the cluster
+# topology to newly forked nodes. We refresh it when we receive new
+# configurations.
+global g_cluster: table[string] of Supervisor::ClusterEndpoint;


 event SupervisorControl::create_response(reqid: string, result: string)
@@ -118,11 +132,10 @@ event Management::Agent::API::set_configuration_request(reqid: string, config: M
     for ( nodename in g_nodes )
         supervisor_destroy(nodename);

+    # Refresh the cluster and nodes tables
     g_nodes = table();
+    g_cluster = table();

-    # Refresh the data cluster and nodes tables
-
-    g_data_cluster = table();
-
     for ( node in config$nodes )
         {
         if ( node$instance == Management::Agent::name )
@@ -144,7 +157,7 @@ event Management::Agent::API::set_configuration_request(reqid: string, config: M
         if ( node?$interface )
             cep$interface = node$interface;

-        g_data_cluster[node$name] = cep;
+        g_cluster[node$name] = cep;
         }

     # Apply the new configuration via the supervisor
@@ -152,6 +165,8 @@ event Management::Agent::API::set_configuration_request(reqid: string, config: M
     for ( nodename in g_nodes )
         {
         node = g_nodes[nodename];
+        node$state = Management::PENDING;
+
         nc = Supervisor::NodeConfig($name=nodename);

         if ( Management::Agent::cluster_directory != "" )
@@ -166,10 +181,15 @@ event Management::Agent::API::set_configuration_request(reqid: string, config: M
         if ( node?$env )
             nc$env = node$env;

+        # Always add the policy/management/node scripts to any cluster
+        # node, since we require it to be able to communicate with the
+        # node.
+        nc$scripts[|nc$scripts|] = "policy/frameworks/management/node";
+
         # XXX could use options to enable per-node overrides for
         # directory, stdout, stderr, others?

-        nc$cluster = g_data_cluster;
+        nc$cluster = g_cluster;
         supervisor_create(nc);
         }

@@ -209,7 +229,7 @@ event SupervisorControl::status_response(reqid: string, result: Supervisor::Stat
         local cns = Management::NodeStatus(
                 $node=node, $state=Management::PENDING);

-        # Identify the role of the node. For data cluster roles (worker,
+        # Identify the role of the node. For cluster roles (worker,
         # manager, etc) we derive this from the cluster node table. For
         # agent and controller, we identify via environment variables
         # that the controller framework establishes upon creation (see
@@ -218,6 +238,11 @@ event SupervisorControl::status_response(reqid: string, result: Supervisor::Stat
             {
             cns$cluster_role = sns$node$cluster[node]$role;

+            # For cluster nodes, copy run state from g_nodes, our
+            # live node status table.
+            if ( node in g_nodes )
+                cns$state = g_nodes[node]$state;
+
             # The supervisor's responses use 0/tcp (not 0/unknown)
             # when indicating an unused port because its internal
             # serialization always assumes TCP.
@@ -232,12 +257,22 @@ event SupervisorControl::status_response(reqid: string, result: Supervisor::Stat
         if ( role == "CONTROLLER" )
             {
             cns$mgmt_role = Management::CONTROLLER;

+            # Automatically declare the controller in running state
+            # here -- we'd not have received a request that brought
+            # us here otherwise.
+            cns$state = Management::RUNNING;
+
             # The controller always listens, so the Zeek client can connect.
             cns$p = Management::Agent::endpoint_info()$network$bound_port;
             }
         else if ( role == "AGENT" )
             {
             cns$mgmt_role = Management::AGENT;

+            # Similarly to above, always declare agent running. We are. :)
+            cns$state = Management::RUNNING;
+
             # If we have a controller address, the agent connects to it
             # and does not listen. See zeek_init() below for similar logic.
             if ( Management::Agent::controller$address == "0.0.0.0" )
@@ -249,13 +284,9 @@ event SupervisorControl::status_response(reqid: string, result: Supervisor::Stat
             }
         }

-    # A PID is available if a supervised node has fully launched
-    # and is therefore running.
+    # A PID is available if a supervised node has fully launched.
     if ( sns?$pid )
-        {
         cns$pid = sns$pid;
-        cns$state = Management::RUNNING;
-        }

     node_statuses += cns;
     }
@@ -278,6 +309,166 @@ event Management::Agent::API::get_nodes_request(reqid: string)
     Management::Log::info(fmt("issued supervisor status, %s", req$id));
     }

+event Management::Node::API::node_dispatch_response(reqid: string, result: Management::Result)
+    {
+    local node = "unknown node";
+    if ( result?$node )
+        node = result$node;
+
+    Management::Log::info(fmt("rx Management::Node::API::node_dispatch_response %s from %s", reqid, node));
+
+    # Retrieve state for the request we just got a response to
+    local nreq = Management::Request::lookup(reqid);
+    if ( Management::Request::is_null(nreq) )
+        return;
+
+    # Find the original request from the controller
+    local req = Management::Request::lookup(nreq$parent_id);
+    if ( Management::Request::is_null(req) )
+        return;
+
+    # Mark the responding node as done. Nodes normally fill their own name
+    # into the result; we only double-check for resilience. Nodes failing to
+    # report themselves would eventually lead to request timeout.
+    if ( result?$node )
+        {
+        if ( result$node in req$node_dispatch_state$requests )
+            delete req$node_dispatch_state$requests[result$node];
+        else
+            {
+            # An unknown or duplicate response -- do nothing.
+            Management::Log::debug(fmt("response %s not expected, ignoring", reqid));
+            return;
+            }
+        }
+
+    # The usual special treatment for Broker values that are of type "any":
+    # confirm their type here based on the requested dispatch command.
+    switch req$node_dispatch_state$action[0]
+        {
+        case "get_id_value":
+            if ( result?$data )
+                result$data = result$data as string;
+            break;
+        default:
+            Management::Log::error(fmt("unexpected dispatch command %s",
+                req$node_dispatch_state$action[0]));
+            break;
+        }
+
+    # The result has the reporting node filled in but not the agent/instance
+    # (which the node doesn't know about), so add it now.
+    result$instance = Management::Agent::instance()$name;
+
+    # Add this result to the overall response
+    req$results[|req$results|] = result;
+
+    # If we still have pending queries out to the agents, do nothing: we'll
+    # handle this soon, or our request will time out and we respond with
+    # error.
+    if ( |req$node_dispatch_state$requests| > 0 )
+        return;
+
+    # Release the agent-nodes request state, since we now have all responses.
+    Management::Request::finish(nreq$id);
+
+    # Send response event back to controller and clean up main request state.
+    Management::Log::info(fmt("tx Management::Agent::API::node_dispatch_response %s",
+        Management::Request::to_string(req)));
+    event Management::Agent::API::node_dispatch_response(req$id, req$results);
+    Management::Request::finish(req$id);
+    }
+
+event Management::Agent::API::node_dispatch_request(reqid: string, action: vector of string, nodes: set[string])
+    {
+    Management::Log::info(fmt("rx Management::Agent::API::node_dispatch_request %s %s %s", reqid, action, nodes));
+
+    local node: string;
+    local cluster_nodes: set[string];
+    local nodes_final: set[string];
+
+    for ( node in g_nodes )
+        add cluster_nodes[node];
+
+    # If this request includes cluster nodes to query, check if this agent
+    # manages any of those nodes. If it doesn't, respond with an empty
+    # results vector immediately. Note that any globally unknown nodes
+    # that the client might have requested already got filtered by the
+    # controller, so we don't need to worry about them here.
+
+    if ( |nodes| > 0 )
+        {
+        nodes_final = nodes & cluster_nodes;
+
+        if ( |nodes_final| == 0 )
+            {
+            Management::Log::info(fmt(
+                "tx Management::Agent::API::node_dispatch_response %s, no node overlap",
+                reqid));
+            event Management::Agent::API::node_dispatch_response(reqid, vector());
+            return;
+            }
+        }
+    else if ( |g_nodes| == 0 )
+        {
+        # Special case: the client did not request specific nodes. If
+        # we aren't running any nodes, respond right away, since there's
+        # nothing to dispatch to.
+        Management::Log::info(fmt(
+            "tx Management::Agent::API::node_dispatch_response %s, no nodes registered",
+            reqid));
+        event Management::Agent::API::node_dispatch_response(reqid, vector());
+        return;
+        }
+    else
+        {
+        # We send to all known nodes.
+        nodes_final = cluster_nodes;
+        }
+
+    local res: Management::Result;
+    local req = Management::Request::create(reqid);
+
+    req$node_dispatch_state = NodeDispatchState($action=action);
+
+    # Build up dispatch state for tracking responses. We only dispatch to
+    # nodes that are in state RUNNING, as those have confirmed they're ready
+    # to communicate. For others, establish error state now.
+    for ( node in nodes_final )
+        {
+        if ( g_nodes[node]$state == Management::RUNNING )
+            add req$node_dispatch_state$requests[node];
+        else
+            {
+            res = Management::Result($reqid=reqid, $node=node);
+            res$success = F;
+            res$error = fmt("cluster node %s not in running state", node);
+            req$results += res;
+            }
+        }
+
+    # Corner case: nothing is in state RUNNING.
+    if ( |req$node_dispatch_state$requests| == 0 )
+        {
+        Management::Log::info(fmt(
+            "tx Management::Agent::API::node_dispatch_response %s, no nodes running",
+            reqid));
+        event Management::Agent::API::node_dispatch_response(reqid, req$results);
+        Management::Request::finish(req$id);
+        return;
+        }
+
+    # We use a single request record to track all node responses, and a
+    # single event that Broker publishes to all nodes. We know when all
+    # nodes have responded by checking the requests set we built up above.
+    local nreq = Management::Request::create();
+    nreq$parent_id = reqid;
+
+    Management::Log::info(fmt("tx Management::Node::API::node_dispatch_request %s %s", nreq$id, action));
+    Broker::publish(Management::Node::node_topic,
+        Management::Node::API::node_dispatch_request, nreq$id, action, nodes);
+    }
+
 event Management::Agent::API::agent_welcome_request(reqid: string)
     {
     Management::Log::info(fmt("rx Management::Agent::API::agent_welcome_request %s", reqid));
@@ -311,6 +502,14 @@ event Management::Agent::API::agent_standby_request(reqid: string)
     event Management::Agent::API::agent_standby_response(reqid, res);
     }

+event Management::Node::API::notify_node_hello(node: string)
+    {
+    Management::Log::info(fmt("rx Management::Node::API::notify_node_hello %s", node));
+
+    if ( node in g_nodes )
+        g_nodes[node]$state = Management::RUNNING;
+    }
+
 event Broker::peer_added(peer: Broker::EndpointInfo, msg: string)
     {
     # This does not (cannot?) immediately verify that the new peer
@@ -342,28 +541,39 @@ event zeek_init()

     Broker::peer(supervisor_addr, Broker::default_port, Broker::default_listen_retry);

-    # Agents need receive communication targeted at it, and any responses
-    # from the supervisor.
+    # Agents need receive communication targeted at it, any responses
+    # from the supervisor, and any responses from cluster nodes.
     Broker::subscribe(agent_topic);
     Broker::subscribe(SupervisorControl::topic_prefix);
+    Broker::subscribe(Management::Node::node_topic);

     # Auto-publish a bunch of events. Glob patterns or module-level
     # auto-publish would be helpful here.
-    Broker::auto_publish(agent_topic, Management::Agent::API::get_nodes_response);
-    Broker::auto_publish(agent_topic, Management::Agent::API::set_configuration_response);
-    Broker::auto_publish(agent_topic, Management::Agent::API::agent_welcome_response);
-    Broker::auto_publish(agent_topic, Management::Agent::API::agent_standby_response);
-
-    Broker::auto_publish(agent_topic, Management::Agent::API::notify_agent_hello);
-    Broker::auto_publish(agent_topic, Management::Agent::API::notify_change);
-    Broker::auto_publish(agent_topic, Management::Agent::API::notify_error);
-    Broker::auto_publish(agent_topic, Management::Agent::API::notify_log);
-
-    Broker::auto_publish(SupervisorControl::topic_prefix, SupervisorControl::create_request);
-    Broker::auto_publish(SupervisorControl::topic_prefix, SupervisorControl::status_request);
-    Broker::auto_publish(SupervisorControl::topic_prefix, SupervisorControl::destroy_request);
-    Broker::auto_publish(SupervisorControl::topic_prefix, SupervisorControl::restart_request);
-    Broker::auto_publish(SupervisorControl::topic_prefix, SupervisorControl::stop_request);
+    local agent_to_controller_events: vector of any = [
+        Management::Agent::API::get_nodes_response,
+        Management::Agent::API::set_configuration_response,
+        Management::Agent::API::agent_welcome_response,
+        Management::Agent::API::agent_standby_response,
+        Management::Agent::API::node_dispatch_response,
+        Management::Agent::API::notify_agent_hello,
+        Management::Agent::API::notify_change,
+        Management::Agent::API::notify_error,
+        Management::Agent::API::notify_log
+        ];
+
+    for ( i in agent_to_controller_events )
+        Broker::auto_publish(agent_topic, agent_to_controller_events[i]);
+
+    local agent_to_sup_events: vector of any = [
+        SupervisorControl::create_request,
+        SupervisorControl::status_request,
+        SupervisorControl::destroy_request,
+        SupervisorControl::restart_request,
+        SupervisorControl::stop_request
+        ];
+
+    for ( i in agent_to_sup_events )
+        Broker::auto_publish(SupervisorControl::topic_prefix, agent_to_sup_events[i]);

     # Establish connectivity with the controller.
     if ( Management::Agent::controller$address != "0.0.0.0" )
@@ -373,11 +583,10 @@ event zeek_init()
            Management::Agent::controller$bound_port,
            Management::connect_retry);
         }
-    else
-        {
-        # Controller connects to us; listen for it.
-        Broker::listen(cat(epi$network$address), epi$network$bound_port);
-        }
+
+    # The agent always listens, to allow cluster nodes to peer with it.
+    # If the controller connects to us, it also uses this port.
+    Broker::listen(cat(epi$network$address), epi$network$bound_port);

     Management::Log::info("agent is live");
     }
@@ -71,7 +71,7 @@ export {
     ##
     ## reqid: the request identifier used in the request event.
     ##
-    ## result: a :zeek:type`vector` of :zeek:see:`Management::Result`
+    ## result: a :zeek:type:`vector` of :zeek:see:`Management::Result`
     ## records. Each record covers one cluster instance. Each record's data
     ## member is a vector of :zeek:see:`Management::NodeStatus`
     ## records, covering the nodes at that instance. Results may also indicate
@@ -80,6 +80,36 @@
         result: Management::ResultVec);

+    ## zeek-client sends this event to retrieve the current value of a
+    ## variable in Zeek's global namespace, referenced by the given
+    ## identifier (i.e., variable name). The controller asks all agents
+    ## to retrieve this value from each cluster node, accumulates the
+    ## returned responses, and responds with a get_id_value_response
+    ## event back to the client.
+    ##
+    ## reqid: a request identifier string, echoed in the response event.
+    ##
+    ## id: the name of the variable whose value to retrieve.
+    ##
+    ## nodes: a set of cluster node names (e.g. "worker-01") to retrieve
+    ##     the values from. An empty set, supplied by default, means
+    ##     retrieval from all current cluster nodes.
+    global get_id_value_request: event(reqid: string, id: string,
+        nodes: set[string] &default=set());
+
+    ## Response to a get_id_value_request event. The controller sends this
+    ## back to the client.
+    ##
+    ## reqid: the request identifier used in the request event.
+    ##
+    ## result: a :zeek:type:`vector` of :zeek:see:`Management::Result`
+    ## records. Each record covers one Zeek cluster node. Each record's
+    ## data field contains a string with the JSON rendering (as produced
+    ## by :zeek:id:`to_json`, including the error strings it potentially
+    ## returns).
+    global get_id_value_response: event(reqid: string, result: Management::ResultVec);
+
+
     # Testing events. These don't provide operational value but expose
     # internal functionality, triggered by test cases.
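The intended front-end for this event pair is zeek-client's new get-id-value command, per the changelog above. As a rough sketch of the same round trip driven directly from a Zeek script -- not part of this commit, with the controller topic, port, and request ID being placeholder assumptions -- a client could publish the request and handle the response like this:

    @load policy/frameworks/management/controller/api

    # Placeholder connection details for this sketch; a real client would use
    # the controller's configured address, port, and topic.
    const controller_topic = "zeek/management/controller";

    event Management::Controller::API::get_id_value_response(reqid: string, results: Management::ResultVec)
        {
        for ( i in results )
            {
            # Each result's data field carries the node's to_json() rendering.
            if ( results[i]?$data )
                print results[i]$data;
            }
        }

    event Broker::peer_added(peer: Broker::EndpointInfo, msg: string)
        {
        Broker::publish(controller_topic,
            Management::Controller::API::get_id_value_request,
            "req-0002", "Cluster::node", set("worker-01"));
        }

    event zeek_init()
        {
        Broker::subscribe(controller_topic);
        Broker::peer("127.0.0.1", 2150/tcp);
        }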
@@ -36,6 +36,29 @@ export {
         requests: set[string] &default=set();
     };

+    ## Request state for node dispatch requests, to track the requested
+    ## action and received responses. Node dispatches are requests to
+    ## execute pre-implemented actions on every node in the cluster,
+    ## and report their outcomes. See
+    ## :zeek:see:`Management::Agent::API::node_dispatch_request` and
+    ## :zeek:see:`Management::Agent::API::node_dispatch_response` for the
+    ## agent/controller interaction, and
+    ## :zeek:see:`Management::Controller::API::get_id_value_request` and
+    ## :zeek:see:`Management::Controller::API::get_id_value_response`
+    ## for an example of a specific API the controller generalizes into
+    ## a dispatch.
+    type NodeDispatchState: record {
+        ## The dispatched action. The first string is a command,
+        ## any remaining strings its arguments.
+        action: vector of string;
+
+        ## Request state for every controller/agent transaction.
+        ## The set of strings tracks the node names from which
+        ## we still expect responses, before we can respond back
+        ## to the client.
+        requests: set[string] &default=set();
+    };
+
     ## Dummy state for internal state-keeping test cases.
     type TestState: record { };
 }
@@ -43,6 +66,7 @@ export {
 redef record Management::Request::Request += {
     set_configuration_state: SetConfigurationState &optional;
     get_nodes_state: GetNodesState &optional;
+    node_dispatch_state: NodeDispatchState &optional;
     test_state: TestState &optional;
 };

@@ -206,6 +230,17 @@ function is_instance_connectivity_change(inst: Management::Instance): bool
     return F;
     }

+function filter_config_nodes_by_name(nodes: set[string]): set[string]
+    {
+    local res: set[string];
+    local cluster_nodes: set[string];
+
+    for ( node in g_config_current$nodes )
+        add cluster_nodes[node$name];
+
+    return nodes & cluster_nodes;
+    }
+
 event Management::Controller::API::notify_agents_ready(instances: set[string])
     {
     local insts = Management::Util::set_to_vector(instances);
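For illustration (not part of the commit), the new helper behaves like a plain name intersection against the current configuration:

    # Assuming g_config_current currently defines worker-01 and worker-02:
    #   filter_config_nodes_by_name(set("worker-01", "worker-99"))
    # returns set("worker-01"); unknown names are dropped and reported as
    # errors by the caller, as in the get_id_value_request handler below.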
@@ -386,10 +421,10 @@ event Management::Controller::API::set_configuration_request(reqid: string, conf
     g_config_reqid_pending = req$id;

     # Compare the instance configuration to our current one. If it matches,
-    # we can proceed to deploying the new data cluster topology. If it does
+    # we can proceed to deploying the new cluster topology. If it does
     # not, we need to establish connectivity with agents we connect to, or
     # wait until all instances that connect to us have done so. Either triggers
-    # a notify_agents_ready event, upon which we then deploy the data cluster.
+    # a notify_agents_ready event, upon which we then deploy the topology.

     # The current & new set of instance names.
     local insts_current: set[string];
@@ -485,7 +520,7 @@ event Management::Agent::API::get_nodes_response(reqid: string, result: Manageme
     if ( Management::Request::is_null(areq) )
         return;

-    # Release the request, which is now done.
+    # Release the request, since this agent is now done.
     Management::Request::finish(areq$id);

     # Find the original request from the client
@@ -554,22 +589,159 @@ event Management::Controller::API::get_nodes_request(reqid: string)
         }
     }

+event Management::Agent::API::node_dispatch_response(reqid: string, results: Management::ResultVec)
+    {
+    Management::Log::info(fmt("rx Management::Agent::API::node_dispatch_response %s", reqid));
+
+    # Retrieve state for the request we just got a response to
+    local areq = Management::Request::lookup(reqid);
+    if ( Management::Request::is_null(areq) )
+        return;
+
+    # Release the request, since this agent is now done.
+    Management::Request::finish(areq$id);
+
+    # Find the original request from the client
+    local req = Management::Request::lookup(areq$parent_id);
+    if ( Management::Request::is_null(req) )
+        return;
+
+    # Add this agent's results to the overall response
+    for ( i in results )
+        {
+        # Same special treatment for Broker values that are of
+        # type "any": confirm their (known) type here.
+        switch req$node_dispatch_state$action[0]
+            {
+            case "get_id_value":
+                if ( results[i]?$data )
+                    results[i]$data = results[i]$data as string;
+                break;
+            default:
+                Management::Log::error(fmt("unexpected dispatch command %s",
+                    req$node_dispatch_state$action[0]));
+                break;
+            }
+
+        req$results[|req$results|] = results[i];
+        }
+
+    # Mark this request as done
+    if ( areq$id in req$node_dispatch_state$requests )
+        delete req$node_dispatch_state$requests[areq$id];
+
+    # If we still have pending queries out to the agents, do nothing: we'll
+    # handle this soon, or our request will time out and we respond with
+    # error.
+    if ( |req$node_dispatch_state$requests| > 0 )
+        return;
+
+    # Send response event to the client based upon the dispatch type.
+    switch req$node_dispatch_state$action[0]
+        {
+        case "get_id_value":
+            Management::Log::info(fmt(
+                "tx Management::Controller::API::get_id_value_response %s",
+                Management::Request::to_string(req)));
+            event Management::Controller::API::get_id_value_response(req$id, req$results);
+            break;
+        default:
+            Management::Log::error(fmt("unexpected dispatch command %s",
+                req$node_dispatch_state$action[0]));
+            break;
+        }
+
+    Management::Request::finish(req$id);
+    }
+
+event Management::Controller::API::get_id_value_request(reqid: string, id: string, nodes: set[string])
+    {
+    Management::Log::info(fmt("rx Management::Controller::API::get_id_value_request %s %s", reqid, id));
+
+    # Special case: if we have no instances, respond right away.
+    if ( |g_instances| == 0 )
+        {
+        Management::Log::info(fmt("tx Management::Controller::API::get_id_value_response %s", reqid));
+        event Management::Controller::API::get_id_value_response(reqid, vector(
+            Management::Result($reqid=reqid, $success=F, $error="no instances connected")));
+        return;
+        }
+
+    local action = vector("get_id_value", id);
+    local req = Management::Request::create(reqid);
+    req$node_dispatch_state = NodeDispatchState($action=action);
+
+    local nodes_final: set[string];
+    local node: string;
+    local res: Management::Result;
+
+    # Input sanitization: check for any requested nodes that aren't part of
+    # the current configuration. We send back error results for those and
+    # don't propagate them to the agents.
+    if ( |nodes| > 0 )
+        {
+        # Requested nodes that are in the current configuration:
+        nodes_final = filter_config_nodes_by_name(nodes);
+        # Requested nodes that are not in current configuration:
+        local nodes_invalid = nodes - nodes_final;
+
+        # Assemble error results for all invalid nodes
+        for ( node in nodes_invalid )
+            {
+            res = Management::Result($reqid=reqid, $node=node);
+            res$success = F;
+            res$error = "unknown cluster node";
+            req$results += res;
+            }
+
+        # If only invalid nodes got requested, we're now done.
+        if ( |nodes_final| == 0 )
+            {
+            Management::Log::info(fmt(
+                "tx Management::Controller::API::get_id_value_response %s",
+                Management::Request::to_string(req)));
+            event Management::Controller::API::get_id_value_response(req$id, req$results);
+            Management::Request::finish(req$id);
+            return;
+            }
+        }
+
+    # Send dispatch requests to all agents, with the final set of nodes
+    for ( name in g_instances )
+        {
+        if ( name !in g_instances_ready )
+            next;
+
+        local agent_topic = Management::Agent::topic_prefix + "/" + name;
+        local areq = Management::Request::create();
+
+        areq$parent_id = req$id;
+        add req$node_dispatch_state$requests[areq$id];
+
+        Management::Log::info(fmt(
+            "tx Management::Agent::API::node_dispatch_request %s %s to %s",
+            areq$id, action, name));
+
+        Broker::publish(agent_topic,
+            Management::Agent::API::node_dispatch_request,
+            areq$id, action, nodes_final);
+        }
+    }
+
 event Management::Request::request_expired(req: Management::Request::Request)
     {
     # Various handlers for timed-out request state. We use the state members
     # to identify how to respond. No need to clean up the request itself,
     # since we're getting here via the request module's expiration
     # mechanism that handles the cleanup.
-    local res: Management::Result;
+    local res = Management::Result($reqid=req$id,
+        $success = F,
+        $error = "request timed out");

     if ( req?$set_configuration_state )
         {
         # This timeout means we no longer have a pending request.
         g_config_reqid_pending = "";

-        res = Management::Result($reqid=req$id);
-        res$success = F;
-        res$error = "request timed out";
         req$results += res;

         Management::Log::info(fmt("tx Management::Controller::API::set_configuration_response %s",
@@ -579,9 +751,6 @@ event Management::Request::request_expired(req: Management::Request::Request)

     if ( req?$get_nodes_state )
         {
-        res = Management::Result($reqid=req$id);
-        res$success = F;
-        res$error = "request timed out";
         req$results += res;

         Management::Log::info(fmt("tx Management::Controller::API::get_nodes_response %s",
@@ -589,12 +758,27 @@ event Management::Request::request_expired(req: Management::Request::Request)
         event Management::Controller::API::get_nodes_response(req$id, req$results);
         }

+    if ( req?$node_dispatch_state )
+        {
+        req$results += res;
+
+        switch req$node_dispatch_state$action[0]
+            {
+            case "get_id_value":
+                Management::Log::info(fmt(
+                    "tx Management::Controller::API::get_id_value_response %s",
+                    Management::Request::to_string(req)));
+                event Management::Controller::API::get_id_value_response(req$id, req$results);
+                break;
+            default:
+                Management::Log::error(fmt("unexpected dispatch command %s",
+                    req$node_dispatch_state$action[0]));
+                break;
+            }
+        }
+
     if ( req?$test_state )
         {
-        res = Management::Result($reqid=req$id);
-        res$success = F;
-        res$error = "request timed out";
-
         Management::Log::info(fmt("tx Management::Controller::API::test_timeout_response %s", req$id));
         event Management::Controller::API::test_timeout_response(req$id, res);
         }
@@ -638,6 +822,7 @@ event zeek_init()
         Management::Controller::API::get_instances_response,
         Management::Controller::API::set_configuration_response,
         Management::Controller::API::get_nodes_response,
+        Management::Controller::API::get_id_value_response,
         Management::Controller::API::test_timeout_response
         ];

@@ -16,10 +16,10 @@ export {

     ## The controller/agent log supports four different log levels.
     type Level: enum {
-        DEBUG,
-        INFO,
-        WARNING,
-        ERROR,
+        DEBUG = 10,
+        INFO = 20,
+        WARNING = 30,
+        ERROR = 40,
     };

     ## The record type containing the column fields of the agent/controller log.

@@ -36,8 +36,9 @@ export {
         message: string;
     } &log;

-    ## The log level in use for this node.
-    global log_level = DEBUG &redef;
+    ## The log level in use for this node. This is the minimum
+    ## log level required to produce output.
+    global log_level = INFO &redef;

     ## A debug-level log message writer.
     ##

@@ -82,6 +83,7 @@ global l2s: table[Level] of string = {
 global r2s: table[Management::Role] of string = {
     [Management::AGENT] = "AGENT",
     [Management::CONTROLLER] = "CONTROLLER",
+    [Management::NODE] = "NODE",
 };

 function debug(message: string)
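Since the enum values are now ordered and the default drops to INFO, log_level acts as a minimum threshold. A one-line usage sketch (not part of this commit) for a site that wants the previous verbosity back, e.g. in local.zeek:

    redef Management::Log::log_level = Management::Log::DEBUG;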
scripts/policy/frameworks/management/node/__load__.zeek (new file, 1 line)

@@ -0,0 +1 @@
+@load ./main
scripts/policy/frameworks/management/node/api.zeek (new file, 48 lines)

@@ -0,0 +1,48 @@
+##! The Management event API of cluster nodes. The API consists of request/
+##! response event pairs, like elsewhere in the Management, Supervisor, and
+##! Control frameworks.
+
+@load policy/frameworks/management/types
+
+module Management::Node::API;
+
+export {
+    ## Management agents send this event to every Zeek cluster node to run a
+    ## "dispatch" -- a particular, pre-implemented action. This is the agent-node
+    ## complement to :zeek:see:`Management::Agent::API::node_dispatch_request`.
+    ##
+    ## reqid: a request identifier string, echoed in the response event.
+    ##
+    ## action: the requested dispatch command, with any arguments.
+    ##
+    ## nodes: the cluster node names this dispatch targets. An empty set,
+    ##     supplied by default, means it applies to all nodes. Since nodes
+    ##     receive all dispatch requests, they can use any node names provided
+    ##     here to filter themselves out of responding.
+    global node_dispatch_request: event(reqid: string, action: vector of string,
+        nodes: set[string] &default=set());
+
+    ## Response to a node_dispatch_request event. The nodes send this back
+    ## to the agent. This is the agent-node equivalent of
+    ## :zeek:see:`Management::Agent::API::node_dispatch_response`.
+    ##
+    ## reqid: the request identifier used in the request event.
+    ##
+    ## result: a :zeek:see:`Management::Result` record covering one Zeek
+    ##     cluster node managed by the agent. Upon success, the data field
+    ##     contains a value appropriate for the requested dispatch.
+    global node_dispatch_response: event(reqid: string, result: Management::Result);
+
+
+    # Notification events, node -> agent
+
+    ## The cluster nodes send this event upon peering as a "check-in" to
+    ## the agent, to indicate the node is now available to communicate
+    ## with. It is an agent-level equivalent of :zeek:see:`Broker::peer_added`,
+    ## and similar to :zeek:see:`Management::Agent::API::notify_agent_hello`
+    ## for agents.
+    ##
+    ## node: the name of the node, as given in :zeek:see:`Cluster::node`.
+    ##
+    global notify_node_hello: event(node: string);
+}
scripts/policy/frameworks/management/node/config.zeek (new file, 9 lines)

@@ -0,0 +1,9 @@
+##! Configuration settings for nodes controlled by the Management framework.
+
+module Management::Node;
+
+export {
+    ## The nodes' Broker topic. Cluster nodes automatically subscribe
+    ## to it, to receive request events from the Management framework.
+    const node_topic = "zeek/management/node" &redef;
+}
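Because node_topic is declared &redef, a deployment can relocate the node channel without touching the framework scripts, and agents and cluster nodes both pick up the new value. A sketch, not part of this commit; the topic string is a placeholder:

    redef Management::Node::node_topic = "example/zeek/management/node";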
scripts/policy/frameworks/management/node/main.zeek (new file, 121 lines)

@@ -0,0 +1,121 @@
+##! This module provides Management framework functionality present in every
+##! cluster node, to allow Management agents to interact with the nodes.
+
+@load base/frameworks/cluster
+
+@load policy/frameworks/management/agent/config
+@load policy/frameworks/management/log
+
+@load ./api
+@load ./config
+
+module Management::Node;
+
+# Tag our logs correctly
+redef Management::Log::role = Management::NODE;
+
+## The type of dispatch callbacks. These implement a particular dispatch action,
+## using the provided string vector as arguments, filling results into the
+## provided result record.
+type DispatchCallback: function(args: vector of string, res: Management::Result);
+
+## Implementation of the "get_id_value" dispatch. Its only argument is the name
+## of the ID to look up.
+function dispatch_get_id_value(args: vector of string, res: Management::Result)
+    {
+    if ( |args| == 0 )
+        {
+        res$success = F;
+        res$error = "get_id_value expects name of global identifier";
+        return;
+        }
+
+    local val = lookup_ID(args[0]);
+
+    # The following lookup_ID() result strings indicate errors:
+    if ( type_name(val) == "string" )
+        {
+        local valstr: string = val;
+        if ( valstr == "<unknown id>" || valstr == "<no ID value>" )
+            {
+            res$success = F;
+            res$error = valstr[1:-1];
+            }
+        }
+
+    if ( res$success )
+        res$data = to_json(val);
+    }
+
+global g_dispatch_table: table[string] of DispatchCallback = {
+    ["get_id_value"] = dispatch_get_id_value,
+};
+
+event Management::Node::API::node_dispatch_request(reqid: string, action: vector of string,
+    nodes: set[string])
+    {
+    Management::Log::info(fmt("rx Management::Node::API::node_dispatch_request %s %s %s", reqid, action, nodes));
+
+    if ( |nodes| > 0 && Cluster::node !in nodes )
+        {
+        Management::Log::debug(fmt(
+            "dispatch %s not targeting this node (%s !in %s), skipping",
+            reqid, Cluster::node, nodes));
+        return;
+        }
+
+    local res = Management::Result(
+        $reqid = reqid, $node = Cluster::node);
+
+    if ( |action| == 0 )
+        {
+        res$success = F;
+        res$error = "no dispatch arguments provided";
+        }
+    else if ( action[0] !in g_dispatch_table )
+        {
+        res$success = F;
+        res$error = fmt("dispatch %s unknown", action[0]);
+        }
+
+    if ( ! res$success )
+        {
+        Management::Log::info(fmt("tx Management::Node::API::node_dispatch_response %s",
+            Management::result_to_string(res)));
+        event Management::Node::API::node_dispatch_response(reqid, res);
+        return;
+        }
+
+    g_dispatch_table[action[0]](action[1:], res);
+
+    Management::Log::info(fmt("tx Management::Node::API::node_dispatch_response %s",
+        Management::result_to_string(res)));
+    event Management::Node::API::node_dispatch_response(reqid, res);
+    }
+
+event Broker::peer_added(peer: Broker::EndpointInfo, msg: string)
+    {
+    local epi = Management::Agent::endpoint_info();
+
+    # If this is the agent peering, notify it that we're ready
+    if ( peer$network$address == epi$network$address &&
+         peer$network$bound_port == epi$network$bound_port )
+        event Management::Node::API::notify_node_hello(Cluster::node);
+    }
+
+event zeek_init()
+    {
+    local epi = Management::Agent::endpoint_info();
+
+    Broker::peer(epi$network$address, epi$network$bound_port, Management::connect_retry);
+    Broker::subscribe(node_topic);
+
+    # Events automatically sent to the Management agent.
+    local events: vector of any = [
+        Management::Node::API::node_dispatch_response,
+        Management::Node::API::notify_node_hello
+        ];
+
+    for ( i in events )
+        Broker::auto_publish(node_topic, events[i]);
+    }
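The dispatch table keeps this mechanism extensible: inside this module, another dispatch is just one more callback plus a table entry. A hedged sketch, not part of this commit -- the "get_net_stats" dispatch name is hypothetical, and it simply reuses the existing get_net_stats() built-in:

    function dispatch_get_net_stats(args: vector of string, res: Management::Result)
        {
        # Takes no arguments; reports the node's packet counters as JSON.
        res$data = to_json(get_net_stats());
        }

    event zeek_init() &priority=10
        {
        g_dispatch_table["get_net_stats"] = dispatch_get_net_stats;
        }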
@@ -6,12 +6,13 @@ module Management;

 export {
     ## Management infrastructure node type. This intentionally does not
-    ## include the data cluster node types (worker, logger, etc) -- those
+    ## include the managed cluster node types (worker, logger, etc) -- those
     ## continue to be managed by the cluster framework.
     type Role: enum {
         NONE,       ##< No active role in cluster management
         AGENT,      ##< A cluster management agent.
         CONTROLLER, ##< The cluster's controller.
+        NODE,       ##< A managed cluster node (worker, manager, etc).
     };

     ## A Zeek-side option with value.
@@ -24,6 +24,10 @@
 @load frameworks/management/__load__.zeek
 @load frameworks/management/config.zeek
 @load frameworks/management/log.zeek
+# @load frameworks/management/node/__load__.zeek
+@load frameworks/management/node/api.zeek
+@load frameworks/management/node/config.zeek
+# @load frameworks/management/node/main.zeek
 @load frameworks/management/request.zeek
 @load frameworks/management/types.zeek
 @load frameworks/management/util.zeek
@@ -7,6 +7,8 @@
 @load frameworks/control/controller.zeek
 @load frameworks/management/agent/main.zeek
 @load frameworks/management/controller/main.zeek
+@load frameworks/management/node/__load__.zeek
+@load frameworks/management/node/main.zeek
 @load frameworks/files/extract-all-files.zeek
 @load policy/misc/dump-events.zeek
 @load policy/protocols/conn/speculative-service.zeek
src/Val.cc (14 lines changed)

@@ -420,8 +420,9 @@ static void BuildJSON(threading::formatter::JSON::NullDoubleWriter& writer, Val*
     }

     rapidjson::Value j;
+    auto tag = val->GetType()->Tag();

-    switch ( val->GetType()->Tag() )
+    switch ( tag )
         {
         case TYPE_BOOL:
             writer.Bool(val->AsBool());

@@ -475,8 +476,15 @@ static void BuildJSON(threading::formatter::JSON::NullDoubleWriter& writer, Val*
             ODesc d;
             d.SetStyle(RAW_STYLE);
             val->Describe(&d);
-            writer.String(util::json_escape_utf8(
-                std::string(reinterpret_cast<const char*>(d.Bytes()), d.Len())));
+            std::string desc(reinterpret_cast<const char*>(d.Bytes()), d.Len());
+
+            // None of our function types should have surrounding
+            // whitespace, but ODesc might produce it due to its
+            // many output modes and flags. Strip it.
+            if ( tag == TYPE_FUNC )
+                desc = util::strstrip(desc);
+
+            writer.String(util::json_escape_utf8(desc));
             break;
             }

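The script-level effect of this change (a small sketch, not part of the commit): rendering a function value through to_json() now yields its description without stray surrounding whitespace, which matters because the get_id_value dispatch above ships exactly such to_json() strings.

    event zeek_init()
        {
        local f = function(n: count): count { return n + 1; };
        print to_json(f);
        }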
@@ -1,9 +1,9 @@
 ### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
 ### NOTE: This file has been sorted with diff-sort.
-warning in <...>/extract-certs-pem.zeek, line 1: deprecated script loaded from <...>/__load__.zeek:13 "Remove in v5.1. Use log-certs-base64.zeek instead."
+warning in <...>/extract-certs-pem.zeek, line 1: deprecated script loaded from <...>/__load__.zeek:15 "Remove in v5.1. Use log-certs-base64.zeek instead."
 warning in <...>/extract-certs-pem.zeek, line 1: deprecated script loaded from command line arguments "Remove in v5.1. Use log-certs-base64.zeek instead."
-warning in <...>/log-ocsp.zeek, line 1: deprecated script loaded from <...>/test-all-policy.zeek:61 ("Remove in v5.1. OCSP logging is now enabled by default")
+warning in <...>/log-ocsp.zeek, line 1: deprecated script loaded from <...>/test-all-policy.zeek:65 ("Remove in v5.1. OCSP logging is now enabled by default")
-warning in <...>/log-ocsp.zeek, line 1: deprecated script loaded from <...>/test-all-policy.zeek:61 ("Remove in v5.1. OCSP logging is now enabled by default")
+warning in <...>/log-ocsp.zeek, line 1: deprecated script loaded from <...>/test-all-policy.zeek:65 ("Remove in v5.1. OCSP logging is now enabled by default")
 warning in <...>/log-ocsp.zeek, line 1: deprecated script loaded from command line arguments ("Remove in v5.1. OCSP logging is now enabled by default")
 warning in <...>/notary.zeek, line 1: deprecated script loaded from <...>/__load__.zeek:5 ("Remove in v5.1. Please switch to other more modern approaches like SCT validation (validate-sct.zeek).")
 warning in <...>/notary.zeek, line 1: deprecated script loaded from command line arguments ("Remove in v5.1. Please switch to other more modern approaches like SCT validation (validate-sct.zeek).")
@@ -1 +1 @@
-3528e248d7d35e102c39c8e8050fc1a6dfa477bf
+1b515f3f60abed5c505a970cae380560ce6304c1