diff --git a/scripts/base/frameworks/broker/main.zeek b/scripts/base/frameworks/broker/main.zeek
index a3704b7157..7402d39ecb 100644
--- a/scripts/base/frameworks/broker/main.zeek
+++ b/scripts/base/frameworks/broker/main.zeek
@@ -104,6 +104,10 @@ export {
 	## Same as :zeek:see:`Broker::peer_overflow_policy` but for WebSocket clients.
 	const web_socket_overflow_policy = "disconnect" &redef;
 
+	## How frequently Zeek resets some peering/client buffer statistics,
+	## such as ``max_queued_recently`` in :zeek:see:`BrokerPeeringStats`.
+	const buffer_stats_reset_interval = 1min &redef;
+
 	## The CAF scheduling policy to use. Available options are "sharing" and
 	## "stealing". The "sharing" policy uses a single, global work queue along
 	## with mutex and condition variable used for accessing it, which may be
@@ -392,6 +396,12 @@ export {
 	## Returns: a unique identifier for the local broker endpoint.
 	global node_id: function(): string;
 
+	## Obtain each peering's send-buffer statistics. The keys are Broker
+	## endpoint IDs.
+	##
+	## Returns: per-peering statistics.
+	global peering_stats: function(): table[string] of BrokerPeeringStats;
+
 	## Sends all pending log messages to remote peers. This normally
 	## doesn't need to be used except for test cases that are time-sensitive.
 	global flush_logs: function(): count;
@@ -554,6 +564,11 @@ function node_id(): string
 	return __node_id();
 	}
 
+function peering_stats(): table[string] of BrokerPeeringStats
+	{
+	return __peering_stats();
+	}
+
 function flush_logs(): count
 	{
 	return __flush_logs();
diff --git a/scripts/base/init-bare.zeek b/scripts/base/init-bare.zeek
index 98b16e103e..c8e6c7440a 100644
--- a/scripts/base/init-bare.zeek
+++ b/scripts/base/init-bare.zeek
@@ -1135,6 +1135,20 @@ type BrokerStats: record {
 	num_ids_outgoing: count;
 };
 
+## Broker statistics for an individual peering.
+##
+type BrokerPeeringStats: record {
+	## The number of messages currently queued locally for transmission.
+	num_queued: count;
+	## The maximum number of messages queued during the most recent
+	## :zeek:see:`Broker::buffer_stats_reset_interval` time interval.
+	max_queued_recently: count;
+	## The number of times the send buffer has overflowed.
+	num_overflows: count;
+};
+
+type BrokerPeeringStatsTable: table[string] of BrokerPeeringStats;
+
 ## Statistics about reporter messages and weirds.
 ##
 ## .. zeek:see:: get_reporter_stats
diff --git a/src/broker/Manager.cc b/src/broker/Manager.cc
index 717d562592..853255523b 100644
--- a/src/broker/Manager.cc
+++ b/src/broker/Manager.cc
@@ -92,6 +92,174 @@ void print_escaped(std::string& buf, std::string_view str) {
     buf.push_back('"');
 }
 
+// Track metrics for a given peering's send buffer.
+class PeerBufferState {
+public:
+    struct Stats {
+        // The rendered peer ID. Storing it here lets us reuse it.
+        // Note that we only ever touch this from Zeek's main thread, not
+        // any of Broker's.
+        zeek::StringValPtr peer_id;
+
+        // Whether Broker has removed the peer, and this instance still
+        // needs to be removed.
+        bool is_zombie = false;
+
+        // Number of messages queued locally in the send buffer.
+        uint32_t queued = 0;
+
+        // Maximum number queued in the last Broker::buffer_stats_reset_interval.
+        // This improves visibility into message bursts since instantaneous
+        // queueing (captured above) can be short-lived.
+        uint32_t max_queued_recently = 0;
+
+        // Number of times the buffer overflowed at send time. For the
+        // "disconnect" overflow policy (via Broker::peer_overflow_policy), this
+        // count will be at most 1, since Broker removes the peering upon
+        // overflow; the existing Zeek-level metric for tracking disconnects
+        // (see frameworks/broker/broker-backpressure.zeek) records this more
+        // permanently. For the "drop_newest" and "drop_oldest" policies it
+        // equals the number of messages lost, since the peering continues.
+        uint64_t overflows = 0;
+
+        // When we last started a stats-tracking interval for this peering.
+        double last_interval = 0;
+    };
+
+    // For per-peering tracking, map endpoint IDs to the above state.
+    using EndpointMetricMap = std::unordered_map<broker::endpoint_id, Stats>;
+
+    PeerBufferState(size_t a_buffer_size, double a_stats_reset_interval)
+        : buffer_size(a_buffer_size), stats_reset_interval(a_stats_reset_interval) {
+        stats_table =
+            zeek::make_intrusive<zeek::TableVal>(zeek::id::find_type<zeek::TableType>("BrokerPeeringStatsTable"));
+        stats_record_type = zeek::id::find_type<zeek::RecordType>("BrokerPeeringStats");
+    }
+
+    void SetEndpoint(const broker::endpoint* a_endpoint) { endpoint = a_endpoint; }
+
+    // Update the peering's stats. This runs in Broker's execution context.
+    // Broker does not expose send-buffer/queue state explicitly, so track
+    // arrivals (a push, is_push == true) and departures (a pull, is_push ==
+    // false) as they happen. Note that this must not touch Zeek-side Vals.
+    void Observe(const broker::endpoint_id& peer, bool is_push) {
+        std::lock_guard lock(mutex);
+        auto it = stats_map.find(peer);
+
+        if ( it == stats_map.end() ) {
+            stats_map.emplace(peer, Stats());
+            it = stats_map.find(peer);
+        }
+
+        auto& stats = it->second;
+
+        // Stick to Broker's notion of time here.
+        double now{0};
+        if ( endpoint != nullptr )
+            broker::convert(endpoint->now(), now);
+
+        if ( now - stats.last_interval > stats_reset_interval ) {
+            stats.last_interval = now;
+            stats.max_queued_recently = stats.queued;
+        }
+
+        if ( stats.queued == 0 ) {
+            // Watch for underflows. We could report these somehow. Note that
+            // this runs in the context of Broker's threads.
+            assert(is_push);
+        }
+
+        if ( is_push && stats.queued == buffer_size )
+            stats.overflows += 1;
+        else {
+            stats.queued += is_push ? 1 : -1;
+            if ( stats.queued > stats.max_queued_recently )
+                stats.max_queued_recently = stats.queued;
+        }
+    }
+
+    // Updates the internal table[string] of BrokerPeeringStats and returns it.
+    const zeek::TableValPtr& GetPeeringStatsTable() {
+        std::lock_guard lock(mutex);
+
+        for ( auto it = stats_map.begin(); it != stats_map.end(); ) {
+            auto& peer = it->first;
+            auto& stats = it->second;
+
+            if ( stats.peer_id == nullptr )
+                stats.peer_id = PeerIdToStringVal(peer);
+
+            // Broker told us the peer is gone, in RemovePeer() below. Remove it
+            // now from both tables. We add/remove from stats_table only here,
+            // not in Observe() or RemovePeer(), to ensure we only touch
+            // the Zeek-side Table from Zeek's main thread.
+            if ( stats.is_zombie ) {
+                stats_table->Remove(*stats.peer_id);
+                it = stats_map.erase(it);
+                continue;
+            }
+
+            auto stats_v = stats_table->Find(stats.peer_id);
+
+            if ( stats_v == nullptr ) {
+                stats_v = zeek::make_intrusive<zeek::RecordVal>(stats_record_type);
+                stats_table->Assign(stats.peer_id, stats_v);
+            }
+
+            // We may get here more than stats_reset_interval after the last
+            // Observe(), in which case the max_queued_recently value is now
+            // stale. Update if so.
+            double now{0};
+            if ( endpoint != nullptr )
+                broker::convert(endpoint->now(), now);
+
+            if ( now - stats.last_interval > stats_reset_interval ) {
+                stats.last_interval = now;
+                stats.max_queued_recently = stats.queued;
+            }
+
+            int n = 0;
+            stats_v->AsRecordVal()->Assign(n++, zeek::val_mgr->Count(stats.queued));
+            stats_v->AsRecordVal()->Assign(n++, zeek::val_mgr->Count(stats.max_queued_recently));
+            stats_v->AsRecordVal()->Assign(n++, zeek::val_mgr->Count(stats.overflows));
+
+            ++it;
+        }
+
+        return stats_table;
+    }
+
+    void RemovePeer(const broker::endpoint_id& peer) {
+        std::lock_guard lock(mutex);
+        if ( auto it = stats_map.find(peer); it != stats_map.end() )
+            it->second.is_zombie = true;
+    }
+
+private:
+    zeek::StringValPtr PeerIdToStringVal(const broker::endpoint_id& peer) const {
+        std::string peer_s;
+        broker::convert(peer, peer_s);
+        return zeek::make_intrusive<zeek::StringVal>(peer_s);
+    }
+
+    // The maximum number of messages queueable for transmission to a peer,
+    // see Broker::peer_buffer_size and Broker::web_socket_buffer_size.
+    size_t buffer_size;
+
+    // Seconds after which we reset stats tracked per time window.
+    double stats_reset_interval;
+
+    EndpointMetricMap stats_map;
+    zeek::TableValPtr stats_table;
+    zeek::RecordTypePtr stats_record_type;
+
+    mutable std::mutex mutex;
+    const broker::endpoint* endpoint = nullptr;
+};
+
+using PeerBufferStatePtr = std::shared_ptr<PeerBufferState>;
+
 class LoggerQueue {
 public:
     void Push(broker::event_ptr event) {
@@ -132,8 +300,20 @@ class Observer : public broker::event_observer {
 public:
     using LogSeverityLevel = broker::event::severity_level;
 
-    explicit Observer(LogSeverityLevel severity, LoggerQueuePtr queue)
-        : severity_(severity), queue_(std::move(queue)) {}
+    explicit Observer(LogSeverityLevel severity, LoggerQueuePtr queue, PeerBufferStatePtr pbstate)
+        : severity_(severity), queue_(std::move(queue)), pbstate_(std::move(pbstate)) {}
+
+    void on_peer_buffer_push(const broker::endpoint_id& peer, const broker::node_message&) override {
+        pbstate_->Observe(peer, true);
+    }
+
+    void on_peer_buffer_pull(const broker::endpoint_id& peer, const broker::node_message&) override {
+        pbstate_->Observe(peer, false);
+    }
+
+    void on_peer_disconnect(const broker::endpoint_id& peer, const broker::error&) override {
+        pbstate_->RemovePeer(peer);
+    }
 
     void observe(broker::event_ptr what) override { queue_->Push(std::move(what)); }
 
@@ -144,12 +324,12 @@ public:
 
 private:
     LogSeverityLevel severity_;
     LoggerQueuePtr queue_;
+    PeerBufferStatePtr pbstate_;
 };
 
 } // namespace
 
 namespace zeek::Broker {
-
 static inline Val* get_option(const char* option) {
     const auto& id = zeek::detail::global_scope()->Find(option);
@@ -224,15 +404,20 @@ class BrokerState {
 public:
     using LogSeverityLevel = Observer::LogSeverityLevel;
 
-    BrokerState(broker::configuration config, size_t congestion_queue_size, LoggerQueuePtr queue)
+    BrokerState(broker::configuration config, size_t congestion_queue_size, LoggerQueuePtr queue,
+                PeerBufferStatePtr pbstate)
         : endpoint(std::move(config), telemetry_mgr->GetRegistry()),
           subscriber(
              endpoint.make_subscriber({broker::topic::statuses(), broker::topic::errors()}, congestion_queue_size)),
-          loggerQueue(std::move(queue)) {}
+          loggerQueue(std::move(queue)),
+          peerBufferState(std::move(pbstate)) {
+        peerBufferState->SetEndpoint(&endpoint);
+    }
 
     broker::endpoint endpoint;
     broker::subscriber subscriber;
     LoggerQueuePtr loggerQueue;
+    PeerBufferStatePtr peerBufferState;
     LogSeverityLevel logSeverity = LogSeverityLevel::critical;
     LogSeverityLevel stderrSeverity = LogSeverityLevel::critical;
     std::unordered_set outbound_peerings;
@@ -404,11 +589,13 @@ void Manager::DoInitPostScript() {
     checkLogSeverity(stderrSeverityVal);
     auto adapterVerbosity = static_cast(std::max(logSeverityVal, stderrSeverityVal));
     auto queue = std::make_shared<LoggerQueue>();
-    auto observer = std::make_shared<Observer>(adapterVerbosity, queue);
+    auto pbstate = std::make_shared<PeerBufferState>(options.peer_buffer_size,
+                                                     get_option("Broker::buffer_stats_reset_interval")->AsDouble());
+    auto observer = std::make_shared<Observer>(adapterVerbosity, queue, pbstate);
     broker::logger(observer); // *must* be called before creating the BrokerState
 
     auto cqs = get_option("Broker::congestion_queue_size")->AsCount();
-    bstate = std::make_shared<BrokerState>(std::move(config), cqs, queue);
+    bstate = std::make_shared<BrokerState>(std::move(config), cqs, queue, pbstate);
     bstate->logSeverity = static_cast(logSeverityVal);
     bstate->stderrSeverity = static_cast(stderrSeverityVal);
@@ -1970,6 +2157,8 @@ const Stats& Manager::GetStatistics() {
     return statistics;
 }
 
+TableValPtr Manager::GetPeeringStatsTable() { return bstate->peerBufferState->GetPeeringStatsTable(); }
+
 bool Manager::AddForwardedStore(const std::string& name, TableValPtr table) {
     if ( forwarded_stores.find(name) != forwarded_stores.end() ) {
         reporter->Error("same &broker_store %s specified for two different variables", name.c_str());
diff --git a/src/broker/Manager.h b/src/broker/Manager.h
index 1673818349..d9c4a4fa81 100644
--- a/src/broker/Manager.h
+++ b/src/broker/Manager.h
@@ -384,6 +384,14 @@ public:
      */
     const Stats& GetStatistics();
 
+    /**
+     * Returns a table[string] of BrokerPeeringStats, with each peering's
+     * send-buffer stats filled in. The keys are Broker node IDs identifying
+     * the current peers.
+     * @return Each peering's send-buffer statistics.
+     */
+    TableValPtr GetPeeringStatsTable();
+
     /**
      * Creating an instance of this struct simply helps the manager
      * keep track of whether calls into its API are coming from script
diff --git a/src/broker/comm.bif b/src/broker/comm.bif
index d348b5ebff..29e9d84680 100644
--- a/src/broker/comm.bif
+++ b/src/broker/comm.bif
@@ -264,3 +264,8 @@ function Broker::__node_id%(%): string
 	zeek::Broker::Manager::ScriptScopeGuard ssg;
 	return zeek::make_intrusive<zeek::StringVal>(broker_mgr->NodeId());
 	%}
+
+function Broker::__peering_stats%(%): BrokerPeeringStatsTable
+	%{
+	return broker_mgr->GetPeeringStatsTable();
+	%}
diff --git a/testing/btest/Baseline/opt.ZAM-bif-tracking/output b/testing/btest/Baseline/opt.ZAM-bif-tracking/output
index c20efb9801..67cc8f4838 100644
--- a/testing/btest/Baseline/opt.ZAM-bif-tracking/output
+++ b/testing/btest/Baseline/opt.ZAM-bif-tracking/output
@@ -1,2 +1,2 @@
 ### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
-558 seen BiFs, 0 unseen BiFs (), 0 new BiFs ()
+559 seen BiFs, 0 unseen BiFs (), 0 new BiFs ()
diff --git a/testing/btest/opt/ZAM-bif-tracking.zeek b/testing/btest/opt/ZAM-bif-tracking.zeek
index 4dbce8e247..5184cd68a6 100644
--- a/testing/btest/opt/ZAM-bif-tracking.zeek
+++ b/testing/btest/opt/ZAM-bif-tracking.zeek
@@ -7,7 +7,7 @@
 # @TEST-EXEC: btest-diff output
 
 # This set tracks the BiFs that have been characterized for ZAM analysis.
-# As new ones are added or old ones removed, attend to updating FuncInfo.cc
+# As new ones are added or old ones removed, update src/script_opt/FuncInfo.cc
 # for ZAM, and then update the list here.
 global known_BiFs = set(
 	"Analyzer::__disable_all_analyzers",
@@ -45,6 +45,7 @@ global known_BiFs = set(
 	"Broker::__opaque_clone_through_serialization",
 	"Broker::__peer",
 	"Broker::__peer_no_retry",
+	"Broker::__peering_stats",
 	"Broker::__peers",
 	"Broker::__pop",
 	"Broker::__publish_id",
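
Usage sketch (illustrative, not part of the patch): one way the new Broker::peering_stats() API and BrokerPeeringStats fields could be exercised from a Zeek script. The event name report_broker_buffers and the 10-second reporting interval are arbitrary choices for this example.

    global report_broker_buffers: event();

    event report_broker_buffers()
    	{
    	# Keys are Broker endpoint IDs, values are BrokerPeeringStats records.
    	local stats = Broker::peering_stats();
    	for ( peer, ps in stats )
    		print fmt("peer=%s queued=%d max_recent=%d overflows=%d",
    		          peer, ps$num_queued, ps$max_queued_recently, ps$num_overflows);
    	schedule 10sec { report_broker_buffers() };
    	}

    event zeek_init()
    	{
    	schedule 10sec { report_broker_buffers() };
    	}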