diff --git a/src/threading/MsgThread.cc b/src/threading/MsgThread.cc index a1d2e5e3da..022a8ce2b4 100644 --- a/src/threading/MsgThread.cc +++ b/src/threading/MsgThread.cc @@ -165,6 +165,50 @@ bool ReporterMessage::Process() { return true; } +// This is the IO source used by MsgThread. +// +// The lifetime of the IO source is decoupled from +// the thread. The thread may be terminated prior +// to the IO source being properly unregistered and +// removed by the IO manager. +class IOSource : public iosource::IOSource { +public: + explicit IOSource(MsgThread* thread) : thread(thread) { + if ( ! iosource_mgr->RegisterFd(flare.FD(), this) ) + reporter->InternalError("Failed to register MsgThread FD with iosource_mgr"); + + SetClosed(false); + } + + ~IOSource() override { + if ( ! iosource_mgr->UnregisterFd(flare.FD(), this) ) + reporter->InternalError("Failed to unregister MsgThread FD from iosource_mgr"); + } + + void Process() override { + flare.Extinguish(); + + if ( thread ) + thread->Process(); + } + + const char* Tag() override { return thread ? thread->Name() : ""; } + + double GetNextTimeout() override { return -1; } + + + void Fire() { flare.Fire(); }; + + void Close() { + thread = nullptr; + SetClosed(true); + } + +private: + MsgThread* thread = nullptr; + zeek::detail::Flare flare; +}; + } // namespace detail ////// Methods. @@ -181,16 +225,20 @@ MsgThread::MsgThread() : BasicThread(), queue_in(this, nullptr), queue_out(nullp failed = false; thread_mgr->AddMsgThread(this); - if ( ! iosource_mgr->RegisterFd(flare.FD(), this) ) - reporter->FatalError("Failed to register MsgThread fd with iosource_mgr"); + io_source = new detail::IOSource(this); - SetClosed(false); + // Register IOSource as non-counting lifetime managed IO source. + iosource_mgr->Register(io_source, true); } MsgThread::~MsgThread() { - // Unregister this thread from the iosource manager so it doesn't wake - // up the main poll anymore. - iosource_mgr->UnregisterFd(flare.FD(), this); + // Unregister this thread from the IO source so we don't + // get Process() callbacks anymore. The IO source itself + // is life-time managed by the IO manager. + if ( io_source ) { + io_source->Close(); + io_source = nullptr; + } } void MsgThread::OnSignalStop() { @@ -253,7 +301,13 @@ void MsgThread::OnWaitForStop() { } void MsgThread::OnKill() { - SetClosed(true); + // Ensure the IO source is closed and won't call Process() on this + // thread anymore. The thread got killed, so the threading manager will + // remove it forcefully soon. + if ( io_source ) { + io_source->Close(); + io_source = nullptr; + } // Send a message to unblock the reader if its currently waiting for // input. This is just an optimization to make it terminate more @@ -345,7 +399,8 @@ void MsgThread::SendOut(BasicOutputMessage* msg, bool force) { ++cnt_sent_out; - flare.Fire(); + if ( io_source ) + io_source->Fire(); } void MsgThread::SendEvent(const char* name, const int num_vals, Value** vals) { @@ -418,8 +473,6 @@ void MsgThread::GetStats(Stats* stats) { } void MsgThread::Process() { - flare.Extinguish(); - while ( HasOut() ) { Message* msg = RetrieveOut(); assert(msg); diff --git a/src/threading/MsgThread.h b/src/threading/MsgThread.h index 55b8f0ba1d..259e64b11f 100644 --- a/src/threading/MsgThread.h +++ b/src/threading/MsgThread.h @@ -26,6 +26,7 @@ class HeartbeatMessage; class FinishMessage; class FinishedMessage; class KillMeMessage; +class IOSource; } // namespace detail @@ -40,7 +41,7 @@ class KillMeMessage; * that happens, the thread stops accepting any new messages, finishes * processes all remaining ones still in the queue, and then exits. */ -class MsgThread : public BasicThread, public iosource::IOSource { +class MsgThread : public BasicThread { public: /** * Constructor. It automatically registers the thread with the @@ -209,11 +210,9 @@ public: void GetStats(Stats* stats); /** - * Overridden from iosource::IOSource. + * Process() forwarded to from detail::IOSource. */ - void Process() override; - const char* Tag() override { return Name(); } - double GetNextTimeout() override { return -1; } + void Process(); protected: friend class Manager; @@ -362,7 +361,7 @@ private: bool child_sent_finish; // Child thread asked to be finished. bool failed; // Set to true when a command failed. - zeek::detail::Flare flare; + detail::IOSource* io_source = nullptr; // IO source registered with the IO manager. }; /**