Merge remote-tracking branch 'origin/topic/robin/log-threads' into topic/bernhard/log-threads

This commit is contained in:
Bernhard Amann 2012-02-13 02:30:24 -08:00
commit 8a6dfee00c
36 changed files with 414 additions and 304 deletions

View file

@ -2,9 +2,14 @@
#include <sys/signal.h>
#include <signal.h>
#include "config.h"
#include "BasicThread.h"
#include "Manager.h"
#ifdef HAVE_LINUX
#include <sys/prctl.h>
#endif
using namespace threading;
uint64_t BasicThread::thread_counter = 0;
@ -25,6 +30,8 @@ BasicThread::BasicThread()
BasicThread::~BasicThread()
{
if ( buf )
free(buf);
}
void BasicThread::SetName(const string& arg_name)
@ -35,8 +42,8 @@ void BasicThread::SetName(const string& arg_name)
void BasicThread::SetOSName(const string& name)
{
#ifdef LINUX
pthread_setname_np(pthread_self(), name.c_str());
#ifdef HAVE_LINUX
prctl(PR_SET_NAME, name.c_str(), 0, 0, 0);
#endif
#ifdef __APPLE__

View file

@ -82,7 +82,7 @@ public:
void Stop();
/**
* Returns true if Terminate() has been called.
* Returns true if Stop() has been called.
*
* This method is safe to call from any thread.
*/

View file

@ -7,9 +7,10 @@ Manager::Manager()
{
DBG_LOG(DBG_THREADING, "Creating thread manager ...");
did_process = false;
did_process = true;
next_beat = 0;
terminating = false;
idle = false;
}
Manager::~Manager()
@ -41,6 +42,7 @@ void Manager::Terminate()
all_threads.clear();
msg_threads.clear();
idle = true;
terminating = false;
}
@ -70,18 +72,22 @@ void Manager::GetFds(int* read, int* write, int* except)
double Manager::NextTimestamp(double* network_time)
{
if ( did_process || ! next_beat == 0 )
// If we had something to process last time (or haven't had a
// chance to check yet), we want to check for more asap.
if ( ::network_time && ! next_beat )
next_beat = ::network_time + HEART_BEAT_INTERVAL;
// fprintf(stderr, "N %.6f %.6f did_process=%d next_next=%.6f\n", ::network_time, timer_mgr->Time(), (int)did_process, next_beat);
if ( did_process || ::network_time > next_beat )
// If we had something to process last time (or out heartbeat
// is due), we want to check for more asap.
return timer_mgr->Time();
// Else we assume we don't have much to do at all and wait for the next heart beat.
return next_beat;
return -1.0;
}
void Manager::Process()
{
bool do_beat = (next_beat == 0 || network_time >= next_beat);
bool do_beat = (next_beat && network_time > next_beat);
did_process = false;
@ -90,14 +96,17 @@ void Manager::Process()
MsgThread* t = *i;
if ( do_beat )
{
t->Heartbeat();
next_beat = 0;
}
if ( ! t->HasOut() )
continue;
Message* msg = t->RetrieveOut();
if ( msg->Process() )
if ( msg->Process() && network_time )
did_process = true;
else
@ -110,15 +119,14 @@ void Manager::Process()
delete msg;
}
if ( do_beat )
next_beat = network_time + HEART_BEAT_INTERVAL;
// fprintf(stderr, "P %.6f %.6f do_beat=%d did_process=%d next_next=%.6f\n", network_time, timer_mgr->Time(), do_beat, (int)did_process, next_beat);
}
const threading::Manager::msg_stats_list& threading::Manager::GetMsgThreadStats()
{
stats.clear();
for ( msg_thread_list::iterator i = msg_threads.begin(); i != msg_threads.end(); i++ )
for ( msg_thread_list::iterator i = msg_threads.begin(); i != msg_threads.end(); i++ )
{
MsgThread* t = *i;

View file

@ -142,12 +142,19 @@ void MsgThread::OnStop()
void MsgThread::Heartbeat()
{
SendIn(new HeartbeatMessage(this, network_time, current_time()));
}
string name = Fmt("%s (%d/%d)", name.c_str(),
cnt_sent_in - queue_in.Size(),
cnt_sent_out - queue_out.Size());
bool MsgThread::DoHeartbeat(double network_time, double current_time)
{
string n = Name();
SetOSName(name.c_str());
n = Fmt("bro: %s (%" PRIu64 "/%" PRIu64 ")", n.c_str(),
cnt_sent_in - queue_in.Size(),
cnt_sent_out - queue_out.Size());
SetOSName(n.c_str());
return true;
}
void MsgThread::Info(const char* msg)
@ -197,7 +204,10 @@ void MsgThread::Debug(DebugStream stream, const char* msg)
void MsgThread::SendIn(BasicInputMessage* msg, bool force)
{
if ( Terminating() && ! force )
{
delete msg;
return;
}
DBG_LOG(DBG_THREADING, "Sending '%s' to %s ...", msg->Name().c_str(), Name().c_str());
@ -209,7 +219,10 @@ void MsgThread::SendIn(BasicInputMessage* msg, bool force)
void MsgThread::SendOut(BasicOutputMessage* msg, bool force)
{
if ( Terminating() && ! force )
{
delete msg;
return;
}
queue_out.Put(msg);

View file

@ -184,7 +184,10 @@ protected:
* This is method is called regularly by the threading::Manager.
*
* Can be overriden in derived classed to hook into the heart beat,
* but must call the parent implementation.
* but must call the parent implementation. Note that this method is
* always called by the main thread and must not access data of the
* child thread directly. See DoHeartbeat() if you want to do
* something on the child-side.
*/
virtual void Heartbeat();
@ -206,7 +209,7 @@ protected:
* current_time: Wall clock when the heartbeat was trigger by the
* main thread.
*/
virtual bool DoHeartbeat(double network_time, double current_time) { return true; }
virtual bool DoHeartbeat(double network_time, double current_time);
private:
/**