Merge remote-tracking branch 'origin/topic/awelzel/pseudo-realtime-again'

* origin/topic/awelzel/pseudo-realtime-again:
  PktSrc: Remove first_timestamp condition check
  PktSrc: Fix includes
  PktSrc/RunState: Scale on first_wallclock and move pseudo realtime logic to RunState
  RunState.h: Deprecate misleadingly named current_packet_timestamp()
  debug: Add processing suspended/continued to debug.log
This commit is contained in:
Arne Welzel 2024-11-12 16:00:00 +01:00
commit d0bf4e428a
6 changed files with 62 additions and 25 deletions

28
CHANGES
View file

@ -1,3 +1,31 @@
7.1.0-dev.511 | 2024-11-12 16:00:00 +0100
* PktSrc: Remove first_timestamp condition check (Arne Welzel, Corelight)
The comment is stale and first_timestamp is only relevant/available
in pseudo_realtime.
* PktSrc: Fix includes (Arne Welzel, Corelight)
* PktSrc/RunState: Scale on first_wallclock and move pseudo realtime logic to RunState (Arne Welzel, Corelight)
check_pseudo_time() used zeek_start_time which skews things sufficiently
around being in the past when ZAM compilation takes multiple seconds. Switch
to using first_wallclock instead.
Further, move setting of first_timestamp and first_wallclock from PktSrc
into RunState's dispatch_packet(), so it's more centralized now.
The only pseudo_realtime piece left in PktSrc() is in GetNextTimeout() to
determine how long the PktSrc is idle until the next packet is ready.
* RunState.h: Deprecate misleadingly named current_packet_timestamp() (Arne Welzel, Corelight)
This returns current_pseudo, naming it current_packet_timestamp()
is actively misleading.
* debug: Add processing suspended/continued to debug.log (Arne Welzel, Corelight)
7.1.0-dev.505 | 2024-11-12 10:26:15 +0100 7.1.0-dev.505 | 2024-11-12 10:26:15 +0100
* ZAM fixes for assignments involving "any" record fields (Vern Paxson, Corelight) * ZAM fixes for assignments involving "any" record fields (Vern Paxson, Corelight)

View file

@ -1 +1 @@
7.1.0-dev.505 7.1.0-dev.511

View file

