diff --git a/CMakeLists.txt b/CMakeLists.txt index a919b8ffe1..53c6d4faca 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -362,6 +362,7 @@ include(OpenSSLTests) include(CheckNameserCompat) include(GetArchitecture) include(RequireCXX17) +include(FindKqueue) if ( (OPENSSL_VERSION VERSION_EQUAL "1.1.0") OR (OPENSSL_VERSION VERSION_GREATER "1.1.0") ) set(ZEEK_HAVE_OPENSSL_1_1 true CACHE INTERNAL "" FORCE) diff --git a/COPYING.3rdparty b/COPYING.3rdparty index 831aa519f2..5dcb1bd88d 100644 --- a/COPYING.3rdparty +++ b/COPYING.3rdparty @@ -21,6 +21,54 @@ a legal notice, here is a blessing: ============================================================================== +%%% libkqueue - kqueue(2) compatibility library + +============================================================================== + +== all source == + +Copyright (c) 2009 Mark Heily + +Permission to use, copy, modify, and distribute this software for any +purpose with or without fee is hereby granted, provided that the above +copyright notice and this permission notice appear in all copies. + +THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES +WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF +MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR +ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES +WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN +ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF +OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + +== event.h == + +Copyright (c) 1999,2000,2001 Jonathan Lemon +All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions +are met: +1. Redistributions of source code must retain the above copyright + notice, this list of conditions and the following disclaimer. +2. Redistributions in binary form must reproduce the above copyright + notice, this list of conditions and the following disclaimer in the + documentation and/or other materials provided with the distribution. + +THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND +ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE +FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS +OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) +HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT +LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY +OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF +SUCH DAMAGE. + +============================================================================== + %%% RapidJSON - A fast JSON parser/generator for C++ with both SAX/DOM style API ============================================================================== diff --git a/cmake b/cmake index d3e1884a87..df3a537228 160000 --- a/cmake +++ b/cmake @@ -1 +1 @@ -Subproject commit d3e1884a876781dedac716d102e8a06e1cc54380 +Subproject commit df3a5372285223cdf24b980dcac89d21704bb9e9 diff --git a/configure b/configure index cd6fd4b025..5c7570f469 100755 --- a/configure +++ b/configure @@ -74,6 +74,8 @@ Usage: $0 [OPTION]... [VAR=VALUE]... (Zeek uses an embedded version by default) --with-caf=PATH path to C++ Actor Framework install root (a Broker dependency that is embedded by default) + --with-libkqueue=PATH path to libkqueue install root + (Zeek uses an embedded version by default) Optional Packages in Non-Standard Locations: --with-geoip=PATH path to the libmaxminddb install root @@ -320,6 +322,9 @@ while [ $# -ne 0 ]; do --with-caf=*) append_cache_entry CAF_ROOT_DIR PATH $optarg ;; + --with-libkqueue=*) + append_cache_entry LIBKQUEUE_ROOT_DIR PATH $optarg + ;; --with-rocksdb=*) append_cache_entry ROCKSDB_ROOT_DIR PATH $optarg ;; diff --git a/src/3rdparty b/src/3rdparty index fae3223639..8ddcd00ba5 160000 --- a/src/3rdparty +++ b/src/3rdparty @@ -1 +1 @@ -Subproject commit fae32236391d9117bf996e75d56ebd01ef076bc2 +Subproject commit 8ddcd00ba53d89dc39e46f42c15db4af8c9471d8 diff --git a/src/Net.cc b/src/Net.cc index 61deab227c..c4df44b9cd 100644 --- a/src/Net.cc +++ b/src/Net.cc @@ -278,45 +278,40 @@ void net_run() { set_processing_status("RUNNING", "net_run"); + std::vector ready; + ready.reserve(iosource_mgr->TotalSize()); + while ( iosource_mgr->Size() || (BifConst::exit_only_after_terminate && ! terminating) ) { - double ts; - iosource::IOSource* src = iosource_mgr->FindSoonest(&ts); + iosource_mgr->FindReadySources(&ready); #ifdef DEBUG static int loop_counter = 0; // If no source is ready, we log only every 100th cycle, // starting with the first. - if ( src || loop_counter++ % 100 == 0 ) + if ( ! ready.empty() || loop_counter++ % 100 == 0 ) { - DBG_LOG(DBG_MAINLOOP, "realtime=%.6f iosrc=%s ts=%.6f", - current_time(), src ? src->Tag() : "", src ? ts : -1); + DBG_LOG(DBG_MAINLOOP, "realtime=%.6f ready_count=%ld", + current_time(), ready.size()); - if ( src ) + if ( ! ready.empty() ) loop_counter = 0; } #endif current_iosrc = nullptr; auto communication_enabled = broker_mgr->Active(); - if ( src ) - src->Process(); // which will call net_packet_dispatch() - - else if ( reading_live && ! pseudo_realtime) + if ( ! ready.empty() ) { - // live but no source is currently active - if ( ! net_is_processing_suspended() ) + for ( auto src : ready ) { - // Take advantage of the lull to get up to - // date on timers and events. - net_update_time(current_time()); - expire_timers(); - usleep(1); // Just yield. + DBG_LOG(DBG_MAINLOOP, "processing source %s", src->Tag()); + current_iosrc = src; + src->Process(); } } - else if ( (have_pending_timers || communication_enabled || BifConst::exit_only_after_terminate) && ! pseudo_realtime ) @@ -327,25 +322,6 @@ void net_run() // doesn't risk blocking on other inputs. net_update_time(current_time()); expire_timers(); - - // Avoid busy-waiting - pause for 100 ms. - // We pick a sleep value of 100 msec that buys - // us a lot of idle time, but doesn't delay near-term - // timers too much. (Delaying them somewhat is okay, - // since Bro timers are not high-precision anyway.) - if ( ! communication_enabled ) - usleep(100000); - else - usleep(1000); - - // Flawfinder says about usleep: - // - // This C routine is considered obsolete (as opposed - // to the shell command by the same name). The - // interaction of this function with SIGALRM and - // other timer functions such as sleep(), alarm(), - // setitimer(), and nanosleep() is unspecified. - // Use nanosleep(2) or setitimer(2) instead. } mgr.Drain(); diff --git a/src/iosource/CMakeLists.txt b/src/iosource/CMakeLists.txt index f7497c7fe6..881903d65c 100644 --- a/src/iosource/CMakeLists.txt +++ b/src/iosource/CMakeLists.txt @@ -15,8 +15,7 @@ set(iosource_SRCS Packet.cc PktDumper.cc PktSrc.cc -) + ) bro_add_subdir_library(iosource ${iosource_SRCS}) add_dependencies(bro_iosource generate_outputs) - diff --git a/src/iosource/Manager.cc b/src/iosource/Manager.cc index c76e7786bc..c5791a2272 100644 --- a/src/iosource/Manager.cc +++ b/src/iosource/Manager.cc @@ -1,17 +1,18 @@ // See the file "COPYING" in the main distribution directory for copyright. +#include #include #include #include #include -#include - #include "Manager.h" #include "IOSource.h" +#include "Net.h" #include "PktSrc.h" #include "PktDumper.h" #include "plugin/Manager.h" +#include "broker/Manager.h" #include "util.h" @@ -19,8 +20,39 @@ using namespace iosource; +Manager::WakeupHandler::WakeupHandler() + { + iosource_mgr->RegisterFd(flare.FD(), this); + } + +Manager::WakeupHandler::~WakeupHandler() + { + iosource_mgr->UnregisterFd(flare.FD()); + } + +void Manager::WakeupHandler::Process() + { + flare.Extinguish(); + } + +void Manager::WakeupHandler::Ping(const std::string& where) + { + DBG_LOG(DBG_MAINLOOP, "Pinging WakeupHandler from %s", where.c_str()); + flare.Fire(); + } + +Manager::Manager() + { + event_queue = kqueue(); + if ( event_queue == -1 ) + reporter->FatalError("Failed to initialize kqueue: %s", strerror(errno)); + } + Manager::~Manager() { + delete wakeup; + wakeup = nullptr; + for ( SourceList::iterator i = sources.begin(); i != sources.end(); ++i ) { (*i)->src->Done(); @@ -37,6 +69,14 @@ Manager::~Manager() } pkt_dumpers.clear(); + + if ( event_queue != -1 ) + close(event_queue); + } + +void Manager::InitPostScript() + { + wakeup = new WakeupHandler(); } void Manager::RemoveAll() @@ -45,12 +85,19 @@ void Manager::RemoveAll() dont_counts = sources.size(); } -IOSource* Manager::FindSoonest(double* ts) +void Manager::Wakeup(const std::string& where) { + if ( wakeup ) + wakeup->Ping(where); + } + +void Manager::FindReadySources(std::vector* ready) + { + ready->clear(); + // Remove sources which have gone dry. For simplicity, we only // remove at most one each time. - for ( SourceList::iterator i = sources.begin(); - i != sources.end(); ++i ) + for ( SourceList::iterator i = sources.begin(); i != sources.end(); ++i ) if ( ! (*i)->src->IsOpen() ) { (*i)->src->Done(); @@ -59,124 +106,140 @@ IOSource* Manager::FindSoonest(double* ts) break; } - // Ideally, we would always call select on the fds to see which - // are ready, and return the soonest. Unfortunately, that'd mean - // one select-call per packet, which we can't afford in high-volume - // environments. Thus, we call select only every SELECT_FREQUENCY - // call (or if all sources report that they are dry). + // If there aren't any sources and exit_only_after_terminate is false, just + // return an empty set of sources. We want the main loop to end. + if ( Size() == 0 && ( ! BifConst::exit_only_after_terminate || terminating ) ) + return; - ++call_count; + double timeout = -1; + IOSource* timeout_src = nullptr; + bool time_to_poll = false; - IOSource* soonest_src = 0; - double soonest_ts = 1e20; - double soonest_local_network_time = 1e20; - bool all_idle = true; - - // Find soonest source of those which tell us they have something to - // process. - for ( SourceList::iterator i = sources.begin(); i != sources.end(); ++i ) + ++poll_counter; + if ( poll_counter % poll_interval == 0 ) { - if ( ! (*i)->src->IsIdle() ) + poll_counter = 0; + time_to_poll = true; + } + + // Find the source with the next timeout value. + for ( auto src : sources ) + { + auto iosource = src->src; + if ( iosource->IsOpen() ) { - all_idle = false; - double local_network_time = 0; - double ts = (*i)->src->NextTimestamp(&local_network_time); - if ( ts >= 0 && ts < soonest_ts ) + double next = iosource->GetNextTimeout(); + if ( timeout == -1 || ( next >= 0.0 && next < timeout ) ) { - soonest_ts = ts; - soonest_src = (*i)->src; - soonest_local_network_time = - local_network_time ? - local_network_time : ts; + timeout = next; + timeout_src = iosource; + + // If a source has a zero timeout then it's ready. Just add it to the + // list already. Only do this if it's not time to poll though, since + // we don't want things in the vector passed into Poll() or it'll end + // up inserting duplicates. + if ( timeout == 0 && ! time_to_poll ) + ready->push_back(timeout_src); } + + // Avoid calling Poll() if we can help it since on very high-traffic + // networks, we spend too much time in Poll() and end up dropping packets. + if ( ! time_to_poll && iosource == pkt_src && pkt_src->IsLive() ) + ready->push_back(pkt_src); } } - // If we found one and aren't going to select this time, - // return it. - int maxx = 0; + // If we didn't find any IOSources with zero timeouts or it's time to + // force a poll, do that and return. Otherwise return the set of ready + // sources that we have. + if ( ready->empty() || time_to_poll ) + Poll(ready, timeout, timeout_src); + } - if ( soonest_src && (call_count % SELECT_FREQUENCY) != 0 ) - goto finished; +void Manager::Poll(std::vector* ready, double timeout, IOSource* timeout_src) + { + struct timespec kqueue_timeout; + ConvertTimeout(timeout, kqueue_timeout); - // Select on the join of all file descriptors. - fd_set fd_read, fd_write, fd_except; - - FD_ZERO(&fd_read); - FD_ZERO(&fd_write); - FD_ZERO(&fd_except); - - for ( SourceList::iterator i = sources.begin(); - i != sources.end(); ++i ) + int ret = kevent(event_queue, NULL, 0, events.data(), events.size(), &kqueue_timeout); + if ( ret == -1 ) { - Source* src = (*i); - - if ( ! src->src->IsIdle() ) - // No need to select on sources which we know to - // be ready. - continue; - - src->Clear(); - src->src->GetFds(&src->fd_read, &src->fd_write, &src->fd_except); - src->SetFds(&fd_read, &fd_write, &fd_except, &maxx); + // Ignore interrupts since we may catch one during shutdown and we don't want the + // error to get printed. + if ( errno != EINTR ) + reporter->InternalWarning("Error calling kevent: %s", strerror(errno)); } - - // We can't block indefinitely even when all sources are dry: - // we're doing some IOSource-independent stuff in the main loop, - // so we need to return from time to time. (Instead of no time-out - // at all, we use a very small one. This lets FreeBSD trigger a - // BPF buffer switch on the next read when the hold buffer is empty - // while the store buffer isn't filled yet. - - struct timeval timeout; - - if ( all_idle ) + else if ( ret == 0 ) { - // Interesting: when all sources are dry, simply sleeping a - // bit *without* watching for any fd becoming ready may - // decrease CPU load. I guess that's because it allows - // the kernel's packet buffers to fill. - Robin - timeout.tv_sec = 0; - timeout.tv_usec = 20; // SELECT_TIMEOUT; - select(0, 0, 0, 0, &timeout); + if ( timeout_src ) + ready->push_back(timeout_src); } - - if ( ! maxx ) - // No selectable fd at all. - goto finished; - - timeout.tv_sec = 0; - timeout.tv_usec = 0; - - if ( select(maxx + 1, &fd_read, &fd_write, &fd_except, &timeout) > 0 ) - { // Find soonest. - for ( SourceList::iterator i = sources.begin(); - i != sources.end(); ++i ) + else + { + // kevent returns the number of events that are ready, so we only need to loop + // over that many of them. + for ( int i = 0; i < ret; i++ ) { - Source* src = (*i); - - if ( ! src->src->IsIdle() ) - continue; - - if ( src->Ready(&fd_read, &fd_write, &fd_except) ) + if ( events[i].filter == EVFILT_READ ) { - double local_network_time = 0; - double ts = src->src->NextTimestamp(&local_network_time); - if ( ts >= 0.0 && ts < soonest_ts ) - { - soonest_ts = ts; - soonest_src = src->src; - soonest_local_network_time = - local_network_time ? - local_network_time : ts; - } + std::map::const_iterator it = fd_map.find(events[i].ident); + if ( it != fd_map.end() ) + ready->push_back(it->second); } } } + } -finished: - *ts = soonest_local_network_time; - return soonest_src; +void Manager::ConvertTimeout(double timeout, struct timespec& spec) + { + // If timeout ended up -1, set it to some nominal value just to keep the loop + // from blocking forever. This is the case of exit_only_after_terminate when + // there isn't anything else going on. + if ( timeout < 0 ) + { + spec.tv_sec = 0; + spec.tv_nsec = 1e8; + } + else + { + spec.tv_sec = static_cast(timeout); + spec.tv_nsec = static_cast((timeout - spec.tv_sec) * 1e9); + } + } + +void Manager::RegisterFd(int fd, IOSource* src) + { + struct kevent event; + EV_SET(&event, fd, EVFILT_READ, EV_ADD, 0, 0, NULL); + int ret = kevent(event_queue, &event, 1, NULL, 0, NULL); + if ( ret != -1 ) + { + events.push_back({}); + DBG_LOG(DBG_MAINLOOP, "Registered fd %d from %s", fd, src->Tag()); + fd_map[fd] = src; + + Wakeup("RegisterFd"); + } + else + { + DBG_LOG(DBG_MAINLOOP, "Failed to register fd %d from %s: %s", fd, src->Tag(), strerror(errno)); + } + } + +void Manager::UnregisterFd(int fd) + { + if ( fd_map.find(fd) != fd_map.end() ) + { + struct kevent event; + EV_SET(&event, fd, EVFILT_READ, EV_DELETE, 0, 0, NULL); + int ret = kevent(event_queue, &event, 1, NULL, 0, NULL); + if ( ret != -1 ) + DBG_LOG(DBG_MAINLOOP, "Unregistered fd %d", fd); + + fd_map.erase(fd); + + Wakeup("UnregisterFd"); + } } void Manager::Register(IOSource* src, bool dont_count) @@ -195,7 +258,7 @@ void Manager::Register(IOSource* src, bool dont_count) } } - src->Init(); + src->InitSource(); Source* s = new Source; s->src = src; s->dont_count = dont_count; @@ -208,6 +271,16 @@ void Manager::Register(IOSource* src, bool dont_count) void Manager::Register(PktSrc* src) { pkt_src = src; + + // The poll interval gets defaulted to 100 which is good for cases like reading + // from pcap files and when there isn't a packet source, but is a little too + // infrequent for live sources (especially fast live sources). Set it down a + // little bit for those sources. + if ( src->IsLive() ) + poll_interval = 10; + else if ( pseudo_realtime ) + poll_interval = 1; + Register(src, false); } @@ -317,11 +390,3 @@ PktDumper* Manager::OpenPktDumper(const string& path, bool append) return pd; } - -void Manager::Source::SetFds(fd_set* read, fd_set* write, fd_set* except, - int* maxx) const - { - *maxx = std::max(*maxx, fd_read.Set(read)); - *maxx = std::max(*maxx, fd_write.Set(write)); - *maxx = std::max(*maxx, fd_except.Set(except)); - } diff --git a/src/iosource/Manager.h b/src/iosource/Manager.h index cdb8757540..ddf89db11a 100644 --- a/src/iosource/Manager.h +++ b/src/iosource/Manager.h @@ -2,30 +2,41 @@ #pragma once +#include "zeek-config.h" + #include -#include -#include "iosource/FD_Set.h" +#include +#include + +#include "IOSource.h" +#include "Flare.h" namespace iosource { -class IOSource; class PktSrc; class PktDumper; /** - * Singleton class managing all IOSources. + * Manager class for IO sources. This handles all of the polling of sources + * in the main loop. */ class Manager { public: /** * Constructor. */ - Manager() { call_count = 0; dont_counts = 0; } + Manager(); /** * Destructor. */ - ~Manager(); + virtual ~Manager(); + + /** + * Initializes some extra fields that can't be done during the + * due to dependencies on other objects being initialized first. + */ + void InitPostScript(); /** * Registers an IOSource with the manager. If the source is already @@ -41,23 +52,17 @@ public: */ void Register(IOSource* src, bool dont_count = false); - /** - * Returns the packet source with the soonest available input. This - * may block for a little while if all are dry. - * - * @param ts A pointer where to store the timestamp of the input that - * the soonest source has available next. - * - * @return The source, or null if no source has input. - */ - IOSource* FindSoonest(double* ts); - /** * Returns the number of registered and still active sources, - * excluding those that are registered as \a dont_cont. + * excluding those that are registered as \a dont_count. */ int Size() const { return sources.size() - dont_counts; } + /** + * Returns total number of sources including dont_counts; + */ + int TotalSize() const { return sources.size(); } + /** * Returns the registered PktSrc. If not source is registered yet, * returns a nullptr. @@ -93,49 +98,111 @@ public: */ PktDumper* OpenPktDumper(const std::string& path, bool append); + /** + * Finds the sources that have data ready to be processed. + * + * @param ready A vector used to return the set of sources that are ready. + */ + void FindReadySources(std::vector* ready); + + /** + * Registers a file descriptor and associated IOSource with the manager + * to be checked during FindReadySources. + * + * @param fd A file descriptor pointing at some resource that should be + * checked for readiness. + * @param src The IOSource that owns the file descriptor. + */ + void RegisterFd(int fd, IOSource* src); + + /** + * Unregisters a file descriptor from the FindReadySources checks. + */ + void UnregisterFd(int fd); + + /** + * Forces the poll in FindReadySources to wake up immediately. This method + * is called during RegisterFd and UnregisterFd since those methods cause + * changes to the active set of file descriptors. + */ + void Wakeup(const std::string& where); + private: - /** - * When looking for a source with something to process, every - * SELECT_FREQUENCY calls we will go ahead and block on a select(). - */ - static const int SELECT_FREQUENCY = 25; /** - * Microseconds to wait in an empty select if no source is ready. + * Calls the appropriate poll method to gather a set of IOSources that are + * ready for processing. + * + * @param ready a vector used to return the ready sources. + * @param timeout the value to be used for the timeout of the poll. This + * should be a value relative to the current network time, not an + * absolute time value. This may be zero to cause an infinite timeout or + * -1 to force a very short timeout. + * @param timeout_src The source associated with the current timeout value. + * This is typically a timer manager object. */ - static const int SELECT_TIMEOUT = 50; + void Poll(std::vector* ready, double timeout, IOSource* timeout_src); + /** + * Converts a double timeout value into a timespec struct used for calls + * to kevent(). + */ + void ConvertTimeout(double timeout, struct timespec& spec); + + /** + * Specialized registration method for packet sources. + */ void Register(PktSrc* src); + void RemoveAll(); - unsigned int call_count; - int dont_counts; + class WakeupHandler : public IOSource { + public: + WakeupHandler(); + ~WakeupHandler(); + + /** + * Tells the handler to wake up the loop by firing the flare. + * + * @param where a string denoting where this ping was called from. Used + * for debugging output. + */ + void Ping(const std::string& where); + + // IOSource API methods + void Process() override; + const char* Tag() override { return "WakeupHandler"; } + double GetNextTimeout() override { return -1; } + + private: + bro::Flare flare; + }; struct Source { IOSource* src; - FD_Set fd_read; - FD_Set fd_write; - FD_Set fd_except; bool dont_count; - - bool Ready(fd_set* read, fd_set* write, fd_set* except) const - { return fd_read.Ready(read) || fd_write.Ready(write) || - fd_except.Ready(except); } - - void SetFds(fd_set* read, fd_set* write, fd_set* except, - int* maxx) const; - - void Clear() - { fd_read.Clear(); fd_write.Clear(); fd_except.Clear(); } }; - typedef std::list SourceList; + using SourceList = std::vector; SourceList sources; - typedef std::list PktDumperList; + using PktDumperList = std::vector; + PktDumperList pkt_dumpers; PktSrc* pkt_src = nullptr; - PktDumperList pkt_dumpers; + + int dont_counts = 0; + int zero_timeout_count = 0; + WakeupHandler* wakeup = nullptr; + int poll_counter = 0; + int poll_interval = 100; + + int event_queue = -1; + std::map fd_map; + + // This is only used for the output of the call to kqueue in FindReadySources(). + // The actual events are stored as part of the queue. + std::vector events; }; } diff --git a/src/iosource/PktDumper.h b/src/iosource/PktDumper.h index 9a7949fc5d..2b6251ba0a 100644 --- a/src/iosource/PktDumper.h +++ b/src/iosource/PktDumper.h @@ -87,6 +87,7 @@ public: protected: friend class Manager; + friend class ManagerBase; /** * Structure to pass back information about the packet dumper to the diff --git a/src/iosource/PktSrc.h b/src/iosource/PktSrc.h index 2f08508d85..7ba9335df8 100644 --- a/src/iosource/PktSrc.h +++ b/src/iosource/PktSrc.h @@ -206,6 +206,7 @@ public: protected: friend class Manager; + friend class ManagerBase; // Methods to use by derived classes. diff --git a/src/main.cc b/src/main.cc index 5e47fa6089..72a1b5ce79 100644 --- a/src/main.cc +++ b/src/main.cc @@ -256,6 +256,8 @@ void terminate_bro() terminating = true; + iosource_mgr->Wakeup("terminate_bro"); + // File analysis termination may produce events, so do it early on in // the termination process. file_mgr->Terminate(); @@ -332,6 +334,9 @@ RETSIGTYPE sig_handler(int signo) set_processing_status("TERMINATING", "sig_handler"); signal_val = signo; + if ( ! terminating ) + iosource_mgr->Wakeup("sig_handler"); + return RETSIGVAL; } @@ -662,6 +667,7 @@ int main(int argc, char** argv) if ( reporter->Errors() > 0 ) exit(1); + iosource_mgr->InitPostScript(); plugin_mgr->InitPostScript(); zeekygen_mgr->InitPostScript(); broker_mgr->InitPostScript(); @@ -878,11 +884,6 @@ int main(int argc, char** argv) have_pending_timers = ! reading_traces && timer_mgr->Size() > 0; - iosource_mgr->Register(thread_mgr, true); - - if ( zeek::supervisor_mgr ) - iosource_mgr->Register(zeek::supervisor_mgr); - if ( iosource_mgr->Size() > 0 || have_pending_timers || BifConst::exit_only_after_terminate )