Reorg of the cluster controller to new "Management framework" layout

- This gives the cluster controller and agent the common name "Management
framework" and moves the sources from "policy/frameworks/cluster" to
"policy/frameworks/management". This avoids ambiguity with the existing
cluster framework.

- It renames the "ClusterController" and "ClusterAgent" script modules to
"Management::Controller" and "Management::Agent", respectively. This allows us
to anchor tooling common to both controller and agent at the "Management"
module.

- It moves shared configuration settings, logging, requests, types, and
utilities to the common "Management" module.

- It removes the explicit "::Types" submodule (so a request/response result is
now a Management::Result, not a Management::Types::Result), which makes
type names more readable. For example (illustrative, not part of this diff):
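
    # Before: a result declared against the nested "::Types" submodule.
    global res: ClusterController::Types::Result;

    # After: the same declaration against the flattened "Management" module.
    global res: Management::Result;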

- It updates tests that depend on module naming and on the full set of
scripts.

Christian Kreibich 2022-02-04 18:04:42 -08:00
parent 9efc214d42
commit 54aaf3a623
25 changed files with 554 additions and 536 deletions

policy/frameworks/cluster/agent/__load__.zeek
@@ -1,4 +0,0 @@
##! The entry point for the cluster agent. It runs bootstrap logic for launching
##! the agent process via Zeek's Supervisor.
@load ./boot

policy/frameworks/cluster/agent/api.zeek
@@ -1,138 +0,0 @@
##! The event API of cluster agents. Most endpoints consist of event pairs,
##! where the agent answers a request event with a corresponding response
##! event. Such event pairs share the same name prefix and end in "_request" and
##! "_response", respectively.
@load base/frameworks/supervisor/control
@load policy/frameworks/cluster/controller/types
module ClusterAgent::API;
export {
## A simple versioning scheme, used to track basic compatibility of
## controller and agent.
const version = 1;
# Agent API events
## The controller sends this event to convey a new cluster configuration
## to the agent. Once processed, the agent responds with the response
## event.
##
## reqid: a request identifier string, echoed in the response event.
##
## config: a :zeek:see:`ClusterController::Types::Configuration` record
## describing the cluster topology. Note that this contains the full
## topology, not just the part pertaining to this agent. That's because
## the cluster framework requires full cluster visibility to establish
## the needed peerings.
##
global set_configuration_request: event(reqid: string,
config: ClusterController::Types::Configuration);
## Response to a set_configuration_request event. The agent sends
## this back to the controller.
##
## reqid: the request identifier used in the request event.
##
## result: the result record.
##
global set_configuration_response: event(reqid: string,
result: ClusterController::Types::Result);
## The controller sends this event to request a list of
## :zeek:see:`ClusterController::Types::NodeStatus` records that capture
## the status of Supervisor-managed nodes running on this instance.
##
## reqid: a request identifier string, echoed in the response event.
##
global get_nodes_request: event(reqid: string);
## Response to a get_nodes_request event. The agent sends this back to the
## controller.
##
## reqid: the request identifier used in the request event.
##
## result: a :zeek:see:`ClusterController::Types::Result` record. Its data
## member is a vector of :zeek:see:`ClusterController::Types::NodeStatus`
## records, covering the nodes at this instance. The result may also
## indicate failure, with error messages indicating what went wrong.
##
global get_nodes_response: event(reqid: string,
result: ClusterController::Types::Result);
## The controller sends this event to confirm to the agent that it is
## part of the current cluster topology. The agent acknowledges with the
## corresponding response event.
##
## reqid: a request identifier string, echoed in the response event.
##
global agent_welcome_request: event(reqid: string);
## Response to an agent_welcome_request event. The agent sends this
## back to the controller.
##
## reqid: the request identifier used in the request event.
##
## result: the result record.
##
global agent_welcome_response: event(reqid: string,
result: ClusterController::Types::Result);
## The controller sends this event to convey that the agent is not
## currently required. This status may later change, depending on
## updates from the client, so the Broker-level peering can remain
## active. The agent releases any cluster-related resources (including
## shutdown of existing Zeek cluster nodes) when processing the request,
## and confirms via the response event. Shutting down an agent at this
## point has no operational impact on the running cluster.
##
## reqid: a request identifier string, echoed in the response event.
##
global agent_standby_request: event(reqid: string);
## Response to an agent_standby_request event. The agent sends this
## back to the controller.
##
## reqid: the request identifier used in the request event.
##
## result: the result record.
##
global agent_standby_response: event(reqid: string,
result: ClusterController::Types::Result);
# Notification events, agent -> controller
## The agent sends this event upon peering as a "check-in", informing
## the controller that an agent of the given name is now available to
## communicate with. It is a controller-level equivalent of
## :zeek:see:`Broker::peer_added`.
##
## instance: an instance name, really the agent's name as per :zeek:see:`ClusterAgent::name`.
##
## host: the IP address of the agent. (This may change in the future.)
##
## api_version: the API version of this agent.
##
global notify_agent_hello: event(instance: string, host: addr,
api_version: count);
# The following are not yet implemented.
# Report node state changes.
global notify_change: event(instance: string,
n: ClusterController::Types::Node,
old: ClusterController::Types::State,
new: ClusterController::Types::State);
# Report operational error.
global notify_error: event(instance: string, msg: string, node: string &default="");
# Report informational message.
global notify_log: event(instance: string, msg: string, node: string &default="");
}
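
For orientation, a controller-side caller drives one of these event pairs
roughly as follows (a minimal sketch; the agent name "agent-testbox" and the
request ID are placeholders, not part of this diff):

event zeek_init()
    {
    # Publish a (nearly empty) configuration to one agent's topic.
    local topic = ClusterAgent::topic_prefix + "/agent-testbox";
    local config = ClusterController::Types::Configuration($id="example");
    Broker::publish(topic, ClusterAgent::API::set_configuration_request,
        "req-0001", config);
    }

event ClusterAgent::API::set_configuration_response(reqid: string,
    result: ClusterController::Types::Result)
    {
    # The agent echoes the request ID back in its response.
    print fmt("agent answered request %s: success=%s", reqid, result$success);
    }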

policy/frameworks/cluster/agent/boot.zeek
@@ -1,41 +0,0 @@
##! The cluster agent boot logic runs in Zeek's supervisor and instructs it to
##! launch an agent process. The agent's main logic resides in main.zeek,
##! similarly to other frameworks. The new process will execute that script.
##!
##! If the current process is not the Zeek supervisor, this does nothing.
@load ./config
# The agent needs the supervisor to listen for node management requests. We
# need to tell it to do so, and we need to do so here, in the agent
# bootstrapping code, so the redef applies prior to the fork of the agent
# process itself.
redef SupervisorControl::enable_listen = T;
event zeek_init()
{
if ( ! Supervisor::is_supervisor() )
return;
local epi = ClusterAgent::endpoint_info();
local sn = Supervisor::NodeConfig($name=epi$id, $bare_mode=T,
$scripts=vector("policy/frameworks/cluster/agent/main.zeek"));
if ( ClusterAgent::directory != "" )
sn$directory = ClusterAgent::directory;
if ( ClusterAgent::stdout_file_suffix != "" )
sn$stdout_file = epi$id + "." + ClusterAgent::stdout_file_suffix;
if ( ClusterAgent::stderr_file_suffix != "" )
sn$stderr_file = epi$id + "." + ClusterAgent::stderr_file_suffix;
# This helps Zeek run controller and agent with a minimal set of scripts.
sn$env["ZEEK_CLUSTER_MGMT_NODE"] = "AGENT";
local res = Supervisor::create(sn);
if ( res != "" )
{
print(fmt("error: supervisor could not create agent node: %s", res));
exit(1);
}
}
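
With this in place, starting the agent amounts to running Zeek in supervisor
mode on the package path of this (pre-rename) layout; a usage sketch:

# -j enables the Supervisor; __load__.zeek pulls in the boot logic above.
zeek -j policy/frameworks/cluster/agent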

policy/frameworks/cluster/agent/config.zeek
@@ -1,117 +0,0 @@
##! Configuration settings for a cluster agent.
@load policy/frameworks/cluster/controller/types
module ClusterAgent;
export {
## The name this agent uses to represent the cluster instance it
## manages. Defaults to the value of the ZEEK_AGENT_NAME environment
## variable. When that is unset and you don't redef the value,
## the implementation defaults to "agent-<hostname>".
const name = getenv("ZEEK_AGENT_NAME") &redef;
## Agent stdout log configuration. If the string is non-empty, Zeek will
## produce a free-form log (i.e., not one governed by Zeek's logging
## framework) in Zeek's working directory. The final log's name is
## "<name>.<suffix>", where the name is taken from :zeek:see:`ClusterAgent::name`,
## and the suffix is defined by the following variable. If left empty,
## no such log results.
##
## Note that the agent also establishes a "proper" Zeek log via the
## :zeek:see:`ClusterController::Log` module.
const stdout_file_suffix = "agent.stdout" &redef;
## Agent stderr log configuration. Like :zeek:see:`ClusterAgent::stdout_file_suffix`,
## but for the stderr stream.
const stderr_file_suffix = "agent.stderr" &redef;
## The network address the agent listens on. This only takes effect if
## the agent isn't configured to connect to the controller (see
## :zeek:see:`ClusterAgent::controller`). By default this uses the value of the
## ZEEK_AGENT_ADDR environment variable, but you may also redef to
## a specific value. When empty, the implementation falls back to
## :zeek:see:`ClusterAgent::default_address`.
const listen_address = getenv("ZEEK_AGENT_ADDR") &redef;
## The fallback listen address if :zeek:see:`ClusterAgent::listen_address`
## remains empty. Unless redefined, this uses Broker's own default listen
## address.
const default_address = Broker::default_listen_address &redef;
## The network port the agent listens on. Counterpart to
## :zeek:see:`ClusterAgent::listen_address`, defaulting to the ZEEK_AGENT_PORT
## environment variable.
const listen_port = getenv("ZEEK_AGENT_PORT") &redef;
## The fallback listen port if :zeek:see:`ClusterAgent::listen_port` remains empty.
const default_port = 2151/tcp &redef;
## The agent's Broker topic prefix. For its own communication, the agent
## suffixes this with "/<name>", based on :zeek:see:`ClusterAgent::name`.
const topic_prefix = "zeek/cluster-control/agent" &redef;
## The network coordinates of the controller. When defined, the agent
## peers with (and connects to) the controller; otherwise the controller
## will peer (and connect to) the agent, listening as defined by
## :zeek:see:`ClusterAgent::listen_address` and :zeek:see:`ClusterAgent::listen_port`.
const controller: Broker::NetworkInfo = [
$address="0.0.0.0", $bound_port=0/unknown] &redef;
## An optional custom output directory for the agent's stdout and stderr
## logs. Agent and controller currently only log locally, not via the
## data cluster's logger node. (This might change in the future.) This
## means that if both write to the same log file, the output gets
## garbled.
const directory = "" &redef;
## The working directory for data cluster nodes created by this
## agent. If you make this a relative path, note that the path is
## relative to the agent's working directory, since the agent creates
## the data cluster nodes.
const cluster_directory = "" &redef;
## Returns a :zeek:see:`ClusterController::Types::Instance` describing this
## instance (its agent name plus listening address/port, as applicable).
global instance: function(): ClusterController::Types::Instance;
## Returns a :zeek:see:`Broker::EndpointInfo` record for this instance.
## Similar to :zeek:see:`ClusterAgent::instance`, but in a slightly
## different data format.
global endpoint_info: function(): Broker::EndpointInfo;
}
function instance(): ClusterController::Types::Instance
{
local epi = endpoint_info();
return ClusterController::Types::Instance($name=epi$id,
$host=to_addr(epi$network$address),
$listen_port=epi$network$bound_port);
}
function endpoint_info(): Broker::EndpointInfo
{
local epi: Broker::EndpointInfo;
local network: Broker::NetworkInfo;
if ( ClusterAgent::name != "" )
epi$id = ClusterAgent::name;
else
epi$id = fmt("agent-%s", gethostname());
if ( ClusterAgent::listen_address != "" )
network$address = ClusterAgent::listen_address;
else if ( ClusterAgent::default_address != "" )
network$address = ClusterAgent::default_address;
else
network$address = "127.0.0.1";
if ( ClusterAgent::listen_port != "" )
network$bound_port = to_port(ClusterAgent::listen_port);
else
network$bound_port = ClusterAgent::default_port;
epi$network = network;
return epi;
}
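
One detail worth illustrating: to flip the peering direction so that the agent
connects out to the controller instead of listening, it suffices to redef the
controller record (a sketch, with a placeholder address):

# Peer with a controller at 192.0.2.1 on the controller's default port.
# With this set, the agent no longer listens itself.
redef ClusterAgent::controller = [$address="192.0.2.1", $bound_port=2150/tcp];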

policy/frameworks/cluster/agent/main.zeek
@@ -1,384 +0,0 @@
##! This is the main "runtime" of a cluster agent. Zeek does not load this
##! directly; rather, the agent's bootstrapping module (in ./boot.zeek)
##! specifies it as the script to run in the node newly created via Zeek's
##! supervisor.
@load base/frameworks/broker
@load policy/frameworks/cluster/controller/config
@load policy/frameworks/cluster/controller/log
@load policy/frameworks/cluster/controller/request
@load ./api
module ClusterAgent::Runtime;
# This export is mainly to appease Zeekygen's need to understand redefs of the
# Request record below. Without it, it fails to establish link targets for the
# tucked-on types.
export {
## Request state specific to the agent's Supervisor interactions.
type SupervisorState: record {
node: string; ##< Name of the node the Supervisor is acting on.
};
}
redef record ClusterController::Request::Request += {
supervisor_state: SupervisorState &optional;
};
redef ClusterController::role = ClusterController::Types::AGENT;
# The global configuration as passed to us by the controller
global g_config: ClusterController::Types::Configuration;
# A map to make other instance info accessible
global g_instances: table[string] of ClusterController::Types::Instance;
# A map for the nodes we run on this instance, via this agent.
global g_nodes: table[string] of ClusterController::Types::Node;
# The node map employed by the supervisor to describe the cluster
# topology to newly forked nodes. We refresh it when we receive
# new configurations.
global g_data_cluster: table[string] of Supervisor::ClusterEndpoint;
event SupervisorControl::create_response(reqid: string, result: string)
{
local req = ClusterController::Request::lookup(reqid);
if ( ClusterController::Request::is_null(req) )
return;
local name = req$supervisor_state$node;
if ( |result| > 0 )
{
local msg = fmt("failed to create node %s: %s", name, result);
ClusterController::Log::error(msg);
event ClusterAgent::API::notify_error(ClusterAgent::name, msg, name);
}
ClusterController::Request::finish(reqid);
}
event SupervisorControl::destroy_response(reqid: string, result: bool)
{
local req = ClusterController::Request::lookup(reqid);
if ( ClusterController::Request::is_null(req) )
return;
local name = req$supervisor_state$node;
if ( ! result )
{
local msg = fmt("failed to destroy node %s, %s", name, reqid);
ClusterController::Log::error(msg);
event ClusterAgent::API::notify_error(ClusterAgent::name, msg, name);
}
ClusterController::Request::finish(reqid);
}
function supervisor_create(nc: Supervisor::NodeConfig)
{
local req = ClusterController::Request::create();
req$supervisor_state = SupervisorState($node = nc$name);
event SupervisorControl::create_request(req$id, nc);
ClusterController::Log::info(fmt("issued supervisor create for %s, %s", nc$name, req$id));
}
function supervisor_destroy(node: string)
{
local req = ClusterController::Request::create();
req$supervisor_state = SupervisorState($node = node);
event SupervisorControl::destroy_request(req$id, node);
ClusterController::Log::info(fmt("issued supervisor destroy for %s, %s", node, req$id));
}
event ClusterAgent::API::set_configuration_request(reqid: string, config: ClusterController::Types::Configuration)
{
ClusterController::Log::info(fmt("rx ClusterAgent::API::set_configuration_request %s", reqid));
local nodename: string;
local node: ClusterController::Types::Node;
local nc: Supervisor::NodeConfig;
local msg: string;
# Adopt the global configuration provided.
# XXX this can later handle validation and persistence
# XXX should do this transactionally, only set when all else worked
g_config = config;
# Refresh the instances table:
g_instances = table();
for ( inst in config$instances )
g_instances[inst$name] = inst;
# Terminate existing nodes
for ( nodename in g_nodes )
supervisor_destroy(nodename);
g_nodes = table();
# Refresh the data cluster and nodes tables
g_data_cluster = table();
for ( node in config$nodes )
{
if ( node$instance == ClusterAgent::name )
g_nodes[node$name] = node;
# The cluster and supervisor frameworks require a port for every
# node, using 0/unknown to signify "don't listen". We use
# optional values and map an absent value to 0/unknown.
local p = 0/unknown;
if ( node?$p )
p = node$p;
local cep = Supervisor::ClusterEndpoint(
$role = node$role,
$host = g_instances[node$instance]$host,
$p = p);
if ( node?$interface )
cep$interface = node$interface;
g_data_cluster[node$name] = cep;
}
# Apply the new configuration via the supervisor
for ( nodename in g_nodes )
{
node = g_nodes[nodename];
nc = Supervisor::NodeConfig($name=nodename);
if ( ClusterAgent::cluster_directory != "" )
nc$directory = ClusterAgent::cluster_directory;
if ( node?$interface )
nc$interface = node$interface;
if ( node?$cpu_affinity )
nc$cpu_affinity = node$cpu_affinity;
if ( node?$scripts )
nc$scripts = node$scripts;
if ( node?$env )
nc$env = node$env;
# XXX could use options to enable per-node overrides for
# directory, stdout, stderr, others?
nc$cluster = g_data_cluster;
supervisor_create(nc);
}
# XXX this currently does not fail if any of the above problems occurred,
# mainly due to the tediousness of handling the supervisor's response
# events asynchronously. The only indication of error will be
# notification events to the controller.
if ( reqid != "" )
{
local res = ClusterController::Types::Result(
$reqid = reqid,
$instance = ClusterAgent::name);
ClusterController::Log::info(fmt("tx ClusterAgent::API::set_configuration_response %s",
ClusterController::Types::result_to_string(res)));
event ClusterAgent::API::set_configuration_response(reqid, res);
}
}
event SupervisorControl::status_response(reqid: string, result: Supervisor::Status)
{
local req = ClusterController::Request::lookup(reqid);
if ( ClusterController::Request::is_null(req) )
return;
ClusterController::Request::finish(reqid);
local res = ClusterController::Types::Result(
$reqid = req$parent_id, $instance = ClusterAgent::name);
local node_statuses: ClusterController::Types::NodeStatusVec;
for ( node in result$nodes )
{
local sns = result$nodes[node]; # Supervisor node status
local cns = ClusterController::Types::NodeStatus(
$node=node, $state=ClusterController::Types::PENDING);
# Identify the role of the node. For data cluster roles (worker,
# manager, etc) we derive this from the cluster node table. For
# agent and controller, we identify via environment variables
# that the controller framework establishes upon creation (see
# the respective boot.zeek scripts).
if ( node in sns$node$cluster )
{
cns$cluster_role = sns$node$cluster[node]$role;
# The supervisor's responses use 0/tcp (not 0/unknown)
# when indicating an unused port because its internal
# serialization always assumes TCP.
if ( sns$node$cluster[node]$p != 0/tcp )
cns$p = sns$node$cluster[node]$p;
}
else
{
if ( "ZEEK_CLUSTER_MGMT_NODE" in sns$node$env )
{
local role = sns$node$env["ZEEK_CLUSTER_MGMT_NODE"];
if ( role == "CONTROLLER" )
{
cns$mgmt_role = ClusterController::Types::CONTROLLER;
# The controller always listens, so the Zeek client can connect.
cns$p = ClusterController::endpoint_info()$network$bound_port;
}
else if ( role == "AGENT" )
{
cns$mgmt_role = ClusterController::Types::AGENT;
# If we have a controller address, the agent connects to it
# and does not listen. See zeek_init() below for similar logic.
if ( ClusterAgent::controller$address == "0.0.0.0" )
cns$p = ClusterAgent::endpoint_info()$network$bound_port;
}
else
ClusterController::Log::warning(fmt(
"unexpected cluster management node type '%s'", role));
}
}
# A PID is available if a supervised node has fully launched
# and is therefore running.
if ( sns?$pid )
{
cns$pid = sns$pid;
cns$state = ClusterController::Types::RUNNING;
}
node_statuses += cns;
}
res$data = node_statuses;
ClusterController::Log::info(fmt("tx ClusterAgent::API::get_nodes_response %s",
ClusterController::Types::result_to_string(res)));
event ClusterAgent::API::get_nodes_response(req$parent_id, res);
}
event ClusterAgent::API::get_nodes_request(reqid: string)
{
ClusterController::Log::info(fmt("rx ClusterAgent::API::get_nodes_request %s", reqid));
local req = ClusterController::Request::create();
req$parent_id = reqid;
event SupervisorControl::status_request(req$id, "");
ClusterController::Log::info(fmt("issued supervisor status, %s", req$id));
}
event ClusterAgent::API::agent_welcome_request(reqid: string)
{
ClusterController::Log::info(fmt("rx ClusterAgent::API::agent_welcome_request %s", reqid));
local res = ClusterController::Types::Result(
$reqid = reqid,
$instance = ClusterAgent::name);
ClusterController::Log::info(fmt("tx ClusterAgent::API::agent_welcome_response %s",
ClusterController::Types::result_to_string(res)));
event ClusterAgent::API::agent_welcome_response(reqid, res);
}
event ClusterAgent::API::agent_standby_request(reqid: string)
{
ClusterController::Log::info(fmt("rx ClusterAgent::API::agent_standby_request %s", reqid));
# We shut down any existing cluster nodes via an empty configuration,
# and fall silent. We do not unpeer/disconnect (assuming we earlier
# peered/connected -- otherwise there's nothing we can do here via
# Broker anyway), mainly to keep open the possibility of running
# cluster nodes again later.
event ClusterAgent::API::set_configuration_request("", ClusterController::Types::Configuration());
local res = ClusterController::Types::Result(
$reqid = reqid,
$instance = ClusterAgent::name);
ClusterController::Log::info(fmt("tx ClusterAgent::API::agent_standby_response %s",
ClusterController::Types::result_to_string(res)));
event ClusterAgent::API::agent_standby_response(reqid, res);
}
event Broker::peer_added(peer: Broker::EndpointInfo, msg: string)
{
# This does not (cannot?) immediately verify that the new peer
# is in fact a controller, so we might send this in vain.
# Controllers register the agent upon receipt of the event.
local epi = ClusterAgent::endpoint_info();
event ClusterAgent::API::notify_agent_hello(epi$id,
to_addr(epi$network$address), ClusterAgent::API::version);
}
# XXX We may want a request timeout event handler here. It's arguably cleaner to
# send supervisor failure events back to the controller than to rely on its
# controller-agent request timeout to kick in.
event zeek_init()
{
local epi = ClusterAgent::endpoint_info();
local agent_topic = ClusterAgent::topic_prefix + "/" + epi$id;
# The agent needs to peer with the supervisor -- this doesn't currently
# happen automatically. The address defaults to Broker's default, which
# relies on ZEEK_DEFAULT_LISTEN_ADDR and so might just be "". Broker
# internally falls back to listening on any; we pick 127.0.0.1.
local supervisor_addr = Broker::default_listen_address;
if ( supervisor_addr == "" )
supervisor_addr = "127.0.0.1";
Broker::peer(supervisor_addr, Broker::default_port, Broker::default_listen_retry);
# The agent needs to receive communication targeted at it, and any
# responses from the supervisor.
Broker::subscribe(agent_topic);
Broker::subscribe(SupervisorControl::topic_prefix);
# Auto-publish a bunch of events. Glob patterns or module-level
# auto-publish would be helpful here.
Broker::auto_publish(agent_topic, ClusterAgent::API::get_nodes_response);
Broker::auto_publish(agent_topic, ClusterAgent::API::set_configuration_response);
Broker::auto_publish(agent_topic, ClusterAgent::API::agent_welcome_response);
Broker::auto_publish(agent_topic, ClusterAgent::API::agent_standby_response);
Broker::auto_publish(agent_topic, ClusterAgent::API::notify_agent_hello);
Broker::auto_publish(agent_topic, ClusterAgent::API::notify_change);
Broker::auto_publish(agent_topic, ClusterAgent::API::notify_error);
Broker::auto_publish(agent_topic, ClusterAgent::API::notify_log);
Broker::auto_publish(SupervisorControl::topic_prefix, SupervisorControl::create_request);
Broker::auto_publish(SupervisorControl::topic_prefix, SupervisorControl::status_request);
Broker::auto_publish(SupervisorControl::topic_prefix, SupervisorControl::destroy_request);
Broker::auto_publish(SupervisorControl::topic_prefix, SupervisorControl::restart_request);
Broker::auto_publish(SupervisorControl::topic_prefix, SupervisorControl::stop_request);
# Establish connectivity with the controller.
if ( ClusterAgent::controller$address != "0.0.0.0" )
{
# We connect to the controller.
Broker::peer(ClusterAgent::controller$address,
ClusterAgent::controller$bound_port,
ClusterController::connect_retry);
}
else
{
# Controller connects to us; listen for it.
Broker::listen(cat(epi$network$address), epi$network$bound_port);
}
ClusterController::Log::info("agent is live");
}

policy/frameworks/cluster/controller/__load__.zeek
@@ -1,4 +0,0 @@
##! The entry point for the cluster controller. It runs bootstrap logic for
##! launching the controller process via Zeek's Supervisor.
@load ./boot

View file

policy/frameworks/cluster/controller/api.zeek
@@ -1,119 +0,0 @@
##! The event API of cluster controllers. Most endpoints consist of event pairs,
##! where the controller answers a zeek-client request event with a
##! corresponding response event. Such event pairs share the same name prefix
##! and end in "_request" and "_response", respectively.
@load ./types
module ClusterController::API;
export {
## A simple versioning scheme, used to track basic compatibility of
## controller, agents, and zeek-client.
const version = 1;
## zeek-client sends this event to request a list of the currently
## peered agents/instances.
##
## reqid: a request identifier string, echoed in the response event.
##
global get_instances_request: event(reqid: string);
## Response to a get_instances_request event. The controller sends
## this back to the client.
##
## reqid: the request identifier used in the request event.
##
## result: the result record. Its data member is a
## :zeek:see:`ClusterController::Types::Instance` record.
##
global get_instances_response: event(reqid: string,
result: ClusterController::Types::Result);
## zeek-client sends this event to establish a new cluster configuration,
## including the full cluster topology. The controller processes the update
## and relays it to the agents. Once each has responded (or a timeout occurs)
## the controller sends a corresponding response event back to the client.
##
## reqid: a request identifier string, echoed in the response event.
##
## config: a :zeek:see:`ClusterController::Types::Configuration` record
## specifying the cluster configuration.
##
global set_configuration_request: event(reqid: string,
config: ClusterController::Types::Configuration);
## Response to a set_configuration_request event. The controller sends
## this back to the client.
##
## reqid: the request identifier used in the request event.
##
## result: a vector of :zeek:see:`ClusterController::Types::Result` records.
## Each member captures one agent's response.
##
global set_configuration_response: event(reqid: string,
result: ClusterController::Types::ResultVec);
## zeek-client sends this event to request a list of
## :zeek:see:`ClusterController::Types::NodeStatus` records that capture
## the status of Supervisor-managed nodes running on the cluster's
## instances.
##
## reqid: a request identifier string, echoed in the response event.
##
global get_nodes_request: event(reqid: string);
## Response to a get_nodes_request event. The controller sends this
## back to the client.
##
## reqid: the request identifier used in the request event.
##
## result: a :zeek:type`vector` of :zeek:see:`ClusterController::Types::Result`
## records. Each record covers one cluster instance. Each record's data
## member is a vector of :zeek:see:`ClusterController::Types::NodeStatus`
## records, covering the nodes at that instance. Results may also indicate
## failure, with error messages indicating what went wrong.
global get_nodes_response: event(reqid: string,
result: ClusterController::Types::ResultVec);
# Testing events. These don't provide operational value but expose
# internal functionality, triggered by test cases.
## This event causes no further action (other than getting logged) if
## with_state is F. When T, the controller establishes request state, and
## the controller only ever sends the response event when this state times
## out.
##
## reqid: a request identifier string, echoed in the response event when
## with_state is T.
##
## with_state: flag indicating whether the controller should keep (and
## time out) request state for this request.
##
global test_timeout_request: event(reqid: string, with_state: bool);
## Response to a test_timeout_request event. The controller sends this
## back to the client if the original request had the with_state flag.
##
## reqid: the request identifier used in the request event.
##
global test_timeout_response: event(reqid: string,
result: ClusterController::Types::Result);
# Notification events.
## The controller triggers this event when the operational cluster
## instances align with the ones desired by the cluster
## configuration. It's essentially a cluster management readiness
## event. This event is currently only used by the controller and not
## published to other topics.
##
## instances: the set of instance names now ready.
##
global notify_agents_ready: event(instances: set[string]);
}
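
For orientation, a zeek-client-style caller drives these event pairs over the
controller's Broker topic. A minimal sketch (address, port, and request ID
are placeholders, not part of this diff):

event zeek_init()
    {
    Broker::subscribe(ClusterController::topic);
    Broker::peer("127.0.0.1", 2150/tcp);
    Broker::publish(ClusterController::topic,
        ClusterController::API::get_instances_request, "req-0001");
    }

event ClusterController::API::get_instances_response(reqid: string,
    result: ClusterController::Types::Result)
    {
    print fmt("instances for request %s: %s", reqid, result$data);
    }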

policy/frameworks/cluster/controller/boot.zeek
@@ -1,36 +0,0 @@
##! The cluster controller's boot logic runs in Zeek's supervisor and instructs
##! it to launch the controller process. The controller's main logic resides in
##! main.zeek, similarly to other frameworks. The new process will execute that
##! script.
##!
##! If the current process is not the Zeek supervisor, this does nothing.
@load ./config
event zeek_init()
{
if ( ! Supervisor::is_supervisor() )
return;
local epi = ClusterController::endpoint_info();
local sn = Supervisor::NodeConfig($name=epi$id, $bare_mode=T,
$scripts=vector("policy/frameworks/cluster/controller/main.zeek"));
if ( ClusterController::directory != "" )
sn$directory = ClusterController::directory;
if ( ClusterController::stdout_file != "" )
sn$stdout_file = ClusterController::stdout_file;
if ( ClusterController::stderr_file != "" )
sn$stderr_file = ClusterController::stderr_file;
# This helps Zeek run controller and agent with a minimal set of scripts.
sn$env["ZEEK_CLUSTER_MGMT_NODE"] = "CONTROLLER";
local res = Supervisor::create(sn);
if ( res != "" )
{
print(fmt("error: supervisor could not create controller node: %s", res));
exit(1);
}
}

policy/frameworks/cluster/controller/config.zeek
@@ -1,110 +0,0 @@
##! Configuration settings for the cluster controller.
@load policy/frameworks/cluster/agent/config
module ClusterController;
export {
## The name of this controller. Defaults to the value of the
## ZEEK_CONTROLLER_NAME environment variable. When that is unset and the
## user doesn't redef the value, the implementation defaults to
## "controller-<hostname>".
const name = getenv("ZEEK_CONTROLLER_NAME") &redef;
## The controller's stdout log name. If the string is non-empty, Zeek will
## produce a free-form log (i.e., not one governed by Zeek's logging
## framework) in Zeek's working directory. If left empty, no such log
## results.
##
## Note that the controller also establishes a "proper" Zeek log via the
## :zeek:see:`ClusterController::Log` module.
const stdout_file = "controller.stdout" &redef;
## The controller's stderr log name. Like :zeek:see:`ClusterController::stdout_file`,
## but for the stderr stream.
const stderr_file = "controller.stderr" &redef;
## The network address the controller listens on. By default this uses
## the value of the ZEEK_CONTROLLER_ADDR environment variable, but you
## may also redef to a specific value. When empty, the implementation
## falls back to :zeek:see:`ClusterController::default_address`.
const listen_address = getenv("ZEEK_CONTROLLER_ADDR") &redef;
## The fallback listen address if :zeek:see:`ClusterController::listen_address`
## remains empty. Unless redefined, this uses Broker's own default
## listen address.
const default_address = Broker::default_listen_address &redef;
## The network port the controller listens on. Counterpart to
## :zeek:see:`ClusterController::listen_address`, defaulting to the
## ZEEK_CONTROLLER_PORT environment variable.
const listen_port = getenv("ZEEK_CONTROLLER_PORT") &redef;
## The fallback listen port if :zeek:see:`ClusterController::listen_port`
## remains empty.
const default_port = 2150/tcp &redef;
## The controller's connect retry interval. Defaults to a more
## aggressive value compared to Broker's 30s.
const connect_retry = 1sec &redef;
## The controller's Broker topic. Clients send requests to this topic.
const topic = "zeek/cluster-control/controller" &redef;
## The role of this process in cluster management. Agent and controller
## both redefine this. Used during logging.
const role = ClusterController::Types::NONE &redef;
## The timeout for request state. Such state (see the :zeek:see:`ClusterController::Request`
## module) ties together request and response event pairs. The timeout causes
## its cleanup in the absence of a timely response. It applies both to
## state kept for client requests, as well as state in the agents for
## requests to the supervisor.
const request_timeout = 10sec &redef;
## An optional custom output directory for the controller's stdout and
## stderr logs. Agent and controller currently only log locally, not via
## the data cluster's logger node. (This might change in the future.)
## This means that if both write to the same log file, the output gets
## garbled.
const directory = "" &redef;
## Returns a :zeek:see:`Broker::NetworkInfo` record describing the controller.
global network_info: function(): Broker::NetworkInfo;
## Returns a :zeek:see:`Broker::EndpointInfo` record describing the controller.
global endpoint_info: function(): Broker::EndpointInfo;
}
function network_info(): Broker::NetworkInfo
{
local ni: Broker::NetworkInfo;
if ( ClusterController::listen_address != "" )
ni$address = ClusterController::listen_address;
else if ( ClusterController::default_address != "" )
ni$address = ClusterController::default_address;
else
ni$address = "127.0.0.1";
if ( ClusterController::listen_port != "" )
ni$bound_port = to_port(ClusterController::listen_port);
else
ni$bound_port = ClusterController::default_port;
return ni;
}
function endpoint_info(): Broker::EndpointInfo
{
local epi: Broker::EndpointInfo;
if ( ClusterController::name != "" )
epi$id = ClusterController::name;
else
epi$id = fmt("controller-%s", gethostname());
epi$network = network_info();
return epi;
}
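
These settings respond to environment variables first, with redefs as the
scripted alternative; for example (placeholder values):

# Listen on a specific address and port. Note that listen_port stays a
# string here -- network_info() converts it via to_port().
redef ClusterController::listen_address = "192.0.2.2";
redef ClusterController::listen_port = "2150";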

policy/frameworks/cluster/controller/log.zeek
@@ -1,137 +0,0 @@
##! This module implements straightforward logging abilities for cluster
##! controller and agent. It uses Zeek's logging framework, and works only for
##! nodes managed by the supervisor. In this setting Zeek's logging framework
##! operates locally, i.e., this logging does not involve any logger nodes.
@load ./config
module ClusterController::Log;
export {
## The cluster logging stream identifier.
redef enum Log::ID += { LOG };
## A default logging policy hook for the stream.
global log_policy: Log::PolicyHook;
## The controller/agent log supports four different log levels.
type Level: enum {
DEBUG,
INFO,
WARNING,
ERROR,
};
## The record type containing the column fields of the agent/controller log.
type Info: record {
## The time at which a cluster message was generated.
ts: time;
## The name of the node that is creating the log record.
node: string;
## Log level of this message, converted from the above Level enum.
level: string;
## The role of the node, translated from ClusterController::Types::Role.
role: string;
## A message indicating information about cluster controller operation.
message: string;
} &log;
## The log level in use for this node.
global log_level = DEBUG &redef;
## A debug-level log message writer.
##
## message: the message to log.
##
global debug: function(message: string);
## An info-level log message writer.
##
## message: the message to log.
##
global info: function(message: string);
## A warning-level log message writer.
##
## message: the message to log.
##
global warning: function(message: string);
## An error-level log message writer. (This only logs a message, it does not
## terminate Zeek or have other runtime effects.)
##
## message: the message to log.
##
global error: function(message: string);
}
# Enum translations to strings. This avoids those enums being reported
# with full qualifications in the logs, which is too verbose.
global l2s: table[Level] of string = {
[DEBUG] = "DEBUG",
[INFO] = "INFO",
[WARNING] = "WARNING",
[ERROR] = "ERROR",
};
global r2s: table[ClusterController::Types::Role] of string = {
[ClusterController::Types::AGENT] = "AGENT",
[ClusterController::Types::CONTROLLER] = "CONTROLLER",
};
function debug(message: string)
{
if ( enum_to_int(log_level) > enum_to_int(DEBUG) )
return;
local node = Supervisor::node();
Log::write(LOG, [$ts=network_time(), $node=node$name, $level=l2s[DEBUG],
$role=r2s[ClusterController::role], $message=message]);
}
function info(message: string)
{
if ( enum_to_int(log_level) > enum_to_int(INFO) )
return;
local node = Supervisor::node();
Log::write(LOG, [$ts=network_time(), $node=node$name, $level=l2s[INFO],
$role=r2s[ClusterController::role], $message=message]);
}
function warning(message: string)
{
if ( enum_to_int(log_level) > enum_to_int(WARNING) )
return;
local node = Supervisor::node();
Log::write(LOG, [$ts=network_time(), $node=node$name, $level=l2s[WARNING],
$role=r2s[ClusterController::role], $message=message]);
}
function error(message: string)
{
if ( enum_to_int(log_level) > enum_to_int(ERROR) )
return;
local node = Supervisor::node();
Log::write(LOG, [$ts=network_time(), $node=node$name, $level=l2s[ERROR],
$role=r2s[ClusterController::role], $message=message]);
}
event zeek_init()
{
if ( ! Supervisor::is_supervised() )
return;
local node = Supervisor::node();
# Defining the stream outside of the stream creation call sidesteps
# the coverage.find-bro-logs test, which tries to inventory all logs.
# This log isn't yet ready for that level of scrutiny.
local stream = Log::Stream($columns=Info, $path=fmt("cluster-%s", node$name),
$policy=log_policy);
Log::create_stream(ClusterController::Log::LOG, stream);
}
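
Usage is then a single call per message; node name and role get filled in
automatically. A short sketch (the messages are made up):

# Raise the level so debug messages are filtered out.
redef ClusterController::Log::log_level = ClusterController::Log::INFO;

event zeek_init()
    {
    ClusterController::Log::debug("probing supervisor"); # suppressed under INFO
    ClusterController::Log::warning("agent checked in twice"); # gets logged
    }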

policy/frameworks/cluster/controller/main.zeek
@@ -1,647 +0,0 @@
##! This is the main "runtime" of the cluster controller. Zeek does not load
##! this directly; rather, the controller's bootstrapping module (in ./boot.zeek)
##! specifies it as the script to run in the node newly created via Zeek's
##! supervisor.
@load base/frameworks/broker
@load policy/frameworks/cluster/agent/config
@load policy/frameworks/cluster/agent/api
@load ./api
@load ./log
@load ./request
@load ./util
module ClusterController::Runtime;
# This export is mainly to appease Zeekygen's need to understand redefs of the
# Request record below. Without it, it fails to establish link targets for the
# tucked-on types.
export {
## Request state specific to
## :zeek:see:`ClusterController::API::set_configuration_request` and
## :zeek:see:`ClusterController::API::set_configuration_response`.
type SetConfigurationState: record {
## The cluster configuration established with this request
config: ClusterController::Types::Configuration;
## Request state for every controller/agent transaction.
requests: set[string] &default=set();
};
## Request state specific to
## :zeek:see:`ClusterController::API::get_nodes_request` and
## :zeek:see:`ClusterController::API::get_nodes_response`.
type GetNodesState: record {
## Request state for every controller/agent transaction.
requests: set[string] &default=set();
};
## Dummy state for internal state-keeping test cases.
type TestState: record { };
}
redef record ClusterController::Request::Request += {
set_configuration_state: SetConfigurationState &optional;
get_nodes_state: GetNodesState &optional;
test_state: TestState &optional;
};
redef ClusterController::role = ClusterController::Types::CONTROLLER;
global check_instances_ready: function();
global add_instance: function(inst: ClusterController::Types::Instance);
global drop_instance: function(inst: ClusterController::Types::Instance);
global null_config: function(): ClusterController::Types::Configuration;
global is_null_config: function(config: ClusterController::Types::Configuration): bool;
# Checks whether the given instance is one we already know, but with different
# communication settings: a different peering direction, a different listening
# port, etc. Used as a predicate to indicate when we need to drop the existing
# one from our internal state.
global is_instance_connectivity_change: function
(inst: ClusterController::Types::Instance): bool;
# The set of agents the controller interacts with to manage the currently
# configured cluster. This may be a subset of all the agents known to the
# controller, as tracked by the g_instances_known set. The key is the instance
# name and should match the $name member of the corresponding instance record.
global g_instances: table[string] of ClusterController::Types::Instance = table();
# The set of instances that have checked in with the controller. This is a
# superset of g_instances, since it covers any agent that has sent us a
# notify_agent_hello event.
global g_instances_known: set[string] = set();
# A corresponding set of instances/agents that we track in order to understand
# when all of the above instances have sent agent_welcome_response events. (An
# alternative would be to use a record that adds a single state bit for each
# instance, and store that above.)
global g_instances_ready: set[string] = set();
# The request ID of the most recent configuration update that's come in from
# a client. We track it here until we know we are ready to communicate with all
# agents required by the update.
global g_config_reqid_pending: string = "";
# The most recent configuration we have successfully deployed. This is also
# the one we send whenever the client requests it.
global g_config_current: ClusterController::Types::Configuration;
function send_config_to_agents(req: ClusterController::Request::Request,
config: ClusterController::Types::Configuration)
{
for ( name in g_instances )
{
if ( name !in g_instances_ready )
next;
local agent_topic = ClusterAgent::topic_prefix + "/" + name;
local areq = ClusterController::Request::create();
areq$parent_id = req$id;
# We track the requests sent off to each agent. As the
# responses come in, we delete them. Once the requests
# set is empty, we respond back to the client.
add req$set_configuration_state$requests[areq$id];
# We could also broadcast just once on the agent prefix, but
# explicit request/response pairs for each agent seems cleaner.
ClusterController::Log::info(fmt("tx ClusterAgent::API::set_configuration_request %s to %s", areq$id, name));
Broker::publish(agent_topic, ClusterAgent::API::set_configuration_request, areq$id, config);
}
}
# Runs whenever the set of ready instances may have changed, i.e. whenever
# a required agent has confirmed it's ready.
function check_instances_ready()
{
local cur_instances: set[string];
for ( inst in g_instances )
add cur_instances[inst];
if ( cur_instances == g_instances_ready )
event ClusterController::API::notify_agents_ready(cur_instances);
}
function add_instance(inst: ClusterController::Types::Instance)
{
g_instances[inst$name] = inst;
if ( inst?$listen_port )
Broker::peer(cat(inst$host), inst$listen_port,
ClusterController::connect_retry);
if ( inst$name in g_instances_known )
{
# The agent has already peered with us. Send welcome to indicate
# it's part of cluster management. Once it responds, we update
# the set of ready instances and proceed as feasible with config
# deployments.
local req = ClusterController::Request::create();
ClusterController::Log::info(fmt("tx ClusterAgent::API::agent_welcome_request to %s", inst$name));
Broker::publish(ClusterAgent::topic_prefix + "/" + inst$name,
ClusterAgent::API::agent_welcome_request, req$id);
}
}
function drop_instance(inst: ClusterController::Types::Instance)
{
if ( inst$name !in g_instances )
return;
# Send the agent a standby so it shuts down its cluster nodes & state
ClusterController::Log::info(fmt("tx ClusterAgent::API::agent_standby_request to %s", inst$name));
Broker::publish(ClusterAgent::topic_prefix + "/" + inst$name,
ClusterAgent::API::agent_standby_request, "");
delete g_instances[inst$name];
if ( inst$name in g_instances_ready )
delete g_instances_ready[inst$name];
# The agent remains in g_instances_known, to track that we're able
# to communicate with it in case it's required again.
ClusterController::Log::info(fmt("dropped instance %s", inst$name));
}
function null_config(): ClusterController::Types::Configuration
{
return ClusterController::Types::Configuration($id="");
}
function is_null_config(config: ClusterController::Types::Configuration): bool
{
return config$id == "";
}
function is_instance_connectivity_change(inst: ClusterController::Types::Instance): bool
{
# If we're not tracking this instance as part of a cluster config, it's
# not a change. (More precisely: we cannot say whether it's changed.)
if ( inst$name !in g_instances )
return F;
# The agent has peered with us and now uses a different host.
# XXX 0.0.0.0 is a workaround until we've resolved how agents that peer
# with us obtain their identity. Broker ID?
if ( inst$host != 0.0.0.0 && inst$host != g_instances[inst$name]$host )
return T;
# The agent has a listening port and the one we know does not, or vice
# versa. I.e., this is a change in the intended peering direction.
if ( inst?$listen_port != g_instances[inst$name]?$listen_port )
return T;
# Both have listening ports, but they differ.
if ( inst?$listen_port && g_instances[inst$name]?$listen_port &&
inst$listen_port != g_instances[inst$name]$listen_port )
return T;
return F;
}
event ClusterController::API::notify_agents_ready(instances: set[string])
{
local insts = ClusterController::Util::set_to_vector(instances);
ClusterController::Log::info(fmt("rx ClusterController::API:notify_agents_ready %s", join_string_vec(insts, ",")));
local req = ClusterController::Request::lookup(g_config_reqid_pending);
# If there's no pending request, it's no longer available, or it
# doesn't have config state, don't do anything else.
if ( ClusterController::Request::is_null(req) || ! req?$set_configuration_state )
return;
# All instances requested in the pending configuration update are now
# known to us. Send them the config. As they send their response events
# we update the client's request state and eventually send the response
# event back to the client.
send_config_to_agents(req, req$set_configuration_state$config);
}
event ClusterAgent::API::notify_agent_hello(instance: string, host: addr, api_version: count)
{
ClusterController::Log::info(fmt("rx ClusterAgent::API::notify_agent_hello %s %s", instance, host));
# When an agent checks in with a mismatching API version, we log the
# fact and drop its state, if any.
if ( api_version != ClusterController::API::version )
{
ClusterController::Log::warning(
fmt("instance %s/%s has checked in with incompatible API version %s",
instance, host, api_version));
if ( instance in g_instances )
drop_instance(g_instances[instance]);
if ( instance in g_instances_known )
delete g_instances_known[instance];
return;
}
add g_instances_known[instance];
if ( instance in g_instances && instance !in g_instances_ready )
{
# We need this instance for our cluster and have full context for
# it from the configuration. Tell agent.
local req = ClusterController::Request::create();
ClusterController::Log::info(fmt("tx ClusterAgent::API::agent_welcome_request to %s", instance));
Broker::publish(ClusterAgent::topic_prefix + "/" + instance,
ClusterAgent::API::agent_welcome_request, req$id);
}
}
event ClusterAgent::API::agent_welcome_response(reqid: string, result: ClusterController::Types::Result)
{
ClusterController::Log::info(fmt("rx ClusterAgent::API::agent_welcome_response %s", reqid));
local req = ClusterController::Request::lookup(reqid);
if ( ClusterController::Request::is_null(req) )
return;
ClusterController::Request::finish(req$id);
# An agent we've been waiting to hear back from is ready for cluster
# work. Double-check we still want it, otherwise drop it.
if ( ! result$success || result$instance !in g_instances )
{
ClusterController::Log::info(fmt(
"tx ClusterAgent::API::agent_standby_request to %s", result$instance));
Broker::publish(ClusterAgent::topic_prefix + "/" + result$instance,
ClusterAgent::API::agent_standby_request, "");
return;
}
add g_instances_ready[result$instance];
ClusterController::Log::info(fmt("instance %s ready", result$instance));
check_instances_ready();
}
event ClusterAgent::API::notify_change(instance: string, n: ClusterController::Types::Node,
old: ClusterController::Types::State,
new: ClusterController::Types::State)
{
# XXX TODO
}
event ClusterAgent::API::notify_error(instance: string, msg: string, node: string)
{
# XXX TODO
}
event ClusterAgent::API::notify_log(instance: string, msg: string, node: string)
{
# XXX TODO
}
event ClusterAgent::API::set_configuration_response(reqid: string, result: ClusterController::Types::Result)
{
ClusterController::Log::info(fmt("rx ClusterAgent::API::set_configuration_response %s", reqid));
# Retrieve state for the request we just got a response to
local areq = ClusterController::Request::lookup(reqid);
if ( ClusterController::Request::is_null(areq) )
return;
# Release the request, which is now done.
ClusterController::Request::finish(areq$id);
# Find the original request from the client
local req = ClusterController::Request::lookup(areq$parent_id);
if ( ClusterController::Request::is_null(req) )
return;
# Add this result to the overall response
req$results[|req$results|] = result;
# Mark this request as done by removing it from the table of pending
# ones. The following if-check should always be true.
if ( areq$id in req$set_configuration_state$requests )
delete req$set_configuration_state$requests[areq$id];
# If there are still pending requests to the agents, we're done for
# now: we respond once every agent has responded (or we time out).
if ( |req$set_configuration_state$requests| > 0 )
return;
# All set_configuration requests to instances are done, so adopt the
# client's requested configuration as the new one and respond back to
# client.
g_config_current = req$set_configuration_state$config;
g_config_reqid_pending = "";
ClusterController::Log::info(fmt("tx ClusterController::API::set_configuration_response %s",
ClusterController::Request::to_string(req)));
event ClusterController::API::set_configuration_response(req$id, req$results);
ClusterController::Request::finish(req$id);
}
event ClusterController::API::set_configuration_request(reqid: string, config: ClusterController::Types::Configuration)
{
ClusterController::Log::info(fmt("rx ClusterController::API::set_configuration_request %s", reqid));
local res: ClusterController::Types::Result;
local req = ClusterController::Request::create(reqid);
req$set_configuration_state = SetConfigurationState($config = config);
# At the moment there can only be one pending request.
if ( g_config_reqid_pending != "" )
{
res = ClusterController::Types::Result($reqid=reqid);
res$success = F;
res$error = fmt("request %s still pending", g_config_reqid_pending);
req$results += res;
ClusterController::Log::info(fmt("tx ClusterController::API::set_configuration_response %s",
ClusterController::Request::to_string(req)));
event ClusterController::API::set_configuration_response(req$id, req$results);
ClusterController::Request::finish(req$id);
return;
}
# XXX validate the configuration:
# - Are node instances among defined instances?
# - Are all names unique?
# - Are any node options understood?
# - Do node types with optional fields have required values?
# ...
# The incoming request is now the pending one. It gets cleared when all
# agents have processed their config updates successfully, or their
# responses time out.
g_config_reqid_pending = req$id;
# Compare the instance configuration to our current one. If it matches,
# we can proceed to deploying the new data cluster topology. If it does
# not, we need to establish connectivity with agents we connect to, or
# wait until all instances that connect to us have done so. Either triggers
# a notify_agents_ready event, upon which we then deploy the data cluster.
# The current & new set of instance names.
local insts_current: set[string];
local insts_new: set[string];
# A set of current instances not contained in the new config.
# Those will need to get dropped.
local insts_to_drop: set[string];
# The opposite: new instances not yet in our current set. Those we will need
# to establish contact with (or they with us).
local insts_to_add: set[string];
# The overlap: instances in both the current and new set. For those we verify
# that we're actually dealing with the same entities, and might need to re-
# connect if not.
local insts_to_keep: set[string];
# Alternative representation of insts_to_add, directly providing the instances.
local insts_to_peer: table[string] of ClusterController::Types::Instance;
# Helpful locals.
local inst_name: string;
local inst: ClusterController::Types::Instance;
for ( inst_name in g_instances )
add insts_current[inst_name];
for ( inst in config$instances )
add insts_new[inst$name];
# Populate TODO lists for instances we need to drop, check, or add.
insts_to_drop = insts_current - insts_new;
insts_to_add = insts_new - insts_current;
insts_to_keep = insts_new & insts_current;
for ( inst in config$instances )
{
if ( inst$name in insts_to_add )
{
insts_to_peer[inst$name] = inst;
next;
}
# Focus on the keepers: check for change in identity/location.
if ( inst$name !in insts_to_keep )
next;
if ( is_instance_connectivity_change(inst) )
{
# The endpoint looks different. We drop the current one
# and need to re-establish connectivity with the new
# one.
add insts_to_drop[inst$name];
add insts_to_add[inst$name];
}
}
# Process our TODO lists. Handle drops first, then additions, in
# case we need to re-establish connectivity with an agent.
for ( inst_name in insts_to_drop )
drop_instance(g_instances[inst_name]);
for ( inst_name in insts_to_peer )
add_instance(insts_to_peer[inst_name]);
# Updates to our instance tables are complete, now check if we're already
# able to send the config to the agents:
check_instances_ready();
}
event ClusterController::API::get_instances_request(reqid: string)
{
ClusterController::Log::info(fmt("rx ClusterController::API::set_instances_request %s", reqid));
local res = ClusterController::Types::Result($reqid = reqid);
local insts: vector of ClusterController::Types::Instance;
for ( i in g_instances )
insts += g_instances[i];
res$data = insts;
ClusterController::Log::info(fmt("tx ClusterController::API::get_instances_response %s", reqid));
event ClusterController::API::get_instances_response(reqid, res);
}
event ClusterAgent::API::get_nodes_response(reqid: string, result: ClusterController::Types::Result)
{
ClusterController::Log::info(fmt("rx ClusterAgent::API::get_nodes_response %s", reqid));
# Retrieve state for the request we just got a response to
local areq = ClusterController::Request::lookup(reqid);
if ( ClusterController::Request::is_null(areq) )
return;
# Release the request, which is now done.
ClusterController::Request::finish(areq$id);
# Find the original request from the client
local req = ClusterController::Request::lookup(areq$parent_id);
if ( ClusterController::Request::is_null(req) )
return;
# Zeek's ingestion of an any-typed val via Broker yields an opaque
# Broker DataVal. When Zeek forwards this val via another event it stays
# in this opaque form. To avoid forcing recipients to distinguish
# whether the val is of the actual, intended (any-)type or a Broker
# DataVal wrapper, we explicitly cast it back to our intended Zeek
# type. This test case demonstrates: broker.remote_event_vector_any
result$data = result$data as ClusterController::Types::NodeStatusVec;
# Add this result to the overall response
req$results[|req$results|] = result;
# Mark this request as done by removing it from the table of pending
# ones. The following if-check should always be true.
if ( areq$id in req$get_nodes_state$requests )
delete req$get_nodes_state$requests[areq$id];
# If we still have pending queries out to the agents, do nothing: we'll
# handle this soon, or our request will time out and we respond with
# error.
if ( |req$get_nodes_state$requests| > 0 )
return;
ClusterController::Log::info(fmt("tx ClusterController::API::get_nodes_response %s",
ClusterController::Request::to_string(req)));
event ClusterController::API::get_nodes_response(req$id, req$results);
ClusterController::Request::finish(req$id);
}
event ClusterController::API::get_nodes_request(reqid: string)
{
ClusterController::Log::info(fmt("rx ClusterController::API::get_nodes_request %s", reqid));
# Special case: if we have no instances, respond right away.
if ( |g_instances| == 0 )
{
ClusterController::Log::info(fmt("tx ClusterController::API::get_nodes_response %s", reqid));
event ClusterController::API::get_nodes_response(reqid, vector(
ClusterController::Types::Result($reqid=reqid, $success=F,
$error="no instances connected")));
return;
}
local req = ClusterController::Request::create(reqid);
req$get_nodes_state = GetNodesState();
for ( name in g_instances )
{
if ( name !in g_instances_ready )
next;
local agent_topic = ClusterAgent::topic_prefix + "/" + name;
local areq = ClusterController::Request::create();
areq$parent_id = req$id;
add req$get_nodes_state$requests[areq$id];
ClusterController::Log::info(fmt("tx ClusterAgent::API::get_nodes_request %s to %s", areq$id, name));
Broker::publish(agent_topic, ClusterAgent::API::get_nodes_request, areq$id);
}
}
event ClusterController::Request::request_expired(req: ClusterController::Request::Request)
{
# Various handlers for timed-out request state. We use the state members
# to identify how to respond. No need to clean up the request itself,
# since we're getting here via the request module's expiration
# mechanism that handles the cleanup.
local res: ClusterController::Types::Result;
if ( req?$set_configuration_state )
{
# This timeout means we no longer have a pending request.
g_config_reqid_pending = "";
res = ClusterController::Types::Result($reqid=req$id);
res$success = F;
res$error = "request timed out";
req$results += res;
ClusterController::Log::info(fmt("tx ClusterController::API::set_configuration_response %s",
ClusterController::Request::to_string(req)));
event ClusterController::API::set_configuration_response(req$id, req$results);
}
if ( req?$get_nodes_state )
{
res = ClusterController::Types::Result($reqid=req$id);
res$success = F;
res$error = "request timed out";
req$results += res;
ClusterController::Log::info(fmt("tx ClusterController::API::get_nodes_response %s",
ClusterController::Request::to_string(req)));
event ClusterController::API::get_nodes_response(req$id, req$results);
}
if ( req?$test_state )
{
res = ClusterController::Types::Result($reqid=req$id);
res$success = F;
res$error = "request timed out";
ClusterController::Log::info(fmt("tx ClusterController::API::test_timeout_response %s", req$id));
event ClusterController::API::test_timeout_response(req$id, res);
}
}
event ClusterController::API::test_timeout_request(reqid: string, with_state: bool)
{
ClusterController::Log::info(fmt("rx ClusterController::API::test_timeout_request %s %s", reqid, with_state));
if ( with_state )
{
# This state times out and triggers a timeout response in the
# above request_expired event handler.
local req = ClusterController::Request::create(reqid);
req$test_state = TestState();
}
}
event zeek_init()
{
# Initialize null config at startup. We will replace it once we have
# persistence, and again whenever we complete a client's
# set_configuration request.
g_config_current = null_config();
# The controller always listens -- it needs to be able to respond to the
# Zeek client. This port is also used by the agents if they connect to
# the controller. The controller doesn't automatically establish or
# accept connectivity to agents: agents are defined, and communicated
# with, as per the configurations the client provides.
local cni = ClusterController::network_info();
Broker::listen(cat(cni$address), cni$bound_port);
Broker::subscribe(ClusterAgent::topic_prefix);
Broker::subscribe(ClusterController::topic);
# Events sent to the client:
local events: vector of any = [
ClusterController::API::get_instances_response,
ClusterController::API::set_configuration_response,
ClusterController::API::get_nodes_response,
ClusterController::API::test_timeout_response
];
for ( i in events )
Broker::auto_publish(ClusterController::topic, events[i]);
ClusterController::Log::info("controller is live");
}
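# For illustration, a minimal sketch of a standalone Broker client driving
# this API. The address and port are assumptions (2150/tcp mirrors the
# controller's usual default); the real zeek-client implements this
# exchange with additional error handling:
event zeek_init()
	{
	Broker::subscribe(ClusterController::topic);
	Broker::peer("127.0.0.1", 2150/tcp);
	}

event Broker::peer_added(endpoint: Broker::EndpointInfo, msg: string)
	{
	Broker::publish(ClusterController::topic,
	    ClusterController::API::get_instances_request, unique_id(""));
	}

event ClusterController::API::get_instances_response(reqid: string,
    result: ClusterController::Types::Result)
	{
	print fmt("instances response %s: %s", reqid,
	    ClusterController::Types::result_to_string(result));
	}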

View file

@ -1,155 +0,0 @@
##! This module implements a request state abstraction that both cluster
##! controller and agent use to tie responses to received request events and be
##! able to time-out such requests.
@load ./types
@load ./config
module ClusterController::Request;
export {
## Request records track state associated with a request/response event
## pair. Calls to
## :zeek:see:`ClusterController::Request::create` establish such state
## when an entity sends off a request event, while
## :zeek:see:`ClusterController::Request::finish` clears the state when
## a corresponding response event comes in, or the state times out.
type Request: record {
## Each request has a hopefully unique ID provided by the requester.
id: string;
## For requests that arise from another request (such as when
## the controller sends requests to agents based on a request it
## received from the client), this specifies that original, "parent"
## request.
parent_id: string &optional;
## The results vector builds up the list of results we eventually
## send to the requester when we have processed the request.
results: ClusterController::Types::ResultVec &default=vector();
## An internal flag to track whether a request is complete.
finished: bool &default=F;
};
## A token request that serves as a null/nonexistent request.
global null_req = Request($id="", $finished=T);
## This function establishes request state.
##
## reqid: the identifier to use for the request.
##
global create: function(reqid: string &default=unique_id("")): Request;
## This function looks up the request for a given request ID and returns
## it. When no such request exists, returns ClusterController::Request::null_req.
##
## reqid: the ID of the request state to retrieve.
##
global lookup: function(reqid: string): Request;
## This function marks a request as complete and releases its internal
## state. When the request does not exist, this does nothing.
##
## reqid: the ID of the request state to release.
##
## Returns: T if the request existed and was released, F otherwise.
##
global finish: function(reqid: string): bool;
## This event fires when a request times out (as per the
## ClusterController::request_timeout interval) before it has been
## finished via ClusterController::Request::finish().
##
## req: the request state that is expiring.
##
global request_expired: event(req: Request);
## This function is a helper predicate to indicate whether a given
## request is null.
##
## request: a Request record to check.
##
## Returns: T if the given request matches the null_req instance, F otherwise.
##
global is_null: function(request: Request): bool;
## For troubleshooting, this function renders a request record to a string.
##
## request: the request to render.
##
global to_string: function(request: Request): string;
}
function requests_expire_func(reqs: table[string] of Request, reqid: string): interval
{
# No need to flag request expiration when we've already internally marked
# the request as done.
if ( ! reqs[reqid]$finished )
event ClusterController::Request::request_expired(reqs[reqid]);
return 0secs;
}
# This is the global request-tracking table. The table maps from request ID
# strings to corresponding Request records. Entries time out after the
# ClusterController::request_timeout interval. Upon expiration, a
# request_expired event fires, conveying the request state.
global g_requests: table[string] of Request
&create_expire=ClusterController::request_timeout
&expire_func=requests_expire_func;
function create(reqid: string): Request
{
local ret = Request($id=reqid);
g_requests[reqid] = ret;
return ret;
}
function lookup(reqid: string): Request
{
if ( reqid in g_requests )
return g_requests[reqid];
return null_req;
}
function finish(reqid: string): bool
{
if ( reqid !in g_requests )
return F;
local req = g_requests[reqid];
delete g_requests[reqid];
req$finished = T;
return T;
}
function is_null(request: Request): bool
{
if ( request$id == "" )
return T;
return F;
}
function to_string(request: Request): string
{
local results: string_vec;
local res: ClusterController::Types::Result;
local parent_id = "";
if ( request?$parent_id )
parent_id = fmt(" (via %s)", request$parent_id);
for ( idx in request$results )
{
res = request$results[idx];
results[|results|] = ClusterController::Types::result_to_string(res);
}
return fmt("[request %s%s %s, results: %s]", request$id, parent_id,
request$finished ? "finished" : "pending",
join_string_vec(results, ","));
}
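# A minimal usage sketch of the lifecycle above, wrapped in hypothetical
# helper functions:
function example_request_sent(reqid: string)
	{
	# Establish tracked state; it expires via request_timeout if no
	# response arrives in time.
	local req = ClusterController::Request::create(reqid);
	print ClusterController::Request::to_string(req);
	}

function example_response_received(reqid: string)
	{
	local req = ClusterController::Request::lookup(reqid);
	if ( ClusterController::Request::is_null(req) )
		return; # Unknown, or already timed out.
	ClusterController::Request::finish(req$id);
	}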

View file

@ -1,135 +0,0 @@
##! This module holds the basic types needed for the Cluster Controller
##! framework. These are used by both agent and controller, and several
##! have corresponding equivalents in the zeek-client implementation.
module ClusterController::Types;
export {
## Management infrastructure node type. This intentionally does not
## include the data cluster node types (worker, logger, etc) -- those
## continue to be managed by the cluster framework.
type Role: enum {
NONE, ##< No active role in cluster management.
AGENT, ##< A cluster management agent.
CONTROLLER, ##< The cluster's controller.
};
## A Zeek-side option with value.
type Option: record {
name: string; ##< Name of option
value: string; ##< Value of option
};
## Configuration describing a Zeek instance running a Cluster
## Agent. Normally there will be one instance per physical
## system in the cluster.
type Instance: record {
## Unique, human-readable instance name
name: string;
## IP address of system
host: addr;
## Agent listening port. Not needed if agents connect to controller.
listen_port: port &optional;
};
type InstanceVec: vector of Instance;
## State that a Cluster Node can be in. State changes trigger an
## API notification (see notify_change()). The PENDING state means
## the Supervisor has not yet reported a PID for the node, which
## happens when it has not yet fully launched.
type State: enum {
PENDING, ##< Not yet running
RUNNING, ##< Running and operating normally
STOPPED, ##< Explicitly stopped
FAILED, ##< Failed to start, and permanently halted
CRASHED, ##< Crashed; will be restarted
UNKNOWN, ##< State not known currently (e.g., because of lost connectivity)
};
## Configuration describing a Cluster Node process.
type Node: record {
name: string; ##< Cluster-unique, human-readable node name
instance: string; ##< Name of instance where node is to run
role: Supervisor::ClusterRole; ##< Role of the node.
state: State; ##< Desired, or current, run state.
p: port &optional; ##< Port on which this node will listen
scripts: vector of string &optional; ##< Additional Zeek scripts for node
options: set[Option] &optional; ##< Zeek options for node
interface: string &optional; ##< Interface to sniff
cpu_affinity: int &optional; ##< CPU/core number to pin to
env: table[string] of string &default=table(); ##< Custom environment vars
};
## Data structure capturing a cluster's complete configuration.
type Configuration: record {
id: string &default=unique_id(""); ##< Unique identifier for a particular configuration
## The instances in the cluster.
instances: set[Instance] &default=set();
## The set of nodes in the cluster, as distributed over the instances.
nodes: set[Node] &default=set();
};
## The status of a Supervisor-managed node, as reported to the client in
## a get_nodes_request/get_nodes_response transaction.
type NodeStatus: record {
## Cluster-unique, human-readable node name
node: string;
## Current run state of the node.
state: State;
## Role the node plays in cluster management.
mgmt_role: Role &default=NONE;
## Role the node plays in the data cluster.
cluster_role: Supervisor::ClusterRole &default=Supervisor::NONE;
## Process ID of the node. This is optional because the Supervisor may not have
## a PID when a node is still bootstrapping.
pid: int &optional;
## The node's Broker peering listening port, if any.
p: port &optional;
};
type NodeStatusVec: vector of NodeStatus;
## Return value for request/response API event pairs.
type Result: record {
reqid: string; ##< Request ID of operation this result refers to
instance: string &default=""; ##< Name of associated instance (for context)
success: bool &default=T; ##< True if successful
data: any &optional; ##< Addl data returned for successful operation
error: string &default=""; ##< Descriptive error on failure
node: string &optional; ##< Name of associated node (for context)
};
type ResultVec: vector of Result;
## Given a :zeek:see:`ClusterController::Types::Result` record,
## this function returns a string summarizing it.
global result_to_string: function(res: Result): string;
}
function result_to_string(res: Result): string
{
local result = "";
if ( res$success )
result = "success";
else if ( res$error != "" )
result = fmt("error (%s)", res$error);
else
result = "error";
local details: string_vec;
if ( res$reqid != "" )
details[|details|] = fmt("reqid %s", res$reqid);
if ( res$instance != "" )
details[|details|] = fmt("instance %s", res$instance);
if ( res?$node && res$node != "" )
details[|details|] = fmt("node %s", res$node);
if ( |details| > 0 )
result = fmt("%s (%s)", result, join_string_vec(details, ", "));
return result;
}
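# A small illustration of the rendering above, with made-up values:
function example_result_render()
	{
	local res = Result($reqid="4711", $instance="inst-1", $success=F,
	    $error="node unresponsive");
	# Prints: error (node unresponsive) (reqid 4711, instance inst-1)
	print result_to_string(res);
	}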

View file

@ -1,25 +0,0 @@
##! Utility functions for the cluster controller framework, available to agent
##! and controller.
module ClusterController::Util;
export {
## Renders a set of strings to an alphabetically sorted vector.
##
## ss: the string set to convert.
##
## Returns: the vector of all strings in ss.
global set_to_vector: function(ss: set[string]): vector of string;
}
function set_to_vector(ss: set[string]): vector of string
{
local res: vector of string;
for ( s in ss )
res[|res|] = s;
sort(res, strcmp);
return res;
}
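# A quick usage sketch. Set iteration order is undefined in Zeek, so the
# sort makes downstream output deterministic:
event zeek_init()
	{
	# Prints: [a, b, c]
	print set_to_vector(set("b", "c", "a"));
	}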