zeek/scripts/policy/frameworks/cluster/controller/main.zeek
Christian Kreibich b57be021b7 Make all globals start with a "g_" prefix
This makes it easier to spot them in code, and is shorter than using explicit
namespacing.
2021-12-21 14:52:28 -08:00

383 lines
13 KiB
Text

@load base/frameworks/broker
@load policy/frameworks/cluster/agent/config
@load policy/frameworks/cluster/agent/api
@load ./api
@load ./log
@load ./request
redef ClusterController::role = ClusterController::Types::CONTROLLER;
global g_config_current: ClusterController::Types::Configuration;
global g_config_reqid_pending: string = "";
function send_config_to_agents(req: ClusterController::Request::Request,
config: ClusterController::Types::Configuration)
{
for ( name in ClusterController::instances )
{
local agent_topic = ClusterAgent::topic_prefix + "/" + name;
local areq = ClusterController::Request::create();
areq$parent_id = req$id;
# We track the requests sent off to each agent. As the
# responses come in, we can check them off as completed,
# and once all are, we respond back to the client.
req$set_configuration_state$requests += areq;
# XXX could also broadcast just once on the agent prefix, but
# explicit request/response pairs for each agent seems cleaner.
ClusterController::Log::info(fmt("tx ClusterAgent::API::set_configuration_request %s to %s", areq$id, name));
Broker::publish(agent_topic, ClusterAgent::API::set_configuration_request, areq$id, config);
}
}
function drop_instance(inst: ClusterController::Types::Instance)
{
if ( inst$name in ClusterController::instances )
{
# Send this agent a config that will terminate any data cluster
# nodes it might have. This is "fire and forget" -- there will
# not be a response.
Broker::publish(ClusterAgent::topic_prefix + "/" + inst$name,
ClusterAgent::API::set_configuration_request,
"", ClusterController::Types::Configuration());
delete ClusterController::instances[inst$name];
}
# If the instance has a port, we peered with it, so now unpeer.
if ( inst?$listen_port )
Broker::unpeer(cat(inst$host), inst$listen_port );
ClusterController::Log::info(fmt("dropped instance %s", inst$name));
}
event ClusterAgent::API::notify_agent_hello(instance: string, host: addr, api_version: count)
{
# See if we already know about this agent; if not, register
# it.
#
# XXX protection against rogue agents?
local inst: ClusterController::Types::Instance;
if ( instance in ClusterController::instances )
{
# Do nothing, unless this known agent checks in with a mismatching
# API version, in which case we kick it out.
if ( api_version != ClusterController::API::version )
drop_instance(ClusterController::instances[instance]);
# Update the instance name in the pointed-to record, in case it
# was previously named otherwise. Not being too picky here allows
# the user some leeway in spelling out the original config.
ClusterController::instances[instance]$name = instance;
return;
}
if ( api_version != ClusterController::API::version )
{
ClusterController::Log::warning(
fmt("agent %s/%s speaks incompatible agent protocol (%s, need %s), ignoring",
instance, host, api_version, ClusterController::API::version));
return;
}
ClusterController::instances[instance] = ClusterController::Types::Instance($name=instance, $host=host);
ClusterController::Log::info(fmt("instance %s/%s has checked in", instance, host));
# If we have a pending configuration request, check in on it now to see whether
# we have all agents required, and finalize the config request.
if ( g_config_reqid_pending == "" )
return;
local req = ClusterController::Request::lookup(g_config_reqid_pending);
if ( ClusterController::Request::is_null(req) || ! req?$set_configuration_state )
{
# Odd, just clear out pending state.
g_config_reqid_pending = "";
return;
}
for ( inst in req$set_configuration_state$config$instances )
{
if ( inst$name !in ClusterController::instances )
{
# The request still has instances not known to us, try again
# later.
return;
}
}
# All instances requested in the configuration are now known to us.
# Send them the config. As they send their response events we
# update the request state and eventually send the response event
# to the client.
send_config_to_agents(req, req$set_configuration_state$config);
g_config_reqid_pending = "";
}
event ClusterAgent::API::notify_change(instance: string, n: ClusterController::Types::Node,
old: ClusterController::Types::State,
new: ClusterController::Types::State)
{
# XXX TODO
}
event ClusterAgent::API::notify_error(instance: string, msg: string, node: string)
{
# XXX TODO
}
event ClusterAgent::API::notify_log(instance: string, msg: string, node: string)
{
# XXX TODO
}
event ClusterAgent::API::set_configuration_response(reqid: string, result: ClusterController::Types::Result)
{
ClusterController::Log::info(fmt("rx ClusterAgent::API::set_configuration_response %s", reqid));
# Retrieve state for the request we just got a response to
local areq = ClusterController::Request::lookup(reqid);
if ( ClusterController::Request::is_null(areq) )
return;
# Record the result and mark the request as done. This also
# marks the request as done in the parent-level request, since
# these records are stored by reference.
areq$results[0] = result; # We only have a single result here atm
areq$finished = T;
# Update the original request from the client:
local req = ClusterController::Request::lookup(areq$parent_id);
if ( ClusterController::Request::is_null(req) )
return;
# If there are any requests to the agents still unfinished,
# we're not done yet.
for ( i in req$set_configuration_state$requests )
if ( ! req$set_configuration_state$requests[i]$finished )
return;
# All set_configuration requests to instances are done, so respond
# back to client. We need to compose the result, aggregating
# the results we got from the requests to the agents. In the
# end we have one Result per instance requested in the
# original set_configuration_request.
#
# XXX we can likely generalize result aggregation in the request module.
for ( i in req$set_configuration_state$requests )
{
local r = req$set_configuration_state$requests[i];
local success = T;
local errors: string_vec;
local instance = "";
for ( j in r$results )
{
local res = r$results[j];
instance = res$instance;
if ( res$success )
next;
success = F;
errors += fmt("node %s failed: %s", res$node, res$error);
}
req$results += ClusterController::Types::Result(
$reqid = req$id,
$instance = instance,
$success = success,
$error = join_string_vec(errors, ", ")
);
ClusterController::Request::finish(r$id);
}
ClusterController::Log::info(fmt("tx ClusterController::API::set_configuration_response %s",
ClusterController::Request::to_string(req)));
event ClusterController::API::set_configuration_response(req$id, req$results);
ClusterController::Request::finish(req$id);
}
event ClusterController::API::set_configuration_request(reqid: string, config: ClusterController::Types::Configuration)
{
ClusterController::Log::info(fmt("rx ClusterController::API::set_configuration_request %s", reqid));
local res: ClusterController::Types::Result;
local req = ClusterController::Request::create(reqid);
req$set_configuration_state = ClusterController::Request::SetConfigurationState($config = config);
# At the moment there can only be one pending request.
if ( g_config_reqid_pending != "" )
{
res = ClusterController::Types::Result($reqid=reqid);
res$success = F;
res$error = fmt("request %s still pending", g_config_reqid_pending);
req$results += res;
ClusterController::Log::info(fmt("tx ClusterController::API::set_configuration_response %s",
ClusterController::Request::to_string(req)));
event ClusterController::API::set_configuration_response(req$id, req$results);
ClusterController::Request::finish(req$id);
return;
}
# Compare the instance configuration to our current one. For instances
# that are supposed to be checking in with us but have not, record
# errors. We will fail the request in those cases. For instances we
# don't know and are supposed to connect to, do so next.
local inst_name: string;
local inst: ClusterController::Types::Instance;
local insts_to_add: ClusterController::Types::InstanceVec;
# A set of instances not contained in the config, that therefore needs
# to be dropped.
local insts_to_drop: set[string] = {};
for ( inst_name in ClusterController::instances )
add insts_to_drop[inst_name];
for ( inst in config$instances )
{
if ( inst$name in ClusterController::instances )
{
# Verify that it's actually the same endpoint. If not, kick out
# the old one. XXX 0.0.0.0 is a workaround until we've resolved
# how agents that peer with us obtain their identity.
if ( ( inst$host != 0.0.0.0 && inst$host != ClusterController::instances[inst$name]$host ) ||
inst?$listen_port != ClusterController::instances[inst$name]?$listen_port ||
( inst?$listen_port && ClusterController::instances[inst$name]?$listen_port &&
inst$listen_port != ClusterController::instances[inst$name]$listen_port ) )
{
drop_instance(ClusterController::instances[inst$name]);
}
else
{
# We know this instance, don't drop it below.
delete insts_to_drop[inst$name];
}
}
if ( inst$name !in ClusterController::instances )
{
# It's an instance we don't currently know about.
if ( ! inst?$listen_port )
{
# If a requested instance doesn't have a listen port and isn't known
# to us, we have no way to establish connectivity. We reject the
# request.
res = ClusterController::Types::Result($reqid=reqid, $instance=inst$name);
res$success = F;
res$error = fmt("instance %s is unknown", inst$name);
req$results += res;
}
else
{
# We'll need to connect to this instance.
insts_to_add[|insts_to_add|] = inst;
}
}
}
# An error at this point means that we're rejecting this request.
if ( |req$results| > 0 )
{
ClusterController::Log::info(fmt("tx ClusterController::API::set_configuration_response %s",
ClusterController::Request::to_string(req)));
event ClusterController::API::set_configuration_response(req$id, req$results);
ClusterController::Request::finish(req$id);
return;
}
for ( inst_name in insts_to_drop )
drop_instance(ClusterController::instances[inst_name]);
# We have instances to connect to, so initiate peering and stop for now.
# Processing will continue as the agents check in. That's also when they
# get added to ClusterController::instances.
if ( |insts_to_add| > 0 )
{
for ( idx in insts_to_add )
{
inst = insts_to_add[idx];
Broker::peer(cat(inst$host), inst$listen_port,
ClusterController::connect_retry);
}
g_config_reqid_pending = req$id;
return;
}
# XXX validate the configuration:
# - Are node instances among defined instances?
# - Are all names unique?
# - Are any node options understood?
# - Do node types with optional fields have required values?
# ...
# Response event gets sent via the agents' reponse event.
send_config_to_agents(req, config);
}
event ClusterController::API::get_instances_request(reqid: string)
{
ClusterController::Log::info(fmt("rx ClusterController::API::set_instances_request %s", reqid));
local insts: vector of ClusterController::Types::Instance;
for ( i in ClusterController::instances )
insts += ClusterController::instances[i];
ClusterController::Log::info(fmt("tx ClusterController::API::get_instances_response %s", reqid));
event ClusterController::API::get_instances_response(reqid, insts);
}
event zeek_init()
{
# Controller always listens -- it needs to be able to respond
# to the Zeek client. This port is also used by the agents
# if they connect to the client.
local cni = ClusterController::network_info();
Broker::listen(cat(cni$address), cni$bound_port);
Broker::subscribe(ClusterAgent::topic_prefix);
Broker::subscribe(ClusterController::topic);
Broker::auto_publish(ClusterController::topic,
ClusterController::API::get_instances_response);
Broker::auto_publish(ClusterController::topic,
ClusterController::API::set_configuration_response);
if ( |ClusterController::instances| > 0 )
{
# We peer with the agents -- otherwise, the agents peer
# with (i.e., connect to) us.
for ( i in ClusterController::instances )
{
local inst = ClusterController::instances[i];
if ( ! inst?$listen_port )
{
# XXX config error -- this must be there
next;
}
Broker::peer(cat(inst$host), inst$listen_port,
ClusterController::connect_retry);
}
}
# If ClusterController::instances is empty, agents peer with
# us and we do nothing. We'll build up state as the
# notify_agent_hello() events come int.
ClusterController::Log::info("controller is live");
}