diff --git a/src/threading/Manager.cc b/src/threading/Manager.cc index 53c11f2ee9..cfc44596e1 100644 --- a/src/threading/Manager.cc +++ b/src/threading/Manager.cc @@ -80,8 +80,10 @@ double Manager::NextTimestamp(double* network_time) for ( msg_thread_list::iterator i = msg_threads.begin(); i != msg_threads.end(); i++ ) { - if ( (*i)->MightHaveOut() ) - return timer_mgr->Time(); + MsgThread* t = *i; + + if ( (*i)->MightHaveOut() && ! t->Killed() ) + return timer_mgr->Time(); } return -1.0; @@ -95,6 +97,12 @@ void Manager::KillThreads() (*i)->Kill(); } +void Manager::KillThread(BasicThread* thread) + { + DBG_LOG(DBG_THREADING, "Killing thread %s ...", thread->Name()); + thread->Kill(); + } + void Manager::Process() { bool do_beat = false; @@ -114,7 +122,7 @@ void Manager::Process() if ( do_beat ) t->Heartbeat(); - while ( t->HasOut() ) + while ( t->HasOut() && ! t->Killed() ) { Message* msg = t->RetrieveOut(); diff --git a/src/threading/Manager.h b/src/threading/Manager.h index be81c69ba0..b46a06a46e 100644 --- a/src/threading/Manager.h +++ b/src/threading/Manager.h @@ -74,6 +74,16 @@ public: */ void ForceProcessing() { Process(); } + /** + * Signals a specific threads to terminate immediately. + */ + void KillThread(BasicThread* thread); + + /** + * Signals all threads to terminate immediately. + */ + void KillThreads(); + protected: friend class BasicThread; friend class MsgThread; @@ -106,13 +116,6 @@ protected: */ virtual double NextTimestamp(double* network_time); - /** - * Kills all thread immediately. Note that this may cause race conditions - * if a child thread currently holds a lock that might block somebody - * else. - */ - virtual void KillThreads(); - /** * Part of the IOSource interface. */ diff --git a/src/threading/MsgThread.cc b/src/threading/MsgThread.cc index 48c7253885..e0f3fd8b0c 100644 --- a/src/threading/MsgThread.cc +++ b/src/threading/MsgThread.cc @@ -70,6 +70,16 @@ private: Type type; }; +// A message from the the child to the main process, requesting suicide. +class KillMeMessage : public OutputMessage +{ +public: + KillMeMessage(MsgThread* thread) + : OutputMessage("ReporterMessage", thread) {} + + virtual bool Process() { thread_mgr->KillThread(Object()); return true; } +}; + #ifdef DEBUG // A debug message from the child to be passed on to the DebugLogger. class DebugMessage : public OutputMessage @@ -346,16 +356,20 @@ void MsgThread::Run() if ( ! result ) { - string s = Fmt("%s failed, terminating thread (MsgThread)", Name()); - Error(s.c_str()); - break; + Error("terminating thread"); + + // This will eventually kill this thread, but only + // after all other outgoing messages (in particular + // error messages have been processed by then main + // thread). + SendOut(new KillMeMessage(this)); } } // In case we haven't send the finish method yet, do it now. Reading // global network_time here should be fine, it isn't changing // anymore. - if ( ! finished ) + if ( ! finished && ! Killed() ) { OnFinish(network_time); Finished();