Fixing prof.log output.

The queue Size() method was not yet atomic.
This commit is contained in:
Robin Sommer 2012-02-03 03:27:25 -08:00
parent 4879cb7b0d
commit cf6a346b86
3 changed files with 13 additions and 8 deletions

View file

@ -224,7 +224,8 @@ void ProfileLogger::Log()
i != thread_stats.end(); ++i ) i != thread_stats.end(); ++i )
{ {
threading::MsgThread::Stats s = i->second; threading::MsgThread::Stats s = i->second;
file->Write(fmt(" %20s in=%" PRIu64 " out=%" PRIu64 " pending=%" PRIu64 "/%" PRIu64 "\n", file->Write(fmt("%0.6f %-15s in=%" PRIu64 " out=%" PRIu64 " pending=%" PRIu64 "/%" PRIu64 "\n",
network_time,
i->first.c_str(), i->first.c_str(),
s.sent_in, s.sent_out, s.sent_in, s.sent_out,
s.pending_in, s.pending_out)); s.pending_in, s.pending_out));

View file

@ -206,6 +206,7 @@ void MsgThread::SendOut(BasicOutputMessage* msg, bool force)
return; return;
queue_out.Put(msg); queue_out.Put(msg);
++cnt_sent_out; ++cnt_sent_out;
} }

View file

@ -67,7 +67,6 @@ private:
int read_ptr; // Where the next operation will read from int read_ptr; // Where the next operation will read from
int write_ptr; // Where the next operation will write to int write_ptr; // Where the next operation will write to
uint64_t size; // Current queue size.
}; };
inline static void safe_lock(pthread_mutex_t* mutex) inline static void safe_lock(pthread_mutex_t* mutex)
@ -120,7 +119,6 @@ inline T Queue<T>::Get()
T data = messages[read_ptr].front(); T data = messages[read_ptr].front();
messages[read_ptr].pop(); messages[read_ptr].pop();
--size;
read_ptr = (read_ptr + 1) % NUM_QUEUES; read_ptr = (read_ptr + 1) % NUM_QUEUES;
@ -139,7 +137,6 @@ inline void Queue<T>::Put(T data)
bool need_signal = messages[write_ptr].empty(); bool need_signal = messages[write_ptr].empty();
messages[write_ptr].push(data); messages[write_ptr].push(data);
++size;
if ( need_signal ) if ( need_signal )
pthread_cond_signal(&has_data[write_ptr]); pthread_cond_signal(&has_data[write_ptr]);
@ -165,13 +162,19 @@ inline bool Queue<T>::Ready()
template<typename T> template<typename T>
inline uint64_t Queue<T>::Size() inline uint64_t Queue<T>::Size()
{ {
safe_lock(&mutex[read_ptr]); // Need to lock all queues.
for ( int i = 0; i < NUM_QUEUES; i++ )
safe_lock(&mutex[i]);
uint64_t s = size; uint64_t size = 0;
safe_unlock(&mutex[read_ptr]); for ( int i = 0; i < NUM_QUEUES; i++ )
size += messages[i].size();
return s; for ( int i = 0; i < NUM_QUEUES; i++ )
safe_unlock(&mutex[i]);
return size;
} }
} }