Remove timeouts from remote communication loop.

The select() call now blocks until there's work to do instead of relying on a
small timeout value, which can cause unproductive use of CPU cycles.
This commit is contained in:
Jon Siwek 2014-08-28 13:13:30 -05:00
parent 73cc81f44a
commit 675fba3fde
21 changed files with 364 additions and 58 deletions

View file

@ -279,6 +279,7 @@ set(bro_SRCS
EventRegistry.cc
Expr.cc
File.cc
Flare.cc
FlowSrc.cc
Frag.cc
Frame.cc
@ -299,6 +300,7 @@ set(bro_SRCS
OSFinger.cc
PacketFilter.cc
PersistenceSerializer.cc
Pipe.cc
PktSrc.cc
PolicyFile.cc
PrefixTable.cc

View file

@ -210,6 +210,7 @@ bool ChunkedIOFd::WriteChunk(Chunk* chunk, bool partial)
else
pending_head = pending_tail = q;
write_flare.Fire();
return Flush();
}
@ -232,6 +233,7 @@ bool ChunkedIOFd::PutIntoWriteBuffer(Chunk* chunk)
write_len += len;
delete chunk;
write_flare.Fire();
if ( network_time - last_flush > 0.005 )
FlushWriteBuffer();
@ -269,6 +271,10 @@ bool ChunkedIOFd::FlushWriteBuffer()
if ( unsigned(written) == len )
{
write_pos = write_len = 0;
if ( ! pending_head )
write_flare.Extinguish();
return true;
}
@ -318,7 +324,12 @@ bool ChunkedIOFd::Flush()
}
}
return FlushWriteBuffer();
bool rval = FlushWriteBuffer();
if ( ! pending_head && write_len == 0 )
write_flare.Extinguish();
return rval;
}
uint32 ChunkedIOFd::ChunkAvailable()
@ -394,6 +405,9 @@ bool ChunkedIOFd::Read(Chunk** chunk, bool may_block)
#ifdef DEBUG_COMMUNICATION
AddToBuffer("<false:read-chunk>", true);
#endif
if ( ! ChunkAvailable() )
read_flare.Extinguish();
return false;
}
@ -402,9 +416,15 @@ bool ChunkedIOFd::Read(Chunk** chunk, bool may_block)
#ifdef DEBUG_COMMUNICATION
AddToBuffer("<null:no-data>", true);
#endif
read_flare.Extinguish();
return true;
}
if ( ChunkAvailable() )
read_flare.Fire();
else
read_flare.Extinguish();
#ifdef DEBUG
if ( *chunk )
DBG_LOG(DBG_CHUNKEDIO, "read of size %d %s[%s]",
@ -481,6 +501,9 @@ bool ChunkedIOFd::ReadChunk(Chunk** chunk, bool may_block)
read_pos = 0;
read_len = bytes_left;
if ( ! ChunkAvailable() )
read_flare.Extinguish();
// If allowed, wait a bit for something to read.
if ( may_block )
{
@ -607,6 +630,14 @@ bool ChunkedIOFd::IsFillingUp()
return stats.pending > MAX_BUFFERED_CHUNKS_SOFT;
}
// Reports the extra descriptors a caller should watch for read-readiness:
// the read ends of the write flare and the read flare, in that order.
std::vector<int> ChunkedIOFd::FdSupplements() const
	{
	const int flare_fds[] = { write_flare.FD(), read_flare.FD() };
	return std::vector<int>(flare_fds, flare_fds + 2);
	}
void ChunkedIOFd::Clear()
{
while ( pending_head )
@ -618,6 +649,9 @@ void ChunkedIOFd::Clear()
}
pending_head = pending_tail = 0;
if ( write_len == 0 )
write_flare.Extinguish();
}
const char* ChunkedIOFd::Error()
@ -830,6 +864,7 @@ bool ChunkedIOSSL::Write(Chunk* chunk)
else
write_head = write_tail = q;
write_flare.Fire();
Flush();
return true;
}
@ -935,6 +970,7 @@ bool ChunkedIOSSL::Flush()
write_state = LEN;
}
write_flare.Extinguish();
return true;
}
@ -1104,6 +1140,13 @@ bool ChunkedIOSSL::IsFillingUp()
return false;
}
// Reports the extra descriptor a caller should watch for read-readiness:
// the SSL variant only has a write flare.
std::vector<int> ChunkedIOSSL::FdSupplements() const
	{
	return std::vector<int>(1, write_flare.FD());
	}
