Merge branch 'topic/christian/disconnect-slow-peers'

* topic/christian/disconnect-slow-peers:
  Bump cluster testsuite to pull in Broker backpressure tests
  Expand documentation of Broker events.
  Add sleep() BiF.
  Add backpressure disconnect notification to cluster.log and via telemetry
  Remove unneeded @loads from base/misc/version.zeek
  Add Cluster::nodeid_to_node() helper function
  Support re-peering with Broker peers that fall behind
  Add Zeek-level configurability of Broker slow-peer disconnects
  Bump Broker to pull in disconnect feature and infinite-loop fix
  No need to namespace Cluster:: functions in their own namespace
This commit is contained in:
Christian Kreibich 2024-12-09 23:28:30 -08:00
commit 1c42bfc715
25 changed files with 331 additions and 19 deletions

View file

@ -1,3 +1,4 @@
@load ./main
@load ./store
@load ./log
@load ./backpressure

View file

@ -0,0 +1,35 @@
##! This handles Broker peers that fall so far behind in handling the messages
##! this node sends them that the local Broker endpoint decides to unpeer them.
##! Zeek captures this as follows:
##!
##! - In broker.log, with a regular "peer-removed" entry indicating CAF's reason.
##! - Via eventing through :zeek:see:`Broker::peer_removed` as done in this script.
##!
##! The cluster framework additionally captures the unpeering as follows:
##!
##! - In cluster.log, with a higher-level message indicating the node names involved.
##! - Via telemetry, using a labeled counter.
event Broker::peer_removed(endpoint: Broker::EndpointInfo, msg: string)
	{
	# Only unpeerings caused by backpressure overflow are handled here; any
	# other removal reason is left alone.
	if ( "caf::sec::backpressure_overflow" in msg )
		{
		if ( endpoint?$network )
			{
			# Peer with the endpoint again, so that Broker's reconnect logic
			# takes over once the other side has caught up; Broker keeps
			# retrying the connection periodically as needed. When the other
			# endpoint was the one that originally connected to us, this
			# attempt fails (we would be connecting to the peer's ephemeral
			# port), but then the peer itself reconnects to us after it
			# recovers.
			#
			# The cluster framework knows who connects to whom and could make
			# this cleaner, but relying on it would entangle Broker with the
			# cluster framework even further.
			local peer_net = endpoint$network;
			Broker::peer(peer_net$address, peer_net$bound_port);
			}
		else
			{
			# Without network details there is nothing to connect to.
			Reporter::error(fmt("Missing network info to re-peer with %s", endpoint$id));
			}
		}
	}

View file

