Management framework: support auto-assignment of ports in cluster nodes
This enables the controller to assign listening ports to managers, loggers, and proxies. (We don't currently make the workers listen.) The feature is controlled by the Management::Controller::auto_assign_ports flag. When enabled (the default), enumeration starts from Management::Controller::auto_assign_start_port, beginning with the manager, then the logger(s), then the proxy(s). When the feature is disabled and any node that requires a port lacks one, the controller rejects the configuration.
parent 48a858f5db
commit 7a471df1a1
2 changed files with 116 additions and 0 deletions
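Both options come with operator-friendly defaults (auto-assignment on, counting up from 2200/tcp). As a rough sketch of how a deployment might tune them, not taken from this commit and using an illustrative port value, overrides of this kind would go into a controller-side script:

# Illustrative overrides only; the chosen start port is hypothetical.
# The Management controller scripts must be loaded before the redefs.
@load policy/frameworks/management/controller

# Keep auto-assignment enabled (the default) but start counting at a
# different base port:
redef Management::Controller::auto_assign_start_port = 3200/tcp;

# Or disable auto-assignment entirely; configurations whose manager,
# logger, or proxy nodes lack explicit ports are then rejected:
# redef Management::Controller::auto_assign_ports = F;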
@@ -39,6 +39,17 @@ export {
 	## remains empty.
 	const default_port = 2150/tcp &redef;
 
+	## Whether the controller should auto-assign listening ports to cluster
+	## nodes that need them and don't have them explicitly specified in
+	## cluster configurations.
+	const auto_assign_ports = T &redef;
+
+	## The TCP start port to use for auto-assigning cluster node listening
+	## ports, if :zeek:see:`Management::Controller::auto_assign_ports` is
+	## enabled (the default) and the provided configurations don't have
+	## ports assigned.
+	const auto_assign_start_port = 2200/tcp &redef;
+
 	## The controller's Broker topic. Clients send requests to this topic.
 	const topic = "zeek/management/controller" &redef;
 
@@ -93,6 +93,22 @@ global drop_instance: function(inst: Management::Instance);
 global null_config: function(): Management::Configuration;
 global is_null_config: function(config: Management::Configuration): bool;
 
+# Returns list of names of nodes in the given configuration that require a
+# listening port. Returns empty list if the config has no such nodes.
+global config_nodes_lacking_ports: function(config: Management::Configuration): vector of string;
+
+# Assign node listening ports in the given configuration by counting up from
+# Management::Controller::auto_assign_start_port. Scans the included nodes and
+# fills in ports for any non-worker cluster node that doesn't have an existing
+# port. This assumes those ports are actually available on the instances.
+global config_assign_ports: function(config: Management::Configuration);
+
+# Rejects the given configuration with the given error message. The function
+# adds a non-success result record to the given request and sends the
+# set_configuration_response event back to the client. It does not call finish()
+# on the request.
+global send_set_configuration_response_error: function(req: Management::Request::Request, error: string);
+
 # Given a Broker ID, this returns the endpoint info associated with it.
 # On error, returns a dummy record with an empty ID string.
 global find_endpoint: function(id: string): Broker::EndpointInfo;
@@ -221,6 +237,64 @@ function null_config(): Management::Configuration
 	return Management::Configuration($id="");
 	}
 
+function config_nodes_lacking_ports(config: Management::Configuration): vector of string
+	{
+	local res: vector of string;
+	local roles = { Supervisor::MANAGER, Supervisor::LOGGER, Supervisor::PROXY };
+
+	for ( node in config$nodes )
+		{
+		if ( node$role in roles && ! node?$p )
+			res += node$name;
+		}
+
+	return sort(res, strcmp);
+	}
+
+function config_assign_ports(config: Management::Configuration)
+	{
+	# We're changing nodes in the configuration's set, so need to rebuild it:
+	local new_nodes: set[Management::Node];
+
+	# Workers don't need listening ports, but these do:
+	local roles = vector(Supervisor::MANAGER, Supervisor::LOGGER, Supervisor::PROXY);
+
+	local p = port_to_count(Management::Controller::auto_assign_start_port);
+	local roles_set: set[Supervisor::ClusterRole];
+
+	for ( i in roles )
+		add roles_set[roles[i]];
+
+	# Copy any nodes to the new set that have roles we don't care about.
+	for ( node in config$nodes )
+		{
+		if ( node$role !in roles_set )
+			add new_nodes[node];
+		}
+
+	# Now process the ones that may need ports, in order.
+	for ( i in roles )
+		{
+		for ( node in config$nodes )
+			{
+			if ( node$role != roles[i] )
+				next;
+
+			if ( node?$p ) # Already has a port.
+				{
+				add new_nodes[node];
+				next;
+				}
+
+			node$p = count_to_port(p, tcp);
+			add new_nodes[node];
+			++p;
+			}
+		}
+
+	config$nodes = new_nodes;
+	}
+
 function find_endpoint(id: string): Broker::EndpointInfo
 	{
 	local peers = Broker::peers();
@@ -277,6 +351,18 @@ function filter_config_nodes_by_name(nodes: set[string]): set[string]
 	return nodes & cluster_nodes;
 	}
 
+function send_set_configuration_response_error(req: Management::Request::Request, error: string)
+	{
+	local res = Management::Result($reqid=req$id);
+
+	res$success = F;
+	res$error = error;
+	req$results += res;
+
+	Broker::publish(Management::Controller::topic,
+	    Management::Controller::API::set_configuration_response, req$id, req$results);
+	}
+
 event Management::Controller::API::notify_agents_ready(instances: set[string])
 	{
 	local insts = Management::Util::set_to_vector(instances);
@@ -486,6 +572,25 @@ event Management::Controller::API::set_configuration_request(reqid: string, config: Management::Configuration)
 	# - Do node types with optional fields have required values?
 	# ...
 
+	if ( Management::Controller::auto_assign_ports )
+		config_assign_ports(config);
+	else
+		{
+		local nodes = config_nodes_lacking_ports(config);
+
+		if ( |nodes| > 0 )
+			{
+			local nodes_str = join_string_vec(nodes, ", ");
+			send_set_configuration_response_error(req,
+			    fmt("port auto-assignment disabled but nodes %s lack ports", nodes_str));
+
+			Management::Request::finish(req$id);
+			Management::Log::info(fmt("tx Management::Controller::API::set_configuration_response %s",
+			    Management::Request::to_string(req)));
+			return;
+			}
+		}
+
 	# The incoming request is now the pending one. It gets cleared when all
 	# agents have processed their config updates successfully, or their
 	# responses time out.
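To make the assignment order concrete, here is a small standalone sketch (not part of the commit; the node names and their count are made up) that mimics the counting scheme config_assign_ports applies, starting at the default auto_assign_start_port of 2200/tcp: the manager first, then each logger, then each proxy.

# Standalone illustration only; run as `zeek this_script.zeek`.
# The node list is hypothetical and stands in for a parsed cluster configuration.
event zeek_init()
	{
	local nodes = vector("manager", "logger-1", "logger-2", "proxy-1");
	local p = port_to_count(2200/tcp);

	for ( i in nodes )
		{
		print fmt("%s -> %s", nodes[i], count_to_port(p, tcp));
		++p;
		}
	# Output: manager -> 2200/tcp, logger-1 -> 2201/tcp,
	# logger-2 -> 2202/tcp, proxy-1 -> 2203/tcp
	}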