From 69da2d7b1d215832f1d948cefb4d01e7ea698a03 Mon Sep 17 00:00:00 2001 From: Tim Wojtulewicz Date: Wed, 23 Sep 2020 15:22:02 -0700 Subject: [PATCH] Prep work for IP changes - Move all of the time handling code out of PktSrc into RunState - Call packet_mgr->ProcessPacket() from various places to setup layer 2 data in packets --- src/RunState.cc | 50 ++++++++++++++----- src/RunState.h | 21 ++++++-- src/Sessions.cc | 3 ++ src/analyzer/protocol/vxlan/VXLAN.cc | 2 + src/iosource/Packet.cc | 1 - src/iosource/Packet.h | 27 +++++----- src/iosource/PktSrc.cc | 73 ++++++++++------------------ src/iosource/PktSrc.h | 18 +------ src/util.cc | 5 +- src/zeek.bif | 12 ++--- 10 files changed, 108 insertions(+), 104 deletions(-) diff --git a/src/RunState.cc b/src/RunState.cc index 3b960c1e14..0865d9b4fa 100644 --- a/src/RunState.cc +++ b/src/RunState.cc @@ -51,6 +51,10 @@ iosource::PktDumper* pkt_dumper = nullptr; iosource::PktSrc* current_pktsrc = nullptr; iosource::IOSource* current_iosrc = nullptr; bool have_pending_timers = false; +double first_wallclock = 0.0; +double first_timestamp = 0.0; +double current_wallclock = 0.0; +double current_pseudo = 0.0; RETSIGTYPE watchdog(int /* signo */) { @@ -196,7 +200,7 @@ void init_run(const std::optional& interface, } } -void expire_timers(iosource::PktSrc* src_ps) +void expire_timers() { zeek::detail::SegmentProfiler prof(zeek::detail::segment_logger, "expiring-timers"); @@ -205,8 +209,13 @@ void expire_timers(iosource::PktSrc* src_ps) zeek::detail::max_timer_expires - current_dispatched); } -void dispatch_packet(double t, const Packet* pkt, iosource::PktSrc* src_ps) +void dispatch_packet(const Packet* pkt) { + if ( ! pkt->l2_valid ) + return; + + double t = run_state::pseudo_realtime ? check_pseudo_time(pkt) : pkt->time; + if ( ! zeek_start_network_time ) { zeek_start_network_time = t; @@ -217,12 +226,8 @@ void dispatch_packet(double t, const Packet* pkt, iosource::PktSrc* src_ps) // network_time never goes back. update_network_time(zeek::detail::timer_mgr->Time() < t ? t : zeek::detail::timer_mgr->Time()); - - current_pktsrc = src_ps; - current_iosrc = src_ps; processing_start_time = t; - - expire_timers(src_ps); + expire_timers(); zeek::detail::SegmentProfiler* sp = nullptr; @@ -256,8 +261,9 @@ void dispatch_packet(double t, const Packet* pkt, iosource::PktSrc* src_ps) processing_start_time = 0.0; // = "we're not processing now" current_dispatched = 0; - current_iosrc = nullptr; - current_pktsrc = nullptr; + + if ( pseudo_realtime && ! first_wallclock ) + first_wallclock = util::current_time(true); } void run_loop() @@ -396,8 +402,31 @@ void delete_run() delete zeek::detail::ip_anonymizer[i]; } +double check_pseudo_time(const Packet* pkt) + { + double pseudo_time = pkt->time - first_timestamp; + double ct = (util::current_time(true) - first_wallclock) * pseudo_realtime; + + current_pseudo = pseudo_time <= ct ? zeek_start_time + pseudo_time : 0; + return current_pseudo; + } + } // namespace detail +extern double current_packet_timestamp() + { + return detail::current_pseudo; + } + +extern double current_packet_wallclock() + { + // We stop time when we are suspended. + if ( run_state::is_processing_suspended() ) + detail::current_wallclock = util::current_time(true); + + return detail::current_wallclock; + } + bool reading_live = false; bool reading_traces = false; double pseudo_realtime = 0.0; @@ -428,8 +457,7 @@ void continue_processing() if ( _processing_suspended == 1 ) { reporter->Info("processing continued"); - if ( iosource::PktSrc* ps = iosource_mgr->GetPktSrc() ) - ps->ContinueAfterSuspend(); + detail::current_wallclock = util::current_time(true); } --_processing_suspended; diff --git a/src/RunState.h b/src/RunState.h index 0f12b423b5..1994784fbe 100644 --- a/src/RunState.h +++ b/src/RunState.h @@ -24,12 +24,13 @@ extern void get_final_stats(); extern void finish_run(int drain_events); extern void delete_run(); // Reclaim all memory, etc. extern void update_network_time(double new_network_time); -extern void dispatch_packet(double t, const zeek::Packet* pkt, - zeek::iosource::PktSrc* src_ps); -extern void expire_timers(zeek::iosource::PktSrc* src_ps = nullptr); +extern void dispatch_packet(const zeek::Packet* pkt); +extern void expire_timers(); extern void zeek_terminate_loop(const char* reason); -extern zeek::iosource::PktSrc* current_pktsrc; +extern double check_pseudo_time(const Packet *pkt); + +extern zeek::iosource::PktSrc* current_pktsrc [[deprecated("Remove in v4.1. Use static_cast(zeek::detail::iosource.)")]]; extern zeek::iosource::IOSource* current_iosrc; extern zeek::iosource::PktDumper* pkt_dumper; // where to save packets @@ -40,6 +41,13 @@ extern zeek::iosource::PktDumper* pkt_dumper; // where to save packets // on future timers). extern bool have_pending_timers; +extern double first_wallclock; + +// Only set in pseudo-realtime mode. +extern double first_timestamp; +extern double current_wallclock; +extern double current_pseudo; + } // namespace detail // Functions to temporarily suspend processing of live input (network packets @@ -48,6 +56,9 @@ extern void suspend_processing(); extern void continue_processing(); bool is_processing_suspended(); +extern double current_packet_timestamp(); +extern double current_packet_wallclock(); + // Whether we're reading live traffic. extern bool reading_live; @@ -96,7 +107,7 @@ constexpr auto net_update_time [[deprecated("Remove in v4.1. Use zeek::run_state constexpr auto net_packet_dispatch [[deprecated("Remove in v4.1. Use zeek::run_state::detail::dispatch_packet.")]] = zeek::run_state::detail::dispatch_packet; constexpr auto expire_timers [[deprecated("Remove in v4.1. Use zeek::run_state::detail::expire_timers.")]] = zeek::run_state::detail::expire_timers; constexpr auto zeek_terminate_loop [[deprecated("Remove in v4.1. Use zeek::run_state::detail::zeek_terminate_loop.")]] = zeek::run_state::detail::zeek_terminate_loop; -extern zeek::iosource::PktSrc*& current_pktsrc [[deprecated("Remove in v4.1. Use zeek::run_state::detail::current_pktsrc.")]]; +extern zeek::iosource::PktSrc*& current_pktsrc [[deprecated("Remove in v4.1. Use static_cast(zeek::detail::iosource).")]]; extern zeek::iosource::IOSource*& current_iosrc [[deprecated("Remove in v4.1. Use zeek::run_state::detail::current_iosrc.")]]; extern zeek::iosource::PktDumper*& pkt_dumper [[deprecated("Remove in v4.1. Use zeek::run_state::detail::pkt_dumper.")]]; extern bool& have_pending_timers [[deprecated("Remove in v4.1. Use zeek::run_state::detail::have_pending_timers.")]]; diff --git a/src/Sessions.cc b/src/Sessions.cc index 57fcde8e75..4a56d4a35e 100644 --- a/src/Sessions.cc +++ b/src/Sessions.cc @@ -30,6 +30,7 @@ #include "analyzer/Manager.h" #include "iosource/IOSource.h" #include "iosource/PktDumper.h" +#include "packet_analysis/Manager.h" #include "pcap.h" @@ -770,6 +771,7 @@ void NetSessions::DoNextInnerPacket(double t, const Packet* pkt, // Construct fake packet for DoNextPacket Packet p; p.Init(DLT_RAW, &ts, caplen, len, data, false, ""); + packet_mgr->ProcessPacket(&p); DoNextPacket(t, &p, inner, outer); @@ -801,6 +803,7 @@ void NetSessions::DoNextInnerPacket(double t, const Packet* pkt, // Construct fake packet for DoNextPacket Packet p; p.Init(link_type, &ts, caplen, len, data, false, ""); + packet_mgr->ProcessPacket(&p); if ( p.l2_valid && (p.l3_proto == L3_IPV4 || p.l3_proto == L3_IPV6) ) { diff --git a/src/analyzer/protocol/vxlan/VXLAN.cc b/src/analyzer/protocol/vxlan/VXLAN.cc index a28fe45798..8f67ece511 100644 --- a/src/analyzer/protocol/vxlan/VXLAN.cc +++ b/src/analyzer/protocol/vxlan/VXLAN.cc @@ -9,6 +9,7 @@ #include "RunState.h" #include "Sessions.h" #include "Reporter.h" +#include "packet_analysis/Manager.h" #include "events.bif.h" @@ -64,6 +65,7 @@ void VXLAN_Analyzer::DeliverPacket(int len, const u_char* data, bool orig, ts.tv_sec = (time_t) run_state::current_timestamp; ts.tv_usec = (suseconds_t) ((run_state::current_timestamp - (double)ts.tv_sec) * 1000000); Packet pkt(DLT_EN10MB, &ts, caplen, len, data); + packet_mgr->ProcessPacket(&pkt); if ( ! pkt.l2_valid ) { diff --git a/src/iosource/Packet.cc b/src/iosource/Packet.cc index 152723f866..5a8339add0 100644 --- a/src/iosource/Packet.cc +++ b/src/iosource/Packet.cc @@ -67,7 +67,6 @@ void Packet::Init(int arg_link_type, pkt_timeval *arg_ts, uint32_t arg_caplen, // From here we assume that layer 2 is valid. If the packet analysis fails, // the packet manager will invalidate the packet. l2_valid = true; - packet_mgr->ProcessPacket(this); } } diff --git a/src/iosource/Packet.h b/src/iosource/Packet.h index 8d4e5ba60d..5c46b548b1 100644 --- a/src/iosource/Packet.h +++ b/src/iosource/Packet.h @@ -66,8 +66,7 @@ public: */ Packet(int link_type, pkt_timeval *ts, uint32_t caplen, uint32_t len, const u_char *data, bool copy = false, - std::string tag = std::string("")) - : data(nullptr), l2_src(nullptr), l2_dst(nullptr) + std::string tag = "") { Init(link_type, ts, caplen, len, data, copy, tag); } @@ -75,7 +74,7 @@ public: /** * Default constructor. For internal use only. */ - Packet() : data(nullptr), l2_src(nullptr), l2_dst(nullptr) + Packet() { pkt_timeval ts = {0, 0}; Init(0, &ts, 0, 0, nullptr); @@ -113,8 +112,8 @@ public: * differentiating the input streams. */ void Init(int link_type, pkt_timeval *ts, uint32_t caplen, - uint32_t len, const u_char *data, bool copy = false, - std::string tag = std::string("")); + uint32_t len, const u_char *data, bool copy = false, + std::string tag = ""); /** * Interprets the Layer 3 of the packet as IP and returns a @@ -144,13 +143,13 @@ public: static constexpr const u_char L2_EMPTY_ADDR[L2_ADDR_LEN] = { 0 }; // These are passed in through the constructor. - std::string tag; /// Used in serialization - double time; /// Timestamp reconstituted as float - pkt_timeval ts; /// Capture timestamp - const u_char* data; /// Packet data. - uint32_t len; /// Actual length on wire - uint32_t cap_len; /// Captured packet length - uint32_t link_type; /// pcap link_type (DLT_EN10MB, DLT_RAW, etc) + std::string tag; /// Used in serialization + double time; /// Timestamp reconstituted as float + pkt_timeval ts; /// Capture timestamp + const u_char* data = nullptr; /// Packet data. + uint32_t len; /// Actual length on wire + uint32_t cap_len; /// Captured packet length + uint32_t link_type; /// pcap link_type (DLT_EN10MB, DLT_RAW, etc) // True if L2 processing succeeded. If data is set on initialization of // the packet, L2 is assumed to be valid. The packet manager will then @@ -179,12 +178,12 @@ public: /** * Layer 2 source address. Valid iff l2_valid is true. */ - const u_char* l2_src; + const u_char* l2_src = nullptr; /** * Layer 2 destination address. Valid iff l2_valid is true. */ - const u_char* l2_dst; + const u_char* l2_dst = nullptr; /** * (Outermost) VLAN tag if any, else 0. Valid iff l2_valid is true. diff --git a/src/iosource/PktSrc.cc b/src/iosource/PktSrc.cc index 38fb51cefb..60521426cb 100644 --- a/src/iosource/PktSrc.cc +++ b/src/iosource/PktSrc.cc @@ -11,6 +11,7 @@ #include "Sessions.h" #include "broker/Manager.h" #include "iosource/Manager.h" +#include "packet_analysis/Manager.h" #include "BPF_Program.h" #include "pcap/pcap.bif.h" @@ -30,11 +31,6 @@ PktSrc::PktSrc() have_packet = false; errbuf = ""; SetClosed(true); - - next_sync_point = 0; - first_timestamp = 0.0; - current_pseudo = 0.0; - first_wallclock = current_wallclock = 0; } PktSrc::~PktSrc() @@ -76,16 +72,12 @@ bool PktSrc::IsLive() const double PktSrc::CurrentPacketTimestamp() { - return current_pseudo; + return run_state::current_packet_timestamp(); } double PktSrc::CurrentPacketWallClock() { - // We stop time when we are suspended. - if ( run_state::is_processing_suspended() ) - current_wallclock = util::current_time(true); - - return current_wallclock; + return run_state::current_packet_wallclock(); } void PktSrc::Opened(const Properties& arg_props) @@ -151,25 +143,6 @@ void PktSrc::InternalError(const std::string& msg) reporter->InternalError("%s", msg.c_str()); } -void PktSrc::ContinueAfterSuspend() - { - current_wallclock = util::current_time(true); - } - -double PktSrc::CheckPseudoTime() - { - if ( ! IsOpen() ) - return 0; - - if ( ! ExtractNextPacketInternal() ) - return 0; - - double pseudo_time = current_packet.time - first_timestamp; - double ct = (util::current_time(true) - first_wallclock) * run_state::pseudo_realtime; - - return pseudo_time <= ct ? run_state::zeek_start_time + pseudo_time : 0; - } - void PktSrc::InitSource() { Open(); @@ -189,19 +162,22 @@ void PktSrc::Process() if ( ! ExtractNextPacketInternal() ) return; - if ( current_packet.l2_valid ) - { - if ( run_state::pseudo_realtime ) - { - current_pseudo = CheckPseudoTime(); - run_state::detail::dispatch_packet(current_pseudo, ¤t_packet, this); - if ( ! first_wallclock ) - first_wallclock = util::current_time(true); - } + // This is set here to avoid having to pass the packet source down into the processing + // methods unnecessarily. + run_state::detail::current_iosrc = this; +#pragma GCC diagnostic push +#pragma GCC diagnostic ignored "-Wdeprecated-declarations" + run_state::detail::current_pktsrc = this; +#pragma GCC diagnostic pop - else - run_state::detail::dispatch_packet(current_packet.time, ¤t_packet, this); - } + packet_mgr->ProcessPacket(¤t_packet); + run_state::detail::dispatch_packet(¤t_packet); + + run_state::detail::current_iosrc = nullptr; +#pragma GCC diagnostic push +#pragma GCC diagnostic ignored "-Wdeprecated-declarations" + run_state::detail::current_pktsrc = nullptr; +#pragma GCC diagnostic pop have_packet = false; DoneWithPacket(); @@ -221,11 +197,11 @@ bool PktSrc::ExtractNextPacketInternal() // Don't return any packets if processing is suspended (except for the // very first packet which we need to set up times). - if ( run_state::is_processing_suspended() && first_timestamp ) + if ( run_state::is_processing_suspended() && run_state::detail::first_timestamp ) return false; if ( run_state::pseudo_realtime ) - current_wallclock = util::current_time(true); + run_state::detail::current_wallclock = util::current_time(true); if ( ExtractNextPacket(¤t_packet) ) { @@ -235,8 +211,8 @@ bool PktSrc::ExtractNextPacketInternal() return false; } - if ( ! first_timestamp ) - first_timestamp = current_packet.time; + if ( ! run_state::detail::first_timestamp ) + run_state::detail::first_timestamp = current_packet.time; have_packet = true; return true; @@ -342,8 +318,9 @@ double PktSrc::GetNextTimeout() if ( ! have_packet ) ExtractNextPacketInternal(); - double pseudo_time = current_packet.time - first_timestamp; - double ct = (util::current_time(true) - first_wallclock) * run_state::pseudo_realtime; + // This duplicates the calculation used in run_state::check_pseudo_time(). + double pseudo_time = current_packet.time - run_state::detail::first_timestamp; + double ct = (util::current_time(true) - run_state::detail::first_wallclock) * run_state::pseudo_realtime; return std::max(0.0, pseudo_time - ct); } diff --git a/src/iosource/PktSrc.h b/src/iosource/PktSrc.h index 66708da6b1..127d2364c2 100644 --- a/src/iosource/PktSrc.h +++ b/src/iosource/PktSrc.h @@ -96,6 +96,7 @@ public: * In pseudo-realtime mode, returns the logical timestamp of the * current packet. Undefined if not running pseudo-realtime mode. */ + [[deprecated("Remove in v4.1. Use zeek::run_state::current_packet_timestamp().")]] double CurrentPacketTimestamp(); /** @@ -103,14 +104,9 @@ public: * with current packet. Undefined if not running pseudo-realtime * mode. */ + [[deprecated("Remove in v4.1. Use zeek::run_state::current_wallclock().")]] double CurrentPacketWallClock(); - /** - * Signals packet source that processing is going to be continued - * after previous suspension. - */ - void ContinueAfterSuspend(); - /** * Precompiles a BPF filter and associates the given index with it. * The compiled filter will be then available via \a GetBPFFilter(). @@ -349,9 +345,6 @@ protected: virtual void DoneWithPacket() = 0; private: - // Checks if the current packet has a pseudo-time <= current_time. If - // yes, returns pseudo-time, otherwise 0. - double CheckPseudoTime(); // Internal helper for ExtractNextPacket(). bool ExtractNextPacketInternal(); @@ -370,13 +363,6 @@ private: // For BPF filtering support. std::vector filters; - // Only set in pseudo-realtime mode. - double first_timestamp; - double first_wallclock; - double current_wallclock; - double current_pseudo; - double next_sync_point; // For trace synchronziation in pseudo-realtime - std::string errbuf; }; diff --git a/src/util.cc b/src/util.cc index 467d8e54e0..02c911cbd7 100644 --- a/src/util.cc +++ b/src/util.cc @@ -2050,12 +2050,11 @@ double current_time(bool real) iosource::PktSrc* src = iosource_mgr->GetPktSrc(); if ( run_state::is_processing_suspended() ) - return src->CurrentPacketTimestamp(); + return run_state::current_packet_timestamp(); // We don't scale with pseudo_realtime here as that would give us a // jumping real-time. - return src->CurrentPacketTimestamp() + - (t - src->CurrentPacketWallClock()); + return run_state::current_packet_timestamp() + (t - run_state::current_packet_wallclock()); } struct timeval double_to_timeval(double t) diff --git a/src/zeek.bif b/src/zeek.bif index 8f516ccaaf..e0254b8522 100644 --- a/src/zeek.bif +++ b/src/zeek.bif @@ -3399,9 +3399,9 @@ const char* conn_id_string(zeek::Val* c) function dump_current_packet%(file_name: string%) : bool %{ const Packet* pkt; + auto* pkt_src = static_cast(zeek::run_state::detail::current_iosrc); - if ( ! zeek::run_state::detail::current_pktsrc || - ! zeek::run_state::detail::current_pktsrc->GetCurrentPacket(&pkt) ) + if ( ! pkt_src || ! pkt_src->GetCurrentPacket(&pkt) ) return zeek::val_mgr->False(); if ( addl_pkt_dumper && addl_pkt_dumper->Path() != file_name->CheckString()) @@ -3432,9 +3432,9 @@ function get_current_packet%(%) : pcap_packet static auto pcap_packet = zeek::id::find_type("pcap_packet"); const Packet* p; auto pkt = zeek::make_intrusive(pcap_packet); + auto* pkt_src = static_cast(zeek::run_state::detail::current_iosrc); - if ( ! zeek::run_state::detail::current_pktsrc || - ! zeek::run_state::detail::current_pktsrc->GetCurrentPacket(&p) ) + if ( ! pkt_src || ! pkt_src->GetCurrentPacket(&p) ) { pkt->Assign(0, zeek::val_mgr->Count(0)); pkt->Assign(1, zeek::val_mgr->Count(0)); @@ -3464,9 +3464,9 @@ function get_current_packet%(%) : pcap_packet function get_current_packet_header%(%) : raw_pkt_hdr %{ const Packet* p; + auto* pkt_src = static_cast(zeek::run_state::detail::current_iosrc); - if ( zeek::run_state::detail::current_pktsrc && - zeek::run_state::detail::current_pktsrc->GetCurrentPacket(&p) ) + if ( pkt_src && pkt_src->GetCurrentPacket(&p) ) { return p->ToRawPktHdrVal(); }