Another small change to MsgThread API.

Threads will now reliably get a call to DoFinish() no matter how the
thread terminates. This will always be called from within the thread,
whereas the destructor is called from the main thread after the child
thread has already terminated.

Also removing debugging code.

However, two problems remain with the ASCII writer (seeing them only
on MacOS):

    - the #start/#end timestamps contain only dummy values right now.
      The odd thing is that once I enable strftime() to print actual
      timestamps, I get crashes (even though strftime() is supposed to
      be thread-safe).

    - occassionally, there's still output missing in tests. In those
      cases, the file descriptor apparently goes bad: a write() will
      suddently return EBADF for reasons I don't understand yet.
This commit is contained in:
Robin Sommer 2012-07-22 15:50:12 -07:00
parent 053b307e24
commit 71fc2a1728
6 changed files with 39 additions and 27 deletions

View file

@ -53,12 +53,11 @@ Ascii::Ascii(WriterFrontend* frontend) : WriterBackend(frontend)
Ascii::~Ascii()
{
//fprintf(stderr, "DTOR %p\n", this);
// Normally, the file will be closed here already via the Finish()
// message. But when we terminate abnormally, we may still have it open.
if ( fd )
CloseFile(0);
if ( ! ascii_done )
{
fprintf(stderr, "internal error: finish missing\n");
abort();
}
delete [] separator;
delete [] set_separator;
@ -77,7 +76,7 @@ bool Ascii::WriteHeaderField(const string& key, const string& val)
void Ascii::CloseFile(double t)
{
if ( ! fd)
if ( ! fd )
return;
if ( include_meta )
@ -170,7 +169,7 @@ bool Ascii::DoFinish(double network_time)
{
if ( ascii_done )
{
fprintf(stderr, "duplicate finish message\n");
fprintf(stderr, "internal error: duplicate finish\n");
abort();
}
@ -353,6 +352,7 @@ bool Ascii::DoWrite(int num_fields, const Field* const * fields,
// It would so escape the first character.
char buf[16];
snprintf(buf, sizeof(buf), "\\x%02x", bytes[0]);
if ( ! safe_write(fd, buf, strlen(buf)) )
goto write_error;
@ -416,14 +416,23 @@ string Ascii::LogExt()
string Ascii::Timestamp(double t)
{
#if 1
return "2012-01-01-00-00-00";
#else
// Using the version below leads to occasional crashes at least on Mac OS.
// Not sure why, all the function should be thread-safe ...
time_t teatime = time_t(t);
struct tm tmbuf;
struct tm* tm = localtime_r(&teatime, &tmbuf);
char buf[128];
char tmp[128];
const char* const date_fmt = "%Y-%m-%d-%H-%M-%S";
strftime(buf, sizeof(buf), date_fmt, tm);
return buf;
strftime(tmp, sizeof(tmp), date_fmt, tm);
return tmp;
#endif
}

View file

@ -35,7 +35,7 @@ BasicThread::BasicThread()
BasicThread::~BasicThread()
{
if ( buf )
if ( buf )
free(buf);
delete [] name;
@ -50,6 +50,7 @@ void BasicThread::SetName(const char* arg_name)
void BasicThread::SetOSName(const char* arg_name)
{
#ifdef HAVE_LINUX
prctl(PR_SET_NAME, arg_name, 0, 0, 0);
#endif
@ -131,16 +132,12 @@ void BasicThread::PrepareStop()
void BasicThread::Stop()
{
// XX fprintf(stderr, "stop1 %s %d %d\n", name, started, terminating);
if ( ! started )
return;
if ( terminating )
return;
// XX fprintf(stderr, "stop2 %s\n", name);
DBG_LOG(DBG_THREADING, "Signaling thread %s to terminate ...", name);
OnStop();
@ -177,7 +174,6 @@ void BasicThread::Kill()
void BasicThread::Done()
{
// XX fprintf(stderr, "DONE from thread %s\n", name);
DBG_LOG(DBG_THREADING, "Thread %s has finished", name);
terminating = true;

View file

@ -155,8 +155,6 @@ void MsgThread::OnPrepareStop()
if ( finished || Killed() )
return;
// XX fprintf(stderr, "Sending FINISH to thread %s ...\n", Name());
// Signal thread to terminate and wait until it has acknowledged.
SendIn(new FinishMessage(this, network_time), true);
}
@ -356,7 +354,14 @@ void MsgThread::Run()
delete msg;
}
Finished();
// In case we haven't send the finish method yet, do it now. Reading
// global network_time here should be fine, it isn't changing
// anymore.
if ( ! finished )
{
OnFinish(network_time);
Finished();
}
}
void MsgThread::GetStats(Stats* stats)

View file

@ -197,10 +197,6 @@ protected:
*/
virtual void Heartbeat();
/** Flags that the child process has finished processing. Called from child.
*/
void Finished();
/** Internal heartbeat processing. Called from child.
*/
void HeartbeatInChild();
@ -217,8 +213,7 @@ protected:
virtual bool OnHeartbeat(double network_time, double current_time) = 0;
/** Triggered for execution in the child thread just before shutting threads down.
* The child thread should finish its operations and then *must*
* call this class' implementation.
* The child thread should finish its operations.
*/
virtual bool OnFinish(double network_time) = 0;
@ -288,6 +283,10 @@ private:
*/
bool MightHaveOut() { return queue_out.MaybeReady(); }
/** Flags that the child process has finished processing. Called from child.
*/
void Finished();
Queue<BasicInputMessage *> queue_in;
Queue<BasicOutputMessage *> queue_out;

View file

@ -1301,6 +1301,9 @@ bool safe_write(int fd, const char* data, int len)
if ( errno == EINTR )
continue;
fprintf(stderr, "safe_write error: %d\n", errno);
abort();
return false;
}

View file

@ -2,4 +2,4 @@
#
# Default canonifier used with the tests in testing/btest/*.
`dirname $0`/diff-remove-timestamps | grep -v XXX
`dirname $0`/diff-remove-timestamps