diff --git a/src/cluster/backend/zeromq/ZeroMQ.h b/src/cluster/backend/zeromq/ZeroMQ.h index c7dcb6aa31..27db949ee8 100644 --- a/src/cluster/backend/zeromq/ZeroMQ.h +++ b/src/cluster/backend/zeromq/ZeroMQ.h @@ -142,7 +142,7 @@ private: std::map subscription_callbacks; std::set xpub_subscriptions; - // Overload policy , initialized in DoInitPostScript() + // Overflow policy and metrics. Initialized in DoInitPostScript(). OverflowPolicy overflow_policy = OverflowPolicy::Block; zeek::telemetry::CounterPtr total_xpub_blocks; zeek::telemetry::CounterPtr total_xpub_drops; diff --git a/testing/btest/Baseline/cluster.zeromq.overflow-policy-block/manager.out b/testing/btest/Baseline/cluster.zeromq.overflow-policy-block/manager.out index e6ee55fdaa..fef70ab672 100644 --- a/testing/btest/Baseline/cluster.zeromq.overflow-policy-block/manager.out +++ b/testing/btest/Baseline/cluster.zeromq.overflow-policy-block/manager.out @@ -1,18 +1,12 @@ ### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. -B nodes_up, 2 -B nodes_up, 3 -B nodes_up, 4 +nodes_up, 2 +nodes_up, 3 +nodes_up, 4 +B sending first tick +sending finish nodes_down, 2 nodes_down, 3 nodes_down, 4 -drop_c, { -[proxy] = 0, -[worker-2] = 0, -[worker-1] = 0 -} -last_c, { -[proxy] = 50000, -[worker-2] = 50000, -[worker-1] = 50000 -} +zeek_done drop_c {\x0a\x09[proxy] = 0,\x0a\x09[worker-2] = 0,\x0a\x09[worker-1] = 0\x0a} +zeek_done last_c {\x0a\x09[proxy] = 50000,\x0a\x09[worker-2] = 50000,\x0a\x09[worker-1] = 50000\x0a} GOOD: Observed no XPUB blocks on manager diff --git a/testing/btest/Baseline/cluster.zeromq.overflow-policy-block/proxy.out b/testing/btest/Baseline/cluster.zeromq.overflow-policy-block/proxy.out index ff136e2aa4..5696907e32 100644 --- a/testing/btest/Baseline/cluster.zeromq.overflow-policy-block/proxy.out +++ b/testing/btest/Baseline/cluster.zeromq.overflow-policy-block/proxy.out @@ -1,4 +1,2 @@ ### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. -start, XXXXXXXXXX.XXXXXX -zeek_done, XXXXXXXXXX.XXXXXX GOOD: Observed XPUB blocks diff --git a/testing/btest/Baseline/cluster.zeromq.overflow-policy-block/worker-1.out b/testing/btest/Baseline/cluster.zeromq.overflow-policy-block/worker-1.out index ff136e2aa4..5696907e32 100644 --- a/testing/btest/Baseline/cluster.zeromq.overflow-policy-block/worker-1.out +++ b/testing/btest/Baseline/cluster.zeromq.overflow-policy-block/worker-1.out @@ -1,4 +1,2 @@ ### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. -start, XXXXXXXXXX.XXXXXX -zeek_done, XXXXXXXXXX.XXXXXX GOOD: Observed XPUB blocks diff --git a/testing/btest/Baseline/cluster.zeromq.overflow-policy-block/worker-2.out b/testing/btest/Baseline/cluster.zeromq.overflow-policy-block/worker-2.out index ff136e2aa4..5696907e32 100644 --- a/testing/btest/Baseline/cluster.zeromq.overflow-policy-block/worker-2.out +++ b/testing/btest/Baseline/cluster.zeromq.overflow-policy-block/worker-2.out @@ -1,4 +1,2 @@ ### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. -start, XXXXXXXXXX.XXXXXX -zeek_done, XXXXXXXXXX.XXXXXX GOOD: Observed XPUB blocks diff --git a/testing/btest/Baseline/cluster.zeromq.overflow-policy-drop/manager.out b/testing/btest/Baseline/cluster.zeromq.overflow-policy-drop/manager.out index 890c46ec65..e50ee46f94 100644 --- a/testing/btest/Baseline/cluster.zeromq.overflow-policy-drop/manager.out +++ b/testing/btest/Baseline/cluster.zeromq.overflow-policy-drop/manager.out @@ -1,7 +1,8 @@ ### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. -B nodes_up, 2 -B nodes_up, 3 -B nodes_up, 4 +nodes_up, 2 +nodes_up, 3 +nodes_up, 4 +sending finish nodes_down, 2 nodes_down, 3 nodes_down, 4 diff --git a/testing/btest/Baseline/cluster.zeromq.overflow-policy-drop/proxy.out b/testing/btest/Baseline/cluster.zeromq.overflow-policy-drop/proxy.out index 4e0c579ea4..1ea50ca75d 100644 --- a/testing/btest/Baseline/cluster.zeromq.overflow-policy-drop/proxy.out +++ b/testing/btest/Baseline/cluster.zeromq.overflow-policy-drop/proxy.out @@ -1,4 +1,2 @@ ### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. -start, XXXXXXXXXX.XXXXXX -zeek_done, XXXXXXXXXX.XXXXXX GOOD: Observed XPUB drops diff --git a/testing/btest/Baseline/cluster.zeromq.overflow-policy-drop/worker-1.out b/testing/btest/Baseline/cluster.zeromq.overflow-policy-drop/worker-1.out index 4e0c579ea4..1ea50ca75d 100644 --- a/testing/btest/Baseline/cluster.zeromq.overflow-policy-drop/worker-1.out +++ b/testing/btest/Baseline/cluster.zeromq.overflow-policy-drop/worker-1.out @@ -1,4 +1,2 @@ ### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. -start, XXXXXXXXXX.XXXXXX -zeek_done, XXXXXXXXXX.XXXXXX GOOD: Observed XPUB drops diff --git a/testing/btest/Baseline/cluster.zeromq.overflow-policy-drop/worker-2.out b/testing/btest/Baseline/cluster.zeromq.overflow-policy-drop/worker-2.out index 4e0c579ea4..1ea50ca75d 100644 --- a/testing/btest/Baseline/cluster.zeromq.overflow-policy-drop/worker-2.out +++ b/testing/btest/Baseline/cluster.zeromq.overflow-policy-drop/worker-2.out @@ -1,4 +1,2 @@ ### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. -start, XXXXXXXXXX.XXXXXX -zeek_done, XXXXXXXXXX.XXXXXX GOOD: Observed XPUB drops diff --git a/testing/btest/cluster/zeromq/overflow-policy-block.zeek b/testing/btest/cluster/zeromq/overflow-policy-block.zeek index 097e367905..5e78fd3a7d 100644 --- a/testing/btest/cluster/zeromq/overflow-policy-block.zeek +++ b/testing/btest/cluster/zeromq/overflow-policy-block.zeek @@ -28,9 +28,9 @@ # @TEST-START-FILE common.zeek @load ./zeromq-test-bootstrap -global start: event(); -global finish: event(name: string); -global ping: event(sender: string, c: count); +global tick: event() &is_used; +global finish: event(name: string) &is_used; +global ping: event(sender: string, c: count) &is_used; # Lower high watermarks from 1000 (default) to something much lower to provoke blocking. redef Cluster::Backend::ZeroMQ::xpub_sndhwm = 20; @@ -52,17 +52,20 @@ global nodes_up: set[string] = {"manager"}; global nodes_down: set[string] = {"manager"}; event send_finish() { + print "sending finish"; for ( n in nodes_up ) Cluster::publish(Cluster::node_topic(n), finish, Cluster::node); } event Cluster::node_up(name: string, id: string) { add nodes_up[name]; - print "B nodes_up", |nodes_up|; + print "nodes_up", |nodes_up|; - if ( |nodes_up| == 4 ) { - Cluster::publish(Cluster::worker_topic, start); - Cluster::publish(Cluster::proxy_topic, start); + # Get the ball rolling once all nodes are available. + if ( |nodes_up| == |Cluster::nodes| ) { + print "B sending first tick"; + Cluster::publish(Cluster::worker_topic, tick); + Cluster::publish(Cluster::proxy_topic, tick); } } @@ -103,8 +106,8 @@ event ping(sender: string, c: count) { } event zeek_done() { - print "drop_c", drop_c; - print "last_c", last_c; + print fmt("zeek_done drop_c %s", drop_c); + print fmt("zeek_done last_c %s", last_c); local blocks = get_zeromq_blocks(); if ( blocks == 0 ) @@ -119,6 +122,8 @@ event zeek_done() { @load ./common.zeek global publishes = 0; + +# How many events to publish during a tick() const batch = 100; event tick() { @@ -135,17 +140,12 @@ event tick() { schedule 0.01msec { tick() }; } -event start() { - print "start", current_time(); - event tick(); -} - -event finish(name: string) { +# Send by manager to stop the test. +event finish(name: string) &is_used { terminate(); } event zeek_done() { - print "zeek_done", current_time(); local blocks = get_zeromq_blocks(); if ( blocks > 0 ) print "GOOD: Observed XPUB blocks"; diff --git a/testing/btest/cluster/zeromq/overflow-policy-drop.zeek b/testing/btest/cluster/zeromq/overflow-policy-drop.zeek index 249067d004..690ad7e516 100644 --- a/testing/btest/cluster/zeromq/overflow-policy-drop.zeek +++ b/testing/btest/cluster/zeromq/overflow-policy-drop.zeek @@ -28,9 +28,9 @@ # @TEST-START-FILE common.zeek @load ./zeromq-test-bootstrap -global start: event(); -global finish: event(name: string); -global ping: event(sender: string, c: count); +global tick: event() &is_used; +global finish: event(name: string) &is_used; +global ping: event(sender: string, c: count) &is_used; # Lower high watermarks from 1000 (default) to something much lower to provoke drops. redef Cluster::Backend::ZeroMQ::xpub_sndhwm = 20; @@ -55,17 +55,19 @@ global nodes_up: set[string] = {"manager"}; global nodes_down: set[string] = {"manager"}; event send_finish() { + print "sending finish"; for ( n in nodes_up ) Cluster::publish(Cluster::node_topic(n), finish, Cluster::node); } event Cluster::node_up(name: string, id: string) { add nodes_up[name]; - print "B nodes_up", |nodes_up|; + print "nodes_up", |nodes_up|; - if ( |nodes_up| == 4 ) { - Cluster::publish(Cluster::worker_topic, start); - Cluster::publish(Cluster::proxy_topic, start); + # Get the ball rolling once all nodes are available. + if ( |nodes_up| == |Cluster::nodes| ) { + Cluster::publish(Cluster::worker_topic, tick); + Cluster::publish(Cluster::proxy_topic, tick); } } @@ -149,17 +151,11 @@ event tick() { schedule s { tick() }; } -event start() { - print "start", current_time(); - event tick(); -} - event finish(name: string) { terminate(); } event zeek_done() { - print "zeek_done", current_time(); local drops = get_zeromq_drops(); if ( drops > 0 ) print "GOOD: Observed XPUB drops";