Remove periodic pinging of controller by agents
This changes the agent-controller communication to remove the need for ongoing pinging of the controller by agents not actively "in service". Instead, agents now use the notify_agent_hello event to report only their identity to the controller. The controller puts them into service via an agent_welcome_request/response pair, and takes them out of service via an agent_standby_request/response pair. This removes the &on_change handler from the set of agents ready for service, because not every change to this set is now a suitable time to potentially send out the configuration. We now invoke this check explicitly in the two situations where it's warranted: when an agent reports ready for service, and when we've received a new configuration.
This commit is contained in:
parent
8463f14a52
commit
ac40d5c5b2
3 changed files with 208 additions and 113 deletions
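
In short, the agent lifecycle now looks like this (a sketch of the event flow; the instance name, address, and request IDs are illustrative only, not from this commit):

    agent      -> controller : notify_agent_hello("agent-1", 10.0.0.1, 1)   # once, upon peering
    controller -> agent      : agent_welcome_request("req-1")
    agent      -> controller : agent_welcome_response("req-1", result)      # agent now "in service"
    ...
    controller -> agent      : agent_standby_request("req-2")
    agent      -> controller : agent_standby_response("req-2", result)      # agent idle, peering stays up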

@@ -8,14 +8,35 @@ export {
    # Agent API events

    # The controller uses this event to convey a new cluster
    # configuration to the agent. Once processed, the agent
    # responds with the response event.
    global set_configuration_request: event(reqid: string,
        config: ClusterController::Types::Configuration);
    global set_configuration_response: event(reqid: string,
        result: ClusterController::Types::Result);

    # The controller uses this event to confirm to the agent
    # that it is part of the current cluster. The agent
    # acknowledges with the response event.
    global agent_welcome_request: event(reqid: string);
    global agent_welcome_response: event(reqid: string,
        result: ClusterController::Types::Result);

    # The controller sends this event to convey that the agent is not
    # currently required. This status may later change, depending on
    # updates from the client, so the peering can remain active. The
    # agent releases any cluster-related resources when processing the
    # request.
    global agent_standby_request: event(reqid: string);
    global agent_standby_response: event(reqid: string,
        result: ClusterController::Types::Result);

    # Notification events, agent -> controller

    # Report agent being available.
    # The agent sends this upon peering as a "check in", informing the
    # controller that an agent of the given name is now available to
    # communicate with.
    global notify_agent_hello: event(instance: string, host: addr,
        api_version: count);
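
A minimal sketch of the one-shot check-in that replaces the periodic pinging, assuming (per the Broker::peer_added handler later in this diff) that ClusterAgent::endpoint_info() carries the agent's network address, and that ClusterAgent::API::version names the agent's API version; with notify_agent_hello auto-published to the agent topic, raising the event forwards it to the controller:

    event Broker::peer_added(peer: Broker::EndpointInfo, msg: string)
        {
        # Check in once upon peering; no rescheduling anymore.
        local epi = ClusterAgent::endpoint_info();
        event ClusterAgent::API::notify_agent_hello(ClusterAgent::name,
            to_addr(epi$network$address), ClusterAgent::API::version);
        }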

@@ -30,11 +51,4 @@ export {
    # Report informational message.
    global notify_log: event(instance: string, msg: string, node: string &default="");

    # Notification events, controller -> agent

    # Confirmation from controller in response to notify_agent_hello
    # that the agent is welcome.
    global notify_controller_hello: event(controller: string, host: addr);
}
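
Hypothetical agent-side use of the unchanged notify_log event, assuming it is auto-published to the agent topic like the events in the zeek_init handler further below:

    event zeek_init()
        {
        # The node argument defaults to "", meaning the message concerns
        # the agent itself rather than a specific cluster node.
        event ClusterAgent::API::notify_log(ClusterAgent::name, "agent is up");
        }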

@@ -24,11 +24,6 @@ global g_nodes: table[string] of ClusterController::Types::Node;
# new configurations.
global g_data_cluster: table[string] of Supervisor::ClusterEndpoint;

# Whether we currently keep notifying the controller that we're here.
# We stop once the controller responds back, and resume when we've
# lost the peering.
global g_notify_controller: bool = T;


event SupervisorControl::create_response(reqid: string, result: string)
    {

@@ -169,25 +164,44 @@ event ClusterAgent::API::set_configuration_request(reqid: string, config: Cluste
        }
    }

event ClusterAgent::API::notify_controller_hello(controller: string, host: addr)
event ClusterAgent::API::agent_welcome_request(reqid: string)
    {
    ClusterController::Log::info(fmt("rx ClusterAgent::API::notify_controller_hello %s %s", controller, host));
    g_notify_controller = F;
    ClusterController::Log::info(fmt("rx ClusterAgent::API::agent_welcome_request %s", reqid));

    local res = ClusterController::Types::Result(
        $reqid = reqid,
        $instance = ClusterAgent::name);

    ClusterController::Log::info(fmt("tx ClusterAgent::API::agent_welcome_response %s",
        ClusterController::Types::result_to_string(res)));
    event ClusterAgent::API::agent_welcome_response(reqid, res);
    }

event ClusterAgent::API::notify_agent_hello(instance: string, host: addr, api_version: count)
event ClusterAgent::API::agent_standby_request(reqid: string)
    {
    if ( g_notify_controller )
        schedule 1sec { ClusterAgent::API::notify_agent_hello(instance, host, api_version) };
    ClusterController::Log::info(fmt("rx ClusterAgent::API::agent_standby_request %s", reqid));

    # We shut down any existing cluster nodes via an empty configuration,
    # and fall silent. We do not unpeer/disconnect (assuming we earlier
    # peered/connected -- otherwise there's nothing we can do here via
    # Broker anyway), mainly to keep open the possibility of running
    # cluster nodes again later.
    event ClusterAgent::API::set_configuration_request("", ClusterController::Types::Configuration());

    local res = ClusterController::Types::Result(
        $reqid = reqid,
        $instance = ClusterAgent::name);

    ClusterController::Log::info(fmt("tx ClusterAgent::API::agent_standby_response %s",
        ClusterController::Types::result_to_string(res)));
    event ClusterAgent::API::agent_standby_response(reqid, res);
    }
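
The empty reqid and empty Configuration() sent above are what the controller-side is_null_config() (later in this diff) recognizes via config$id == "". The body of null_config(), declared in the controller, is not part of this diff; a plausible sketch, assuming Configuration's $id field defaults to the empty string:

    function null_config(): ClusterController::Types::Configuration
        {
        return ClusterController::Types::Configuration($id="");
        }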

event Broker::peer_added(peer: Broker::EndpointInfo, msg: string)
    {
    # This does not (cannot?) immediately verify that the new peer
    # is in fact a controller, so we might send this redundantly.
    # Controllers handle the hello event accordingly.

    g_notify_controller = T;
    # is in fact a controller, so we might send this in vain.
    # Controllers register the agent upon receipt of the event.

    local epi = ClusterAgent::endpoint_info();

@@ -218,6 +232,9 @@ event zeek_init()
    # Auto-publish a bunch of events. Glob patterns or module-level
    # auto-publish would be helpful here.
    Broker::auto_publish(agent_topic, ClusterAgent::API::set_configuration_response);
    Broker::auto_publish(agent_topic, ClusterAgent::API::agent_welcome_response);
    Broker::auto_publish(agent_topic, ClusterAgent::API::agent_standby_response);

    Broker::auto_publish(agent_topic, ClusterAgent::API::notify_agent_hello);
    Broker::auto_publish(agent_topic, ClusterAgent::API::notify_change);
    Broker::auto_publish(agent_topic, ClusterAgent::API::notify_error);

@@ -12,23 +12,36 @@ module ClusterController::Runtime;

redef ClusterController::role = ClusterController::Types::CONTROLLER;

global check_instances_ready: function(insts: set[string], tc: TableChange, inst: string);
global check_instances_ready: function();
global add_instance: function(inst: ClusterController::Types::Instance);
global drop_instance: function(inst: ClusterController::Types::Instance);

global null_config: function(): ClusterController::Types::Configuration;
global is_null_config: function(config: ClusterController::Types::Configuration): bool;

# The desired set of agents the controller interacts with, as provided by the
# most recent config update sent by the client. The key is the name of each
# instance. This should match the $name member of the instance records.
# Checks whether the given instance is one that we know with different
# communication settings: a different peering direction, a different listening
# port, etc. Used as a predicate to indicate when we need to drop the existing
# one from our internal state.
global is_instance_connectivity_change: function
    (inst: ClusterController::Types::Instance): bool;

# The set of agents the controller interacts with to manage the currently
# configured cluster. This may be a subset of all the agents known to the
# controller, as tracked by the g_instances_known set. The key is the instance
# name and should match the $name member of the corresponding instance record.
global g_instances: table[string] of ClusterController::Types::Instance = table();

# The set of instances that have checked in with the controller. This is a
# superset of g_instances, since it covers any agent that has sent us a
# notify_agent_hello event.
global g_instances_known: set[string] = set();

# A corresponding set of instances/agents that we track in order to understand
# when all of the above instances have checked in with a notify_agent_hello
# event. (An alternative would be to use a record that adds a single state bit
# for each instance, and store that above.)
global g_instances_ready: set[string] = set() &on_change=check_instances_ready;
# when all of the above instances have sent agent_welcome_response events. (An
# alternative would be to use a record that adds a single state bit for each
# instance, and store that above.)
global g_instances_ready: set[string] = set();
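
To keep the three containers straight (a restatement of the comments above, not code from this commit):

    # g_instances_known : every agent that has ever sent notify_agent_hello
    # g_instances       : the agents named in the current configuration
    # g_instances_ready : configured agents that completed the welcome
    #                     handshake; config deployment waits on this set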

# The request ID of the most recent configuration update that's come in from
# a client. We track it here until we know we are ready to communicate with all

@@ -44,6 +57,9 @@ function send_config_to_agents(req: ClusterController::Request::Request,
    {
    for ( name in g_instances )
        {
        if ( name !in g_instances_ready )
            next;

        local agent_topic = ClusterAgent::topic_prefix + "/" + name;
        local areq = ClusterController::Request::create();
        areq$parent_id = req$id;

@@ -60,24 +76,18 @@ function send_config_to_agents(req: ClusterController::Request::Request,
        }
    }

# This is the &on_change handler for the g_instances_ready set.
function check_instances_ready(insts: set[string], tc: TableChange, inst: string)
# This runs whenever a required agent has confirmed it's ready, and
# whenever we've received a new configuration.
function check_instances_ready()
    {
    local cur_instances: set[string];

    # See if the new update to the readiness set makes it match the current
    # instances. If so, trigger the notify_agents_ready event.
    if ( tc == TABLE_ELEMENT_NEW || tc == TABLE_ELEMENT_REMOVED )
        {
        for ( inst in g_instances )
            add cur_instances[inst];

        if ( cur_instances == g_instances_ready )
            {
            event ClusterController::API::notify_agents_ready(cur_instances);
            }
        }
    }
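
With the TableChange machinery gone, the new function should read roughly as follows (assembled from the added lines above; a sketch, not verbatim commit content):

    function check_instances_ready()
        {
        local cur_instances: set[string];

        for ( inst in g_instances )
            add cur_instances[inst];

        if ( cur_instances == g_instances_ready )
            event ClusterController::API::notify_agents_ready(cur_instances);
        }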

function add_instance(inst: ClusterController::Types::Instance)
    {

@@ -86,6 +96,20 @@ function add_instance(inst: ClusterController::Types::Instance)
    if ( inst?$listen_port )
        Broker::peer(cat(inst$host), inst$listen_port,
            ClusterController::connect_retry);

    if ( inst$name in g_instances_known )
        {
        # The agent has already peered with us. Send a welcome to indicate
        # it's part of cluster management. Once it responds, we update
        # the set of ready instances and proceed as feasible with config
        # deployments.

        local req = ClusterController::Request::create();

        ClusterController::Log::info(fmt("tx ClusterAgent::API::agent_welcome_request to %s", inst$name));
        Broker::publish(ClusterAgent::topic_prefix + "/" + inst$name,
            ClusterAgent::API::agent_welcome_request, req$id);
        }
    }

function drop_instance(inst: ClusterController::Types::Instance)

@@ -93,22 +117,19 @@ function drop_instance(inst: ClusterController::Types::Instance)
    if ( inst$name !in g_instances )
        return;

    # Send this agent a config that will terminate any data cluster
    # nodes it might have. This is "fire and forget" -- there will
    # not be a response.
    # Send the agent a standby so it shuts down its cluster nodes & state.
    ClusterController::Log::info(fmt("tx ClusterAgent::API::agent_standby_request to %s", inst$name));
    Broker::publish(ClusterAgent::topic_prefix + "/" + inst$name,
        ClusterAgent::API::set_configuration_request,
        "", ClusterController::Types::Configuration());

    # If the instance has a port, we peered with it, so now unpeer.
    if ( inst?$listen_port )
        Broker::unpeer(cat(inst$host), inst$listen_port);
        ClusterAgent::API::agent_standby_request, "");

    delete g_instances[inst$name];

    if ( inst$name in g_instances_ready )
        delete g_instances_ready[inst$name];

    # The agent remains in g_instances_known, to track that we're able
    # to communicate with it in case it's required again.

    ClusterController::Log::info(fmt("dropped instance %s", inst$name));
    }

@@ -122,69 +143,113 @@ function is_null_config(config: ClusterController::Types::Configuration): bool
    return config$id == "";
    }

function is_instance_connectivity_change(inst: ClusterController::Types::Instance): bool
    {
    # If we're not tracking this instance as part of a cluster config, it's
    # not a change. (More precisely: we cannot say whether it's changed.)
    if ( inst$name !in g_instances )
        return F;

    # The agent has peered with us and now uses a different host.
    # XXX 0.0.0.0 is a workaround until we've resolved how agents that peer
    # with us obtain their identity. Broker ID?
    if ( inst$host != 0.0.0.0 && inst$host != g_instances[inst$name]$host )
        return T;

    # The agent has a listening port and the one we know does not, or vice
    # versa. I.e., this is a change in the intended peering direction.
    if ( inst?$listen_port != g_instances[inst$name]?$listen_port )
        return T;

    # Both have listening ports, but they differ.
    if ( inst?$listen_port && g_instances[inst$name]?$listen_port &&
         inst$listen_port != g_instances[inst$name]$listen_port )
        return T;

    return F;
    }
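
This predicate replaces an inline four-clause conditional in the set_configuration_request handler (see that hunk below); the call site reduces to:

    if ( is_instance_connectivity_change(inst) )
        {
        # The endpoint looks different: drop the current peering and
        # re-establish connectivity with the new one.
        add insts_to_drop[inst$name];
        add insts_to_add[inst$name];
        }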

event ClusterController::API::notify_agents_ready(instances: set[string])
    {
    local insts = ClusterController::Util::set_to_vector(instances);

    ClusterController::Log::info(fmt("rx ClusterController::API::notify_agents_ready %s", join_string_vec(insts, ",")));

    # When all agents are ready, send them the pending config update.
    local req = ClusterController::Request::lookup(g_config_reqid_pending);

    # If the request isn't available or doesn't have config state, we just
    # clear it out and stop.
    # If there's no pending request, it's no longer available, or it
    # doesn't have config state, don't do anything else.
    if ( ClusterController::Request::is_null(req) || ! req?$set_configuration_state )
        {
        g_config_reqid_pending = "";
        return;
        }

    # All instances requested in the pending configuration update are now
    # known to us. Send them the config. As they send their response events
    # we update the client's request state and eventually send the response
    # event to it.
    send_config_to_agents(req, req$set_configuration_state$config);

    # The request object continues to exist and will be referenced by the
    # responses coming in, but there's no longer a pending config update to
    # track.
    g_config_reqid_pending = "";
    }

event ClusterAgent::API::notify_agent_hello(instance: string, host: addr, api_version: count)
    {
    if ( instance !in g_instances )
        {
        # An unknown agent has checked in. This can happen with agents that aren't yet
        # showing in a configuration received from the client. We log at debug level only.
        ClusterController::Log::debug(
            fmt("unknown instance %s/%s has checked in, ignoring", instance, host));
        return;
        }

    ClusterController::Log::info(fmt("rx ClusterAgent::API::notify_agent_hello %s %s", instance, host));

    local inst = g_instances[instance];

    # When a known agent checks in with a mismatching API version we kick it out.
    # When an agent checks in with a mismatching API version, we log the
    # fact and drop its state, if any.
    if ( api_version != ClusterController::API::version )
        {
        ClusterController::Log::warning(
            fmt("instance %s/%s has checked in with incompatible API version %s, dropping",
            fmt("instance %s/%s has checked in with incompatible API version %s",
                instance, host, api_version));
        drop_instance(inst);

        if ( instance in g_instances )
            drop_instance(g_instances[instance]);
        if ( instance in g_instances_known )
            delete g_instances_known[instance];

        return;
        }

    if ( instance !in g_instances_ready )
    add g_instances_known[instance];

    if ( instance in g_instances && instance !in g_instances_ready )
        {
        ClusterController::Log::info(fmt("instance %s/%s has checked in", instance, host));
        add g_instances_ready[instance];
        # We need this instance for our cluster and have full context for
        # it from the configuration. Tell the agent.
        local req = ClusterController::Request::create();

        ClusterController::Log::info(fmt("tx ClusterAgent::API::agent_welcome_request to %s", instance));
        Broker::publish(ClusterAgent::topic_prefix + "/" + instance,
            ClusterAgent::API::agent_welcome_request, req$id);
        }
    }

    Broker::publish(ClusterAgent::topic_prefix + "/" + instance,
        ClusterAgent::API::notify_controller_hello, ClusterController::name,
        to_addr(ClusterController::network_info()$address));
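
Since deletions and additions interleave above, here is how the new handler reads after this change (a sketch assembled from the added lines; exact ordering is an assumption):

    event ClusterAgent::API::notify_agent_hello(instance: string, host: addr, api_version: count)
        {
        ClusterController::Log::info(fmt("rx ClusterAgent::API::notify_agent_hello %s %s", instance, host));

        # Incompatible agents get logged and have any of their state dropped.
        if ( api_version != ClusterController::API::version )
            {
            ClusterController::Log::warning(
                fmt("instance %s/%s has checked in with incompatible API version %s",
                    instance, host, api_version));

            if ( instance in g_instances )
                drop_instance(g_instances[instance]);
            if ( instance in g_instances_known )
                delete g_instances_known[instance];

            return;
            }

        add g_instances_known[instance];

        if ( instance in g_instances && instance !in g_instances_ready )
            {
            # We need this instance for our cluster and have full context
            # for it from the configuration: welcome it into service.
            local req = ClusterController::Request::create();

            ClusterController::Log::info(fmt("tx ClusterAgent::API::agent_welcome_request to %s", instance));
            Broker::publish(ClusterAgent::topic_prefix + "/" + instance,
                ClusterAgent::API::agent_welcome_request, req$id);
            }
        }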

event ClusterAgent::API::agent_welcome_response(reqid: string, result: ClusterController::Types::Result)
    {
    ClusterController::Log::info(fmt("rx ClusterAgent::API::agent_welcome_response %s", reqid));

    local req = ClusterController::Request::lookup(reqid);

    if ( ClusterController::Request::is_null(req) )
        return;

    ClusterController::Request::finish(req$id);

    # An agent we've been waiting to hear back from is ready for cluster
    # work. Double-check we still want it, otherwise drop it.

    if ( ! result$success || result$instance !in g_instances )
        {
        ClusterController::Log::info(fmt(
            "tx ClusterAgent::API::agent_standby_request to %s", result$instance));
        Broker::publish(ClusterAgent::topic_prefix + "/" + result$instance,
            ClusterAgent::API::agent_standby_request, "");
        return;
        }

    add g_instances_ready[result$instance];
    ClusterController::Log::info(fmt("instance %s ready", result$instance));

    check_instances_ready();
    }

event ClusterAgent::API::notify_change(instance: string, n: ClusterController::Types::Node,

@@ -267,10 +332,10 @@ event ClusterAgent::API::set_configuration_response(reqid: string, result: Clust
        ClusterController::Request::finish(r$id);
        }

    # This is the point where we're really done with the original
    # set_configuration request. We adopt the configuration as the current
    # one.
    # We're now done with the original set_configuration request.
    # Adopt the configuration as the current one.
    g_config_current = req$set_configuration_state$config;
    g_config_reqid_pending = "";

    ClusterController::Log::info(fmt("tx ClusterController::API::set_configuration_response %s",
        ClusterController::Request::to_string(req)));

@@ -302,6 +367,18 @@ event ClusterController::API::set_configuration_request(reqid: string, config: C
        return;
        }

    # XXX validate the configuration:
    # - Are node instances among defined instances?
    # - Are all names unique?
    # - Are any node options understood?
    # - Do node types with optional fields have required values?
    # ...

    # The incoming request is now the pending one. It gets cleared when all
    # agents have processed their config updates successfully, or their
    # responses time out.
    g_config_reqid_pending = req$id;

    # Compare the instance configuration to our current one. If it matches,
    # we can proceed to deploying the new data cluster topology. If it does
    # not, we need to establish connectivity with agents we connect to, or

@@ -354,43 +431,27 @@ event ClusterController::API::set_configuration_request(reqid: string, config: C
        if ( inst$name !in insts_to_keep )
            next;

        # XXX 0.0.0.0 is a workaround until we've resolved
        # how agents that peer with us obtain their identity.
        # Broker ID?
        if ( ( inst$host != 0.0.0.0 && inst$host != g_instances[inst$name]$host ) ||
             inst?$listen_port != g_instances[inst$name]?$listen_port ||
             ( inst?$listen_port && g_instances[inst$name]?$listen_port &&
               inst$listen_port != g_instances[inst$name]$listen_port ) )
        if ( is_instance_connectivity_change(inst) )
            {
            # The endpoint looks different. We drop the current one
            # and need to re-establish connectivity.
            # and need to re-establish connectivity with the new
            # one.
            add insts_to_drop[inst$name];
            add insts_to_add[inst$name];
            }
        }

    # Process our TODO lists.
    # Process our TODO lists. Handle drops first, then additions, in
    # case we need to re-establish connectivity with an agent.

    for ( inst_name in insts_to_drop )
        drop_instance(g_instances[inst_name]);

    for ( inst_name in insts_to_peer )
        add_instance(insts_to_peer[inst_name]);

    # XXX validate the configuration:
    # - Are node instances among defined instances?
    # - Are all names unique?
    # - Are any node options understood?
    # - Do node types with optional fields have required values?
    # ...

    # Track this config request globally until all of the agents required
    # for it have checked in. It gets cleared in the notify_agents_ready
    # event handler.
    g_config_reqid_pending = req$id;

    # Special case: if the new request kept the set of instances identical,
    # trigger notify_agents_ready explicitly so we transmit the new config.
    if ( |insts_to_drop| == 0 && |insts_to_add| == 0 && |insts_to_keep| > 0 )
        event ClusterController::API::notify_agents_ready(insts_to_keep);
    # Updates to our instance tables are complete; now check if we're already
    # able to send the config to the agents:
    check_instances_ready();
    }

event ClusterController::API::get_instances_request(reqid: string)

@@ -419,6 +480,9 @@ event ClusterController::Request::request_expired(req: ClusterController::Reques

    if ( req?$set_configuration_state )
        {
        # This timeout means we no longer have a pending request.
        g_config_reqid_pending = "";

        res = ClusterController::Types::Result($reqid=req$id);
        res$success = F;
        res$error = "request timed out";