From 89780514fac173b806c690bbd236de7d0554c252 Mon Sep 17 00:00:00 2001
From: Christian Kreibich
Date: Thu, 17 Apr 2025 14:58:43 -0700
Subject: [PATCH 1/5] Avoid race in the cluster/broker/publish-any btest

On very busy machines the hardwired scheduling of the ping batches could
shift around relative to the arriving pongs, causing baseline deviations.
We now wait for each batch to complete before triggering the next one.
---
 testing/btest/cluster/broker/publish-any.zeek | 13 +++++++++----
 1 file changed, 9 insertions(+), 4 deletions(-)

diff --git a/testing/btest/cluster/broker/publish-any.zeek b/testing/btest/cluster/broker/publish-any.zeek
index 6271548116..818d153ce4 100644
--- a/testing/btest/cluster/broker/publish-any.zeek
+++ b/testing/btest/cluster/broker/publish-any.zeek
@@ -56,8 +56,6 @@ event send_any()
     local e = Cluster::make_event(ping, i, type_name(val), val);
     Cluster::publish_hrw(Cluster::worker_pool, cat(i), e);
     ++i;
-
-    schedule 0.05sec { send_any() };
     }

 event pong(c: count, what: string, val: any)
@@ -65,10 +63,17 @@ event pong(c: count, what: string, val: any)
     {
     ++pongs;
     print "got pong", pongs, "for ping", c, what, type_name(val), val;
-    # We send 5 pings in 3 different variations and
-    # get 4 one pong for each.
+    # The manager sends 5 types of pings, in 3 different ways. The worker
+    # answers each ping in 4 ways, for a total of 60 expected pongs at the
+    # manager. Every batch of pings for one type involves 12 pongs.
+
     if ( pongs == 60 )
         Cluster::publish(Cluster::worker_topic, finish);
+    else if ( pongs > 0 && pongs % 12 == 0 )
+        {
+        # Wait for a batch to complete before sending the next.
+        event send_any();
+        }
     }

 event Cluster::node_up(name: string, id: string)

From 23554280e0ab6d0e6ff6f72550b3d7a6da67c4c1 Mon Sep 17 00:00:00 2001
From: Christian Kreibich
Date: Wed, 23 Apr 2025 22:46:02 -0700
Subject: [PATCH 2/5] Rename the Broker manager's LoggerAdapter

This class is about to do more than just log handling, so this renames it
simply to Observer, reflecting the fact that it implements
broker::event_observer.
---
 src/broker/Manager.cc | 22 ++++++++++++----------
 1 file changed, 12 insertions(+), 10 deletions(-)

diff --git a/src/broker/Manager.cc b/src/broker/Manager.cc
index dee1c162cf..717d562592 100644
--- a/src/broker/Manager.cc
+++ b/src/broker/Manager.cc
@@ -128,19 +128,21 @@ using LoggerQueuePtr = std::shared_ptr<LoggerQueue>;

 using BrokerSeverityLevel = broker::event::severity_level;

-class LoggerAdapter : public broker::event_observer {
+class Observer : public broker::event_observer {
 public:
-    using SeverityLevel = broker::event::severity_level;
+    using LogSeverityLevel = broker::event::severity_level;

-    explicit LoggerAdapter(SeverityLevel severity, LoggerQueuePtr queue)
+    explicit Observer(LogSeverityLevel severity, LoggerQueuePtr queue)
         : severity_(severity), queue_(std::move(queue)) {}

     void observe(broker::event_ptr what) override { queue_->Push(std::move(what)); }

-    bool accepts(SeverityLevel severity, broker::event::component_type) const override { return severity <= severity_; }
+    bool accepts(LogSeverityLevel severity, broker::event::component_type) const override {
+        return severity <= severity_;
+    }

 private:
-    SeverityLevel severity_;
+    LogSeverityLevel severity_;
     LoggerQueuePtr queue_;
 };

@@ -220,7 +222,7 @@ struct opt_mapping {

 class BrokerState {
 public:
-    using SeverityLevel = LoggerAdapter::SeverityLevel;
+    using LogSeverityLevel = Observer::LogSeverityLevel;

     BrokerState(broker::configuration config, size_t congestion_queue_size, LoggerQueuePtr queue)
         : endpoint(std::move(config), telemetry_mgr->GetRegistry()),
@@ -231,8 +233,8 @@ public:
     broker::endpoint endpoint;
     broker::subscriber subscriber;
     LoggerQueuePtr loggerQueue;
-    SeverityLevel logSeverity = SeverityLevel::critical;
-    SeverityLevel stderrSeverity = SeverityLevel::critical;
+    LogSeverityLevel logSeverity = LogSeverityLevel::critical;
+    LogSeverityLevel stderrSeverity = LogSeverityLevel::critical;
     std::unordered_set<broker::endpoint_id> outbound_peerings;
 };

@@ -402,8 +404,8 @@ void Manager::DoInitPostScript() {
     checkLogSeverity(stderrSeverityVal);
     auto adapterVerbosity = static_cast<BrokerSeverityLevel>(std::max(logSeverityVal, stderrSeverityVal));
     auto queue = std::make_shared<LoggerQueue>();
-    auto adapter = std::make_shared<LoggerAdapter>(adapterVerbosity, queue);
-    broker::logger(adapter); // *must* be called before creating the BrokerState
+    auto observer = std::make_shared<Observer>(adapterVerbosity, queue);
+    broker::logger(observer); // *must* be called before creating the BrokerState

     auto cqs = get_option("Broker::congestion_queue_size")->AsCount();
     bstate = std::make_shared<BrokerState>(std::move(config), cqs, queue);

From f5fbad23ffead8f752946e0c2692a166b9dcd092 Mon Sep 17 00:00:00 2001
From: Christian Kreibich
Date: Tue, 15 Apr 2025 18:08:16 -0700
Subject: [PATCH 3/5] Add peer buffer update tracking to the Broker manager's
 event_observer

This implements basic tracking of each peering's current fill level, the
maximum level over a recent time interval (via a new
Broker::buffer_stats_reset_interval tunable, defaulting to 1min), and the
number of times a buffer overflows. For the disconnect policy this is the
number of depeerings, but for drop_newest and drop_oldest it implies the
number of messages lost.

This doesn't use "proper" telemetry metrics for a few reasons: this tracking
is Broker-specific, so we need to track each peering via endpoint_ids, while
we want the metrics to use Cluster node name labels, and those names live in
the script layer. Using broker::endpoint_id directly as keys also means we
rely on their ability to hash in STL containers, which should be fast.
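For illustration, the intent is that script-land can poll these stats
cheaply. A minimal sketch of a hypothetical consumer (not part of this
patch), using the Broker::peering_stats() function and BrokerPeeringStats
record added below:

    global report_peering_stats: event();

    event report_peering_stats()
        {
        for ( ep, ps in Broker::peering_stats() )
            print fmt("peer %s: queued=%d recent-max=%d overflows=%d",
                      ep, ps$num_queued, ps$max_queued_recently,
                      ps$num_overflows);

        schedule 1min { report_peering_stats() };
        }

    event zeek_init()
        {
        schedule 1min { report_peering_stats() };
        }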
This does not track the buffer levels for Broker "clients" (as opposed to
"peers"), i.e. WebSockets, since we currently don't have a way to name
these, and we don't want to use ephemeral Broker IDs in their telemetry.

To make the stats accessible to the script layer, the Broker manager (via a
new helper class that lives in the event_observer) maintains a TableVal
mapping Broker IDs to a new BrokerPeeringStats record. The table's members
get updated every time that table is requested. This minimizes new val
instantiation and allows the script layer to customize the
BrokerPeeringStats record by redefining it, updating fields, etc.

Since we can't use Zeek vals outside the main thread, this requires some
care so all table updates happen only in the Zeek-side table updater,
PeerBufferState::GetPeeringStatsTable().
---
 scripts/base/frameworks/broker/main.zeek |  15 ++
 scripts/base/init-bare.zeek              |  14 ++
 src/broker/Manager.cc                    | 203 +++++++++++++++++-
 src/broker/Manager.h                     |   8 +
 src/broker/comm.bif                      |   5 +
 .../Baseline/opt.ZAM-bif-tracking/output |   2 +-
 testing/btest/opt/ZAM-bif-tracking.zeek  |   3 +-
 7 files changed, 241 insertions(+), 9 deletions(-)

diff --git a/scripts/base/frameworks/broker/main.zeek b/scripts/base/frameworks/broker/main.zeek
index a3704b7157..7402d39ecb 100644
--- a/scripts/base/frameworks/broker/main.zeek
+++ b/scripts/base/frameworks/broker/main.zeek
@@ -104,6 +104,10 @@ export {
     ## Same as :zeek:see:`Broker::peer_overflow_policy` but for WebSocket clients.
     const web_socket_overflow_policy = "disconnect" &redef;

+    ## How frequently Zeek resets some peering/client buffer statistics,
+    ## such as ``max_queued_recently`` in :zeek:see:`BrokerPeeringStats`.
+    const buffer_stats_reset_interval = 1min &redef;
+
     ## The CAF scheduling policy to use. Available options are "sharing" and
     ## "stealing". The "sharing" policy uses a single, global work queue along
     ## with mutex and condition variable used for accessing it, which may be
@@ -392,6 +396,12 @@ export {
     ## Returns: a unique identifier for the local broker endpoint.
     global node_id: function(): string;

+    ## Obtain each peering's send-buffer statistics. The keys are Broker
+    ## endpoint IDs.
+    ##
+    ## Returns: per-peering statistics.
+    global peering_stats: function(): table[string] of BrokerPeeringStats;
+
     ## Sends all pending log messages to remote peers. This normally
     ## doesn't need to be used except for test cases that are time-sensitive.
     global flush_logs: function(): count;
@@ -554,6 +564,11 @@ function node_id(): string
     return __node_id();
     }

+function peering_stats(): table[string] of BrokerPeeringStats
+    {
+    return __peering_stats();
+    }
+
 function flush_logs(): count
     {
     return __flush_logs();

diff --git a/scripts/base/init-bare.zeek b/scripts/base/init-bare.zeek
index 98b16e103e..c8e6c7440a 100644
--- a/scripts/base/init-bare.zeek
+++ b/scripts/base/init-bare.zeek
@@ -1135,6 +1135,20 @@ type BrokerStats: record {
     num_ids_outgoing: count;
 };

+## Broker statistics for an individual peering.
+##
+type BrokerPeeringStats: record {
+    ## The number of messages currently queued locally for transmission.
+    num_queued: count;
+    ## The maximum number of messages queued in the recent
+    ## :zeek:see:`Broker::buffer_stats_reset_interval` time interval.
+    max_queued_recently: count;
+    ## The number of times the send buffer has overflowed.
+    num_overflows: count;
+};
+
+type BrokerPeeringStatsTable: table[string] of BrokerPeeringStats;
+
 ## Statistics about reporter messages and weirds.
 ##
 ## .. zeek:see:: get_reporter_stats
diff --git a/src/broker/Manager.cc b/src/broker/Manager.cc
index 717d562592..853255523b 100644
--- a/src/broker/Manager.cc
+++ b/src/broker/Manager.cc
@@ -92,6 +92,174 @@ void print_escaped(std::string& buf, std::string_view str) {
     buf.push_back('"');
 }

+// Track metrics for a given peering's send buffer.
+class PeerBufferState {
+public:
+    struct Stats {
+        // The rendered peer ID. Storing this here helps reuse.
+        // Note that we only ever touch this from Zeek's main thread, not
+        // any of Broker's.
+        zeek::StringValPtr peer_id;
+
+        // Whether Broker has removed the peer, and this instance still
+        // needs to be removed.
+        bool is_zombie = false;
+
+        // Number of messages queued locally in the send buffer.
+        uint32_t queued = 0;
+
+        // Maximum number queued in the last Broker::buffer_stats_reset_interval.
+        // This improves visibility into message bursts since instantaneous
+        // queueing (captured above) can be short-lived.
+        uint32_t max_queued_recently = 0;
+
+        // Number of times the buffer overflowed at send time. For the
+        // "disconnect" overflow policy (via Broker::peer_overflow_policy), this
+        // count will at most be 1 since Broker will remove the peering upon
+        // overflow. The existing Zeek-level metric for tracking disconnects
+        // (see frameworks/broker/broker-backpressure.zeek) covers this one more
+        // permanently. For the "drop_newest" and "drop_oldest" policies it
+        // equals a count of the number of messages lost, since the peering
+        // continues.
+        uint64_t overflows = 0;
+
+        // When we last started a stats-tracking interval for this peering.
+        double last_interval = 0;
+    };
+
+    // For per-peering tracking, map endpoint IDs to the above state.
+    using EndpointMetricMap = std::unordered_map<broker::endpoint_id, Stats>;
+
+    PeerBufferState(size_t a_buffer_size, double a_stats_reset_interval)
+        : buffer_size(a_buffer_size), stats_reset_interval(a_stats_reset_interval) {
+        stats_table =
+            zeek::make_intrusive<zeek::TableVal>(zeek::id::find_type<zeek::TableType>("BrokerPeeringStatsTable"));
+        stats_record_type = zeek::id::find_type<zeek::RecordType>("BrokerPeeringStats");
+    }
+
+    void SetEndpoint(const broker::endpoint* a_endpoint) { endpoint = a_endpoint; }
+
+    // Update the peering's stats. This runs in Broker's execution context.
+    // Broker does not expose send-buffer/queue state explicitly, so track
+    // arrivals (a push, is_push == true) and departures (a pull, is_push ==
+    // false) as they happen. Note that this must not touch Zeek-side Vals.
+    void Observe(const broker::endpoint_id& peer, bool is_push) {
+        std::lock_guard lock(mutex);
+        auto it = stats_map.find(peer);
+
+        if ( it == stats_map.end() ) {
+            stats_map.emplace(peer, Stats());
+            it = stats_map.find(peer);
+        }
+
+        auto& stats = it->second;
+
+        // Stick to Broker's notion of time here.
+        double now{0};
+        if ( endpoint != nullptr )
+            broker::convert(endpoint->now(), now);
+
+        if ( now - stats.last_interval > stats_reset_interval ) {
+            stats.last_interval = now;
+            stats.max_queued_recently = stats.queued;
+        }
+
+        if ( stats.queued == 0 ) {
+            // Watch for underflows. We could report somehow. Note that this
+            // runs in the context of Broker's threads.
+            assert(is_push);
+        }
+
+        if ( is_push && stats.queued == buffer_size )
+            stats.overflows += 1;
+        else {
+            stats.queued += is_push ? 1 : -1;
+            if ( stats.queued > stats.max_queued_recently )
+                stats.max_queued_recently = stats.queued;
+        }
+    }
+
+    // Updates the internal table[string] of BrokerPeeringStats and returns it.
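+    // Only ever called from Zeek's main thread: rows for new peerings are
+    // created lazily, existing rows are refreshed in place, and rows flagged
+    // as zombies by RemovePeer() are dropped here.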
+    const zeek::TableValPtr& GetPeeringStatsTable() {
+        std::lock_guard lock(mutex);
+
+        for ( auto it = stats_map.begin(); it != stats_map.end(); ) {
+            auto& peer = it->first;
+            auto& stats = it->second;
+
+            if ( stats.peer_id == nullptr )
+                stats.peer_id = PeerIdToStringVal(peer);
+
+            // Broker told us the peer is gone, in RemovePeer() below. Remove it
+            // now from both tables. We add/remove from stats_table only here,
+            // not in Observe() and/or RemovePeer(), to ensure we only touch
+            // the Zeek-side Table from Zeek's main thread.
+            if ( stats.is_zombie ) {
+                stats_table->Remove(*stats.peer_id);
+                it = stats_map.erase(it);
+                continue;
+            }
+
+            auto stats_v = stats_table->Find(stats.peer_id);
+
+            if ( stats_v == nullptr ) {
+                stats_v = zeek::make_intrusive<zeek::RecordVal>(stats_record_type);
+                stats_table->Assign(stats.peer_id, stats_v);
+            }
+
+            // We may get here more than stats_reset_interval after the last
+            // Observe(), in which case the max_queued_recently value is now
+            // stale. Update if so.
+            double now{0};
+            if ( endpoint != nullptr )
+                broker::convert(endpoint->now(), now);
+
+            if ( now - stats.last_interval > stats_reset_interval ) {
+                stats.last_interval = now;
+                stats.max_queued_recently = stats.queued;
+            }
+
+            int n = 0;
+            stats_v->AsRecordVal()->Assign(n++, zeek::val_mgr->Count(stats.queued));
+            stats_v->AsRecordVal()->Assign(n++, zeek::val_mgr->Count(stats.max_queued_recently));
+            stats_v->AsRecordVal()->Assign(n++, zeek::val_mgr->Count(stats.overflows));
+
+            ++it;
+        }
+
+        return stats_table;
+    }
+
+    void RemovePeer(const broker::endpoint_id& peer) {
+        std::lock_guard lock(mutex);
+        if ( auto it = stats_map.find(peer); it != stats_map.end() )
+            it->second.is_zombie = true;
+    }
+
+private:
+    zeek::StringValPtr PeerIdToStringVal(const broker::endpoint_id& peer) const {
+        std::string peer_s;
+        broker::convert(peer, peer_s);
+        return zeek::make_intrusive<zeek::StringVal>(peer_s);
+    }
+
+    // The maximum number of messages queueable for transmission to a peer,
+    // see Broker::peer_buffer_size and Broker::web_socket_buffer_size.
+    size_t buffer_size;
+
+    // Seconds after which we reset stats tracked per time window.
+    double stats_reset_interval;
+
+    EndpointMetricMap stats_map;
+    zeek::TableValPtr stats_table;
+    zeek::RecordTypePtr stats_record_type;
+
+    mutable std::mutex mutex;
+    const broker::endpoint* endpoint = nullptr;
+};
+
+using PeerBufferStatePtr = std::shared_ptr<PeerBufferState>;
+
 class LoggerQueue {
 public:
     void Push(broker::event_ptr event) {
@@ -132,8 +300,20 @@ class Observer : public broker::event_observer {
 public:
     using LogSeverityLevel = broker::event::severity_level;

-    explicit Observer(LogSeverityLevel severity, LoggerQueuePtr queue)
-        : severity_(severity), queue_(std::move(queue)) {}
+    explicit Observer(LogSeverityLevel severity, LoggerQueuePtr queue, PeerBufferStatePtr pbstate)
+        : severity_(severity), queue_(std::move(queue)), pbstate_(std::move(pbstate)) {}
+
+    void on_peer_buffer_push(const broker::endpoint_id& peer, const broker::node_message&) override {
+        pbstate_->Observe(peer, true);
+    }
+
+    void on_peer_buffer_pull(const broker::endpoint_id& peer, const broker::node_message&) override {
+        pbstate_->Observe(peer, false);
+    }
+
+    void on_peer_disconnect(const broker::endpoint_id& peer, const broker::error&) override {
+        pbstate_->RemovePeer(peer);
+    }

     void observe(broker::event_ptr what) override { queue_->Push(std::move(what)); }

@@ -144,12 +324,12 @@ public:
 private:
     LogSeverityLevel severity_;
     LoggerQueuePtr queue_;
+    PeerBufferStatePtr pbstate_;
 };

 } // namespace

 namespace zeek::Broker {
-
 static inline Val* get_option(const char* option) {
     const auto& id = zeek::detail::global_scope()->Find(option);
@@ -224,15 +404,20 @@ class BrokerState {
 public:
     using LogSeverityLevel = Observer::LogSeverityLevel;

-    BrokerState(broker::configuration config, size_t congestion_queue_size, LoggerQueuePtr queue)
+    BrokerState(broker::configuration config, size_t congestion_queue_size, LoggerQueuePtr queue,
+                PeerBufferStatePtr pbstate)
         : endpoint(std::move(config), telemetry_mgr->GetRegistry()),
           subscriber(
               endpoint.make_subscriber({broker::topic::statuses(), broker::topic::errors()}, congestion_queue_size)),
-          loggerQueue(std::move(queue)) {}
+          loggerQueue(std::move(queue)),
+          peerBufferState(std::move(pbstate)) {
+        peerBufferState->SetEndpoint(&endpoint);
+    }

     broker::endpoint endpoint;
     broker::subscriber subscriber;
     LoggerQueuePtr loggerQueue;
+    PeerBufferStatePtr peerBufferState;
     LogSeverityLevel logSeverity = LogSeverityLevel::critical;
     LogSeverityLevel stderrSeverity = LogSeverityLevel::critical;
     std::unordered_set<broker::endpoint_id> outbound_peerings;
 };

@@ -404,11 +589,13 @@ void Manager::DoInitPostScript() {
     checkLogSeverity(stderrSeverityVal);
     auto adapterVerbosity = static_cast<BrokerSeverityLevel>(std::max(logSeverityVal, stderrSeverityVal));
     auto queue = std::make_shared<LoggerQueue>();
-    auto observer = std::make_shared<Observer>(adapterVerbosity, queue);
+    auto pbstate = std::make_shared<PeerBufferState>(options.peer_buffer_size,
+                                                     get_option("Broker::buffer_stats_reset_interval")->AsDouble());
+    auto observer = std::make_shared<Observer>(adapterVerbosity, queue, pbstate);
     broker::logger(observer); // *must* be called before creating the BrokerState

     auto cqs = get_option("Broker::congestion_queue_size")->AsCount();
-    bstate = std::make_shared<BrokerState>(std::move(config), cqs, queue);
+    bstate = std::make_shared<BrokerState>(std::move(config), cqs, queue, pbstate);

     bstate->logSeverity = static_cast<BrokerSeverityLevel>(logSeverityVal);
     bstate->stderrSeverity = static_cast<BrokerSeverityLevel>(stderrSeverityVal);
@@ -1970,6 +2157,8 @@ const Stats& Manager::GetStatistics() {
     return statistics;
 }

+TableValPtr Manager::GetPeeringStatsTable() { return bstate->peerBufferState->GetPeeringStatsTable(); }
+
 bool Manager::AddForwardedStore(const std::string& name, TableValPtr table) {
     if ( forwarded_stores.find(name) != forwarded_stores.end() ) {
         reporter->Error("same &broker_store %s specified for two different variables", name.c_str());

diff --git a/src/broker/Manager.h b/src/broker/Manager.h
index 1673818349..d9c4a4fa81 100644
--- a/src/broker/Manager.h
+++ b/src/broker/Manager.h
@@ -384,6 +384,14 @@ public:
      */
     const Stats& GetStatistics();

+    /**
+     * Returns a table[string] of BrokerPeeringStats, with each peering's
+     * send-buffer stats filled in. The keys are Broker endpoint IDs
+     * identifying the current peers.
+     * @return Each peering's send-buffer statistics.
+     */
+    TableValPtr GetPeeringStatsTable();
+
     /**
      * Creating an instance of this struct simply helps the manager
      * keep track of whether calls into its API are coming from script

diff --git a/src/broker/comm.bif b/src/broker/comm.bif
index d348b5ebff..29e9d84680 100644
--- a/src/broker/comm.bif
+++ b/src/broker/comm.bif
@@ -264,3 +264,8 @@ function Broker::__node_id%(%): string
     zeek::Broker::Manager::ScriptScopeGuard ssg;
     return zeek::make_intrusive<zeek::StringVal>(broker_mgr->NodeId());
     %}
+
+function Broker::__peering_stats%(%): BrokerPeeringStatsTable
+    %{
+    return broker_mgr->GetPeeringStatsTable();
+    %}

diff --git a/testing/btest/Baseline/opt.ZAM-bif-tracking/output b/testing/btest/Baseline/opt.ZAM-bif-tracking/output
index c20efb9801..67cc8f4838 100644
--- a/testing/btest/Baseline/opt.ZAM-bif-tracking/output
+++ b/testing/btest/Baseline/opt.ZAM-bif-tracking/output
@@ -1,2 +1,2 @@
 ### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
-558 seen BiFs, 0 unseen BiFs (), 0 new BiFs ()
+559 seen BiFs, 0 unseen BiFs (), 0 new BiFs ()

diff --git a/testing/btest/opt/ZAM-bif-tracking.zeek b/testing/btest/opt/ZAM-bif-tracking.zeek
index 4dbce8e247..5184cd68a6 100644
--- a/testing/btest/opt/ZAM-bif-tracking.zeek
+++ b/testing/btest/opt/ZAM-bif-tracking.zeek
@@ -7,7 +7,7 @@
 # @TEST-EXEC: btest-diff output

 # This set tracks the BiFs that have been characterized for ZAM analysis.
-# As new ones are added or old ones removed, attend to updating FuncInfo.cc
 # for ZAM, and then update the list here.
 global known_BiFs = set(
     "Analyzer::__disable_all_analyzers",
@@ -45,6 +45,7 @@ global known_BiFs = set(
     "Broker::__opaque_clone_through_serialization",
     "Broker::__peer",
     "Broker::__peer_no_retry",
+    "Broker::__peering_stats",
     "Broker::__peers",
     "Broker::__pop",
     "Broker::__publish_id",

From 88a0cda8ca33b3912478df0ab1afcb4a273a1ee0 Mon Sep 17 00:00:00 2001
From: Christian Kreibich
Date: Wed, 16 Apr 2025 18:00:28 -0700
Subject: [PATCH 4/5] Add cluster framework telemetry for Broker's send-buffer
 use

This hooks into Telemetry::sync() to update Broker-level metrics tracking
the peerings' send buffer state. We do this in the cluster framework so we
can label the resulting metrics with Zeek cluster node names, not Broker's
endpoint IDs.
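For a sense of how the resulting metrics look from script-land, here is a
sketch along the lines of the btest added later in this series (the
zeek_done() wrapper is illustrative, not part of this patch):

    event zeek_done()
        {
        local ms = Telemetry::collect_metrics("zeek_broker_peer_buffer*", "*");

        for ( i in ms )
            print ms[i]$opts$name, ms[i]?$label_values ? ms[i]$label_values : vector();
        }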
---
 scripts/base/frameworks/cluster/__load__.zeek |  5 +-
 .../frameworks/cluster/broker-telemetry.zeek  | 69 +++++++++++++++++++
 .../coverage.init-default/missing_loads       |  1 +
 3 files changed, 74 insertions(+), 1 deletion(-)
 create mode 100644 scripts/base/frameworks/cluster/broker-telemetry.zeek

diff --git a/scripts/base/frameworks/cluster/__load__.zeek b/scripts/base/frameworks/cluster/__load__.zeek
index 0d6372e3d4..85fef40c5f 100644
--- a/scripts/base/frameworks/cluster/__load__.zeek
+++ b/scripts/base/frameworks/cluster/__load__.zeek
@@ -14,8 +14,11 @@ redef Broker::log_topic = Cluster::rr_log_topic;
 # Add a cluster prefix.
 @prefixes += cluster

-# This should soon condition on loading only when Broker is in use.
+# Broker-specific additions:
+@if ( Cluster::backend == Cluster::CLUSTER_BACKEND_BROKER )
 @load ./broker-backpressure
+@load ./broker-telemetry
+@endif

 @if ( Supervisor::is_supervised() )
 # When running a supervised cluster, populate Cluster::nodes from the node table

diff --git a/scripts/base/frameworks/cluster/broker-telemetry.zeek b/scripts/base/frameworks/cluster/broker-telemetry.zeek
new file mode 100644
index 0000000000..7aa1e8fb3f
--- /dev/null
+++ b/scripts/base/frameworks/cluster/broker-telemetry.zeek
@@ -0,0 +1,69 @@
+# Additional Broker-specific metrics that use Zeek cluster-level node names.
+
+@load base/frameworks/telemetry
+
+module Cluster;
+
+## This gauge tracks the current number of locally queued messages in each
+## Broker peering's send buffer. The "peer" label identifies the remote side of
+## the peering, containing a Zeek cluster node name.
+global broker_peer_buffer_messages_gf = Telemetry::register_gauge_family([
+    $prefix="zeek",
+    $name="broker-peer-buffer-messages",
+    $unit="",
+    $label_names=vector("peer"),
+    $help_text="Number of messages queued in Broker's send buffers",
+]);
+
+## This gauge tracks recent maximum queue lengths for each Broker peering's send
+## buffer. Most of the time the send buffers are nearly empty, so this gauge
+## helps understand recent bursts of messages. "Recent" here means
+## :zeek:see:`Broker::buffer_stats_reset_interval`. The time window advances in
+## increments of at least the stats interval, not incrementally with every new
+## observed message. That is, Zeek keeps a timestamp of when the window started,
+## and once it notices that the interval has passed, it moves the start of the
+## window to the current time.
+global broker_peer_buffer_recent_max_messages_gf = Telemetry::register_gauge_family([
+    $prefix="zeek",
+    $name="broker-peer-buffer-recent-max-messages",
+    $unit="",
+    $label_names=vector("peer"),
+    $help_text="Maximum number of messages recently queued in Broker's send buffers",
+]);
+
+## This counter tracks for each Broker peering the number of times its send
+## buffer has overflowed. For the "disconnect" policy this can at most be 1,
+## since Broker stops the peering at this time. For the "drop_oldest" and
+## "drop_newest" policies (see :zeek:see:`Broker::peer_overflow_policy`) the
+## count instead reflects the number of messages lost.
+global broker_peer_buffer_overflows_cf = Telemetry::register_counter_family([
+    $prefix="zeek",
+    $name="broker-peer-buffer-overflows",
+    $unit="",
+    $label_names=vector("peer"),
+    $help_text="Number of overflows in Broker's send buffers",
+]);
+
+hook Telemetry::sync()
+    {
+    local peers = Broker::peering_stats();
+    local nn: NamedNode;
+
+    for ( peer, stats in peers )
+        {
+        # Translate the Broker IDs to Zeek-level node names. We skip
+        # telemetry for peers where this mapping fails, i.e. ones for
+        # connections to external systems.
+        nn = nodeid_to_node(peer);
+
+        if ( |nn$name| > 0 )
+            {
+            Telemetry::gauge_family_set(broker_peer_buffer_messages_gf,
+                                        vector(nn$name), stats$num_queued);
+            Telemetry::gauge_family_set(broker_peer_buffer_recent_max_messages_gf,
+                                        vector(nn$name), stats$max_queued_recently);
+            Telemetry::counter_family_set(broker_peer_buffer_overflows_cf,
+                                          vector(nn$name), stats$num_overflows);
+            }
+        }
+    }

diff --git a/testing/btest/Baseline/coverage.init-default/missing_loads b/testing/btest/Baseline/coverage.init-default/missing_loads
index 9997ec4fd8..93af34614e 100644
--- a/testing/btest/Baseline/coverage.init-default/missing_loads
+++ b/testing/btest/Baseline/coverage.init-default/missing_loads
@@ -1,6 +1,7 @@
 ### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
 -./frameworks/cluster/broker-backpressure.zeek
 -./frameworks/cluster/broker-stores.zeek
+-./frameworks/cluster/broker-telemetry.zeek
 -./frameworks/cluster/nodes/logger.zeek
 -./frameworks/cluster/nodes/manager.zeek
 -./frameworks/cluster/nodes/proxy.zeek

From 35ab9d5c807a43de43f5fd8c4dee505eade0fc22 Mon Sep 17 00:00:00 2001
From: Christian Kreibich
Date: Wed, 23 Apr 2025 23:46:43 -0700
Subject: [PATCH 5/5] Add basic btest to verify that Broker peering telemetry
 is available.

---
 .../Baseline/broker.telemetry/manager.out |  4 ++
 .../Baseline/broker.telemetry/worker.out  |  4 ++
 testing/btest/broker/telemetry.zeek       | 55 +++++++++++++++++++
 3 files changed, 63 insertions(+)
 create mode 100644 testing/btest/Baseline/broker.telemetry/manager.out
 create mode 100644 testing/btest/Baseline/broker.telemetry/worker.out
 create mode 100644 testing/btest/broker/telemetry.zeek

diff --git a/testing/btest/Baseline/broker.telemetry/manager.out b/testing/btest/Baseline/broker.telemetry/manager.out
new file mode 100644
index 0000000000..ce348b2b02
--- /dev/null
+++ b/testing/btest/Baseline/broker.telemetry/manager.out
@@ -0,0 +1,4 @@
+### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
+Telemetry::COUNTER, zeek, zeek_broker_peer_buffer_overflows_total, [endpoint, peer], [manager, worker]
+Telemetry::GAUGE, zeek, zeek_broker_peer_buffer_messages, [endpoint, peer], [manager, worker]
+Telemetry::GAUGE, zeek, zeek_broker_peer_buffer_recent_max_messages, [endpoint, peer], [manager, worker]

diff --git a/testing/btest/Baseline/broker.telemetry/worker.out b/testing/btest/Baseline/broker.telemetry/worker.out
new file mode 100644
index 0000000000..b2486b0ace
--- /dev/null
+++ b/testing/btest/Baseline/broker.telemetry/worker.out
@@ -0,0 +1,4 @@
+### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
+Telemetry::COUNTER, zeek, zeek_broker_peer_buffer_overflows_total, [endpoint, peer], [worker, manager]
+Telemetry::GAUGE, zeek, zeek_broker_peer_buffer_messages, [endpoint, peer], [worker, manager]
+Telemetry::GAUGE, zeek, zeek_broker_peer_buffer_recent_max_messages, [endpoint, peer], [worker, manager]

diff --git a/testing/btest/broker/telemetry.zeek b/testing/btest/broker/telemetry.zeek
new file mode 100644
index 0000000000..3008dd72f0
--- /dev/null
+++ b/testing/btest/broker/telemetry.zeek
@@ -0,0 +1,55 @@
+# @TEST-DOC: run a mini two-node cluster and check that Broker's peering telemetry is available.
+#
+# @TEST-PORT: BROKER_PORT
+#
+# @TEST-EXEC: btest-bg-run manager ZEEKPATH=$ZEEKPATH:.. CLUSTER_NODE=manager zeek -b manager.zeek
+# @TEST-EXEC: btest-bg-run worker ZEEKPATH=$ZEEKPATH:.. CLUSTER_NODE=worker zeek -b worker.zeek
+# @TEST-EXEC: btest-bg-wait 15
+#
+# @TEST-EXEC: btest-diff manager/out
+# @TEST-EXEC: btest-diff worker/out
+
+# @TEST-START-FILE cluster-layout.zeek
+redef Cluster::nodes = {
+    ["manager"] = [$node_type=Cluster::MANAGER, $ip=127.0.0.1, $p=to_port(getenv("BROKER_PORT"))],
+    ["worker"] = [$node_type=Cluster::WORKER, $ip=127.0.0.1, $manager="manager"],
+};
+# @TEST-END-FILE
+
+# @TEST-START-FILE common.zeek
+@load base/frameworks/cluster
+@load policy/frameworks/cluster/experimental
+
+redef exit_only_after_terminate = T;
+redef Log::enable_local_logging = T;
+redef Log::default_rotation_interval = 0secs;
+redef Cluster::retry_interval = 1sec;
+
+function print_metrics(metrics: vector of Telemetry::Metric)
+    {
+    local f = open("out");
+
+    for ( i in metrics )
+        {
+        local m = metrics[i];
+        print f, m$opts$metric_type, m$opts$prefix, m$opts$name, m$label_names, m?$label_values ? m$label_values : vector();
+        }
+
+    close(f);
+    }
+
+event Cluster::Experimental::cluster_started()
+    {
+    local broker_metrics = Telemetry::collect_metrics("zeek_broker_peer_buffer*", "*");
+    print_metrics(broker_metrics);
+    terminate();
+    }
+# @TEST-END-FILE
+
+# @TEST-START-FILE manager.zeek
+@load ./common.zeek
+# @TEST-END-FILE
+
+# @TEST-START-FILE worker.zeek
+@load ./common.zeek
+# @TEST-END-FILE
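Taken together, the series lets operators watch for send-buffer pressure
from script-land. A hypothetical add-on, not part of these patches (the
30sec cadence and the Reporter::warning() choice are illustrative), would
warn whenever a peering's send buffer newly overflows:

    global check_overflows: event();
    global last_overflows: table[string] of count &default=0;

    event check_overflows()
        {
        for ( ep, ps in Broker::peering_stats() )
            {
            if ( ps$num_overflows > last_overflows[ep] )
                Reporter::warning(fmt("send buffer for peer %s overflowed %d time(s)",
                                      ep, ps$num_overflows - last_overflows[ep]));

            last_overflows[ep] = ps$num_overflows;
            }

        schedule 30sec { check_overflows() };
        }

    event zeek_init()
        {
        schedule 30sec { check_overflows() };
        }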