void ChunkedIOSSL::Clear()
{
while ( write_head )
@ -1114,6 +1157,7 @@ void ChunkedIOSSL::Clear()
write_head = next;
}
write_head = write_tail = 0;
write_flare.Extinguish();
}
const char* ChunkedIOSSL::Error()

View file

@ -6,8 +6,9 @@
#include "config.h"
#include "List.h"
#include "util.h"
#include "Flare.h"
#include <list>
#include <vector>
#ifdef NEED_KRB5_H
# include <krb5.h>
@ -95,6 +96,11 @@ public:
// Returns underlying fd if available, -1 otherwise.
virtual int Fd() { return -1; }
// Returns supplementary file descriptors that become read-ready in order
// to signal that there is some work that can be performed.
virtual std::vector<int> FdSupplements() const
{ return std::vector<int>(); }
// Makes sure that no additional protocol data is written into
// the output stream. If this is activated, the output cannot
// be read again by any of these classes!
@ -177,6 +183,7 @@ public:
virtual void Clear();
virtual bool Eof() { return eof; }
virtual int Fd() { return fd; }
virtual std::vector<int> FdSupplements() const;
virtual void Stats(char* buffer, int length);
private:
@ -240,6 +247,8 @@ private:
ChunkQueue* pending_tail;
pid_t pid;
bro::Flare write_flare;
bro::Flare read_flare;
};
// Chunked I/O using an SSL connection.
@ -262,6 +271,7 @@ public:
virtual void Clear();
virtual bool Eof() { return eof; }
virtual int Fd() { return socket; }
virtual std::vector<int> FdSupplements() const;
virtual void Stats(char* buffer, int length);
private:
@ -303,6 +313,8 @@ private:
// One SSL for all connections.
static SSL_CTX* ctx;
bro::Flare write_flare;
};
#include <zlib.h>
@ -328,6 +340,8 @@ public:
virtual bool Eof() { return io->Eof(); }
virtual int Fd() { return io->Fd(); }
virtual std::vector<int> FdSupplements() const
{ return io->FdSupplements(); }
virtual void Stats(char* buffer, int length);
void EnableCompression(int level)

View file

@ -1217,9 +1217,10 @@ void DNS_Mgr::IssueAsyncRequests()
}
}
void DNS_Mgr::GetFds(int* read, int* write, int* except)
void DNS_Mgr::GetFds(std::vector<int>* read, std::vector<int>* write,
std::vector<int>* except)
{
*read = nb_dns_fd(nb_dns);
read->push_back(nb_dns_fd(nb_dns));
}
double DNS_Mgr::NextTimestamp(double* network_time)

View file

@ -132,7 +132,8 @@ protected:
void DoProcess(bool flush);
// IOSource interface.
virtual void GetFds(int* read, int* write, int* except);
virtual void GetFds(std::vector<int>* read, std::vector<int>* write,
std::vector<int>* except);
virtual double NextTimestamp(double* network_time);
virtual void Process();
virtual const char* Tag() { return "DNS_Mgr"; }

29
src/Flare.cc Normal file
View file

@ -0,0 +1,29 @@
// See the file "COPYING" in the main distribution directory for copyright.
#include "Flare.h"
#include "util.h"
#include <unistd.h>
#include <fcntl.h>
#include <errno.h>
using namespace bro;
// Build the underlying pipe with FD_CLOEXEC as the descriptor flag and
// O_NONBLOCK as the status flag on both the read end and the write end.
Flare::Flare()
: pipe(FD_CLOEXEC, FD_CLOEXEC, O_NONBLOCK, O_NONBLOCK)
{
}
// Put the flare in the "ready" state by writing a single byte to the write
// end of the pipe, which makes the read end (see FD()) readable.
//
// The byte's value is irrelevant, but it is initialized so that we never
// hand an indeterminate value to write().  The result of safe_write() is
// deliberately ignored: the write end is non-blocking, and a pipe that is
// already full is already "ready" (NOTE(review): assumes safe_write()
// reports failure rather than spinning on EAGAIN -- confirm in util.cc).
void Flare::Fire()
	{
	char tmp = 0;
	safe_write(pipe.WriteFD(), &tmp, 1);
	}
