diff --git a/scripts/base/frameworks/broker/main.zeek b/scripts/base/frameworks/broker/main.zeek index 9bba22ed09..84c3d01935 100644 --- a/scripts/base/frameworks/broker/main.zeek +++ b/scripts/base/frameworks/broker/main.zeek @@ -391,7 +391,18 @@ export { ## Returns: a unique identifier for the local broker endpoint. global node_id: function(): string; + ## Obtain each peering's send-buffer fill level. The keys are Broker + ## endpoint IDs. + ## + ## Returns: number of messages queued for sending, for each peering. global peer_buffer_levels: function(): table[string] of count; + + ## Obtain each peering's number of send-buffer overflows. The keys are + ## Broker endpoint IDs. Note that for buffer policy "disconnect", these + ## overflows are short-lived, since Broker will remove those peerings + ## upon overflow. + ## + ## Returns: number of send-buffer overflows, for each peering. global peer_buffer_overflows: function(): table[string] of count; ## Sends all pending log messages to remote peers. This normally diff --git a/scripts/base/frameworks/cluster/__load__.zeek b/scripts/base/frameworks/cluster/__load__.zeek index 0d6372e3d4..85fef40c5f 100644 --- a/scripts/base/frameworks/cluster/__load__.zeek +++ b/scripts/base/frameworks/cluster/__load__.zeek @@ -14,8 +14,11 @@ redef Broker::log_topic = Cluster::rr_log_topic; # Add a cluster prefix. @prefixes += cluster -# This should soon condition on loading only when Broker is in use. +# Broker-specific additions: +@if ( Cluster::backend == Cluster::CLUSTER_BACKEND_BROKER ) @load ./broker-backpressure +@load ./broker-telemetry +@endif @if ( Supervisor::is_supervised() ) # When running a supervised cluster, populate Cluster::nodes from the node table diff --git a/scripts/base/frameworks/cluster/broker-telemetry.zeek b/scripts/base/frameworks/cluster/broker-telemetry.zeek new file mode 100644 index 0000000000..b61c9fcdf7 --- /dev/null +++ b/scripts/base/frameworks/cluster/broker-telemetry.zeek @@ -0,0 +1,50 @@ +# Additional Broker-specific metrics that use Zeek cluster-level node names. + +@load base/frameworks/telemetry + +module Cluster; + +global broker_peer_buffer_levels_gf = Telemetry::register_gauge_family([ + $prefix="zeek", + $name="broker-peer-buffer-levels", + $unit="", + $label_names=vector("peer"), + $help_text="Number of messages queued in Broker's per-peer send buffers", +]); + +global broker_peer_buffer_overflows_cf = Telemetry::register_counter_family([ + $prefix="zeek", + $name="broker-peer-buffer-overflows", + $unit="", + $label_names=vector("peer"), + $help_text="Number of overflows in Broker's per-peer send buffers", +]); + +hook Telemetry::sync() + { + local peers: table[string] of count = Broker::peer_buffer_levels(); + local nn: NamedNode; + + for ( peer, level in peers ) + { + # Translate the Broker IDs to Zeek-level node names. We skip + # telemetry for peers where this mapping fails, i.e. ones for + # connections to external systems. + nn = nodeid_to_node(peer); + + if ( |nn$name| > 0 ) + Telemetry::gauge_family_set(broker_peer_buffer_levels_gf, + vector(nn$name), level); + } + + peers = Broker::peer_buffer_overflows(); + + for ( peer, overflows in peers ) + { + nn = nodeid_to_node(peer); + + if ( |nn$name| > 0 ) + Telemetry::counter_family_set(broker_peer_buffer_overflows_cf, + vector(nn$name), overflows); + } + } diff --git a/testing/btest/Baseline/coverage.init-default/missing_loads b/testing/btest/Baseline/coverage.init-default/missing_loads index 9997ec4fd8..93af34614e 100644 --- a/testing/btest/Baseline/coverage.init-default/missing_loads +++ b/testing/btest/Baseline/coverage.init-default/missing_loads @@ -1,6 +1,7 @@ ### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. -./frameworks/cluster/broker-backpressure.zeek -./frameworks/cluster/broker-stores.zeek +-./frameworks/cluster/broker-telemetry.zeek -./frameworks/cluster/nodes/logger.zeek -./frameworks/cluster/nodes/manager.zeek -./frameworks/cluster/nodes/proxy.zeek