From 402b76878794fec04f2aef2c60b7bbc8f2505b57 Mon Sep 17 00:00:00 2001 From: Arne Welzel Date: Tue, 12 Nov 2024 09:49:06 +0100 Subject: [PATCH 1/5] debug: Add processing suspended/continued to debug.log --- src/RunState.cc | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/RunState.cc b/src/RunState.cc index 6add4134aa..702b3b75e0 100644 --- a/src/RunState.cc +++ b/src/RunState.cc @@ -438,14 +438,17 @@ double current_timestamp = 0.0; static int _processing_suspended = 0; void suspend_processing() { - if ( _processing_suspended == 0 ) + if ( _processing_suspended == 0 ) { + DBG_LOG(DBG_MAINLOOP, "processing suspended"); reporter->Info("processing suspended"); + } ++_processing_suspended; } void continue_processing() { if ( _processing_suspended == 1 ) { + DBG_LOG(DBG_MAINLOOP, "processing continued"); reporter->Info("processing continued"); detail::current_wallclock = util::current_time(true); } From 54d28a2179f7b4164f44a06a72db2c870239272e Mon Sep 17 00:00:00 2001 From: Arne Welzel Date: Mon, 11 Nov 2024 19:47:23 +0100 Subject: [PATCH 2/5] RunState.h: Deprecate misleadingly named current_packet_timestamp() This returns current_pseudo, naming it current_packet_timestamp() is actively misleading. --- src/RunState.h | 1 + src/util.cc | 4 ++-- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/src/RunState.h b/src/RunState.h index af710bf8ea..05ef5b2c63 100644 --- a/src/RunState.h +++ b/src/RunState.h @@ -70,6 +70,7 @@ extern void suspend_processing(); extern void continue_processing(); bool is_processing_suspended(); +[[deprecated("Remove with v8.1. Use run_state::current_pseudo directly, but you probably should not")]] extern double current_packet_timestamp(); extern double current_packet_wallclock(); diff --git a/src/util.cc b/src/util.cc index be452746cc..fb31e61317 100644 --- a/src/util.cc +++ b/src/util.cc @@ -1867,11 +1867,11 @@ double current_time(bool real) { iosource::PktSrc* src = iosource_mgr->GetPktSrc(); if ( run_state::is_processing_suspended() ) - return run_state::current_packet_timestamp(); + return run_state::detail::current_pseudo; // We don't scale with pseudo_realtime here as that would give us a // jumping real-time. - return run_state::current_packet_timestamp() + (t - run_state::current_packet_wallclock()); + return run_state::detail::current_pseudo + (t - run_state::current_packet_wallclock()); } struct timeval double_to_timeval(double t) { From d9a7f9f36fc2f3f9f5850311b9fe2777ff3a92d6 Mon Sep 17 00:00:00 2001 From: Arne Welzel Date: Mon, 11 Nov 2024 20:00:44 +0100 Subject: [PATCH 3/5] PktSrc/RunState: Scale on first_wallclock and move pseudo realtime logic to RunState check_pseudo_time() used zeek_start_time which skews things sufficiently around being in the past when ZAM compilation takes multiple seconds. Switch to using first_wallclock instead. Further, move setting of first_timestamp and first_wallclock from PktSrc into RunState's dispatch_packet(), so it's more centralized now. The only pseudo_realtime piece left in PktSrc() is in GetNextTimeout() to determine how long the PktSrc is idle until the next packet is ready. --- src/RunState.cc | 30 +++++++++++++++++++++++------- src/RunState.h | 2 +- src/iosource/PktSrc.cc | 6 ------ 3 files changed, 24 insertions(+), 14 deletions(-) diff --git a/src/RunState.cc b/src/RunState.cc index 702b3b75e0..1074100fa6 100644 --- a/src/RunState.cc +++ b/src/RunState.cc @@ -129,8 +129,6 @@ void update_network_time(double new_network_time) { static bool should_forward_network_time() { // In pseudo_realtime mode, always update time once // we've dispatched and processed the first packet. - // run_state::detail::first_timestamp is currently set - // in PktSrc::ExtractNextPacketInternal() if ( pseudo_realtime != 0.0 && run_state::detail::first_timestamp != 0.0 ) return true; @@ -221,7 +219,19 @@ void expire_timers() { } void dispatch_packet(Packet* pkt, iosource::PktSrc* pkt_src) { - double t = run_state::pseudo_realtime ? check_pseudo_time(pkt) : pkt->time; + double t = pkt->time; + + if ( pseudo_realtime != 0.0 ) { + current_wallclock = util::current_time(true); + + if ( first_wallclock == 0.0 ) { + first_wallclock = util::current_time(true); + first_timestamp = pkt->time; + } + + // Scale pkt time based on pseudo_realtime + t = check_pseudo_time(pkt); + } if ( ! zeek_start_network_time ) { zeek_start_network_time = t; @@ -244,9 +254,6 @@ void dispatch_packet(Packet* pkt, iosource::PktSrc* pkt_src) { processing_start_time = 0.0; // = "we're not processing now" current_dispatched = 0; - if ( pseudo_realtime && ! first_wallclock ) - first_wallclock = util::current_time(true); - current_iosrc = nullptr; current_pktsrc = nullptr; } @@ -399,10 +406,19 @@ void delete_run() { } double check_pseudo_time(const Packet* pkt) { + assert(pkt->time > 0.0); + assert(first_wallclock > 0.0); + assert(first_timestamp > 0.0); double pseudo_time = pkt->time - first_timestamp; double ct = (util::current_time(true) - first_wallclock) * pseudo_realtime; - current_pseudo = pseudo_time <= ct ? zeek_start_time + pseudo_time : 0; + current_pseudo = pseudo_time <= ct ? first_wallclock + pseudo_time : 0; + + DBG_LOG(DBG_MAINLOOP, + "check_pseudo_time: first_wallclock=%.6f first_timestamp=%.6f pkt->time=%.6f pseudo_time=%.6f ct=%.6f " + "current_pseudo=%.6f", + first_wallclock, first_timestamp, pkt->time, pseudo_time, ct, current_pseudo); + return current_pseudo; } diff --git a/src/RunState.h b/src/RunState.h index 05ef5b2c63..1015aa888a 100644 --- a/src/RunState.h +++ b/src/RunState.h @@ -70,7 +70,7 @@ extern void suspend_processing(); extern void continue_processing(); bool is_processing_suspended(); -[[deprecated("Remove with v8.1. Use run_state::current_pseudo directly, but you probably should not")]] +[[deprecated("Remove with v8.1. Use run_state::current_pseudo directly if needed.")]] extern double current_packet_timestamp(); extern double current_packet_wallclock(); diff --git a/src/iosource/PktSrc.cc b/src/iosource/PktSrc.cc index 2e49f3e8ff..df228b3851 100644 --- a/src/iosource/PktSrc.cc +++ b/src/iosource/PktSrc.cc @@ -136,9 +136,6 @@ bool PktSrc::ExtractNextPacketInternal() { if ( run_state::is_processing_suspended() && run_state::detail::first_timestamp ) return false; - if ( run_state::pseudo_realtime ) - run_state::detail::current_wallclock = util::current_time(true); - if ( ExtractNextPacket(¤t_packet) ) { had_packet = true; @@ -147,9 +144,6 @@ bool PktSrc::ExtractNextPacketInternal() { return false; } - if ( ! run_state::detail::first_timestamp ) - run_state::detail::first_timestamp = current_packet.time; - have_packet = true; return true; } From ffa1fafa036f93927ab0c13fd5cb3f25a3444379 Mon Sep 17 00:00:00 2001 From: Arne Welzel Date: Mon, 11 Nov 2024 20:12:12 +0100 Subject: [PATCH 4/5] PktSrc: Fix includes --- src/iosource/PktSrc.cc | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/src/iosource/PktSrc.cc b/src/iosource/PktSrc.cc index df228b3851..48a8e65dcb 100644 --- a/src/iosource/PktSrc.cc +++ b/src/iosource/PktSrc.cc @@ -2,17 +2,13 @@ #include "zeek/iosource/PktSrc.h" -#include "zeek/zeek-config.h" - #include -#include "zeek/Hash.h" +#include "zeek/DebugLogger.h" #include "zeek/RunState.h" -#include "zeek/broker/Manager.h" #include "zeek/iosource/BPF_Program.h" #include "zeek/iosource/Manager.h" #include "zeek/iosource/pcap/pcap.bif.h" -#include "zeek/packet_analysis/Manager.h" #include "zeek/session/Manager.h" #include "zeek/util.h" From fcab5fd6cfc817884d5740bf2f14f891677ffc31 Mon Sep 17 00:00:00 2001 From: Arne Welzel Date: Tue, 12 Nov 2024 10:07:01 +0100 Subject: [PATCH 5/5] PktSrc: Remove first_timestamp condition check The comment is stale and first_timestamp is only relevant/available in pseudo_realtime. --- src/iosource/PktSrc.cc | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/src/iosource/PktSrc.cc b/src/iosource/PktSrc.cc index 48a8e65dcb..786e5a3a40 100644 --- a/src/iosource/PktSrc.cc +++ b/src/iosource/PktSrc.cc @@ -127,9 +127,8 @@ bool PktSrc::ExtractNextPacketInternal() { have_packet = false; - // Don't return any packets if processing is suspended (except for the - // very first packet which we need to set up times). - if ( run_state::is_processing_suspended() && run_state::detail::first_timestamp ) + // Don't return any packets if processing is suspended. + if ( run_state::is_processing_suspended() ) return false; if ( ExtractNextPacket(¤t_packet) ) {