// Take the flare out of the "ready" state by draining every byte that is
// currently buffered in the pipe.
//
// The read end is non-blocking, so an empty pipe is reported as
// EAGAIN/EWOULDBLOCK.  The loop also stops on end-of-file and on any other
// read error so that a broken descriptor cannot turn this into an endless
// busy loop; only EINTR is retried.
void Flare::Extinguish()
	{
	char tmp[256];

	for ( ; ; )
		{
		ssize_t n = read(pipe.ReadFD(), tmp, sizeof(tmp));

		if ( n > 0 )
			// Drained some bytes, there may be more.
			continue;

		if ( n < 0 && errno == EINTR )
			// Interrupted before anything was read: try again.
			continue;

		// n == 0 (EOF), pipe empty (EAGAIN/EWOULDBLOCK), or a hard
		// error: nothing more can be drained.
		break;
		}
	}

45
src/Flare.h Normal file
View file

@ -0,0 +1,45 @@
// See the file "COPYING" in the main distribution directory for copyright.
#ifndef BRO_FLARE_H
#define BRO_FLARE_H
#include "Pipe.h"
namespace bro {
// A level-triggered "ready" signal backed by a Pipe: the flare is ready
// while at least one byte is buffered in the pipe.
class Flare {
public:
/**
* Create a flare object that can be used to signal a "ready" status via
* a file descriptor that may be integrated with select(), poll(), etc.
* Not thread-safe, but that should only require Fire()/Extinguish() calls
* to be made mutually exclusive (across all copies of a Flare).
*/
Flare();
/**
* @return a file descriptor that will become ready if the flare has been
* Fire()'d and not yet Extinguish()'d. This is the read end of the
* underlying pipe.
*/
int FD() const
{ return pipe.ReadFD(); }
/**
* Put the object in the "ready" state.
*/
void Fire();
/**
* Take the object out of the "ready" state.
*/
void Extinguish();
private:
// The pipe whose read end is exposed through FD().
Pipe pipe;
};
} // namespace bro
#endif // BRO_FLARE_H

View file

@ -28,10 +28,11 @@ FlowSrc::~FlowSrc()
delete netflow_analyzer;
}
void FlowSrc::GetFds(int* read, int* write, int* except)
void FlowSrc::GetFds(std::vector<int>* read, std::vector<int>* write,
std::vector<int>* except)
{
if ( selectable_fd >= 0 )
*read = selectable_fd;
read->push_back(selectable_fd);
}
double FlowSrc::NextTimestamp(double* network_time)

View file

@ -34,7 +34,8 @@ public:
// IOSource interface:
bool IsReady();
void GetFds(int* read, int* write, int* except);
void GetFds(std::vector<int>* read, std::vector<int>* write,
std::vector<int>* except);
double NextTimestamp(double* network_time);
void Process();

View file

@ -24,6 +24,15 @@ void IOSourceRegistry::RemoveAll()
dont_counts = sources.size();
}
// Adds every descriptor in @a fds to @a set and raises @a *max to the
// largest descriptor added (it is never lowered).
//
// Descriptors outside [0, FD_SETSIZE) are skipped: FD_SET() on such a value
// is undefined behavior, and sources may legitimately report -1 when they
// have no descriptor.
static void fd_vector_set(const std::vector<int>& fds, fd_set* set, int* max)
	{
	for ( size_t i = 0; i < fds.size(); ++i )
		{
		const int fd = fds[i];

		if ( fd < 0 || fd >= FD_SETSIZE )
			continue;

		FD_SET(fd, set);

		if ( fd > *max )
			*max = fd;
		}
	}
