diff --git a/src/Stats.cc b/src/Stats.cc index 05ce33daed..27b433c9ee 100644 --- a/src/Stats.cc +++ b/src/Stats.cc @@ -224,7 +224,8 @@ void ProfileLogger::Log() i != thread_stats.end(); ++i ) { threading::MsgThread::Stats s = i->second; - file->Write(fmt(" %20s in=%" PRIu64 " out=%" PRIu64 " pending=%" PRIu64 "/%" PRIu64 "\n", + file->Write(fmt("%0.6f %-15s in=%" PRIu64 " out=%" PRIu64 " pending=%" PRIu64 "/%" PRIu64 "\n", + network_time, i->first.c_str(), s.sent_in, s.sent_out, s.pending_in, s.pending_out)); diff --git a/src/threading/MsgThread.cc b/src/threading/MsgThread.cc index 52da7c7400..1bda8943da 100644 --- a/src/threading/MsgThread.cc +++ b/src/threading/MsgThread.cc @@ -206,6 +206,7 @@ void MsgThread::SendOut(BasicOutputMessage* msg, bool force) return; queue_out.Put(msg); + ++cnt_sent_out; } diff --git a/src/threading/Queue.h b/src/threading/Queue.h index add7019f9c..a25f897d23 100644 --- a/src/threading/Queue.h +++ b/src/threading/Queue.h @@ -67,7 +67,6 @@ private: int read_ptr; // Where the next operation will read from int write_ptr; // Where the next operation will write to - uint64_t size; // Current queue size. }; inline static void safe_lock(pthread_mutex_t* mutex) @@ -120,7 +119,6 @@ inline T Queue::Get() T data = messages[read_ptr].front(); messages[read_ptr].pop(); - --size; read_ptr = (read_ptr + 1) % NUM_QUEUES; @@ -139,7 +137,6 @@ inline void Queue::Put(T data) bool need_signal = messages[write_ptr].empty(); messages[write_ptr].push(data); - ++size; if ( need_signal ) pthread_cond_signal(&has_data[write_ptr]); @@ -165,13 +162,19 @@ inline bool Queue::Ready() template inline uint64_t Queue::Size() { - safe_lock(&mutex[read_ptr]); + // Need to lock all queues. + for ( int i = 0; i < NUM_QUEUES; i++ ) + safe_lock(&mutex[i]); - uint64_t s = size; + uint64_t size = 0; - safe_unlock(&mutex[read_ptr]); + for ( int i = 0; i < NUM_QUEUES; i++ ) + size += messages[i].size(); - return s; + for ( int i = 0; i < NUM_QUEUES; i++ ) + safe_unlock(&mutex[i]); + + return size; } }