
This changes the basic agent-management model to one in which the configurations received from the client define not just the data cluster, but also the set of acceptable instances. Unless connectivity already exists, the controller establishes peerings with new agents that listen for connections, or waits for agents that connect to the controller to check in. Once all required agents are available, the controller triggers the new notify_agents_ready event, an agent/controller-level "cluster-is-ready" event. The controller also uses this event to submit a pending config update to the now-ready instances.
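For orientation, here is a minimal, hypothetical sketch of a controller-side handler for such a readiness event. The event name and parameter list are assumptions based on the description above, not the actual signature in the codebase:

    # Hypothetical sketch only: the real event lives in the controller's
    # API and its signature may differ.
    event ClusterController::API::notify_agents_ready(instances: set[string])
    	{
    	# All required agent instances have checked in; a pending
    	# configuration update could now be submitted to them.
    	print fmt("all %d agents ready", |instances|);
    	}

The agent-side script follows.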
@load base/frameworks/broker

@load policy/frameworks/cluster/controller/config
@load policy/frameworks/cluster/controller/log
@load policy/frameworks/cluster/controller/request

@load ./api

redef ClusterController::role = ClusterController::Types::AGENT;

# The global configuration as passed to us by the controller
global g_config: ClusterController::Types::Configuration;

# A map to make other instance info accessible
global g_instances: table[string] of ClusterController::Types::Instance;

# A map for the nodes we run on this instance, via this agent.
global g_nodes: table[string] of ClusterController::Types::Node;

# The node map employed by the supervisor to describe the cluster
# topology to newly forked nodes. We refresh it when we receive
# new configurations.
global g_data_cluster: table[string] of Supervisor::ClusterEndpoint;

# Whether we currently keep notifying the controller that we're here.
# We stop once the controller responds back, and resume when we've
# lost the peering.
global g_notify_controller: bool = T;

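# Handles the supervisor's response to a node-creation request. We look
# up our original request via its reqid; a non-empty result string means
# creation failed, which we log and report to the controller.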
event SupervisorControl::create_response(reqid: string, result: string)
	{
	local req = ClusterController::Request::lookup(reqid);
	if ( ClusterController::Request::is_null(req) )
		return;

	local name = req$supervisor_state$node;

	if ( |result| > 0 )
		{
		local msg = fmt("failed to create node %s: %s", name, result);
		ClusterController::Log::error(msg);
		event ClusterAgent::API::notify_error(ClusterAgent::name, msg, name);
		}

	ClusterController::Request::finish(reqid);
	}

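# Handles the supervisor's response to a node-destruction request,
# analogously to create_response above: failures are logged and
# reported to the controller via notify_error.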
event SupervisorControl::destroy_response(reqid: string, result: bool)
	{
	local req = ClusterController::Request::lookup(reqid);
	if ( ClusterController::Request::is_null(req) )
		return;

	local name = req$supervisor_state$node;

	if ( ! result )
		{
		local msg = fmt("failed to destroy node %s, %s", name, reqid);
		ClusterController::Log::error(msg);
		event ClusterAgent::API::notify_error(ClusterAgent::name, msg, name);
		}

	ClusterController::Request::finish(reqid);
	}

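# Asks the supervisor to create a node as described by the given node
# configuration, tracking the request so create_response can correlate
# the supervisor's answer.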
function supervisor_create(nc: Supervisor::NodeConfig)
	{
	local req = ClusterController::Request::create();
	req$supervisor_state = ClusterController::Request::SupervisorState($node = nc$name);
	event SupervisorControl::create_request(req$id, nc);
	ClusterController::Log::info(fmt("issued supervisor create for %s, %s", nc$name, req$id));
	}

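# Asks the supervisor to destroy the node of the given name, again
# tracked via a request so destroy_response can correlate the answer.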
function supervisor_destroy(node: string)
	{
	local req = ClusterController::Request::create();
	req$supervisor_state = ClusterController::Request::SupervisorState($node = node);
	event SupervisorControl::destroy_request(req$id, node);
	ClusterController::Log::info(fmt("issued supervisor destroy for %s, %s", node, req$id));
	}

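# Main handler for configuration updates pushed by the controller: adopt
# the new configuration, tear down any nodes we currently run, rebuild
# the supervisor's cluster topology table, and relaunch our nodes.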
event ClusterAgent::API::set_configuration_request(reqid: string, config: ClusterController::Types::Configuration)
	{
	ClusterController::Log::info(fmt("rx ClusterAgent::API::set_configuration_request %s", reqid));

	local nodename: string;
	local node: ClusterController::Types::Node;
	local nc: Supervisor::NodeConfig;
	local msg: string;

	# Adopt the global configuration provided.
	# XXX this can later handle validation and persistence
	# XXX should do this transactionally, only set when all else worked
	g_config = config;

	# Refresh the instances table:
	g_instances = table();
	for ( inst in config$instances )
		g_instances[inst$name] = inst;

	# Terminate existing nodes
	for ( nodename in g_nodes )
		supervisor_destroy(nodename);

	g_nodes = table();

	# Refresh the data cluster and nodes tables

	g_data_cluster = table();
	for ( node in config$nodes )
		{
		if ( node$instance == ClusterAgent::name )
			g_nodes[node$name] = node;

		local cep = Supervisor::ClusterEndpoint(
		    $role = node$role,
		    $host = g_instances[node$instance]$host,
		    $p = node$p);

		if ( node?$interface )
			cep$interface = node$interface;

		g_data_cluster[node$name] = cep;
		}

	# Apply the new configuration via the supervisor

	for ( nodename in g_nodes )
		{
		node = g_nodes[nodename];
		nc = Supervisor::NodeConfig($name=nodename);

		if ( ClusterAgent::cluster_directory != "" )
			nc$directory = ClusterAgent::cluster_directory;

		if ( node?$interface )
			nc$interface = node$interface;
		if ( node?$cpu_affinity )
			nc$cpu_affinity = node$cpu_affinity;
		if ( node?$scripts )
			nc$scripts = node$scripts;
		if ( node?$env )
			nc$env = node$env;

		# XXX could use options to enable per-node overrides for
		# directory, stdout, stderr, others?

		nc$cluster = g_data_cluster;
		supervisor_create(nc);
		}

	# XXX this currently does not fail if any of the above problems
	# occurred, mainly due to the tediousness of handling the
	# supervisor's response events asynchronously. The only indication
	# of error will be notification events to the controller.

	if ( reqid != "" )
		{
		local res = ClusterController::Types::Result(
		    $reqid = reqid,
		    $instance = ClusterAgent::name);

		ClusterController::Log::info(fmt("tx ClusterAgent::API::set_configuration_response %s",
		    ClusterController::Types::result_to_string(res)));
		event ClusterAgent::API::set_configuration_response(reqid, res);
		}
	}

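# The controller's acknowledgment of our hello: once received, we stop
# the periodic notify_agent_hello loop below.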
event ClusterAgent::API::notify_controller_hello(controller: string, host: addr)
	{
	ClusterController::Log::info(fmt("rx ClusterAgent::API::notify_controller_hello %s %s", controller, host));
	g_notify_controller = F;
	}

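# Our own hello event doubles as a keep-alive timer: as long as the
# controller hasn't acknowledged us, re-schedule it every second. Since
# the event auto-publishes to our agent topic (see zeek_init below),
# each firing also reaches the controller.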
event ClusterAgent::API::notify_agent_hello(instance: string, host: addr, api_version: count)
	{
	if ( g_notify_controller )
		schedule 1sec { ClusterAgent::API::notify_agent_hello(instance, host, api_version) };
	}

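# A new peering has come up; it may be the controller (re)connecting,
# so (re)start the hello loop.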
event Broker::peer_added(peer: Broker::EndpointInfo, msg: string)
	{
	# This does not (cannot?) immediately verify that the new peer
	# is in fact a controller, so we might send this redundantly.
	# Controllers handle the hello event accordingly.

	g_notify_controller = T;

	local epi = ClusterAgent::endpoint_info();

	event ClusterAgent::API::notify_agent_hello(epi$id,
	    to_addr(epi$network$address), ClusterAgent::API::version);
	}

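# At startup, wire up Broker: peer with the supervisor, subscribe to
# the topics we need, auto-publish our API events, and establish
# connectivity with the controller.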
event zeek_init()
	{
	local epi = ClusterAgent::endpoint_info();
	local agent_topic = ClusterAgent::topic_prefix + "/" + epi$id;

	# The agent needs to peer with the supervisor -- this doesn't currently
	# happen automatically. The address defaults to Broker's default, which
	# relies on ZEEK_DEFAULT_LISTEN_ADDR and so might just be "". Broker
	# internally falls back to listening on any; we pick 127.0.0.1.
	local supervisor_addr = Broker::default_listen_address;
	if ( supervisor_addr == "" )
		supervisor_addr = "127.0.0.1";

	Broker::peer(supervisor_addr, Broker::default_port, Broker::default_listen_retry);

	# Agents need to receive communication targeted at them, and any
	# responses from the supervisor.
	Broker::subscribe(agent_topic);
	Broker::subscribe(SupervisorControl::topic_prefix);

	# Auto-publish a bunch of events. Glob patterns or module-level
	# auto-publish would be helpful here.
	Broker::auto_publish(agent_topic, ClusterAgent::API::set_configuration_response);
	Broker::auto_publish(agent_topic, ClusterAgent::API::notify_agent_hello);
	Broker::auto_publish(agent_topic, ClusterAgent::API::notify_change);
	Broker::auto_publish(agent_topic, ClusterAgent::API::notify_error);
	Broker::auto_publish(agent_topic, ClusterAgent::API::notify_log);

	Broker::auto_publish(SupervisorControl::topic_prefix, SupervisorControl::create_request);
	Broker::auto_publish(SupervisorControl::topic_prefix, SupervisorControl::create_response);
	Broker::auto_publish(SupervisorControl::topic_prefix, SupervisorControl::destroy_request);
	Broker::auto_publish(SupervisorControl::topic_prefix, SupervisorControl::destroy_response);
	Broker::auto_publish(SupervisorControl::topic_prefix, SupervisorControl::restart_request);
	Broker::auto_publish(SupervisorControl::topic_prefix, SupervisorControl::restart_response);
	Broker::auto_publish(SupervisorControl::topic_prefix, SupervisorControl::stop_request);

	# Establish connectivity with the controller.
	if ( ClusterAgent::controller$address != "0.0.0.0" )
		{
		# We connect to the controller.
		Broker::peer(ClusterAgent::controller$address,
		    ClusterAgent::controller$bound_port,
		    ClusterController::connect_retry);
		}
	else
		{
		# Controller connects to us; listen for it.
		Broker::listen(cat(epi$network$address), epi$network$bound_port);
		}

	ClusterController::Log::info("agent is live");
	}