IOSource* IOSourceRegistry::FindSoonest(double* ts)
{
// Remove sources which have gone dry. For simplicity, we only
@ -94,16 +103,14 @@ IOSource* IOSourceRegistry::FindSoonest(double* ts)
// be ready.
continue;
src->fd_read = src->fd_write = src->fd_except = 0;
src->fd_read.clear();
src->fd_write.clear();
src->fd_except.clear();
src->src->GetFds(&src->fd_read, &src->fd_write, &src->fd_except);
FD_SET(src->fd_read, &fd_read);
FD_SET(src->fd_write, &fd_write);
FD_SET(src->fd_except, &fd_except);
maxx = max(src->fd_read, maxx);
maxx = max(src->fd_write, maxx);
maxx = max(src->fd_except, maxx);
fd_vector_set(src->fd_read, &fd_read, &maxx);
fd_vector_set(src->fd_write, &fd_write, &maxx);
fd_vector_set(src->fd_except, &fd_except, &maxx);
}
// We can't block indefinitely even when all sources are dry:
@ -143,9 +150,7 @@ IOSource* IOSourceRegistry::FindSoonest(double* ts)
if ( ! src->src->IsIdle() )
continue;
if ( FD_ISSET(src->fd_read, &fd_read) ||
FD_ISSET(src->fd_write, &fd_write) ||
FD_ISSET(src->fd_except, &fd_except) )
if ( src->Ready(&fd_read, &fd_write, &fd_except) )
{
double local_network_time = 0;
double ts = src->src->NextTimestamp(&local_network_time);
@ -174,3 +179,23 @@ void IOSourceRegistry::Register(IOSource* src, bool dont_count)
++dont_counts;
return sources.push_back(s);
}
// Returns true if at least one descriptor in @a fds is a member of @a set.
//
// Descriptors outside [0, FD_SETSIZE) can never be members of an fd_set and
// FD_ISSET() on them is undefined behavior, so they are skipped.
static bool fd_vector_ready(const std::vector<int>& fds, fd_set* set)
	{
	for ( size_t i = 0; i < fds.size(); ++i )
		{
		const int fd = fds[i];

		if ( fd < 0 || fd >= FD_SETSIZE )
			continue;

		if ( FD_ISSET(fd, set) )
			return true;
		}

	return false;
	}
// A source counts as ready when any of its registered read, write or except
// descriptors is a member of the corresponding fd_set passed in.
bool IOSourceRegistry::Source::Ready(fd_set* read, fd_set* write,
                                     fd_set* except) const
	{
	return fd_vector_ready(fd_read, read) ||
	       fd_vector_ready(fd_write, write) ||
	       fd_vector_ready(fd_except, except);
	}

View file

@ -4,6 +4,8 @@
#define iosource_h
#include <list>
#include <vector>
#include <sys/select.h>
#include "Timer.h"
using namespace std;
@ -22,7 +24,8 @@ public:
// Returns select'able fds (leaves args untouched if we don't have
// selectable fds).
virtual void GetFds(int* read, int* write, int* except) = 0;
virtual void GetFds(std::vector<int>* read, std::vector<int>* write,
std::vector<int>* except) = 0;
// The following two methods are only called when either IsIdle()
// returns false or select() on one of the fds indicates that there's
@ -89,9 +92,11 @@ protected:
struct Source {
IOSource* src;
int fd_read;
int fd_write;
int fd_except;
std::vector<int> fd_read;
std::vector<int> fd_write;
std::vector<int> fd_except;
bool Ready(fd_set* read, fd_set* write, fd_set* except) const;
};
typedef list<Source*> SourceList;

79
src/Pipe.cc Normal file
View file

@ -0,0 +1,79 @@
// See the file "COPYING" in the main distribution directory for copyright.
#include "Pipe.h"
#include "Reporter.h"
#include <unistd.h>
#include <fcntl.h>
#include <errno.h>
#include <cstdio>
using namespace bro;
// strerror_r() exists in two incompatible flavors: the XSI one returns an
// int and fills the caller's buffer, the GNU one returns a char* that may
// point somewhere else and can leave the buffer untouched.  These overloads
// pick the right message for whichever flavor the platform provides.
static inline const char* pipe_strerror_result(int, const char* buf)
	{
	return buf[0] ? buf : "unknown error";
	}

