diff --git a/scripts/base/frameworks/broker/main.zeek b/scripts/base/frameworks/broker/main.zeek
index 1cd99b3771..9bba22ed09 100644
--- a/scripts/base/frameworks/broker/main.zeek
+++ b/scripts/base/frameworks/broker/main.zeek
@@ -391,6 +391,9 @@ export {
 	## Returns: a unique identifier for the local broker endpoint.
 	global node_id: function(): string;
 
+	global peer_buffer_levels: function(): table[string] of count;
+	global peer_buffer_overflows: function(): table[string] of count;
+
 	## Sends all pending log messages to remote peers. This normally
 	## doesn't need to be used except for test cases that are time-sensitive.
 	global flush_logs: function(): count;
@@ -551,6 +554,16 @@ function node_id(): string
 	{
 	return __node_id();
 	}
 
+function peer_buffer_levels(): table[string] of count
+	{
+	return __peer_buffer_levels();
+	}
+
+function peer_buffer_overflows(): table[string] of count
+	{
+	return __peer_buffer_overflows();
+	}
+
 function flush_logs(): count
 	{
 	return __flush_logs();
diff --git a/src/broker/Manager.cc b/src/broker/Manager.cc
index f1b8363196..e41382c66b 100644
--- a/src/broker/Manager.cc
+++ b/src/broker/Manager.cc
@@ -92,6 +92,96 @@ void print_escaped(std::string& buf, std::string_view str) {
     buf.push_back('"');
 }
 
+// Track basic metrics for a given peering's send buffer.
+class PeerBufferState {
+public:
+    struct Stats {
+        // Number of messages queued locally in the send buffer.
+        uint32_t level = 0;
+
+        // Number of times the buffer overflowed at send time. For the
+        // "disconnect" overflow policy (via Broker::peer_overflow_policy), this
+        // count will often be short-lived, since Broker will remove the
+        // affected peering upon overflow. The existing Zeek-level metric for
+        // tracking disconnects (see frameworks/broker/broker-backpressure.zeek)
+        // covers this one more permanently. For the "drop_newest" and
+        // "drop_oldest" policies it equals a count of the number of messages
+        // lost, since the peering itself continues.
+        uint64_t overflows = 0;
+    };
+
+    // For per-peering tracking, map endpoint IDs to the above state.
+    using EndpointMetricMap = std::unordered_map<broker::endpoint_id, Stats>;
+
+    PeerBufferState(size_t peer_buffer_size) : peer_buffer_size_(peer_buffer_size) {}
+
+    // Update the peering's stats. This gets called from Broker's execution
+    // context, via an event_observer. Broker does not expose send-buffer state
+    // explicitly, so we track by doing our own counting of arrivals (a push,
+    // push_or_pull is true) and departures (pull, push_or_pull is false).
+    void Observe(const broker::endpoint_id& peer, bool push_or_pull) {
+        std::lock_guard lock(mutex_);
+
+        if ( peer_buffer_stats_.find(peer) == peer_buffer_stats_.end() )
+            peer_buffer_stats_.emplace(peer, Stats());
+
+        auto& stats = peer_buffer_stats_[peer];
+
+        if ( stats.level == 0 ) {
+            // Watch for underflows, which indicate a bug. We could report
+            // somehow. Note that this runs in the context of Broker's threads.
+            assert(push_or_pull);
+        }
+
+        // Are we about to overflow the buffer? This tracks overflows regardless
+        // of buffer management policy.
+        if ( push_or_pull && stats.level == peer_buffer_size_ )
+            stats.overflows += 1;
+        else
+            stats.level += push_or_pull ? 1 : -1;
+    }
+
+    // Creates a table[string] of count for the peerings' send buffer fill
+    // levels. The cluster framework uses this table to map Broker IDs to
+    // cluster node names, and produce proper telemetry.
+    void FillPeerBufferLevelsTable(zeek::TableValPtr table) const {
+        std::lock_guard lock(mutex_);
+
+        for ( const auto& [peer, stats] : peer_buffer_stats_ ) {
+            std::string peer_s;
+            broker::convert(peer, peer_s);
+            auto peer_v = zeek::make_intrusive<zeek::StringVal>(peer_s);
+            table->Assign(std::move(peer_v), zeek::val_mgr->Count(stats.level));
+        }
+    }
+
+    // As above, but for overflow counts.
+    void FillPeerBufferOverflowsTable(zeek::TableValPtr table) const {
+        std::lock_guard lock(mutex_);
+
+        for ( const auto& [peer, stats] : peer_buffer_stats_ ) {
+            std::string peer_s;
+            broker::convert(peer, peer_s);
+            auto peer_v = zeek::make_intrusive<zeek::StringVal>(peer_s);
+            table->Assign(std::move(peer_v), zeek::val_mgr->Count(stats.overflows));
+        }
+    }
+
+    void RemovePeer(const broker::endpoint_id& peer) {
+        std::lock_guard lock(mutex_);
+        peer_buffer_stats_.erase(peer);
+    }
+
+private:
+    size_t peer_buffer_size_;
+
+    mutable std::mutex mutex_;
+
+    EndpointMetricMap peer_buffer_stats_;
+};
+
+using PeerBufferStatePtr = std::shared_ptr<PeerBufferState>;
+
 class LoggerQueue {
 public:
     void Push(broker::event_ptr event) {
@@ -132,8 +222,20 @@ class LoggerAdapter : public broker::event_observer {
 public:
     using SeverityLevel = broker::event::severity_level;
 
-    explicit LoggerAdapter(SeverityLevel severity, LoggerQueuePtr queue)
-        : severity_(severity), queue_(std::move(queue)) {}
+    explicit LoggerAdapter(SeverityLevel severity, LoggerQueuePtr queue, PeerBufferStatePtr pbstate)
+        : severity_(severity), queue_(std::move(queue)), pbstate_(std::move(pbstate)) {}
+
+    void on_peer_buffer_push(const broker::endpoint_id& peer, const broker::node_message&) override {
+        pbstate_->Observe(peer, true);
+    }
+
+    void on_peer_buffer_pull(const broker::endpoint_id& peer, const broker::node_message&) override {
+        pbstate_->Observe(peer, false);
+    }
+
+    void on_peer_disconnect(const broker::endpoint_id& peer, const broker::error&) override {
+        pbstate_->RemovePeer(peer);
+    }
 
     void observe(broker::event_ptr what) override { queue_->Push(std::move(what)); }
 
@@ -142,6 +244,7 @@ public:
 private:
     SeverityLevel severity_;
     LoggerQueuePtr queue_;
+    PeerBufferStatePtr pbstate_;
 };
 
 } // namespace
 
@@ -222,15 +325,19 @@ class BrokerState {
 public:
     using SeverityLevel = LoggerAdapter::SeverityLevel;
 
-    BrokerState(broker::configuration config, size_t congestion_queue_size, LoggerQueuePtr queue)
+    BrokerState(broker::configuration
config, size_t congestion_queue_size, LoggerQueuePtr queue,
+                PeerBufferStatePtr pbstate)
         : endpoint(std::move(config), telemetry_mgr->GetRegistry()),
           subscriber(
               endpoint.make_subscriber({broker::topic::statuses(), broker::topic::errors()}, congestion_queue_size)),
-          loggerQueue(std::move(queue)) {}
+          loggerQueue(std::move(queue)),
+          peerBufferState(std::move(pbstate)) {}
 
     broker::endpoint endpoint;
     broker::subscriber subscriber;
     LoggerQueuePtr loggerQueue;
+    PeerBufferStatePtr peerBufferState;
+
     SeverityLevel logSeverity = SeverityLevel::critical;
     SeverityLevel stderrSeverity = SeverityLevel::critical;
     std::unordered_set<std::string> outbound_peerings;
@@ -401,11 +508,12 @@ void Manager::DoInitPostScript() {
     checkLogSeverity(stderrSeverityVal);
     auto adapterVerbosity = static_cast<LoggerAdapter::SeverityLevel>(std::max(logSeverityVal, stderrSeverityVal));
     auto queue = std::make_shared<LoggerQueue>();
-    auto adapter = std::make_shared<LoggerAdapter>(adapterVerbosity, queue);
+    auto pbstate = std::make_shared<PeerBufferState>(options.peer_buffer_size);
+    auto adapter = std::make_shared<LoggerAdapter>(adapterVerbosity, queue, pbstate);
     broker::logger(adapter); // *must* be called before creating the BrokerState
 
     auto cqs = get_option("Broker::congestion_queue_size")->AsCount();
-    bstate = std::make_shared<BrokerState>(std::move(config), cqs, queue);
+    bstate = std::make_shared<BrokerState>(std::move(config), cqs, queue, pbstate);
     bstate->logSeverity = static_cast<BrokerState::SeverityLevel>(logSeverityVal);
     bstate->stderrSeverity = static_cast<BrokerState::SeverityLevel>(stderrSeverityVal);
@@ -1984,4 +2092,16 @@ void Manager::PrepareForwarding(const std::string& name) {
     DBG_LOG(DBG_BROKER, "Resolved table forward for data store %s", name.c_str());
 }
 
+TableValPtr Manager::GetPeerBufferLevelsTable() const {
+    auto res = zeek::make_intrusive<zeek::TableVal>(zeek::id::find_type<zeek::TableType>("table_string_of_count"));
+    bstate->peerBufferState->FillPeerBufferLevelsTable(res);
+    return res;
+}
+
+TableValPtr Manager::GetPeerBufferOverflowsTable() const {
+    auto res = zeek::make_intrusive<zeek::TableVal>(zeek::id::find_type<zeek::TableType>("table_string_of_count"));
+
bstate->peerBufferState->FillPeerBufferOverflowsTable(res);
+    return res;
+}
+
 } // namespace zeek::Broker
diff --git a/src/broker/Manager.h b/src/broker/Manager.h
index 4c5f1711eb..f2aa7fe00f 100644
--- a/src/broker/Manager.h
+++ b/src/broker/Manager.h
@@ -383,6 +383,21 @@ public:
      */
     const Stats& GetStatistics();
 
+    /**
+     * Returns a table[string] of count with each peering's send-buffer fill
+     * level. The keys are Broker node IDs identifying each peer.
+     * @return Each peering's send-buffer fill level.
+     */
+    TableValPtr GetPeerBufferLevelsTable() const;
+
+    /**
+     * Returns a table[string] of count with the number of times each peering's
+     * send-buffer has overflowed upon message send-time. The keys are Broker
+     * node IDs identifying each peer.
+     * @return Each peering's send-buffer message overflow count.
+     */
+    TableValPtr GetPeerBufferOverflowsTable() const;
+
     /**
      * Creating an instance of this struct simply helps the manager
      * keep track of whether calls into its API are coming from script
diff --git a/src/broker/comm.bif b/src/broker/comm.bif
index 0fefce871d..542389d064 100644
--- a/src/broker/comm.bif
+++ b/src/broker/comm.bif
@@ -264,3 +264,13 @@ function Broker::__node_id%(%): string
 	zeek::Broker::Manager::ScriptScopeGuard ssg;
 	return zeek::make_intrusive<zeek::StringVal>(broker_mgr->NodeID());
 	%}
+
+function Broker::__peer_buffer_levels%(%): table_string_of_count
+	%{
+	return broker_mgr->GetPeerBufferLevelsTable();
+	%}
+
+function Broker::__peer_buffer_overflows%(%): table_string_of_count
+	%{
+	return broker_mgr->GetPeerBufferOverflowsTable();
+	%}
diff --git a/testing/btest/Baseline/opt.ZAM-bif-tracking/output b/testing/btest/Baseline/opt.ZAM-bif-tracking/output
index c20efb9801..546f4ca5f9 100644
--- a/testing/btest/Baseline/opt.ZAM-bif-tracking/output
+++ b/testing/btest/Baseline/opt.ZAM-bif-tracking/output
@@ -1,2 +1,2 @@
 ### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
-558 seen BiFs, 0 unseen BiFs (), 0 new BiFs ()
+560 seen BiFs, 0 unseen BiFs (), 0 new BiFs ()
diff --git a/testing/btest/opt/ZAM-bif-tracking.zeek b/testing/btest/opt/ZAM-bif-tracking.zeek
index 4dbce8e247..1840155598 100644
--- a/testing/btest/opt/ZAM-bif-tracking.zeek
+++ b/testing/btest/opt/ZAM-bif-tracking.zeek
@@ -7,7 +7,7 @@
 # @TEST-EXEC: btest-diff output
 
 # This set tracks the BiFs that have been characterized for ZAM analysis.
-# As new ones are added or old ones removed, attend to updating FuncInfo.cc
+# As new ones are added or old ones removed, update src/script_opt/FuncInfo.cc
 # for ZAM, and then update the list here.
 global known_BiFs = set(
 	"Analyzer::__disable_all_analyzers",
@@ -44,6 +44,8 @@ global known_BiFs = set(
 	"Broker::__node_id",
 	"Broker::__opaque_clone_through_serialization",
 	"Broker::__peer",
+	"Broker::__peer_buffer_levels",
+	"Broker::__peer_buffer_overflows",
 	"Broker::__peer_no_retry",
 	"Broker::__peers",
 	"Broker::__pop",