From 0a05a9aa995663cfce0c505054f4c914a627324f Mon Sep 17 00:00:00 2001 From: Christian Kreibich Date: Tue, 15 Apr 2025 18:08:16 -0700 Subject: [PATCH] Add peer buffer update tracking to Broker manager's logging adapter This implements basic counts for each peering's fill level and overflows. We don't use proper telemetry here since doing so makes more sense at the Zeek cluster level, so we can label updates with cluster node names. See subsequent commit for that part. --- scripts/base/frameworks/broker/main.zeek | 13 ++ src/broker/Manager.cc | 132 +++++++++++++++++- src/broker/Manager.h | 15 ++ src/broker/comm.bif | 10 ++ .../Baseline/opt.ZAM-bif-tracking/output | 2 +- testing/btest/opt/ZAM-bif-tracking.zeek | 4 +- 6 files changed, 168 insertions(+), 8 deletions(-) diff --git a/scripts/base/frameworks/broker/main.zeek b/scripts/base/frameworks/broker/main.zeek index 1cd99b3771..9bba22ed09 100644 --- a/scripts/base/frameworks/broker/main.zeek +++ b/scripts/base/frameworks/broker/main.zeek @@ -391,6 +391,9 @@ export { ## Returns: a unique identifier for the local broker endpoint. global node_id: function(): string; + global peer_buffer_levels: function(): table[string] of count; + global peer_buffer_overflows: function(): table[string] of count; + ## Sends all pending log messages to remote peers. This normally ## doesn't need to be used except for test cases that are time-sensitive. 
global flush_logs: function(): count; @@ -551,6 +554,16 @@ function node_id(): string return __node_id(); } +function peer_buffer_levels(): table[string] of count + { + return __peer_buffer_levels(); + } + +function peer_buffer_overflows(): table[string] of count + { + return __peer_buffer_overflows(); + } + function flush_logs(): count { return __flush_logs(); diff --git a/src/broker/Manager.cc b/src/broker/Manager.cc index f1b8363196..e41382c66b 100644 --- a/src/broker/Manager.cc +++ b/src/broker/Manager.cc @@ -92,6 +92,96 @@ void print_escaped(std::string& buf, std::string_view str) { buf.push_back('"'); } +// Track basic metrics for a given peering's send buffer. +class PeerBufferState { +public: + struct Stats { + // Number of messages queued locally in the send buffer. + uint32_t level = 0; + + // Number of times the buffer overflowed at send time. For the + // "disconnect" overflow policy (via Broker::peer_overflow_policy), this + // count will often be short-lived, since Broker will remove the + // affected peering upon overflow. The existing Zeek-level metric for + // tracking disconnects (see frameworks/broker/broker-backpressure.zeek) + // covers this one more permanently. For the "drop_newest" and + // "drop_oldest" policies it equals a count of the number of messages + // lost, since the peering itself continues. + uint64_t overflows = 0; + }; + + // For per-peering tracking, map endpoint IDs to the above state. + using EndpointMetricMap = std::unordered_map; + + PeerBufferState(size_t peer_buffer_size) : peer_buffer_size_(peer_buffer_size) {} + + // Update the peering's stats. This gets called from Broker's execution + // context, via an event_observer. Broker does not expose send-buffer state + // explicitly, so we track by doing our own counting of arrivals (a push, + // push_or_pull is true) and departures (pull, push_or_pull is false). 
+ void Observe(const broker::endpoint_id& peer, bool push_or_pull) { + std::lock_guard lock(mutex_); + + if ( peer_buffer_stats_.find(peer) == peer_buffer_stats_.end() ) + peer_buffer_stats_.emplace(peer, Stats()); + + auto& stats = peer_buffer_stats_[peer]; + + if ( stats.level == 0 ) { + // Watch for underflows, which indicate a bug. We could report it + // somehow. Note that this runs in the context of Broker's threads. + assert(push_or_pull); + } + + // Are we about to overflow the buffer? This tracks overflows regardless + // of buffer management policy. + if ( push_or_pull && stats.level == peer_buffer_size_ ) + stats.overflows += 1; + else + stats.level += push_or_pull ? 1 : -1; + } + + // Creates a table[string] of count for the peerings' send buffer fill + // levels. The cluster framework uses this table to map Broker IDs to + // cluster node names, and produce proper telemetry. + void FillPeerBufferLevelsTable(zeek::TableValPtr table) const { + std::lock_guard lock(mutex_); + + for ( const auto& [peer, stats] : peer_buffer_stats_ ) { + std::string peer_s; + broker::convert(peer, peer_s); + auto peer_v = zeek::make_intrusive(peer_s); + table->Assign(std::move(peer_v), zeek::val_mgr->Count(stats.level)); + } + } + + // As above, but for overflow counts. 
+ void FillPeerBufferOverflowsTable(zeek::TableValPtr table) const { + std::lock_guard lock(mutex_); + + for ( const auto& [peer, stats] : peer_buffer_stats_ ) { + std::string peer_s; + broker::convert(peer, peer_s); + auto peer_v = zeek::make_intrusive(peer_s); + table->Assign(std::move(peer_v), zeek::val_mgr->Count(stats.overflows)); + } + } + + void RemovePeer(const broker::endpoint_id& peer) { + std::lock_guard lock(mutex_); + peer_buffer_stats_.erase(peer); + } + +private: + size_t peer_buffer_size_; + + mutable std::mutex mutex_; + + EndpointMetricMap peer_buffer_stats_; +}; + +using PeerBufferStatePtr = std::shared_ptr; + class LoggerQueue { public: void Push(broker::event_ptr event) { @@ -132,8 +222,20 @@ class LoggerAdapter : public broker::event_observer { public: using SeverityLevel = broker::event::severity_level; - explicit LoggerAdapter(SeverityLevel severity, LoggerQueuePtr queue) - : severity_(severity), queue_(std::move(queue)) {} + explicit LoggerAdapter(SeverityLevel severity, LoggerQueuePtr queue, PeerBufferStatePtr pbstate) + : severity_(severity), queue_(std::move(queue)), pbstate_(std::move(pbstate)) {} + + void on_peer_buffer_push(const broker::endpoint_id& peer, const broker::node_message&) override { + pbstate_->Observe(peer, true); + } + + void on_peer_buffer_pull(const broker::endpoint_id& peer, const broker::node_message&) override { + pbstate_->Observe(peer, false); + } + + void on_peer_disconnect(const broker::endpoint_id& peer, const broker::error&) override { + pbstate_->RemovePeer(peer); + } void observe(broker::event_ptr what) override { queue_->Push(std::move(what)); } @@ -142,6 +244,7 @@ public: private: SeverityLevel severity_; LoggerQueuePtr queue_; + PeerBufferStatePtr pbstate_; }; } // namespace @@ -222,15 +325,19 @@ class BrokerState { public: using SeverityLevel = LoggerAdapter::SeverityLevel; - BrokerState(broker::configuration config, size_t congestion_queue_size, LoggerQueuePtr queue) + BrokerState(broker::configuration 
config, size_t congestion_queue_size, LoggerQueuePtr queue, + PeerBufferStatePtr pbstate) : endpoint(std::move(config), telemetry_mgr->GetRegistry()), subscriber( endpoint.make_subscriber({broker::topic::statuses(), broker::topic::errors()}, congestion_queue_size)), - loggerQueue(std::move(queue)) {} + loggerQueue(std::move(queue)), + peerBufferState(std::move(pbstate)) {} broker::endpoint endpoint; broker::subscriber subscriber; LoggerQueuePtr loggerQueue; + PeerBufferStatePtr peerBufferState; + SeverityLevel logSeverity = SeverityLevel::critical; SeverityLevel stderrSeverity = SeverityLevel::critical; std::unordered_set outbound_peerings; @@ -401,11 +508,12 @@ void Manager::DoInitPostScript() { checkLogSeverity(stderrSeverityVal); auto adapterVerbosity = static_cast(std::max(logSeverityVal, stderrSeverityVal)); auto queue = std::make_shared(); - auto adapter = std::make_shared(adapterVerbosity, queue); + auto pbstate = std::make_shared(options.peer_buffer_size); + auto adapter = std::make_shared(adapterVerbosity, queue, pbstate); broker::logger(adapter); // *must* be called before creating the BrokerState auto cqs = get_option("Broker::congestion_queue_size")->AsCount(); - bstate = std::make_shared(std::move(config), cqs, queue); + bstate = std::make_shared(std::move(config), cqs, queue, pbstate); bstate->logSeverity = static_cast(logSeverityVal); bstate->stderrSeverity = static_cast(stderrSeverityVal); @@ -1984,4 +2092,16 @@ void Manager::PrepareForwarding(const std::string& name) { DBG_LOG(DBG_BROKER, "Resolved table forward for data store %s", name.c_str()); } +TableValPtr Manager::GetPeerBufferLevelsTable() const { + auto res = zeek::make_intrusive(zeek::id::find_type("table_string_of_count")); + bstate->peerBufferState->FillPeerBufferLevelsTable(res); + return res; +} + +TableValPtr Manager::GetPeerBufferOverflowsTable() const { + auto res = zeek::make_intrusive(zeek::id::find_type("table_string_of_count")); + 
bstate->peerBufferState->FillPeerBufferOverflowsTable(res); + return res; +} + } // namespace zeek::Broker diff --git a/src/broker/Manager.h b/src/broker/Manager.h index 4c5f1711eb..f2aa7fe00f 100644 --- a/src/broker/Manager.h +++ b/src/broker/Manager.h @@ -383,6 +383,21 @@ public: */ const Stats& GetStatistics(); + /** + * Returns a table[string] of count with each peering's send-buffer fill + * level. The keys are Broker node IDs identifying each peer. + * @return Each peering's send-buffer fill level. + */ + TableValPtr GetPeerBufferLevelsTable() const; + + /** + * Returns a table[string] of count with the number of times each peering's + * send-buffer has overflowed upon message send-time. The keys are Broker + * node IDs identifying each peer. + * @return Each peering's send-buffer message overflow count. + */ + TableValPtr GetPeerBufferOverflowsTable() const; + /** * Creating an instance of this struct simply helps the manager * keep track of whether calls into its API are coming from script diff --git a/src/broker/comm.bif b/src/broker/comm.bif index 0fefce871d..542389d064 100644 --- a/src/broker/comm.bif +++ b/src/broker/comm.bif @@ -264,3 +264,13 @@ function Broker::__node_id%(%): string zeek::Broker::Manager::ScriptScopeGuard ssg; return zeek::make_intrusive(broker_mgr->NodeID()); %} + +function Broker::__peer_buffer_levels%(%): table_string_of_count + %{ + return broker_mgr->GetPeerBufferLevelsTable(); + %} + +function Broker::__peer_buffer_overflows%(%): table_string_of_count + %{ + return broker_mgr->GetPeerBufferOverflowsTable(); + %} diff --git a/testing/btest/Baseline/opt.ZAM-bif-tracking/output b/testing/btest/Baseline/opt.ZAM-bif-tracking/output index c20efb9801..546f4ca5f9 100644 --- a/testing/btest/Baseline/opt.ZAM-bif-tracking/output +++ b/testing/btest/Baseline/opt.ZAM-bif-tracking/output @@ -1,2 +1,2 @@ ### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. 
-558 seen BiFs, 0 unseen BiFs (), 0 new BiFs () +560 seen BiFs, 0 unseen BiFs (), 0 new BiFs () diff --git a/testing/btest/opt/ZAM-bif-tracking.zeek b/testing/btest/opt/ZAM-bif-tracking.zeek index 4dbce8e247..1840155598 100644 --- a/testing/btest/opt/ZAM-bif-tracking.zeek +++ b/testing/btest/opt/ZAM-bif-tracking.zeek @@ -7,7 +7,7 @@ # @TEST-EXEC: btest-diff output # This set tracks the BiFs that have been characterized for ZAM analysis. -# As new ones are added or old ones removed, attend to updating FuncInfo.cc +# As new ones are added or old ones removed, update src/script_opt/FuncInfo.cc # for ZAM, and then update the list here. global known_BiFs = set( "Analyzer::__disable_all_analyzers", @@ -44,6 +44,8 @@ global known_BiFs = set( "Broker::__node_id", "Broker::__opaque_clone_through_serialization", "Broker::__peer", + "Broker::__peer_buffer_levels", + "Broker::__peer_buffer_overflows", "Broker::__peer_no_retry", "Broker::__peers", "Broker::__pop",