Merge remote-tracking branch 'origin/topic/awelzel/3682-bad-pipe-op-3'

* origin/topic/awelzel/3682-bad-pipe-op-3:
  threading/Manager: Warn if threads are added after termination
  iosource/Manager: Reap dry sources while computing timeout
  threading/MsgThread: Decouple IO source and thread lifetimes
  iosource/Manager: Do not manage lifetime of pkt_src
  iosource/Manager: Honor manage_lifetime and dont_count for short-lived IO sources
This commit is contained in:
Arne Welzel 2024-07-02 14:41:27 +02:00
commit e57aa5932e
11 changed files with 199 additions and 40 deletions

47
CHANGES
View file

@ -1,3 +1,50 @@
7.0.0-dev.413 | 2024-07-02 14:41:27 +0200
* threading/Manager: Warn if threads are added after termination (Arne Welzel, Corelight)
The core.file-analyzer-violation test showed that it's possible to
create new threads (log writers) when Zeek is in the process of
terminating. This can result in the IO manager's deconstructor
deleting IO sources for threads that are still running.
This is sort of a scripting issue, so for now log a reporter warning
when it happens to have a bit of a bread-crumb what might be
going on. In the future it might make sense to plug APIs with
zeek_is_terminating().
* iosource/Manager: Reap dry sources while computing timeout (Arne Welzel, Corelight)
Avoids looping over the sources vector twice and should result
in the same behavior.
* GH-3682: threading/MsgThread: Decouple IO source and thread lifetimes (Arne Welzel, Corelight)
MsgThread acting as an IO source can result in the situation where the
threading manager's heartbeat timer deletes a finished MsgThread instance,
but at the same time this thread is in the list of ready IO sources the
main loop is currently processing.
Fix this by decoupling the lifetime of the IO source part and properly
registering as lifetime managed IO sources with the IO manager.
Fixes #3682
* iosource/Manager: Do not manage lifetime of pkt_src (Arne Welzel, Corelight)
Now that dry sources are properly reaped and freed, an offline packet
source would be deleted once dry, resulting in GetPktSrc() returning
a wild pointer. Don't manage the packet source lifetime and instead
free it during Manager destruction.
* iosource/Manager: Honor manage_lifetime and dont_count for short-lived IO sources (Arne Welzel, Corelight)
If an IO source is registered and becomes dry at runtime, the IO
manager would not honor its manage_lifetime or dont_count attribute
during collection, resulting in memory leaks.
This probably hasn't mattered so far as there's no IO sources registered
in-tree at runtime using manage_lifetime=true.
7.0.0-dev.407 | 2024-06-26 13:16:10 +0200 7.0.0-dev.407 | 2024-06-26 13:16:10 +0200
* coverage/lcov_html: Allow missing coveralls token (Arne Welzel, Corelight) * coverage/lcov_html: Allow missing coveralls token (Arne Welzel, Corelight)

View file

@ -1 +1 @@
7.0.0-dev.407 7.0.0-dev.413

View file