static inline const char* pipe_strerror_result(const char* msg, const char* buf)
	{
	if ( msg && msg[0] )
		return msg;

	return buf[0] ? buf : "unknown error";
	}

// Reports a fatal pipe-related error, using the textual description of the
// errno value @a eno.  Control is handed to reporter->FatalError()
// (presumably this does not return -- confirm against Reporter).
static void pipe_fail(int eno)
	{
	// Start out empty so we never print an unwritten buffer.
	char tmp[256] = "";
	const char* msg =
		pipe_strerror_result(strerror_r(eno, tmp, sizeof(tmp)), tmp);
	reporter->FatalError("Pipe failure: %s", msg);
	}
// ORs @a flags into the file descriptor flags (F_GETFD/F_SETFD) of @a fd.
// A zero @a flags value leaves the descriptor untouched.
static void set_flags(int fd, int flags)
	{
	if ( ! flags )
		return;

	const int current = fcntl(fd, F_GETFD);
	fcntl(fd, F_SETFD, current | flags);
	}
// ORs @a flags into the file status flags (F_GETFL/F_SETFL) of @a fd.
// A zero @a flags value leaves the descriptor untouched.
static void set_status_flags(int fd, int flags)
	{
	if ( ! flags )
		return;

	const int current = fcntl(fd, F_GETFL);
	fcntl(fd, F_SETFL, current | flags);
	}
// Duplicates @a fd and applies the file descriptor flags @a flags to the
// duplicate.  A dup() failure is reported through pipe_fail().
//
// @return the new descriptor.
static int dup_or_fail(int fd, int flags)
	{
	int rval = dup(fd);

	if ( rval < 0 )
		pipe_fail(errno);

	// Descriptor flags such as FD_CLOEXEC are per-descriptor and are
	// cleared on the descriptor returned by dup(), so they have to be
	// set on the duplicate, not on the source descriptor.
	set_flags(rval, flags);
	return rval;
	}
// Opens a new pipe and applies the requested descriptor flags and status
// flags to its read end (index 0) and write end (index 1).  A pipe()
// failure is reported through pipe_fail().
Pipe::Pipe(int flags0, int flags1, int status_flags0, int status_flags1)
	{
	// pipe2() could set the flags atomically, but it is not available
	// everywhere yet.
	if ( ::pipe(fds) != 0 )
		pipe_fail(errno);

	// Keep the descriptor flags around for later copies of this object.
	flags[0] = flags0;
	flags[1] = flags1;

	for ( int end = 0; end < 2; ++end )
		set_flags(fds[end], flags[end]);

	const int status_flags[2] = { status_flags0, status_flags1 };

	for ( int end = 0; end < 2; ++end )
		set_status_flags(fds[end], status_flags[end]);
	}
// Releases both ends of the pipe, read end first.
Pipe::~Pipe()
	{
	for ( int end = 0; end < 2; ++end )
		close(fds[end]);
	}
// Copy-constructs a Pipe by dup()'ing both descriptors of @a other.
//
// The descriptor flags are copied as well; otherwise flags[] of the new
// object would be indeterminate and a later copy or assignment *from* this
// object would hand garbage flags to dup_or_fail().
Pipe::Pipe(const Pipe& other)
	{
	flags[0] = other.flags[0];
	flags[1] = other.flags[1];
	fds[0] = dup_or_fail(other.fds[0], flags[0]);
	fds[1] = dup_or_fail(other.fds[1], flags[1]);
	}
// Assigns from @a other: closes the descriptors currently owned and
// replaces them with dup()'d copies of the other pipe's descriptors.
// Self-assignment is a no-op.
//
// The descriptor flags are taken over as well so that flags[] keeps
// describing the descriptors this object now owns.
Pipe& Pipe::operator=(const Pipe& other)
	{
	if ( this == &other )
		return *this;

	close(fds[0]);
	close(fds[1]);

	flags[0] = other.flags[0];
	flags[1] = other.flags[1];
	fds[0] = dup_or_fail(other.fds[0], flags[0]);
	fds[1] = dup_or_fail(other.fds[1], flags[1]);

	return *this;
	}

