Merge branch 'topic/christian/broker-backpressure-metrics'

* topic/christian/broker-backpressure-metrics:
  Add basic btest to verify that Broker peering telemetry is available.
  Add cluster framework telemetry for Broker's send-buffer use
  Add peer buffer update tracking to the Broker manager's event_observer
  Rename the Broker manager's LoggerAdapter
  Avoid race in the cluster/broker/publish-any btest
Christian Kreibich 2025-04-25 10:02:58 -07:00
commit c1a5f70df8
16 changed files with 412 additions and 23 deletions

CHANGES
View file

@@ -1,3 +1,17 @@
7.2.0-dev.644 | 2025-04-25 10:02:58 -0700
* Add basic btest to verify that Broker peering telemetry is available. (Christian Kreibich, Corelight)
* Add cluster framework telemetry for Broker's send-buffer use (Christian Kreibich, Corelight)
See NEWS or scripts/base/frameworks/cluster/broker-telemetry.zeek for details.
* Add peer buffer update tracking to the Broker manager's event_observer (Christian Kreibich, Corelight)
* Rename the Broker manager's LoggerAdapter (Christian Kreibich, Corelight)
* Avoid race in the cluster/broker/publish-any btest (Christian Kreibich, Corelight)
7.2.0-dev.638 | 2025-04-25 06:41:06 -0700
* Skip linting on highwayhash and src/3rdparty files (Tim Wojtulewicz, Corelight)

View file

@@ -1 +1 @@
7.2.0-dev.638
7.2.0-dev.644

View file

@@ -104,6 +104,10 @@ export {
## Same as :zeek:see:`Broker::peer_overflow_policy` but for WebSocket clients.
const web_socket_overflow_policy = "disconnect" &redef;
## How frequently Zeek resets some peering/client buffer statistics,
## such as ``max_queued_recently`` in :zeek:see:`BrokerPeeringStats`.
const buffer_stats_reset_interval = 1min &redef;
## The CAF scheduling policy to use. Available options are "sharing" and
## "stealing". The "sharing" policy uses a single, global work queue along
## with mutex and condition variable used for accessing it, which may be
@@ -392,6 +396,12 @@ export {
## Returns: a unique identifier for the local broker endpoint.
global node_id: function(): string;
## Obtain each peering's send-buffer statistics. The keys are Broker
## endpoint IDs.
##
## Returns: per-peering statistics.
global peering_stats: function(): table[string] of BrokerPeeringStats;
## Sends all pending log messages to remote peers. This normally
## doesn't need to be used except for test cases that are time-sensitive.
global flush_logs: function(): count;
@@ -554,6 +564,11 @@ function node_id(): string
return __node_id();
}
function peering_stats(): table[string] of BrokerPeeringStats
{
return __peering_stats();
}
function flush_logs(): count
{
return __flush_logs();
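
# Usage illustration (not part of this change): a minimal sketch of polling the
# new script-level API. The event name and the 30sec/10sec intervals below are
# made up for the example; the field names come from the BrokerPeeringStats
# record added in init-bare.zeek further down in this commit.
@load base/frameworks/broker

# Illustrative value only; the shipped default is 1min.
redef Broker::buffer_stats_reset_interval = 30sec;

event dump_peering_stats()
	{
	local stats = Broker::peering_stats();
	for ( ep, ps in stats )
		print fmt("peer=%s queued=%d recent_max=%d overflows=%d",
		          ep, ps$num_queued, ps$max_queued_recently, ps$num_overflows);

	schedule 10sec { dump_peering_stats() };
	}

event zeek_init()
	{
	schedule 10sec { dump_peering_stats() };
	}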

View file

@@ -14,8 +14,11 @@ redef Broker::log_topic = Cluster::rr_log_topic;
# Add a cluster prefix.
@prefixes += cluster
# This should soon condition on loading only when Broker is in use.
# Broker-specific additions:
@if ( Cluster::backend == Cluster::CLUSTER_BACKEND_BROKER )
@load ./broker-backpressure
@load ./broker-telemetry
@endif
@if ( Supervisor::is_supervised() )
# When running a supervised cluster, populate Cluster::nodes from the node table

View file

@@ -0,0 +1,69 @@
# Additional Broker-specific metrics that use Zeek cluster-level node names.
@load base/frameworks/telemetry
module Cluster;
## This gauge tracks the current number of locally queued messages in each
## Broker peering's send buffer. The "peer" label identifies the remote side of
## the peering, containing a Zeek cluster node name.
global broker_peer_buffer_messages_gf = Telemetry::register_gauge_family([
$prefix="zeek",
$name="broker-peer-buffer-messages",
$unit="",
$label_names=vector("peer"),
$help_text="Number of messages queued in Broker's send buffers",
]);
## This gauge tracks recent maximum queue lengths for each Broker peering's send
## buffer. Most of the time the send buffers are nearly empty, so this gauge
## helps understand recent bursts of messages. "Recent" here means
## :zeek:see:`Broker::buffer_stats_reset_interval`. The time window advances in
## increments of at least the stats interval, not incrementally with every newly
## observed message. That is, Zeek keeps a timestamp of when the window started,
## and once it notices that the interval has passed, it moves the start of the
## window to the current time.
global broker_peer_buffer_recent_max_messages_gf = Telemetry::register_gauge_family([
$prefix="zeek",
$name="broker-peer-buffer-recent-max-messages",
$unit="",
$label_names=vector("peer"),
$help_text="Maximum number of messages recently queued in Broker's send buffers",
]);
## This counter tracks for each Broker peering the number of times its send
## buffer has overflowed. For the "disconnect" policy this can at most be 1,
## since Broker stops the peering at this time. For the "drop_oldest" and
## "drop_newest" policies (see :zeek:see:`Broker:peer_overflow_policy`) the count
## instead reflects the number of messages lost.
global broker_peer_buffer_overflows_cf = Telemetry::register_counter_family([
$prefix="zeek",
$name="broker-peer-buffer-overflows",
$unit="",
$label_names=vector("peer"),
$help_text="Number of overflows in Broker's send buffers",
]);
hook Telemetry::sync()
{
local peers = Broker::peering_stats();
local nn: NamedNode;
for ( peer, stats in peers )
{
# Translate the Broker IDs to Zeek-level node names. We skip
# telemetry for peers where this mapping fails, i.e. ones for
# connections to external systems.
nn = nodeid_to_node(peer);
if ( |nn$name| > 0 )
{
Telemetry::gauge_family_set(broker_peer_buffer_messages_gf,
vector(nn$name), stats$num_queued);
Telemetry::gauge_family_set(broker_peer_buffer_recent_max_messages_gf,
vector(nn$name), stats$max_queued_recently);
Telemetry::counter_family_set(broker_peer_buffer_overflows_cf,
vector(nn$name), stats$num_overflows);
}
}
}
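
# Note that families registered above with prefix "zeek" and dashed names surface
# under Prometheus-style names with underscores (e.g. zeek_broker_peer_buffer_messages),
# which is how the btest baselines later in this commit refer to them. A minimal,
# hypothetical sketch of reading them back from script-land (the zeek_done handler
# and the print format are illustrative only):
@load base/frameworks/telemetry

event zeek_done()
	{
	# Same wildcard query the new btest in this commit uses.
	local ms = Telemetry::collect_metrics("zeek_broker_peer_buffer*", "*");
	for ( i in ms )
		print ms[i]$opts$metric_type, ms[i]$opts$name, ms[i]$label_names;
	}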

View file

@@ -1135,6 +1135,20 @@ type BrokerStats: record {
num_ids_outgoing: count;
};
## Broker statistics for an individual peering.
##
type BrokerPeeringStats: record {
## The number of messages currently queued locally for transmission.
num_queued: count;
## The maximum number of messages queued in the recent
## :zeek:see:`Broker::buffer_stats_reset_interval` time interval.
max_queued_recently: count;
## The number of times the send buffer has overflowed.
num_overflows: count;
};
type BrokerPeeringStatsTable: table[string] of BrokerPeeringStats;
## Statistics about reporter messages and weirds.
##
## .. zeek:see:: get_reporter_stats

View file

@@ -92,6 +92,174 @@ void print_escaped(std::string& buf, std::string_view str) {
buf.push_back('"');
}
// Track metrics for a given peering's send buffer.
class PeerBufferState {
public:
struct Stats {
// The rendered peer ID. Storing this here helps reuse.
// Note that we only ever touch this from Zeek's main thread, not
// any of Broker's.
zeek::StringValPtr peer_id;
// Whether Broker has removed the peer, and this instance still
// needs to be removed.
bool is_zombie = false;
// Number of messages queued locally in the send buffer.
uint32_t queued = 0;
// Maximum number queued in the last Broker::buffer_stats_reset_interval.
// This improces visibility into message bursts since instantaneous
// queueing (captured above) can be short-lived.
uint32_t max_queued_recently = 0;
// Number of times the buffer overflowed at send time. For the
// "disconnect" overflow policy (via Broker::peer_overflow_policy), this
// count will at most be 1 since Broker will remove the peering upon
// overflow. The existing Zeek-level metric for tracking disconnects
// (see frameworks/broker/broker-backpressure.zeek) covers this more
// permanently. For the "drop_newest" and "drop_oldest" policies it
// equals the number of messages lost, since the peering continues.
uint64_t overflows = 0;
// When we last started a stats-tracking interval for this peering.
double last_interval = 0;
};
// For per-peering tracking, map endpoint IDs to the above state.
using EndpointMetricMap = std::unordered_map<broker::endpoint_id, Stats>;
PeerBufferState(size_t a_buffer_size, double a_stats_reset_interval)
: buffer_size(a_buffer_size), stats_reset_interval(a_stats_reset_interval) {
stats_table =
zeek::make_intrusive<zeek::TableVal>(zeek::id::find_type<zeek::TableType>("BrokerPeeringStatsTable"));
stats_record_type = zeek::id::find_type<zeek::RecordType>("BrokerPeeringStats");
}
void SetEndpoint(const broker::endpoint* a_endpoint) { endpoint = a_endpoint; }
// Update the peering's stats. This runs in Broker's execution context.
// Broker does not expose send-buffer/queue state explicitly, so track
// arrivals (a push, is_push == true) and departures (a pull, is_push ==
// false) as they happen. Note that this must not touch Zeek-side Vals.
void Observe(const broker::endpoint_id& peer, bool is_push) {
std::lock_guard<std::mutex> lock(mutex);
auto it = stats_map.find(peer);
if ( it == stats_map.end() ) {
stats_map.emplace(peer, Stats());
it = stats_map.find(peer);
}
auto& stats = it->second;
// Stick to Broker's notion of time here.
double now{0};
if ( endpoint != nullptr )
broker::convert(endpoint->now(), now);
if ( now - stats.last_interval > stats_reset_interval ) {
stats.last_interval = now;
stats.max_queued_recently = stats.queued;
}
if ( stats.queued == 0 ) {
// Watch for underflows. We could report this somehow; note that this
// runs in the context of Broker's threads.
assert(is_push);
}
if ( is_push && stats.queued == buffer_size )
stats.overflows += 1;
else {
stats.queued += is_push ? 1 : -1;
if ( stats.queued > stats.max_queued_recently )
stats.max_queued_recently = stats.queued;
}
}
// Updates the internal table[string] of BrokerPeeringStats and returns it.
const zeek::TableValPtr& GetPeeringStatsTable() {
std::lock_guard<std::mutex> lock(mutex);
for ( auto it = stats_map.begin(); it != stats_map.end(); ) {
auto& peer = it->first;
auto& stats = it->second;
if ( stats.peer_id == nullptr )
stats.peer_id = PeerIdToStringVal(peer);
// Broker told us the peer is gone, in RemovePeer() below. Remove it
// now from both tables. We add/remove from stats_table only here,
// not in Observe() or RemovePeer(), to ensure we only touch
// the Zeek-side Table from Zeek's main thread.
if ( stats.is_zombie ) {
stats_table->Remove(*stats.peer_id);
it = stats_map.erase(it);
continue;
}
auto stats_v = stats_table->Find(stats.peer_id);
if ( stats_v == nullptr ) {
stats_v = zeek::make_intrusive<zeek::RecordVal>(stats_record_type);
stats_table->Assign(stats.peer_id, stats_v);
}
// We may get here more than stats_reset_interval after the last
// Observe(), in which case the max_queued_recently value is now
// stale. Update if so.
double now{0};
if ( endpoint != nullptr )
broker::convert(endpoint->now(), now);
if ( now - stats.last_interval > stats_reset_interval ) {
stats.last_interval = now;
stats.max_queued_recently = stats.queued;
}
int n = 0;
stats_v->AsRecordVal()->Assign(n++, zeek::val_mgr->Count(stats.queued));
stats_v->AsRecordVal()->Assign(n++, zeek::val_mgr->Count(stats.max_queued_recently));
stats_v->AsRecordVal()->Assign(n++, zeek::val_mgr->Count(stats.overflows));
++it;
}
return stats_table;
}
void RemovePeer(const broker::endpoint_id& peer) {
std::lock_guard<std::mutex> lock(mutex);
if ( auto it = stats_map.find(peer); it != stats_map.end() )
it->second.is_zombie = true;
}
private:
zeek::StringValPtr PeerIdToStringVal(const broker::endpoint_id& peer) const {
std::string peer_s;
broker::convert(peer, peer_s);
return zeek::make_intrusive<zeek::StringVal>(peer_s);
}
// The maximum number of messages queueable for transmission to a peer,
// see Broker::peer_buffer_size and Broker::web_socket_buffer_size.
size_t buffer_size;
// Seconds after which we reset stats tracked per time window.
double stats_reset_interval;
EndpointMetricMap stats_map;
zeek::TableValPtr stats_table;
zeek::RecordTypePtr stats_record_type;
mutable std::mutex mutex;
const broker::endpoint* endpoint = nullptr;
};
using PeerBufferStatePtr = std::shared_ptr<PeerBufferState>;
class LoggerQueue {
public:
void Push(broker::event_ptr event) {
@@ -128,26 +296,40 @@ using LoggerQueuePtr = std::shared_ptr<LoggerQueue>;
using BrokerSeverityLevel = broker::event::severity_level;
class LoggerAdapter : public broker::event_observer {
class Observer : public broker::event_observer {
public:
using SeverityLevel = broker::event::severity_level;
using LogSeverityLevel = broker::event::severity_level;
explicit LoggerAdapter(SeverityLevel severity, LoggerQueuePtr queue)
: severity_(severity), queue_(std::move(queue)) {}
explicit Observer(LogSeverityLevel severity, LoggerQueuePtr queue, PeerBufferStatePtr pbstate)
: severity_(severity), queue_(std::move(queue)), pbstate_(std::move(pbstate)) {}
void on_peer_buffer_push(const broker::endpoint_id& peer, const broker::node_message&) override {
pbstate_->Observe(peer, true);
}
void on_peer_buffer_pull(const broker::endpoint_id& peer, const broker::node_message&) override {
pbstate_->Observe(peer, false);
}
void on_peer_disconnect(const broker::endpoint_id& peer, const broker::error&) override {
pbstate_->RemovePeer(peer);
}
void observe(broker::event_ptr what) override { queue_->Push(std::move(what)); }
bool accepts(SeverityLevel severity, broker::event::component_type) const override { return severity <= severity_; }
bool accepts(LogSeverityLevel severity, broker::event::component_type) const override {
return severity <= severity_;
}
private:
SeverityLevel severity_;
LogSeverityLevel severity_;
LoggerQueuePtr queue_;
PeerBufferStatePtr pbstate_;
};
} // namespace
namespace zeek::Broker {
static inline Val* get_option(const char* option) {
const auto& id = zeek::detail::global_scope()->Find(option);
@@ -220,19 +402,24 @@ struct opt_mapping {
class BrokerState {
public:
using SeverityLevel = LoggerAdapter::SeverityLevel;
using LogSeverityLevel = Observer::LogSeverityLevel;
BrokerState(broker::configuration config, size_t congestion_queue_size, LoggerQueuePtr queue)
BrokerState(broker::configuration config, size_t congestion_queue_size, LoggerQueuePtr queue,
PeerBufferStatePtr pbstate)
: endpoint(std::move(config), telemetry_mgr->GetRegistry()),
subscriber(
endpoint.make_subscriber({broker::topic::statuses(), broker::topic::errors()}, congestion_queue_size)),
loggerQueue(std::move(queue)) {}
loggerQueue(std::move(queue)),
peerBufferState(std::move(pbstate)) {
peerBufferState->SetEndpoint(&endpoint);
}
broker::endpoint endpoint;
broker::subscriber subscriber;
LoggerQueuePtr loggerQueue;
SeverityLevel logSeverity = SeverityLevel::critical;
SeverityLevel stderrSeverity = SeverityLevel::critical;
PeerBufferStatePtr peerBufferState;
LogSeverityLevel logSeverity = LogSeverityLevel::critical;
LogSeverityLevel stderrSeverity = LogSeverityLevel::critical;
std::unordered_set<broker::network_info> outbound_peerings;
};
@@ -402,11 +589,13 @@ void Manager::DoInitPostScript() {
checkLogSeverity(stderrSeverityVal);
auto adapterVerbosity = static_cast<BrokerSeverityLevel>(std::max(logSeverityVal, stderrSeverityVal));
auto queue = std::make_shared<LoggerQueue>();
auto adapter = std::make_shared<LoggerAdapter>(adapterVerbosity, queue);
broker::logger(adapter); // *must* be called before creating the BrokerState
auto pbstate = std::make_shared<PeerBufferState>(options.peer_buffer_size,
get_option("Broker::buffer_stats_reset_interval")->AsDouble());
auto observer = std::make_shared<Observer>(adapterVerbosity, queue, pbstate);
broker::logger(observer); // *must* be called before creating the BrokerState
auto cqs = get_option("Broker::congestion_queue_size")->AsCount();
bstate = std::make_shared<BrokerState>(std::move(config), cqs, queue);
bstate = std::make_shared<BrokerState>(std::move(config), cqs, queue, pbstate);
bstate->logSeverity = static_cast<BrokerSeverityLevel>(logSeverityVal);
bstate->stderrSeverity = static_cast<BrokerSeverityLevel>(stderrSeverityVal);
@@ -1968,6 +2157,8 @@ const Stats& Manager::GetStatistics() {
return statistics;
}
TableValPtr Manager::GetPeeringStatsTable() { return bstate->peerBufferState->GetPeeringStatsTable(); }
bool Manager::AddForwardedStore(const std::string& name, TableValPtr table) {
if ( forwarded_stores.find(name) != forwarded_stores.end() ) {
reporter->Error("same &broker_store %s specified for two different variables", name.c_str());

View file

@@ -384,6 +384,14 @@ public:
*/
const Stats& GetStatistics();
/**
* Returns a table[string] of BrokerPeeringStats, with each peering's
* send-buffer stats filled in. The keys are Broker node IDs identifying the
* current peers.
* @return Each peering's send-buffer statistics.
*/
TableValPtr GetPeeringStatsTable();
/**
* Creating an instance of this struct simply helps the manager
* keep track of whether calls into its API are coming from script

View file

@@ -264,3 +264,8 @@ function Broker::__node_id%(%): string
zeek::Broker::Manager::ScriptScopeGuard ssg;
return zeek::make_intrusive<zeek::StringVal>(broker_mgr->NodeId());
%}
function Broker::__peering_stats%(%): BrokerPeeringStatsTable
%{
return broker_mgr->GetPeeringStatsTable();
%}

View file

@@ -0,0 +1,4 @@
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
Telemetry::COUNTER, zeek, zeek_broker_peer_buffer_overflows_total, [endpoint, peer], [manager, worker]
Telemetry::GAUGE, zeek, zeek_broker_peer_buffer_messages, [endpoint, peer], [manager, worker]
Telemetry::GAUGE, zeek, zeek_broker_peer_buffer_recent_max_messages, [endpoint, peer], [manager, worker]

View file

@@ -0,0 +1,4 @@
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
Telemetry::COUNTER, zeek, zeek_broker_peer_buffer_overflows_total, [endpoint, peer], [worker, manager]
Telemetry::GAUGE, zeek, zeek_broker_peer_buffer_messages, [endpoint, peer], [worker, manager]
Telemetry::GAUGE, zeek, zeek_broker_peer_buffer_recent_max_messages, [endpoint, peer], [worker, manager]

View file

@@ -1,6 +1,7 @@
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
-./frameworks/cluster/broker-backpressure.zeek
-./frameworks/cluster/broker-stores.zeek
-./frameworks/cluster/broker-telemetry.zeek
-./frameworks/cluster/nodes/logger.zeek
-./frameworks/cluster/nodes/manager.zeek
-./frameworks/cluster/nodes/proxy.zeek

View file

@@ -1,2 +1,2 @@
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
558 seen BiFs, 0 unseen BiFs (), 0 new BiFs ()
559 seen BiFs, 0 unseen BiFs (), 0 new BiFs ()

View file

@@ -0,0 +1,55 @@
# @TEST-DOC: run a mini two-node cluster and check that Broker's peering telemetry is available.
#
# @TEST-PORT: BROKER_PORT
#
# @TEST-EXEC: btest-bg-run manager ZEEKPATH=$ZEEKPATH:.. CLUSTER_NODE=manager zeek -b manager.zeek
# @TEST-EXEC: btest-bg-run worker ZEEKPATH=$ZEEKPATH:.. CLUSTER_NODE=worker zeek -b worker.zeek
# @TEST-EXEC: btest-bg-wait 15
#
# @TEST-EXEC: btest-diff manager/out
# @TEST-EXEC: btest-diff worker/out
# @TEST-START-FILE cluster-layout.zeek
redef Cluster::nodes = {
["manager"] = [$node_type=Cluster::MANAGER, $ip=127.0.0.1, $p=to_port(getenv("BROKER_PORT"))],
["worker"] = [$node_type=Cluster::WORKER, $ip=127.0.0.1, $manager="manager"],
};
# @TEST-END-FILE
# @TEST-START-FILE common.zeek
@load base/frameworks/cluster
@load policy/frameworks/cluster/experimental
redef exit_only_after_terminate = T;
redef Log::enable_local_logging = T;
redef Log::default_rotation_interval = 0secs;
redef Cluster::retry_interval = 1sec;
function print_metrics(metrics: vector of Telemetry::Metric)
{
local f = open("out");
for (i in metrics)
{
local m = metrics[i];
print f, m$opts$metric_type, m$opts$prefix, m$opts$name, m$label_names, m?$label_values ? m$label_values : vector();
}
close(f);
}
event Cluster::Experimental::cluster_started()
{
local broker_metrics = Telemetry::collect_metrics("zeek_broker_peer_buffer*", "*");
print_metrics(broker_metrics);
terminate();
}
# @TEST-END-FILE
# @TEST-START-FILE manager.zeek
@load ./common.zeek
# @TEST-END-FILE
# @TEST-START-FILE worker.zeek
@load ./common.zeek
# @TEST-END-FILE

View file

@@ -56,8 +56,6 @@ event send_any()
local e = Cluster::make_event(ping, i, type_name(val), val);
Cluster::publish_hrw(Cluster::worker_pool, cat(i), e);
++i;
schedule 0.05sec { send_any() };
}
event pong(c: count, what: string, val: any)
@@ -65,10 +63,17 @@ event pong(c: count, what: string, val: any)
++pongs;
print "got pong", pongs, "for ping", c, what, type_name(val), val;
# We send 5 pings in 3 different variations and
# get 4 one pong for each.
# The manager sends 5 types of pings, each in 3 different ways. The worker
# answers each ping in 4 ways, so every batch of pings for one type yields
# 3 x 4 = 12 pongs, and 5 types x 12 = 60 expected pongs at the manager.
if ( pongs == 60 )
Cluster::publish(Cluster::worker_topic, finish);
else if ( pongs > 0 && pongs % 12 == 0 )
{
# Wait for a batch to complete before sending the next.
event send_any();
}
}
event Cluster::node_up(name: string, id: string)

View file

@@ -7,7 +7,7 @@
# @TEST-EXEC: btest-diff output
# This set tracks the BiFs that have been characterized for ZAM analysis.
# As new ones are added or old ones removed, attend to updating FuncInfo.cc
# As new ones are added or old ones removed, update src/script_opt/FuncInfo.cc
# for ZAM, and then update the list here.
global known_BiFs = set(
"Analyzer::__disable_all_analyzers",
@@ -45,6 +45,7 @@ global known_BiFs = set(
"Broker::__opaque_clone_through_serialization",
"Broker::__peer",
"Broker::__peer_no_retry",
"Broker::__peering_stats",
"Broker::__peers",
"Broker::__pop",
"Broker::__publish_id",