@ -129,8 +129,6 @@ void update_network_time(double new_network_time) {
static bool should_forward_network_time() { static bool should_forward_network_time() {
// In pseudo_realtime mode, always update time once // In pseudo_realtime mode, always update time once
// we've dispatched and processed the first packet. // we've dispatched and processed the first packet.
// run_state::detail::first_timestamp is currently set
// in PktSrc::ExtractNextPacketInternal()
if ( pseudo_realtime != 0.0 && run_state::detail::first_timestamp != 0.0 ) if ( pseudo_realtime != 0.0 && run_state::detail::first_timestamp != 0.0 )
return true; return true;
@ -221,7 +219,19 @@ void expire_timers() {
} }
void dispatch_packet(Packet* pkt, iosource::PktSrc* pkt_src) { void dispatch_packet(Packet* pkt, iosource::PktSrc* pkt_src) {
double t = run_state::pseudo_realtime ? check_pseudo_time(pkt) : pkt->time; double t = pkt->time;
if ( pseudo_realtime != 0.0 ) {
current_wallclock = util::current_time(true);
if ( first_wallclock == 0.0 ) {
first_wallclock = util::current_time(true);
first_timestamp = pkt->time;
}
// Scale pkt time based on pseudo_realtime
t = check_pseudo_time(pkt);
}
if ( ! zeek_start_network_time ) { if ( ! zeek_start_network_time ) {
zeek_start_network_time = t; zeek_start_network_time = t;
@ -244,9 +254,6 @@ void dispatch_packet(Packet* pkt, iosource::PktSrc* pkt_src) {
processing_start_time = 0.0; // = "we're not processing now" processing_start_time = 0.0; // = "we're not processing now"
current_dispatched = 0; current_dispatched = 0;
if ( pseudo_realtime && ! first_wallclock )
first_wallclock = util::current_time(true);
current_iosrc = nullptr; current_iosrc = nullptr;
current_pktsrc = nullptr; current_pktsrc = nullptr;
} }
@ -399,10 +406,19 @@ void delete_run() {
} }
double check_pseudo_time(const Packet* pkt) { double check_pseudo_time(const Packet* pkt) {
assert(pkt->time > 0.0);
assert(first_wallclock > 0.0);
assert(first_timestamp > 0.0);
double pseudo_time = pkt->time - first_timestamp; double pseudo_time = pkt->time - first_timestamp;
double ct = (util::current_time(true) - first_wallclock) * pseudo_realtime; double ct = (util::current_time(true) - first_wallclock) * pseudo_realtime;
current_pseudo = pseudo_time <= ct ? zeek_start_time + pseudo_time : 0; current_pseudo = pseudo_time <= ct ? first_wallclock + pseudo_time : 0;
DBG_LOG(DBG_MAINLOOP,
"check_pseudo_time: first_wallclock=%.6f first_timestamp=%.6f pkt->time=%.6f pseudo_time=%.6f ct=%.6f "
"current_pseudo=%.6f",
first_wallclock, first_timestamp, pkt->time, pseudo_time, ct, current_pseudo);
return current_pseudo; return current_pseudo;
} }
@ -438,14 +454,17 @@ double current_timestamp = 0.0;
static int _processing_suspended = 0; static int _processing_suspended = 0;
void suspend_processing() { void suspend_processing() {
if ( _processing_suspended == 0 ) if ( _processing_suspended == 0 ) {
DBG_LOG(DBG_MAINLOOP, "processing suspended");
reporter->Info("processing suspended"); reporter->Info("processing suspended");
}
++_processing_suspended; ++_processing_suspended;
} }
void continue_processing() { void continue_processing() {
if ( _processing_suspended == 1 ) { if ( _processing_suspended == 1 ) {
DBG_LOG(DBG_MAINLOOP, "processing continued");
reporter->Info("processing continued"); reporter->Info("processing continued");
detail::current_wallclock = util::current_time(true); detail::current_wallclock = util::current_time(true);
} }

View file

@ -70,6 +70,7 @@ extern void suspend_processing();
extern void continue_processing(); extern void continue_processing();
bool is_processing_suspended(); bool is_processing_suspended();
[[deprecated("Remove with v8.1. Use run_state::current_pseudo directly if needed.")]]
extern double current_packet_timestamp(); extern double current_packet_timestamp();
extern double current_packet_wallclock(); extern double current_packet_wallclock();

View file

@ -2,17 +2,13 @@
#include "zeek/iosource/PktSrc.h" #include "zeek/iosource/PktSrc.h"
#include "zeek/zeek-config.h"
#include <sys/stat.h> #include <sys/stat.h>
#include "zeek/Hash.h" #include "zeek/DebugLogger.h"
#include "zeek/RunState.h" #include "zeek/RunState.h"
#include "zeek/broker/Manager.h"
#include "zeek/iosource/BPF_Program.h" #include "zeek/iosource/BPF_Program.h"
#include "zeek/iosource/Manager.h" #include "zeek/iosource/Manager.h"
#include "zeek/iosource/pcap/pcap.bif.h" #include "zeek/iosource/pcap/pcap.bif.h"
#include "zeek/packet_analysis/Manager.h"
#include "zeek/session/Manager.h" #include "zeek/session/Manager.h"
#include "zeek/util.h" #include "zeek/util.h"
@ -131,14 +127,10 @@ bool PktSrc::ExtractNextPacketInternal() {
have_packet = false; have_packet = false;
// Don't return any packets if processing is suspended (except for the // Don't return any packets if processing is suspended.
// very first packet which we need to set up times). if ( run_state::is_processing_suspended() )
if ( run_state::is_processing_suspended() && run_state::detail::first_timestamp )
return false; return false;
if ( run_state::pseudo_realtime )
run_state::detail::current_wallclock = util::current_time(true);
if ( ExtractNextPacket(&current_packet) ) { if ( ExtractNextPacket(&current_packet) ) {
had_packet = true; had_packet = true;
@ -147,9 +139,6 @@ bool PktSrc::ExtractNextPacketInternal() {
return false; return false;
} }
if ( ! run_state::detail::first_timestamp )
run_state::detail::first_timestamp = current_packet.time;
have_packet = true; have_packet = true;
return true; return true;
} }

View file

@ -1867,11 +1867,11 @@ double current_time(bool real) {
iosource::PktSrc* src = iosource_mgr->GetPktSrc(); iosource::PktSrc* src = iosource_mgr->GetPktSrc();
if ( run_state::is_processing_suspended() ) if ( run_state::is_processing_suspended() )
return run_state::current_packet_timestamp(); return run_state::detail::current_pseudo;
// We don't scale with pseudo_realtime here as that would give us a // We don't scale with pseudo_realtime here as that would give us a
// jumping real-time. // jumping real-time.
return run_state::current_packet_timestamp() + (t - run_state::current_packet_wallclock()); return run_state::detail::current_pseudo + (t - run_state::current_packet_wallclock());
} }
struct timeval double_to_timeval(double t) { struct timeval double_to_timeval(double t) {