
This is a preliminary implementation of a subset of the functionality set out in our cluster controller architecture. The controller is the central management node, existing once in any Zeek cluster. The agent is a node that runs once per instance, where an instance will commonly be a physical machine. The agent in turn manages the "data cluster", i.e. the traditional notion of a Zeek cluster with manager, worker nodes, etc.

Agent and controller live in the policy folder and are activated when loading policy/frameworks/cluster/agent and policy/frameworks/cluster/controller, respectively. Both run in nodes forked by the supervisor. When Zeek doesn't use the supervisor, they do nothing. Otherwise, boot.zeek instructs the supervisor to create the respective node, running main.zeek.

Both controller and agent have their own config.zeek with relevant knobs. For both, controller/types.zeek provides common data types, and controller/log.zeek provides basic logging (without logger communication -- no such node might exist). A primitive request-tracking abstraction can be found in controller/request.zeek to track outstanding request events and their subsequent responses.
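
As a rough usage sketch, assuming Zeek's supervisor mode (enabled with the -j flag) and a hypothetical local.zeek on a host that should run both an agent and the controller:

# local.zeek -- hypothetical illustration, not part of this change
@load policy/frameworks/cluster/agent
@load policy/frameworks/cluster/controller

Started as "zeek -j local.zeek", the supervisor forks the agent and controller nodes; without -j, both loads deliberately do nothing.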

@load base/frameworks/broker

@load policy/frameworks/cluster/agent/config
@load policy/frameworks/cluster/agent/api

@load ./api
@load ./log
@load ./request

redef ClusterController::role = ClusterController::Types::CONTROLLER;

event ClusterAgent::API::notify_agent_hello(instance: string, host: addr, api_version: count)
    {
    # See if we already know about this agent; if not, register it.
    #
    # XXX protection against rogue agents?

    if ( instance in ClusterController::instances )
        {
        # Do nothing, unless this known agent checks in with a mismatching
        # API version, in which case we kick it out.
        if ( api_version != ClusterController::API::version )
            {
            local inst = ClusterController::instances[instance];

            if ( inst?$listen_port )
                {
                # We peered with this instance, so unpeer.
                Broker::unpeer(cat(inst$host), inst$listen_port);
                # XXX what to do if they connected to us?
                }

            # The instance is now gone, so don't fall through to the
            # name update below.
            delete ClusterController::instances[instance];
            return;
            }

        # Update the instance name in the pointed-to record, in case it
        # was previously named otherwise. Not being too picky here allows
        # the user some leeway in spelling out the original config.
        ClusterController::instances[instance]$name = instance;

        return;
        }

    if ( api_version != ClusterController::API::version )
        {
        ClusterController::Log::warning(
            fmt("agent %s/%s speaks incompatible agent protocol (%s, need %s), unpeering",
                instance, host, api_version, ClusterController::API::version));
        }

    ClusterController::instances[instance] = ClusterController::Types::Instance($name=instance, $host=host);
    ClusterController::Log::info(fmt("instance %s/%s has checked in", instance, host));
    }

event ClusterAgent::API::notify_change(instance: string, n: ClusterController::Types::Node,
                                       old: ClusterController::Types::State,
                                       new: ClusterController::Types::State)
    {
    # XXX TODO
    }

event ClusterAgent::API::notify_error(instance: string, msg: string, node: string)
    {
    # XXX TODO
    }

event ClusterAgent::API::notify_log(instance: string, msg: string, node: string)
    {
    # XXX TODO
    }

event ClusterAgent::API::set_configuration_response(reqid: string, result: ClusterController::Types::Result)
    {
    ClusterController::Log::info(fmt("rx ClusterAgent::API::set_configuration_response %s", reqid));

    # Retrieve the state for the request we just got a response to.
    local areq = ClusterController::Request::lookup(reqid);
    if ( ClusterController::Request::is_null(areq) )
        return;

    # Record the result and mark the request as done. This also
    # marks the request as done in the parent-level request, since
    # these records are stored by reference.
    areq$results[0] = result; # We only have a single result here atm.
    areq$finished = T;

    # Update the original request from the client:
    local req = ClusterController::Request::lookup(areq$parent_id);
    if ( ClusterController::Request::is_null(req) )
        return;

    # If any of the requests to the agents are still unfinished,
    # we're not done yet.
    for ( i in req$set_configuration_state$requests )
        if ( ! req$set_configuration_state$requests[i]$finished )
            return;

    # All set_configuration requests to the instances are done, so respond
    # back to the client. We need to compose the result, aggregating
    # the results we got from the requests to the agents. In the
    # end we have one Result per instance requested in the
    # original set_configuration_request.
    #
    # XXX we can likely generalize result aggregation in the request module.
    for ( i in req$set_configuration_state$requests )
        {
        local r = req$set_configuration_state$requests[i];

        local success = T;
        local errors: string_vec;
        local instance = "";

        for ( j in r$results )
            {
            local res = r$results[j];
            instance = res$instance;

            if ( res$success )
                next;

            success = F;
            errors += fmt("node %s failed: %s", res$node, res$error);
            }

        req$results += ClusterController::Types::Result(
            $reqid = req$id,
            $instance = instance,
            $success = success,
            $error = join_string_vec(errors, ", "));

        ClusterController::Request::finish(r$id);
        }

    ClusterController::Log::info(fmt("tx ClusterController::API::set_configuration_response %s", req$id));
    event ClusterController::API::set_configuration_response(req$id, req$results);
    ClusterController::Request::finish(req$id);
    }

event ClusterController::API::set_configuration_request(reqid: string, config: ClusterController::Types::Configuration)
    {
    ClusterController::Log::info(fmt("rx ClusterController::API::set_configuration_request %s", reqid));

    local req = ClusterController::Request::create(reqid);
    req$set_configuration_state = ClusterController::Request::SetConfigurationState();

    # Compare the new configuration to the current one and send updates
    # to the instances as needed.
    if ( config?$instances )
        {
        # XXX properly handle instance updates: connect to new instances
        # provided when they are listening, accept connections from new
        # instances that are not.
        for ( inst in config$instances )
            {
            if ( inst$name !in ClusterController::instances )
                {
                local res = ClusterController::Types::Result($reqid=reqid, $instance=inst$name);
                res$error = fmt("instance %s is unknown, skipping", inst$name);
                req$results += res;
                }
            }
        }

    # XXX validate the configuration:
    # - Are the nodes' instances among the defined instances?
    # - Are all names unique?
    # - Are all node options understood?
    # - Do node types with optional fields have the required values?
    # ...

    # Transmit the configuration on to the agents. They need to be aware of
    # each other's location and nodes, so the data cluster nodes can connect
    # (for example, so a worker on instance 1 can connect to a logger on
    # instance 2).
    for ( name in ClusterController::instances )
        {
        local agent_topic = ClusterAgent::topic_prefix + "/" + name;
        local areq = ClusterController::Request::create();
        areq$parent_id = reqid;

        # We track the requests sent off to each agent. As the
        # responses come in, we can check them off as completed,
        # and once all are, we respond back to the client.
        req$set_configuration_state$requests += areq;

        # XXX we could also broadcast just once on the agent prefix, but
        # explicit request/response pairs for each agent seem cleaner.
        ClusterController::Log::info(fmt("tx ClusterAgent::API::set_configuration_request %s to %s",
            areq$id, name));
        Broker::publish(agent_topic, ClusterAgent::API::set_configuration_request, areq$id, config);
        }

    # The response event gets sent via the agents' response events.
    }

event ClusterController::API::get_instances_request(reqid: string)
    {
    ClusterController::Log::info(fmt("rx ClusterController::API::get_instances_request %s", reqid));

    local insts: vector of ClusterController::Types::Instance;

    for ( i in ClusterController::instances )
        insts += ClusterController::instances[i];

    ClusterController::Log::info(fmt("tx ClusterController::API::get_instances_response %s", reqid));
    event ClusterController::API::get_instances_response(reqid, insts);
    }

event zeek_init()
    {
    # The controller always listens -- it needs to be able to respond
    # to the Zeek client. The agents also use this port when they
    # connect to us.
    local cni = ClusterController::network_info();
    Broker::listen(cat(cni$address), cni$bound_port);

    Broker::subscribe(ClusterAgent::topic_prefix);
    Broker::subscribe(ClusterController::topic);

    Broker::auto_publish(ClusterController::topic,
        ClusterController::API::get_instances_response);
    Broker::auto_publish(ClusterController::topic,
        ClusterController::API::set_configuration_response);

    if ( |ClusterController::instances| > 0 )
        {
        # We peer with the agents -- otherwise, the agents peer
        # with (i.e., connect to) us.
        for ( i in ClusterController::instances )
            {
            local inst = ClusterController::instances[i];

            if ( ! inst?$listen_port )
                {
                # XXX config error -- the listen port must be present here
                next;
                }

            Broker::peer(cat(inst$host), inst$listen_port,
                ClusterController::connect_retry);
            }
        }

    # If ClusterController::instances is empty, the agents peer with
    # us and we do nothing here. We'll build up state as the
    # notify_agent_hello() events come in.

    ClusterController::Log::info("controller is live");
    }
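
As a closing illustration of the request/response API above, here is a minimal sketch of a standalone client script. It is hypothetical and not part of this change: the controller endpoint 127.0.0.1 and 2150/tcp, the request ID string, and the exact set of @loads needed are assumptions made for the example.

# client-sketch.zeek -- hypothetical, for illustration only
@load policy/frameworks/cluster/controller/api
@load policy/frameworks/cluster/controller/config
@load policy/frameworks/cluster/controller/types

event ClusterController::API::get_instances_response(reqid: string,
    insts: vector of ClusterController::Types::Instance)
    {
    # One Instance record per agent the controller knows about.
    print fmt("request %s: %d instance(s)", reqid, |insts|);
    }

event Broker::peer_added(endpoint: Broker::EndpointInfo, msg: string)
    {
    # Once peered with the controller, ask for its known instances.
    Broker::publish(ClusterController::topic,
        ClusterController::API::get_instances_request, "example-req-1");
    }

event zeek_init()
    {
    # The controller auto-publishes its responses to ClusterController::topic.
    Broker::subscribe(ClusterController::topic);
    Broker::peer("127.0.0.1", 2150/tcp); # assumed controller address and port
    }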