diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 04867b7189..3764533b66 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -279,6 +279,7 @@ set(bro_SRCS EventRegistry.cc Expr.cc File.cc + Flare.cc FlowSrc.cc Frag.cc Frame.cc @@ -299,6 +300,7 @@ set(bro_SRCS OSFinger.cc PacketFilter.cc PersistenceSerializer.cc + Pipe.cc PktSrc.cc PolicyFile.cc PrefixTable.cc diff --git a/src/ChunkedIO.cc b/src/ChunkedIO.cc index 54e2e59575..a94eb98748 100644 --- a/src/ChunkedIO.cc +++ b/src/ChunkedIO.cc @@ -210,6 +210,7 @@ bool ChunkedIOFd::WriteChunk(Chunk* chunk, bool partial) else pending_head = pending_tail = q; + write_flare.Fire(); return Flush(); } @@ -232,6 +233,7 @@ bool ChunkedIOFd::PutIntoWriteBuffer(Chunk* chunk) write_len += len; delete chunk; + write_flare.Fire(); if ( network_time - last_flush > 0.005 ) FlushWriteBuffer(); @@ -269,6 +271,10 @@ bool ChunkedIOFd::FlushWriteBuffer() if ( unsigned(written) == len ) { write_pos = write_len = 0; + + if ( ! pending_head ) + write_flare.Extinguish(); + return true; } @@ -318,7 +324,12 @@ bool ChunkedIOFd::Flush() } } - return FlushWriteBuffer(); + bool rval = FlushWriteBuffer(); + + if ( ! pending_head && write_len == 0 ) + write_flare.Extinguish(); + + return rval; } uint32 ChunkedIOFd::ChunkAvailable() @@ -394,6 +405,9 @@ bool ChunkedIOFd::Read(Chunk** chunk, bool may_block) #ifdef DEBUG_COMMUNICATION AddToBuffer("", true); #endif + if ( ! ChunkAvailable() ) + read_flare.Extinguish(); + return false; } @@ -402,9 +416,15 @@ bool ChunkedIOFd::Read(Chunk** chunk, bool may_block) #ifdef DEBUG_COMMUNICATION AddToBuffer("", true); #endif + read_flare.Extinguish(); return true; } + if ( ChunkAvailable() ) + read_flare.Fire(); + else + read_flare.Extinguish(); + #ifdef DEBUG if ( *chunk ) DBG_LOG(DBG_CHUNKEDIO, "read of size %d %s[%s]", @@ -481,6 +501,9 @@ bool ChunkedIOFd::ReadChunk(Chunk** chunk, bool may_block) read_pos = 0; read_len = bytes_left; + if ( ! 
ChunkAvailable() ) + read_flare.Extinguish(); + // If allowed, wait a bit for something to read. if ( may_block ) { @@ -607,6 +630,14 @@ bool ChunkedIOFd::IsFillingUp() return stats.pending > MAX_BUFFERED_CHUNKS_SOFT; } +std::vector ChunkedIOFd::FdSupplements() const + { + std::vector rval; + rval.push_back(write_flare.FD()); + rval.push_back(read_flare.FD()); + return rval; + } + void ChunkedIOFd::Clear() { while ( pending_head ) @@ -618,6 +649,9 @@ void ChunkedIOFd::Clear() } pending_head = pending_tail = 0; + + if ( write_len == 0 ) + write_flare.Extinguish(); } const char* ChunkedIOFd::Error() @@ -830,6 +864,7 @@ bool ChunkedIOSSL::Write(Chunk* chunk) else write_head = write_tail = q; + write_flare.Fire(); Flush(); return true; } @@ -935,6 +970,7 @@ bool ChunkedIOSSL::Flush() write_state = LEN; } + write_flare.Extinguish(); return true; } @@ -1104,6 +1140,13 @@ bool ChunkedIOSSL::IsFillingUp() return false; } +std::vector ChunkedIOSSL::FdSupplements() const + { + std::vector rval; + rval.push_back(write_flare.FD()); + return rval; + } + void ChunkedIOSSL::Clear() { while ( write_head ) @@ -1114,6 +1157,7 @@ void ChunkedIOSSL::Clear() write_head = next; } write_head = write_tail = 0; + write_flare.Extinguish(); } const char* ChunkedIOSSL::Error() diff --git a/src/ChunkedIO.h b/src/ChunkedIO.h index a9865e4c05..c640e529b8 100644 --- a/src/ChunkedIO.h +++ b/src/ChunkedIO.h @@ -6,8 +6,9 @@ #include "config.h" #include "List.h" #include "util.h" - +#include "Flare.h" #include +#include #ifdef NEED_KRB5_H # include @@ -95,6 +96,11 @@ public: // Returns underlying fd if available, -1 otherwise. virtual int Fd() { return -1; } + // Returns supplementary file descriptors that become read-ready in order + // to signal that there is some work that can be performed. + virtual std::vector FdSupplements() const + { return std::vector(); } + // Makes sure that no additional protocol data is written into // the output stream. 
If this is activated, the output cannot // be read again by any of these classes! @@ -177,6 +183,7 @@ public: virtual void Clear(); virtual bool Eof() { return eof; } virtual int Fd() { return fd; } + virtual std::vector FdSupplements() const; virtual void Stats(char* buffer, int length); private: @@ -240,6 +247,8 @@ private: ChunkQueue* pending_tail; pid_t pid; + bro::Flare write_flare; + bro::Flare read_flare; }; // Chunked I/O using an SSL connection. @@ -262,6 +271,7 @@ public: virtual void Clear(); virtual bool Eof() { return eof; } virtual int Fd() { return socket; } + virtual std::vector FdSupplements() const; virtual void Stats(char* buffer, int length); private: @@ -303,6 +313,8 @@ private: // One SSL for all connections. static SSL_CTX* ctx; + + bro::Flare write_flare; }; #include @@ -328,6 +340,8 @@ public: virtual bool Eof() { return io->Eof(); } virtual int Fd() { return io->Fd(); } + virtual std::vector FdSupplements() const + { return io->FdSupplements(); } virtual void Stats(char* buffer, int length); void EnableCompression(int level) diff --git a/src/DNS_Mgr.cc b/src/DNS_Mgr.cc index 9188d61b96..9fb5c8bb87 100644 --- a/src/DNS_Mgr.cc +++ b/src/DNS_Mgr.cc @@ -1217,9 +1217,10 @@ void DNS_Mgr::IssueAsyncRequests() } } -void DNS_Mgr::GetFds(int* read, int* write, int* except) +void DNS_Mgr::GetFds(std::vector* read, std::vector* write, + std::vector* except) { - *read = nb_dns_fd(nb_dns); + read->push_back(nb_dns_fd(nb_dns)); } double DNS_Mgr::NextTimestamp(double* network_time) diff --git a/src/DNS_Mgr.h b/src/DNS_Mgr.h index 7864505add..fa19914add 100644 --- a/src/DNS_Mgr.h +++ b/src/DNS_Mgr.h @@ -132,7 +132,8 @@ protected: void DoProcess(bool flush); // IOSource interface. 
-	virtual void GetFds(int* read, int* write, int* except);
+	virtual void GetFds(std::vector<int>* read, std::vector<int>* write,
+	                    std::vector<int>* except);
 	virtual double NextTimestamp(double* network_time);
 	virtual void Process();
 	virtual const char* Tag() { return "DNS_Mgr"; }
diff --git a/src/Flare.cc b/src/Flare.cc
new file mode 100644
index 0000000000..8a0418f631
--- /dev/null
+++ b/src/Flare.cc
@@ -0,0 +1,49 @@
+// See the file "COPYING" in the main distribution directory for copyright.
+
+#include "Flare.h"
+#include "util.h"
+#include <unistd.h>
+#include <fcntl.h>
+#include <errno.h>
+
+using namespace bro;
+
+// Both ends of the underlying pipe are created close-on-exec and
+// non-blocking.
+Flare::Flare()
+	: pipe(FD_CLOEXEC, FD_CLOEXEC, O_NONBLOCK, O_NONBLOCK)
+	{
+	}
+
+// Queue one byte in the pipe so that its read end becomes readable.
+void Flare::Fire()
+	{
+	// Only the presence of a byte matters, but give it a defined value
+	// rather than handing an indeterminate one to write().
+	char tmp = 0;
+	safe_write(pipe.WriteFD(), &tmp, 1);
+	}
+
+// Drain every queued byte so that the read end is no longer readable.
+void Flare::Extinguish()
+	{
+	char tmp[256];
+
+	for ( ; ; )
+		{
+		ssize_t n = read(pipe.ReadFD(), tmp, sizeof(tmp));
+
+		if ( n > 0 )
+			// Drained some bytes; more may still be queued.
+			continue;
+
+		if ( n == -1 && errno == EINTR )
+			// Interrupted before anything was read; try again.
+			continue;
+
+		// EAGAIN (pipe is empty), end-of-file, or a hard error such as
+		// EBADF.  Stopping only on EAGAIN would spin forever in the
+		// latter two cases.
+		break;
+		}
+	}
diff --git a/src/Flare.h b/src/Flare.h
new file mode 100644
index 0000000000..4e6378847a
--- /dev/null
+++ b/src/Flare.h
@@ -0,0 +1,45 @@
+// See the file "COPYING" in the main distribution directory for copyright.
+
+#ifndef BRO_FLARE_H
+#define BRO_FLARE_H
+
+#include "Pipe.h"
+
+namespace bro {
+
+class Flare {
+public:
+
+	/**
+	 * Create a flare object that can be used to signal a "ready" status via
+	 * a file descriptor that may be integrated with select(), poll(), etc.
+	 * Not thread-safe, but that should only require Fire()/Extinguish() calls
+	 * to be made mutually exclusive (across all copies of a Flare).
+	 */
+	Flare();
+
+	/**
+	 * @return a file descriptor that will become ready if the flare has been
+	 * Fire()'d and not yet Extinguish()'d.
+	 */
+	int FD() const
+		{ return pipe.ReadFD(); }
+
+	/**
+	 * Put the object in the "ready" state.
+	 */
+	void Fire();
+
+	/**
+	 * Take the object out of the "ready" state.
+	 */
+	void Extinguish();
+
+private:
+
+	Pipe pipe;
+};
+
+} // namespace bro
+
+#endif // BRO_FLARE_H
diff --git a/src/FlowSrc.cc b/src/FlowSrc.cc
index 8eed94fcea..4999d9cb97 100644
--- a/src/FlowSrc.cc
+++ b/src/FlowSrc.cc
@@ -28,10 +28,11 @@ FlowSrc::~FlowSrc()
 	delete netflow_analyzer;
 	}
 
-void FlowSrc::GetFds(int* read, int* write, int* except)
+void FlowSrc::GetFds(std::vector<int>* read, std::vector<int>* write,
+                     std::vector<int>* except)
 	{
 	if ( selectable_fd >= 0 )
-		*read = selectable_fd;
+		read->push_back(selectable_fd);
 	}
 
 double FlowSrc::NextTimestamp(double* network_time)
diff --git a/src/FlowSrc.h b/src/FlowSrc.h
index 03dda2761d..ee927604e1 100644
--- a/src/FlowSrc.h
+++ b/src/FlowSrc.h
@@ -34,7 +34,8 @@ public:
 
 	// IOSource interface:
 	bool IsReady();
-	void GetFds(int* read, int* write, int* except);
+	void GetFds(std::vector<int>* read, std::vector<int>* write,
+	            std::vector<int>* except);
 	double NextTimestamp(double* network_time);
 	void Process();
 
diff --git a/src/IOSource.cc b/src/IOSource.cc
index d47007caad..540b797162 100644
--- a/src/IOSource.cc
+++ b/src/IOSource.cc
@@ -24,6 +24,15 @@ void IOSourceRegistry::RemoveAll()
 	dont_counts = sources.size();
 	}
 
+static void fd_vector_set(const std::vector<int>& fds, fd_set* set, int* max)
+	{
+	for ( size_t i = 0; i < fds.size(); ++i )
+		{
+		FD_SET(fds[i], set);
+		*max = ::max(fds[i], *max);
+		}
+	}
+
 IOSource* IOSourceRegistry::FindSoonest(double* ts)
 	{
 	// Remove sources which have gone dry. For simplicity, we only
@@ -94,16 +103,14 @@ IOSource* IOSourceRegistry::FindSoonest(double* ts)
 			// be ready.
 			continue;
 
-		src->fd_read = src->fd_write = src->fd_except = 0;
+		src->fd_read.clear();
+		src->fd_write.clear();
+		src->fd_except.clear();
 		src->src->GetFds(&src->fd_read, &src->fd_write, &src->fd_except);
 
-		FD_SET(src->fd_read, &fd_read);
-		FD_SET(src->fd_write, &fd_write);
-		FD_SET(src->fd_except, &fd_except);
-
-		maxx = max(src->fd_read, maxx);
-		maxx = max(src->fd_write, maxx);
-		maxx = max(src->fd_except, maxx);
+		fd_vector_set(src->fd_read, &fd_read, &maxx);
+		fd_vector_set(src->fd_write, &fd_write, &maxx);
+		fd_vector_set(src->fd_except, &fd_except, &maxx);
 		}
 
 	// We can't block indefinitely even when all sources are dry:
@@ -143,9 +150,7 @@ IOSource* IOSourceRegistry::FindSoonest(double* ts)
 		if ( ! src->src->IsIdle() )
 			continue;
 
-		if ( FD_ISSET(src->fd_read, &fd_read) ||
-		     FD_ISSET(src->fd_write, &fd_write) ||
-		     FD_ISSET(src->fd_except, &fd_except) )
+		if ( src->Ready(&fd_read, &fd_write, &fd_except) )
 			{
 			double local_network_time = 0;
 			double ts = src->src->NextTimestamp(&local_network_time);
@@ -174,3 +179,23 @@ void IOSourceRegistry::Register(IOSource* src, bool dont_count)
 		++dont_counts;
 	return sources.push_back(s);
 	}
+
+static bool fd_vector_ready(const std::vector<int>& fds, fd_set* set)
+	{
+	for ( size_t i = 0; i < fds.size(); ++i )
+		if ( FD_ISSET(fds[i], set) )
+			return true;
+
+	return false;
+	}
+
+bool IOSourceRegistry::Source::Ready(fd_set* read, fd_set* write,
+                                     fd_set* except) const
+	{
+	if ( fd_vector_ready(fd_read, read) ||
+	     fd_vector_ready(fd_write, write) ||
+	     fd_vector_ready(fd_except, except) )
+		return true;
+
+	return false;
+	}
diff --git a/src/IOSource.h b/src/IOSource.h
index db50bbd2a9..3da70af568 100644
--- a/src/IOSource.h
+++ b/src/IOSource.h
@@ -4,6 +4,8 @@
 #define iosource_h
 
 #include <list>
+#include <vector>
+#include <sys/select.h>
 #include "Timer.h"
 
 using namespace std;
@@ -22,7 +24,8 @@ public:
 
 	// Returns select'able fds (leaves args untouched if we don't have
 	// selectable fds).
-	virtual void GetFds(int* read, int* write, int* except) = 0;
+	virtual void GetFds(std::vector<int>* read, std::vector<int>* write,
+	                    std::vector<int>* except) = 0;
 
 	// The following two methods are only called when either IsIdle()
 	// returns false or select() on one of the fds indicates that there's
@@ -89,9 +92,11 @@ protected:
 
 	struct Source {
 		IOSource* src;
-		int fd_read;
-		int fd_write;
-		int fd_except;
+		std::vector<int> fd_read;
+		std::vector<int> fd_write;
+		std::vector<int> fd_except;
+
+		bool Ready(fd_set* read, fd_set* write, fd_set* except) const;
 	};
 
 	typedef list<Source*> SourceList;
diff --git a/src/Pipe.cc b/src/Pipe.cc
new file mode 100644
index 0000000000..51298d07b6
--- /dev/null
+++ b/src/Pipe.cc
@@ -0,0 +1,99 @@
+// See the file "COPYING" in the main distribution directory for copyright.
+
+#include "Pipe.h"
+#include "Reporter.h"
+#include <fcntl.h>
+#include <unistd.h>
+#include <errno.h>
+#include <string.h>
+
+using namespace bro;
+
+// strerror_r() comes in two flavors: the XSI one returns an int status and
+// fills the caller's buffer, the GNU one returns a pointer to the message and
+// may leave the buffer untouched.  These overloads select the right text.
+static inline const char* strerror_text(int /* status */, const char* buf)
+	{ return buf; }
+
+static inline const char* strerror_text(const char* msg, const char* /* buf */)
+	{ return msg; }
+
+// Report errno value 'eno' through reporter->FatalError().
+static void pipe_fail(int eno)
+	{
+	char tmp[256] = "";
+	const char* msg = strerror_text(strerror_r(eno, tmp, sizeof(tmp)), tmp);
+	reporter->FatalError("Pipe failure: %s", msg);
+	}
+
+// OR 'flags' into the file descriptor flags (F_SETFD) of 'fd'.
+static void set_flags(int fd, int flags)
+	{
+	if ( flags )
+		fcntl(fd, F_SETFD, fcntl(fd, F_GETFD) | flags);
+	}
+
+// OR 'flags' into the file status flags (F_SETFL) of 'fd'.
+static void set_status_flags(int fd, int flags)
+	{
+	if ( flags )
+		fcntl(fd, F_SETFL, fcntl(fd, F_GETFL) | flags);
+	}
+
+// Duplicate 'fd' and apply descriptor flags 'flags' to the duplicate.
+static int dup_or_fail(int fd, int flags)
+	{
+	int rval = dup(fd);
+
+	if ( rval < 0 )
+		pipe_fail(errno);
+
+	// Descriptor flags are per-descriptor and are not carried over by
+	// dup(), so set them on the new descriptor, not on the source.
+	set_flags(rval, flags);
+	return rval;
+	}
+
+Pipe::Pipe(int flags0, int flags1, int status_flags0, int status_flags1)
+	{
+	// pipe2 can set flags atomically, but not yet available everywhere.
+	if ( ::pipe(fds) )
+		pipe_fail(errno);
+
+	flags[0] = flags0;
+	flags[1] = flags1;
+
+	set_flags(fds[0], flags[0]);
+	set_flags(fds[1], flags[1]);
+	set_status_flags(fds[0], status_flags0);
+	set_status_flags(fds[1], status_flags1);
+	}
+
+Pipe::~Pipe()
+	{
+	close(fds[0]);
+	close(fds[1]);
+	}
+
+Pipe::Pipe(const Pipe& other)
+	{
+	// Keep the descriptor flags too, so copies of this copy get them.
+	flags[0] = other.flags[0];
+	flags[1] = other.flags[1];
+	fds[0] = dup_or_fail(other.fds[0], flags[0]);
+	fds[1] = dup_or_fail(other.fds[1], flags[1]);
+	}
+
+Pipe& Pipe::operator=(const Pipe& other)
+	{
+	if ( this == &other )
+		return *this;
+
+	close(fds[0]);
+	close(fds[1]);
+	flags[0] = other.flags[0];
+	flags[1] = other.flags[1];
+	fds[0] = dup_or_fail(other.fds[0], flags[0]);
+	fds[1] = dup_or_fail(other.fds[1], flags[1]);
+	return *this;
+	}
diff --git a/src/Pipe.h b/src/Pipe.h
new file mode 100644
index 0000000000..493169e615
--- /dev/null
+++ b/src/Pipe.h
@@ -0,0 +1,57 @@
+// See the file "COPYING" in the main distribution directory for copyright.
+
+#ifndef BRO_PIPE_H
+#define BRO_PIPE_H
+
+namespace bro {
+
+class Pipe {
+public:
+
+	/**
+	 * Create a pair of file descriptors via pipe(), or abort if it cannot.
+	 * @param flags0 file descriptor flags to set on read end of pipe.
+	 * @param flags1 file descriptor flags to set on write end of pipe.
+	 * @param status_flags0 descriptor status flags to set on read end of pipe.
+	 * @param status_flags1 descriptor status flags to set on write end of pipe.
+	 */
+	Pipe(int flags0 = 0, int flags1 = 0, int status_flags0 = 0,
+	     int status_flags1 = 0);
+
+	/**
+	 * Close the pair of file descriptors owned by the object.
+	 */
+	~Pipe();
+
+	/**
+	 * Make a copy of another Pipe object (file descriptors are dup'd).
+	 */
+	Pipe(const Pipe& other);
+
+	/**
+	 * Assign a Pipe object by closing file descriptors and duping those of
+	 * the other.
+	 */
+	Pipe& operator=(const Pipe& other);
+
+	/**
+	 * @return the file descriptor associated with the read-end of the pipe.
+	 */
+	int ReadFD() const
+		{ return fds[0]; }
+
+	/**
+	 * @return the file descriptor associated with the write-end of the pipe.
+	 */
+	int WriteFD() const
+		{ return fds[1]; }
+
+private:
+
+	int fds[2];
+	int flags[2];
+};
+
+} // namespace bro
+
+#endif // BRO_PIPE_H
diff --git a/src/PktSrc.cc b/src/PktSrc.cc
index b5ac3a5d69..04b7b7d552 100644
--- a/src/PktSrc.cc
+++ b/src/PktSrc.cc
@@ -51,7 +51,8 @@ PktSrc::~PktSrc()
 	delete [] readfile;
 	}
 
-void PktSrc::GetFds(int* read, int* write, int* except)
+void PktSrc::GetFds(std::vector<int>* read, std::vector<int>* write,
+                    std::vector<int>* except)
 	{
 	if ( pseudo_realtime )
 		{
@@ -62,7 +63,7 @@ void PktSrc::GetFds(int* read, int* write, int* except)
 		}
 
 	if ( selectable_fd >= 0 )
-		*read = selectable_fd;
+		read->push_back(selectable_fd);
 	}
 
 int PktSrc::ExtractNextPacket()
diff --git a/src/PktSrc.h b/src/PktSrc.h
index 70eef4dd00..0d4be12b43 100644
--- a/src/PktSrc.h
+++ b/src/PktSrc.h
@@ -98,7 +98,8 @@ public:
 
 	// IOSource interface
 	bool IsReady();
-	void GetFds(int* read, int* write, int* except);
+	void GetFds(std::vector<int>* read, std::vector<int>* write,
+	            std::vector<int>* except);
 	double NextTimestamp(double* local_network_time);
 	void Process();
 	const char* Tag() { return "PktSrc"; }
diff --git a/src/RemoteSerializer.cc b/src/RemoteSerializer.cc
index 3e46c5a1d2..34c5f1abce 100644
--- a/src/RemoteSerializer.cc
+++ b/src/RemoteSerializer.cc
@@ -1368,12 +1368,17 @@ void RemoteSerializer::Unregister(ID* id)
 		}
 	}
 
-void RemoteSerializer::GetFds(int* read, int* write, int* except)
+void RemoteSerializer::GetFds(std::vector<int>* read, std::vector<int>* write,
+                              std::vector<int>* except)
 	{
-	*read = io->Fd();
+	read->push_back(io->Fd());
+	std::vector<int> supp = io->FdSupplements();
+
+	for ( size_t i = 0; i < supp.size(); ++i )
+		read->push_back(supp[i]);
 
 	if ( io->CanWrite() )
-		*write = io->Fd();
+		write->push_back(io->Fd());
 	}
 
 double RemoteSerializer::NextTimestamp(double* local_network_time)
@@ -3356,6 +3361,15 @@ SocketComm::~SocketComm()
 
 static unsigned int first_rtime = 0;
 
