diff --git a/scripts/policy/frameworks/cluster/agent/main.zeek b/scripts/policy/frameworks/cluster/agent/main.zeek index d33decc732..7669942585 100644 --- a/scripts/policy/frameworks/cluster/agent/main.zeek +++ b/scripts/policy/frameworks/cluster/agent/main.zeek @@ -149,12 +149,16 @@ event ClusterAgent::API::set_configuration_request(reqid: string, config: Cluste # events asynchonously. The only indication of error will be # notification events to the controller. - local res = ClusterController::Types::Result( - $reqid = reqid, - $instance = ClusterAgent::name); + if ( reqid != "" ) + { + local res = ClusterController::Types::Result( + $reqid = reqid, + $instance = ClusterAgent::name); - ClusterController::Log::info(fmt("tx ClusterAgent::API::set_configuration_response %s", reqid)); - event ClusterAgent::API::set_configuration_response(reqid, res); + ClusterController::Log::info(fmt("tx ClusterAgent::API::set_configuration_response %s", + ClusterController::Types::result_to_string(res))); + event ClusterAgent::API::set_configuration_response(reqid, res); + } } event Broker::peer_added(peer: Broker::EndpointInfo, msg: string) diff --git a/scripts/policy/frameworks/cluster/controller/main.zeek b/scripts/policy/frameworks/cluster/controller/main.zeek index 3ba14e446a..11e88fe3fd 100644 --- a/scripts/policy/frameworks/cluster/controller/main.zeek +++ b/scripts/policy/frameworks/cluster/controller/main.zeek @@ -33,6 +33,27 @@ function send_config_to_agents(req: ClusterController::Request::Request, } } +function drop_instance(inst: ClusterController::Types::Instance) + { + if ( inst$name in ClusterController::instances ) + { + # Send this agent a config that will terminate any data cluster + # nodes it might have. This is "fire and forget" -- there will + # not be a response. + Broker::publish(ClusterAgent::topic_prefix + "/" + inst$name, + ClusterAgent::API::set_configuration_request, + "", ClusterController::Types::Configuration()); + + delete ClusterController::instances[inst$name]; + } + + # If the instance has a port, we peered with it, so now unpeer. + if ( inst?$listen_port ) + Broker::unpeer(cat(inst$host), inst$listen_port ); + + ClusterController::Log::info(fmt("dropped instance %s", inst$name)); + } + event ClusterAgent::API::notify_agent_hello(instance: string, host: addr, api_version: count) { # See if we already know about this agent; if not, register @@ -47,16 +68,7 @@ event ClusterAgent::API::notify_agent_hello(instance: string, host: addr, api_ve # Do nothing, unless this known agent checks in with a mismatching # API version, in which case we kick it out. if ( api_version != ClusterController::API::version ) - { - inst = ClusterController::instances[instance]; - if ( inst?$listen_port ) - { - # We peered with this instance, unpeer. - Broker::unpeer(cat(inst$host), inst$listen_port ); - # XXX what to do if they connected to us? - } - delete ClusterController::instances[instance]; - } + drop_instance(ClusterController::instances[instance]); # Update the instance name in the pointed-to record, in case it # was previously named otherwise. Not being too picky here allows @@ -222,13 +234,41 @@ event ClusterController::API::set_configuration_request(reqid: string, config: C # that are supposed to be checking in with us but have not, record # errors. We will fail the request in those cases. For instances we # don't know and are supposed to connect to, do so next. + local inst_name: string; local inst: ClusterController::Types::Instance; - local insts_todo: ClusterController::Types::InstanceVec; + local insts_to_add: ClusterController::Types::InstanceVec; + + # A set of instances not contained in the config, that therefore needs + # to be dropped. + local insts_to_drop: set[string] = {}; + + for ( inst_name in ClusterController::instances ) + add insts_to_drop[inst_name]; for ( inst in config$instances ) { + if ( inst$name in ClusterController::instances ) + { + # Verify that it's actually the same endpoint. If not, kick out + # the old one. XXX 0.0.0.0 is a workaround until we've resolved + # how agents that peer with us obtain their identity. + if ( ( inst$host != 0.0.0.0 && inst$host != ClusterController::instances[inst$name]$host ) || + inst?$listen_port != ClusterController::instances[inst$name]?$listen_port || + ( inst?$listen_port && ClusterController::instances[inst$name]?$listen_port && + inst$listen_port != ClusterController::instances[inst$name]$listen_port ) ) + { + drop_instance(ClusterController::instances[inst$name]); + } + else + { + # We know this instance, don't drop it below. + delete insts_to_drop[inst$name]; + } + } + if ( inst$name !in ClusterController::instances ) { + # It's an instance we don't currently know about. if ( ! inst?$listen_port ) { # If a requested instance doesn't have a listen port and isn't known @@ -242,7 +282,7 @@ event ClusterController::API::set_configuration_request(reqid: string, config: C else { # We'll need to connect to this instance. - insts_todo[|insts_todo|] = inst; + insts_to_add[|insts_to_add|] = inst; } } } @@ -257,14 +297,17 @@ event ClusterController::API::set_configuration_request(reqid: string, config: C return; } - # We have instances to connect to, so initiate peering and - # stop for now. Processing will continue as the agents check - # in. - if ( |insts_todo| > 0 ) + for ( inst_name in insts_to_drop ) + drop_instance(ClusterController::instances[inst_name]); + + # We have instances to connect to, so initiate peering and stop for now. + # Processing will continue as the agents check in. That's also when they + # get added to ClusterController::instances. + if ( |insts_to_add| > 0 ) { - for ( idx in insts_todo ) + for ( idx in insts_to_add ) { - inst = insts_todo[idx]; + inst = insts_to_add[idx]; Broker::peer(cat(inst$host), inst$listen_port, ClusterController::connect_retry); }