threading/MsgThread: Decouple IO source and thread lifetimes

MsgThread acting as an IO source can result in the situation where the
threading manager's heartbeat timer deletes a finished MsgThread instance,
but at the same time this thread is in the list of ready IO sources the
main loop is currently processing.

Fix this by decoupling the lifetime of the IO source part and properly
registering as lifetime managed IO sources with the IO manager.

Fixes #3682
This commit is contained in:
Arne Welzel 2024-06-18 09:41:08 +02:00
parent 0451a4038c
commit b3118d2a48
2 changed files with 68 additions and 16 deletions

View file

@ -165,6 +165,50 @@ bool ReporterMessage::Process() {
return true; return true;
} }
// This is the IO source used by MsgThread.
//
// The lifetime of the IO source is decoupled from
// the thread. The thread may be terminated prior
// to the IO source being properly unregistered and
// removed by the IO manager.
class IOSource : public iosource::IOSource {
public:
explicit IOSource(MsgThread* thread) : thread(thread) {
if ( ! iosource_mgr->RegisterFd(flare.FD(), this) )
reporter->InternalError("Failed to register MsgThread FD with iosource_mgr");
SetClosed(false);
}
~IOSource() override {
if ( ! iosource_mgr->UnregisterFd(flare.FD(), this) )
reporter->InternalError("Failed to unregister MsgThread FD from iosource_mgr");
}
void Process() override {
flare.Extinguish();
if ( thread )
thread->Process();
}
const char* Tag() override { return thread ? thread->Name() : "<MsgThread orphan>"; }
double GetNextTimeout() override { return -1; }
void Fire() { flare.Fire(); };
void Close() {
thread = nullptr;
SetClosed(true);
}
private:
MsgThread* thread = nullptr;
zeek::detail::Flare flare;
};
} // namespace detail } // namespace detail
////// Methods. ////// Methods.
@ -181,16 +225,20 @@ MsgThread::MsgThread() : BasicThread(), queue_in(this, nullptr), queue_out(nullp
failed = false; failed = false;
thread_mgr->AddMsgThread(this); thread_mgr->AddMsgThread(this);
if ( ! iosource_mgr->RegisterFd(flare.FD(), this) ) io_source = new detail::IOSource(this);
reporter->FatalError("Failed to register MsgThread fd with iosource_mgr");
SetClosed(false); // Register IOSource as non-counting lifetime managed IO source.
iosource_mgr->Register(io_source, true);
} }
MsgThread::~MsgThread() { MsgThread::~MsgThread() {
// Unregister this thread from the iosource manager so it doesn't wake // Unregister this thread from the IO source so we don't
// up the main poll anymore. // get Process() callbacks anymore. The IO source itself
iosource_mgr->UnregisterFd(flare.FD(), this); // is life-time managed by the IO manager.
if ( io_source ) {
io_source->Close();
io_source = nullptr;
}
} }
void MsgThread::OnSignalStop() { void MsgThread::OnSignalStop() {
@ -253,7 +301,13 @@ void MsgThread::OnWaitForStop() {
} }
void MsgThread::OnKill() { void MsgThread::OnKill() {
SetClosed(true); // Ensure the IO source is closed and won't call Process() on this
// thread anymore. The thread got killed, so the threading manager will
// remove it forcefully soon.
if ( io_source ) {
io_source->Close();
io_source = nullptr;
}
// Send a message to unblock the reader if its currently waiting for // Send a message to unblock the reader if its currently waiting for
// input. This is just an optimization to make it terminate more // input. This is just an optimization to make it terminate more
@ -345,7 +399,8 @@ void MsgThread::SendOut(BasicOutputMessage* msg, bool force) {
++cnt_sent_out; ++cnt_sent_out;
flare.Fire(); if ( io_source )
io_source->Fire();
} }
void MsgThread::SendEvent(const char* name, const int num_vals, Value** vals) { void MsgThread::SendEvent(const char* name, const int num_vals, Value** vals) {
@ -418,8 +473,6 @@ void MsgThread::GetStats(Stats* stats) {
} }
void MsgThread::Process() { void MsgThread::Process() {
flare.Extinguish();
while ( HasOut() ) { while ( HasOut() ) {
Message* msg = RetrieveOut(); Message* msg = RetrieveOut();
assert(msg); assert(msg);

View file

@ -26,6 +26,7 @@ class HeartbeatMessage;
class FinishMessage; class FinishMessage;
class FinishedMessage; class FinishedMessage;
class KillMeMessage; class KillMeMessage;
class IOSource;
} // namespace detail } // namespace detail
@ -40,7 +41,7 @@ class KillMeMessage;
* that happens, the thread stops accepting any new messages, finishes * that happens, the thread stops accepting any new messages, finishes
* processes all remaining ones still in the queue, and then exits. * processes all remaining ones still in the queue, and then exits.
*/ */
class MsgThread : public BasicThread, public iosource::IOSource { class MsgThread : public BasicThread {
public: public:
/** /**
* Constructor. It automatically registers the thread with the * Constructor. It automatically registers the thread with the
@ -209,11 +210,9 @@ public:
void GetStats(Stats* stats); void GetStats(Stats* stats);
/** /**
* Overridden from iosource::IOSource. * Process() forwarded to from detail::IOSource.
*/ */
void Process() override; void Process();
const char* Tag() override { return Name(); }
double GetNextTimeout() override { return -1; }
protected: protected:
friend class Manager; friend class Manager;
@ -362,7 +361,7 @@ private:
bool child_sent_finish; // Child thread asked to be finished. bool child_sent_finish; // Child thread asked to be finished.
bool failed; // Set to true when a command failed. bool failed; // Set to true when a command failed.
zeek::detail::Flare flare; detail::IOSource* io_source = nullptr; // IO source registered with the IO manager.
}; };
/** /**