Fixing threads' DoFinish() method.

It wasn't called reliably. Now, it's always called before the thread
is destroyed (assuming processing has went normally so far).
This commit is contained in:
Robin Sommer 2012-04-03 22:12:44 -07:00
parent 6e7faafdb7
commit 99e3c58494
2 changed files with 50 additions and 7 deletions

View file

@ -10,13 +10,21 @@ namespace threading {
////// Messages. ////// Messages.
// Signals child thread to terminate. This is actually a no-op; its only // Signals child thread to shutdown operation.
// purpose is unblock the current read operation so that the child's Run() class FinishMessage : public InputMessage<MsgThread>
// methods can check the termination status.
class TerminateMessage : public InputMessage<MsgThread>
{ {
public: public:
TerminateMessage(MsgThread* thread) : InputMessage<MsgThread>("Terminate", thread) { } FinishMessage(MsgThread* thread) : InputMessage<MsgThread>("Finish", thread) { }
virtual bool Process() { return Object()->DoFinish(); }
};
// A dummy message that's only purpose is unblock the current read operation
// so that the child's Run() methods can check the termination status.
class UnblockMessage : public InputMessage<MsgThread>
{
public:
UnblockMessage(MsgThread* thread) : InputMessage<MsgThread>("Unblock", thread) { }
virtual bool Process() { return true; } virtual bool Process() { return true; }
}; };
@ -130,13 +138,30 @@ bool ReporterMessage::Process()
MsgThread::MsgThread() : BasicThread() MsgThread::MsgThread() : BasicThread()
{ {
cnt_sent_in = cnt_sent_out = 0; cnt_sent_in = cnt_sent_out = 0;
finished = false;
thread_mgr->AddMsgThread(this); thread_mgr->AddMsgThread(this);
} }
void MsgThread::OnStop() void MsgThread::OnStop()
{ {
// This is to unblock the current queue read operation. // Signal thread to terminate and wait until it has acknowledged.
SendIn(new TerminateMessage(this), true); SendIn(new FinishMessage(this), true);
int cnt = 0;
while ( ! finished )
{
if ( ++cnt > 1000 ) // Insurance against broken threads ...
{
abort();
reporter->Warning("thread %s didn't finish in time", Name().c_str());
break;
}
usleep(1000);
}
// One more message to make sure the current queue read operation unblocks.
SendIn(new UnblockMessage(this), true);
} }
void MsgThread::Heartbeat() void MsgThread::Heartbeat()
@ -157,6 +182,14 @@ bool MsgThread::DoHeartbeat(double network_time, double current_time)
return true; return true;
} }
bool MsgThread::DoFinish()
{
// This is thread-safe "enough", we're the only one ever writing
// there.
finished = true;
return true;
}
void MsgThread::Info(const char* msg) void MsgThread::Info(const char* msg)
{ {
SendOut(new ReporterMessage(ReporterMessage::INFO, this, msg)); SendOut(new ReporterMessage(ReporterMessage::INFO, this, msg));

View file

@ -171,6 +171,8 @@ public:
protected: protected:
friend class Manager; friend class Manager;
friend class HeartbeatMessage; friend class HeartbeatMessage;
friend class FinishMessage;
friend class FinishedMessage;
/** /**
* Pops a message sent by the child from the child-to-main queue. * Pops a message sent by the child from the child-to-main queue.
@ -215,6 +217,12 @@ protected:
*/ */
virtual bool DoHeartbeat(double network_time, double current_time); virtual bool DoHeartbeat(double network_time, double current_time);
/** Triggered for execution in the child thread just before shutting threads down.
* The child thread shoudl finish its operations and then *must*
* call this class' implementation.
*/
virtual bool DoFinish();
private: private:
/** /**
* Pops a message sent by the main thread from the main-to-chold * Pops a message sent by the main thread from the main-to-chold
@ -270,6 +278,8 @@ private:
uint64_t cnt_sent_in; // Counts message sent to child. uint64_t cnt_sent_in; // Counts message sent to child.
uint64_t cnt_sent_out; // Counts message sent by child. uint64_t cnt_sent_out; // Counts message sent by child.
bool finished; // Set to true by Finished message.
}; };
/** /**