diff --git a/scripts/policy/frameworks/cluster/agent/api.zeek b/scripts/policy/frameworks/cluster/agent/api.zeek index 6263bf9d9d..f570a93786 100644 --- a/scripts/policy/frameworks/cluster/agent/api.zeek +++ b/scripts/policy/frameworks/cluster/agent/api.zeek @@ -8,14 +8,35 @@ export { # Agent API events + # The controller uses this event to convey a new cluster + # configuration to the agent. Once processed, the agent + # responds with the response event. global set_configuration_request: event(reqid: string, config: ClusterController::Types::Configuration); global set_configuration_response: event(reqid: string, result: ClusterController::Types::Result); + # The controller uses this event to confirm to the agent + # that it is part of the current cluster. The agent + # acknowledges with the response event. + global agent_welcome_request: event(reqid: string); + global agent_welcome_response: event(reqid: string, + result: ClusterController::Types::Result); + + # The controller sends this event to convey that the agent is not + # currently required. This status may later change, depending on + # updates from the client, so the peering can remain active. The + # agent releases any cluster-related resources when processing the + # request. + global agent_standby_request: event(reqid: string); + global agent_standby_response: event(reqid: string, + result: ClusterController::Types::Result); + # Notification events, agent -> controller - # Report agent being available. + # The agent sends this upon peering as a "check in", informing the + # controller that an agent of the given name is now available to + # communicate with. global notify_agent_hello: event(instance: string, host: addr, api_version: count); @@ -30,11 +51,4 @@ export { # Report informational message. 
global notify_log: event(instance: string, msg: string, node: string &default=""); - - # Notification events, controller -> agent - - # Confirmation from controller in response to notify_agent_hello - # that the agent is welcome. - global notify_controller_hello: event(controller: string, host: addr); - } diff --git a/scripts/policy/frameworks/cluster/agent/main.zeek b/scripts/policy/frameworks/cluster/agent/main.zeek index 734a55a126..fd0bee533e 100644 --- a/scripts/policy/frameworks/cluster/agent/main.zeek +++ b/scripts/policy/frameworks/cluster/agent/main.zeek @@ -24,11 +24,6 @@ global g_nodes: table[string] of ClusterController::Types::Node; # new configurations. global g_data_cluster: table[string] of Supervisor::ClusterEndpoint; -# Whether we currenty keep notifying the controller that we're here. -# We stop once the controller responds back, and resume when we've -# lost the peering. -global g_notify_controller: bool = T; - event SupervisorControl::create_response(reqid: string, result: string) { @@ -169,25 +164,44 @@ event ClusterAgent::API::set_configuration_request(reqid: string, config: Cluste } } -event ClusterAgent::API::notify_controller_hello(controller: string, host: addr) +event ClusterAgent::API::agent_welcome_request(reqid: string) { - ClusterController::Log::info(fmt("rx ClusterAgent::API::notify_controller_hello %s %s", controller, host)); - g_notify_controller = F; + ClusterController::Log::info(fmt("rx ClusterAgent::API::agent_welcome_request %s", reqid)); + + local res = ClusterController::Types::Result( + $reqid = reqid, + $instance = ClusterAgent::name); + + ClusterController::Log::info(fmt("tx ClusterAgent::API::agent_welcome_response %s", + ClusterController::Types::result_to_string(res))); + event ClusterAgent::API::agent_welcome_response(reqid, res); } -event ClusterAgent::API::notify_agent_hello(instance: string, host: addr, api_version: count) +event ClusterAgent::API::agent_standby_request(reqid: string) { - if ( g_notify_controller 
) - schedule 1sec { ClusterAgent::API::notify_agent_hello(instance, host, api_version) }; + ClusterController::Log::info(fmt("rx ClusterAgent::API::agent_standby_request %s", reqid)); + + # We shut down any existing cluster nodes via an empty configuration, + # and fall silent. We do not unpeer/disconnect (assuming we earlier + # peered/connected -- otherwise there's nothing we can do here via + # Broker anyway), mainly to keep open the possibility of running + # cluster nodes again later. + event ClusterAgent::API::set_configuration_request("", ClusterController::Types::Configuration()); + + local res = ClusterController::Types::Result( + $reqid = reqid, + $instance = ClusterAgent::name); + + ClusterController::Log::info(fmt("tx ClusterAgent::API::agent_standby_response %s", + ClusterController::Types::result_to_string(res))); + event ClusterAgent::API::agent_standby_response(reqid, res); } event Broker::peer_added(peer: Broker::EndpointInfo, msg: string) { # This does not (cannot?) immediately verify that the new peer - # is in fact a controller, so we might send this redundantly. - # Controllers handle the hello event accordingly. - - g_notify_controller = T; + # is in fact a controller, so we might send this in vain. + # Controllers register the agent upon receipt of the event. local epi = ClusterAgent::endpoint_info(); @@ -218,6 +232,9 @@ event zeek_init() # Auto-publish a bunch of events. Glob patterns or module-level # auto-publish would be helpful here. 
Broker::auto_publish(agent_topic, ClusterAgent::API::set_configuration_response); + Broker::auto_publish(agent_topic, ClusterAgent::API::agent_welcome_response); + Broker::auto_publish(agent_topic, ClusterAgent::API::agent_standby_response); + Broker::auto_publish(agent_topic, ClusterAgent::API::notify_agent_hello); Broker::auto_publish(agent_topic, ClusterAgent::API::notify_change); Broker::auto_publish(agent_topic, ClusterAgent::API::notify_error); diff --git a/scripts/policy/frameworks/cluster/controller/main.zeek b/scripts/policy/frameworks/cluster/controller/main.zeek index a774bba119..c9df650e4c 100644 --- a/scripts/policy/frameworks/cluster/controller/main.zeek +++ b/scripts/policy/frameworks/cluster/controller/main.zeek @@ -12,23 +12,36 @@ module ClusterController::Runtime; redef ClusterController::role = ClusterController::Types::CONTROLLER; -global check_instances_ready: function(insts: set[string], tc: TableChange, inst: string); +global check_instances_ready: function(); global add_instance: function(inst: ClusterController::Types::Instance); global drop_instance: function(inst: ClusterController::Types::Instance); global null_config: function(): ClusterController::Types::Configuration; global is_null_config: function(config: ClusterController::Types::Configuration): bool; -# The desired set of agents the controller interact with, as provided by the -# most recent config update sent by the client. They key is a name of each -# instance. This should match the $name member of the instance records. +# Checks whether the given instance is one that we know with different +# communication settings: a different peering direction, a different listening +# port, etc. Used as a predicate to indicate when we need to drop the existing +# one from our internal state. +global is_instance_connectivity_change: function + (inst: ClusterController::Types::Instance): bool; + +# The set of agents the controller interacts with to manage the currently +# configured cluster. 
This may be a subset of all the agents known to the +# controller, as tracked by the g_instances_known set. The key is the instance +# name and should match the $name member of the corresponding instance record. global g_instances: table[string] of ClusterController::Types::Instance = table(); +# The set of instances that have checked in with the controller. This is a +# superset of g_instances, since it covers any agent that has sent us a +# notify_agent_hello event. +global g_instances_known: set[string] = set(); + # A corresponding set of instances/agents that we track in order to understand -# when all of the above instances have checked in with a notify_agent_hello -# event. (An alternative would be to use a record that adds a single state bit -# for each instance, and store that above.) -global g_instances_ready: set[string] = set() &on_change=check_instances_ready; +# when all of the above instances have sent agent_welcome_response events. (An +# alternative would be to use a record that adds a single state bit for each +# instance, and store that above.) +global g_instances_ready: set[string] = set(); # The request ID of the most recent configuration update that's come in from # a client. We track it here until we know we are ready to communicate with all @@ -44,6 +57,9 @@ function send_config_to_agents(req: ClusterController::Request::Request, { for ( name in g_instances ) { + if ( name !in g_instances_ready ) + next; + local agent_topic = ClusterAgent::topic_prefix + "/" + name; local areq = ClusterController::Request::create(); areq$parent_id = req$id; @@ -60,23 +76,17 @@ function send_config_to_agents(req: ClusterController::Request::Request, } } -# This is the &on_change handler for the g_instances_ready set. -function check_instances_ready(insts: set[string], tc: TableChange, inst: string) +# Compares the instance names in g_instances to g_instances_ready and triggers +# notify_agents_ready when they match. Called explicitly after these change. 
+function check_instances_ready() { local cur_instances: set[string]; - # See if the new update to the readiness set makes it match the current - # instances. If so, trigger the notify_agents_ready event. - if ( tc == TABLE_ELEMENT_NEW || tc == TABLE_ELEMENT_REMOVED ) - { - for ( inst in g_instances ) - add cur_instances[inst]; + for ( inst in g_instances ) + add cur_instances[inst]; - if ( cur_instances == g_instances_ready ) - { - event ClusterController::API::notify_agents_ready(cur_instances); - } - } + if ( cur_instances == g_instances_ready ) + event ClusterController::API::notify_agents_ready(cur_instances); } function add_instance(inst: ClusterController::Types::Instance) @@ -86,6 +96,20 @@ function add_instance(inst: ClusterController::Types::Instance) if ( inst?$listen_port ) Broker::peer(cat(inst$host), inst$listen_port, ClusterController::connect_retry); + + if ( inst$name in g_instances_known ) + { + # The agent has already peered with us. Send welcome to indicate + # it's part of cluster management. Once it responds, we update + # the set of ready instances and proceed as feasible with config + # deployments. + + local req = ClusterController::Request::create(); + + ClusterController::Log::info(fmt("tx ClusterAgent::API::agent_welcome_request to %s", inst$name)); + Broker::publish(ClusterAgent::topic_prefix + "/" + inst$name, + ClusterAgent::API::agent_welcome_request, req$id); + } } function drop_instance(inst: ClusterController::Types::Instance) @@ -93,22 +117,19 @@ function drop_instance(inst: ClusterController::Types::Instance) if ( inst$name !in g_instances ) return; - # Send this agent a config that will terminate any data cluster - # nodes it might have. This is "fire and forget" -- there will - # not be a response. 
+ # Send the agent a standby so it shuts down its cluster nodes & state + ClusterController::Log::info(fmt("tx ClusterAgent::API::agent_standby_request to %s", inst$name)); Broker::publish(ClusterAgent::topic_prefix + "/" + inst$name, - ClusterAgent::API::set_configuration_request, - "", ClusterController::Types::Configuration()); - - # If the instance has a port, we peered with it, so now unpeer. - if ( inst?$listen_port ) - Broker::unpeer(cat(inst$host), inst$listen_port ); + ClusterAgent::API::agent_standby_request, ""); delete g_instances[inst$name]; if ( inst$name in g_instances_ready ) delete g_instances_ready[inst$name]; + # The agent remains in g_instances_known, to track that we're able + # to communicate with it in case it's required again. + ClusterController::Log::info(fmt("dropped instance %s", inst$name)); } @@ -122,69 +143,113 @@ function is_null_config(config: ClusterController::Types::Configuration): bool return config$id == ""; } +function is_instance_connectivity_change(inst: ClusterController::Types::Instance): bool + { + # If we're not tracking this instance as part of a cluster config, it's + # not a change. (More precisely: we cannot say whether it's changed.) + if ( inst$name !in g_instances ) + return F; + + # The agent has peered with us and now uses a different host. + # XXX 0.0.0.0 is a workaround until we've resolved how agents that peer + # with us obtain their identity. Broker ID? + if ( inst$host != 0.0.0.0 && inst$host != g_instances[inst$name]$host ) + return T; + + # The agent has a listening port and the one we know does not, or vice + # versa. I.e., this is a change in the intended peering direction. + if ( inst?$listen_port != g_instances[inst$name]?$listen_port ) + return T; + + # Both have listening ports, but they differ. 
+ if ( inst?$listen_port && g_instances[inst$name]?$listen_port && + inst$listen_port != g_instances[inst$name]$listen_port ) + return T; + + return F; + } + event ClusterController::API::notify_agents_ready(instances: set[string]) { local insts = ClusterController::Util::set_to_vector(instances); ClusterController::Log::info(fmt("rx ClusterController::API:notify_agents_ready %s", join_string_vec(insts, ","))); - # When all agents are ready, send them the pending config update. local req = ClusterController::Request::lookup(g_config_reqid_pending); - # If the request isn't available or doesn't have config state, we just - # clear it out and stop. + # If there's no pending request, when it's no longer available, or it + # doesn't have config state, don't do anything else. if ( ClusterController::Request::is_null(req) || ! req?$set_configuration_state ) - { - g_config_reqid_pending = ""; return; - } # All instances requested in the pending configuration update are now # known to us. Send them the config. As they send their response events # we update the client's request state and eventually send the response # event to the it. send_config_to_agents(req, req$set_configuration_state$config); - - # The request object continues to exist and will be referenced by the - # responses coming in, but there's no longer a pending config update to - # track. - g_config_reqid_pending = ""; } event ClusterAgent::API::notify_agent_hello(instance: string, host: addr, api_version: count) { - if ( instance !in g_instances ) - { - # An unknown agent has checked in. This can happen with agentsthat aren't yet - # showing in a configuration received by the client. We log at debug level only. 
- ClusterController::Log::debug( - fmt("unknown instance %s/%s has checked in, ignoring", instance, host)); - return; - } - ClusterController::Log::info(fmt("rx ClusterAgent::API::notify_agent_hello %s %s", instance, host)); - local inst = g_instances[instance]; - - # When a known agent checks in with a mismatching API version we kick it out. + # When an agent checks in with a mismatching API version, we log the + # fact and drop its state, if any. if ( api_version != ClusterController::API::version ) { ClusterController::Log::warning( - fmt("instance %s/%s has checked in with incompatible API version %s, dropping", + fmt("instance %s/%s has checked in with incompatible API version %s", instance, host, api_version)); - drop_instance(inst); + + if ( instance in g_instances ) + drop_instance(g_instances[instance]); + if ( instance in g_instances_known ) + delete g_instances_known[instance]; + return; } - if ( instance !in g_instances_ready ) + add g_instances_known[instance]; + + if ( instance in g_instances && instance !in g_instances_ready ) { - ClusterController::Log::info(fmt("instance %s/%s has checked in", instance, host)); - add g_instances_ready[instance]; + # We need this instance for our cluster and have full context for + # it from the configuration. Tell agent. 
+ local req = ClusterController::Request::create(); + + ClusterController::Log::info(fmt("tx ClusterAgent::API::agent_welcome_request to %s", instance)); + Broker::publish(ClusterAgent::topic_prefix + "/" + instance, + ClusterAgent::API::agent_welcome_request, req$id); + } + } + +event ClusterAgent::API::agent_welcome_response(reqid: string, result: ClusterController::Types::Result) + { + ClusterController::Log::info(fmt("rx ClusterAgent::API::agent_welcome_response %s", reqid)); + + local req = ClusterController::Request::lookup(reqid); + + if ( ClusterController::Request::is_null(req) ) + return; + + ClusterController::Request::finish(req$id); + + # An agent we've been waiting to hear back from is ready for cluster + # work. Double-check we still want it, otherwise drop it. + + if ( ! result$success || result$instance !in g_instances ) + { + ClusterController::Log::info(fmt( + "tx ClusterAgent::API::agent_standby_request to %s", result$instance)); + Broker::publish(ClusterAgent::topic_prefix + "/" + result$instance, + ClusterAgent::API::agent_standby_request, ""); + return; } - Broker::publish(ClusterAgent::topic_prefix + "/" + instance, - ClusterAgent::API::notify_controller_hello, ClusterController::name, - to_addr(ClusterController::network_info()$address)); + add g_instances_ready[result$instance]; + ClusterController::Log::info(fmt("instance %s ready", result$instance)); + + check_instances_ready(); } event ClusterAgent::API::notify_change(instance: string, n: ClusterController::Types::Node, @@ -267,10 +332,10 @@ event ClusterAgent::API::set_configuration_response(reqid: string, result: Clust ClusterController::Request::finish(r$id); } - # This is the point where we're really done with the original - # set_configuration request. We adopt the configuration as the current - # one. + # We're now done with the original set_configuration request. + # Adopt the configuration as the current one. 
g_config_current = req$set_configuration_state$config; + g_config_reqid_pending = ""; ClusterController::Log::info(fmt("tx ClusterController::API::set_configuration_response %s", ClusterController::Request::to_string(req))); @@ -302,6 +367,18 @@ event ClusterController::API::set_configuration_request(reqid: string, config: C return; } + # XXX validate the configuration: + # - Are node instances among defined instances? + # - Are all names unique? + # - Are any node options understood? + # - Do node types with optional fields have required values? + # ... + + # The incoming request is now the pending one. It gets cleared when all + # agents have processed their config updates successfully, or their + # responses time out. + g_config_reqid_pending = req$id; + # Compare the instance configuration to our current one. If it matches, # we can proceed to deploying the new data cluster topology. If it does # not, we need to establish connectivity with agents we connect to, or @@ -354,43 +431,27 @@ event ClusterController::API::set_configuration_request(reqid: string, config: C if ( inst$name !in insts_to_keep ) next; - # XXX 0.0.0.0 is a workaround until we've resolved - # how agents that peer with us obtain their identity. - # Broker ID? - if ( ( inst$host != 0.0.0.0 && inst$host != g_instances[inst$name]$host ) || - inst?$listen_port != g_instances[inst$name]?$listen_port || - ( inst?$listen_port && g_instances[inst$name]?$listen_port && - inst$listen_port != g_instances[inst$name]$listen_port ) ) + if ( is_instance_connectivity_change(inst) ) { # The endpoint looks different. We drop the current one - # and need to re-establish connectivity. + # and need to re-establish connectivity with the new + # one. add insts_to_drop[inst$name]; + add insts_to_add[inst$name]; } } - # Process our TODO lists. + # Process our TODO lists. Handle drops first, then additions, in + # case we need to re-establish connectivity with an agent. 
+ for ( inst_name in insts_to_drop ) drop_instance(g_instances[inst_name]); - for ( inst_name in insts_to_peer ) add_instance(insts_to_peer[inst_name]); - # XXX validate the configuration: - # - Are node instances among defined instances? - # - Are all names unique? - # - Are any node options understood? - # - Do node types with optional fields have required values? - # ... - - # Track this config request globally until all of the agents required - # for it have checked in. It gets cleared in the notify_agents_ready - # event handler. - g_config_reqid_pending = req$id; - - # Special case: if the new request kept the set of instances identical, - # trigger notify_agents_ready explicitly so we transmit the new config. - if ( |insts_to_drop| == 0 && |insts_to_add| == 0 && |insts_to_keep| > 0 ) - event ClusterController::API::notify_agents_ready(insts_to_keep); + # Updates to our instance tables are complete; now check if we're already + # able to send the config to the agents: + check_instances_ready(); } event ClusterController::API::get_instances_request(reqid: string) @@ -419,6 +480,9 @@ event ClusterController::Request::request_expired(req: ClusterController::Reques if ( req?$set_configuration_state ) { + # This timeout means we no longer have a pending request. + g_config_reqid_pending = ""; + res = ClusterController::Types::Result($reqid=req$id); res$success = F; res$error = "request timed out";