57
src/Pipe.h Normal file
View file

@ -0,0 +1,57 @@
// See the file "COPYING" in the main distribution directory for copyright.
#ifndef BRO_PIPE_H
#define BRO_PIPE_H
namespace bro {
// Owns the two file descriptors of a pipe() and closes them on destruction.
// Copies own dup()'d descriptors of their own.
class Pipe {
public:
/**
* Create a pair of file descriptors via pipe(), or aborts if it cannot.
* @param flags0 file descriptor flags to set on read end of pipe.
* @param flags1 file descriptor flags to set on write end of pipe.
* @param status_flags0 descriptor status flags to set on read end of pipe.
* @param status_flags1 descriptor status flags to set on write end of pipe.
*/
Pipe(int flags0 = 0, int flags1 = 0, int status_flags0 = 0,
int status_flags1 = 0);
/**
* Close the pair of file descriptors owned by the object.
*/
~Pipe();
/**
* Make a copy of another Pipe object (file descriptors are dup'd).
*/
Pipe(const Pipe& other);
/**
* Assign a Pipe object by closing file descriptors and duping those of
* the other.
*/
Pipe& operator=(const Pipe& other);
/**
* @return the file descriptor associated with the read-end of the pipe.
*/
int ReadFD() const
{ return fds[0]; }
/**
* @return the file descriptor associated with the write-end of the pipe.
*/
int WriteFD() const
{ return fds[1]; }
private:
// fds[0] is the read end, fds[1] is the write end.
int fds[2];
// File descriptor flags given at construction for the read end ([0]) and
// write end ([1]); consulted when a Pipe is copied or assigned.
int flags[2];
};
} // namespace bro
#endif // BRO_PIPE_H

View file

@ -51,7 +51,8 @@ PktSrc::~PktSrc()
delete [] readfile;
}
void PktSrc::GetFds(int* read, int* write, int* except)
void PktSrc::GetFds(std::vector<int>* read, std::vector<int>* write,
std::vector<int>* except)
{
if ( pseudo_realtime )
{
@ -62,7 +63,7 @@ void PktSrc::GetFds(int* read, int* write, int* except)
}
if ( selectable_fd >= 0 )
*read = selectable_fd;
read->push_back(selectable_fd);
}
int PktSrc::ExtractNextPacket()

View file

@ -98,7 +98,8 @@ public:
// IOSource interface
bool IsReady();
void GetFds(int* read, int* write, int* except);
void GetFds(std::vector<int>* read, std::vector<int>* write,
std::vector<int>* except);
double NextTimestamp(double* local_network_time);
void Process();
const char* Tag() { return "PktSrc"; }

View file

@ -1368,12 +1368,17 @@ void RemoteSerializer::Unregister(ID* id)
}
}
void RemoteSerializer::GetFds(int* read, int* write, int* except)
void RemoteSerializer::GetFds(std::vector<int>* read, std::vector<int>* write,
std::vector<int>* except)
{
*read = io->Fd();
read->push_back(io->Fd());
std::vector<int> supp = io->FdSupplements();
for ( size_t i = 0; i < supp.size(); ++i )
read->push_back(supp[i]);
if ( io->CanWrite() )
*write = io->Fd();
write->push_back(io->Fd());
}
double RemoteSerializer::NextTimestamp(double* local_network_time)
@ -3356,6 +3361,15 @@ SocketComm::~SocketComm()
static unsigned int first_rtime = 0;
// Adds every descriptor in @a fds to @a set and raises @a *max to the
// largest descriptor added (it is never lowered).
//
// Descriptors outside [0, FD_SETSIZE) are skipped: FD_SET() on such a value
// is undefined behavior, and an I/O object without an underlying descriptor
// reports -1.
static void fd_vector_set(const std::vector<int>& fds, fd_set* set, int* max)
	{
	for ( size_t i = 0; i < fds.size(); ++i )
		{
		const int fd = fds[i];

		if ( fd < 0 || fd >= FD_SETSIZE )
			continue;

		FD_SET(fd, set);

		if ( fd > *max )
			*max = fd;
		}
	}