@ -76,6 +76,9 @@ Manager::~Manager() {
pkt_dumpers.clear(); pkt_dumpers.clear();
// Was registered without lifetime management.
delete pkt_src;
#ifndef _MSC_VER #ifndef _MSC_VER
// There's a bug here with builds on Windows that causes an assertion with debug builds // There's a bug here with builds on Windows that causes an assertion with debug builds
// related to libkqueue returning a zero for the file descriptor. The assert happens // related to libkqueue returning a zero for the file descriptor. The assert happens
@ -104,24 +107,25 @@ void Manager::Wakeup(std::string_view where) {
wakeup->Ping(where); wakeup->Ping(where);
} }
void Manager::ReapSource(Source* src) {
auto* iosource = src->src;
assert(! iosource->IsOpen());
DBG_LOG(DBG_MAINLOOP, "Reaping %s", src->src->Tag());
iosource->Done();
if ( src->manage_lifetime )
delete iosource;
if ( src->dont_count )
dont_counts--;
delete src;
}
void Manager::FindReadySources(ReadySources* ready) { void Manager::FindReadySources(ReadySources* ready) {
ready->clear(); ready->clear();
// Remove sources which have gone dry. For simplicity, we only
// remove at most one each time.
for ( SourceList::iterator i = sources.begin(); i != sources.end(); ++i )
if ( ! (*i)->src->IsOpen() ) {
(*i)->src->Done();
delete *i;
sources.erase(i);
break;
}
// If there aren't any sources and exit_only_after_terminate is false, just
// return an empty set of sources. We want the main loop to end.
if ( Size() == 0 && (! BifConst::exit_only_after_terminate || run_state::terminating) )
return;
double timeout = -1; double timeout = -1;
IOSource* timeout_src = nullptr; IOSource* timeout_src = nullptr;
bool time_to_poll = false; bool time_to_poll = false;
@ -133,7 +137,8 @@ void Manager::FindReadySources(ReadySources* ready) {
} }
// Find the source with the next timeout value. // Find the source with the next timeout value.
for ( auto src : sources ) { for ( auto i = sources.begin(); i != sources.end(); /* noop */ ) {
auto* src = *i;
auto iosource = src->src; auto iosource = src->src;
if ( iosource->IsOpen() ) { if ( iosource->IsOpen() ) {
double next = iosource->GetNextTimeout(); double next = iosource->GetNextTimeout();
@ -161,7 +166,19 @@ void Manager::FindReadySources(ReadySources* ready) {
ready->push_back({pkt_src, -1, 0}); ready->push_back({pkt_src, -1, 0});
} }
} }
++i;
} }
else {
ReapSource(src);
i = sources.erase(i);
}
}
// If there aren't any sources and exit_only_after_terminate is false, just
// return an empty set of sources. We want the main loop to end.
if ( Size() == 0 && (! BifConst::exit_only_after_terminate || run_state::terminating) ) {
ready->clear();
return;
} }
DBG_LOG(DBG_MAINLOOP, "timeout: %f ready size: %zu time_to_poll: %d\n", timeout, ready->size(), time_to_poll); DBG_LOG(DBG_MAINLOOP, "timeout: %f ready size: %zu time_to_poll: %d\n", timeout, ready->size(), time_to_poll);
@ -342,7 +359,7 @@ void Manager::Register(IOSource* src, bool dont_count, bool manage_lifetime) {
void Manager::Register(PktSrc* src) { void Manager::Register(PktSrc* src) {
pkt_src = src; pkt_src = src;
Register(src, false); Register(src, false, false);
// Once we know if the source is live or not, adapt the // Once we know if the source is live or not, adapt the
// poll_interval accordingly. // poll_interval accordingly.

View file

@ -143,6 +143,15 @@ public:
void Wakeup(std::string_view where); void Wakeup(std::string_view where);
private: private:
/**
* Internal data structure for managing registered IOSources.
*/
struct Source {
IOSource* src = nullptr;
bool dont_count = false;
bool manage_lifetime = false;
};
/** /**
* Calls the appropriate poll method to gather a set of IOSources that are * Calls the appropriate poll method to gather a set of IOSources that are
* ready for processing. * ready for processing.
@ -170,6 +179,19 @@ private:
void RemoveAll(); void RemoveAll();
/**
* Reap a closed IO source.
*
* Reaping involves calling IOSource::Done() on the underlying IOSource,
* freeing it if Source.manage_lifetime is \c true, updating \c dont_counts
* and freeing \a src, making it invalid.
*
* The caller ensures \a src is removed from Manager.sources.
*
* @param src The source to reap.
*/
void ReapSource(Source* src);
class WakeupHandler final : public IOSource { class WakeupHandler final : public IOSource {
public: public:
WakeupHandler(); WakeupHandler();
@ -192,12 +214,6 @@ private:
zeek::detail::Flare flare; zeek::detail::Flare flare;
}; };
struct Source {
IOSource* src = nullptr;
bool dont_count = false;
bool manage_lifetime = false;
};
using SourceList = std::vector<Source*>; using SourceList = std::vector<Source*>;
SourceList sources; SourceList sources;

View file

@ -28,6 +28,7 @@ Manager::Manager() {
did_process = true; did_process = true;
next_beat = 0; next_beat = 0;
terminating = false; terminating = false;
terminated = false;
} }
Manager::~Manager() { Manager::~Manager() {
@ -61,10 +62,18 @@ void Manager::Terminate() {
all_threads.clear(); all_threads.clear();
msg_threads.clear(); msg_threads.clear();
terminating = false; terminating = false;
terminated = true;
} }
void Manager::AddThread(BasicThread* thread) { void Manager::AddThread(BasicThread* thread) {
DBG_LOG(DBG_THREADING, "Adding thread %s ...", thread->Name()); DBG_LOG(DBG_THREADING, "Adding thread %s ...", thread->Name());
// This can happen when log writers or other threads are
// created during the shutdown phase and results in unclean
// shutdowns.
if ( terminated )
reporter->Warning("Thread %s added after threading manager terminated", thread->Name());
all_threads.push_back(thread); all_threads.push_back(thread);
if ( ! heartbeat_timer_running ) if ( ! heartbeat_timer_running )

View file

@ -146,6 +146,7 @@ private:
bool did_process; // True if the last Process() found some work to do. bool did_process; // True if the last Process() found some work to do.
double next_beat; // Timestamp when the next heartbeat will be sent. double next_beat; // Timestamp when the next heartbeat will be sent.
bool terminating; // True if we are in Terminate(). bool terminating; // True if we are in Terminate().
bool terminated; // True if Terminate() finished.
msg_stats_list stats; msg_stats_list stats;

View file

@ -165,6 +165,50 @@ bool ReporterMessage::Process() {
return true; return true;
} }
// This is the IO source used by MsgThread.
//
// The lifetime of the IO source is decoupled from
// the thread. The thread may be terminated prior
// to the IO source being properly unregistered and
// removed by the IO manager.
class IOSource : public iosource::IOSource {
public:
explicit IOSource(MsgThread* thread) : thread(thread) {
if ( ! iosource_mgr->RegisterFd(flare.FD(), this) )
reporter->InternalError("Failed to register MsgThread FD with iosource_mgr");
SetClosed(false);
}
~IOSource() override {
if ( ! iosource_mgr->UnregisterFd(flare.FD(), this) )
reporter->InternalError("Failed to unregister MsgThread FD from iosource_mgr");
}
void Process() override {
flare.Extinguish();
if ( thread )
thread->Process();
}
const char* Tag() override { return thread ? thread->Name() : "<MsgThread orphan>"; }
double GetNextTimeout() override { return -1; }
void Fire() { flare.Fire(); };
void Close() {
thread = nullptr;
SetClosed(true);
}
private:
MsgThread* thread = nullptr;
zeek::detail::Flare flare;
};
} // namespace detail } // namespace detail
////// Methods. ////// Methods.
@ -181,16 +225,20 @@ MsgThread::MsgThread() : BasicThread(), queue_in(this, nullptr), queue_out(nullp
failed = false; failed = false;
thread_mgr->AddMsgThread(this); thread_mgr->AddMsgThread(this);
if ( ! iosource_mgr->RegisterFd(flare.FD(), this) ) io_source = new detail::IOSource(this);
reporter->FatalError("Failed to register MsgThread fd with iosource_mgr");
SetClosed(false); // Register IOSource as non-counting lifetime managed IO source.
iosource_mgr->Register(io_source, true);
} }
MsgThread::~MsgThread() { MsgThread::~MsgThread() {
// Unregister this thread from the iosource manager so it doesn't wake // Unregister this thread from the IO source so we don't
// up the main poll anymore. // get Process() callbacks anymore. The IO source itself
iosource_mgr->UnregisterFd(flare.FD(), this); // is life-time managed by the IO manager.
if ( io_source ) {
io_source->Close();
io_source = nullptr;
}
} }
void MsgThread::OnSignalStop() { void MsgThread::OnSignalStop() {
@ -253,7 +301,13 @@ void MsgThread::OnWaitForStop() {
} }
void MsgThread::OnKill() { void MsgThread::OnKill() {
SetClosed(true); // Ensure the IO source is closed and won't call Process() on this
// thread anymore. The thread got killed, so the threading manager will
// remove it forcefully soon.
if ( io_source ) {
io_source->Close();
io_source = nullptr;
}
// Send a message to unblock the reader if its currently waiting for // Send a message to unblock the reader if its currently waiting for
// input. This is just an optimization to make it terminate more // input. This is just an optimization to make it terminate more
@ -345,7 +399,8 @@ void MsgThread::SendOut(BasicOutputMessage* msg, bool force) {
++cnt_sent_out; ++cnt_sent_out;
flare.Fire(); if ( io_source )
io_source->Fire();
} }
void MsgThread::SendEvent(const char* name, const int num_vals, Value** vals) { void MsgThread::SendEvent(const char* name, const int num_vals, Value** vals) {
@ -418,8 +473,6 @@ void MsgThread::GetStats(Stats* stats) {
} }
void MsgThread::Process() { void MsgThread::Process() {
flare.Extinguish();
while ( HasOut() ) { while ( HasOut() ) {
Message* msg = RetrieveOut(); Message* msg = RetrieveOut();
assert(msg); assert(msg);

View file

@ -26,6 +26,7 @@ class HeartbeatMessage;
class FinishMessage; class FinishMessage;
class FinishedMessage; class FinishedMessage;
class KillMeMessage; class KillMeMessage;
class IOSource;
} // namespace detail } // namespace detail
@ -40,7 +41,7 @@ class KillMeMessage;
* that happens, the thread stops accepting any new messages, finishes * that happens, the thread stops accepting any new messages, finishes
* processes all remaining ones still in the queue, and then exits. * processes all remaining ones still in the queue, and then exits.
*/ */
class MsgThread : public BasicThread, public iosource::IOSource { class MsgThread : public BasicThread {
public: public:
/** /**
* Constructor. It automatically registers the thread with the * Constructor. It automatically registers the thread with the
@ -209,11 +210,9 @@ public:
void GetStats(Stats* stats); void GetStats(Stats* stats);
/** /**
* Overridden from iosource::IOSource. * Process() forwarded to from detail::IOSource.
*/ */
void Process() override; void Process();
const char* Tag() override { return Name(); }
double GetNextTimeout() override { return -1; }
protected: protected:
friend class Manager; friend class Manager;
@ -362,7 +361,7 @@ private:
bool child_sent_finish; // Child thread asked to be finished. bool child_sent_finish; // Child thread asked to be finished.
bool failed; // Set to true when a command failed. bool failed; // Set to true when a command failed.
zeek::detail::Flare flare; detail::IOSource* io_source = nullptr; // IO source registered with the IO manager.
}; };
/** /**

View file

@ -0,0 +1,2 @@
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
received termination signal

View file

@ -8,3 +8,4 @@
#fields ts fuid uid id.orig_h id.orig_p id.resp_h id.resp_p source depth analyzers mime_type filename duration local_orig is_orig seen_bytes total_bytes missing_bytes overflow_bytes timedout parent_fuid #fields ts fuid uid id.orig_h id.orig_p id.resp_h id.resp_p source depth analyzers mime_type filename duration local_orig is_orig seen_bytes total_bytes missing_bytes overflow_bytes timedout parent_fuid
#types time string string addr port addr port string count set[string] string string interval bool bool count count count count bool string #types time string string addr port addr port string count set[string] string string interval bool bool count count count count bool string
XXXXXXXXXX.XXXXXX FKPuH630Tmj6UQUMP7 - - - - - ./myfile.exe 0 PE application/x-dosexec - 0.000000 - - 64 - 0 0 F - XXXXXXXXXX.XXXXXX FKPuH630Tmj6UQUMP7 - - - - - ./myfile.exe 0 PE application/x-dosexec - 0.000000 - - 64 - 0 0 F -
#close XXXX-XX-XX-XX-XX-XX

View file

@ -1,20 +1,34 @@
# @TEST-DOC: Verify analyzer_violation_info is raised for an invalid PE file. # @TEST-DOC: Verify analyzer_violation_info is raised for an invalid PE file.
# @TEST-EXEC: zeek -b %INPUT # @TEST-EXEC: zeek -b %INPUT
# @TEST-EXEC: btest-diff .stderr
# @TEST-EXEC: btest-diff .stdout # @TEST-EXEC: btest-diff .stdout
# @TEST-EXEC: btest-diff files.log # @TEST-EXEC: btest-diff files.log
@load base/frameworks/files @load base/frameworks/files
@load base/files/pe @load base/files/pe
redef exit_only_after_terminate = T;
event analyzer_violation_info(tag: AllAnalyzers::Tag, info: AnalyzerViolationInfo) event analyzer_violation_info(tag: AllAnalyzers::Tag, info: AnalyzerViolationInfo)
{ {
print tag, info$reason, info$f$id, cat(info$f$info$analyzers); print tag, info$reason, info$f$id, cat(info$f$info$analyzers);
terminate();
}
event force_terminate()
{
if ( zeek_is_terminating() )
return;
Reporter::error("force_terminate called - timeout?");
terminate();
} }
event zeek_init() event zeek_init()
{ {
local source: string = "./myfile.exe"; local source: string = "./myfile.exe";
Input::add_analysis([$source=source, $name=source]); Input::add_analysis([$source=source, $name=source]);
schedule 10sec { force_terminate() };
} }
# This file triggers a binpac exception for PE that is reported through # This file triggers a binpac exception for PE that is reported through