Merge remote-tracking branch 'origin/master' into topic/bernhard/input-threads

Conflicts:
	src/CMakeLists.txt
	testing/btest/Baseline/coverage.bare-load-baseline/canonified_loaded_scripts.log
	testing/btest/Baseline/coverage.default-load-baseline/canonified_loaded_scripts.log
This commit is contained in:
Bernhard Amann 2012-05-18 15:26:36 -07:00
commit 3b82d69eb3
167 changed files with 3528 additions and 1066 deletions

View file

@ -10,13 +10,21 @@ namespace threading {
////// Messages.
// Signals child thread to terminate. This is actually a no-op; its only
// purpose is unblock the current read operation so that the child's Run()
// methods can check the termination status.
class TerminateMessage : public InputMessage<MsgThread>
// Signals child thread to shutdown operation.
class FinishMessage : public InputMessage<MsgThread>
{
public:
TerminateMessage(MsgThread* thread) : InputMessage<MsgThread>("Terminate", thread) { }
FinishMessage(MsgThread* thread) : InputMessage<MsgThread>("Finish", thread) { }
virtual bool Process() { return Object()->DoFinish(); }
};
// A dummy message that's only purpose is unblock the current read operation
// so that the child's Run() methods can check the termination status.
class UnblockMessage : public InputMessage<MsgThread>
{
public:
UnblockMessage(MsgThread* thread) : InputMessage<MsgThread>("Unblock", thread) { }
virtual bool Process() { return true; }
};
@ -130,13 +138,29 @@ bool ReporterMessage::Process()
MsgThread::MsgThread() : BasicThread()
{
cnt_sent_in = cnt_sent_out = 0;
finished = false;
thread_mgr->AddMsgThread(this);
}
void MsgThread::OnStop()
{
// This is to unblock the current queue read operation.
SendIn(new TerminateMessage(this), true);
// Signal thread to terminate and wait until it has acknowledged.
SendIn(new FinishMessage(this), true);
int cnt = 0;
while ( ! finished )
{
if ( ++cnt > 1000 ) // Insurance against broken threads ...
{
reporter->Warning("thread %s didn't finish in time", Name().c_str());
break;
}
usleep(1000);
}
// One more message to make sure the current queue read operation unblocks.
SendIn(new UnblockMessage(this), true);
}
void MsgThread::Heartbeat()
@ -157,6 +181,14 @@ bool MsgThread::DoHeartbeat(double network_time, double current_time)
return true;
}
bool MsgThread::DoFinish()
{
// This is thread-safe "enough", we're the only one ever writing
// there.
finished = true;
return true;
}
void MsgThread::Info(const char* msg)
{
SendOut(new ReporterMessage(ReporterMessage::INFO, this, msg));
@ -189,7 +221,9 @@ void MsgThread::InternalWarning(const char* msg)
void MsgThread::InternalError(const char* msg)
{
SendOut(new ReporterMessage(ReporterMessage::INTERNAL_ERROR, this, msg));
// This one aborts immediately.
fprintf(stderr, "internal error in thread: %s\n", msg);
abort();
}
#ifdef DEBUG