core.network_time.broker: Test reliability improvement

I wasn't able to reproduce this locally, but after looking at
-B main-loop,tm for a bit it dawned that if the manager is sending
ticks too fast, the Broker IO source may consume two ticks in one go
before expiring timers and that would explain the observed baseline
differences.

Solve this by removing the reliance on realtime delays and switch to
a request-reply pattern instead.

Also fix indentation and bogus messages.

Fixes #3013
This commit is contained in:
Arne Welzel 2023-05-05 14:02:07 +02:00
parent 2044cf661a
commit 979f2df57d
3 changed files with 17 additions and 11 deletions

View file

@ -1,4 +1,4 @@
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. ### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
manager: listening manager: listening
manager: peer added, publishing do_continue_processing manager: peer added, publishing timer_tick() events
manager: peer lost, terminating manager: peer lost, terminating

View file

@ -17,7 +17,7 @@
44.750, timer_tick, 13, 45.0 44.750, timer_tick, 13, 45.0
45.000, timer, second timer (3 sec) 45.000, timer, second timer (3 sec)
45.000, timer_tick, 14, 45.25 45.000, timer_tick, 14, 45.25
45.250, timer, third timer (3.5 sec) 45.250, timer, third timer (3.25 sec)
45.250, timer_tick, 15, 45.5 45.250, timer_tick, 15, 45.5
45.500, timer_tick, 16, 45.75 45.500, timer_tick, 16, 45.75
45.750, timer_tick, 17, 46.0 45.750, timer_tick, 17, 46.0

View file

@ -11,6 +11,8 @@
redef allow_network_time_forward = F; redef allow_network_time_forward = F;
global timer_tock: event();
event zeek_init() event zeek_init()
{ {
print network_time(), "zeek_init: broker peering"; print network_time(), "zeek_init: broker peering";
@ -34,7 +36,8 @@ event timer(s: string)
print fmt("%.3f", network_time()), "timer", s; print fmt("%.3f", network_time()), "timer", s;
} }
# The manager tells us our network_time. # The manager sends timer_tick() with the network time to the worker which
# replies back with a timer_tock().
global received_ticks = 0; global received_ticks = 0;
event timer_tick(ts: time) &is_used event timer_tick(ts: time) &is_used
{ {
@ -48,10 +51,12 @@ event timer_tick(ts: time) &is_used
{ {
schedule 0.5sec { timer("first timer (1 sec)") }; schedule 0.5sec { timer("first timer (1 sec)") };
schedule 3sec { timer("second timer (3 sec)") }; schedule 3sec { timer("second timer (3 sec)") };
schedule 3.25sec { timer("third timer (3.5 sec)") }; schedule 3.25sec { timer("third timer (3.25 sec)") };
schedule 5sec { timer("fourth timer (10 sec)") }; schedule 5sec { timer("fourth timer (10 sec)") };
} }
Broker::publish("zeek/event/my_topic", timer_tock);
if ( received_ticks == 30 ) if ( received_ticks == 30 )
terminate(); terminate();
} }
@ -74,20 +79,21 @@ event zeek_init()
{ {
print "manager: listening"; print "manager: listening";
Broker::listen("127.0.0.1", to_port(getenv("BROKER_PORT"))); Broker::listen("127.0.0.1", to_port(getenv("BROKER_PORT")));
Broker::subscribe("zeek/event/my_topic");
} }
event local_tick() { # Received from the worker once it has processed the tick.
event timer_tock() &is_used
{
fake_network_time = fake_network_time + double_to_interval(0.25); fake_network_time = fake_network_time + double_to_interval(0.25);
Broker::publish("zeek/event/my_topic", timer_tick, fake_network_time); Broker::publish("zeek/event/my_topic", timer_tick, fake_network_time);
schedule 0.05 sec { local_tick() }; }
}
event Broker::peer_added(endpoint: Broker::EndpointInfo, msg: string) event Broker::peer_added(endpoint: Broker::EndpointInfo, msg: string)
{ {
print "manager: peer added, publishing do_continue_processing"; print "manager: peer added, publishing timer_tick() events";
Broker::publish("zeek/event/my_topic", timer_tick, fake_network_time); Broker::publish("zeek/event/my_topic", timer_tick, fake_network_time);
schedule 0.05 sec { local_tick() }; }
}
event Broker::peer_lost(endpoint: Broker::EndpointInfo, msg: string) event Broker::peer_lost(endpoint: Broker::EndpointInfo, msg: string)
{ {