Support on-demand peering with agents when receiving new cluster configuration

Prior to this, static configuration needed to be in place to configure the
controller/agent layout. The configuration update can now include new instances
that the controller will connect to, assuming they're instances with a listening
agent.
This commit is contained in:
Christian Kreibich 2021-11-09 21:55:30 -08:00
parent 484f79f599
commit 5cb44c2f69
2 changed files with 135 additions and 44 deletions

View file

@ -9,6 +9,30 @@
redef ClusterController::role = ClusterController::Types::CONTROLLER;
global config_current: ClusterController::Types::Configuration;
global config_reqid_pending: string = "";
function send_config_to_agents(req: ClusterController::Request::Request,
config: ClusterController::Types::Configuration)
{
for ( name in ClusterController::instances )
{
local agent_topic = ClusterAgent::topic_prefix + "/" + name;
local areq = ClusterController::Request::create();
areq$parent_id = req$id;
# We track the requests sent off to each agent. As the
# responses come in, we can check them off as completed,
# and once all are, we respond back to the client.
req$set_configuration_state$requests += areq;
# XXX could also broadcast just once on the agent prefix, but
# explicit request/response pairs for each agent seems cleaner.
ClusterController::Log::info(fmt("tx ClusterAgent::API::set_configuration_request %s to %s", areq$id, name));
Broker::publish(agent_topic, ClusterAgent::API::set_configuration_request, areq$id, config);
}
}
event ClusterAgent::API::notify_agent_hello(instance: string, host: addr, api_version: count)
{
# See if we already know about this agent; if not, register
@ -16,13 +40,15 @@ event ClusterAgent::API::notify_agent_hello(instance: string, host: addr, api_ve
#
# XXX protection against rogue agents?
local inst: ClusterController::Types::Instance;
if ( instance in ClusterController::instances )
{
# Do nothing, unless this known agent checks in with a mismatching
# API version, in which case we kick it out.
if ( api_version != ClusterController::API::version )
{
local inst = ClusterController::instances[instance];
inst = ClusterController::instances[instance];
if ( inst?$listen_port )
{
# We peered with this instance, unpeer.
@ -36,19 +62,50 @@ event ClusterAgent::API::notify_agent_hello(instance: string, host: addr, api_ve
# was previously named otherwise. Not being too picky here allows
# the user some leeway in spelling out the original config.
ClusterController::instances[instance]$name = instance;
return;
}
if ( api_version != ClusterController::API::version )
{
ClusterController::Log::warning(
fmt("agent %s/%s speaks incompatible agent protocol (%s, need %s), unpeering",
fmt("agent %s/%s speaks incompatible agent protocol (%s, need %s), ignoring",
instance, host, api_version, ClusterController::API::version));
return;
}
ClusterController::instances[instance] = ClusterController::Types::Instance($name=instance, $host=host);
ClusterController::Log::info(fmt("instance %s/%s has checked in", instance, host));
# If we have a pending configuration request, check in on it now to see whether
# we have all agents required, and finalize the config request.
if ( config_reqid_pending == "" )
return;
local req = ClusterController::Request::lookup(config_reqid_pending);
if ( ClusterController::Request::is_null(req) || ! req?$set_configuration_state )
{
# Odd, just clear out pending state.
config_reqid_pending = "";
return;
}
for ( inst in req$set_configuration_state$config$instances )
{
if ( inst$name !in ClusterController::instances )
{
# The request still has instances not known to us, try again
# later.
return;
}
}
# All instances requested in the configuration are now known to us.
# Send them the config. As they send their response events we
# update the request state and eventually send the response event
# to the client.
send_config_to_agents(req, req$set_configuration_state$config);
config_reqid_pending = "";
}
@ -132,7 +189,8 @@ event ClusterAgent::API::set_configuration_response(reqid: string, result: Clust
ClusterController::Request::finish(r$id);
}
ClusterController::Log::info(fmt("tx ClusterController::API::set_configuration_response %s", req$id));
ClusterController::Log::info(fmt("tx ClusterController::API::set_configuration_response %s",
ClusterController::Request::to_string(req)));
event ClusterController::API::set_configuration_response(req$id, req$results);
ClusterController::Request::finish(req$id);
}
@ -141,27 +199,80 @@ event ClusterController::API::set_configuration_request(reqid: string, config: C
{
ClusterController::Log::info(fmt("rx ClusterController::API::set_configuration_request %s", reqid));
local res: ClusterController::Types::Result;
local req = ClusterController::Request::create(reqid);
req$set_configuration_state = ClusterController::Request::SetConfigurationState();
req$set_configuration_state = ClusterController::Request::SetConfigurationState($config = config);
# Compare new configuration to the current one and send updates
# to the instances as needed.
if ( config?$instances )
# At the moment there can only be one pending request.
if ( config_reqid_pending != "" )
{
# XXX properly handle instance update: connect to new instances provided
# when they are listening, accept connections from new instances that are
# not
for ( inst in config$instances )
res = ClusterController::Types::Result($reqid=reqid);
res$success = F;
res$error = fmt("request %s still pending", config_reqid_pending);
req$results += res;
ClusterController::Log::info(fmt("tx ClusterController::API::set_configuration_response %s",
ClusterController::Request::to_string(req)));
event ClusterController::API::set_configuration_response(req$id, req$results);
ClusterController::Request::finish(req$id);
return;
}
# Compare the instance configuration to our current one. For instances
# that are supposed to be checking in with us but have not, record
# errors. We will fail the request in those cases. For instances we
# don't know and are supposed to connect to, do so next.
local inst: ClusterController::Types::Instance;
local insts_todo: ClusterController::Types::InstanceVec;
for ( inst in config$instances )
{
if ( inst$name !in ClusterController::instances )
{
if ( inst$name !in ClusterController::instances )
if ( ! inst?$listen_port )
{
local res = ClusterController::Types::Result($reqid=reqid, $instance=inst$name);
res$error = fmt("instance %s is unknown, skipping", inst$name);
# If a requested instance doesn't have a listen port and isn't known
# to us, we have no way to establish connectivity. We reject the
# request.
res = ClusterController::Types::Result($reqid=reqid, $instance=inst$name);
res$success = F;
res$error = fmt("instance %s is unknown", inst$name);
req$results += res;
}
else
{
# We'll need to connect to this instance.
insts_todo[|insts_todo|] = inst;
}
}
}
# An error at this point means that we're rejecting this request.
if ( |req$results| > 0 )
{
ClusterController::Log::info(fmt("tx ClusterController::API::set_configuration_response %s",
ClusterController::Request::to_string(req)));
event ClusterController::API::set_configuration_response(req$id, req$results);
ClusterController::Request::finish(req$id);
return;
}
# We have instances to connect to, so initiate peering and
# stop for now. Processing will continue as the agents check
# in.
if ( |insts_todo| > 0 )
{
for ( idx in insts_todo )
{
inst = insts_todo[idx];
Broker::peer(cat(inst$host), inst$listen_port,
ClusterController::connect_retry);
}
config_reqid_pending = req$id;
return;
}
# XXX validate the configuration:
# - Are node instances among defined instances?
# - Are all names unique?
@ -169,29 +280,8 @@ event ClusterController::API::set_configuration_request(reqid: string, config: C
# - Do node types with optional fields have required values?
# ...
# Transmit the configuration on to the agents. They need to be aware of
# each other's location and nodes, so the data cluster nodes can connect
# (for example, so a worker on instance 1 can connect to a logger on
# instance 2).
for ( name in ClusterController::instances )
{
local agent_topic = ClusterAgent::topic_prefix + "/" + name;
local areq = ClusterController::Request::create();
areq$parent_id = reqid;
# We track the requests sent off to each agent. As the
# responses come in, we can check them off as completed,
# and once all are, we respond back to the client.
req$set_configuration_state$requests += areq;
# XXX could also broadcast just once on the agent prefix, but
# explicit request/response pairs for each agent seems cleaner.
ClusterController::Log::info(fmt("tx ClusterAgent::API::set_configuration_request %s to %s",
areq$id, name));
Broker::publish(agent_topic, ClusterAgent::API::set_configuration_request, areq$id, config);
}
# Response event gets sent via the agents' reponse event.
send_config_to_agents(req, config);
}
event ClusterController::API::get_instances_request(reqid: string)

View file

@ -30,6 +30,8 @@ export {
listen_port: port &optional;
};
type InstanceVec: vector of Instance;
## State that a Cluster Node can be in. State changes trigger an
## API notification (see notify_change()).
type State: enum {
@ -59,7 +61,6 @@ export {
id: string &default=unique_id(""); # Unique identifier for a particular configuration
## The instances in the cluster.
## XXX we may be able to make this optional
instances: set[Instance];
## The set of nodes in the cluster, as distributed over the instances.
@ -68,12 +69,12 @@ export {
# Return value for request-response API event pairs
type Result: record {
reqid: string; # Request ID of operation this result refers to
instance: string; # Name of associated instance (for context)
success: bool &default=T; # True if successful
data: any &optional; # Addl data returned for successful operation
error: string &default=""; # Descriptive error on failure
node: string &optional; # Name of associated node (for context)
reqid: string; # Request ID of operation this result refers to
instance: string &default=""; # Name of associated instance (for context)
success: bool &default=T; # True if successful
data: any &optional; # Addl data returned for successful operation
error: string &default=""; # Descriptive error on failure
node: string &optional; # Name of associated node (for context)
};
type ResultVec: vector of Result;