diff --git a/scripts/policy/frameworks/cluster/agent/api.zeek b/scripts/policy/frameworks/cluster/agent/api.zeek
index a5334fbbef..6263bf9d9d 100644
--- a/scripts/policy/frameworks/cluster/agent/api.zeek
+++ b/scripts/policy/frameworks/cluster/agent/api.zeek
@@ -30,4 +30,11 @@ export {
 	# Report informational message.
 	global notify_log: event(instance: string, msg: string, node: string &default="");
+
+	# Notification events, controller -> agent
+
+	# Confirmation from controller in response to notify_agent_hello
+	# that the agent is welcome.
+	global notify_controller_hello: event(controller: string, host: addr);
+
 }
diff --git a/scripts/policy/frameworks/cluster/agent/main.zeek b/scripts/policy/frameworks/cluster/agent/main.zeek
index b139d6677a..a70c7c6269 100644
--- a/scripts/policy/frameworks/cluster/agent/main.zeek
+++ b/scripts/policy/frameworks/cluster/agent/main.zeek
@@ -22,6 +22,12 @@ global g_nodes: table[string] of ClusterController::Types::Node;
 # new configurations.
 global g_data_cluster: table[string] of Supervisor::ClusterEndpoint;
 
+# Whether we currently keep notifying the controller that we're here.
+# We stop once the controller responds back, and resume when we've
+# lost the peering.
+global g_notify_controller: bool = T;
+
+
 event SupervisorControl::create_response(reqid: string, result: string)
 	{
 	local req = ClusterController::Request::lookup(reqid);
@@ -161,14 +167,28 @@ event ClusterAgent::API::set_configuration_request(reqid: string, config: Cluste
 		}
 	}
 
+event ClusterAgent::API::notify_controller_hello(controller: string, host: addr)
+	{
+	ClusterController::Log::info(fmt("rx ClusterAgent::API::notify_controller_hello %s %s", controller, host));
+	g_notify_controller = F;
+	}
+
+event ClusterAgent::API::notify_agent_hello(instance: string, host: addr, api_version: count)
+	{
+	if ( g_notify_controller )
+		schedule 1sec { ClusterAgent::API::notify_agent_hello(instance, host, api_version) };
+	}
+
 event Broker::peer_added(peer: Broker::EndpointInfo, msg: string)
 	{
 	# This does not (cannot?) immediately verify that the new peer
 	# is in fact a controller, so we might send this redundantly.
 	# Controllers handle the hello event accordingly.
+	g_notify_controller = T;
+
 	local epi = ClusterAgent::endpoint_info();
 
-	# XXX deal with unexpected peers, unless we're okay with it
+
 	event ClusterAgent::API::notify_agent_hello(epi$id, to_addr(epi$network$address), ClusterAgent::API::version);
 	}
diff --git a/scripts/policy/frameworks/cluster/controller/api.zeek b/scripts/policy/frameworks/cluster/controller/api.zeek
index 4d3e1ba70d..d742a332ef 100644
--- a/scripts/policy/frameworks/cluster/controller/api.zeek
+++ b/scripts/policy/frameworks/cluster/controller/api.zeek
@@ -5,6 +5,10 @@ module ClusterController::API;
 export {
 	const version = 1;
 
+	# Triggered when the operational instances align with the desired ones, as
+	# defined by the latest cluster config sent by the client.
+	global notify_agents_ready: event(instances: set[string]);
+
 	global get_instances_request: event(reqid: string);
 	global get_instances_response: event(reqid: string,
 	    instances: vector of ClusterController::Types::Instance);
diff --git a/scripts/policy/frameworks/cluster/controller/config.zeek b/scripts/policy/frameworks/cluster/controller/config.zeek
index f889512fec..5605bd39a6 100644
--- a/scripts/policy/frameworks/cluster/controller/config.zeek
+++ b/scripts/policy/frameworks/cluster/controller/config.zeek
@@ -28,13 +28,6 @@ export {
 	# The controller listens for messages on this topic:
 	const topic = "zeek/cluster-control/controller" &redef;
 
-	# The set of agents to interact with. When this is non-empty
-	# at startup, the controller contacts the agents; when it is
-	# empty, it waits for agents to connect. They key is a name of
-	# each instance. This should match the $name member of the
-	# instance records.
-	const instances: table[string] of ClusterController::Types::Instance = { } &redef;
-
 	# The role of this node in cluster management. Agent and
 	# controller both redef this. Used during logging.
 	const role = ClusterController::Types::NONE &redef;
diff --git a/scripts/policy/frameworks/cluster/controller/main.zeek b/scripts/policy/frameworks/cluster/controller/main.zeek
index 5c8c57a057..fb01f72628 100644
--- a/scripts/policy/frameworks/cluster/controller/main.zeek
+++ b/scripts/policy/frameworks/cluster/controller/main.zeek
@@ -6,16 +6,35 @@
 @load ./api
 @load ./log
 @load ./request
+@load ./util
 
 redef ClusterController::role = ClusterController::Types::CONTROLLER;
 
-global g_config_current: ClusterController::Types::Configuration;
+global check_instances_ready: function(insts: set[string], tc: TableChange, inst: string);
+global add_instance: function(inst: ClusterController::Types::Instance);
+global drop_instance: function(inst: ClusterController::Types::Instance);
+
+# The desired set of agents the controller interacts with, as provided by the
+# most recent config update sent by the client. The key is the name of each
+# instance. This should match the $name member of the instance records.
+global g_instances: table[string] of ClusterController::Types::Instance = table();
+
+# A corresponding set of instances/agents that we track in order to understand
+# when all of the above instances have checked in with a notify_agent_hello
+# event. (An alternative would be to use a record that adds a single state bit
+# for each instance, and store that above.)
+global g_instances_ready: set[string] = set() &on_change=check_instances_ready;
+
+# The request ID of the most recent configuration update that's come in from
+# a client. We track it here until we know we are ready to communicate with all
+# agents required by the update.
 global g_config_reqid_pending: string = "";
 
+
 function send_config_to_agents(req: ClusterController::Request::Request,
                                config: ClusterController::Types::Configuration)
 	{
-	for ( name in ClusterController::instances )
+	for ( name in g_instances )
 		{
 		local agent_topic = ClusterAgent::topic_prefix + "/" + name;
 		local areq = ClusterController::Request::create();
@@ -26,100 +45,129 @@ function send_config_to_agents(req: ClusterController::Request::Request,
 		# and once all are, we respond back to the client.
 		req$set_configuration_state$requests += areq;
 
-		# XXX could also broadcast just once on the agent prefix, but
+		# We could also broadcast just once on the agent prefix, but
 		# explicit request/response pairs for each agent seems cleaner.
 		ClusterController::Log::info(fmt("tx ClusterAgent::API::set_configuration_request %s to %s", areq$id, name));
 		Broker::publish(agent_topic, ClusterAgent::API::set_configuration_request, areq$id, config);
 		}
 	}
 
+# This is the &on_change handler for the g_instances_ready set.
+function check_instances_ready(insts: set[string], tc: TableChange, inst: string)
+	{
+	local cur_instances: set[string];
+
+	# See if the new update to the readiness set makes it match the current
+	# instances. If so, trigger the notify_agents_ready event.
+	if ( tc == TABLE_ELEMENT_NEW || tc == TABLE_ELEMENT_REMOVED )
+		{
+		for ( inst in g_instances )
+			add cur_instances[inst];
+
+		if ( cur_instances == g_instances_ready )
+			{
+			event ClusterController::API::notify_agents_ready(cur_instances);
+			}
+		}
+	}
+
+function add_instance(inst: ClusterController::Types::Instance)
+	{
+	g_instances[inst$name] = inst;
+
+	if ( inst?$listen_port )
+		Broker::peer(cat(inst$host), inst$listen_port,
+		    ClusterController::connect_retry);
+	}
+
 function drop_instance(inst: ClusterController::Types::Instance)
 	{
-	if ( inst$name in ClusterController::instances )
-		{
-		# Send this agent a config that will terminate any data cluster
-		# nodes it might have. This is "fire and forget" -- there will
-		# not be a response.
-		Broker::publish(ClusterAgent::topic_prefix + "/" + inst$name,
-		    ClusterAgent::API::set_configuration_request,
-		    "", ClusterController::Types::Configuration());
+	if ( inst$name !in g_instances )
+		return;
 
-		delete ClusterController::instances[inst$name];
-		}
+	# Send this agent a config that will terminate any data cluster
+	# nodes it might have. This is "fire and forget" -- there will
+	# not be a response.
+	Broker::publish(ClusterAgent::topic_prefix + "/" + inst$name,
+	    ClusterAgent::API::set_configuration_request,
+	    "", ClusterController::Types::Configuration());
 
 	# If the instance has a port, we peered with it, so now unpeer.
 	if ( inst?$listen_port )
 		Broker::unpeer(cat(inst$host), inst$listen_port );
 
+	delete g_instances[inst$name];
+
+	if ( inst$name in g_instances_ready )
+		delete g_instances_ready[inst$name];
+
 	ClusterController::Log::info(fmt("dropped instance %s", inst$name));
 	}
 
-event ClusterAgent::API::notify_agent_hello(instance: string, host: addr, api_version: count)
+event ClusterController::API::notify_agents_ready(instances: set[string])
 	{
-	# See if we already know about this agent; if not, register
-	# it.
-	#
-	# XXX protection against rogue agents?
+	local insts = ClusterController::Util::set_to_vector(instances);
 
-	local inst: ClusterController::Types::Instance;
-
-	if ( instance in ClusterController::instances )
-		{
-		# Do nothing, unless this known agent checks in with a mismatching
-		# API version, in which case we kick it out.
-		if ( api_version != ClusterController::API::version )
-			drop_instance(ClusterController::instances[instance]);
-
-		# Update the instance name in the pointed-to record, in case it
-		# was previously named otherwise. Not being too picky here allows
-		# the user some leeway in spelling out the original config.
-		ClusterController::instances[instance]$name = instance;
-		return;
-		}
-
-	if ( api_version != ClusterController::API::version )
-		{
-		ClusterController::Log::warning(
-		    fmt("agent %s/%s speaks incompatible agent protocol (%s, need %s), ignoring",
-		        instance, host, api_version, ClusterController::API::version));
-		return;
-		}
-
-	ClusterController::instances[instance] = ClusterController::Types::Instance($name=instance, $host=host);
-	ClusterController::Log::info(fmt("instance %s/%s has checked in", instance, host));
-
-	# If we have a pending configuration request, check in on it now to see whether
-	# we have all agents required, and finalize the config request.
-	if ( g_config_reqid_pending == "" )
-		return;
+	ClusterController::Log::info(fmt("rx ClusterController::API::notify_agents_ready %s", join_string_vec(insts, ",")));
 
+	# When all agents are ready, send them the pending config update.
 	local req = ClusterController::Request::lookup(g_config_reqid_pending);
 
+	# If the request isn't available or doesn't have config state, we just
+	# clear it out and stop.
 	if ( ClusterController::Request::is_null(req) || ! req?$set_configuration_state )
 		{
-		# Odd, just clear out pending state.
 		g_config_reqid_pending = "";
 		return;
 		}
 
-	for ( inst in req$set_configuration_state$config$instances )
-		{
-		if ( inst$name !in ClusterController::instances )
-			{
-			# The request still has instances not known to us, try again
-			# later.
-			return;
-			}
-		}
-
-	# All instances requested in the configuration are now known to us.
-	# Send them the config. As they send their response events we
-	# update the request state and eventually send the response event
-	# to the client.
+	# All instances requested in the pending configuration update are now
+	# known to us. Send them the config. As they send their response events
+	# we update the client's request state and eventually send the response
+	# event to the client.
 	send_config_to_agents(req, req$set_configuration_state$config);
+
+	# The request object continues to exist and will be referenced by the
+	# responses coming in, but there's no longer a pending config update to
+	# track.
 	g_config_reqid_pending = "";
 	}
 
+event ClusterAgent::API::notify_agent_hello(instance: string, host: addr, api_version: count)
+	{
+	if ( instance !in g_instances )
+		{
+		# An unknown agent has checked in. This can happen with agents that
+		# aren't yet showing in a configuration received from the client. We
+		# log at debug level only.
+		ClusterController::Log::debug(
+		    fmt("unknown instance %s/%s has checked in, ignoring", instance, host));
+		return;
+		}
+
+	ClusterController::Log::info(fmt("rx ClusterAgent::API::notify_agent_hello %s %s", instance, host));
+
+	local inst = g_instances[instance];
+
+	# When a known agent checks in with a mismatching API version, we kick it out.
+	if ( api_version != ClusterController::API::version )
+		{
+		ClusterController::Log::warning(
+		    fmt("instance %s/%s has checked in with incompatible API version %s, dropping",
+		        instance, host, api_version));
+		drop_instance(inst);
+		return;
+		}
+
+	if ( instance !in g_instances_ready )
+		{
+		ClusterController::Log::info(fmt("instance %s/%s has checked in", instance, host));
+		add g_instances_ready[instance];
+		}
+
+	Broker::publish(ClusterAgent::topic_prefix + "/" + instance,
+	    ClusterAgent::API::notify_controller_hello, ClusterController::name,
+	    to_addr(ClusterController::network_info()$address));
+	}
 
 event ClusterAgent::API::notify_change(instance: string, n: ClusterController::Types::Node,
 				       old: ClusterController::Types::State,
@@ -213,6 +261,7 @@ event ClusterController::API::set_configuration_request(reqid: string, config: C
 	local res: ClusterController::Types::Result;
 	local req = ClusterController::Request::create(reqid);
+
 	req$set_configuration_state = ClusterController::Request::SetConfigurationState($config = config);
 
 	# At the moment there can only be one pending request.
@@ -230,91 +279,78 @@ event ClusterController::API::set_configuration_request(reqid: string, config: C
 		return;
 		}
 
-	# Compare the instance configuration to our current one. For instances
-	# that are supposed to be checking in with us but have not, record
-	# errors. We will fail the request in those cases. For instances we
-	# don't know and are supposed to connect to, do so next.
+	# Compare the instance configuration to our current one. If it matches,
+	# we can proceed to deploying the new data cluster topology. If it does
+	# not, we need to establish connectivity with agents we connect to, or
+	# wait until all instances that connect to us have done so. Either case
+	# triggers a notify_agents_ready event, upon which we then deploy the
+	# data cluster.
+
+	# The current & new sets of instance names.
+	local insts_current: set[string];
+	local insts_new: set[string];
+
+	# A set of current instances not contained in the new config.
+	# Those will need to get dropped.
+	local insts_to_drop: set[string];
+
+	# The opposite: new instances not yet in our current set. Those we will need
+	# to establish contact with (or they with us).
+	local insts_to_add: set[string];
+
+	# The overlap: instances in both the current and new set. For those we verify
+	# that we're actually dealing with the same entities, and might need to
+	# re-connect if not.
+	local insts_to_keep: set[string];
+
+	# Alternative representation of insts_to_add, directly providing the instances.
+	local insts_to_peer: table[string] of ClusterController::Types::Instance;
+
+	# Helpful locals.
 	local inst_name: string;
 	local inst: ClusterController::Types::Instance;
-	local insts_to_add: ClusterController::Types::InstanceVec;
 
-	# A set of instances not contained in the config, that therefore needs
-	# to be dropped.
-	local insts_to_drop: set[string] = {};
+	for ( inst_name in g_instances )
+		add insts_current[inst_name];
+	for ( inst in config$instances )
+		add insts_new[inst$name];
 
-	for ( inst_name in ClusterController::instances )
-		add insts_to_drop[inst_name];
+	# Populate TODO lists for instances we need to drop, check, or add.
+	insts_to_drop = insts_current - insts_new;
+	insts_to_add = insts_new - insts_current;
+	insts_to_keep = insts_new & insts_current;
 
 	for ( inst in config$instances )
		{
-		if ( inst$name in ClusterController::instances )
+		if ( inst$name in insts_to_add )
 			{
-			# Verify that it's actually the same endpoint. If not, kick out
-			# the old one. XXX 0.0.0.0 is a workaround until we've resolved
-			# how agents that peer with us obtain their identity.
-			if ( ( inst$host != 0.0.0.0 && inst$host != ClusterController::instances[inst$name]$host ) ||
-			     inst?$listen_port != ClusterController::instances[inst$name]?$listen_port ||
-			     ( inst?$listen_port && ClusterController::instances[inst$name]?$listen_port &&
-			       inst$listen_port != ClusterController::instances[inst$name]$listen_port ) )
-				{
-				drop_instance(ClusterController::instances[inst$name]);
-				}
-			else
-				{
-				# We know this instance, don't drop it below.
-				delete insts_to_drop[inst$name];
-				}
+			insts_to_peer[inst$name] = inst;
+			next;
 			}
 
-		if ( inst$name !in ClusterController::instances )
+		# Focus on the keepers: check for change in identity/location.
+		if ( inst$name !in insts_to_keep )
+			next;
+
+		# XXX 0.0.0.0 is a workaround until we've resolved
+		# how agents that peer with us obtain their identity.
+		# Broker ID?
+		if ( ( inst$host != 0.0.0.0 && inst$host != g_instances[inst$name]$host ) ||
+		     inst?$listen_port != g_instances[inst$name]?$listen_port ||
+		     ( inst?$listen_port && g_instances[inst$name]?$listen_port &&
+		       inst$listen_port != g_instances[inst$name]$listen_port ) )
 			{
-			# It's an instance we don't currently know about.
-			if ( ! inst?$listen_port )
-				{
-				# If a requested instance doesn't have a listen port and isn't known
-				# to us, we have no way to establish connectivity. We reject the
-				# request.
-				res = ClusterController::Types::Result($reqid=reqid, $instance=inst$name);
-				res$success = F;
-				res$error = fmt("instance %s is unknown", inst$name);
-				req$results += res;
-				}
-			else
-				{
-				# We'll need to connect to this instance.
-				insts_to_add[|insts_to_add|] = inst;
-				}
+			# The endpoint looks different. We drop the current one
+			# and need to re-establish connectivity.
+			add insts_to_drop[inst$name];
 			}
 		}
 
-	# An error at this point means that we're rejecting this request.
-	if ( |req$results| > 0 )
-		{
-		ClusterController::Log::info(fmt("tx ClusterController::API::set_configuration_response %s",
-		    ClusterController::Request::to_string(req)));
-		event ClusterController::API::set_configuration_response(req$id, req$results);
-		ClusterController::Request::finish(req$id);
-		return;
-		}
-
+	# Process our TODO lists.
 	for ( inst_name in insts_to_drop )
-		drop_instance(ClusterController::instances[inst_name]);
+		drop_instance(g_instances[inst_name]);
 
-	# We have instances to connect to, so initiate peering and stop for now.
-	# Processing will continue as the agents check in. That's also when they
-	# get added to ClusterController::instances.
-	if ( |insts_to_add| > 0 )
-		{
-		for ( idx in insts_to_add )
-			{
-			inst = insts_to_add[idx];
-			Broker::peer(cat(inst$host), inst$listen_port,
-			    ClusterController::connect_retry);
-			}
-
-		g_config_reqid_pending = req$id;
-		return;
-		}
+	for ( inst_name in insts_to_peer )
+		add_instance(insts_to_peer[inst_name]);
 
 	# XXX validate the configuration:
 	# - Are node instances among defined instances?
@@ -323,8 +359,15 @@ event ClusterController::API::set_configuration_request(reqid: string, config: C
 	# - Do node types with optional fields have required values?
 	# ...
 
-	# Response event gets sent via the agents' reponse event.
-	send_config_to_agents(req, config);
+	# Track this config request globally until all of the agents required
+	# for it have checked in. It gets cleared in the notify_agents_ready
+	# event handler.
+	g_config_reqid_pending = req$id;
+
+	# Special case: if the new request kept the set of instances identical,
+	# trigger notify_agents_ready explicitly so we transmit the new config.
+	if ( |insts_to_drop| == 0 && |insts_to_add| == 0 && |insts_to_keep| > 0 )
+		event ClusterController::API::notify_agents_ready(insts_to_keep);
 	}
 
 event ClusterController::API::get_instances_request(reqid: string)
@@ -333,8 +376,8 @@ event ClusterController::API::get_instances_request(reqid: string)
 
 	local insts: vector of ClusterController::Types::Instance;
 
-	for ( i in ClusterController::instances )
-		insts += ClusterController::instances[i];
+	for ( i in g_instances )
+		insts += g_instances[i];
 
 	ClusterController::Log::info(fmt("tx ClusterController::API::get_instances_response %s", reqid));
 	event ClusterController::API::get_instances_response(reqid, insts);
@@ -342,42 +385,25 @@ event ClusterController::API::get_instances_request(reqid: string)
 
 event zeek_init()
 	{
-	# Controller always listens -- it needs to be able to respond
-	# to the Zeek client. This port is also used by the agents
-	# if they connect to the client.
+	# The controller always listens -- it needs to be able to respond to the
+	# Zeek client. This port is also used by the agents if they connect to
+	# the controller. The controller doesn't automatically establish or accept
+	# connectivity to agents: agents are defined, and communicated with, only
+	# as specified by the configurations provided by the client.
+	local cni = ClusterController::network_info();
+
 	Broker::listen(cat(cni$address), cni$bound_port);
 
 	Broker::subscribe(ClusterAgent::topic_prefix);
 	Broker::subscribe(ClusterController::topic);
 
+	# Events sent to the client:
+
+	Broker::auto_publish(ClusterController::topic,
+	    ClusterController::API::get_instances_response);
 	Broker::auto_publish(ClusterController::topic,
 	    ClusterController::API::set_configuration_response);
 
-	if ( |ClusterController::instances| > 0 )
-		{
-		# We peer with the agents -- otherwise, the agents peer
-		# with (i.e., connect to) us.
-		for ( i in ClusterController::instances )
-			{
-			local inst = ClusterController::instances[i];
-
-			if ( ! inst?$listen_port )
-				{
-				# XXX config error -- this must be there
-				next;
-				}
-
-			Broker::peer(cat(inst$host), inst$listen_port,
-			    ClusterController::connect_retry);
-			}
-		}
-
-	# If ClusterController::instances is empty, agents peer with
-	# us and we do nothing. We'll build up state as the
-	# notify_agent_hello() events come int.
-
 	ClusterController::Log::info("controller is live");
 	}
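
The agent-side handshake in this patch is a self-rescheduling event loop: the agent re-raises notify_agent_hello every second until the controller's notify_controller_hello clears a flag, and Broker::peer_added re-arms it. A minimal sketch of that retry pattern, with hypothetical My::* names:

module My;

export {
	global hello: event(who: string);
	global hello_ack: event(who: string);
}

# Whether to keep re-sending the hello. Cleared by the ack.
global g_keep_notifying: bool = T;

event My::hello(who: string)
	{
	# Handle our own event and re-raise it on a timer: a simple,
	# state-driven retry loop without separate timer bookkeeping.
	if ( g_keep_notifying )
		schedule 1sec { My::hello(who) };
	}

event My::hello_ack(who: string)
	{
	# The peer confirmed; stop retrying until the next (re-)peering.
	g_keep_notifying = F;
	}

event Broker::peer_added(peer: Broker::EndpointInfo, msg: string)
	{
	# A new peering (or re-peering) restarts the loop.
	g_keep_notifying = T;
	event My::hello("me");
	}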
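The controller's readiness tracking hinges on Zeek's &on_change attribute: every add/delete on g_instances_ready invokes check_instances_ready, which fires notify_agents_ready once the set matches g_instances. A minimal sketch of the pattern with hypothetical names; note the handler is forward-declared so the attribute can reference it:

# Forward-declare the handler so &on_change below can reference it.
global on_ready_change: function(s: set[string], tc: TableChange, val: string);

global g_ready: set[string] = set() &on_change=on_ready_change;

function on_ready_change(s: set[string], tc: TableChange, val: string)
	{
	# For sets, tc is TABLE_ELEMENT_NEW on add and TABLE_ELEMENT_REMOVED
	# on delete; s is the set itself, val the affected member.
	print fmt("readiness changed: %s (%s), now %d ready", val, tc, |s|);
	}

event zeek_init()
	{
	add g_ready["agent-1"];    # invokes on_ready_change
	delete g_ready["agent-1"]; # invokes it again
	}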
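The drop/add/keep bookkeeping in set_configuration_request is plain set algebra: difference and intersection on the name sets yield the three TODO lists in one expression each. A small self-contained illustration:

event zeek_init()
	{
	local current: set[string] = set("a", "b", "c");
	local wanted: set[string] = set("b", "c", "d");

	print current - wanted;  # to drop: only in current
	print wanted - current;  # to add: only in wanted
	print wanted & current;  # to keep: in both
	}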
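Finally, the controller's zeek_init() combines three Broker idioms: listening for inbound peerings, subscribing to request topics, and auto-publishing locally raised response events back to the client. A stripped-down sketch with a hypothetical Demo module and a fixed port in place of ClusterController::network_info():

module Demo;

export {
	global response: event(reqid: string);
}

event zeek_init()
	{
	# Accept incoming peerings, e.g. from agents and the client.
	Broker::listen("0.0.0.0", 2150/tcp);

	# Receive any event published to this topic prefix.
	Broker::subscribe("demo/requests");

	# Any Demo::response raised locally is also forwarded to this topic.
	Broker::auto_publish("demo/responses", Demo::response);
	}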