void SocketComm::Run()
{
first_rtime = (unsigned int) current_time(true);
@ -3381,6 +3395,7 @@ void SocketComm::Run()
FD_SET(io->Fd(), &fd_read);
max_fd = io->Fd();
fd_vector_set(io->FdSupplements(), &fd_read, &max_fd);
loop_over_list(peers, i)
{
@ -3389,6 +3404,7 @@ void SocketComm::Run()
FD_SET(peers[i]->io->Fd(), &fd_read);
if ( peers[i]->io->Fd() > max_fd )
max_fd = peers[i]->io->Fd();
fd_vector_set(peers[i]->io->FdSupplements(), &fd_read, &max_fd);
}
else
{
@ -3439,38 +3455,17 @@ void SocketComm::Run()
if ( ! io->IsFillingUp() && shutting_conns_down )
shutting_conns_down = false;
// We cannot rely solely on select() as the there may
// be some data left in our input/output queues. So, we use
// a small timeout for select and check for data
// manually afterwards.
static long selects = 0;
static long canwrites = 0;
static long timeouts = 0;
++selects;
if ( io->CanWrite() )
++canwrites;
// FIXME: Fine-tune this (timeouts, flush, etc.)
struct timeval small_timeout;
small_timeout.tv_sec = 0;
small_timeout.tv_usec =
io->CanWrite() || io->CanRead() ? 1 : 10;
#if 0
if ( ! io->CanWrite() )
usleep(10);
#endif
int a = select(max_fd + 1, &fd_read, &fd_write, &fd_except,
&small_timeout);
if ( a == 0 )
++timeouts;
int a = select(max_fd + 1, &fd_read, &fd_write, &fd_except, 0);
if ( selects % 100000 == 0 )
Log(fmt("selects=%ld canwrites=%ld timeouts=%ld", selects, canwrites, timeouts));
Log(fmt("selects=%ld canwrites=%ld", selects, canwrites));
if ( a < 0 )
// Ignore errors for now.

View file

@ -140,7 +140,8 @@ public:
void Finish();
// Overridden from IOSource:
virtual void GetFds(int* read, int* write, int* except);
virtual void GetFds(std::vector<int>* read, std::vector<int>* write,
std::vector<int>* except);
virtual double NextTimestamp(double* local_network_time);
virtual void Process();
virtual TimerMgr::Tag* GetCurrentTag();

View file

@ -1067,9 +1067,10 @@ void EventPlayer::GotFunctionCall(const char* name, double time,
// We don't replay function calls.
}
void EventPlayer::GetFds(int* read, int* write, int* except)
void EventPlayer::GetFds(std::vector<int>* read, std::vector<int>* write,
std::vector<int>* except)
{
*read = fd;
read->push_back(fd);
}
double EventPlayer::NextTimestamp(double* local_network_time)

View file

@ -355,7 +355,8 @@ public:
EventPlayer(const char* file);
virtual ~EventPlayer();
virtual void GetFds(int* read, int* write, int* except);
virtual void GetFds(std::vector<int>* read, std::vector<int>* write,
std::vector<int>* except);
virtual double NextTimestamp(double* local_network_time);
virtual void Process();
virtual const char* Tag() { return "EventPlayer"; }

View file

@ -65,7 +65,8 @@ void Manager::AddMsgThread(MsgThread* thread)
msg_threads.push_back(thread);
}
void Manager::GetFds(int* read, int* write, int* except)
void Manager::GetFds(std::vector<int>* read, std::vector<int>* write,
std::vector<int>* except)
{
}

View file

@ -103,7 +103,8 @@ protected:
/**
* Part of the IOSource interface.
*/
virtual void GetFds(int* read, int* write, int* except);
virtual void GetFds(std::vector<int>* read, std::vector<int>* write,
std::vector<int>* except);
/**
* Part of the IOSource interface.