mirror of
https://github.com/zeek/zeek.git
synced 2025-10-06 08:38:20 +00:00
Merge remote-tracking branch 'origin/topic/jsiwek/improve_comm_loop'
* origin/topic/jsiwek/improve_comm_loop: Add a simple FD_Set wrapper/helper class. Fix Pipe copy/assignment to make a copy of flags. Fix possible abort on writing to a full pipe. Remove timeouts from remote communication loop.
This commit is contained in:
commit
e9692958f0
22 changed files with 448 additions and 65 deletions
|
@ -1 +1 @@
|
||||||
Subproject commit 23055b473c689a79da12b2825d8388f71f28c709
|
Subproject commit 783d47c854c97dda6cff9b9eecb8709fe1ee749b
|
|
@ -279,6 +279,7 @@ set(bro_SRCS
|
||||||
EventRegistry.cc
|
EventRegistry.cc
|
||||||
Expr.cc
|
Expr.cc
|
||||||
File.cc
|
File.cc
|
||||||
|
Flare.cc
|
||||||
Frag.cc
|
Frag.cc
|
||||||
Frame.cc
|
Frame.cc
|
||||||
Func.cc
|
Func.cc
|
||||||
|
@ -297,6 +298,7 @@ set(bro_SRCS
|
||||||
OSFinger.cc
|
OSFinger.cc
|
||||||
PacketFilter.cc
|
PacketFilter.cc
|
||||||
PersistenceSerializer.cc
|
PersistenceSerializer.cc
|
||||||
|
Pipe.cc
|
||||||
PolicyFile.cc
|
PolicyFile.cc
|
||||||
PrefixTable.cc
|
PrefixTable.cc
|
||||||
PriorityQueue.cc
|
PriorityQueue.cc
|
||||||
|
|
|
@ -210,6 +210,7 @@ bool ChunkedIOFd::WriteChunk(Chunk* chunk, bool partial)
|
||||||
else
|
else
|
||||||
pending_head = pending_tail = q;
|
pending_head = pending_tail = q;
|
||||||
|
|
||||||
|
write_flare.Fire();
|
||||||
return Flush();
|
return Flush();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -232,6 +233,7 @@ bool ChunkedIOFd::PutIntoWriteBuffer(Chunk* chunk)
|
||||||
write_len += len;
|
write_len += len;
|
||||||
|
|
||||||
delete chunk;
|
delete chunk;
|
||||||
|
write_flare.Fire();
|
||||||
|
|
||||||
if ( network_time - last_flush > 0.005 )
|
if ( network_time - last_flush > 0.005 )
|
||||||
FlushWriteBuffer();
|
FlushWriteBuffer();
|
||||||
|
@ -269,6 +271,10 @@ bool ChunkedIOFd::FlushWriteBuffer()
|
||||||
if ( unsigned(written) == len )
|
if ( unsigned(written) == len )
|
||||||
{
|
{
|
||||||
write_pos = write_len = 0;
|
write_pos = write_len = 0;
|
||||||
|
|
||||||
|
if ( ! pending_head )
|
||||||
|
write_flare.Extinguish();
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -318,7 +324,12 @@ bool ChunkedIOFd::Flush()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return FlushWriteBuffer();
|
bool rval = FlushWriteBuffer();
|
||||||
|
|
||||||
|
if ( ! pending_head && write_len == 0 )
|
||||||
|
write_flare.Extinguish();
|
||||||
|
|
||||||
|
return rval;
|
||||||
}
|
}
|
||||||
|
|
||||||
uint32 ChunkedIOFd::ChunkAvailable()
|
uint32 ChunkedIOFd::ChunkAvailable()
|
||||||
|
@ -394,6 +405,9 @@ bool ChunkedIOFd::Read(Chunk** chunk, bool may_block)
|
||||||
#ifdef DEBUG_COMMUNICATION
|
#ifdef DEBUG_COMMUNICATION
|
||||||
AddToBuffer("<false:read-chunk>", true);
|
AddToBuffer("<false:read-chunk>", true);
|
||||||
#endif
|
#endif
|
||||||
|
if ( ! ChunkAvailable() )
|
||||||
|
read_flare.Extinguish();
|
||||||
|
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -402,9 +416,15 @@ bool ChunkedIOFd::Read(Chunk** chunk, bool may_block)
|
||||||
#ifdef DEBUG_COMMUNICATION
|
#ifdef DEBUG_COMMUNICATION
|
||||||
AddToBuffer("<null:no-data>", true);
|
AddToBuffer("<null:no-data>", true);
|
||||||
#endif
|
#endif
|
||||||
|
read_flare.Extinguish();
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if ( ChunkAvailable() )
|
||||||
|
read_flare.Fire();
|
||||||
|
else
|
||||||
|
read_flare.Extinguish();
|
||||||
|
|
||||||
#ifdef DEBUG
|
#ifdef DEBUG
|
||||||
if ( *chunk )
|
if ( *chunk )
|
||||||
DBG_LOG(DBG_CHUNKEDIO, "read of size %d %s[%s]",
|
DBG_LOG(DBG_CHUNKEDIO, "read of size %d %s[%s]",
|
||||||
|
@ -481,6 +501,9 @@ bool ChunkedIOFd::ReadChunk(Chunk** chunk, bool may_block)
|
||||||
read_pos = 0;
|
read_pos = 0;
|
||||||
read_len = bytes_left;
|
read_len = bytes_left;
|
||||||
|
|
||||||
|
if ( ! ChunkAvailable() )
|
||||||
|
read_flare.Extinguish();
|
||||||
|
|
||||||
// If allowed, wait a bit for something to read.
|
// If allowed, wait a bit for something to read.
|
||||||
if ( may_block )
|
if ( may_block )
|
||||||
{
|
{
|
||||||
|
@ -607,6 +630,14 @@ bool ChunkedIOFd::IsFillingUp()
|
||||||
return stats.pending > MAX_BUFFERED_CHUNKS_SOFT;
|
return stats.pending > MAX_BUFFERED_CHUNKS_SOFT;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
iosource::FD_Set ChunkedIOFd::ExtraReadFDs() const
|
||||||
|
{
|
||||||
|
iosource::FD_Set rval;
|
||||||
|
rval.Insert(write_flare.FD());
|
||||||
|
rval.Insert(read_flare.FD());
|
||||||
|
return rval;
|
||||||
|
}
|
||||||
|
|
||||||
void ChunkedIOFd::Clear()
|
void ChunkedIOFd::Clear()
|
||||||
{
|
{
|
||||||
while ( pending_head )
|
while ( pending_head )
|
||||||
|
@ -618,6 +649,9 @@ void ChunkedIOFd::Clear()
|
||||||
}
|
}
|
||||||
|
|
||||||
pending_head = pending_tail = 0;
|
pending_head = pending_tail = 0;
|
||||||
|
|
||||||
|
if ( write_len == 0 )
|
||||||
|
write_flare.Extinguish();
|
||||||
}
|
}
|
||||||
|
|
||||||
const char* ChunkedIOFd::Error()
|
const char* ChunkedIOFd::Error()
|
||||||
|
@ -830,6 +864,7 @@ bool ChunkedIOSSL::Write(Chunk* chunk)
|
||||||
else
|
else
|
||||||
write_head = write_tail = q;
|
write_head = write_tail = q;
|
||||||
|
|
||||||
|
write_flare.Fire();
|
||||||
Flush();
|
Flush();
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
@ -935,6 +970,7 @@ bool ChunkedIOSSL::Flush()
|
||||||
write_state = LEN;
|
write_state = LEN;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
write_flare.Extinguish();
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1104,6 +1140,13 @@ bool ChunkedIOSSL::IsFillingUp()
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
iosource::FD_Set ChunkedIOSSL::ExtraReadFDs() const
|
||||||
|
{
|
||||||
|
iosource::FD_Set rval;
|
||||||
|
rval.Insert(write_flare.FD());
|
||||||
|
return rval;
|
||||||
|
}
|
||||||
|
|
||||||
void ChunkedIOSSL::Clear()
|
void ChunkedIOSSL::Clear()
|
||||||
{
|
{
|
||||||
while ( write_head )
|
while ( write_head )
|
||||||
|
@ -1114,6 +1157,7 @@ void ChunkedIOSSL::Clear()
|
||||||
write_head = next;
|
write_head = next;
|
||||||
}
|
}
|
||||||
write_head = write_tail = 0;
|
write_head = write_tail = 0;
|
||||||
|
write_flare.Extinguish();
|
||||||
}
|
}
|
||||||
|
|
||||||
const char* ChunkedIOSSL::Error()
|
const char* ChunkedIOSSL::Error()
|
||||||
|
|
|
@ -6,7 +6,8 @@
|
||||||
#include "config.h"
|
#include "config.h"
|
||||||
#include "List.h"
|
#include "List.h"
|
||||||
#include "util.h"
|
#include "util.h"
|
||||||
|
#include "Flare.h"
|
||||||
|
#include "iosource/FD_Set.h"
|
||||||
#include <list>
|
#include <list>
|
||||||
|
|
||||||
#ifdef NEED_KRB5_H
|
#ifdef NEED_KRB5_H
|
||||||
|
@ -95,6 +96,11 @@ public:
|
||||||
// Returns underlying fd if available, -1 otherwise.
|
// Returns underlying fd if available, -1 otherwise.
|
||||||
virtual int Fd() { return -1; }
|
virtual int Fd() { return -1; }
|
||||||
|
|
||||||
|
// Returns supplementary file descriptors that become read-ready in order
|
||||||
|
// to signal that there is some work that can be performed.
|
||||||
|
virtual iosource::FD_Set ExtraReadFDs() const
|
||||||
|
{ return iosource::FD_Set(); }
|
||||||
|
|
||||||
// Makes sure that no additional protocol data is written into
|
// Makes sure that no additional protocol data is written into
|
||||||
// the output stream. If this is activated, the output cannot
|
// the output stream. If this is activated, the output cannot
|
||||||
// be read again by any of these classes!
|
// be read again by any of these classes!
|
||||||
|
@ -177,6 +183,7 @@ public:
|
||||||
virtual void Clear();
|
virtual void Clear();
|
||||||
virtual bool Eof() { return eof; }
|
virtual bool Eof() { return eof; }
|
||||||
virtual int Fd() { return fd; }
|
virtual int Fd() { return fd; }
|
||||||
|
virtual iosource::FD_Set ExtraReadFDs() const;
|
||||||
virtual void Stats(char* buffer, int length);
|
virtual void Stats(char* buffer, int length);
|
||||||
|
|
||||||
private:
|
private:
|
||||||
|
@ -240,6 +247,8 @@ private:
|
||||||
ChunkQueue* pending_tail;
|
ChunkQueue* pending_tail;
|
||||||
|
|
||||||
pid_t pid;
|
pid_t pid;
|
||||||
|
bro::Flare write_flare;
|
||||||
|
bro::Flare read_flare;
|
||||||
};
|
};
|
||||||
|
|
||||||
// Chunked I/O using an SSL connection.
|
// Chunked I/O using an SSL connection.
|
||||||
|
@ -262,6 +271,7 @@ public:
|
||||||
virtual void Clear();
|
virtual void Clear();
|
||||||
virtual bool Eof() { return eof; }
|
virtual bool Eof() { return eof; }
|
||||||
virtual int Fd() { return socket; }
|
virtual int Fd() { return socket; }
|
||||||
|
virtual iosource::FD_Set ExtraReadFDs() const;
|
||||||
virtual void Stats(char* buffer, int length);
|
virtual void Stats(char* buffer, int length);
|
||||||
|
|
||||||
private:
|
private:
|
||||||
|
@ -303,6 +313,8 @@ private:
|
||||||
|
|
||||||
// One SSL for all connections.
|
// One SSL for all connections.
|
||||||
static SSL_CTX* ctx;
|
static SSL_CTX* ctx;
|
||||||
|
|
||||||
|
bro::Flare write_flare;
|
||||||
};
|
};
|
||||||
|
|
||||||
#include <zlib.h>
|
#include <zlib.h>
|
||||||
|
@ -328,6 +340,8 @@ public:
|
||||||
|
|
||||||
virtual bool Eof() { return io->Eof(); }
|
virtual bool Eof() { return io->Eof(); }
|
||||||
virtual int Fd() { return io->Fd(); }
|
virtual int Fd() { return io->Fd(); }
|
||||||
|
virtual iosource::FD_Set ExtraReadFDs() const
|
||||||
|
{ return io->ExtraReadFDs(); }
|
||||||
virtual void Stats(char* buffer, int length);
|
virtual void Stats(char* buffer, int length);
|
||||||
|
|
||||||
void EnableCompression(int level)
|
void EnableCompression(int level)
|
||||||
|
|
|
@ -1216,9 +1216,10 @@ void DNS_Mgr::IssueAsyncRequests()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void DNS_Mgr::GetFds(int* read, int* write, int* except)
|
void DNS_Mgr::GetFds(iosource::FD_Set* read, iosource::FD_Set* write,
|
||||||
|
iosource::FD_Set* except)
|
||||||
{
|
{
|
||||||
*read = nb_dns_fd(nb_dns);
|
read->Insert(nb_dns_fd(nb_dns));
|
||||||
}
|
}
|
||||||
|
|
||||||
double DNS_Mgr::NextTimestamp(double* network_time)
|
double DNS_Mgr::NextTimestamp(double* network_time)
|
||||||
|
|
|
@ -132,7 +132,8 @@ protected:
|
||||||
void DoProcess(bool flush);
|
void DoProcess(bool flush);
|
||||||
|
|
||||||
// IOSource interface.
|
// IOSource interface.
|
||||||
virtual void GetFds(int* read, int* write, int* except);
|
virtual void GetFds(iosource::FD_Set* read, iosource::FD_Set* write,
|
||||||
|
iosource::FD_Set* except);
|
||||||
virtual double NextTimestamp(double* network_time);
|
virtual double NextTimestamp(double* network_time);
|
||||||
virtual void Process();
|
virtual void Process();
|
||||||
virtual const char* Tag() { return "DNS_Mgr"; }
|
virtual const char* Tag() { return "DNS_Mgr"; }
|
||||||
|
|
43
src/Flare.cc
Normal file
43
src/Flare.cc
Normal file
|
@ -0,0 +1,43 @@
|
||||||
|
// See the file "COPYING" in the main distribution directory for copyright.
|
||||||
|
|
||||||
|
#include "Flare.h"
|
||||||
|
#include <unistd.h>
|
||||||
|
#include <fcntl.h>
|
||||||
|
#include <errno.h>
|
||||||
|
|
||||||
|
using namespace bro;
|
||||||
|
|
||||||
|
Flare::Flare()
|
||||||
|
: pipe(FD_CLOEXEC, FD_CLOEXEC, O_NONBLOCK, O_NONBLOCK)
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
void Flare::Fire()
|
||||||
|
{
|
||||||
|
char tmp;
|
||||||
|
|
||||||
|
for ( ; ; )
|
||||||
|
{
|
||||||
|
int n = write(pipe.WriteFD(), &tmp, 1);
|
||||||
|
|
||||||
|
if ( n > 0 )
|
||||||
|
// Success -- wrote a byte to pipe.
|
||||||
|
break;
|
||||||
|
|
||||||
|
if ( n < 0 && errno == EAGAIN )
|
||||||
|
// Success -- pipe is full and just need at least one byte in it.
|
||||||
|
break;
|
||||||
|
|
||||||
|
// Loop because either the byte wasn't written or got EINTR error.
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void Flare::Extinguish()
|
||||||
|
{
|
||||||
|
char tmp[256];
|
||||||
|
|
||||||
|
for ( ; ; )
|
||||||
|
if ( read(pipe.ReadFD(), &tmp, sizeof(tmp)) == -1 && errno == EAGAIN )
|
||||||
|
// Pipe is now drained.
|
||||||
|
break;
|
||||||
|
}
|
44
src/Flare.h
Normal file
44
src/Flare.h
Normal file
|
@ -0,0 +1,44 @@
|
||||||
|
// See the file "COPYING" in the main distribution directory for copyright.
|
||||||
|
|
||||||
|
#ifndef BRO_FLARE_H
|
||||||
|
#define BRO_FLARE_H
|
||||||
|
|
||||||
|
#include "Pipe.h"
|
||||||
|
|
||||||
|
namespace bro {
|
||||||
|
|
||||||
|
class Flare {
|
||||||
|
public:
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create a flare object that can be used to signal a "ready" status via
|
||||||
|
* a file descriptor that may be integrated with select(), poll(), etc.
|
||||||
|
* Not thread-safe, but that should only require Fire()/Extinguish() calls
|
||||||
|
* to be made mutually exclusive (across all copies of a Flare).
|
||||||
|
*/
|
||||||
|
Flare();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return a file descriptor that will become ready if the flare has been
|
||||||
|
* Fire()'d and not yet Extinguished()'d.
|
||||||
|
*/
|
||||||
|
int FD() const
|
||||||
|
{ return pipe.ReadFD(); }
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Put the object in the "ready" state.
|
||||||
|
*/
|
||||||
|
void Fire();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Take the object out of the "ready" state.
|
||||||
|
*/
|
||||||
|
void Extinguish();
|
||||||
|
|
||||||
|
private:
|
||||||
|
Pipe pipe;
|
||||||
|
};
|
||||||
|
|
||||||
|
} // namespace bro
|
||||||
|
|
||||||
|
#endif // BRO_FLARE_H
|
83
src/Pipe.cc
Normal file
83
src/Pipe.cc
Normal file
|
@ -0,0 +1,83 @@
|
||||||
|
// See the file "COPYING" in the main distribution directory for copyright.
|
||||||
|
|
||||||
|
#include "Pipe.h"
|
||||||
|
#include "Reporter.h"
|
||||||
|
#include <unistd.h>
|
||||||
|
#include <fcntl.h>
|
||||||
|
#include <errno.h>
|
||||||
|
#include <cstdio>
|
||||||
|
|
||||||
|
using namespace bro;
|
||||||
|
|
||||||
|
static void pipe_fail(int eno)
|
||||||
|
{
|
||||||
|
char tmp[256];
|
||||||
|
strerror_r(eno, tmp, sizeof(tmp));
|
||||||
|
reporter->FatalError("Pipe failure: %s", tmp);
|
||||||
|
}
|
||||||
|
|
||||||
|
static void set_flags(int fd, int flags)
|
||||||
|
{
|
||||||
|
if ( flags )
|
||||||
|
fcntl(fd, F_SETFD, fcntl(fd, F_GETFD) | flags);
|
||||||
|
}
|
||||||
|
|
||||||
|
static void set_status_flags(int fd, int flags)
|
||||||
|
{
|
||||||
|
if ( flags )
|
||||||
|
fcntl(fd, F_SETFL, fcntl(fd, F_GETFL) | flags);
|
||||||
|
}
|
||||||
|
|
||||||
|
static int dup_or_fail(int fd, int flags)
|
||||||
|
{
|
||||||
|
int rval = dup(fd);
|
||||||
|
|
||||||
|
if ( rval < 0 )
|
||||||
|
pipe_fail(errno);
|
||||||
|
|
||||||
|
set_flags(fd, flags);
|
||||||
|
return rval;
|
||||||
|
}
|
||||||
|
|
||||||
|
Pipe::Pipe(int flags0, int flags1, int status_flags0, int status_flags1)
|
||||||
|
{
|
||||||
|
// pipe2 can set flags atomically, but not yet available everywhere.
|
||||||
|
if ( ::pipe(fds) )
|
||||||
|
pipe_fail(errno);
|
||||||
|
|
||||||
|
flags[0] = flags0;
|
||||||
|
flags[1] = flags1;
|
||||||
|
|
||||||
|
set_flags(fds[0], flags[0]);
|
||||||
|
set_flags(fds[1], flags[1]);
|
||||||
|
set_status_flags(fds[0], status_flags0);
|
||||||
|
set_status_flags(fds[1], status_flags1);
|
||||||
|
}
|
||||||
|
|
||||||
|
Pipe::~Pipe()
|
||||||
|
{
|
||||||
|
close(fds[0]);
|
||||||
|
close(fds[1]);
|
||||||
|
}
|
||||||
|
|
||||||
|
Pipe::Pipe(const Pipe& other)
|
||||||
|
{
|
||||||
|
fds[0] = dup_or_fail(other.fds[0], other.flags[0]);
|
||||||
|
fds[1] = dup_or_fail(other.fds[1], other.flags[1]);
|
||||||
|
flags[0] = other.flags[0];
|
||||||
|
flags[1] = other.flags[1];
|
||||||
|
}
|
||||||
|
|
||||||
|
Pipe& Pipe::operator=(const Pipe& other)
|
||||||
|
{
|
||||||
|
if ( this == &other )
|
||||||
|
return *this;
|
||||||
|
|
||||||
|
close(fds[0]);
|
||||||
|
close(fds[1]);
|
||||||
|
fds[0] = dup_or_fail(other.fds[0], other.flags[0]);
|
||||||
|
fds[1] = dup_or_fail(other.fds[1], other.flags[1]);
|
||||||
|
flags[0] = other.flags[0];
|
||||||
|
flags[1] = other.flags[1];
|
||||||
|
return *this;
|
||||||
|
}
|
56
src/Pipe.h
Normal file
56
src/Pipe.h
Normal file
|
@ -0,0 +1,56 @@
|
||||||
|
// See the file "COPYING" in the main distribution directory for copyright.
|
||||||
|
|
||||||
|
#ifndef BRO_PIPE_H
|
||||||
|
#define BRO_PIPE_H
|
||||||
|
|
||||||
|
namespace bro {
|
||||||
|
|
||||||
|
class Pipe {
|
||||||
|
public:
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create a pair of file descriptors via pipe(), or aborts if it cannot.
|
||||||
|
* @param flags0 file descriptor flags to set on read end of pipe.
|
||||||
|
* @param flags1 file descriptor flags to set on write end of pipe.
|
||||||
|
* @param status_flags0 descriptor status flags to set on read end of pipe.
|
||||||
|
* @param status_flags1 descriptor status flags to set on write end of pipe.
|
||||||
|
*/
|
||||||
|
Pipe(int flags0 = 0, int flags1 = 0, int status_flags0 = 0,
|
||||||
|
int status_flags1 = 0);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Close the pair of file descriptors owned by the object.
|
||||||
|
*/
|
||||||
|
~Pipe();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Make a copy of another Pipe object (file descriptors are dup'd).
|
||||||
|
*/
|
||||||
|
Pipe(const Pipe& other);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Assign a Pipe object by closing file descriptors and duping those of
|
||||||
|
* the other.
|
||||||
|
*/
|
||||||
|
Pipe& operator=(const Pipe& other);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return the file descriptor associated with the read-end of the pipe.
|
||||||
|
*/
|
||||||
|
int ReadFD() const
|
||||||
|
{ return fds[0]; }
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return the file descriptor associated with the write-end of the pipe.
|
||||||
|
*/
|
||||||
|
int WriteFD() const
|
||||||
|
{ return fds[1]; }
|
||||||
|
|
||||||
|
private:
|
||||||
|
int fds[2];
|
||||||
|
int flags[2];
|
||||||
|
};
|
||||||
|
|
||||||
|
} // namespace bro
|
||||||
|
|
||||||
|
#endif // BRO_PIPE_H
|
|
@ -1367,12 +1367,14 @@ void RemoteSerializer::Unregister(ID* id)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void RemoteSerializer::GetFds(int* read, int* write, int* except)
|
void RemoteSerializer::GetFds(iosource::FD_Set* read, iosource::FD_Set* write,
|
||||||
|
iosource::FD_Set* except)
|
||||||
{
|
{
|
||||||
*read = io->Fd();
|
read->Insert(io->Fd());
|
||||||
|
read->Insert(io->ExtraReadFDs());
|
||||||
|
|
||||||
if ( io->CanWrite() )
|
if ( io->CanWrite() )
|
||||||
*write = io->Fd();
|
write->Insert(io->Fd());
|
||||||
}
|
}
|
||||||
|
|
||||||
double RemoteSerializer::NextTimestamp(double* local_network_time)
|
double RemoteSerializer::NextTimestamp(double* local_network_time)
|
||||||
|
@ -3355,6 +3357,15 @@ SocketComm::~SocketComm()
|
||||||
|
|
||||||
static unsigned int first_rtime = 0;
|
static unsigned int first_rtime = 0;
|
||||||
|
|
||||||
|
static void fd_vector_set(const std::vector<int>& fds, fd_set* set, int* max)
|
||||||
|
{
|
||||||
|
for ( size_t i = 0; i < fds.size(); ++i )
|
||||||
|
{
|
||||||
|
FD_SET(fds[i], set);
|
||||||
|
*max = ::max(fds[i], *max);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
void SocketComm::Run()
|
void SocketComm::Run()
|
||||||
{
|
{
|
||||||
first_rtime = (unsigned int) current_time(true);
|
first_rtime = (unsigned int) current_time(true);
|
||||||
|
@ -3376,10 +3387,9 @@ void SocketComm::Run()
|
||||||
FD_ZERO(&fd_write);
|
FD_ZERO(&fd_write);
|
||||||
FD_ZERO(&fd_except);
|
FD_ZERO(&fd_except);
|
||||||
|
|
||||||
int max_fd = 0;
|
int max_fd = io->Fd();
|
||||||
|
|
||||||
FD_SET(io->Fd(), &fd_read);
|
FD_SET(io->Fd(), &fd_read);
|
||||||
max_fd = io->Fd();
|
max_fd = std::max(max_fd, io->ExtraReadFDs().Set(&fd_read));
|
||||||
|
|
||||||
loop_over_list(peers, i)
|
loop_over_list(peers, i)
|
||||||
{
|
{
|
||||||
|
@ -3388,6 +3398,8 @@ void SocketComm::Run()
|
||||||
FD_SET(peers[i]->io->Fd(), &fd_read);
|
FD_SET(peers[i]->io->Fd(), &fd_read);
|
||||||
if ( peers[i]->io->Fd() > max_fd )
|
if ( peers[i]->io->Fd() > max_fd )
|
||||||
max_fd = peers[i]->io->Fd();
|
max_fd = peers[i]->io->Fd();
|
||||||
|
max_fd = std::max(max_fd,
|
||||||
|
peers[i]->io->ExtraReadFDs().Set(&fd_read));
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
|
@ -3438,38 +3450,17 @@ void SocketComm::Run()
|
||||||
if ( ! io->IsFillingUp() && shutting_conns_down )
|
if ( ! io->IsFillingUp() && shutting_conns_down )
|
||||||
shutting_conns_down = false;
|
shutting_conns_down = false;
|
||||||
|
|
||||||
// We cannot rely solely on select() as the there may
|
|
||||||
// be some data left in our input/output queues. So, we use
|
|
||||||
// a small timeout for select and check for data
|
|
||||||
// manually afterwards.
|
|
||||||
|
|
||||||
static long selects = 0;
|
static long selects = 0;
|
||||||
static long canwrites = 0;
|
static long canwrites = 0;
|
||||||
static long timeouts = 0;
|
|
||||||
|
|
||||||
++selects;
|
++selects;
|
||||||
if ( io->CanWrite() )
|
if ( io->CanWrite() )
|
||||||
++canwrites;
|
++canwrites;
|
||||||
|
|
||||||
// FIXME: Fine-tune this (timeouts, flush, etc.)
|
int a = select(max_fd + 1, &fd_read, &fd_write, &fd_except, 0);
|
||||||
struct timeval small_timeout;
|
|
||||||
small_timeout.tv_sec = 0;
|
|
||||||
small_timeout.tv_usec =
|
|
||||||
io->CanWrite() || io->CanRead() ? 1 : 10;
|
|
||||||
|
|
||||||
#if 0
|
|
||||||
if ( ! io->CanWrite() )
|
|
||||||
usleep(10);
|
|
||||||
#endif
|
|
||||||
|
|
||||||
int a = select(max_fd + 1, &fd_read, &fd_write, &fd_except,
|
|
||||||
&small_timeout);
|
|
||||||
|
|
||||||
if ( a == 0 )
|
|
||||||
++timeouts;
|
|
||||||
|
|
||||||
if ( selects % 100000 == 0 )
|
if ( selects % 100000 == 0 )
|
||||||
Log(fmt("selects=%ld canwrites=%ld timeouts=%ld", selects, canwrites, timeouts));
|
Log(fmt("selects=%ld canwrites=%ld", selects, canwrites));
|
||||||
|
|
||||||
if ( a < 0 )
|
if ( a < 0 )
|
||||||
// Ignore errors for now.
|
// Ignore errors for now.
|
||||||
|
|
|
@ -140,7 +140,8 @@ public:
|
||||||
void Finish();
|
void Finish();
|
||||||
|
|
||||||
// Overidden from IOSource:
|
// Overidden from IOSource:
|
||||||
virtual void GetFds(int* read, int* write, int* except);
|
virtual void GetFds(iosource::FD_Set* read, iosource::FD_Set* write,
|
||||||
|
iosource::FD_Set* except);
|
||||||
virtual double NextTimestamp(double* local_network_time);
|
virtual double NextTimestamp(double* local_network_time);
|
||||||
virtual void Process();
|
virtual void Process();
|
||||||
virtual TimerMgr::Tag* GetCurrentTag();
|
virtual TimerMgr::Tag* GetCurrentTag();
|
||||||
|
|
|
@ -1068,9 +1068,10 @@ void EventPlayer::GotFunctionCall(const char* name, double time,
|
||||||
// We don't replay function calls.
|
// We don't replay function calls.
|
||||||
}
|
}
|
||||||
|
|
||||||
void EventPlayer::GetFds(int* read, int* write, int* except)
|
void EventPlayer::GetFds(iosource::FD_Set* read, iosource::FD_Set* write,
|
||||||
|
iosource::FD_Set* except)
|
||||||
{
|
{
|
||||||
*read = fd;
|
read->Insert(fd);
|
||||||
}
|
}
|
||||||
|
|
||||||
double EventPlayer::NextTimestamp(double* local_network_time)
|
double EventPlayer::NextTimestamp(double* local_network_time)
|
||||||
|
|
|
@ -355,7 +355,8 @@ public:
|
||||||
EventPlayer(const char* file);
|
EventPlayer(const char* file);
|
||||||
virtual ~EventPlayer();
|
virtual ~EventPlayer();
|
||||||
|
|
||||||
virtual void GetFds(int* read, int* write, int* except);
|
virtual void GetFds(iosource::FD_Set* read, iosource::FD_Set* write,
|
||||||
|
iosource::FD_Set* except);
|
||||||
virtual double NextTimestamp(double* local_network_time);
|
virtual double NextTimestamp(double* local_network_time);
|
||||||
virtual void Process();
|
virtual void Process();
|
||||||
virtual const char* Tag() { return "EventPlayer"; }
|
virtual const char* Tag() { return "EventPlayer"; }
|
||||||
|
|
87
src/iosource/FD_Set.h
Normal file
87
src/iosource/FD_Set.h
Normal file
|
@ -0,0 +1,87 @@
|
||||||
|
#ifndef BRO_FD_SET_H
|
||||||
|
#define BRO_FD_SET_H
|
||||||
|
|
||||||
|
#include <set>
|
||||||
|
#include <sys/select.h>
|
||||||
|
|
||||||
|
namespace iosource {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A container holding a set of file descriptors.
|
||||||
|
*/
|
||||||
|
class FD_Set {
|
||||||
|
public:
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Constructor. The set is initially empty.
|
||||||
|
*/
|
||||||
|
FD_Set() : max(-1), fds()
|
||||||
|
{ }
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Insert a file descriptor in to the set.
|
||||||
|
* @param fd the fd to insert in the set.
|
||||||
|
* @return false if fd was already in the set, else true.
|
||||||
|
*/
|
||||||
|
bool Insert(int fd)
|
||||||
|
{
|
||||||
|
if ( max < fd )
|
||||||
|
max = fd;
|
||||||
|
|
||||||
|
return fds.insert(fd).second;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Inserts all the file descriptors from another set in to this one.
|
||||||
|
* @param other a file descriptor set to merge in to this one.
|
||||||
|
*/
|
||||||
|
void Insert(const FD_Set& other)
|
||||||
|
{
|
||||||
|
for ( std::set<int>::const_iterator it = other.fds.begin();
|
||||||
|
it != other.fds.end(); ++it )
|
||||||
|
Insert(*it);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Empties the set.
|
||||||
|
*/
|
||||||
|
void Clear()
|
||||||
|
{ max = -1; fds.clear(); }
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Insert file descriptors in to a fd_set for use with select().
|
||||||
|
* @return the greatest file descriptor inserted.
|
||||||
|
*/
|
||||||
|
int Set(fd_set* set) const
|
||||||
|
{
|
||||||
|
for ( std::set<int>::const_iterator it = fds.begin(); it != fds.end();
|
||||||
|
++it )
|
||||||
|
FD_SET(*it, set);
|
||||||
|
|
||||||
|
return max;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return Whether a file descriptor belonging to this set is within the
|
||||||
|
* fd_set arugment.
|
||||||
|
*/
|
||||||
|
bool Ready(fd_set* set) const
|
||||||
|
{
|
||||||
|
for ( std::set<int>::const_iterator it = fds.begin(); it != fds.end();
|
||||||
|
++it )
|
||||||
|
{
|
||||||
|
if ( FD_ISSET(*it, set) )
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
private:
|
||||||
|
int max;
|
||||||
|
std::set<int> fds;
|
||||||
|
};
|
||||||
|
|
||||||
|
} // namespace bro
|
||||||
|
|
||||||
|
#endif // BRO_FD_SET_H
|
|
@ -8,7 +8,7 @@ extern "C" {
|
||||||
}
|
}
|
||||||
|
|
||||||
#include <string>
|
#include <string>
|
||||||
|
#include "FD_Set.h"
|
||||||
#include "Timer.h"
|
#include "Timer.h"
|
||||||
|
|
||||||
namespace iosource {
|
namespace iosource {
|
||||||
|
@ -55,13 +55,13 @@ public:
|
||||||
* Returns select'able file descriptors for this source. Leaves the
|
* Returns select'able file descriptors for this source. Leaves the
|
||||||
* passed values untouched if not available.
|
* passed values untouched if not available.
|
||||||
*
|
*
|
||||||
* @param read Pointer to where to store a read descriptor.
|
* @param read Pointer to container where to insert a read descriptor.
|
||||||
*
|
*
|
||||||
* @param write Pointer to where to store a write descriptor.
|
* @param write Pointer to container where to insert a write descriptor.
|
||||||
*
|
*
|
||||||
* @param except Pointer to where to store a except descriptor.
|
* @param except Pointer to container where to insert a except descriptor.
|
||||||
*/
|
*/
|
||||||
virtual void GetFds(int* read, int* write, int* except) = 0;
|
virtual void GetFds(FD_Set* read, FD_Set* write, FD_Set* except) = 0;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns the timestamp (in \a global network time) associated with
|
* Returns the timestamp (in \a global network time) associated with
|
||||||
|
|
|
@ -115,16 +115,9 @@ IOSource* Manager::FindSoonest(double* ts)
|
||||||
// be ready.
|
// be ready.
|
||||||
continue;
|
continue;
|
||||||
|
|
||||||
src->fd_read = src->fd_write = src->fd_except = 0;
|
src->Clear();
|
||||||
src->src->GetFds(&src->fd_read, &src->fd_write, &src->fd_except);
|
src->src->GetFds(&src->fd_read, &src->fd_write, &src->fd_except);
|
||||||
|
src->SetFds(&fd_read, &fd_write, &fd_except, &maxx);
|
||||||
FD_SET(src->fd_read, &fd_read);
|
|
||||||
FD_SET(src->fd_write, &fd_write);
|
|
||||||
FD_SET(src->fd_except, &fd_except);
|
|
||||||
|
|
||||||
maxx = std::max(src->fd_read, maxx);
|
|
||||||
maxx = std::max(src->fd_write, maxx);
|
|
||||||
maxx = std::max(src->fd_except, maxx);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// We can't block indefinitely even when all sources are dry:
|
// We can't block indefinitely even when all sources are dry:
|
||||||
|
@ -164,9 +157,7 @@ IOSource* Manager::FindSoonest(double* ts)
|
||||||
if ( ! src->src->IsIdle() )
|
if ( ! src->src->IsIdle() )
|
||||||
continue;
|
continue;
|
||||||
|
|
||||||
if ( FD_ISSET(src->fd_read, &fd_read) ||
|
if ( src->Ready(&fd_read, &fd_write, &fd_except) )
|
||||||
FD_ISSET(src->fd_write, &fd_write) ||
|
|
||||||
FD_ISSET(src->fd_except, &fd_except) )
|
|
||||||
{
|
{
|
||||||
double local_network_time = 0;
|
double local_network_time = 0;
|
||||||
double ts = src->src->NextTimestamp(&local_network_time);
|
double ts = src->src->NextTimestamp(&local_network_time);
|
||||||
|
@ -310,3 +301,11 @@ PktDumper* Manager::OpenPktDumper(const string& path, bool append)
|
||||||
|
|
||||||
return pd;
|
return pd;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void Manager::Source::SetFds(fd_set* read, fd_set* write, fd_set* except,
|
||||||
|
int* maxx) const
|
||||||
|
{
|
||||||
|
*maxx = std::max(*maxx, fd_read.Set(read));
|
||||||
|
*maxx = std::max(*maxx, fd_write.Set(write));
|
||||||
|
*maxx = std::max(*maxx, fd_except.Set(except));
|
||||||
|
}
|
||||||
|
|
|
@ -5,6 +5,7 @@
|
||||||
|
|
||||||
#include <string>
|
#include <string>
|
||||||
#include <list>
|
#include <list>
|
||||||
|
#include "iosource/FD_Set.h"
|
||||||
|
|
||||||
namespace iosource {
|
namespace iosource {
|
||||||
|
|
||||||
|
@ -113,9 +114,19 @@ private:
|
||||||
|
|
||||||
struct Source {
|
struct Source {
|
||||||
IOSource* src;
|
IOSource* src;
|
||||||
int fd_read;
|
FD_Set fd_read;
|
||||||
int fd_write;
|
FD_Set fd_write;
|
||||||
int fd_except;
|
FD_Set fd_except;
|
||||||
|
|
||||||
|
bool Ready(fd_set* read, fd_set* write, fd_set* except) const
|
||||||
|
{ return fd_read.Ready(read) || fd_write.Ready(write) ||
|
||||||
|
fd_except.Ready(except); }
|
||||||
|
|
||||||
|
void SetFds(fd_set* read, fd_set* write, fd_set* except,
|
||||||
|
int* maxx) const;
|
||||||
|
|
||||||
|
void Clear()
|
||||||
|
{ fd_read.Clear(); fd_write.Clear(); fd_except.Clear(); }
|
||||||
};
|
};
|
||||||
|
|
||||||
typedef std::list<Source*> SourceList;
|
typedef std::list<Source*> SourceList;
|
||||||
|
|
|
@ -218,7 +218,8 @@ void PktSrc::Done()
|
||||||
Close();
|
Close();
|
||||||
}
|
}
|
||||||
|
|
||||||
void PktSrc::GetFds(int* read, int* write, int* except)
|
void PktSrc::GetFds(iosource::FD_Set* read, iosource::FD_Set* write,
|
||||||
|
iosource::FD_Set* except)
|
||||||
{
|
{
|
||||||
if ( pseudo_realtime )
|
if ( pseudo_realtime )
|
||||||
{
|
{
|
||||||
|
@ -229,7 +230,7 @@ void PktSrc::GetFds(int* read, int* write, int* except)
|
||||||
}
|
}
|
||||||
|
|
||||||
if ( IsOpen() && props.selectable_fd >= 0 )
|
if ( IsOpen() && props.selectable_fd >= 0 )
|
||||||
*read = props.selectable_fd;
|
read->Insert(props.selectable_fd);
|
||||||
}
|
}
|
||||||
|
|
||||||
double PktSrc::NextTimestamp(double* local_network_time)
|
double PktSrc::NextTimestamp(double* local_network_time)
|
||||||
|
|
|
@ -388,7 +388,8 @@ private:
|
||||||
// IOSource interface implementation.
|
// IOSource interface implementation.
|
||||||
virtual void Init();
|
virtual void Init();
|
||||||
virtual void Done();
|
virtual void Done();
|
||||||
virtual void GetFds(int* read, int* write, int* except);
|
virtual void GetFds(iosource::FD_Set* read, iosource::FD_Set* write,
|
||||||
|
iosource::FD_Set* except);
|
||||||
virtual double NextTimestamp(double* local_network_time);
|
virtual double NextTimestamp(double* local_network_time);
|
||||||
virtual void Process();
|
virtual void Process();
|
||||||
virtual const char* Tag();
|
virtual const char* Tag();
|
||||||
|
|
|
@ -65,7 +65,8 @@ void Manager::AddMsgThread(MsgThread* thread)
|
||||||
msg_threads.push_back(thread);
|
msg_threads.push_back(thread);
|
||||||
}
|
}
|
||||||
|
|
||||||
void Manager::GetFds(int* read, int* write, int* except)
|
void Manager::GetFds(iosource::FD_Set* read, iosource::FD_Set* write,
|
||||||
|
iosource::FD_Set* except)
|
||||||
{
|
{
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -103,7 +103,8 @@ protected:
|
||||||
/**
|
/**
|
||||||
* Part of the IOSource interface.
|
* Part of the IOSource interface.
|
||||||
*/
|
*/
|
||||||
virtual void GetFds(int* read, int* write, int* except);
|
virtual void GetFds(iosource::FD_Set* read, iosource::FD_Set* write,
|
||||||
|
iosource::FD_Set* except);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Part of the IOSource interface.
|
* Part of the IOSource interface.
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue