Modify IOSource Manager to implement new loop architecture

- Removes entire FindSoonest method that includes all of the calls to select() for checking for ready sources
- Removes FD_Set checking against IOSources
- Adds system for registering and unregistering file descriptors from IOSources. This allows individual sources to mark themselves as ready to be checked by the loop as they become available.
- Adds entirely new loop architecture based on checking the IOSources for when their next timeout is, and then waiting for either that timeout or when the next source is ready. This also implements the polling based on what the OS supports, instead of just calling select() on all platforms. Currently it supports kqueue, epoll, and plain poll.
- Adds system for pinging the loop to force it to wake up
This commit is contained in:
Tim Wojtulewicz 2019-11-26 17:14:56 -07:00
parent 062cadb124
commit 4fa3e4b9b4
12 changed files with 364 additions and 200 deletions

View file

@ -362,6 +362,7 @@ include(OpenSSLTests)
include(CheckNameserCompat) include(CheckNameserCompat)
include(GetArchitecture) include(GetArchitecture)
include(RequireCXX17) include(RequireCXX17)
include(FindKqueue)
if ( (OPENSSL_VERSION VERSION_EQUAL "1.1.0") OR (OPENSSL_VERSION VERSION_GREATER "1.1.0") ) if ( (OPENSSL_VERSION VERSION_EQUAL "1.1.0") OR (OPENSSL_VERSION VERSION_GREATER "1.1.0") )
set(ZEEK_HAVE_OPENSSL_1_1 true CACHE INTERNAL "" FORCE) set(ZEEK_HAVE_OPENSSL_1_1 true CACHE INTERNAL "" FORCE)

View file

@ -21,6 +21,54 @@ a legal notice, here is a blessing:
============================================================================== ==============================================================================
%%% libkqueue - kqueue(2) compatibility library
==============================================================================
== all source ==
Copyright (c) 2009 Mark Heily <mark@heily.com>
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
== event.h ==
Copyright (c) 1999,2000,2001 Jonathan Lemon <jlemon@FreeBSD.org>
All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions
are met:
1. Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
2. Redistributions in binary form must reproduce the above copyright
notice, this list of conditions and the following disclaimer in the
documentation and/or other materials provided with the distribution.
THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
SUCH DAMAGE.
==============================================================================
%%% RapidJSON - A fast JSON parser/generator for C++ with both SAX/DOM style API %%% RapidJSON - A fast JSON parser/generator for C++ with both SAX/DOM style API
============================================================================== ==============================================================================

2
cmake

@ -1 +1 @@
Subproject commit d3e1884a876781dedac716d102e8a06e1cc54380 Subproject commit df3a5372285223cdf24b980dcac89d21704bb9e9

5
configure vendored
View file

@ -74,6 +74,8 @@ Usage: $0 [OPTION]... [VAR=VALUE]...
(Zeek uses an embedded version by default) (Zeek uses an embedded version by default)
--with-caf=PATH path to C++ Actor Framework install root --with-caf=PATH path to C++ Actor Framework install root
(a Broker dependency that is embedded by default) (a Broker dependency that is embedded by default)
--with-libkqueue=PATH path to libkqueue install root
(Zeek uses an embedded version by default)
Optional Packages in Non-Standard Locations: Optional Packages in Non-Standard Locations:
--with-geoip=PATH path to the libmaxminddb install root --with-geoip=PATH path to the libmaxminddb install root
@ -320,6 +322,9 @@ while [ $# -ne 0 ]; do
--with-caf=*) --with-caf=*)
append_cache_entry CAF_ROOT_DIR PATH $optarg append_cache_entry CAF_ROOT_DIR PATH $optarg
;; ;;
--with-libkqueue=*)
append_cache_entry LIBKQUEUE_ROOT_DIR PATH $optarg
;;
--with-rocksdb=*) --with-rocksdb=*)
append_cache_entry ROCKSDB_ROOT_DIR PATH $optarg append_cache_entry ROCKSDB_ROOT_DIR PATH $optarg
;; ;;

@ -1 +1 @@
Subproject commit fae32236391d9117bf996e75d56ebd01ef076bc2 Subproject commit 8ddcd00ba53d89dc39e46f42c15db4af8c9471d8

View file

@ -278,45 +278,40 @@ void net_run()
{ {
set_processing_status("RUNNING", "net_run"); set_processing_status("RUNNING", "net_run");
std::vector<iosource::IOSource*> ready;
ready.reserve(iosource_mgr->TotalSize());
while ( iosource_mgr->Size() || while ( iosource_mgr->Size() ||
(BifConst::exit_only_after_terminate && ! terminating) ) (BifConst::exit_only_after_terminate && ! terminating) )
{ {
double ts; iosource_mgr->FindReadySources(&ready);
iosource::IOSource* src = iosource_mgr->FindSoonest(&ts);
#ifdef DEBUG #ifdef DEBUG
static int loop_counter = 0; static int loop_counter = 0;
// If no source is ready, we log only every 100th cycle, // If no source is ready, we log only every 100th cycle,
// starting with the first. // starting with the first.
if ( src || loop_counter++ % 100 == 0 ) if ( ! ready.empty() || loop_counter++ % 100 == 0 )
{ {
DBG_LOG(DBG_MAINLOOP, "realtime=%.6f iosrc=%s ts=%.6f", DBG_LOG(DBG_MAINLOOP, "realtime=%.6f ready_count=%ld",
current_time(), src ? src->Tag() : "<all dry>", src ? ts : -1); current_time(), ready.size());
if ( src ) if ( ! ready.empty() )
loop_counter = 0; loop_counter = 0;
} }
#endif #endif
current_iosrc = nullptr; current_iosrc = nullptr;
auto communication_enabled = broker_mgr->Active(); auto communication_enabled = broker_mgr->Active();
if ( src ) if ( ! ready.empty() )
src->Process(); // which will call net_packet_dispatch()
else if ( reading_live && ! pseudo_realtime)
{ {
// live but no source is currently active for ( auto src : ready )
if ( ! net_is_processing_suspended() )
{ {
// Take advantage of the lull to get up to DBG_LOG(DBG_MAINLOOP, "processing source %s", src->Tag());
// date on timers and events. current_iosrc = src;
net_update_time(current_time()); src->Process();
expire_timers();
usleep(1); // Just yield.
} }
} }
else if ( (have_pending_timers || communication_enabled || else if ( (have_pending_timers || communication_enabled ||
BifConst::exit_only_after_terminate) && BifConst::exit_only_after_terminate) &&
! pseudo_realtime ) ! pseudo_realtime )
@ -327,25 +322,6 @@ void net_run()
// doesn't risk blocking on other inputs. // doesn't risk blocking on other inputs.
net_update_time(current_time()); net_update_time(current_time());
expire_timers(); expire_timers();
// Avoid busy-waiting - pause for 100 ms.
// We pick a sleep value of 100 msec that buys
// us a lot of idle time, but doesn't delay near-term
// timers too much. (Delaying them somewhat is okay,
// since Bro timers are not high-precision anyway.)
if ( ! communication_enabled )
usleep(100000);
else
usleep(1000);
// Flawfinder says about usleep:
//
// This C routine is considered obsolete (as opposed
// to the shell command by the same name). The
// interaction of this function with SIGALRM and
// other timer functions such as sleep(), alarm(),
// setitimer(), and nanosleep() is unspecified.
// Use nanosleep(2) or setitimer(2) instead.
} }
mgr.Drain(); mgr.Drain();

View file

@ -15,8 +15,7 @@ set(iosource_SRCS
Packet.cc Packet.cc
PktDumper.cc PktDumper.cc
PktSrc.cc PktSrc.cc
) )
bro_add_subdir_library(iosource ${iosource_SRCS}) bro_add_subdir_library(iosource ${iosource_SRCS})
add_dependencies(bro_iosource generate_outputs) add_dependencies(bro_iosource generate_outputs)

View file

@ -1,17 +1,18 @@
// See the file "COPYING" in the main distribution directory for copyright. // See the file "COPYING" in the main distribution directory for copyright.
#include <sys/event.h>
#include <sys/types.h> #include <sys/types.h>
#include <sys/time.h> #include <sys/time.h>
#include <unistd.h> #include <unistd.h>
#include <assert.h> #include <assert.h>
#include <algorithm>
#include "Manager.h" #include "Manager.h"
#include "IOSource.h" #include "IOSource.h"
#include "Net.h"
#include "PktSrc.h" #include "PktSrc.h"
#include "PktDumper.h" #include "PktDumper.h"
#include "plugin/Manager.h" #include "plugin/Manager.h"
#include "broker/Manager.h"
#include "util.h" #include "util.h"
@ -19,8 +20,39 @@
using namespace iosource; using namespace iosource;
Manager::WakeupHandler::WakeupHandler()
{
iosource_mgr->RegisterFd(flare.FD(), this);
}
Manager::WakeupHandler::~WakeupHandler()
{
iosource_mgr->UnregisterFd(flare.FD());
}
void Manager::WakeupHandler::Process()
{
flare.Extinguish();
}
void Manager::WakeupHandler::Ping(const std::string& where)
{
DBG_LOG(DBG_MAINLOOP, "Pinging WakeupHandler from %s", where.c_str());
flare.Fire();
}
Manager::Manager()
{
event_queue = kqueue();
if ( event_queue == -1 )
reporter->FatalError("Failed to initialize kqueue: %s", strerror(errno));
}
Manager::~Manager() Manager::~Manager()
{ {
delete wakeup;
wakeup = nullptr;
for ( SourceList::iterator i = sources.begin(); i != sources.end(); ++i ) for ( SourceList::iterator i = sources.begin(); i != sources.end(); ++i )
{ {
(*i)->src->Done(); (*i)->src->Done();
@ -37,6 +69,14 @@ Manager::~Manager()
} }
pkt_dumpers.clear(); pkt_dumpers.clear();
if ( event_queue != -1 )
close(event_queue);
}
void Manager::InitPostScript()
{
wakeup = new WakeupHandler();
} }
void Manager::RemoveAll() void Manager::RemoveAll()
@ -45,12 +85,19 @@ void Manager::RemoveAll()
dont_counts = sources.size(); dont_counts = sources.size();
} }
IOSource* Manager::FindSoonest(double* ts) void Manager::Wakeup(const std::string& where)
{ {
if ( wakeup )
wakeup->Ping(where);
}
void Manager::FindReadySources(std::vector<IOSource*>* ready)
{
ready->clear();
// Remove sources which have gone dry. For simplicity, we only // Remove sources which have gone dry. For simplicity, we only
// remove at most one each time. // remove at most one each time.
for ( SourceList::iterator i = sources.begin(); for ( SourceList::iterator i = sources.begin(); i != sources.end(); ++i )
i != sources.end(); ++i )
if ( ! (*i)->src->IsOpen() ) if ( ! (*i)->src->IsOpen() )
{ {
(*i)->src->Done(); (*i)->src->Done();
@ -59,124 +106,140 @@ IOSource* Manager::FindSoonest(double* ts)
break; break;
} }
// Ideally, we would always call select on the fds to see which // If there aren't any sources and exit_only_after_terminate is false, just
// are ready, and return the soonest. Unfortunately, that'd mean // return an empty set of sources. We want the main loop to end.
// one select-call per packet, which we can't afford in high-volume if ( Size() == 0 && ( ! BifConst::exit_only_after_terminate || terminating ) )
// environments. Thus, we call select only every SELECT_FREQUENCY return;
// call (or if all sources report that they are dry).
++call_count; double timeout = -1;
IOSource* timeout_src = nullptr;
bool time_to_poll = false;
IOSource* soonest_src = 0; ++poll_counter;
double soonest_ts = 1e20; if ( poll_counter % poll_interval == 0 )
double soonest_local_network_time = 1e20;
bool all_idle = true;
// Find soonest source of those which tell us they have something to
// process.
for ( SourceList::iterator i = sources.begin(); i != sources.end(); ++i )
{ {
if ( ! (*i)->src->IsIdle() ) poll_counter = 0;
time_to_poll = true;
}
// Find the source with the next timeout value.
for ( auto src : sources )
{ {
all_idle = false; auto iosource = src->src;
double local_network_time = 0; if ( iosource->IsOpen() )
double ts = (*i)->src->NextTimestamp(&local_network_time);
if ( ts >= 0 && ts < soonest_ts )
{ {
soonest_ts = ts; double next = iosource->GetNextTimeout();
soonest_src = (*i)->src; if ( timeout == -1 || ( next >= 0.0 && next < timeout ) )
soonest_local_network_time =
local_network_time ?
local_network_time : ts;
}
}
}
// If we found one and aren't going to select this time,
// return it.
int maxx = 0;
if ( soonest_src && (call_count % SELECT_FREQUENCY) != 0 )
goto finished;
// Select on the join of all file descriptors.
fd_set fd_read, fd_write, fd_except;
FD_ZERO(&fd_read);
FD_ZERO(&fd_write);
FD_ZERO(&fd_except);
for ( SourceList::iterator i = sources.begin();
i != sources.end(); ++i )
{ {
Source* src = (*i); timeout = next;
timeout_src = iosource;
if ( ! src->src->IsIdle() ) // If a source has a zero timeout then it's ready. Just add it to the
// No need to select on sources which we know to // list already. Only do this if it's not time to poll though, since
// be ready. // we don't want things in the vector passed into Poll() or it'll end
continue; // up inserting duplicates.
if ( timeout == 0 && ! time_to_poll )
src->Clear(); ready->push_back(timeout_src);
src->src->GetFds(&src->fd_read, &src->fd_write, &src->fd_except);
src->SetFds(&fd_read, &fd_write, &fd_except, &maxx);
} }
// We can't block indefinitely even when all sources are dry: // Avoid calling Poll() if we can help it since on very high-traffic
// we're doing some IOSource-independent stuff in the main loop, // networks, we spend too much time in Poll() and end up dropping packets.
// so we need to return from time to time. (Instead of no time-out if ( ! time_to_poll && iosource == pkt_src && pkt_src->IsLive() )
// at all, we use a very small one. This lets FreeBSD trigger a ready->push_back(pkt_src);
// BPF buffer switch on the next read when the hold buffer is empty }
// while the store buffer isn't filled yet. }
struct timeval timeout; // If we didn't find any IOSources with zero timeouts or it's time to
// force a poll, do that and return. Otherwise return the set of ready
// sources that we have.
if ( ready->empty() || time_to_poll )
Poll(ready, timeout, timeout_src);
}
if ( all_idle ) void Manager::Poll(std::vector<IOSource*>* ready, double timeout, IOSource* timeout_src)
{ {
// Interesting: when all sources are dry, simply sleeping a struct timespec kqueue_timeout;
// bit *without* watching for any fd becoming ready may ConvertTimeout(timeout, kqueue_timeout);
// decrease CPU load. I guess that's because it allows
// the kernel's packet buffers to fill. - Robin
timeout.tv_sec = 0;
timeout.tv_usec = 20; // SELECT_TIMEOUT;
select(0, 0, 0, 0, &timeout);
}
if ( ! maxx ) int ret = kevent(event_queue, NULL, 0, events.data(), events.size(), &kqueue_timeout);
// No selectable fd at all. if ( ret == -1 )
goto finished;
timeout.tv_sec = 0;
timeout.tv_usec = 0;
if ( select(maxx + 1, &fd_read, &fd_write, &fd_except, &timeout) > 0 )
{ // Find soonest.
for ( SourceList::iterator i = sources.begin();
i != sources.end(); ++i )
{ {
Source* src = (*i); // Ignore interrupts since we may catch one during shutdown and we don't want the
// error to get printed.
if ( ! src->src->IsIdle() ) if ( errno != EINTR )
continue; reporter->InternalWarning("Error calling kevent: %s", strerror(errno));
}
if ( src->Ready(&fd_read, &fd_write, &fd_except) ) else if ( ret == 0 )
{ {
double local_network_time = 0; if ( timeout_src )
double ts = src->src->NextTimestamp(&local_network_time); ready->push_back(timeout_src);
if ( ts >= 0.0 && ts < soonest_ts ) }
else
{ {
soonest_ts = ts; // kevent returns the number of events that are ready, so we only need to loop
soonest_src = src->src; // over that many of them.
soonest_local_network_time = for ( int i = 0; i < ret; i++ )
local_network_time ? {
local_network_time : ts; if ( events[i].filter == EVFILT_READ )
{
std::map<int, IOSource*>::const_iterator it = fd_map.find(events[i].ident);
if ( it != fd_map.end() )
ready->push_back(it->second);
} }
} }
} }
} }
finished: void Manager::ConvertTimeout(double timeout, struct timespec& spec)
*ts = soonest_local_network_time; {
return soonest_src; // If timeout ended up -1, set it to some nominal value just to keep the loop
// from blocking forever. This is the case of exit_only_after_terminate when
// there isn't anything else going on.
if ( timeout < 0 )
{
spec.tv_sec = 0;
spec.tv_nsec = 1e8;
}
else
{
spec.tv_sec = static_cast<time_t>(timeout);
spec.tv_nsec = static_cast<long>((timeout - spec.tv_sec) * 1e9);
}
}
void Manager::RegisterFd(int fd, IOSource* src)
{
struct kevent event;
EV_SET(&event, fd, EVFILT_READ, EV_ADD, 0, 0, NULL);
int ret = kevent(event_queue, &event, 1, NULL, 0, NULL);
if ( ret != -1 )
{
events.push_back({});
DBG_LOG(DBG_MAINLOOP, "Registered fd %d from %s", fd, src->Tag());
fd_map[fd] = src;
Wakeup("RegisterFd");
}
else
{
DBG_LOG(DBG_MAINLOOP, "Failed to register fd %d from %s: %s", fd, src->Tag(), strerror(errno));
}
}
void Manager::UnregisterFd(int fd)
{
if ( fd_map.find(fd) != fd_map.end() )
{
struct kevent event;
EV_SET(&event, fd, EVFILT_READ, EV_DELETE, 0, 0, NULL);
int ret = kevent(event_queue, &event, 1, NULL, 0, NULL);
if ( ret != -1 )
DBG_LOG(DBG_MAINLOOP, "Unregistered fd %d", fd);
fd_map.erase(fd);
Wakeup("UnregisterFd");
}
} }
void Manager::Register(IOSource* src, bool dont_count) void Manager::Register(IOSource* src, bool dont_count)
@ -195,7 +258,7 @@ void Manager::Register(IOSource* src, bool dont_count)
} }
} }
src->Init(); src->InitSource();
Source* s = new Source; Source* s = new Source;
s->src = src; s->src = src;
s->dont_count = dont_count; s->dont_count = dont_count;
@ -208,6 +271,16 @@ void Manager::Register(IOSource* src, bool dont_count)
void Manager::Register(PktSrc* src) void Manager::Register(PktSrc* src)
{ {
pkt_src = src; pkt_src = src;
// The poll interval gets defaulted to 100 which is good for cases like reading
// from pcap files and when there isn't a packet source, but is a little too
// infrequent for live sources (especially fast live sources). Set it down a
// little bit for those sources.
if ( src->IsLive() )
poll_interval = 10;
else if ( pseudo_realtime )
poll_interval = 1;
Register(src, false); Register(src, false);
} }
@ -317,11 +390,3 @@ PktDumper* Manager::OpenPktDumper(const string& path, bool append)
return pd; return pd;
} }
void Manager::Source::SetFds(fd_set* read, fd_set* write, fd_set* except,
int* maxx) const
{
*maxx = std::max(*maxx, fd_read.Set(read));
*maxx = std::max(*maxx, fd_write.Set(write));
*maxx = std::max(*maxx, fd_except.Set(except));
}

View file

@ -2,30 +2,41 @@
#pragma once #pragma once
#include "zeek-config.h"
#include <string> #include <string>
#include <list> #include <vector>
#include "iosource/FD_Set.h" #include <map>
#include "IOSource.h"
#include "Flare.h"
namespace iosource { namespace iosource {
class IOSource;
class PktSrc; class PktSrc;
class PktDumper; class PktDumper;
/** /**
* Singleton class managing all IOSources. * Manager class for IO sources. This handles all of the polling of sources
* in the main loop.
*/ */
class Manager { class Manager {
public: public:
/** /**
* Constructor. * Constructor.
*/ */
Manager() { call_count = 0; dont_counts = 0; } Manager();
/** /**
* Destructor. * Destructor.
*/ */
~Manager(); virtual ~Manager();
/**
* Initializes some extra fields that can't be done during the
* due to dependencies on other objects being initialized first.
*/
void InitPostScript();
/** /**
* Registers an IOSource with the manager. If the source is already * Registers an IOSource with the manager. If the source is already
@ -41,23 +52,17 @@ public:
*/ */
void Register(IOSource* src, bool dont_count = false); void Register(IOSource* src, bool dont_count = false);
/**
* Returns the packet source with the soonest available input. This
* may block for a little while if all are dry.
*
* @param ts A pointer where to store the timestamp of the input that
* the soonest source has available next.
*
* @return The source, or null if no source has input.
*/
IOSource* FindSoonest(double* ts);
/** /**
* Returns the number of registered and still active sources, * Returns the number of registered and still active sources,
* excluding those that are registered as \a dont_cont. * excluding those that are registered as \a dont_count.
*/ */
int Size() const { return sources.size() - dont_counts; } int Size() const { return sources.size() - dont_counts; }
/**
* Returns total number of sources including dont_counts;
*/
int TotalSize() const { return sources.size(); }
/** /**
* Returns the registered PktSrc. If not source is registered yet, * Returns the registered PktSrc. If not source is registered yet,
* returns a nullptr. * returns a nullptr.
@ -93,49 +98,111 @@ public:
*/ */
PktDumper* OpenPktDumper(const std::string& path, bool append); PktDumper* OpenPktDumper(const std::string& path, bool append);
/**
* Finds the sources that have data ready to be processed.
*
* @param ready A vector used to return the set of sources that are ready.
*/
void FindReadySources(std::vector<IOSource*>* ready);
/**
* Registers a file descriptor and associated IOSource with the manager
* to be checked during FindReadySources.
*
* @param fd A file descriptor pointing at some resource that should be
* checked for readiness.
* @param src The IOSource that owns the file descriptor.
*/
void RegisterFd(int fd, IOSource* src);
/**
* Unregisters a file descriptor from the FindReadySources checks.
*/
void UnregisterFd(int fd);
/**
* Forces the poll in FindReadySources to wake up immediately. This method
* is called during RegisterFd and UnregisterFd since those methods cause
* changes to the active set of file descriptors.
*/
void Wakeup(const std::string& where);
private: private:
/**
* When looking for a source with something to process, every
* SELECT_FREQUENCY calls we will go ahead and block on a select().
*/
static const int SELECT_FREQUENCY = 25;
/** /**
* Microseconds to wait in an empty select if no source is ready. * Calls the appropriate poll method to gather a set of IOSources that are
* ready for processing.
*
* @param ready a vector used to return the ready sources.
* @param timeout the value to be used for the timeout of the poll. This
* should be a value relative to the current network time, not an
* absolute time value. This may be zero to cause an infinite timeout or
* -1 to force a very short timeout.
* @param timeout_src The source associated with the current timeout value.
* This is typically a timer manager object.
*/ */
static const int SELECT_TIMEOUT = 50; void Poll(std::vector<IOSource*>* ready, double timeout, IOSource* timeout_src);
/**
* Converts a double timeout value into a timespec struct used for calls
* to kevent().
*/
void ConvertTimeout(double timeout, struct timespec& spec);
/**
* Specialized registration method for packet sources.
*/
void Register(PktSrc* src); void Register(PktSrc* src);
void RemoveAll(); void RemoveAll();
unsigned int call_count; class WakeupHandler : public IOSource {
int dont_counts; public:
WakeupHandler();
~WakeupHandler();
/**
* Tells the handler to wake up the loop by firing the flare.
*
* @param where a string denoting where this ping was called from. Used
* for debugging output.
*/
void Ping(const std::string& where);
// IOSource API methods
void Process() override;
const char* Tag() override { return "WakeupHandler"; }
double GetNextTimeout() override { return -1; }
private:
bro::Flare flare;
};
struct Source { struct Source {
IOSource* src; IOSource* src;
FD_Set fd_read;
FD_Set fd_write;
FD_Set fd_except;
bool dont_count; bool dont_count;
bool Ready(fd_set* read, fd_set* write, fd_set* except) const
{ return fd_read.Ready(read) || fd_write.Ready(write) ||
fd_except.Ready(except); }
void SetFds(fd_set* read, fd_set* write, fd_set* except,
int* maxx) const;
void Clear()
{ fd_read.Clear(); fd_write.Clear(); fd_except.Clear(); }
}; };
typedef std::list<Source*> SourceList; using SourceList = std::vector<Source*>;
SourceList sources; SourceList sources;
typedef std::list<PktDumper *> PktDumperList; using PktDumperList = std::vector<PktDumper*>;
PktDumperList pkt_dumpers;
PktSrc* pkt_src = nullptr; PktSrc* pkt_src = nullptr;
PktDumperList pkt_dumpers;
int dont_counts = 0;
int zero_timeout_count = 0;
WakeupHandler* wakeup = nullptr;
int poll_counter = 0;
int poll_interval = 100;
int event_queue = -1;
std::map<int, IOSource*> fd_map;
// This is only used for the output of the call to kqueue in FindReadySources().
// The actual events are stored as part of the queue.
std::vector<struct kevent> events;
}; };
} }

View file

@ -87,6 +87,7 @@ public:
protected: protected:
friend class Manager; friend class Manager;
friend class ManagerBase;
/** /**
* Structure to pass back information about the packet dumper to the * Structure to pass back information about the packet dumper to the

View file

@ -206,6 +206,7 @@ public:
protected: protected:
friend class Manager; friend class Manager;
friend class ManagerBase;
// Methods to use by derived classes. // Methods to use by derived classes.

View file

@ -256,6 +256,8 @@ void terminate_bro()
terminating = true; terminating = true;
iosource_mgr->Wakeup("terminate_bro");
// File analysis termination may produce events, so do it early on in // File analysis termination may produce events, so do it early on in
// the termination process. // the termination process.
file_mgr->Terminate(); file_mgr->Terminate();
@ -332,6 +334,9 @@ RETSIGTYPE sig_handler(int signo)
set_processing_status("TERMINATING", "sig_handler"); set_processing_status("TERMINATING", "sig_handler");
signal_val = signo; signal_val = signo;
if ( ! terminating )
iosource_mgr->Wakeup("sig_handler");
return RETSIGVAL; return RETSIGVAL;
} }
@ -662,6 +667,7 @@ int main(int argc, char** argv)
if ( reporter->Errors() > 0 ) if ( reporter->Errors() > 0 )
exit(1); exit(1);
iosource_mgr->InitPostScript();
plugin_mgr->InitPostScript(); plugin_mgr->InitPostScript();
zeekygen_mgr->InitPostScript(); zeekygen_mgr->InitPostScript();
broker_mgr->InitPostScript(); broker_mgr->InitPostScript();
@ -878,11 +884,6 @@ int main(int argc, char** argv)
have_pending_timers = ! reading_traces && timer_mgr->Size() > 0; have_pending_timers = ! reading_traces && timer_mgr->Size() > 0;
iosource_mgr->Register(thread_mgr, true);
if ( zeek::supervisor_mgr )
iosource_mgr->Register(zeek::supervisor_mgr);
if ( iosource_mgr->Size() > 0 || if ( iosource_mgr->Size() > 0 ||
have_pending_timers || have_pending_timers ||
BifConst::exit_only_after_terminate ) BifConst::exit_only_after_terminate )