From fcca8670d3cf98cf2bd71172d0d8993470cc4f40 Mon Sep 17 00:00:00 2001 From: Arne Welzel Date: Thu, 27 Jun 2024 14:03:12 +0200 Subject: [PATCH 1/5] iosource/Manager: Honor manage_lifetime and dont_count for short-lived IO sources If an IO source is registered and becomes dry at runtime, the IO manager would not honor its manage_lifetime or dont_count attribute during collection, resulting in memory leaks. This probably hasn't mattered so far as there's no IO sources registered in-tree at runtime using manage_lifetime=true. --- src/iosource/Manager.cc | 19 +++++++++++++++++-- src/iosource/Manager.h | 28 ++++++++++++++++++++++------ 2 files changed, 39 insertions(+), 8 deletions(-) diff --git a/src/iosource/Manager.cc b/src/iosource/Manager.cc index fcb4a1e961..952bb13704 100644 --- a/src/iosource/Manager.cc +++ b/src/iosource/Manager.cc @@ -104,6 +104,22 @@ void Manager::Wakeup(std::string_view where) { wakeup->Ping(where); } +void Manager::ReapSource(Source* src) { + auto* iosource = src->src; + assert(! iosource->IsOpen()); + + DBG_LOG(DBG_MAINLOOP, "Reaping %s", src->src->Tag()); + iosource->Done(); + + if ( src->manage_lifetime ) + delete iosource; + + if ( src->dont_count ) + dont_counts--; + + delete src; +} + void Manager::FindReadySources(ReadySources* ready) { ready->clear(); @@ -111,8 +127,7 @@ void Manager::FindReadySources(ReadySources* ready) { // remove at most one each time. for ( SourceList::iterator i = sources.begin(); i != sources.end(); ++i ) if ( ! (*i)->src->IsOpen() ) { - (*i)->src->Done(); - delete *i; + ReapSource(*i); sources.erase(i); break; } diff --git a/src/iosource/Manager.h b/src/iosource/Manager.h index c533afb982..48f8814b2b 100644 --- a/src/iosource/Manager.h +++ b/src/iosource/Manager.h @@ -143,6 +143,15 @@ public: void Wakeup(std::string_view where); private: + /** + * Internal data structure for managing registered IOSources. 
+ */ + struct Source { + IOSource* src = nullptr; + bool dont_count = false; + bool manage_lifetime = false; + }; + /** * Calls the appropriate poll method to gather a set of IOSources that are * ready for processing. @@ -170,6 +179,19 @@ private: void RemoveAll(); + /** + * Reap a closed IO source. + * + * Reaping involves calling IOSource::Done() on the underlying IOSource, + * freeing it if Source.manage_lifetime is \c true, updating \c dont_counts + * and freeing \a src, making it invalid. + * + * The caller ensures \a src is removed from Manager.sources. + * + * @param src The source to reap. + */ + void ReapSource(Source* src); + class WakeupHandler final : public IOSource { public: WakeupHandler(); @@ -192,12 +214,6 @@ private: zeek::detail::Flare flare; }; - struct Source { - IOSource* src = nullptr; - bool dont_count = false; - bool manage_lifetime = false; - }; - using SourceList = std::vector<Source*>; SourceList sources; From 0451a4038c84980afddd243d1343b27dadb9011f Mon Sep 17 00:00:00 2001 From: Arne Welzel Date: Tue, 18 Jun 2024 12:38:53 +0200 Subject: [PATCH 2/5] iosource/Manager: Do not manage lifetime of pkt_src Now that dry sources are properly reaped and freed, an offline packet source would be deleted once dry, resulting in GetPktSrc() returning a wild pointer. Don't manage the packet source lifetime and instead free it during Manager destruction. --- src/iosource/Manager.cc | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/iosource/Manager.cc b/src/iosource/Manager.cc index 952bb13704..a7821e1a5f 100644 --- a/src/iosource/Manager.cc +++ b/src/iosource/Manager.cc @@ -76,6 +76,9 @@ Manager::~Manager() { pkt_dumpers.clear(); + // Was registered without lifetime management. + delete pkt_src; + #ifndef _MSC_VER // There's a bug here with builds on Windows that causes an assertion with debug builds // related to libkqueue returning a zero for the file descriptor.
The assert happens @@ -357,7 +360,7 @@ void Manager::Register(IOSource* src, bool dont_count, bool manage_lifetime) { void Manager::Register(PktSrc* src) { pkt_src = src; - Register(src, false); + Register(src, false, false); // Once we know if the source is live or not, adapt the // poll_interval accordingly. From b3118d2a4847914bb4c7caf84b674dca97b6d853 Mon Sep 17 00:00:00 2001 From: Arne Welzel Date: Tue, 18 Jun 2024 09:41:08 +0200 Subject: [PATCH 3/5] threading/MsgThread: Decouple IO source and thread lifetimes MsgThread acting as an IO source can result in the situation where the threading manager's heartbeat timer deletes a finished MsgThread instance, but at the same time this thread is in the list of ready IO sources the main loop is currently processing. Fix this by decoupling the lifetime of the IO source part and properly registering as lifetime managed IO sources with the IO manager. Fixes #3682 --- src/threading/MsgThread.cc | 73 ++++++++++++++++++++++++++++++++------ src/threading/MsgThread.h | 11 +++--- 2 files changed, 68 insertions(+), 16 deletions(-) diff --git a/src/threading/MsgThread.cc b/src/threading/MsgThread.cc index a1d2e5e3da..022a8ce2b4 100644 --- a/src/threading/MsgThread.cc +++ b/src/threading/MsgThread.cc @@ -165,6 +165,50 @@ bool ReporterMessage::Process() { return true; } +// This is the IO source used by MsgThread. +// +// The lifetime of the IO source is decoupled from +// the thread. The thread may be terminated prior +// to the IO source being properly unregistered and +// removed by the IO manager. +class IOSource : public iosource::IOSource { +public: + explicit IOSource(MsgThread* thread) : thread(thread) { + if ( ! iosource_mgr->RegisterFd(flare.FD(), this) ) + reporter->InternalError("Failed to register MsgThread FD with iosource_mgr"); + + SetClosed(false); + } + + ~IOSource() override { + if ( ! 
iosource_mgr->UnregisterFd(flare.FD(), this) ) + reporter->InternalError("Failed to unregister MsgThread FD from iosource_mgr"); + } + + void Process() override { + flare.Extinguish(); + + if ( thread ) + thread->Process(); + } + + const char* Tag() override { return thread ? thread->Name() : ""; } + + double GetNextTimeout() override { return -1; } + + + void Fire() { flare.Fire(); }; + + void Close() { + thread = nullptr; + SetClosed(true); + } + +private: + MsgThread* thread = nullptr; + zeek::detail::Flare flare; +}; + } // namespace detail ////// Methods. @@ -181,16 +225,20 @@ MsgThread::MsgThread() : BasicThread(), queue_in(this, nullptr), queue_out(nullp failed = false; thread_mgr->AddMsgThread(this); - if ( ! iosource_mgr->RegisterFd(flare.FD(), this) ) - reporter->FatalError("Failed to register MsgThread fd with iosource_mgr"); + io_source = new detail::IOSource(this); - SetClosed(false); + // Register IOSource as non-counting lifetime managed IO source. + iosource_mgr->Register(io_source, true); } MsgThread::~MsgThread() { - // Unregister this thread from the iosource manager so it doesn't wake - // up the main poll anymore. - iosource_mgr->UnregisterFd(flare.FD(), this); + // Unregister this thread from the IO source so we don't + // get Process() callbacks anymore. The IO source itself + // is life-time managed by the IO manager. + if ( io_source ) { + io_source->Close(); + io_source = nullptr; + } } void MsgThread::OnSignalStop() { @@ -253,7 +301,13 @@ void MsgThread::OnWaitForStop() { } void MsgThread::OnKill() { - SetClosed(true); + // Ensure the IO source is closed and won't call Process() on this + // thread anymore. The thread got killed, so the threading manager will + // remove it forcefully soon. + if ( io_source ) { + io_source->Close(); + io_source = nullptr; + } // Send a message to unblock the reader if its currently waiting for // input. 
This is just an optimization to make it terminate more @@ -345,7 +399,8 @@ void MsgThread::SendOut(BasicOutputMessage* msg, bool force) { ++cnt_sent_out; - flare.Fire(); + if ( io_source ) + io_source->Fire(); } void MsgThread::SendEvent(const char* name, const int num_vals, Value** vals) { @@ -418,8 +473,6 @@ void MsgThread::GetStats(Stats* stats) { } void MsgThread::Process() { - flare.Extinguish(); - while ( HasOut() ) { Message* msg = RetrieveOut(); assert(msg); diff --git a/src/threading/MsgThread.h b/src/threading/MsgThread.h index 55b8f0ba1d..259e64b11f 100644 --- a/src/threading/MsgThread.h +++ b/src/threading/MsgThread.h @@ -26,6 +26,7 @@ class HeartbeatMessage; class FinishMessage; class FinishedMessage; class KillMeMessage; +class IOSource; } // namespace detail @@ -40,7 +41,7 @@ class KillMeMessage; * that happens, the thread stops accepting any new messages, finishes * processes all remaining ones still in the queue, and then exits. */ -class MsgThread : public BasicThread, public iosource::IOSource { +class MsgThread : public BasicThread { public: /** * Constructor. It automatically registers the thread with the @@ -209,11 +210,9 @@ public: void GetStats(Stats* stats); /** - * Overridden from iosource::IOSource. + * Process() forwarded to from detail::IOSource. */ - void Process() override; - const char* Tag() override { return Name(); } - double GetNextTimeout() override { return -1; } + void Process(); protected: friend class Manager; @@ -362,7 +361,7 @@ private: bool child_sent_finish; // Child thread asked to be finished. bool failed; // Set to true when a command failed. - zeek::detail::Flare flare; + detail::IOSource* io_source = nullptr; // IO source registered with the IO manager. 
}; /** From 739a8ac5090b2b2299c2d4c89ac329a1907f89e4 Mon Sep 17 00:00:00 2001 From: Arne Welzel Date: Thu, 27 Jun 2024 14:14:39 +0200 Subject: [PATCH 4/5] iosource/Manager: Reap dry sources while computing timeout Avoids looping over the sources vector twice and should result in the same behavior. --- src/iosource/Manager.cc | 29 ++++++++++++++--------------- 1 file changed, 14 insertions(+), 15 deletions(-) diff --git a/src/iosource/Manager.cc b/src/iosource/Manager.cc index a7821e1a5f..bbef07a922 100644 --- a/src/iosource/Manager.cc +++ b/src/iosource/Manager.cc @@ -126,20 +126,6 @@ void Manager::ReapSource(Source* src) { void Manager::FindReadySources(ReadySources* ready) { ready->clear(); - // Remove sources which have gone dry. For simplicity, we only - // remove at most one each time. - for ( SourceList::iterator i = sources.begin(); i != sources.end(); ++i ) - if ( ! (*i)->src->IsOpen() ) { - ReapSource(*i); - sources.erase(i); - break; - } - - // If there aren't any sources and exit_only_after_terminate is false, just - // return an empty set of sources. We want the main loop to end. - if ( Size() == 0 && (! BifConst::exit_only_after_terminate || run_state::terminating) ) - return; - double timeout = -1; IOSource* timeout_src = nullptr; bool time_to_poll = false; @@ -151,7 +137,8 @@ void Manager::FindReadySources(ReadySources* ready) { } // Find the source with the next timeout value. - for ( auto src : sources ) { + for ( auto i = sources.begin(); i != sources.end(); /* noop */ ) { + auto* src = *i; auto iosource = src->src; if ( iosource->IsOpen() ) { double next = iosource->GetNextTimeout(); @@ -179,7 +166,19 @@ void Manager::FindReadySources(ReadySources* ready) { ready->push_back({pkt_src, -1, 0}); } } + ++i; } + else { + ReapSource(src); + i = sources.erase(i); + } + } + + // If there aren't any sources and exit_only_after_terminate is false, just + // return an empty set of sources. We want the main loop to end. + if ( Size() == 0 && (! 
BifConst::exit_only_after_terminate || run_state::terminating) ) { + ready->clear(); + return; } DBG_LOG(DBG_MAINLOOP, "timeout: %f ready size: %zu time_to_poll: %d\n", timeout, ready->size(), time_to_poll); From f050d96503b9d551543b0503e32c51de5c500aa8 Mon Sep 17 00:00:00 2001 From: Arne Welzel Date: Tue, 2 Jul 2024 12:12:15 +0200 Subject: [PATCH 5/5] threading/Manager: Warn if threads are added after termination The core.file-analyzer-violation test showed that it's possible to create new threads (log writers) when Zeek is in the process of terminating. This can result in the IO manager's destructor deleting IO sources for threads that are still running. This is sort of a scripting issue, so for now log a reporter warning when it happens, to leave a breadcrumb as to what might be going on. In the future it might make sense to plug APIs with zeek_is_terminating(). --- src/threading/Manager.cc | 9 +++++++++ src/threading/Manager.h | 1 + .../Baseline/core.file-analyzer-violation/.stderr | 2 ++ .../core.file-analyzer-violation/files.log | 1 + testing/btest/core/file-analyzer-violation.zeek | 14 ++++++++++++++ 5 files changed, 27 insertions(+) create mode 100644 testing/btest/Baseline/core.file-analyzer-violation/.stderr diff --git a/src/threading/Manager.cc b/src/threading/Manager.cc index 9bcdc925f4..5620a0bf80 100644 --- a/src/threading/Manager.cc +++ b/src/threading/Manager.cc @@ -28,6 +28,7 @@ Manager::Manager() { did_process = true; next_beat = 0; terminating = false; + terminated = false; } Manager::~Manager() { @@ -61,10 +62,18 @@ void Manager::Terminate() { all_threads.clear(); msg_threads.clear(); terminating = false; + terminated = true; } void Manager::AddThread(BasicThread* thread) { DBG_LOG(DBG_THREADING, "Adding thread %s ...", thread->Name()); + + // This can happen when log writers or other threads are + // created during the shutdown phase and results in unclean + // shutdowns.
+ if ( terminated ) + reporter->Warning("Thread %s added after threading manager terminated", thread->Name()); + all_threads.push_back(thread); if ( ! heartbeat_timer_running ) diff --git a/src/threading/Manager.h b/src/threading/Manager.h index 875e35290a..b075e6a70d 100644 --- a/src/threading/Manager.h +++ b/src/threading/Manager.h @@ -146,6 +146,7 @@ private: bool did_process; // True if the last Process() found some work to do. double next_beat; // Timestamp when the next heartbeat will be sent. bool terminating; // True if we are in Terminate(). + bool terminated; // True if Terminate() finished. msg_stats_list stats; diff --git a/testing/btest/Baseline/core.file-analyzer-violation/.stderr b/testing/btest/Baseline/core.file-analyzer-violation/.stderr new file mode 100644 index 0000000000..e3f6131b1d --- /dev/null +++ b/testing/btest/Baseline/core.file-analyzer-violation/.stderr @@ -0,0 +1,2 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. 
+received termination signal diff --git a/testing/btest/Baseline/core.file-analyzer-violation/files.log b/testing/btest/Baseline/core.file-analyzer-violation/files.log index abef87d6a6..3b26732673 100644 --- a/testing/btest/Baseline/core.file-analyzer-violation/files.log +++ b/testing/btest/Baseline/core.file-analyzer-violation/files.log @@ -8,3 +8,4 @@ #fields ts fuid uid id.orig_h id.orig_p id.resp_h id.resp_p source depth analyzers mime_type filename duration local_orig is_orig seen_bytes total_bytes missing_bytes overflow_bytes timedout parent_fuid #types time string string addr port addr port string count set[string] string string interval bool bool count count count count bool string XXXXXXXXXX.XXXXXX FKPuH630Tmj6UQUMP7 - - - - - ./myfile.exe 0 PE application/x-dosexec - 0.000000 - - 64 - 0 0 F - +#close XXXX-XX-XX-XX-XX-XX diff --git a/testing/btest/core/file-analyzer-violation.zeek b/testing/btest/core/file-analyzer-violation.zeek index 6d73d2bfb6..70757bc8d2 100644 --- a/testing/btest/core/file-analyzer-violation.zeek +++ b/testing/btest/core/file-analyzer-violation.zeek @@ -1,20 +1,34 @@ # @TEST-DOC: Verify analyzer_violation_info is raised for an invalid PE file. # @TEST-EXEC: zeek -b %INPUT +# @TEST-EXEC: btest-diff .stderr # @TEST-EXEC: btest-diff .stdout # @TEST-EXEC: btest-diff files.log @load base/frameworks/files @load base/files/pe +redef exit_only_after_terminate = T; + event analyzer_violation_info(tag: AllAnalyzers::Tag, info: AnalyzerViolationInfo) { print tag, info$reason, info$f$id, cat(info$f$info$analyzers); + terminate(); + } + +event force_terminate() + { + if ( zeek_is_terminating() ) + return; + + Reporter::error("force_terminate called - timeout?"); + terminate(); } event zeek_init() { local source: string = "./myfile.exe"; Input::add_analysis([$source=source, $name=source]); + schedule 10sec { force_terminate() }; } # This file triggers a binpac exception for PE that is reported through