diff --git a/CHANGES b/CHANGES index 6ae617acdc..a5c1a5c265 100644 --- a/CHANGES +++ b/CHANGES @@ -1,3 +1,31 @@ +7.1.0-dev.511 | 2024-11-12 16:00:00 +0100 + + * PktSrc: Remove first_timestamp condition check (Arne Welzel, Corelight) + + The comment is stale and first_timestamp is only relevant/available + in pseudo_realtime. + + * PktSrc: Fix includes (Arne Welzel, Corelight) + + * PktSrc/RunState: Scale on first_wallclock and move pseudo realtime logic to RunState (Arne Welzel, Corelight) + + check_pseudo_time() used zeek_start_time which skews things sufficiently + around being in the past when ZAM compilation takes multiple seconds. Switch + to using first_wallclock instead. + + Further, move setting of first_timestamp and first_wallclock from PktSrc + into RunState's dispatch_packet(), so it's more centralized now. + + The only pseudo_realtime piece left in PktSrc() is in GetNextTimeout() to + determine how long the PktSrc is idle until the next packet is ready. + + * RunState.h: Deprecate misleadingly named current_packet_timestamp() (Arne Welzel, Corelight) + + This returns current_pseudo, naming it current_packet_timestamp() + is actively misleading. + + * debug: Add processing suspended/continued to debug.log (Arne Welzel, Corelight) + 7.1.0-dev.505 | 2024-11-12 10:26:15 +0100 * ZAM fixes for assignments involving "any" record fields (Vern Paxson, Corelight) diff --git a/VERSION b/VERSION index 50f3af2132..1f23fcb068 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -7.1.0-dev.505 +7.1.0-dev.511 diff --git a/src/RunState.cc b/src/RunState.cc index 6add4134aa..1074100fa6 100644 --- a/src/RunState.cc +++ b/src/RunState.cc @@ -129,8 +129,6 @@ void update_network_time(double new_network_time) { static bool should_forward_network_time() { // In pseudo_realtime mode, always update time once // we've dispatched and processed the first packet. - // run_state::detail::first_timestamp is currently set - // in PktSrc::ExtractNextPacketInternal() if ( pseudo_realtime != 0.0 && run_state::detail::first_timestamp != 0.0 ) return true; @@ -221,7 +219,19 @@ void expire_timers() { } void dispatch_packet(Packet* pkt, iosource::PktSrc* pkt_src) { - double t = run_state::pseudo_realtime ? check_pseudo_time(pkt) : pkt->time; + double t = pkt->time; + + if ( pseudo_realtime != 0.0 ) { + current_wallclock = util::current_time(true); + + if ( first_wallclock == 0.0 ) { + first_wallclock = util::current_time(true); + first_timestamp = pkt->time; + } + + // Scale pkt time based on pseudo_realtime + t = check_pseudo_time(pkt); + } if ( ! zeek_start_network_time ) { zeek_start_network_time = t; @@ -244,9 +254,6 @@ void dispatch_packet(Packet* pkt, iosource::PktSrc* pkt_src) { processing_start_time = 0.0; // = "we're not processing now" current_dispatched = 0; - if ( pseudo_realtime && ! first_wallclock ) - first_wallclock = util::current_time(true); - current_iosrc = nullptr; current_pktsrc = nullptr; } @@ -399,10 +406,19 @@ void delete_run() { } double check_pseudo_time(const Packet* pkt) { + assert(pkt->time > 0.0); + assert(first_wallclock > 0.0); + assert(first_timestamp > 0.0); double pseudo_time = pkt->time - first_timestamp; double ct = (util::current_time(true) - first_wallclock) * pseudo_realtime; - current_pseudo = pseudo_time <= ct ? zeek_start_time + pseudo_time : 0; + current_pseudo = pseudo_time <= ct ? first_wallclock + pseudo_time : 0; + + DBG_LOG(DBG_MAINLOOP, + "check_pseudo_time: first_wallclock=%.6f first_timestamp=%.6f pkt->time=%.6f pseudo_time=%.6f ct=%.6f " + "current_pseudo=%.6f", + first_wallclock, first_timestamp, pkt->time, pseudo_time, ct, current_pseudo); + return current_pseudo; } @@ -438,14 +454,17 @@ double current_timestamp = 0.0; static int _processing_suspended = 0; void suspend_processing() { - if ( _processing_suspended == 0 ) + if ( _processing_suspended == 0 ) { + DBG_LOG(DBG_MAINLOOP, "processing suspended"); reporter->Info("processing suspended"); + } ++_processing_suspended; } void continue_processing() { if ( _processing_suspended == 1 ) { + DBG_LOG(DBG_MAINLOOP, "processing continued"); reporter->Info("processing continued"); detail::current_wallclock = util::current_time(true); } diff --git a/src/RunState.h b/src/RunState.h index af710bf8ea..1015aa888a 100644 --- a/src/RunState.h +++ b/src/RunState.h @@ -70,6 +70,7 @@ extern void suspend_processing(); extern void continue_processing(); bool is_processing_suspended(); +[[deprecated("Remove with v8.1. Use run_state::current_pseudo directly if needed.")]] extern double current_packet_timestamp(); extern double current_packet_wallclock(); diff --git a/src/iosource/PktSrc.cc b/src/iosource/PktSrc.cc index 2e49f3e8ff..786e5a3a40 100644 --- a/src/iosource/PktSrc.cc +++ b/src/iosource/PktSrc.cc @@ -2,17 +2,13 @@ #include "zeek/iosource/PktSrc.h" -#include "zeek/zeek-config.h" - #include -#include "zeek/Hash.h" +#include "zeek/DebugLogger.h" #include "zeek/RunState.h" -#include "zeek/broker/Manager.h" #include "zeek/iosource/BPF_Program.h" #include "zeek/iosource/Manager.h" #include "zeek/iosource/pcap/pcap.bif.h" -#include "zeek/packet_analysis/Manager.h" #include "zeek/session/Manager.h" #include "zeek/util.h" @@ -131,14 +127,10 @@ bool PktSrc::ExtractNextPacketInternal() { have_packet = false; - // Don't return any packets if processing is suspended (except for the - // very first packet which we need to set up times). - if ( run_state::is_processing_suspended() && run_state::detail::first_timestamp ) + // Don't return any packets if processing is suspended. + if ( run_state::is_processing_suspended() ) return false; - if ( run_state::pseudo_realtime ) - run_state::detail::current_wallclock = util::current_time(true); - if ( ExtractNextPacket(¤t_packet) ) { had_packet = true; @@ -147,9 +139,6 @@ bool PktSrc::ExtractNextPacketInternal() { return false; } - if ( ! run_state::detail::first_timestamp ) - run_state::detail::first_timestamp = current_packet.time; - have_packet = true; return true; } diff --git a/src/util.cc b/src/util.cc index be452746cc..fb31e61317 100644 --- a/src/util.cc +++ b/src/util.cc @@ -1867,11 +1867,11 @@ double current_time(bool real) { iosource::PktSrc* src = iosource_mgr->GetPktSrc(); if ( run_state::is_processing_suspended() ) - return run_state::current_packet_timestamp(); + return run_state::detail::current_pseudo; // We don't scale with pseudo_realtime here as that would give us a // jumping real-time. - return run_state::current_packet_timestamp() + (t - run_state::current_packet_wallclock()); + return run_state::detail::current_pseudo + (t - run_state::current_packet_wallclock()); } struct timeval double_to_timeval(double t) {