mirror of
https://github.com/zeek/zeek.git
synced 2025-10-10 10:38:20 +00:00
Merge remote-tracking branch 'origin/topic/christian/management-instance-handling'
* origin/topic/christian/management-instance-handling: Management framework: bump zeek-client to pull in rendering tweaks Management framework: bump external cluster testsuite Management framework: improve address and port handling Management framework: broaden get_instances response data to connected instances Management framework: expand notify_agent_hello event arguments Management framework: comment-only tweaks and typo fixes
This commit is contained in:
commit
febdc97f09
10 changed files with 130 additions and 50 deletions
12
CHANGES
12
CHANGES
|
@ -1,3 +1,15 @@
|
||||||
|
5.0.0-dev.569 | 2022-06-03 09:50:01 -0700
|
||||||
|
|
||||||
|
* Management framework: bump zeek-client to pull in rendering tweaks (Christian Kreibich, Corelight)
|
||||||
|
|
||||||
|
* Management framework: bump external cluster testsuite (Christian Kreibich, Corelight)
|
||||||
|
|
||||||
|
* Mark lookup_asn() BIF as deprecated in v6.1 (Phil Rzewski)
|
||||||
|
|
||||||
|
* Define geo_autonomous_system record type (Phil Rzewski)
|
||||||
|
|
||||||
|
* Add lookup_autonomous_system() BIF that returns AS number and org (Phil Rzewski)
|
||||||
|
|
||||||
5.0.0-dev.559 | 2022-06-02 16:58:58 -0700
|
5.0.0-dev.559 | 2022-06-02 16:58:58 -0700
|
||||||
|
|
||||||
* Mark lookup_asn() BIF as deprecated in v6.1 (Phil Rzewski)
|
* Mark lookup_asn() BIF as deprecated in v6.1 (Phil Rzewski)
|
||||||
|
|
2
VERSION
2
VERSION
|
@ -1 +1 @@
|
||||||
5.0.0-dev.561
|
5.0.0-dev.569
|
||||||
|
|
|
@ -1 +1 @@
|
||||||
Subproject commit 2b00ec50c0e79e9f3ad18bad58b92ba32b405274
|
Subproject commit 71e2de5864e0c1a9c232d26d592f809e983e56ed
|
|
@ -145,17 +145,20 @@ export {
|
||||||
## The agent sends this event upon peering as a "check-in", informing
|
## The agent sends this event upon peering as a "check-in", informing
|
||||||
## the controller that an agent of the given name is now available to
|
## the controller that an agent of the given name is now available to
|
||||||
## communicate with. It is a controller-level equivalent of
|
## communicate with. It is a controller-level equivalent of
|
||||||
## `:zeek:see:`Broker::peer_added`.
|
## `:zeek:see:`Broker::peer_added` and triggered by it.
|
||||||
##
|
##
|
||||||
## instance: an instance name, really the agent's name as per
|
## instance: an instance name, really the agent's name as per
|
||||||
## :zeek:see:`Management::Agent::get_name`.
|
## :zeek:see:`Management::Agent::get_name`.
|
||||||
##
|
##
|
||||||
## host: the IP address of the agent. (This may change in the future.)
|
## id: the Broker ID of the agent.
|
||||||
|
##
|
||||||
|
## connecting: true if this agent connected to the controller,
|
||||||
|
## false if the controller connected to the agent.
|
||||||
##
|
##
|
||||||
## api_version: the API version of this agent.
|
## api_version: the API version of this agent.
|
||||||
##
|
##
|
||||||
global notify_agent_hello: event(instance: string, host: addr,
|
global notify_agent_hello: event(instance: string, id: string,
|
||||||
api_version: count);
|
connecting: bool, api_version: count);
|
||||||
|
|
||||||
|
|
||||||
# The following are not yet implemented.
|
# The following are not yet implemented.
|
||||||
|
|
|
@ -59,7 +59,7 @@ redef record Management::Request::Request += {
|
||||||
redef Management::role = Management::AGENT;
|
redef Management::role = Management::AGENT;
|
||||||
|
|
||||||
# Conduct more frequent table expiration checks. This helps get more predictable
|
# Conduct more frequent table expiration checks. This helps get more predictable
|
||||||
# timing for request timeouts and only affects the controller, which is mostly idle.
|
# timing for request timeouts and only affects the agent, which is mostly idle.
|
||||||
redef table_expire_interval = 2 sec;
|
redef table_expire_interval = 2 sec;
|
||||||
|
|
||||||
# Tweak the request timeout so it's relatively quick, and quick enough always to
|
# Tweak the request timeout so it's relatively quick, and quick enough always to
|
||||||
|
@ -694,7 +694,8 @@ event Broker::peer_added(peer: Broker::EndpointInfo, msg: string)
|
||||||
|
|
||||||
Broker::publish(agent_topic(),
|
Broker::publish(agent_topic(),
|
||||||
Management::Agent::API::notify_agent_hello,
|
Management::Agent::API::notify_agent_hello,
|
||||||
epi$id, to_addr(epi$network$address),
|
epi$id, Broker::node_id(),
|
||||||
|
Management::Agent::controller$address != "0.0.0.0",
|
||||||
Management::Agent::API::version);
|
Management::Agent::API::version);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -736,5 +737,5 @@ event zeek_init()
|
||||||
# If the controller connects to us, it also uses this port.
|
# If the controller connects to us, it also uses this port.
|
||||||
Broker::listen(cat(epi$network$address), epi$network$bound_port);
|
Broker::listen(cat(epi$network$address), epi$network$bound_port);
|
||||||
|
|
||||||
Management::Log::info("agent is live");
|
Management::Log::info(fmt("agent is live, Broker ID %s", Broker::node_id()));
|
||||||
}
|
}
|
||||||
|
|
|
@ -164,8 +164,8 @@ export {
|
||||||
## The controller triggers this event when the operational cluster
|
## The controller triggers this event when the operational cluster
|
||||||
## instances align with the ones desired by the cluster
|
## instances align with the ones desired by the cluster
|
||||||
## configuration. It's essentially a cluster management readiness
|
## configuration. It's essentially a cluster management readiness
|
||||||
## event. This event is currently only used by the controller and not
|
## event. This event is currently only used internally by the controller,
|
||||||
## published to other topics.
|
## and not published to topics.
|
||||||
##
|
##
|
||||||
## instances: the set of instance names now ready.
|
## instances: the set of instance names now ready.
|
||||||
##
|
##
|
||||||
|
|
|
@ -74,37 +74,52 @@ redef record Management::Request::Request += {
|
||||||
redef Management::role = Management::CONTROLLER;
|
redef Management::role = Management::CONTROLLER;
|
||||||
|
|
||||||
# Conduct more frequent table expiration checks. This helps get more predictable
|
# Conduct more frequent table expiration checks. This helps get more predictable
|
||||||
# timing for request timeouts and only affects the agent, which is mostly idle.
|
# timing for request timeouts and only affects the controller, which is mostly idle.
|
||||||
redef table_expire_interval = 2 sec;
|
redef table_expire_interval = 2 sec;
|
||||||
|
|
||||||
|
# Helper that checks whether the agents that are ready match those we need to
|
||||||
|
# operate the current cluster configuration. When that is the case, triggers
|
||||||
|
# notify_agents_ready event.
|
||||||
global check_instances_ready: function();
|
global check_instances_ready: function();
|
||||||
|
|
||||||
|
# Adds the given instance to g_instances and peers, if needed.
|
||||||
global add_instance: function(inst: Management::Instance);
|
global add_instance: function(inst: Management::Instance);
|
||||||
|
|
||||||
|
# Drops the given instance from g_instances and sends it an
|
||||||
|
# agent_standby_request, so it drops its current cluster nodes (if any).
|
||||||
global drop_instance: function(inst: Management::Instance);
|
global drop_instance: function(inst: Management::Instance);
|
||||||
|
|
||||||
|
# Helpers to simplify handling of config records.
|
||||||
global null_config: function(): Management::Configuration;
|
global null_config: function(): Management::Configuration;
|
||||||
global is_null_config: function(config: Management::Configuration): bool;
|
global is_null_config: function(config: Management::Configuration): bool;
|
||||||
|
|
||||||
|
# Given a Broker ID, this returns the endpoint info associated with it.
|
||||||
|
# On error, returns a dummy record with an empty ID string.
|
||||||
|
global find_endpoint: function(id: string): Broker::EndpointInfo;
|
||||||
|
|
||||||
# Checks whether the given instance is one that we know with different
|
# Checks whether the given instance is one that we know with different
|
||||||
# communication settings: a different peering direction, a different listening
|
# communication settings: a different peering direction, a different listening
|
||||||
# port, etc. Used as a predicate to indicate when we need to drop the existing
|
# port, etc. Used as a predicate to indicate when we need to drop the existing
|
||||||
# one from our internal state.
|
# one from our internal state.
|
||||||
global is_instance_connectivity_change: function(inst: Management::Instance): bool;
|
global is_instance_connectivity_change: function(inst: Management::Instance): bool;
|
||||||
|
|
||||||
# The set of agents the controller interacts with to manage to currently
|
# The set of agents the controller interacts with to manage the currently
|
||||||
# configured cluster. This may be a subset of all the agents known to the
|
# configured cluster. This may be a subset of all the agents known to the
|
||||||
# controller, as tracked by the g_instances_known set. They key is the instance
|
# controller, tracked by the g_instances_known table. They key is the instance
|
||||||
# name and should match the $name member of the corresponding instance record.
|
# name and should match the $name member of the corresponding instance record.
|
||||||
global g_instances: table[string] of Management::Instance = table();
|
global g_instances: table[string] of Management::Instance = table();
|
||||||
|
|
||||||
# The set of instances that have checked in with the controller. This is a
|
# The instances the controller is communicating with. This can deviate from
|
||||||
# superset of g_instances, since it covers any agent that has sent us a
|
# g_instances, since it covers any agent that has sent us a notify_agent_hello
|
||||||
# notify_agent_hello event.
|
# event, regardless of whether it relates to the current configuration.
|
||||||
global g_instances_known: set[string] = set();
|
global g_instances_known: table[string] of Management::Instance = table();
|
||||||
|
|
||||||
# A corresponding set of instances/agents that we track in order to understand
|
# The set of instances we need for the current configuration and that are ready
|
||||||
# when all of the above instances have sent agent_welcome_response events. (An
|
# (meaning they have responded to our agent_welcome_request message). Once they
|
||||||
# alternative would be to use a record that adds a single state bit for each
|
# match g_instances, a Management::Controller::API::notify_agents_ready event
|
||||||
# instance, and store that above.)
|
# signals readiness and triggers config deployment. (An alternative
|
||||||
|
# implementation would be via a record that adds a single state bit for each
|
||||||
|
# instance, and store that in g_instances.)
|
||||||
global g_instances_ready: set[string] = set();
|
global g_instances_ready: set[string] = set();
|
||||||
|
|
||||||
# The request ID of the most recent configuration update that's come in from
|
# The request ID of the most recent configuration update that's come in from
|
||||||
|
@ -139,8 +154,6 @@ function send_config_to_agents(req: Management::Request::Request, config: Manage
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
# This is the &on_change handler for the g_instances_ready set, meaning
|
|
||||||
# it runs whenever a required agent has confirmed it's ready.
|
|
||||||
function check_instances_ready()
|
function check_instances_ready()
|
||||||
{
|
{
|
||||||
local cur_instances: set[string];
|
local cur_instances: set[string];
|
||||||
|
@ -162,10 +175,15 @@ function add_instance(inst: Management::Instance)
|
||||||
|
|
||||||
if ( inst$name in g_instances_known )
|
if ( inst$name in g_instances_known )
|
||||||
{
|
{
|
||||||
# The agent has already peered with us. Send welcome to indicate
|
# The agent has already peered with us. This means we have its
|
||||||
# it's part of cluster management. Once it responds, we update
|
# IP address as observed by us, so use it if this agent
|
||||||
# the set of ready instances and proceed as feasible with config
|
# connected to us.
|
||||||
# deployments.
|
if ( ! inst?$listen_port )
|
||||||
|
inst$host = g_instances_known[inst$name]$host;
|
||||||
|
|
||||||
|
# Send welcome to indicate it's part of cluster management. Once
|
||||||
|
# it responds, we update the set of ready instances and proceed
|
||||||
|
# as feasible with config deployments.
|
||||||
|
|
||||||
local req = Management::Request::create();
|
local req = Management::Request::create();
|
||||||
|
|
||||||
|
@ -203,6 +221,20 @@ function null_config(): Management::Configuration
|
||||||
return Management::Configuration($id="");
|
return Management::Configuration($id="");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
function find_endpoint(id: string): Broker::EndpointInfo
|
||||||
|
{
|
||||||
|
local peers = Broker::peers();
|
||||||
|
|
||||||
|
for ( i in peers )
|
||||||
|
{
|
||||||
|
if ( peers[i]$peer$id == id )
|
||||||
|
return peers[i]$peer;
|
||||||
|
}
|
||||||
|
|
||||||
|
# Return a dummy instance.
|
||||||
|
return Broker::EndpointInfo($id="");
|
||||||
|
}
|
||||||
|
|
||||||
function is_null_config(config: Management::Configuration): bool
|
function is_null_config(config: Management::Configuration): bool
|
||||||
{
|
{
|
||||||
return config$id == "";
|
return config$id == "";
|
||||||
|
@ -266,9 +298,10 @@ event Management::Controller::API::notify_agents_ready(instances: set[string])
|
||||||
send_config_to_agents(req, req$set_configuration_state$config);
|
send_config_to_agents(req, req$set_configuration_state$config);
|
||||||
}
|
}
|
||||||
|
|
||||||
event Management::Agent::API::notify_agent_hello(instance: string, host: addr, api_version: count)
|
event Management::Agent::API::notify_agent_hello(instance: string, id: string, connecting: bool, api_version: count)
|
||||||
{
|
{
|
||||||
Management::Log::info(fmt("rx Management::Agent::API::notify_agent_hello %s %s", instance, host));
|
Management::Log::info(fmt("rx Management::Agent::API::notify_agent_hello %s %s %s",
|
||||||
|
instance, id, connecting));
|
||||||
|
|
||||||
# When an agent checks in with a mismatching API version, we log the
|
# When an agent checks in with a mismatching API version, we log the
|
||||||
# fact and drop its state, if any.
|
# fact and drop its state, if any.
|
||||||
|
@ -276,7 +309,7 @@ event Management::Agent::API::notify_agent_hello(instance: string, host: addr, a
|
||||||
{
|
{
|
||||||
Management::Log::warning(
|
Management::Log::warning(
|
||||||
fmt("instance %s/%s has checked in with incompatible API version %s",
|
fmt("instance %s/%s has checked in with incompatible API version %s",
|
||||||
instance, host, api_version));
|
instance, id, api_version));
|
||||||
|
|
||||||
if ( instance in g_instances )
|
if ( instance in g_instances )
|
||||||
drop_instance(g_instances[instance]);
|
drop_instance(g_instances[instance]);
|
||||||
|
@ -286,13 +319,38 @@ event Management::Agent::API::notify_agent_hello(instance: string, host: addr, a
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
add g_instances_known[instance];
|
local ei = find_endpoint(id);
|
||||||
|
|
||||||
|
if ( ei$id == "" )
|
||||||
|
{
|
||||||
|
Management::Log::warning(fmt("notify_agent_hello from %s with unknown Broker ID %s",
|
||||||
|
instance, id));
|
||||||
|
}
|
||||||
|
|
||||||
|
if ( ! ei?$network )
|
||||||
|
{
|
||||||
|
Management::Log::warning(fmt("notify_agent_hello from %s lacks network state, Broker ID %s",
|
||||||
|
instance, id));
|
||||||
|
}
|
||||||
|
|
||||||
|
if ( ei$id != "" && ei?$network )
|
||||||
|
{
|
||||||
|
g_instances_known[instance] = Management::Instance(
|
||||||
|
$name=instance, $host=to_addr(ei$network$address));
|
||||||
|
|
||||||
|
if ( ! connecting )
|
||||||
|
{
|
||||||
|
# We connected to this agent, note down its port.
|
||||||
|
g_instances_known[instance]$listen_port = ei$network$bound_port;
|
||||||
|
}
|
||||||
|
|
||||||
Management::Log::debug(fmt("instance %s now known to us", instance));
|
Management::Log::debug(fmt("instance %s now known to us", instance));
|
||||||
|
}
|
||||||
|
|
||||||
if ( instance in g_instances && instance !in g_instances_ready )
|
if ( instance in g_instances && instance !in g_instances_ready )
|
||||||
{
|
{
|
||||||
# We need this instance for our cluster and have full context for
|
# We need this instance for the requested cluster and have full
|
||||||
# it from the configuration. Tell agent.
|
# context for it from the configuration. Tell agent.
|
||||||
local req = Management::Request::create();
|
local req = Management::Request::create();
|
||||||
|
|
||||||
Management::Log::info(fmt("tx Management::Agent::API::agent_welcome_request to %s", instance));
|
Management::Log::info(fmt("tx Management::Agent::API::agent_welcome_request to %s", instance));
|
||||||
|
@ -540,13 +598,19 @@ event Management::Controller::API::get_configuration_request(reqid: string)
|
||||||
|
|
||||||
event Management::Controller::API::get_instances_request(reqid: string)
|
event Management::Controller::API::get_instances_request(reqid: string)
|
||||||
{
|
{
|
||||||
Management::Log::info(fmt("rx Management::Controller::API::set_instances_request %s", reqid));
|
Management::Log::info(fmt("rx Management::Controller::API::get_instances_request %s", reqid));
|
||||||
|
|
||||||
local res = Management::Result($reqid = reqid);
|
local res = Management::Result($reqid = reqid);
|
||||||
|
local inst_names: vector of string;
|
||||||
local insts: vector of Management::Instance;
|
local insts: vector of Management::Instance;
|
||||||
|
|
||||||
for ( i in g_instances )
|
for ( inst in g_instances_known )
|
||||||
insts += g_instances[i];
|
inst_names += inst;
|
||||||
|
|
||||||
|
sort(inst_names, strcmp);
|
||||||
|
|
||||||
|
for ( i in inst_names )
|
||||||
|
insts += g_instances_known[inst_names[i]];
|
||||||
|
|
||||||
res$data = insts;
|
res$data = insts;
|
||||||
|
|
||||||
|
@ -606,7 +670,7 @@ event Management::Controller::API::get_nodes_request(reqid: string)
|
||||||
Management::Log::info(fmt("rx Management::Controller::API::get_nodes_request %s", reqid));
|
Management::Log::info(fmt("rx Management::Controller::API::get_nodes_request %s", reqid));
|
||||||
|
|
||||||
# Special case: if we have no instances, respond right away.
|
# Special case: if we have no instances, respond right away.
|
||||||
if ( |g_instances| == 0 )
|
if ( |g_instances_known| == 0 )
|
||||||
{
|
{
|
||||||
Management::Log::info(fmt("tx Management::Controller::API::get_nodes_response %s", reqid));
|
Management::Log::info(fmt("tx Management::Controller::API::get_nodes_response %s", reqid));
|
||||||
local res = Management::Result($reqid=reqid, $success=F,
|
local res = Management::Result($reqid=reqid, $success=F,
|
||||||
|
@ -619,11 +683,8 @@ event Management::Controller::API::get_nodes_request(reqid: string)
|
||||||
local req = Management::Request::create(reqid);
|
local req = Management::Request::create(reqid);
|
||||||
req$get_nodes_state = GetNodesState();
|
req$get_nodes_state = GetNodesState();
|
||||||
|
|
||||||
for ( name in g_instances )
|
for ( name in g_instances_known )
|
||||||
{
|
{
|
||||||
if ( name !in g_instances_ready )
|
|
||||||
next;
|
|
||||||
|
|
||||||
local agent_topic = Management::Agent::topic_prefix + "/" + name;
|
local agent_topic = Management::Agent::topic_prefix + "/" + name;
|
||||||
local areq = Management::Request::create();
|
local areq = Management::Request::create();
|
||||||
|
|
||||||
|
@ -855,6 +916,11 @@ event Management::Controller::API::test_timeout_request(reqid: string, with_stat
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
event Broker::peer_added(peer: Broker::EndpointInfo, msg: string)
|
||||||
|
{
|
||||||
|
Management::Log::debug(fmt("broker peer %s added: %s", peer, msg));
|
||||||
|
}
|
||||||
|
|
||||||
event zeek_init()
|
event zeek_init()
|
||||||
{
|
{
|
||||||
# Initialize null config at startup. We will replace it once we have
|
# Initialize null config at startup. We will replace it once we have
|
||||||
|
@ -875,5 +941,5 @@ event zeek_init()
|
||||||
Broker::subscribe(Management::Agent::topic_prefix);
|
Broker::subscribe(Management::Agent::topic_prefix);
|
||||||
Broker::subscribe(Management::Controller::topic);
|
Broker::subscribe(Management::Controller::topic);
|
||||||
|
|
||||||
Management::Log::info("controller is live");
|
Management::Log::info(fmt("controller is live, Broker ID %s", Broker::node_id()));
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,2 +1,2 @@
|
||||||
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
|
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
|
||||||
notify_agent_hello agent 127.0.0.1 1
|
notify_agent_hello agent 1
|
||||||
|
|
|
@ -40,14 +40,12 @@ event zeek_init()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
event Management::Agent::API::notify_agent_hello(instance: string, host: addr, api_version: count)
|
event Management::Agent::API::notify_agent_hello(instance: string, id: string, connecting: bool, api_version: count)
|
||||||
{
|
{
|
||||||
if ( Management::role == Management::CONTROLLER )
|
if ( Management::role == Management::CONTROLLER )
|
||||||
{
|
{
|
||||||
# On rare occasion it can happen that we log this twice, which'll need
|
|
||||||
# investigating. For now we ensure we only do so once.
|
|
||||||
if ( ! logged )
|
if ( ! logged )
|
||||||
print(fmt("notify_agent_hello %s %s %s", instance, host, api_version));
|
print(fmt("notify_agent_hello %s %s", instance, api_version));
|
||||||
|
|
||||||
logged = T;
|
logged = T;
|
||||||
|
|
||||||
|
|
|
@ -1 +1 @@
|
||||||
de28fe5db9a733f8b4b53f84127bab888a184f6a
|
ffc21d5be60e78cab7f8a57b07d1a5842cbd1322
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue