diff --git a/CHANGES b/CHANGES index a7cf56e01c..b004f188d4 100644 --- a/CHANGES +++ b/CHANGES @@ -1,3 +1,17 @@ +7.2.0-dev.644 | 2025-04-25 10:02:58 -0700 + + * Add basic btest to verify that Broker peering telemetry is available. (Christian Kreibich, Corelight) + + * Add cluster framework telemetry for Broker's send-buffer use (Christian Kreibich, Corelight) + + See NEWS or scripts/base/frameworks/cluster/broker-telemetry.zeek for details. + + * Add peer buffer update tracking to the Broker manager's event_observer (Christian Kreibich, Corelight) + + * Rename the Broker manager's LoggerAdapter (Christian Kreibich, Corelight) + + * Avoid race in the cluster/broker/publish-any btest (Christian Kreibich, Corelight) + 7.2.0-dev.638 | 2025-04-25 06:41:06 -0700 * Skip linting on highwayhash and src/3rdparty files (Tim Wojtulewicz, Corelight) diff --git a/VERSION b/VERSION index 0ccf1e55ed..7709feadd5 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -7.2.0-dev.638 +7.2.0-dev.644 diff --git a/scripts/base/frameworks/broker/main.zeek b/scripts/base/frameworks/broker/main.zeek index a3704b7157..7402d39ecb 100644 --- a/scripts/base/frameworks/broker/main.zeek +++ b/scripts/base/frameworks/broker/main.zeek @@ -104,6 +104,10 @@ export { ## Same as :zeek:see:`Broker::peer_overflow_policy` but for WebSocket clients. const web_socket_overflow_policy = "disconnect" &redef; + ## How frequently Zeek resets some peering/client buffer statistics, + ## such as ``max_queued_recently`` in :zeek:see:`BrokerPeeringStats`. + const buffer_stats_reset_interval = 1min &redef; + ## The CAF scheduling policy to use. Available options are "sharing" and ## "stealing". The "sharing" policy uses a single, global work queue along ## with mutex and condition variable used for accessing it, which may be @@ -392,6 +396,12 @@ export { ## Returns: a unique identifier for the local broker endpoint. global node_id: function(): string; + ## Obtain each peering's send-buffer statistics. The keys are Broker + ## endpoint IDs. + ## + ## Returns: per-peering statistics. + global peering_stats: function(): table[string] of BrokerPeeringStats; + ## Sends all pending log messages to remote peers. This normally ## doesn't need to be used except for test cases that are time-sensitive. global flush_logs: function(): count; @@ -554,6 +564,11 @@ function node_id(): string return __node_id(); } +function peering_stats(): table[string] of BrokerPeeringStats + { + return __peering_stats(); + } + function flush_logs(): count { return __flush_logs(); diff --git a/scripts/base/frameworks/cluster/__load__.zeek b/scripts/base/frameworks/cluster/__load__.zeek index 0d6372e3d4..85fef40c5f 100644 --- a/scripts/base/frameworks/cluster/__load__.zeek +++ b/scripts/base/frameworks/cluster/__load__.zeek @@ -14,8 +14,11 @@ redef Broker::log_topic = Cluster::rr_log_topic; # Add a cluster prefix. @prefixes += cluster -# This should soon condition on loading only when Broker is in use. +# Broker-specific additions: +@if ( Cluster::backend == Cluster::CLUSTER_BACKEND_BROKER ) @load ./broker-backpressure +@load ./broker-telemetry +@endif @if ( Supervisor::is_supervised() ) # When running a supervised cluster, populate Cluster::nodes from the node table diff --git a/scripts/base/frameworks/cluster/broker-telemetry.zeek b/scripts/base/frameworks/cluster/broker-telemetry.zeek new file mode 100644 index 0000000000..7aa1e8fb3f --- /dev/null +++ b/scripts/base/frameworks/cluster/broker-telemetry.zeek @@ -0,0 +1,69 @@ +# Additional Broker-specific metrics that use Zeek cluster-level node names. + +@load base/frameworks/telemetry + +module Cluster; + +## This gauge tracks the current number of locally queued messages in each +## Broker peering's send buffer. The "peer" label identifies the remote side of +## the peering, containing a Zeek cluster node name. +global broker_peer_buffer_messages_gf = Telemetry::register_gauge_family([ + $prefix="zeek", + $name="broker-peer-buffer-messages", + $unit="", + $label_names=vector("peer"), + $help_text="Number of messages queued in Broker's send buffers", +]); + +## This gauge tracks recent maximum queue lengths for each Broker peering's send +## buffer. Most of the time the send buffers are nearly empty, so this gauge +## helps understand recent bursts of messages. "Recent" here means +## :zeek:see:`Broker::buffer_stats_reset_interval`. The time window advances in +## increments of at least the stats interval, not incrementally with every new +## observed message. That is, Zeek keeps a timestamp of when the window started, +## and once it notices that the interval has passed, it moves the start of the +## window to current time. +global broker_peer_buffer_recent_max_messages_gf = Telemetry::register_gauge_family([ + $prefix="zeek", + $name="broker-peer-buffer-recent-max-messages", + $unit="", + $label_names=vector("peer"), + $help_text="Maximum number of messages recently queued in Broker's send buffers", +]); + +## This counter tracks for each Broker peering the number of times its send +## buffer has overflowed. For the "disconnect" policy this can at most be 1, +## since Broker stops the peering at this time. For the "drop_oldest" and +## "drop_newest" policies (see :zeek:see:`Broker:peer_overflow_policy`) the count +## instead reflects the number of messages lost. +global broker_peer_buffer_overflows_cf = Telemetry::register_counter_family([ + $prefix="zeek", + $name="broker-peer-buffer-overflows", + $unit="", + $label_names=vector("peer"), + $help_text="Number of overflows in Broker's send buffers", +]); + +hook Telemetry::sync() + { + local peers = Broker::peering_stats(); + local nn: NamedNode; + + for ( peer, stats in peers ) + { + # Translate the Broker IDs to Zeek-level node names. We skip + # telemetry for peers where this mapping fails, i.e. ones for + # connections to external systems. + nn = nodeid_to_node(peer); + + if ( |nn$name| > 0 ) + { + Telemetry::gauge_family_set(broker_peer_buffer_messages_gf, + vector(nn$name), stats$num_queued); + Telemetry::gauge_family_set(broker_peer_buffer_recent_max_messages_gf, + vector(nn$name), stats$max_queued_recently); + Telemetry::counter_family_set(broker_peer_buffer_overflows_cf, + vector(nn$name), stats$num_overflows); + } + } + } diff --git a/scripts/base/init-bare.zeek b/scripts/base/init-bare.zeek index eaeb9f70ca..02d5ab599d 100644 --- a/scripts/base/init-bare.zeek +++ b/scripts/base/init-bare.zeek @@ -1135,6 +1135,20 @@ type BrokerStats: record { num_ids_outgoing: count; }; +## Broker statistics for an individual peering. +## +type BrokerPeeringStats: record { + ## The number of messages currently queued locally for transmission. + num_queued: count; + ## The maximum number of messages queued in the recent + ## :zeek:see:`Broker::buffer_stats_reset_interval` time interval. + max_queued_recently: count; + ## The number of times the send buffer has overflowed. + num_overflows: count; +}; + +type BrokerPeeringStatsTable: table[string] of BrokerPeeringStats; + ## Statistics about reporter messages and weirds. ## ## .. zeek:see:: get_reporter_stats diff --git a/src/broker/Manager.cc b/src/broker/Manager.cc index dee1c162cf..853255523b 100644 --- a/src/broker/Manager.cc +++ b/src/broker/Manager.cc @@ -92,6 +92,174 @@ void print_escaped(std::string& buf, std::string_view str) { buf.push_back('"'); } +// Track metrics for a given peering's send buffer. +class PeerBufferState { +public: + struct Stats { + // The rendered peer ID. Storing this here helps reuse. + // Note that we only ever touch this from Zeek's main thread, not + // any of Broker's. + zeek::StringValPtr peer_id; + + // Whether Broker has removed the peer, and this instance still + // needs to be removed. + bool is_zombie = false; + + // Number of messages queued locally in the send buffer. + uint32_t queued = 0; + + // Maximum number queued in the last Broker::buffer_stats_reset_interval. + // This improces visibility into message bursts since instantaneous + // queueing (captured above) can be short-lived. + uint32_t max_queued_recently = 0; + + // Number of times the buffer overflowed at send time. For the + // "disconnect" overflow policy (via Broker::peer_overflow_policy), this + // count will at most be 1 since Broker will remove the peering upon + // overflow. The existing Zeek-level metric for tracking disconnects + // (see frameworks/broker/broker-backpressure.zeek) covers this one more + // permanently. For the "drop_newest" and "drop_oldest" policies it + // equals a count of the number of messages lost, since the peering + // continues. + uint64_t overflows = 0; + + // When we last started a stats-tracking interval for this peering. + double last_interval = 0; + }; + + // For per-peering tracking, map endpoint IDs to the above state. + using EndpointMetricMap = std::unordered_map; + + PeerBufferState(size_t a_buffer_size, double a_stats_reset_interval) + : buffer_size(a_buffer_size), stats_reset_interval(a_stats_reset_interval) { + stats_table = + zeek::make_intrusive(zeek::id::find_type("BrokerPeeringStatsTable")); + stats_record_type = zeek::id::find_type("BrokerPeeringStats"); + } + + void SetEndpoint(const broker::endpoint* a_endpoint) { endpoint = a_endpoint; } + + // Update the peering's stats. This runs in Broker's execution context. + // Broker does not expose send-buffer/queue state explicitly, so track + // arrivals (a push, is_push == true) and departures (a pull, is_push == + // false) as they happen. Note that this must not touch Zeek-side Vals. + void Observe(const broker::endpoint_id& peer, bool is_push) { + std::lock_guard lock(mutex); + auto it = stats_map.find(peer); + + if ( it == stats_map.end() ) { + stats_map.emplace(peer, Stats()); + it = stats_map.find(peer); + } + + auto& stats = it->second; + + // Stick to Broker's notion of time here. + double now{0}; + if ( endpoint != nullptr ) + broker::convert(endpoint->now(), now); + + if ( now - stats.last_interval > stats_reset_interval ) { + stats.last_interval = now; + stats.max_queued_recently = stats.queued; + } + + if ( stats.queued == 0 ) { + // Watch for underflows. We could report somehow. Note that this + // runs in the context of Broker's threads. + assert(is_push); + } + + if ( is_push && stats.queued == buffer_size ) + stats.overflows += 1; + else { + stats.queued += is_push ? 1 : -1; + if ( stats.queued > stats.max_queued_recently ) + stats.max_queued_recently = stats.queued; + } + } + + // Updates the internal table[string] of BrokerPeeringStats and returns it. + const zeek::TableValPtr& GetPeeringStatsTable() { + std::lock_guard lock(mutex); + + for ( auto it = stats_map.begin(); it != stats_map.end(); ) { + auto& peer = it->first; + auto& stats = it->second; + + if ( stats.peer_id == nullptr ) + stats.peer_id = PeerIdToStringVal(peer); + + // Broker told us the peer is gone, in RemovePeer() below. Remove it + // now from both tables. We add/remove from stats_table only here, + // not in Observer() and/or RemovePeer(), to ensure we only touch + // the Zeek-side Table from Zeek's main thread. + if ( stats.is_zombie ) { + stats_table->Remove(*stats.peer_id); + it = stats_map.erase(it); + continue; + } + + auto stats_v = stats_table->Find(stats.peer_id); + + if ( stats_v == nullptr ) { + stats_v = zeek::make_intrusive(stats_record_type); + stats_table->Assign(stats.peer_id, stats_v); + } + + // We may get here more than stats_reset_interval after the last + // Observe(), in which case the max_queued_recently value is now + // stale. Update if so. + double now{0}; + if ( endpoint != nullptr ) + broker::convert(endpoint->now(), now); + + if ( now - stats.last_interval > stats_reset_interval ) { + stats.last_interval = now; + stats.max_queued_recently = stats.queued; + } + + int n = 0; + stats_v->AsRecordVal()->Assign(n++, zeek::val_mgr->Count(stats.queued)); + stats_v->AsRecordVal()->Assign(n++, zeek::val_mgr->Count(stats.max_queued_recently)); + stats_v->AsRecordVal()->Assign(n++, zeek::val_mgr->Count(stats.overflows)); + + ++it; + } + + return stats_table; + } + + void RemovePeer(const broker::endpoint_id& peer) { + std::lock_guard lock(mutex); + if ( auto it = stats_map.find(peer); it != stats_map.end() ) + it->second.is_zombie = true; + } + +private: + zeek::StringValPtr PeerIdToStringVal(const broker::endpoint_id& peer) const { + std::string peer_s; + broker::convert(peer, peer_s); + return zeek::make_intrusive(peer_s); + } + + // The maximum number of messages queueable for transmission to a peer, + // see Broker::peer_buffer_size and Broker::web_socket_buffer_size. + size_t buffer_size; + + // Seconds after which we reset stats tracked per time window. + double stats_reset_interval; + + EndpointMetricMap stats_map; + zeek::TableValPtr stats_table; + zeek::RecordTypePtr stats_record_type; + + mutable std::mutex mutex; + const broker::endpoint* endpoint = nullptr; +}; + +using PeerBufferStatePtr = std::shared_ptr; + class LoggerQueue { public: void Push(broker::event_ptr event) { @@ -128,26 +296,40 @@ using LoggerQueuePtr = std::shared_ptr; using BrokerSeverityLevel = broker::event::severity_level; -class LoggerAdapter : public broker::event_observer { +class Observer : public broker::event_observer { public: - using SeverityLevel = broker::event::severity_level; + using LogSeverityLevel = broker::event::severity_level; - explicit LoggerAdapter(SeverityLevel severity, LoggerQueuePtr queue) - : severity_(severity), queue_(std::move(queue)) {} + explicit Observer(LogSeverityLevel severity, LoggerQueuePtr queue, PeerBufferStatePtr pbstate) + : severity_(severity), queue_(std::move(queue)), pbstate_(std::move(pbstate)) {} + + void on_peer_buffer_push(const broker::endpoint_id& peer, const broker::node_message&) override { + pbstate_->Observe(peer, true); + } + + void on_peer_buffer_pull(const broker::endpoint_id& peer, const broker::node_message&) override { + pbstate_->Observe(peer, false); + } + + void on_peer_disconnect(const broker::endpoint_id& peer, const broker::error&) override { + pbstate_->RemovePeer(peer); + } void observe(broker::event_ptr what) override { queue_->Push(std::move(what)); } - bool accepts(SeverityLevel severity, broker::event::component_type) const override { return severity <= severity_; } + bool accepts(LogSeverityLevel severity, broker::event::component_type) const override { + return severity <= severity_; + } private: - SeverityLevel severity_; + LogSeverityLevel severity_; LoggerQueuePtr queue_; + PeerBufferStatePtr pbstate_; }; } // namespace namespace zeek::Broker { - static inline Val* get_option(const char* option) { const auto& id = zeek::detail::global_scope()->Find(option); @@ -220,19 +402,24 @@ struct opt_mapping { class BrokerState { public: - using SeverityLevel = LoggerAdapter::SeverityLevel; + using LogSeverityLevel = Observer::LogSeverityLevel; - BrokerState(broker::configuration config, size_t congestion_queue_size, LoggerQueuePtr queue) + BrokerState(broker::configuration config, size_t congestion_queue_size, LoggerQueuePtr queue, + PeerBufferStatePtr pbstate) : endpoint(std::move(config), telemetry_mgr->GetRegistry()), subscriber( endpoint.make_subscriber({broker::topic::statuses(), broker::topic::errors()}, congestion_queue_size)), - loggerQueue(std::move(queue)) {} + loggerQueue(std::move(queue)), + peerBufferState(std::move(pbstate)) { + peerBufferState->SetEndpoint(&endpoint); + } broker::endpoint endpoint; broker::subscriber subscriber; LoggerQueuePtr loggerQueue; - SeverityLevel logSeverity = SeverityLevel::critical; - SeverityLevel stderrSeverity = SeverityLevel::critical; + PeerBufferStatePtr peerBufferState; + LogSeverityLevel logSeverity = LogSeverityLevel::critical; + LogSeverityLevel stderrSeverity = LogSeverityLevel::critical; std::unordered_set outbound_peerings; }; @@ -402,11 +589,13 @@ void Manager::DoInitPostScript() { checkLogSeverity(stderrSeverityVal); auto adapterVerbosity = static_cast(std::max(logSeverityVal, stderrSeverityVal)); auto queue = std::make_shared(); - auto adapter = std::make_shared(adapterVerbosity, queue); - broker::logger(adapter); // *must* be called before creating the BrokerState + auto pbstate = std::make_shared(options.peer_buffer_size, + get_option("Broker::buffer_stats_reset_interval")->AsDouble()); + auto observer = std::make_shared(adapterVerbosity, queue, pbstate); + broker::logger(observer); // *must* be called before creating the BrokerState auto cqs = get_option("Broker::congestion_queue_size")->AsCount(); - bstate = std::make_shared(std::move(config), cqs, queue); + bstate = std::make_shared(std::move(config), cqs, queue, pbstate); bstate->logSeverity = static_cast(logSeverityVal); bstate->stderrSeverity = static_cast(stderrSeverityVal); @@ -1968,6 +2157,8 @@ const Stats& Manager::GetStatistics() { return statistics; } +TableValPtr Manager::GetPeeringStatsTable() { return bstate->peerBufferState->GetPeeringStatsTable(); } + bool Manager::AddForwardedStore(const std::string& name, TableValPtr table) { if ( forwarded_stores.find(name) != forwarded_stores.end() ) { reporter->Error("same &broker_store %s specified for two different variables", name.c_str()); diff --git a/src/broker/Manager.h b/src/broker/Manager.h index 1673818349..d9c4a4fa81 100644 --- a/src/broker/Manager.h +++ b/src/broker/Manager.h @@ -384,6 +384,14 @@ public: */ const Stats& GetStatistics(); + /** + * Returns a table[string] of BrokerPeeringStats, with each peering's + * send-buffer stats filled in. The keys are Broker node IDs identifying the + * current peers. + * @return Each peering's send-buffer statistics. + */ + TableValPtr GetPeeringStatsTable(); + /** * Creating an instance of this struct simply helps the manager * keep track of whether calls into its API are coming from script diff --git a/src/broker/comm.bif b/src/broker/comm.bif index d348b5ebff..29e9d84680 100644 --- a/src/broker/comm.bif +++ b/src/broker/comm.bif @@ -264,3 +264,8 @@ function Broker::__node_id%(%): string zeek::Broker::Manager::ScriptScopeGuard ssg; return zeek::make_intrusive(broker_mgr->NodeId()); %} + +function Broker::__peering_stats%(%): BrokerPeeringStatsTable + %{ + return broker_mgr->GetPeeringStatsTable(); + %} diff --git a/testing/btest/Baseline/broker.telemetry/manager.out b/testing/btest/Baseline/broker.telemetry/manager.out new file mode 100644 index 0000000000..ce348b2b02 --- /dev/null +++ b/testing/btest/Baseline/broker.telemetry/manager.out @@ -0,0 +1,4 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +Telemetry::COUNTER, zeek, zeek_broker_peer_buffer_overflows_total, [endpoint, peer], [manager, worker] +Telemetry::GAUGE, zeek, zeek_broker_peer_buffer_messages, [endpoint, peer], [manager, worker] +Telemetry::GAUGE, zeek, zeek_broker_peer_buffer_recent_max_messages, [endpoint, peer], [manager, worker] diff --git a/testing/btest/Baseline/broker.telemetry/worker.out b/testing/btest/Baseline/broker.telemetry/worker.out new file mode 100644 index 0000000000..b2486b0ace --- /dev/null +++ b/testing/btest/Baseline/broker.telemetry/worker.out @@ -0,0 +1,4 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +Telemetry::COUNTER, zeek, zeek_broker_peer_buffer_overflows_total, [endpoint, peer], [worker, manager] +Telemetry::GAUGE, zeek, zeek_broker_peer_buffer_messages, [endpoint, peer], [worker, manager] +Telemetry::GAUGE, zeek, zeek_broker_peer_buffer_recent_max_messages, [endpoint, peer], [worker, manager] diff --git a/testing/btest/Baseline/coverage.init-default/missing_loads b/testing/btest/Baseline/coverage.init-default/missing_loads index 9997ec4fd8..93af34614e 100644 --- a/testing/btest/Baseline/coverage.init-default/missing_loads +++ b/testing/btest/Baseline/coverage.init-default/missing_loads @@ -1,6 +1,7 @@ ### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. -./frameworks/cluster/broker-backpressure.zeek -./frameworks/cluster/broker-stores.zeek +-./frameworks/cluster/broker-telemetry.zeek -./frameworks/cluster/nodes/logger.zeek -./frameworks/cluster/nodes/manager.zeek -./frameworks/cluster/nodes/proxy.zeek diff --git a/testing/btest/Baseline/opt.ZAM-bif-tracking/output b/testing/btest/Baseline/opt.ZAM-bif-tracking/output index c20efb9801..67cc8f4838 100644 --- a/testing/btest/Baseline/opt.ZAM-bif-tracking/output +++ b/testing/btest/Baseline/opt.ZAM-bif-tracking/output @@ -1,2 +1,2 @@ ### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. -558 seen BiFs, 0 unseen BiFs (), 0 new BiFs () +559 seen BiFs, 0 unseen BiFs (), 0 new BiFs () diff --git a/testing/btest/broker/telemetry.zeek b/testing/btest/broker/telemetry.zeek new file mode 100644 index 0000000000..3008dd72f0 --- /dev/null +++ b/testing/btest/broker/telemetry.zeek @@ -0,0 +1,55 @@ +# @TEST-DOC: run a mini two-node cluster and check that Broker's peering telemetry is available. +# +# @TEST-PORT: BROKER_PORT +# +# @TEST-EXEC: btest-bg-run manager ZEEKPATH=$ZEEKPATH:.. CLUSTER_NODE=manager zeek -b manager.zeek +# @TEST-EXEC: btest-bg-run worker ZEEKPATH=$ZEEKPATH:.. CLUSTER_NODE=worker zeek -b worker.zeek +# @TEST-EXEC: btest-bg-wait 15 +# +# @TEST-EXEC: btest-diff manager/out +# @TEST-EXEC: btest-diff worker/out + +# @TEST-START-FILE cluster-layout.zeek +redef Cluster::nodes = { + ["manager"] = [$node_type=Cluster::MANAGER, $ip=127.0.0.1, $p=to_port(getenv("BROKER_PORT"))], + ["worker"] = [$node_type=Cluster::WORKER, $ip=127.0.0.1, $manager="manager"], +}; +# @TEST-END-FILE + +# @TEST-START-FILE common.zeek +@load base/frameworks/cluster +@load policy/frameworks/cluster/experimental + +redef exit_only_after_terminate = T; +redef Log::enable_local_logging = T; +redef Log::default_rotation_interval = 0secs; +redef Cluster::retry_interval = 1sec; + +function print_metrics(metrics: vector of Telemetry::Metric) + { + local f = open("out"); + + for (i in metrics) + { + local m = metrics[i]; + print f, m$opts$metric_type, m$opts$prefix, m$opts$name, m$label_names, m?$label_values ? m$label_values : vector(); + } + + close(f); + } + +event Cluster::Experimental::cluster_started() + { + local broker_metrics = Telemetry::collect_metrics("zeek_broker_peer_buffer*", "*"); + print_metrics(broker_metrics); + terminate(); + } +# @TEST-END-FILE + +# @TEST-START-FILE manager.zeek +@load ./common.zeek +# @TEST-END-FILE + +# @TEST-START-FILE worker.zeek +@load ./common.zeek +# @TEST-END-FILE diff --git a/testing/btest/cluster/broker/publish-any.zeek b/testing/btest/cluster/broker/publish-any.zeek index 6271548116..818d153ce4 100644 --- a/testing/btest/cluster/broker/publish-any.zeek +++ b/testing/btest/cluster/broker/publish-any.zeek @@ -56,8 +56,6 @@ event send_any() local e = Cluster::make_event(ping, i, type_name(val), val); Cluster::publish_hrw(Cluster::worker_pool, cat(i), e); ++i; - - schedule 0.05sec { send_any() }; } event pong(c: count, what: string, val: any) @@ -65,10 +63,17 @@ event pong(c: count, what: string, val: any) ++pongs; print "got pong", pongs, "for ping", c, what, type_name(val), val; - # We send 5 pings in 3 different variations and - # get 4 one pong for each. + # The manager send 5 types of pings, in 3 different ways. The worker + # answers each ping in 4 ways, for a total of 60 expected pongs at the + # manager. Every batch of pings for one type involves 12 pongs. + if ( pongs == 60 ) Cluster::publish(Cluster::worker_topic, finish); + else if ( pongs > 0 && pongs % 12 == 0 ) + { + # Wait for a batch to complete before sending the next. + event send_any(); + } } event Cluster::node_up(name: string, id: string) diff --git a/testing/btest/opt/ZAM-bif-tracking.zeek b/testing/btest/opt/ZAM-bif-tracking.zeek index 4dbce8e247..5184cd68a6 100644 --- a/testing/btest/opt/ZAM-bif-tracking.zeek +++ b/testing/btest/opt/ZAM-bif-tracking.zeek @@ -7,7 +7,7 @@ # @TEST-EXEC: btest-diff output # This set tracks the BiFs that have been characterized for ZAM analysis. -# As new ones are added or old ones removed, attend to updating FuncInfo.cc +# As new ones are added or old ones removed, update src/script_opt/FuncInfo.cc # for ZAM, and then update the list here. global known_BiFs = set( "Analyzer::__disable_all_analyzers", @@ -45,6 +45,7 @@ global known_BiFs = set( "Broker::__opaque_clone_through_serialization", "Broker::__peer", "Broker::__peer_no_retry", + "Broker::__peering_stats", "Broker::__peers", "Broker::__pop", "Broker::__publish_id",