@ -86,6 +86,24 @@ export {
## ZEEK_BROKER_MAX_THREADS environment variable overrides this setting.
const max_threads = 1 &redef;
## Maximum number of items we buffer per peer. What action to take when
## the buffer reaches its maximum size is determined by
## `peer_overflow_policy`.
const peer_buffer_size = 2048 &redef;
## Configures how Broker responds to peers that cannot keep up with the
## incoming message rate. Available strategies:
## - disconnect: drop the connection to the unresponsive peer
## - drop_newest: replace the newest message in the buffer
## - drop_oldest: remove the oldest message from the buffer, then append
const peer_overflow_policy = "disconnect" &redef;
## Same as `peer_buffer_size` but for WebSocket clients.
const web_socket_buffer_size = 512 &redef;
## Same as `peer_overflow_policy` but for WebSocket clients.
const web_socket_overflow_policy = "disconnect" &redef;
## The CAF scheduling policy to use. Available options are "sharing" and
## "stealing". The "sharing" policy uses a single, global work queue along
## with mutex and condition variable used for accessing it, which may be

View file

@ -14,6 +14,9 @@ redef Broker::log_topic = Cluster::rr_log_topic;
# Add a cluster prefix.
@prefixes += cluster
# This should soon condition on loading only when Broker is in use.
@load ./broker-backpressure
@if ( Supervisor::is_supervised() )
# When running a supervised cluster, populate Cluster::nodes from the node table
# the Supervisor provides to new Zeek nodes. The management framework configures

View file

@ -0,0 +1,29 @@
# Notifications for Broker-reported backpressure overflow.
# See base/frameworks/broker/backpressure.zeek for context.
@load base/frameworks/telemetry
module Cluster;
# Counter family tracking Broker peerings that were dropped because of
# backpressure overflow. It carries a single "peer" label; the
# Broker::peer_removed handler in this script increments it, using the
# cluster node name of the removed peer when known and "unknown" otherwise.
global broker_backpressure_disconnects_cf = Telemetry::register_counter_family([
$prefix="zeek",
$name="broker-backpressure-disconnects",
$unit="",
$label_names=vector("peer"),
$help_text="Number of Broker peerings dropped due to a neighbor falling behind in message I/O",
]);
event Broker::peer_removed(endpoint: Broker::EndpointInfo, msg: string)
	{
	# Only report removals that Broker attributes to backpressure overflow ...
	if ( "caf::sec::backpressure_overflow" !in msg )
		return;

	# ... and only when the endpoint's network details are available, since
	# the log message includes its address and port.
	if ( ! endpoint?$network )
		return;

	# Map the Broker node ID to its cluster-level name. An empty name means
	# the peer is not a node this cluster knows about.
	local peer = nodeid_to_node(endpoint$id);
	local in_cluster = peer$name != "";

	local qualifier = in_cluster ? "" : "non-cluster peer ";
	local shown_id = in_cluster ? peer$name : endpoint$id;
	local metric_label = in_cluster ? peer$name : "unknown";

	# Higher-level note in cluster.log naming the affected peer.
	Cluster::log(fmt("removed due to backpressure overflow: %s%s:%s (%s)",
	                 qualifier,
	                 endpoint$network$address, endpoint$network$bound_port,
	                 shown_id));

	# Count the disconnect, labeled by the peer's cluster name.
	Telemetry::counter_family_inc(broker_backpressure_disconnects_cf,
	                              vector(metric_label));
	}

View file

@ -281,6 +281,15 @@ export {
## a given cluster node.
global nodeid_topic: function(id: string): string;
## Retrieve the cluster-level naming of a node based on its node ID,
## a backend-specific identifier.
##
## id: the node ID of a peer.
##
## Returns: the :zeek:see:`Cluster::NamedNode` for the requested node, if
## known, otherwise a "null" instance with an empty name field.
global nodeid_to_node: function(id: string): NamedNode;
## Initialize the cluster backend.
##
## Cluster backends usually invoke this from a :zeek:see:`zeek_init` handler.
@ -336,7 +345,7 @@ function nodes_with_type(node_type: NodeType): vector of NamedNode
{ return strcmp(n1$name, n2$name); });
}
function Cluster::get_node_count(node_type: NodeType): count
function get_node_count(node_type: NodeType): count
{
local cnt = 0;
@ -349,7 +358,7 @@ function Cluster::get_node_count(node_type: NodeType): count
return cnt;
}
function Cluster::get_active_node_count(node_type: NodeType): count
function get_active_node_count(node_type: NodeType): count
{
return node_type in active_node_ids ? |active_node_ids[node_type]| : 0;
}
@ -394,6 +403,17 @@ function nodeid_topic(id: string): string
return nodeid_topic_prefix + id + "/";
}
# Looks up the cluster node whose ID equals the given one and returns it
# together with its name. If no entry in the nodes table carries that ID,
# a placeholder NamedNode with an empty name is returned instead.
function nodeid_to_node(id: string): NamedNode
	{
	for ( node_name, node_info in nodes )
		{
		# Entries without an ID set cannot match.
		if ( ! node_info?$id )
			next;

		if ( node_info$id != id )
			next;

		return NamedNode($name=node_name, $node=node_info);
		}

	# No match: hand back a "null" instance that callers can detect via
	# the empty name.
	return NamedNode($name="", $node=[$node_type=NONE, $ip=0.0.0.0]);
	}
event Cluster::hello(name: string, id: string) &priority=10
{
if ( name !in nodes )

View file

@ -2,9 +2,6 @@
##! The most convenient way to access this are the Version::number
##! and Version::info constants.
@load base/frameworks/reporter
@load base/utils/strings
module Version;
export {