+static void fd_vector_set(const std::vector<int>& fds, fd_set* set, int* max)
+	{
+	for ( size_t i = 0; i < fds.size(); ++i )
+		{
+		FD_SET(fds[i], set);
+		*max = ::max(fds[i], *max);
+		}
+	}
+
 void SocketComm::Run()
 	{
 	first_rtime = (unsigned int) current_time(true);
@@ -3381,6 +3395,7 @@ void SocketComm::Run()
 
 		FD_SET(io->Fd(), &fd_read);
 		max_fd = io->Fd();
+		fd_vector_set(io->FdSupplements(), &fd_read, &max_fd);
 
 		loop_over_list(peers, i)
 			{
@@ -3389,6 +3404,7 @@ void SocketComm::Run()
 				FD_SET(peers[i]->io->Fd(), &fd_read);
 				if ( peers[i]->io->Fd() > max_fd )
 					max_fd = peers[i]->io->Fd();
+				fd_vector_set(peers[i]->io->FdSupplements(), &fd_read, &max_fd);
 				}
 			else
 				{
@@ -3439,38 +3455,17 @@ void SocketComm::Run()
 		if ( ! io->IsFillingUp() && shutting_conns_down )
 			shutting_conns_down = false;
 
-		// We cannot rely solely on select() as the there may
-		// be some data left in our input/output queues. So, we use
-		// a small timeout for select and check for data
-		// manually afterwards.
-
 		static long selects = 0;
 		static long canwrites = 0;
-		static long timeouts = 0;
 
 		++selects;
 		if ( io->CanWrite() )
 			++canwrites;
 
-		// FIXME: Fine-tune this (timeouts, flush, etc.)
-		struct timeval small_timeout;
-		small_timeout.tv_sec = 0;
-		small_timeout.tv_usec =
-			io->CanWrite() || io->CanRead() ? 1 : 10;
-
-#if 0
-		if ( ! io->CanWrite() )
-			usleep(10);
-#endif
-
-		int a = select(max_fd + 1, &fd_read, &fd_write, &fd_except,
-				&small_timeout);
-
-		if ( a == 0 )
-			++timeouts;
+		int a = select(max_fd + 1, &fd_read, &fd_write, &fd_except, 0);
 
 		if ( selects % 100000 == 0 )
-			Log(fmt("selects=%ld canwrites=%ld timeouts=%ld", selects, canwrites, timeouts));
+			Log(fmt("selects=%ld canwrites=%ld", selects, canwrites));
 
 		if ( a < 0 )
 			// Ignore errors for now.
diff --git a/src/RemoteSerializer.h b/src/RemoteSerializer.h
index 9dbfbd9dae..3aa4f91bb0 100644
--- a/src/RemoteSerializer.h
+++ b/src/RemoteSerializer.h
@@ -140,7 +140,8 @@ public:
 	void Finish();
 
 	// Overidden from IOSource:
-	virtual void GetFds(int* read, int* write, int* except);
+	virtual void GetFds(std::vector<int>* read, std::vector<int>* write,
+	                    std::vector<int>* except);
 	virtual double NextTimestamp(double* local_network_time);
 	virtual void Process();
 	virtual TimerMgr::Tag* GetCurrentTag();
diff --git a/src/Serializer.cc b/src/Serializer.cc
index 36b1c74000..0ea79cfafb 100644
--- a/src/Serializer.cc
+++ b/src/Serializer.cc
@@ -1067,9 +1067,10 @@ void EventPlayer::GotFunctionCall(const char* name, double time,
 	// We don't replay function calls.
 	}
 
-void EventPlayer::GetFds(int* read, int* write, int* except)
+void EventPlayer::GetFds(std::vector<int>* read, std::vector<int>* write,
+                         std::vector<int>* except)
 	{
-	*read = fd;
+	read->push_back(fd);
 	}
 
 double EventPlayer::NextTimestamp(double* local_network_time)
diff --git a/src/Serializer.h b/src/Serializer.h
index 543797a7af..0524906d48 100644
--- a/src/Serializer.h
+++ b/src/Serializer.h
@@ -355,7 +355,8 @@ public:
 	EventPlayer(const char* file);
 	virtual ~EventPlayer();
 
-	virtual void GetFds(int* read, int* write, int* except);
+	virtual void GetFds(std::vector<int>* read, std::vector<int>* write,
+	                    std::vector<int>* except);
 	virtual double NextTimestamp(double* local_network_time);
 	virtual void Process();
 	virtual const char* Tag() { return "EventPlayer"; }
diff --git a/src/threading/Manager.cc b/src/threading/Manager.cc
index 4491cd42b5..c16b9f4351 100644
--- a/src/threading/Manager.cc
+++ b/src/threading/Manager.cc
@@ -65,7 +65,8 @@ void Manager::AddMsgThread(MsgThread* thread)
 	msg_threads.push_back(thread);
 	}
 
-void Manager::GetFds(int* read, int* write, int* except)
+void Manager::GetFds(std::vector<int>* read, std::vector<int>* write,
+                     std::vector<int>* except)
 	{
 	}
 
diff --git a/src/threading/Manager.h b/src/threading/Manager.h
index e839749a91..4f0e53928e 100644
--- a/src/threading/Manager.h
+++ b/src/threading/Manager.h
@@ -103,7 +103,8 @@ protected:
 	/**
 	 * Part of the IOSource interface.
 	 */
-	virtual void GetFds(int* read, int* write, int* except);
+	virtual void GetFds(std::vector<int>* read, std::vector<int>* write,
+	                    std::vector<int>* except);
 
 	/**
 	 * Part of the IOSource interface.