Merge branch 'topic/christian/controller-renaming'

* topic/christian/controller-renaming:
  Bump external cluster testsuite to reflect Management framework reorg
  Bump zeek-client to reflect Management framework reorg
  Reorg of the cluster controller to new "Management framework" layout
This commit is contained in:
Christian Kreibich 2022-02-10 17:54:14 -08:00
commit 40fa1a0769
29 changed files with 565 additions and 539 deletions

View file

@ -1,3 +1,11 @@
5.0.0-dev.112 | 2022-02-10 17:56:27 -0800
* Reorg of the cluster controller to new "Management framework" layout (Christian Kreibich, Corelight)
* Bump external cluster testsuite to reflect Management framework reorg (Christian Kreibich, Corelight)
* Bump zeek-client to reflect Management framework reorg (Christian Kreibich, Corelight)
5.0.0-dev.108 | 2022-02-10 10:35:02 -0700 5.0.0-dev.108 | 2022-02-10 10:35:02 -0700
* Fixing a big pile of Coverity issues (Tim Wojtulewicz, Corelight) * Fixing a big pile of Coverity issues (Tim Wojtulewicz, Corelight)

View file

@ -1 +1 @@
5.0.0-dev.108 5.0.0-dev.112

@ -1 +1 @@
Subproject commit 3a3aa08e0d68d7f734fb92329798c375978ccb3a Subproject commit c2af7381c584b6545517843872747598bb0e25d5

View file

@ -1,4 +0,0 @@
##! The entry point for the cluster agent. It runs bootstrap logic for launching
##! the agent process via Zeek's Supervisor.
@load ./boot

View file

@ -1,4 +0,0 @@
##! The entry point for the cluster controller. It runs bootstrap logic for
##! launching the controller process via Zeek's Supervisor.
@load ./boot

View file

@ -1,36 +0,0 @@
##! The cluster controller's boot logic runs in Zeek's supervisor and instructs
##! it to launch the controller process. The controller's main logic resides in
##! main.zeek, similarly to other frameworks. The new process will execute that
##! script.
##!
##! If the current process is not the Zeek supervisor, this does nothing.
@load ./config
event zeek_init()
{
if ( ! Supervisor::is_supervisor() )
return;
local epi = ClusterController::endpoint_info();
local sn = Supervisor::NodeConfig($name=epi$id, $bare_mode=T,
$scripts=vector("policy/frameworks/cluster/controller/main.zeek"));
if ( ClusterController::directory != "" )
sn$directory = ClusterController::directory;
if ( ClusterController::stdout_file != "" )
sn$stdout_file = ClusterController::stdout_file;
if ( ClusterController::stderr_file != "" )
sn$stderr_file = ClusterController::stderr_file;
# This helps Zeek run controller and agent with a minimal set of scripts.
sn$env["ZEEK_CLUSTER_MGMT_NODE"] = "CONTROLLER";
local res = Supervisor::create(sn);
if ( res != "" )
{
print(fmt("error: supervisor could not create controller node: %s", res));
exit(1);
}
}

View file

@ -1,110 +0,0 @@
##! Configuration settings for the cluster controller.
@load policy/frameworks/cluster/agent/config
module ClusterController;
export {
## The name of this controller. Defaults to the value of the
## ZEEK_CONTROLLER_NAME environment variable. When that is unset and the
## user doesn't redef the value, the implementation defaults to
## "controller-<hostname>".
const name = getenv("ZEEK_CONTROLLER_NAME") &redef;
## The controller's stdout log name. If the string is non-empty, Zeek will
## produce a free-form log (i.e., not one governed by Zeek's logging
## framework) in Zeek's working directory. If left empty, no such log
## results.
##
## Note that the controller also establishes a "proper" Zeek log via the
## :zeek:see:`ClusterController::Log` module.
const stdout_file = "controller.stdout" &redef;
## The controller's stderr log name. Like :zeek:see:`ClusterController::stdout_file`,
## but for the stderr stream.
const stderr_file = "controller.stderr" &redef;
## The network address the controller listens on. By default this uses
## the value of the ZEEK_CONTROLLER_ADDR environment variable, but you
## may also redef to a specific value. When empty, the implementation
## falls back to :zeek:see:`ClusterController::default_address`.
const listen_address = getenv("ZEEK_CONTROLLER_ADDR") &redef;
## The fallback listen address if :zeek:see:`ClusterController::listen_address`
## remains empty. Unless redefined, this uses Broker's own default
## listen address.
const default_address = Broker::default_listen_address &redef;
## The network port the controller listens on. Counterpart to
## :zeek:see:`ClusterController::listen_address`, defaulting to the
## ZEEK_CONTROLLER_PORT environment variable.
const listen_port = getenv("ZEEK_CONTROLLER_PORT") &redef;
## The fallback listen port if :zeek:see:`ClusterController::listen_port`
## remains empty.
const default_port = 2150/tcp &redef;
## The controller's connect retry interval. Defaults to a more
## aggressive value compared to Broker's 30s.
const connect_retry = 1sec &redef;
## The controller's Broker topic. Clients send requests to this topic.
const topic = "zeek/cluster-control/controller" &redef;
## The role of this process in cluster management. Agent and controller
## both redefine this. Used during logging.
const role = ClusterController::Types::NONE &redef;
## The timeout for request state. Such state (see the :zeek:see:`ClusterController::Request`
## module) ties together request and response event pairs. The timeout causes
## its cleanup in the absence of a timely response. It applies both to
## state kept for client requests, as well as state in the agents for
## requests to the supervisor.
const request_timeout = 10sec &redef;
## An optional custom output directory for the controller's stdout and
## stderr logs. Agent and controller currently only log locally, not via
## the data cluster's logger node. (This might change in the future.)
## This means that if both write to the same log file, the output gets
## garbled.
const directory = "" &redef;
## Returns a :zeek:see:`Broker::NetworkInfo` record describing the controller.
global network_info: function(): Broker::NetworkInfo;
## Returns a :zeek:see:`Broker::EndpointInfo` record describing the controller.
global endpoint_info: function(): Broker::EndpointInfo;
}
function network_info(): Broker::NetworkInfo
{
local ni: Broker::NetworkInfo;
if ( ClusterController::listen_address != "" )
ni$address = ClusterController::listen_address;
else if ( ClusterController::default_address != "" )
ni$address = ClusterController::default_address;
else
ni$address = "127.0.0.1";
if ( ClusterController::listen_port != "" )
ni$bound_port = to_port(ClusterController::listen_port);
else
ni$bound_port = ClusterController::default_port;
return ni;
}
function endpoint_info(): Broker::EndpointInfo
{
local epi: Broker::EndpointInfo;
if ( ClusterController::name != "" )
epi$id = ClusterController::name;
else
epi$id = fmt("controller-%s", gethostname());
epi$network = network_info();
return epi;
}

View file

@ -0,0 +1,11 @@
##! This loads Management framework functionality needed by both the controller
##! and agents. Note that there's no notion of loading "the Management
##! framework" -- one always loads "management/controller" or
##! "management/agent". This __load__ script exists only to simplify loading all
##! common functionality.
@load ./config
@load ./log
@load ./request
@load ./types
@load ./util

View file

@ -0,0 +1,4 @@
##! The entry point for the Management framework's cluster agent. It runs
##! bootstrap logic for launching the agent process via Zeek's Supervisor.
@load ./boot

View file

