PktSrc iosource changes to match the new IOSource API

This commit is contained in:
Tim Wojtulewicz 2019-11-26 12:59:55 -07:00
parent c5462eaa80
commit 4751783d56
2 changed files with 60 additions and 65 deletions

View file

@ -109,9 +109,20 @@ void PktSrc::Opened(const Properties& arg_props)
return; return;
} }
if ( props.is_live ) if ( props.is_live )
{
Info(fmt("listening on %s\n", props.path.c_str())); Info(fmt("listening on %s\n", props.path.c_str()));
// We only register the file descriptor if we're in live
// mode because libpcap's file descriptor for trace files
// isn't a reliable way to know whether we actually have
// data to read.
if ( props.selectable_fd != -1 )
if ( ! iosource_mgr->RegisterFd(props.selectable_fd, this) )
reporter->FatalError("Failed to register pktsrc fd with iosource_mgr");
}
DBG_LOG(DBG_PKTIO, "Opened source %s", props.path.c_str()); DBG_LOG(DBG_PKTIO, "Opened source %s", props.path.c_str());
} }
@ -119,6 +130,9 @@ void PktSrc::Closed()
{ {
SetClosed(true); SetClosed(true);
if ( props.is_live && props.selectable_fd != -1 )
iosource_mgr->UnregisterFd(props.selectable_fd, this);
DBG_LOG(DBG_PKTIO, "Closed source %s", props.path.c_str()); DBG_LOG(DBG_PKTIO, "Closed source %s", props.path.c_str());
} }
@ -166,7 +180,7 @@ double PktSrc::CheckPseudoTime()
return pseudo_time <= ct ? bro_start_time + pseudo_time : 0; return pseudo_time <= ct ? bro_start_time + pseudo_time : 0;
} }
void PktSrc::Init() void PktSrc::InitSource()
{ {
Open(); Open();
} }
@ -177,55 +191,6 @@ void PktSrc::Done()
Close(); Close();
} }
void PktSrc::GetFds(iosource::FD_Set* read, iosource::FD_Set* write,
iosource::FD_Set* except)
{
if ( pseudo_realtime )
{
// Select would give erroneous results. But we simulate it
// by setting idle accordingly.
SetIdle(CheckPseudoTime() == 0);
return;
}
if ( IsOpen() && props.selectable_fd >= 0 )
read->Insert(props.selectable_fd);
// TODO: This seems like a hack that should be removed, but doing so
// causes the main run loop to spin more frequently and increase cpu usage.
// See also commit 9cd85be308.
if ( read->Empty() )
read->Insert(0);
if ( write->Empty() )
write->Insert(0);
if ( except->Empty() )
except->Insert(0);
}
double PktSrc::NextTimestamp(double* local_network_time)
{
if ( ! IsOpen() )
return -1.0;
if ( ! ExtractNextPacketInternal() )
return -1.0;
if ( pseudo_realtime )
{
// Delay packet if necessary.
double packet_time = CheckPseudoTime();
if ( packet_time )
return packet_time;
SetIdle(true);
return -1.0;
}
return current_packet.time;
}
void PktSrc::Process() void PktSrc::Process()
{ {
if ( ! IsOpen() ) if ( ! IsOpen() )
@ -248,7 +213,7 @@ void PktSrc::Process()
net_packet_dispatch(current_packet.time, &current_packet, this); net_packet_dispatch(current_packet.time, &current_packet, this);
} }
have_packet = 0; have_packet = false;
DoneWithPacket(); DoneWithPacket();
} }
@ -267,10 +232,7 @@ bool PktSrc::ExtractNextPacketInternal()
// Don't return any packets if processing is suspended (except for the // Don't return any packets if processing is suspended (except for the
// very first packet which we need to set up times). // very first packet which we need to set up times).
if ( net_is_processing_suspended() && first_timestamp ) if ( net_is_processing_suspended() && first_timestamp )
{ return false;
SetIdle(true);
return 0;
}
if ( pseudo_realtime ) if ( pseudo_realtime )
current_wallclock = current_time(true); current_wallclock = current_time(true);
@ -280,15 +242,14 @@ bool PktSrc::ExtractNextPacketInternal()
if ( current_packet.time < 0 ) if ( current_packet.time < 0 )
{ {
Weird("negative_packet_timestamp", &current_packet); Weird("negative_packet_timestamp", &current_packet);
return 0; return false;
} }
if ( ! first_timestamp ) if ( ! first_timestamp )
first_timestamp = current_packet.time; first_timestamp = current_packet.time;
SetIdle(false);
have_packet = true; have_packet = true;
return 1; return true;
} }
if ( pseudo_realtime && ! IsOpen() ) if ( pseudo_realtime && ! IsOpen() )
@ -297,8 +258,7 @@ bool PktSrc::ExtractNextPacketInternal()
iosource_mgr->Terminate(); iosource_mgr->Terminate();
} }
SetIdle(true); return false;
return 0;
} }
bool PktSrc::PrecompileBPFFilter(int index, const std::string& filter) bool PktSrc::PrecompileBPFFilter(int index, const std::string& filter)
@ -321,7 +281,7 @@ bool PktSrc::PrecompileBPFFilter(int index, const std::string& filter)
Error(msg); Error(msg);
delete code; delete code;
return 0; return false;
} }
// Store it in vector. // Store it in vector.
@ -369,3 +329,30 @@ bool PktSrc::GetCurrentPacket(const Packet** pkt)
*pkt = &current_packet; *pkt = &current_packet;
return true; return true;
} }
double PktSrc::GetNextTimeout()
{
// If there's no file descriptor for the source, which is the case for some interfaces like
// myricom, we can't rely on the polling mechanism to wait for data to be available. As gross
// as it is, just spin with a short timeout here so that it will continually poll the
// interface. The old IOSource code had a 20ns timeout between calls to select() so just
// use that.
if ( props.selectable_fd == -1 )
return 0.00000002;
// If we're live we want poll to do what it has to with the file descriptor. If we're not live
// but we're not in pseudo-realtime mode, let the loop just spin as fast as it can. If we're
// in pseudo-realtime mode, find the next time that a packet is ready and have poll block until
// then.
if ( IsLive() || net_is_processing_suspended() )
return -1;
else if ( ! pseudo_realtime )
return 0;
if ( ! have_packet )
ExtractNextPacketInternal();
double pseudo_time = current_packet.time - first_timestamp;
double ct = (current_time(true) - first_wallclock) * pseudo_realtime;
return std::max(0.0, pseudo_time - ct);
}

View file

@ -204,6 +204,17 @@ public:
*/ */
virtual void Statistics(Stats* stats) = 0; virtual void Statistics(Stats* stats) = 0;
/**
* Return the next timeout value for this source. This should be
* overridden by source classes where they have a timeout value
* that can wake up the poll.
*
* @return A value for the next time that the source thinks the
* poll should time out in seconds from the current time. Return
* -1 if this should should not be considered.
*/
virtual double GetNextTimeout() override;
protected: protected:
friend class Manager; friend class Manager;
friend class ManagerBase; friend class ManagerBase;
@ -343,11 +354,8 @@ private:
bool ExtractNextPacketInternal(); bool ExtractNextPacketInternal();
// IOSource interface implementation. // IOSource interface implementation.
void Init() override; void InitSource() override;
void Done() override; void Done() override;
void GetFds(iosource::FD_Set* read, iosource::FD_Set* write,
iosource::FD_Set* except) override;
double NextTimestamp(double* local_network_time) override;
void Process() override; void Process() override;
const char* Tag() override; const char* Tag() override;