diff --git a/src/iosource/PktSrc.cc b/src/iosource/PktSrc.cc index 0eae93ac27..c61aebc2d1 100644 --- a/src/iosource/PktSrc.cc +++ b/src/iosource/PktSrc.cc @@ -109,9 +109,20 @@ void PktSrc::Opened(const Properties& arg_props) return; } + if ( props.is_live ) + { Info(fmt("listening on %s\n", props.path.c_str())); + // We only register the file descriptor if we're in live + // mode because libpcap's file descriptor for trace files + // isn't a reliable way to know whether we actually have + // data to read. + if ( props.selectable_fd != -1 ) + if ( ! iosource_mgr->RegisterFd(props.selectable_fd, this) ) + reporter->FatalError("Failed to register pktsrc fd with iosource_mgr"); + } + DBG_LOG(DBG_PKTIO, "Opened source %s", props.path.c_str()); } @@ -119,6 +130,9 @@ void PktSrc::Closed() { SetClosed(true); + if ( props.is_live && props.selectable_fd != -1 ) + iosource_mgr->UnregisterFd(props.selectable_fd, this); + DBG_LOG(DBG_PKTIO, "Closed source %s", props.path.c_str()); } @@ -166,7 +180,7 @@ double PktSrc::CheckPseudoTime() return pseudo_time <= ct ? bro_start_time + pseudo_time : 0; } -void PktSrc::Init() +void PktSrc::InitSource() { Open(); } @@ -177,55 +191,6 @@ void PktSrc::Done() Close(); } -void PktSrc::GetFds(iosource::FD_Set* read, iosource::FD_Set* write, - iosource::FD_Set* except) - { - if ( pseudo_realtime ) - { - // Select would give erroneous results. But we simulate it - // by setting idle accordingly. - SetIdle(CheckPseudoTime() == 0); - return; - } - - if ( IsOpen() && props.selectable_fd >= 0 ) - read->Insert(props.selectable_fd); - - // TODO: This seems like a hack that should be removed, but doing so - // causes the main run loop to spin more frequently and increase cpu usage. - // See also commit 9cd85be308. - if ( read->Empty() ) - read->Insert(0); - - if ( write->Empty() ) - write->Insert(0); - - if ( except->Empty() ) - except->Insert(0); - } - -double PktSrc::NextTimestamp(double* local_network_time) - { - if ( ! IsOpen() ) - return -1.0; - - if ( ! ExtractNextPacketInternal() ) - return -1.0; - - if ( pseudo_realtime ) - { - // Delay packet if necessary. - double packet_time = CheckPseudoTime(); - if ( packet_time ) - return packet_time; - - SetIdle(true); - return -1.0; - } - - return current_packet.time; - } - void PktSrc::Process() { if ( ! IsOpen() ) @@ -248,7 +213,7 @@ void PktSrc::Process() net_packet_dispatch(current_packet.time, ¤t_packet, this); } - have_packet = 0; + have_packet = false; DoneWithPacket(); } @@ -267,10 +232,7 @@ bool PktSrc::ExtractNextPacketInternal() // Don't return any packets if processing is suspended (except for the // very first packet which we need to set up times). if ( net_is_processing_suspended() && first_timestamp ) - { - SetIdle(true); - return 0; - } + return false; if ( pseudo_realtime ) current_wallclock = current_time(true); @@ -280,15 +242,14 @@ bool PktSrc::ExtractNextPacketInternal() if ( current_packet.time < 0 ) { Weird("negative_packet_timestamp", ¤t_packet); - return 0; + return false; } if ( ! first_timestamp ) first_timestamp = current_packet.time; - SetIdle(false); have_packet = true; - return 1; + return true; } if ( pseudo_realtime && ! IsOpen() ) @@ -297,8 +258,7 @@ bool PktSrc::ExtractNextPacketInternal() iosource_mgr->Terminate(); } - SetIdle(true); - return 0; + return false; } bool PktSrc::PrecompileBPFFilter(int index, const std::string& filter) @@ -321,7 +281,7 @@ bool PktSrc::PrecompileBPFFilter(int index, const std::string& filter) Error(msg); delete code; - return 0; + return false; } // Store it in vector. @@ -369,3 +329,30 @@ bool PktSrc::GetCurrentPacket(const Packet** pkt) *pkt = ¤t_packet; return true; } + +double PktSrc::GetNextTimeout() + { + // If there's no file descriptor for the source, which is the case for some interfaces like + // myricom, we can't rely on the polling mechanism to wait for data to be available. As gross + // as it is, just spin with a short timeout here so that it will continually poll the + // interface. The old IOSource code had a 20ns timeout between calls to select() so just + // use that. + if ( props.selectable_fd == -1 ) + return 0.00000002; + + // If we're live we want poll to do what it has to with the file descriptor. If we're not live + // but we're not in pseudo-realtime mode, let the loop just spin as fast as it can. If we're + // in pseudo-realtime mode, find the next time that a packet is ready and have poll block until + // then. + if ( IsLive() || net_is_processing_suspended() ) + return -1; + else if ( ! pseudo_realtime ) + return 0; + + if ( ! have_packet ) + ExtractNextPacketInternal(); + + double pseudo_time = current_packet.time - first_timestamp; + double ct = (current_time(true) - first_wallclock) * pseudo_realtime; + return std::max(0.0, pseudo_time - ct); + } diff --git a/src/iosource/PktSrc.h b/src/iosource/PktSrc.h index 7ba9335df8..8207e030e9 100644 --- a/src/iosource/PktSrc.h +++ b/src/iosource/PktSrc.h @@ -204,6 +204,17 @@ public: */ virtual void Statistics(Stats* stats) = 0; + /** + * Return the next timeout value for this source. This should be + * overridden by source classes where they have a timeout value + * that can wake up the poll. + * + * @return A value for the next time that the source thinks the + * poll should time out in seconds from the current time. Return + * -1 if this should should not be considered. + */ + virtual double GetNextTimeout() override; + protected: friend class Manager; friend class ManagerBase; @@ -343,11 +354,8 @@ private: bool ExtractNextPacketInternal(); // IOSource interface implementation. - void Init() override; + void InitSource() override; void Done() override; - void GetFds(iosource::FD_Set* read, iosource::FD_Set* write, - iosource::FD_Set* except) override; - double NextTimestamp(double* local_network_time) override; void Process() override; const char* Tag() override;