@ -4,9 +4,9 @@
##! "_response", respectively. ##! "_response", respectively.
@load base/frameworks/supervisor/control @load base/frameworks/supervisor/control
@load policy/frameworks/cluster/controller/types @load policy/frameworks/management/types
module ClusterAgent::API; module Management::Agent::API;
export { export {
## A simple versioning scheme, used to track basic compatibility of ## A simple versioning scheme, used to track basic compatibility of
@ -21,14 +21,14 @@ export {
## ##
## reqid: a request identifier string, echoed in the response event. ## reqid: a request identifier string, echoed in the response event.
## ##
## config: a :zeek:see:`ClusterController::Types::Configuration` record ## config: a :zeek:see:`Management::Configuration` record
## describing the cluster topology. Note that this contains the full ## describing the cluster topology. Note that this contains the full
## topology, not just the part pertaining to this agent. That's because ## topology, not just the part pertaining to this agent. That's because
## the cluster framework requires full cluster visibility to establish ## the cluster framework requires full cluster visibility to establish
## the needed peerings. ## the needed peerings.
## ##
global set_configuration_request: event(reqid: string, global set_configuration_request: event(reqid: string,
config: ClusterController::Types::Configuration); config: Management::Configuration);
## Response to a set_configuration_request event. The agent sends ## Response to a set_configuration_request event. The agent sends
## this back to the controller. ## this back to the controller.
@ -38,11 +38,11 @@ export {
## result: the result record. ## result: the result record.
## ##
global set_configuration_response: event(reqid: string, global set_configuration_response: event(reqid: string,
result: ClusterController::Types::Result); result: Management::Result);
## The controller sends this event to request a list of ## The controller sends this event to request a list of
## :zeek:see:`ClusterController::Types::NodeStatus` records that capture ## :zeek:see:`Management::NodeStatus` records that capture
## the status of Supervisor-managed nodes running on this instance. ## the status of Supervisor-managed nodes running on this instance.
## instances. ## instances.
## ##
@ -55,13 +55,13 @@ export {
## ##
## reqid: the request identifier used in the request event. ## reqid: the request identifier used in the request event.
## ##
## result: a :zeek:see:`ClusterController::Types::Result` record. Its data ## result: a :zeek:see:`Management::Result` record. Its data
## member is a vector of :zeek:see:`ClusterController::Types::NodeStatus` ## member is a vector of :zeek:see:`Management::NodeStatus`
## records, covering the nodes at this instance. The result may also ## records, covering the nodes at this instance. The result may also
## indicate failure, with error messages indicating what went wrong. ## indicate failure, with error messages indicating what went wrong.
## ##
global get_nodes_response: event(reqid: string, global get_nodes_response: event(reqid: string,
result: ClusterController::Types::Result); result: Management::Result);
## The controller sends this event to confirm to the agent that it is ## The controller sends this event to confirm to the agent that it is
## part of the current cluster topology. The agent acknowledges with the ## part of the current cluster topology. The agent acknowledges with the
@ -79,7 +79,7 @@ export {
## result: the result record. ## result: the result record.
## ##
global agent_welcome_response: event(reqid: string, global agent_welcome_response: event(reqid: string,
result: ClusterController::Types::Result); result: Management::Result);
## The controller sends this event to convey that the agent is not ## The controller sends this event to convey that the agent is not
@ -102,7 +102,7 @@ export {
## result: the result record. ## result: the result record.
## ##
global agent_standby_response: event(reqid: string, global agent_standby_response: event(reqid: string,
result: ClusterController::Types::Result); result: Management::Result);
# Notification events, agent -> controller # Notification events, agent -> controller
@ -112,7 +112,7 @@ export {
## communicate with. It is a controller-level equivalent of ## communicate with. It is a controller-level equivalent of
## :zeek:see:`Broker::peer_added`. ## :zeek:see:`Broker::peer_added`.
## ##
## instance: an instance name, really the agent's name as per :zeek:see:`ClusterAgent::name`. ## instance: an instance name, really the agent's name as per :zeek:see:`Management::Agent::name`.
## ##
## host: the IP address of the agent. (This may change in the future.) ## host: the IP address of the agent. (This may change in the future.)
## ##
@ -126,9 +126,9 @@ export {
# Report node state changes. # Report node state changes.
global notify_change: event(instance: string, global notify_change: event(instance: string,
n: ClusterController::Types::Node, n: Management::Node,
old: ClusterController::Types::State, old: Management::State,
new: ClusterController::Types::State); new: Management::State);
# Report operational error. # Report operational error.
global notify_error: event(instance: string, msg: string, node: string &default=""); global notify_error: event(instance: string, msg: string, node: string &default="");

View file

@ -1,5 +1,5 @@
##! The cluster agent boot logic runs in Zeek's supervisor and instructs it to ##! The cluster agent boot logic runs in Zeek's supervisor and instructs it to
##! launch an agent process. The agent's main logic resides in main.zeek, ##! launch a Management agent process. The agent's main logic resides in main.zeek,
##! similarly to other frameworks. The new process will execute that script. ##! similarly to other frameworks. The new process will execute that script.
##! ##!
##! If the current process is not the Zeek supervisor, this does nothing. ##! If the current process is not the Zeek supervisor, this does nothing.
@ -17,16 +17,16 @@ event zeek_init()
if ( ! Supervisor::is_supervisor() ) if ( ! Supervisor::is_supervisor() )
return; return;
local epi = ClusterAgent::endpoint_info(); local epi = Management::Agent::endpoint_info();
local sn = Supervisor::NodeConfig($name=epi$id, $bare_mode=T, local sn = Supervisor::NodeConfig($name=epi$id, $bare_mode=T,
$scripts=vector("policy/frameworks/cluster/agent/main.zeek")); $scripts=vector("policy/frameworks/management/agent/main.zeek"));
if ( ClusterAgent::directory != "" ) if ( Management::Agent::directory != "" )
sn$directory = ClusterAgent::directory; sn$directory = Management::Agent::directory;
if ( ClusterAgent::stdout_file_suffix != "" ) if ( Management::Agent::stdout_file_suffix != "" )
sn$stdout_file = epi$id + "." + ClusterAgent::stdout_file_suffix; sn$stdout_file = epi$id + "." + Management::Agent::stdout_file_suffix;
if ( ClusterAgent::stderr_file_suffix != "" ) if ( Management::Agent::stderr_file_suffix != "" )
sn$stderr_file = epi$id + "." + ClusterAgent::stderr_file_suffix; sn$stderr_file = epi$id + "." + Management::Agent::stderr_file_suffix;
# This helps Zeek run controller and agent with a minimal set of scripts. # This helps Zeek run controller and agent with a minimal set of scripts.
sn$env["ZEEK_CLUSTER_MGMT_NODE"] = "AGENT"; sn$env["ZEEK_CLUSTER_MGMT_NODE"] = "AGENT";

View file

@ -1,8 +1,9 @@
##! Configuration settings for a cluster agent. ##! Configuration settings for a cluster agent.
@load policy/frameworks/cluster/controller/types @load policy/frameworks/management/config
@load policy/frameworks/management/types
module ClusterAgent; module Management::Agent;
export { export {
## The name this agent uses to represent the cluster instance it ## The name this agent uses to represent the cluster instance it
@ -14,55 +15,49 @@ export {
## Agent stdout log configuration. If the string is non-empty, Zeek will ## Agent stdout log configuration. If the string is non-empty, Zeek will
## produce a free-form log (i.e., not one governed by Zeek's logging ## produce a free-form log (i.e., not one governed by Zeek's logging
## framework) in Zeek's working directory. The final log's name is ## framework) in Zeek's working directory. The final log's name is
## "<name>.<suffix>", where the name is taken from :zeek:see:`ClusterAgent::name`, ## "<name>.<suffix>", where the name is taken from :zeek:see:`Management::Agent::name`,
## and the suffix is defined by the following variable. If left empty, ## and the suffix is defined by the following variable. If left empty,
## no such log results. ## no such log results.
## ##
## Note that the agent also establishes a "proper" Zeek log via the ## Note that the agent also establishes a "proper" Zeek log via the
## :zeek:see:`ClusterController::Log` module. ## :zeek:see:`Management::Log` module.
const stdout_file_suffix = "agent.stdout" &redef; const stdout_file_suffix = "agent.stdout" &redef;
## Agent stderr log configuration. Like :zeek:see:`ClusterAgent::stdout_file_suffix`, ## Agent stderr log configuration. Like :zeek:see:`Management::Agent::stdout_file_suffix`,
## but for the stderr stream. ## but for the stderr stream.
const stderr_file_suffix = "agent.stderr" &redef; const stderr_file_suffix = "agent.stderr" &redef;
## The network address the agent listens on. This only takes effect if ## The network address the agent listens on. This only takes effect if
## the agent isn't configured to connect to the controller (see ## the agent isn't configured to connect to the controller (see
## :zeek:see:`ClusterAgent::controller`). By default this uses the value of the ## :zeek:see:`Management::Agent::controller`). By default this uses the value of the
## ZEEK_AGENT_ADDR environment variable, but you may also redef to ## ZEEK_AGENT_ADDR environment variable, but you may also redef to
## a specific value. When empty, the implementation falls back to ## a specific value. When empty, the implementation falls back to
## :zeek:see:`ClusterAgent::default_address`. ## :zeek:see:`Management::default_address`.
const listen_address = getenv("ZEEK_AGENT_ADDR") &redef; const listen_address = getenv("ZEEK_AGENT_ADDR") &redef;
## The fallback listen address if :zeek:see:`ClusterAgent::listen_address`
## remains empty. Unless redefined, this uses Broker's own default listen
## address.
const default_address = Broker::default_listen_address &redef;
## The network port the agent listens on. Counterpart to ## The network port the agent listens on. Counterpart to
## :zeek:see:`ClusterAgent::listen_address`, defaulting to the ZEEK_AGENT_PORT ## :zeek:see:`Management::Agent::listen_address`, defaulting to the ZEEK_AGENT_PORT
## environment variable. ## environment variable.
const listen_port = getenv("ZEEK_AGENT_PORT") &redef; const listen_port = getenv("ZEEK_AGENT_PORT") &redef;
## The fallback listen port if :zeek:see:`ClusterAgent::listen_port` remains empty. ## The fallback listen port if :zeek:see:`Management::Agent::listen_port` remains empty.
const default_port = 2151/tcp &redef; const default_port = 2151/tcp &redef;
## The agent's Broker topic prefix. For its own communication, the agent ## The agent's Broker topic prefix. For its own communication, the agent
## suffixes this with "/<name>", based on :zeek:see:`ClusterAgent::name`. ## suffixes this with "/<name>", based on :zeek:see:`Management::Agent::name`.
const topic_prefix = "zeek/cluster-control/agent" &redef; const topic_prefix = "zeek/management/agent" &redef;
## The network coordinates of the controller. When defined, the agent ## The network coordinates of the controller. When defined, the agent
## peers with (and connects to) the controller; otherwise the controller ## peers with (and connects to) the controller; otherwise the controller
## will peer (and connect to) the agent, listening as defined by ## will peer (and connect to) the agent, listening as defined by
## :zeek:see:`ClusterAgent::listen_address` and :zeek:see:`ClusterAgent::listen_port`. ## :zeek:see:`Management::Agent::listen_address` and :zeek:see:`Management::Agent::listen_port`.
const controller: Broker::NetworkInfo = [ const controller: Broker::NetworkInfo = [
$address="0.0.0.0", $bound_port=0/unknown] &redef; $address="0.0.0.0", $bound_port=0/unknown] &redef;
## An optional custom output directory for the agent's stdout and stderr ## An optional custom output directory for stdout/stderr. Agent and
## logs. Agent and controller currently only log locally, not via the ## controller currently only log locally, not via the data cluster's
## data cluster's logger node. (This might change in the future.) This ## logger node. This means that if both write to the same log file,
## means that if both write to the same log file, the output gets ## output gets garbled.
## garbled.
const directory = "" &redef; const directory = "" &redef;
## The working directory for data cluster nodes created by this ## The working directory for data cluster nodes created by this
@ -71,20 +66,20 @@ export {
## cluster nodes. ## cluster nodes.
const cluster_directory = "" &redef; const cluster_directory = "" &redef;
## Returns a :zeek:see:`ClusterController::Types::Instance` describing this ## Returns a :zeek:see:`Management::Instance` describing this
## instance (its agent name plus listening address/port, as applicable). ## instance (its agent name plus listening address/port, as applicable).
global instance: function(): ClusterController::Types::Instance; global instance: function(): Management::Instance;
## Returns a :zeek:see:`Broker::EndpointInfo` record for this instance. ## Returns a :zeek:see:`Broker::EndpointInfo` record for this instance.
## Similar to :zeek:see:`ClusterAgent::instance`, but with slightly different ## Similar to :zeek:see:`Management::Agent::instance`, but with slightly different
## data format. ## data format.
global endpoint_info: function(): Broker::EndpointInfo; global endpoint_info: function(): Broker::EndpointInfo;
} }
function instance(): ClusterController::Types::Instance function instance(): Management::Instance
{ {
local epi = endpoint_info(); local epi = endpoint_info();
return ClusterController::Types::Instance($name=epi$id, return Management::Instance($name=epi$id,
$host=to_addr(epi$network$address), $host=to_addr(epi$network$address),
$listen_port=epi$network$bound_port); $listen_port=epi$network$bound_port);
} }
@ -94,22 +89,22 @@ function endpoint_info(): Broker::EndpointInfo
local epi: Broker::EndpointInfo; local epi: Broker::EndpointInfo;
local network: Broker::NetworkInfo; local network: Broker::NetworkInfo;
if ( ClusterAgent::name != "" ) if ( Management::Agent::name != "" )
epi$id = ClusterAgent::name; epi$id = Management::Agent::name;
else else
epi$id = fmt("agent-%s", gethostname()); epi$id = fmt("agent-%s", gethostname());
if ( ClusterAgent::listen_address != "" ) if ( Management::Agent::listen_address != "" )
network$address = ClusterAgent::listen_address; network$address = Management::Agent::listen_address;
else if ( ClusterAgent::default_address != "" ) else if ( Management::default_address != "" )
network$address = ClusterAgent::default_address; network$address = Management::default_address;
else else
network$address = "127.0.0.1"; network$address = "127.0.0.1";
if ( ClusterAgent::listen_port != "" ) if ( Management::Agent::listen_port != "" )
network$bound_port = to_port(ClusterAgent::listen_port); network$bound_port = to_port(Management::Agent::listen_port);
else else
network$bound_port = ClusterAgent::default_port; network$bound_port = Management::Agent::default_port;
epi$network = network; epi$network = network;

View file

@ -4,14 +4,12 @@
##! supervisor. ##! supervisor.
@load base/frameworks/broker @load base/frameworks/broker
@load policy/frameworks/management
@load policy/frameworks/cluster/controller/config
@load policy/frameworks/cluster/controller/log
@load policy/frameworks/cluster/controller/request
@load ./api @load ./api
@load ./config
module ClusterAgent::Runtime; module Management::Agent::Runtime;
# This export is mainly to appease Zeekygen's need to understand redefs of the # This export is mainly to appease Zeekygen's need to understand redefs of the
# Request record below. Without it, it fails to establish link targets for the # Request record below. Without it, it fails to establish link targets for the
@ -23,20 +21,21 @@ export {
}; };
} }
redef record ClusterController::Request::Request += { redef record Management::Request::Request += {
supervisor_state: SupervisorState &optional; supervisor_state: SupervisorState &optional;
}; };
redef ClusterController::role = ClusterController::Types::AGENT; # Tag our logs correctly
redef Management::Log::role = Management::AGENT;
# The global configuration as passed to us by the controller # The global configuration as passed to us by the controller
global g_config: ClusterController::Types::Configuration; global g_config: Management::Configuration;
# A map to make other instance info accessible # A map to make other instance info accessible
global g_instances: table[string] of ClusterController::Types::Instance; global g_instances: table[string] of Management::Instance;
# A map for the nodes we run on this instance, via this agent. # A map for the nodes we run on this instance, via this agent.
global g_nodes: table[string] of ClusterController::Types::Node; global g_nodes: table[string] of Management::Node;
# The node map employed by the supervisor to describe the cluster # The node map employed by the supervisor to describe the cluster
# topology to newly forked nodes. We refresh it when we receive # topology to newly forked nodes. We refresh it when we receive
@ -46,8 +45,8 @@ global g_data_cluster: table[string] of Supervisor::ClusterEndpoint;
event SupervisorControl::create_response(reqid: string, result: string) event SupervisorControl::create_response(reqid: string, result: string)
{ {
local req = ClusterController::Request::lookup(reqid); local req = Management::Request::lookup(reqid);
if ( ClusterController::Request::is_null(req) ) if ( Management::Request::is_null(req) )
return; return;
local name = req$supervisor_state$node; local name = req$supervisor_state$node;
@ -55,17 +54,17 @@ event SupervisorControl::create_response(reqid: string, result: string)
if ( |result| > 0 ) if ( |result| > 0 )
{ {
local msg = fmt("failed to create node %s: %s", name, result); local msg = fmt("failed to create node %s: %s", name, result);
ClusterController::Log::error(msg); Management::Log::error(msg);
event ClusterAgent::API::notify_error(ClusterAgent::name, msg, name); event Management::Agent::API::notify_error(Management::Agent::name, msg, name);
} }
ClusterController::Request::finish(reqid); Management::Request::finish(reqid);
} }
event SupervisorControl::destroy_response(reqid: string, result: bool) event SupervisorControl::destroy_response(reqid: string, result: bool)
{ {
local req = ClusterController::Request::lookup(reqid); local req = Management::Request::lookup(reqid);
if ( ClusterController::Request::is_null(req) ) if ( Management::Request::is_null(req) )
return; return;
local name = req$supervisor_state$node; local name = req$supervisor_state$node;
@ -73,35 +72,35 @@ event SupervisorControl::destroy_response(reqid: string, result: bool)
if ( ! result ) if ( ! result )
{ {
local msg = fmt("failed to destroy node %s, %s", name, reqid); local msg = fmt("failed to destroy node %s, %s", name, reqid);
ClusterController::Log::error(msg); Management::Log::error(msg);
event ClusterAgent::API::notify_error(ClusterAgent::name, msg, name); event Management::Agent::API::notify_error(Management::Agent::name, msg, name);
} }
ClusterController::Request::finish(reqid); Management::Request::finish(reqid);
} }
function supervisor_create(nc: Supervisor::NodeConfig) function supervisor_create(nc: Supervisor::NodeConfig)
{ {
local req = ClusterController::Request::create(); local req = Management::Request::create();
req$supervisor_state = SupervisorState($node = nc$name); req$supervisor_state = SupervisorState($node = nc$name);
event SupervisorControl::create_request(req$id, nc); event SupervisorControl::create_request(req$id, nc);
ClusterController::Log::info(fmt("issued supervisor create for %s, %s", nc$name, req$id)); Management::Log::info(fmt("issued supervisor create for %s, %s", nc$name, req$id));
} }
function supervisor_destroy(node: string) function supervisor_destroy(node: string)
{ {
local req = ClusterController::Request::create(); local req = Management::Request::create();
req$supervisor_state = SupervisorState($node = node); req$supervisor_state = SupervisorState($node = node);
event SupervisorControl::destroy_request(req$id, node); event SupervisorControl::destroy_request(req$id, node);
ClusterController::Log::info(fmt("issued supervisor destroy for %s, %s", node, req$id)); Management::Log::info(fmt("issued supervisor destroy for %s, %s", node, req$id));
} }
event ClusterAgent::API::set_configuration_request(reqid: string, config: ClusterController::Types::Configuration) event Management::Agent::API::set_configuration_request(reqid: string, config: Management::Configuration)
{ {
ClusterController::Log::info(fmt("rx ClusterAgent::API::set_configuration_request %s", reqid)); Management::Log::info(fmt("rx Management::Agent::API::set_configuration_request %s", reqid));
local nodename: string; local nodename: string;
local node: ClusterController::Types::Node; local node: Management::Node;
local nc: Supervisor::NodeConfig; local nc: Supervisor::NodeConfig;
local msg: string; local msg: string;
@ -126,7 +125,7 @@ event ClusterAgent::API::set_configuration_request(reqid: string, config: Cluste
g_data_cluster = table(); g_data_cluster = table();
for ( node in config$nodes ) for ( node in config$nodes )
{ {
if ( node$instance == ClusterAgent::name ) if ( node$instance == Management::Agent::name )
g_nodes[node$name] = node; g_nodes[node$name] = node;
# The cluster and supervisor frameworks require a port for every # The cluster and supervisor frameworks require a port for every
@ -155,8 +154,8 @@ event ClusterAgent::API::set_configuration_request(reqid: string, config: Cluste
node = g_nodes[nodename]; node = g_nodes[nodename];
nc = Supervisor::NodeConfig($name=nodename); nc = Supervisor::NodeConfig($name=nodename);
if ( ClusterAgent::cluster_directory != "" ) if ( Management::Agent::cluster_directory != "" )
nc$directory = ClusterAgent::cluster_directory; nc$directory = Management::Agent::cluster_directory;
if ( node?$interface ) if ( node?$interface )
nc$interface = node$interface; nc$interface = node$interface;
@ -181,34 +180,34 @@ event ClusterAgent::API::set_configuration_request(reqid: string, config: Cluste
if ( reqid != "" ) if ( reqid != "" )
{ {
local res = ClusterController::Types::Result( local res = Management::Result(
$reqid = reqid, $reqid = reqid,
$instance = ClusterAgent::name); $instance = Management::Agent::name);
ClusterController::Log::info(fmt("tx ClusterAgent::API::set_configuration_response %s", Management::Log::info(fmt("tx Management::Agent::API::set_configuration_response %s",
ClusterController::Types::result_to_string(res))); Management::result_to_string(res)));
event ClusterAgent::API::set_configuration_response(reqid, res); event Management::Agent::API::set_configuration_response(reqid, res);
} }
} }
event SupervisorControl::status_response(reqid: string, result: Supervisor::Status) event SupervisorControl::status_response(reqid: string, result: Supervisor::Status)
{ {
local req = ClusterController::Request::lookup(reqid); local req = Management::Request::lookup(reqid);
if ( ClusterController::Request::is_null(req) ) if ( Management::Request::is_null(req) )
return; return;
ClusterController::Request::finish(reqid); Management::Request::finish(reqid);
local res = ClusterController::Types::Result( local res = Management::Result(
$reqid = req$parent_id, $instance = ClusterAgent::name); $reqid = req$parent_id, $instance = Management::Agent::name);
local node_statuses: ClusterController::Types::NodeStatusVec; local node_statuses: Management::NodeStatusVec;
for ( node in result$nodes ) for ( node in result$nodes )
{ {
local sns = result$nodes[node]; # Supervisor node status local sns = result$nodes[node]; # Supervisor node status
local cns = ClusterController::Types::NodeStatus( local cns = Management::NodeStatus(
$node=node, $state=ClusterController::Types::PENDING); $node=node, $state=Management::PENDING);
# Identify the role of the node. For data cluster roles (worker, # Identify the role of the node. For data cluster roles (worker,
# manager, etc) we derive this from the cluster node table. For # manager, etc) we derive this from the cluster node table. For
@ -232,20 +231,20 @@ event SupervisorControl::status_response(reqid: string, result: Supervisor::Stat
local role = sns$node$env["ZEEK_CLUSTER_MGMT_NODE"]; local role = sns$node$env["ZEEK_CLUSTER_MGMT_NODE"];
if ( role == "CONTROLLER" ) if ( role == "CONTROLLER" )
{ {
cns$mgmt_role = ClusterController::Types::CONTROLLER; cns$mgmt_role = Management::CONTROLLER;
# The controller always listens, so the Zeek client can connect. # The controller always listens, so the Zeek client can connect.
cns$p = ClusterController::endpoint_info()$network$bound_port; cns$p = Management::Agent::endpoint_info()$network$bound_port;
} }
else if ( role == "AGENT" ) else if ( role == "AGENT" )
{ {
cns$mgmt_role = ClusterController::Types::AGENT; cns$mgmt_role = Management::AGENT;
# If we have a controller address, the agent connects to it # If we have a controller address, the agent connects to it
# and does not listen. See zeek_init() below for similar logic. # and does not listen. See zeek_init() below for similar logic.
if ( ClusterAgent::controller$address == "0.0.0.0" ) if ( Management::Agent::controller$address == "0.0.0.0" )
cns$p = ClusterAgent::endpoint_info()$network$bound_port; cns$p = Management::Agent::endpoint_info()$network$bound_port;
} }
else else
ClusterController::Log::warning(fmt( Management::Log::warning(fmt(
"unexpected cluster management node type '%'", role)); "unexpected cluster management node type '%'", role));
} }
} }
@ -255,7 +254,7 @@ event SupervisorControl::status_response(reqid: string, result: Supervisor::Stat
if ( sns?$pid ) if ( sns?$pid )
{ {
cns$pid = sns$pid; cns$pid = sns$pid;
cns$state = ClusterController::Types::RUNNING; cns$state = Management::RUNNING;
} }
node_statuses += cns; node_statuses += cns;
@ -263,53 +262,53 @@ event SupervisorControl::status_response(reqid: string, result: Supervisor::Stat
res$data = node_statuses; res$data = node_statuses;
ClusterController::Log::info(fmt("tx ClusterAgent::API::get_nodes_response %s", Management::Log::info(fmt("tx Management::Agent::API::get_nodes_response %s",
ClusterController::Types::result_to_string(res))); Management::result_to_string(res)));
event ClusterAgent::API::get_nodes_response(req$parent_id, res); event Management::Agent::API::get_nodes_response(req$parent_id, res);
} }
event ClusterAgent::API::get_nodes_request(reqid: string) event Management::Agent::API::get_nodes_request(reqid: string)
{ {
ClusterController::Log::info(fmt("rx ClusterAgent::API::get_nodes_request %s", reqid)); Management::Log::info(fmt("rx Management::Agent::API::get_nodes_request %s", reqid));
local req = ClusterController::Request::create(); local req = Management::Request::create();
req$parent_id = reqid; req$parent_id = reqid;
event SupervisorControl::status_request(req$id, ""); event SupervisorControl::status_request(req$id, "");
ClusterController::Log::info(fmt("issued supervisor status, %s", req$id)); Management::Log::info(fmt("issued supervisor status, %s", req$id));
} }
event ClusterAgent::API::agent_welcome_request(reqid: string) event Management::Agent::API::agent_welcome_request(reqid: string)
{ {
ClusterController::Log::info(fmt("rx ClusterAgent::API::agent_welcome_request %s", reqid)); Management::Log::info(fmt("rx Management::Agent::API::agent_welcome_request %s", reqid));
local res = ClusterController::Types::Result( local res = Management::Result(
$reqid = reqid, $reqid = reqid,
$instance = ClusterAgent::name); $instance = Management::Agent::name);
ClusterController::Log::info(fmt("tx ClusterAgent::API::agent_welcome_response %s", Management::Log::info(fmt("tx Management::Agent::API::agent_welcome_response %s",
ClusterController::Types::result_to_string(res))); Management::result_to_string(res)));
event ClusterAgent::API::agent_welcome_response(reqid, res); event Management::Agent::API::agent_welcome_response(reqid, res);
} }
event ClusterAgent::API::agent_standby_request(reqid: string) event Management::Agent::API::agent_standby_request(reqid: string)
{ {
ClusterController::Log::info(fmt("rx ClusterAgent::API::agent_standby_request %s", reqid)); Management::Log::info(fmt("rx Management::Agent::API::agent_standby_request %s", reqid));
# We shut down any existing cluster nodes via an empty configuration, # We shut down any existing cluster nodes via an empty configuration,
# and fall silent. We do not unpeer/disconnect (assuming we earlier # and fall silent. We do not unpeer/disconnect (assuming we earlier
# peered/connected -- otherwise there's nothing we can do here via # peered/connected -- otherwise there's nothing we can do here via
# Broker anyway), mainly to keep open the possibility of running # Broker anyway), mainly to keep open the possibility of running
# cluster nodes again later. # cluster nodes again later.
event ClusterAgent::API::set_configuration_request("", ClusterController::Types::Configuration()); event Management::Agent::API::set_configuration_request("", Management::Configuration());
local res = ClusterController::Types::Result( local res = Management::Result(
$reqid = reqid, $reqid = reqid,
$instance = ClusterAgent::name); $instance = Management::Agent::name);
ClusterController::Log::info(fmt("tx ClusterAgent::API::agent_standby_response %s", Management::Log::info(fmt("tx Management::Agent::API::agent_standby_response %s",
ClusterController::Types::result_to_string(res))); Management::result_to_string(res)));
event ClusterAgent::API::agent_standby_response(reqid, res); event Management::Agent::API::agent_standby_response(reqid, res);
} }
event Broker::peer_added(peer: Broker::EndpointInfo, msg: string) event Broker::peer_added(peer: Broker::EndpointInfo, msg: string)
@ -318,10 +317,10 @@ event Broker::peer_added(peer: Broker::EndpointInfo, msg: string)
# is in fact a controller, so we might send this in vain. # is in fact a controller, so we might send this in vain.
# Controllers register the agent upon receipt of the event. # Controllers register the agent upon receipt of the event.
local epi = ClusterAgent::endpoint_info(); local epi = Management::Agent::endpoint_info();
event ClusterAgent::API::notify_agent_hello(epi$id, event Management::Agent::API::notify_agent_hello(epi$id,
to_addr(epi$network$address), ClusterAgent::API::version); to_addr(epi$network$address), Management::Agent::API::version);
} }
# XXX We may want a request timeout event handler here. It's arguably cleaner to # XXX We may want a request timeout event handler here. It's arguably cleaner to
@ -330,8 +329,8 @@ event Broker::peer_added(peer: Broker::EndpointInfo, msg: string)
event zeek_init() event zeek_init()
{ {
local epi = ClusterAgent::endpoint_info(); local epi = Management::Agent::endpoint_info();
local agent_topic = ClusterAgent::topic_prefix + "/" + epi$id; local agent_topic = Management::Agent::topic_prefix + "/" + epi$id;
# The agent needs to peer with the supervisor -- this doesn't currently # The agent needs to peer with the supervisor -- this doesn't currently
# happen automatically. The address defaults to Broker's default, which # happen automatically. The address defaults to Broker's default, which
@ -350,15 +349,15 @@ event zeek_init()
# Auto-publish a bunch of events. Glob patterns or module-level # Auto-publish a bunch of events. Glob patterns or module-level
# auto-publish would be helpful here. # auto-publish would be helpful here.
Broker::auto_publish(agent_topic, ClusterAgent::API::get_nodes_response); Broker::auto_publish(agent_topic, Management::Agent::API::get_nodes_response);
Broker::auto_publish(agent_topic, ClusterAgent::API::set_configuration_response); Broker::auto_publish(agent_topic, Management::Agent::API::set_configuration_response);
Broker::auto_publish(agent_topic, ClusterAgent::API::agent_welcome_response); Broker::auto_publish(agent_topic, Management::Agent::API::agent_welcome_response);
Broker::auto_publish(agent_topic, ClusterAgent::API::agent_standby_response); Broker::auto_publish(agent_topic, Management::Agent::API::agent_standby_response);
Broker::auto_publish(agent_topic, ClusterAgent::API::notify_agent_hello); Broker::auto_publish(agent_topic, Management::Agent::API::notify_agent_hello);
Broker::auto_publish(agent_topic, ClusterAgent::API::notify_change); Broker::auto_publish(agent_topic, Management::Agent::API::notify_change);
Broker::auto_publish(agent_topic, ClusterAgent::API::notify_error); Broker::auto_publish(agent_topic, Management::Agent::API::notify_error);
Broker::auto_publish(agent_topic, ClusterAgent::API::notify_log); Broker::auto_publish(agent_topic, Management::Agent::API::notify_log);
Broker::auto_publish(SupervisorControl::topic_prefix, SupervisorControl::create_request); Broker::auto_publish(SupervisorControl::topic_prefix, SupervisorControl::create_request);
Broker::auto_publish(SupervisorControl::topic_prefix, SupervisorControl::status_request); Broker::auto_publish(SupervisorControl::topic_prefix, SupervisorControl::status_request);
@ -367,12 +366,12 @@ event zeek_init()
Broker::auto_publish(SupervisorControl::topic_prefix, SupervisorControl::stop_request); Broker::auto_publish(SupervisorControl::topic_prefix, SupervisorControl::stop_request);
# Establish connectivity with the controller. # Establish connectivity with the controller.
if ( ClusterAgent::controller$address != "0.0.0.0" ) if ( Management::Agent::controller$address != "0.0.0.0" )
{ {
# We connect to the controller. # We connect to the controller.
Broker::peer(ClusterAgent::controller$address, Broker::peer(Management::Agent::controller$address,
ClusterAgent::controller$bound_port, Management::Agent::controller$bound_port,
ClusterController::connect_retry); Management::connect_retry);
} }
else else
{ {
@ -380,5 +379,5 @@ event zeek_init()
Broker::listen(cat(epi$network$address), epi$network$bound_port); Broker::listen(cat(epi$network$address), epi$network$bound_port);
} }
ClusterController::Log::info("agent is live"); Management::Log::info("agent is live");
} }

View file

@ -0,0 +1,20 @@
##! Management framework configuration settings common to agent and controller.
##! This does not include config settings that exist in both agent and
##! controller but that they set differently, since setting defaults here would
##! be awkward or pointless (since both node types would overwrite them
##! anyway). For role-specific settings, see management/controller/config.zeek
##! and management/agent/config.zeek.
module Management;

export {
	## The fallback listen address if more specific addresses, such as
	## the controller's :zeek:see:`Management::Controller::listen_address`,
	## remain empty. Unless redefined, this uses Broker's own default
	## listen address.
	const default_address = Broker::default_listen_address &redef;

	## The retry interval for Broker connects. Defaults to a more
	## aggressive value compared to Broker's 30s.
	const connect_retry = 1sec &redef;
}

View file

@ -0,0 +1,4 @@
##! The entry point for the Management framework's cluster controller. It runs
##! bootstrap logic for launching the controller process via Zeek's Supervisor.
@load ./boot

View file

@ -3,9 +3,9 @@
##! corresponding response event. Such event pairs share the same name prefix ##! corresponding response event. Such event pairs share the same name prefix
##! and end in "_request" and "_response", respectively. ##! and end in "_request" and "_response", respectively.
@load ./types @load policy/frameworks/management/types
module ClusterController::API; module Management::Controller::API;
export { export {
## A simple versioning scheme, used to track basic compatibility of ## A simple versioning scheme, used to track basic compatibility of
@ -26,10 +26,10 @@ export {
## reqid: the request identifier used in the request event. ## reqid: the request identifier used in the request event.
## ##
## result: the result record. Its data member is a ## result: the result record. Its data member is a
## :zeek:see:`ClusterController::Types::Instance` record. ## :zeek:see:`Management::Instance` record.
## ##
global get_instances_response: event(reqid: string, global get_instances_response: event(reqid: string,
result: ClusterController::Types::Result); result: Management::Result);
## zeek-client sends this event to establish a new cluster configuration, ## zeek-client sends this event to establish a new cluster configuration,
@ -39,26 +39,26 @@ export {
## ##
## reqid: a request identifier string, echoed in the response event. ## reqid: a request identifier string, echoed in the response event.
## ##
## config: a :zeek:see:`ClusterController::Types::Configuration` record ## config: a :zeek:see:`Management::Configuration` record
## specifying the cluster configuration. ## specifying the cluster configuration.
## ##
global set_configuration_request: event(reqid: string, global set_configuration_request: event(reqid: string,
config: ClusterController::Types::Configuration); config: Management::Configuration);
## Response to a set_configuration_request event. The controller sends ## Response to a set_configuration_request event. The controller sends
## this back to the client. ## this back to the client.
## ##
## reqid: the request identifier used in the request event. ## reqid: the request identifier used in the request event.
## ##
## result: a vector of :zeek:see:`ClusterController::Types::Result` records. ## result: a vector of :zeek:see:`Management::Result` records.
## Each member captures one agent's response. ## Each member captures one agent's response.
## ##
global set_configuration_response: event(reqid: string, global set_configuration_response: event(reqid: string,
result: ClusterController::Types::ResultVec); result: Management::ResultVec);
## zeek-client sends this event to request a list of ## zeek-client sends this event to request a list of
## :zeek:see:`ClusterController::Types::NodeStatus` records that capture ## :zeek:see:`Management::NodeStatus` records that capture
## the status of Supervisor-managed nodes running on the cluster's ## the status of Supervisor-managed nodes running on the cluster's
## instances. ## instances.
## ##
@ -71,13 +71,13 @@ export {
## ##
## reqid: the request identifier used in the request event. ## reqid: the request identifier used in the request event.
## ##
## result: a :zeek:type`vector` of :zeek:see:`ClusterController::Types::Result` ## result: a :zeek:type`vector` of :zeek:see:`Management::Result`
## records. Each record covers one cluster instance. Each record's data ## records. Each record covers one cluster instance. Each record's data
## member is a vector of :zeek:see:`ClusterController::Types::NodeStatus` ## member is a vector of :zeek:see:`Management::NodeStatus`
## records, covering the nodes at that instance. Results may also indicate ## records, covering the nodes at that instance. Results may also indicate
## failure, with error messages indicating what went wrong. ## failure, with error messages indicating what went wrong.
global get_nodes_response: event(reqid: string, global get_nodes_response: event(reqid: string,
result: ClusterController::Types::ResultVec); result: Management::ResultVec);
# Testing events. These don't provide operational value but expose # Testing events. These don't provide operational value but expose
@ -102,7 +102,7 @@ export {
## reqid: the request identifier used in the request event. ## reqid: the request identifier used in the request event.
## ##
global test_timeout_response: event(reqid: string, global test_timeout_response: event(reqid: string,
result: ClusterController::Types::Result); result: Management::Result);
# Notification events, agent -> controller # Notification events, agent -> controller

View file

@ -0,0 +1,36 @@
##! The cluster controller's boot logic runs in Zeek's supervisor and instructs
##! it to launch the Management controller process. The controller's main logic
##! resides in main.zeek, similarly to other frameworks. The new process will
##! execute that script.
##!
##! If the current process is not the Zeek supervisor, this does nothing.
@load ./config
event zeek_init()
	{
	# Only the supervisor process bootstraps the controller; in any other
	# node this handler is a no-op.
	if ( ! Supervisor::is_supervisor() )
		return;

	local epi = Management::Controller::endpoint_info();

	# Launch the controller in bare mode so the new node loads only the
	# scripts listed here, not Zeek's full default script set.
	local sn = Supervisor::NodeConfig($name=epi$id, $bare_mode=T,
	    $scripts=vector("policy/frameworks/management/controller/main.zeek"));

	# Apply optional overrides for the working directory and the
	# stdout/stderr capture files, when configured non-empty.
	if ( Management::Controller::directory != "" )
		sn$directory = Management::Controller::directory;
	if ( Management::Controller::stdout_file != "" )
		sn$stdout_file = Management::Controller::stdout_file;
	if ( Management::Controller::stderr_file != "" )
		sn$stderr_file = Management::Controller::stderr_file;

	# This helps Zeek run controller and agent with a minimal set of scripts.
	sn$env["ZEEK_CLUSTER_MGMT_NODE"] = "CONTROLLER";

	# Supervisor::create() returns an empty string on success and an error
	# message otherwise; a failure to launch the controller is fatal.
	local res = Supervisor::create(sn);

	if ( res != "" )
		{
		print(fmt("error: supervisor could not create controller node: %s", res));
		exit(1);
		}
	}

View file

@ -0,0 +1,90 @@
##! Configuration settings for the cluster controller.
@load policy/frameworks/management/config
@load policy/frameworks/management/types
module Management::Controller;
export {
## The name of this controller. Defaults to the value of the
## ZEEK_CONTROLLER_NAME environment variable. When that is unset and the
## user doesn't redef the value, the implementation defaults to
## "controller-<hostname>".
const name = getenv("ZEEK_CONTROLLER_NAME") &redef;
## The controller's stdout log name. If the string is non-empty, Zeek will
## produce a free-form log (i.e., not one governed by Zeek's logging
## framework) in Zeek's working directory. If left empty, no such log
## results.
##
## Note that the controller also establishes a "proper" Zeek log via the
## :zeek:see:`Management::Log` module.
const stdout_file = "controller.stdout" &redef;
## The controller's stderr log name. Like :zeek:see:`Management::Controller::stdout_file`,
## but for the stderr stream.
const stderr_file = "controller.stderr" &redef;
## The network address the controller listens on. By default this uses
## the value of the ZEEK_CONTROLLER_ADDR environment variable, but you
## may also redef to a specific value. When empty, the implementation
## falls back to :zeek:see:`Management::default_address`.
const listen_address = getenv("ZEEK_CONTROLLER_ADDR") &redef;
## The network port the controller listens on. Counterpart to
## :zeek:see:`Management::Controller::listen_address`, defaulting to the
## ZEEK_CONTROLLER_PORT environment variable.
const listen_port = getenv("ZEEK_CONTROLLER_PORT") &redef;
## The fallback listen port if :zeek:see:`Management::Controller::listen_port`
## remains empty.
const default_port = 2150/tcp &redef;
## The controller's Broker topic. Clients send requests to this topic.
const topic = "zeek/management/controller" &redef;
## An optional custom output directory for stdout/stderr. Agent and
## controller currently only log locally, not via the data cluster's
## logger node. This means that if both write to the same log file,
## output gets garbled.
const directory = "" &redef;
## Returns a :zeek:see:`Broker::NetworkInfo` record describing the controller.
global network_info: function(): Broker::NetworkInfo;
## Returns a :zeek:see:`Broker::EndpointInfo` record describing the controller.
global endpoint_info: function(): Broker::EndpointInfo;
}
## Builds the :zeek:see:`Broker::NetworkInfo` record describing where the
## controller listens. The address falls back in order from
## :zeek:see:`Management::Controller::listen_address` to
## :zeek:see:`Management::default_address` to 127.0.0.1; the port falls back
## from :zeek:see:`Management::Controller::listen_port` to
## :zeek:see:`Management::Controller::default_port`.
function network_info(): Broker::NetworkInfo
	{
	local ni: Broker::NetworkInfo;

	# Prefer the controller-specific address, then the framework-wide
	# default, then localhost as the last resort.
	if ( Management::Controller::listen_address != "" )
		ni$address = Management::Controller::listen_address;
	else if ( Management::default_address != "" )
		ni$address = Management::default_address;
	else
		ni$address = "127.0.0.1";

	# listen_port originates from an environment variable and is therefore
	# a string; convert it only when actually set.
	if ( Management::Controller::listen_port != "" )
		ni$bound_port = to_port(Management::Controller::listen_port);
	else
		ni$bound_port = Management::Controller::default_port;

	return ni;
	}
## Builds the :zeek:see:`Broker::EndpointInfo` record describing the
## controller endpoint. The endpoint ID is
## :zeek:see:`Management::Controller::name` when set, otherwise
## "controller-<hostname>"; the network member comes from network_info().
function endpoint_info(): Broker::EndpointInfo
	{
	local epi: Broker::EndpointInfo;

	# Fall back to a hostname-derived ID when no explicit name was
	# configured (via redef or ZEEK_CONTROLLER_NAME).
	if ( Management::Controller::name != "" )
		epi$id = Management::Controller::name;
	else
		epi$id = fmt("controller-%s", gethostname());

	epi$network = network_info();

	return epi;
	}

View file

@ -1,37 +1,36 @@
##! This is the main "runtime" of the cluster controller. Zeek does not load ##! This is the main "runtime" of the Management framework's controller. Zeek
##! this directly; rather, the controller's bootstrapping module (in ./boot.zeek) ##! does not load this directly; rather, the controller's bootstrapping module
##! specifies it as the script to run in the node newly created via Zeek's ##! (in ./boot.zeek) specifies it as the script to run in the node newly created
##! supervisor. ##! by the supervisor.
@load base/frameworks/broker @load base/frameworks/broker
@load policy/frameworks/cluster/agent/config @load policy/frameworks/management
@load policy/frameworks/cluster/agent/api @load policy/frameworks/management/agent/config # For the agent topic prefix
@load policy/frameworks/management/agent/api
@load ./api @load ./api
@load ./log @load ./config
@load ./request
@load ./util
module ClusterController::Runtime; module Management::Controller::Runtime;
# This export is mainly to appease Zeekygen's need to understand redefs of the # This export is mainly to appease Zeekygen's need to understand redefs of the
# Request record below. Without it, it fails to establish link targets for the # Request record below. Without it, it fails to establish link targets for the
# tucked-on types. # tucked-on types.
export { export {
## Request state specific to ## Request state specific to
## :zeek:see:`ClusterController::API::set_configuration_request` and ## :zeek:see:`Management::Controller::API::set_configuration_request` and
## :zeek:see:`ClusterController::API::set_configuration_response`. ## :zeek:see:`Management::Controller::API::set_configuration_response`.
type SetConfigurationState: record { type SetConfigurationState: record {
## The cluster configuration established with this request ## The cluster configuration established with this request
config: ClusterController::Types::Configuration; config: Management::Configuration;
## Request state for every controller/agent transaction. ## Request state for every controller/agent transaction.
requests: set[string] &default=set(); requests: set[string] &default=set();
}; };
## Request state specific to ## Request state specific to
## :zeek:see:`ClusterController::API::get_nodes_request` and ## :zeek:see:`Management::Controller::API::get_nodes_request` and
## :zeek:see:`ClusterController::API::get_nodes_response`. ## :zeek:see:`Management::Controller::API::get_nodes_response`.
type GetNodesState: record { type GetNodesState: record {
## Request state for every controller/agent transaction. ## Request state for every controller/agent transaction.
requests: set[string] &default=set(); requests: set[string] &default=set();
@ -41,33 +40,34 @@ export {
type TestState: record { }; type TestState: record { };
} }
redef record ClusterController::Request::Request += { redef record Management::Request::Request += {
set_configuration_state: SetConfigurationState &optional; set_configuration_state: SetConfigurationState &optional;
get_nodes_state: GetNodesState &optional; get_nodes_state: GetNodesState &optional;
test_state: TestState &optional; test_state: TestState &optional;
}; };
redef ClusterController::role = ClusterController::Types::CONTROLLER; # Tag our logs correctly
redef Management::Log::role = Management::CONTROLLER;
global check_instances_ready: function(); global check_instances_ready: function();
global add_instance: function(inst: ClusterController::Types::Instance); global add_instance: function(inst: Management::Instance);
global drop_instance: function(inst: ClusterController::Types::Instance); global drop_instance: function(inst: Management::Instance);
global null_config: function(): ClusterController::Types::Configuration; global null_config: function(): Management::Configuration;
global is_null_config: function(config: ClusterController::Types::Configuration): bool; global is_null_config: function(config: Management::Configuration): bool;
# Checks whether the given instance is one that we know with different # Checks whether the given instance is one that we know with different
# communication settings: a a different peering direction, a different listening # communication settings: a a different peering direction, a different listening
# port, etc. Used as a predicate to indicate when we need to drop the existing # port, etc. Used as a predicate to indicate when we need to drop the existing
# one from our internal state. # one from our internal state.
global is_instance_connectivity_change: function global is_instance_connectivity_change: function
(inst: ClusterController::Types::Instance): bool; (inst: Management::Instance): bool;
# The set of agents the controller interacts with to manage to currently # The set of agents the controller interacts with to manage to currently
# configured cluster. This may be a subset of all the agents known to the # configured cluster. This may be a subset of all the agents known to the
# controller, as tracked by the g_instances_known set. They key is the instance # controller, as tracked by the g_instances_known set. They key is the instance
# name and should match the $name member of the corresponding instance record. # name and should match the $name member of the corresponding instance record.
global g_instances: table[string] of ClusterController::Types::Instance = table(); global g_instances: table[string] of Management::Instance = table();
# The set of instances that have checked in with the controller. This is a # The set of instances that have checked in with the controller. This is a
# superset of g_instances, since it covers any agent that has sent us a # superset of g_instances, since it covers any agent that has sent us a
@ -87,18 +87,18 @@ global g_config_reqid_pending: string = "";
# The most recent configuration we have successfully deployed. This is also # The most recent configuration we have successfully deployed. This is also
# the one we send whenever the client requests it. # the one we send whenever the client requests it.
global g_config_current: ClusterController::Types::Configuration; global g_config_current: Management::Configuration;
function send_config_to_agents(req: ClusterController::Request::Request, function send_config_to_agents(req: Management::Request::Request,
config: ClusterController::Types::Configuration) config: Management::Configuration)
{ {
for ( name in g_instances ) for ( name in g_instances )
{ {
if ( name !in g_instances_ready ) if ( name !in g_instances_ready )
next; next;
local agent_topic = ClusterAgent::topic_prefix + "/" + name; local agent_topic = Management::Agent::topic_prefix + "/" + name;
local areq = ClusterController::Request::create(); local areq = Management::Request::create();
areq$parent_id = req$id; areq$parent_id = req$id;
# We track the requests sent off to each agent. As the # We track the requests sent off to each agent. As the
@ -108,8 +108,8 @@ function send_config_to_agents(req: ClusterController::Request::Request,
# We could also broadcast just once on the agent prefix, but # We could also broadcast just once on the agent prefix, but
# explicit request/response pairs for each agent seems cleaner. # explicit request/response pairs for each agent seems cleaner.
ClusterController::Log::info(fmt("tx ClusterAgent::API::set_configuration_request %s to %s", areq$id, name)); Management::Log::info(fmt("tx Management::Agent::API::set_configuration_request %s to %s", areq$id, name));
Broker::publish(agent_topic, ClusterAgent::API::set_configuration_request, areq$id, config); Broker::publish(agent_topic, Management::Agent::API::set_configuration_request, areq$id, config);
} }
} }
@ -123,16 +123,16 @@ function check_instances_ready()
add cur_instances[inst]; add cur_instances[inst];
if ( cur_instances == g_instances_ready ) if ( cur_instances == g_instances_ready )
event ClusterController::API::notify_agents_ready(cur_instances); event Management::Controller::API::notify_agents_ready(cur_instances);
} }
function add_instance(inst: ClusterController::Types::Instance) function add_instance(inst: Management::Instance)
{ {
g_instances[inst$name] = inst; g_instances[inst$name] = inst;
if ( inst?$listen_port ) if ( inst?$listen_port )
Broker::peer(cat(inst$host), inst$listen_port, Broker::peer(cat(inst$host), inst$listen_port,
ClusterController::connect_retry); Management::connect_retry);
if ( inst$name in g_instances_known ) if ( inst$name in g_instances_known )
{ {
@ -141,23 +141,23 @@ function add_instance(inst: ClusterController::Types::Instance)
# the set of ready instances and proceed as feasible with config # the set of ready instances and proceed as feasible with config
# deployments. # deployments.
local req = ClusterController::Request::create(); local req = Management::Request::create();
ClusterController::Log::info(fmt("tx ClusterAgent::API::agent_welcome_request to %s", inst$name)); Management::Log::info(fmt("tx Management::Agent::API::agent_welcome_request to %s", inst$name));
Broker::publish(ClusterAgent::topic_prefix + "/" + inst$name, Broker::publish(Management::Agent::topic_prefix + "/" + inst$name,
ClusterAgent::API::agent_welcome_request, req$id); Management::Agent::API::agent_welcome_request, req$id);
} }
} }
function drop_instance(inst: ClusterController::Types::Instance) function drop_instance(inst: Management::Instance)
{ {
if ( inst$name !in g_instances ) if ( inst$name !in g_instances )
return; return;
# Send the agent a standby so it shuts down its cluster nodes & state # Send the agent a standby so it shuts down its cluster nodes & state
ClusterController::Log::info(fmt("tx ClusterAgent::API::agent_standby_request to %s", inst$name)); Management::Log::info(fmt("tx Management::Agent::API::agent_standby_request to %s", inst$name));
Broker::publish(ClusterAgent::topic_prefix + "/" + inst$name, Broker::publish(Management::Agent::topic_prefix + "/" + inst$name,
ClusterAgent::API::agent_standby_request, ""); Management::Agent::API::agent_standby_request, "");
delete g_instances[inst$name]; delete g_instances[inst$name];
@ -167,20 +167,20 @@ function drop_instance(inst: ClusterController::Types::Instance)
# The agent remains in g_instances_known, to track that we're able # The agent remains in g_instances_known, to track that we're able
# to communicate with it in case it's required again. # to communicate with it in case it's required again.
ClusterController::Log::info(fmt("dropped instance %s", inst$name)); Management::Log::info(fmt("dropped instance %s", inst$name));
} }
function null_config(): ClusterController::Types::Configuration function null_config(): Management::Configuration
{ {
return ClusterController::Types::Configuration($id=""); return Management::Configuration($id="");
} }
function is_null_config(config: ClusterController::Types::Configuration): bool function is_null_config(config: Management::Configuration): bool
{ {
return config$id == ""; return config$id == "";
} }
function is_instance_connectivity_change(inst: ClusterController::Types::Instance): bool function is_instance_connectivity_change(inst: Management::Instance): bool
{ {
# If we're not tracking this instance as part of a cluster config, it's # If we're not tracking this instance as part of a cluster config, it's
# not a change. (More precisely: we cannot say whether it's changed.) # not a change. (More precisely: we cannot say whether it's changed.)
@ -206,17 +206,18 @@ function is_instance_connectivity_change(inst: ClusterController::Types::Instanc
return F; return F;
} }
event ClusterController::API::notify_agents_ready(instances: set[string]) event Management::Controller::API::notify_agents_ready(instances: set[string])
{ {
local insts = ClusterController::Util::set_to_vector(instances); local insts = Management::Util::set_to_vector(instances);
ClusterController::Log::info(fmt("rx ClusterController::API:notify_agents_ready %s", join_string_vec(insts, ","))); Management::Log::info(fmt("rx Management::Controller::API:notify_agents_ready %s",
join_string_vec(insts, ",")));
local req = ClusterController::Request::lookup(g_config_reqid_pending); local req = Management::Request::lookup(g_config_reqid_pending);
# If there's no pending request, when it's no longer available, or it # If there's no pending request, when it's no longer available, or it
# doesn't have config state, don't do anything else. # doesn't have config state, don't do anything else.
if ( ClusterController::Request::is_null(req) || ! req?$set_configuration_state ) if ( Management::Request::is_null(req) || ! req?$set_configuration_state )
return; return;
# All instances requested in the pending configuration update are now # All instances requested in the pending configuration update are now
@ -226,15 +227,15 @@ event ClusterController::API::notify_agents_ready(instances: set[string])
send_config_to_agents(req, req$set_configuration_state$config); send_config_to_agents(req, req$set_configuration_state$config);
} }
event ClusterAgent::API::notify_agent_hello(instance: string, host: addr, api_version: count) event Management::Agent::API::notify_agent_hello(instance: string, host: addr, api_version: count)
{ {
ClusterController::Log::info(fmt("rx ClusterAgent::API::notify_agent_hello %s %s", instance, host)); Management::Log::info(fmt("rx Management::Agent::API::notify_agent_hello %s %s", instance, host));
# When an agent checks in with a mismatching API version, we log the # When an agent checks in with a mismatching API version, we log the
# fact and drop its state, if any. # fact and drop its state, if any.
if ( api_version != ClusterController::API::version ) if ( api_version != Management::Controller::API::version )
{ {
ClusterController::Log::warning( Management::Log::warning(
fmt("instance %s/%s has checked in with incompatible API version %s", fmt("instance %s/%s has checked in with incompatible API version %s",
instance, host, api_version)); instance, host, api_version));
@ -252,75 +253,75 @@ event ClusterAgent::API::notify_agent_hello(instance: string, host: addr, api_ve
{ {
# We need this instance for our cluster and have full context for # We need this instance for our cluster and have full context for
# it from the configuration. Tell agent. # it from the configuration. Tell agent.
local req = ClusterController::Request::create(); local req = Management::Request::create();
ClusterController::Log::info(fmt("tx ClusterAgent::API::agent_welcome_request to %s", instance)); Management::Log::info(fmt("tx Management::Agent::API::agent_welcome_request to %s", instance));
Broker::publish(ClusterAgent::topic_prefix + "/" + instance, Broker::publish(Management::Agent::topic_prefix + "/" + instance,
ClusterAgent::API::agent_welcome_request, req$id); Management::Agent::API::agent_welcome_request, req$id);
} }
} }
event ClusterAgent::API::agent_welcome_response(reqid: string, result: ClusterController::Types::Result) event Management::Agent::API::agent_welcome_response(reqid: string, result: Management::Result)
{ {
ClusterController::Log::info(fmt("rx ClusterAgent::API::agent_welcome_response %s", reqid)); Management::Log::info(fmt("rx Management::Agent::API::agent_welcome_response %s", reqid));
local req = ClusterController::Request::lookup(reqid); local req = Management::Request::lookup(reqid);
if ( ClusterController::Request::is_null(req) ) if ( Management::Request::is_null(req) )
return; return;
ClusterController::Request::finish(req$id); Management::Request::finish(req$id);
# An agent we've been waiting to hear back from is ready for cluster # An agent we've been waiting to hear back from is ready for cluster
# work. Double-check we still want it, otherwise drop it. # work. Double-check we still want it, otherwise drop it.
if ( ! result$success || result$instance !in g_instances ) if ( ! result$success || result$instance !in g_instances )
{ {
ClusterController::Log::info(fmt( Management::Log::info(fmt(
"tx ClusterAgent::API::agent_standby_request to %s", result$instance)); "tx Management::Agent::API::agent_standby_request to %s", result$instance));
Broker::publish(ClusterAgent::topic_prefix + "/" + result$instance, Broker::publish(Management::Agent::topic_prefix + "/" + result$instance,
ClusterAgent::API::agent_standby_request, ""); Management::Agent::API::agent_standby_request, "");
return; return;
} }
add g_instances_ready[result$instance]; add g_instances_ready[result$instance];
ClusterController::Log::info(fmt("instance %s ready", result$instance)); Management::Log::info(fmt("instance %s ready", result$instance));
check_instances_ready(); check_instances_ready();
} }
event ClusterAgent::API::notify_change(instance: string, n: ClusterController::Types::Node, event Management::Agent::API::notify_change(instance: string, n: Management::Node,
old: ClusterController::Types::State, old: Management::State,
new: ClusterController::Types::State) new: Management::State)
{ {
# XXX TODO # XXX TODO
} }
event ClusterAgent::API::notify_error(instance: string, msg: string, node: string) event Management::Agent::API::notify_error(instance: string, msg: string, node: string)
{ {
# XXX TODO # XXX TODO
} }
event ClusterAgent::API::notify_log(instance: string, msg: string, node: string) event Management::Agent::API::notify_log(instance: string, msg: string, node: string)
{ {
# XXX TODO # XXX TODO
} }
event ClusterAgent::API::set_configuration_response(reqid: string, result: ClusterController::Types::Result) event Management::Agent::API::set_configuration_response(reqid: string, result: Management::Result)
{ {
ClusterController::Log::info(fmt("rx ClusterAgent::API::set_configuration_response %s", reqid)); Management::Log::info(fmt("rx Management::Agent::API::set_configuration_response %s", reqid));
# Retrieve state for the request we just got a response to # Retrieve state for the request we just got a response to
local areq = ClusterController::Request::lookup(reqid); local areq = Management::Request::lookup(reqid);
if ( ClusterController::Request::is_null(areq) ) if ( Management::Request::is_null(areq) )
return; return;
# Release the request, which is now done. # Release the request, which is now done.
ClusterController::Request::finish(areq$id); Management::Request::finish(areq$id);
# Find the original request from the client # Find the original request from the client
local req = ClusterController::Request::lookup(areq$parent_id); local req = Management::Request::lookup(areq$parent_id);
if ( ClusterController::Request::is_null(req) ) if ( Management::Request::is_null(req) )
return; return;
# Add this result to the overall response # Add this result to the overall response
@ -342,33 +343,33 @@ event ClusterAgent::API::set_configuration_response(reqid: string, result: Clust
g_config_current = req$set_configuration_state$config; g_config_current = req$set_configuration_state$config;
g_config_reqid_pending = ""; g_config_reqid_pending = "";
ClusterController::Log::info(fmt("tx ClusterController::API::set_configuration_response %s", Management::Log::info(fmt("tx Management::Controller::API::set_configuration_response %s",
ClusterController::Request::to_string(req))); Management::Request::to_string(req)));
event ClusterController::API::set_configuration_response(req$id, req$results); event Management::Controller::API::set_configuration_response(req$id, req$results);
ClusterController::Request::finish(req$id); Management::Request::finish(req$id);
} }
event ClusterController::API::set_configuration_request(reqid: string, config: ClusterController::Types::Configuration) event Management::Controller::API::set_configuration_request(reqid: string, config: Management::Configuration)
{ {
ClusterController::Log::info(fmt("rx ClusterController::API::set_configuration_request %s", reqid)); Management::Log::info(fmt("rx Management::Controller::API::set_configuration_request %s", reqid));
local res: ClusterController::Types::Result; local res: Management::Result;
local req = ClusterController::Request::create(reqid); local req = Management::Request::create(reqid);
req$set_configuration_state = SetConfigurationState($config = config); req$set_configuration_state = SetConfigurationState($config = config);
# At the moment there can only be one pending request. # At the moment there can only be one pending request.
if ( g_config_reqid_pending != "" ) if ( g_config_reqid_pending != "" )
{ {
res = ClusterController::Types::Result($reqid=reqid); res = Management::Result($reqid=reqid);
res$success = F; res$success = F;
res$error = fmt("request %s still pending", g_config_reqid_pending); res$error = fmt("request %s still pending", g_config_reqid_pending);
req$results += res; req$results += res;
ClusterController::Log::info(fmt("tx ClusterController::API::set_configuration_response %s", Management::Log::info(fmt("tx Management::Controller::API::set_configuration_response %s",
ClusterController::Request::to_string(req))); Management::Request::to_string(req)));
event ClusterController::API::set_configuration_response(req$id, req$results); event Management::Controller::API::set_configuration_response(req$id, req$results);
ClusterController::Request::finish(req$id); Management::Request::finish(req$id);
return; return;
} }
@ -408,11 +409,11 @@ event ClusterController::API::set_configuration_request(reqid: string, config: C
local insts_to_keep: set[string]; local insts_to_keep: set[string];
# Alternative representation of insts_to_add, directly providing the instances. # Alternative representation of insts_to_add, directly providing the instances.
local insts_to_peer: table[string] of ClusterController::Types::Instance; local insts_to_peer: table[string] of Management::Instance;
# Helpful locals. # Helpful locals.
local inst_name: string; local inst_name: string;
local inst: ClusterController::Types::Instance; local inst: Management::Instance;
for ( inst_name in g_instances ) for ( inst_name in g_instances )
add insts_current[inst_name]; add insts_current[inst_name];
@ -459,37 +460,37 @@ event ClusterController::API::set_configuration_request(reqid: string, config: C
check_instances_ready(); check_instances_ready();
} }
event ClusterController::API::get_instances_request(reqid: string) event Management::Controller::API::get_instances_request(reqid: string)
{ {
ClusterController::Log::info(fmt("rx ClusterController::API::set_instances_request %s", reqid)); Management::Log::info(fmt("rx Management::Controller::API::set_instances_request %s", reqid));
local res = ClusterController::Types::Result($reqid = reqid); local res = Management::Result($reqid = reqid);
local insts: vector of ClusterController::Types::Instance; local insts: vector of Management::Instance;
for ( i in g_instances ) for ( i in g_instances )
insts += g_instances[i]; insts += g_instances[i];
res$data = insts; res$data = insts;
ClusterController::Log::info(fmt("tx ClusterController::API::get_instances_response %s", reqid)); Management::Log::info(fmt("tx Management::Controller::API::get_instances_response %s", reqid));
event ClusterController::API::get_instances_response(reqid, res); event Management::Controller::API::get_instances_response(reqid, res);
} }
event ClusterAgent::API::get_nodes_response(reqid: string, result: ClusterController::Types::Result) event Management::Agent::API::get_nodes_response(reqid: string, result: Management::Result)
{ {
ClusterController::Log::info(fmt("rx ClusterAgent::API::get_nodes_response %s", reqid)); Management::Log::info(fmt("rx Management::Agent::API::get_nodes_response %s", reqid));
# Retrieve state for the request we just got a response to # Retrieve state for the request we just got a response to
local areq = ClusterController::Request::lookup(reqid); local areq = Management::Request::lookup(reqid);
if ( ClusterController::Request::is_null(areq) ) if ( Management::Request::is_null(areq) )
return; return;
# Release the request, which is now done. # Release the request, which is now done.
ClusterController::Request::finish(areq$id); Management::Request::finish(areq$id);
# Find the original request from the client # Find the original request from the client
local req = ClusterController::Request::lookup(areq$parent_id); local req = Management::Request::lookup(areq$parent_id);
if ( ClusterController::Request::is_null(req) ) if ( Management::Request::is_null(req) )
return; return;
# Zeek's ingestion of an any-typed val via Broker yields an opaque # Zeek's ingestion of an any-typed val via Broker yields an opaque
@ -498,7 +499,7 @@ event ClusterAgent::API::get_nodes_response(reqid: string, result: ClusterContro
# whether the val is of the actual, intended (any-)type or a Broker # whether the val is of the actual, intended (any-)type or a Broker
# DataVal wrapper, we explicitly cast it back to our intended Zeek # DataVal wrapper, we explicitly cast it back to our intended Zeek
# type. This test case demonstrates: broker.remote_event_vector_any # type. This test case demonstrates: broker.remote_event_vector_any
result$data = result$data as ClusterController::Types::NodeStatusVec; result$data = result$data as Management::NodeStatusVec;
# Add this result to the overall response # Add this result to the overall response
req$results[|req$results|] = result; req$results[|req$results|] = result;
@ -514,27 +515,27 @@ event ClusterAgent::API::get_nodes_response(reqid: string, result: ClusterContro
if ( |req$get_nodes_state$requests| > 0 ) if ( |req$get_nodes_state$requests| > 0 )
return; return;
ClusterController::Log::info(fmt("tx ClusterController::API::get_nodes_response %s", Management::Log::info(fmt("tx Management::Controller::API::get_nodes_response %s",
ClusterController::Request::to_string(req))); Management::Request::to_string(req)));
event ClusterController::API::get_nodes_response(req$id, req$results); event Management::Controller::API::get_nodes_response(req$id, req$results);
ClusterController::Request::finish(req$id); Management::Request::finish(req$id);
} }
event ClusterController::API::get_nodes_request(reqid: string) event Management::Controller::API::get_nodes_request(reqid: string)
{ {
ClusterController::Log::info(fmt("rx ClusterController::API::get_nodes_request %s", reqid)); Management::Log::info(fmt("rx Management::Controller::API::get_nodes_request %s", reqid));
# Special case: if we have no instances, respond right away. # Special case: if we have no instances, respond right away.
if ( |g_instances| == 0 ) if ( |g_instances| == 0 )
{ {
ClusterController::Log::info(fmt("tx ClusterController::API::get_nodes_response %s", reqid)); Management::Log::info(fmt("tx Management::Controller::API::get_nodes_response %s", reqid));
event ClusterController::API::get_nodes_response(reqid, vector( event Management::Controller::API::get_nodes_response(reqid, vector(
ClusterController::Types::Result($reqid=reqid, $success=F, Management::Result($reqid=reqid, $success=F,
$error="no instances connected"))); $error="no instances connected")));
return; return;
} }
local req = ClusterController::Request::create(reqid); local req = Management::Request::create(reqid);
req$get_nodes_state = GetNodesState(); req$get_nodes_state = GetNodesState();
for ( name in g_instances ) for ( name in g_instances )
@ -542,72 +543,72 @@ event ClusterController::API::get_nodes_request(reqid: string)
if ( name !in g_instances_ready ) if ( name !in g_instances_ready )
next; next;
local agent_topic = ClusterAgent::topic_prefix + "/" + name; local agent_topic = Management::Agent::topic_prefix + "/" + name;
local areq = ClusterController::Request::create(); local areq = Management::Request::create();
areq$parent_id = req$id; areq$parent_id = req$id;
add req$get_nodes_state$requests[areq$id]; add req$get_nodes_state$requests[areq$id];
ClusterController::Log::info(fmt("tx ClusterAgent::API::get_nodes_request %s to %s", areq$id, name)); Management::Log::info(fmt("tx Management::Agent::API::get_nodes_request %s to %s", areq$id, name));
Broker::publish(agent_topic, ClusterAgent::API::get_nodes_request, areq$id); Broker::publish(agent_topic, Management::Agent::API::get_nodes_request, areq$id);
} }
} }
event ClusterController::Request::request_expired(req: ClusterController::Request::Request) event Management::Request::request_expired(req: Management::Request::Request)
{ {
# Various handlers for timed-out request state. We use the state members # Various handlers for timed-out request state. We use the state members
# to identify how to respond. No need to clean up the request itself, # to identify how to respond. No need to clean up the request itself,
# since we're getting here via the request module's expiration # since we're getting here via the request module's expiration
# mechanism that handles the cleanup. # mechanism that handles the cleanup.
local res: ClusterController::Types::Result; local res: Management::Result;
if ( req?$set_configuration_state ) if ( req?$set_configuration_state )
{ {
# This timeout means we no longer have a pending request. # This timeout means we no longer have a pending request.
g_config_reqid_pending = ""; g_config_reqid_pending = "";
res = ClusterController::Types::Result($reqid=req$id); res = Management::Result($reqid=req$id);
res$success = F; res$success = F;
res$error = "request timed out"; res$error = "request timed out";
req$results += res; req$results += res;
ClusterController::Log::info(fmt("tx ClusterController::API::set_configuration_response %s", Management::Log::info(fmt("tx Management::Controller::API::set_configuration_response %s",
ClusterController::Request::to_string(req))); Management::Request::to_string(req)));
event ClusterController::API::set_configuration_response(req$id, req$results); event Management::Controller::API::set_configuration_response(req$id, req$results);
} }
if ( req?$get_nodes_state ) if ( req?$get_nodes_state )
{ {
res = ClusterController::Types::Result($reqid=req$id); res = Management::Result($reqid=req$id);
res$success = F; res$success = F;
res$error = "request timed out"; res$error = "request timed out";
req$results += res; req$results += res;
ClusterController::Log::info(fmt("tx ClusterController::API::get_nodes_response %s", Management::Log::info(fmt("tx Management::Controller::API::get_nodes_response %s",
ClusterController::Request::to_string(req))); Management::Request::to_string(req)));
event ClusterController::API::get_nodes_response(req$id, req$results); event Management::Controller::API::get_nodes_response(req$id, req$results);
} }
if ( req?$test_state ) if ( req?$test_state )
{ {
res = ClusterController::Types::Result($reqid=req$id); res = Management::Result($reqid=req$id);
res$success = F; res$success = F;
res$error = "request timed out"; res$error = "request timed out";
ClusterController::Log::info(fmt("tx ClusterController::API::test_timeout_response %s", req$id)); Management::Log::info(fmt("tx Management::Controller::API::test_timeout_response %s", req$id));
event ClusterController::API::test_timeout_response(req$id, res); event Management::Controller::API::test_timeout_response(req$id, res);
} }
} }
event ClusterController::API::test_timeout_request(reqid: string, with_state: bool) event Management::Controller::API::test_timeout_request(reqid: string, with_state: bool)
{ {
ClusterController::Log::info(fmt("rx ClusterController::API::test_timeout_request %s %s", reqid, with_state)); Management::Log::info(fmt("rx Management::Controller::API::test_timeout_request %s %s", reqid, with_state));
if ( with_state ) if ( with_state )
{ {
# This state times out and triggers a timeout response in the # This state times out and triggers a timeout response in the
# above request_expired event handler. # above request_expired event handler.
local req = ClusterController::Request::create(reqid); local req = Management::Request::create(reqid);
req$test_state = TestState(); req$test_state = TestState();
} }
} }
@ -625,23 +626,23 @@ event zeek_init()
# connectivity to agents: agents are defined and communicated with as # connectivity to agents: agents are defined and communicated with as
# defined via configurations defined by the client. # defined via configurations defined by the client.
local cni = ClusterController::network_info(); local cni = Management::Controller::network_info();
Broker::listen(cat(cni$address), cni$bound_port); Broker::listen(cat(cni$address), cni$bound_port);
Broker::subscribe(ClusterAgent::topic_prefix); Broker::subscribe(Management::Agent::topic_prefix);
Broker::subscribe(ClusterController::topic); Broker::subscribe(Management::Controller::topic);
# Events sent to the client: # Events sent to the client:
local events: vector of any = [ local events: vector of any = [
ClusterController::API::get_instances_response, Management::Controller::API::get_instances_response,
ClusterController::API::set_configuration_response, Management::Controller::API::set_configuration_response,
ClusterController::API::get_nodes_response, Management::Controller::API::get_nodes_response,
ClusterController::API::test_timeout_response Management::Controller::API::test_timeout_response
]; ];
for ( i in events ) for ( i in events )
Broker::auto_publish(ClusterController::topic, events[i]); Broker::auto_publish(Management::Controller::topic, events[i]);
ClusterController::Log::info("controller is live"); Management::Log::info("controller is live");
} }

View file

@ -1,11 +1,11 @@
##! This module implements straightforward logging abilities for cluster ##! This module implements logging abilities for controller and agent. It uses
##! controller and agent. It uses Zeek's logging framework, and works only for ##! Zeek's logging framework and works only for nodes managed by the
##! nodes managed by the supervisor. In this setting Zeek's logging framework ##! supervisor. In this setting Zeek's logging framework operates locally, i.e.,
##! operates locally, i.e., this logging does not involve any logger nodes. ##! this does not involve logger nodes.
@load ./config @load ./types
module ClusterController::Log; module Management::Log;
export { export {
## The cluster logging stream identifier. ## The cluster logging stream identifier.
@ -30,7 +30,7 @@ export {
node: string; node: string;
## Log level of this message, converted from the above Level enum ## Log level of this message, converted from the above Level enum
level: string; level: string;
## The role of the node, translated from ClusterController::Types::Role. ## The role of the node, translated from Management::Role.
role: string; role: string;
## A message indicating information about cluster controller operation. ## A message indicating information about cluster controller operation.
message: string; message: string;
@ -63,6 +63,10 @@ export {
## message: the message to log. ## message: the message to log.
## ##
global error: function(message: string); global error: function(message: string);
## The role of this process in cluster management. Agent and controller
## both redefine this, and we use it during logging.
const role = Management::NONE &redef;
} }
# Enum translations to strings. This avoids those enums being reported # Enum translations to strings. This avoids those enums being reported
@ -75,9 +79,9 @@ global l2s: table[Level] of string = {
[ERROR] = "ERROR", [ERROR] = "ERROR",
}; };
global r2s: table[ClusterController::Types::Role] of string = { global r2s: table[Management::Role] of string = {
[ClusterController::Types::AGENT] = "AGENT", [Management::AGENT] = "AGENT",
[ClusterController::Types::CONTROLLER] = "CONTROLLER", [Management::CONTROLLER] = "CONTROLLER",
}; };
function debug(message: string) function debug(message: string)
@ -87,7 +91,7 @@ function debug(message: string)
local node = Supervisor::node(); local node = Supervisor::node();
Log::write(LOG, [$ts=network_time(), $node=node$name, $level=l2s[DEBUG], Log::write(LOG, [$ts=network_time(), $node=node$name, $level=l2s[DEBUG],
$role=r2s[ClusterController::role], $message=message]); $role=r2s[role], $message=message]);
} }
function info(message: string) function info(message: string)
@ -97,7 +101,7 @@ function info(message: string)
local node = Supervisor::node(); local node = Supervisor::node();
Log::write(LOG, [$ts=network_time(), $node=node$name, $level=l2s[INFO], Log::write(LOG, [$ts=network_time(), $node=node$name, $level=l2s[INFO],
$role=r2s[ClusterController::role], $message=message]); $role=r2s[role], $message=message]);
} }
function warning(message: string) function warning(message: string)
@ -107,7 +111,7 @@ function warning(message: string)
local node = Supervisor::node(); local node = Supervisor::node();
Log::write(LOG, [$ts=network_time(), $node=node$name, $level=l2s[WARNING], Log::write(LOG, [$ts=network_time(), $node=node$name, $level=l2s[WARNING],
$role=r2s[ClusterController::role], $message=message]); $role=r2s[role], $message=message]);
} }
function error(message: string) function error(message: string)
@ -117,7 +121,7 @@ function error(message: string)
local node = Supervisor::node(); local node = Supervisor::node();
Log::write(LOG, [$ts=network_time(), $node=node$name, $level=l2s[ERROR], Log::write(LOG, [$ts=network_time(), $node=node$name, $level=l2s[ERROR],
$role=r2s[ClusterController::role], $message=message]); $role=r2s[role], $message=message]);
} }
event zeek_init() event zeek_init()
@ -133,5 +137,5 @@ event zeek_init()
local stream = Log::Stream($columns=Info, $path=fmt("cluster-%s", node$name), local stream = Log::Stream($columns=Info, $path=fmt("cluster-%s", node$name),
$policy=log_policy); $policy=log_policy);
Log::create_stream(ClusterController::Log::LOG, stream); Log::create_stream(Management::Log::LOG, stream);
} }

View file

@ -1,18 +1,18 @@
##! This module implements a request state abstraction that both cluster ##! This module implements a request state abstraction in the Management
##! controller and agent use to tie responses to received request events and be ##! framework that both controller and agent use to connect request events to
##! able to time-out such requests. ##! subsequent response ones, and to be able to time out such requests.
@load ./types
@load ./config @load ./config
@load ./types
module ClusterController::Request; module Management::Request;
export { export {
## Request records track state associated with a request/response event ## Request records track state associated with a request/response event
## pair. Calls to ## pair. Calls to
## :zeek:see:`ClusterController::Request::create` establish such state ## :zeek:see:`Management::Request::create` establish such state
## when an entity sends off a request event, while ## when an entity sends off a request event, while
## :zeek:see:`ClusterController::Request::finish` clears the state when ## :zeek:see:`Management::Request::finish` clears the state when
## a corresponding response event comes in, or the state times out. ## a corresponding response event comes in, or the state times out.
type Request: record { type Request: record {
## Each request has a hopfully unique ID provided by the requester. ## Each request has a hopfully unique ID provided by the requester.
@ -26,12 +26,19 @@ export {
## The results vector builds up the list of results we eventually ## The results vector builds up the list of results we eventually
## send to the requestor when we have processed the request. ## send to the requestor when we have processed the request.
results: ClusterController::Types::ResultVec &default=vector(); results: Management::ResultVec &default=vector();
## An internal flag to track whether a request is complete. ## An internal flag to track whether a request is complete.
finished: bool &default=F; finished: bool &default=F;
}; };
## The timeout for request state. Such state (see the :zeek:see:`Management::Request`
## module) ties together request and response event pairs. The timeout causes
## its cleanup in the absence of a timely response. It applies both to
## state kept for client requests, as well as state in the agents for
## requests to the supervisor.
const timeout_interval = 10sec &redef;
## A token request that serves as a null/nonexistant request. ## A token request that serves as a null/nonexistant request.
global null_req = Request($id="", $finished=T); global null_req = Request($id="", $finished=T);
@ -42,7 +49,7 @@ export {
global create: function(reqid: string &default=unique_id("")): Request; global create: function(reqid: string &default=unique_id("")): Request;
## This function looks up the request for a given request ID and returns ## This function looks up the request for a given request ID and returns
## it. When no such request exists, returns ClusterController::Request::null_req. ## it. When no such request exists, returns Management::Request::null_req.
## ##
## reqid: the ID of the request state to retrieve. ## reqid: the ID of the request state to retrieve.
## ##
@ -57,8 +64,8 @@ export {
global finish: function(reqid: string): bool; global finish: function(reqid: string): bool;
## This event fires when a request times out (as per the ## This event fires when a request times out (as per the
## ClusterController::request_timeout) before it has been finished via ## Management::Request::timeout_interval) before it has been finished via
## ClusterController::Request::finish(). ## Management::Request::finish().
## ##
## req: the request state that is expiring. ## req: the request state that is expiring.
## ##
@ -85,18 +92,17 @@ function requests_expire_func(reqs: table[string] of Request, reqid: string): in
# No need to flag request expiration when we've already internally marked # No need to flag request expiration when we've already internally marked
# the request as done. # the request as done.
if ( ! reqs[reqid]$finished ) if ( ! reqs[reqid]$finished )
event ClusterController::Request::request_expired(reqs[reqid]); event Management::Request::request_expired(reqs[reqid]);
return 0secs; return 0secs;
} }
# This is the global request-tracking table. The table maps from request ID # This is the global request-tracking table. The table maps from request ID
# strings to corresponding Request records. Entries time out after the # strings to corresponding Request records. Entries time out after the
# ClusterController::request_timeout interval. Upon expiration, a # Management::Request::timeout_interval. Upon expiration, a request_expired
# request_expired event triggers that conveys the request state. # event triggers that conveys the request state.
global g_requests: table[string] of Request global g_requests: table[string] of Request
&create_expire=ClusterController::request_timeout &create_expire=timeout_interval &expire_func=requests_expire_func;
&expire_func=requests_expire_func;
function create(reqid: string): Request function create(reqid: string): Request
{ {
@ -137,7 +143,7 @@ function is_null(request: Request): bool
function to_string(request: Request): string function to_string(request: Request): string
{ {
local results: string_vec; local results: string_vec;
local res: ClusterController::Types::Result; local res: Management::Result;
local parent_id = ""; local parent_id = "";
if ( request?$parent_id ) if ( request?$parent_id )
@ -146,7 +152,7 @@ function to_string(request: Request): string
for ( idx in request$results ) for ( idx in request$results )
{ {
res = request$results[idx]; res = request$results[idx];
results[|results|] = ClusterController::Types::result_to_string(res); results[|results|] = Management::result_to_string(res);
} }
return fmt("[request %s%s %s, results: %s]", request$id, parent_id, return fmt("[request %s%s %s, results: %s]", request$id, parent_id,

View file

@ -1,8 +1,8 @@
##! This module holds the basic types needed for the Cluster Controller ##! This module holds the basic types needed for the Management framework. These
##! framework. These are used by both agent and controller, and several ##! are used by both cluster agent and controller, and several have corresponding
##! have corresponding equals in the zeek-client implementation. ##! implementations in zeek-client.
module ClusterController::Types; module Management;
export { export {
## Management infrastructure node type. This intentionally does not ## Management infrastructure node type. This intentionally does not
@ -103,7 +103,7 @@ export {
type ResultVec: vector of Result; type ResultVec: vector of Result;
## Given a :zeek:see:`ClusterController::Types::Result` record, ## Given a :zeek:see:`Management::Result` record,
## this function returns a string summarizing it. ## this function returns a string summarizing it.
global result_to_string: function(res: Result): string; global result_to_string: function(res: Result): string;
} }

View file

@ -1,7 +1,7 @@
##! Utility functions for the cluster controller framework, available to agent ##! Utility functions for the Management framework, available to agent
##! and controller. ##! and controller.
module ClusterController::Util; module Management::Util;
export { export {
## Renders a set of strings to an alphabetically sorted vector. ## Renders a set of strings to an alphabetically sorted vector.

View file

@ -11,20 +11,22 @@
# @load frameworks/control/controllee.zeek # @load frameworks/control/controllee.zeek
# @load frameworks/control/controller.zeek # @load frameworks/control/controller.zeek
@load frameworks/cluster/agent/__load__.zeek @load frameworks/management/agent/__load__.zeek
@load frameworks/cluster/agent/api.zeek @load frameworks/management/agent/api.zeek
@load frameworks/cluster/agent/boot.zeek @load frameworks/management/agent/boot.zeek
@load frameworks/cluster/agent/config.zeek @load frameworks/management/agent/config.zeek
# @load frameworks/cluster/agent/main.zeek # @load frameworks/management/agent/main.zeek
@load frameworks/cluster/controller/__load__.zeek @load frameworks/management/controller/__load__.zeek
@load frameworks/cluster/controller/api.zeek @load frameworks/management/controller/api.zeek
@load frameworks/cluster/controller/boot.zeek @load frameworks/management/controller/boot.zeek
@load frameworks/cluster/controller/config.zeek @load frameworks/management/controller/config.zeek
@load frameworks/cluster/controller/log.zeek # @load frameworks/management/controller/main.zeek
# @load frameworks/cluster/controller/main.zeek @load frameworks/management/__load__.zeek
@load frameworks/cluster/controller/request.zeek @load frameworks/management/config.zeek
@load frameworks/cluster/controller/types.zeek @load frameworks/management/log.zeek
@load frameworks/cluster/controller/util.zeek @load frameworks/management/request.zeek
@load frameworks/management/types.zeek
@load frameworks/management/util.zeek
@load frameworks/dpd/detect-protocols.zeek @load frameworks/dpd/detect-protocols.zeek
@load frameworks/dpd/packet-segment-logging.zeek @load frameworks/dpd/packet-segment-logging.zeek
@load frameworks/intel/do_notice.zeek @load frameworks/intel/do_notice.zeek

View file

@ -4,8 +4,8 @@
@load protocols/ssl/notary.zeek @load protocols/ssl/notary.zeek
@load frameworks/control/controllee.zeek @load frameworks/control/controllee.zeek
@load frameworks/control/controller.zeek @load frameworks/control/controller.zeek
@load frameworks/cluster/agent/main.zeek @load frameworks/management/agent/main.zeek
@load frameworks/cluster/controller/main.zeek @load frameworks/management/controller/main.zeek
@load frameworks/files/extract-all-files.zeek @load frameworks/files/extract-all-files.zeek
@load policy/misc/dump-events.zeek @load policy/misc/dump-events.zeek
@load policy/protocols/conn/speculative-service.zeek @load policy/protocols/conn/speculative-service.zeek

View file

@ -2,8 +2,8 @@
### NOTE: This file has been sorted with diff-sort. ### NOTE: This file has been sorted with diff-sort.
warning in <...>/extract-certs-pem.zeek, line 1: deprecated script loaded from <...>/__load__.zeek:12 "Remove in v5.1. Use log-certs-base64.zeek instead." warning in <...>/extract-certs-pem.zeek, line 1: deprecated script loaded from <...>/__load__.zeek:12 "Remove in v5.1. Use log-certs-base64.zeek instead."
warning in <...>/extract-certs-pem.zeek, line 1: deprecated script loaded from command line arguments "Remove in v5.1. Use log-certs-base64.zeek instead." warning in <...>/extract-certs-pem.zeek, line 1: deprecated script loaded from command line arguments "Remove in v5.1. Use log-certs-base64.zeek instead."
warning in <...>/log-ocsp.zeek, line 1: deprecated script loaded from <...>/test-all-policy.zeek:59 ("Remove in v5.1. OCSP logging is now enabled by default") warning in <...>/log-ocsp.zeek, line 1: deprecated script loaded from <...>/test-all-policy.zeek:61 ("Remove in v5.1. OCSP logging is now enabled by default")
warning in <...>/log-ocsp.zeek, line 1: deprecated script loaded from <...>/test-all-policy.zeek:59 ("Remove in v5.1. OCSP logging is now enabled by default") warning in <...>/log-ocsp.zeek, line 1: deprecated script loaded from <...>/test-all-policy.zeek:61 ("Remove in v5.1. OCSP logging is now enabled by default")
warning in <...>/log-ocsp.zeek, line 1: deprecated script loaded from command line arguments ("Remove in v5.1. OCSP logging is now enabled by default") warning in <...>/log-ocsp.zeek, line 1: deprecated script loaded from command line arguments ("Remove in v5.1. OCSP logging is now enabled by default")
warning in <...>/notary.zeek, line 1: deprecated script loaded from <...>/__load__.zeek:4 ("Remove in v5.1. Please switch to other more modern approaches like SCT validation (validate-sct.zeek).") warning in <...>/notary.zeek, line 1: deprecated script loaded from <...>/__load__.zeek:4 ("Remove in v5.1. Please switch to other more modern approaches like SCT validation (validate-sct.zeek).")
warning in <...>/notary.zeek, line 1: deprecated script loaded from command line arguments ("Remove in v5.1. Please switch to other more modern approaches like SCT validation (validate-sct.zeek).") warning in <...>/notary.zeek, line 1: deprecated script loaded from command line arguments ("Remove in v5.1. Please switch to other more modern approaches like SCT validation (validate-sct.zeek).")

View file

@ -1,7 +1,7 @@
# This test verifies basic agent-controller communication. We launch agent and # This test verifies basic agent-controller communication in the Management
# controller via the supervisor, add an extra handler for the notify_agent_hello # framework. We launch agent and controller via the supervisor, add an extra
# event that travels agent -> controller, and verify its print output in the # handler for the notify_agent_hello event that travels agent -> controller, and
# controller's stdout log. # verify its print output in the controller's stdout log.
# The following env vars is known to the controller framework # The following env vars is known to the controller framework
# @TEST-PORT: ZEEK_CONTROLLER_PORT # @TEST-PORT: ZEEK_CONTROLLER_PORT
@ -12,20 +12,20 @@
# @TEST-EXEC: btest-bg-wait 10 # @TEST-EXEC: btest-bg-wait 10
# @TEST-EXEC: btest-diff zeek/controller.stdout # @TEST-EXEC: btest-diff zeek/controller.stdout
@load policy/frameworks/cluster/agent @load policy/frameworks/management/agent
@load policy/frameworks/cluster/controller @load policy/frameworks/management/controller
redef Broker::default_port = to_port(getenv("BROKER_PORT")); redef Broker::default_port = to_port(getenv("BROKER_PORT"));
redef ClusterController::name = "controller"; redef Management::Controller::name = "controller";
redef ClusterAgent::name = "agent"; redef Management::Agent::name = "agent";
# Tell the agent where to locate the controller. # Tell the agent where to locate the controller.
redef ClusterAgent::controller = [$address="127.0.0.1", $bound_port=to_port(getenv("ZEEK_CONTROLLER_PORT"))]; redef Management::Agent::controller = [$address="127.0.0.1", $bound_port=to_port(getenv("ZEEK_CONTROLLER_PORT"))];
@if ( Supervisor::is_supervised() ) @if ( Supervisor::is_supervised() )
@load policy/frameworks/cluster/agent/api @load policy/frameworks/management/agent/api
global logged = F; global logged = F;
@ -41,7 +41,7 @@ event zeek_init()
} }
} }
event ClusterAgent::API::notify_agent_hello(instance: string, host: addr, api_version: count) event Management::Agent::API::notify_agent_hello(instance: string, host: addr, api_version: count)
{ {
if ( Supervisor::node()$name == "controller" ) if ( Supervisor::node()$name == "controller" )
{ {

View file

@ -1 +1 @@
8e0401661fca089e941ab844694f9d2314f1401f 3528e248d7d35e102c39c8e8050fc1a6dfa477bf