Merge remote-tracking branch 'origin/topic/johanna/threads'

Tweaked the new threading code a bit more.

* origin/topic/johanna/threads:
  Move threading to c++11 primitives (mostly).
Robin Sommer 2017-03-03 10:30:12 -08:00
commit 6302b103f4
10 changed files with 111 additions and 127 deletions
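
Before the diff: a minimal, editorial sketch (not part of this commit) of the C++11 idiom that replaces the pthread_mutex_t/pthread_cond_t pairs below. std::unique_lock locks in its constructor and unlocks in its destructor, and std::condition_variable::wait with a predicate absorbs spurious wakeups:

#include <condition_variable>
#include <mutex>

static std::mutex m;
static std::condition_variable cv;
static bool ready = false;

static void consumer()
	{
	std::unique_lock<std::mutex> lock(m); // locked here, unlocked by destructor
	cv.wait(lock, []{ return ready; });   // predicate re-checked on every wakeup
	}

static void producer()
	{
	std::lock_guard<std::mutex> guard(m);
	ready = true;
	cv.notify_one(); // signaled under the lock here; Put() below signals after unlocking
	}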

src/threading/Queue.h

@@ -1,7 +1,8 @@
#ifndef THREADING_QUEUE_H
#define THREADING_QUEUE_H
#include <pthread.h>
#include <mutex>
#include <condition_variable>
#include <queue>
#include <deque>
#include <stdint.h>
@@ -22,7 +23,7 @@ namespace threading {
*
* All Queue instances must be instantiated by Bro's main thread.
*
* TODO: Unclear how critical performance is for this qeueue. We could like;y
* TODO: Unclear how critical performance is for this queue. We could likely
* optimize it further if helpful.
*/
template<typename T>
@@ -71,9 +72,10 @@ public:
*/
bool MaybeReady() { return (num_reads != num_writes); }
/** Wake up the reader if it's currently blocked for input. This is
primarily to give it a chance to check termination quickly.
**/
/**
* Wake up the reader if it's currently blocked for input. This is
* primarily to give it a chance to check termination quickly.
*/
void WakeUp();
/**
@@ -94,14 +96,17 @@ public:
* Returns statistics about the queue's usage.
*
* @param stats A pointer to a structure that will be filled with
* current numbers. */
* current numbers.
*/
void GetStats(Stats* stats);
private:
static const int NUM_QUEUES = 8;
pthread_mutex_t mutex[NUM_QUEUES]; // Mutex protected shared accesses.
pthread_cond_t has_data[NUM_QUEUES]; // Signals when data becomes available
std::vector<std::unique_lock<std::mutex>> LocksForAllQueues();
std::mutex mutex[NUM_QUEUES]; // Mutex protected shared accesses.
std::condition_variable has_data[NUM_QUEUES]; // Signals when data becomes available
std::queue<T> messages[NUM_QUEUES]; // Actually holds the queued messages
int read_ptr; // Where the next operation will read from
@@ -115,17 +120,18 @@ private:
uint64_t num_writes;
};
inline static void safe_lock(pthread_mutex_t* mutex)
inline static std::unique_lock<std::mutex> acquire_lock(std::mutex& m)
{
int res = pthread_mutex_lock(mutex);
if ( res != 0 )
reporter->FatalErrorWithCore("cannot lock mutex: %d(%s)", res, strerror(res));
}
inline static void safe_unlock(pthread_mutex_t* mutex)
{
if ( pthread_mutex_unlock(mutex) != 0 )
reporter->FatalErrorWithCore("cannot unlock mutex");
try
{
return std::move(std::unique_lock<std::mutex>(m));
}
catch ( const std::system_error& e )
{
reporter->FatalErrorWithCore("cannot lock mutex: %s", e.what());
// Never gets here.
throw std::exception();
}
}
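
std::unique_lock is movable but not copyable, which is what lets acquire_lock() hand the held lock back to its caller; the mutex stays locked until the returned object is destroyed. (The std::move around the return value is redundant, since the temporary would be moved anyway, but harmless.) A hypothetical caller:

static void Example(std::mutex& m)
	{
	auto lock = acquire_lock(m); // mutex held from here on
	// ... critical section ...
	}                            // released by lock's destructor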
template<typename T>
@@ -136,50 +142,28 @@ inline Queue<T>::Queue(BasicThread* arg_reader, BasicThread* arg_writer)
num_reads = num_writes = 0;
reader = arg_reader;
writer = arg_writer;
for( int i = 0; i < NUM_QUEUES; ++i )
{
if ( pthread_cond_init(&has_data[i], 0) != 0 )
reporter->FatalError("cannot init queue condition variable");
if ( pthread_mutex_init(&mutex[i], 0) != 0 )
reporter->FatalError("cannot init queue mutex");
}
}
template<typename T>
inline Queue<T>::~Queue()
{
for( int i = 0; i < NUM_QUEUES; ++i )
{
pthread_cond_destroy(&has_data[i]);
pthread_mutex_destroy(&mutex[i]);
}
}
template<typename T>
inline T Queue<T>::Get()
{
safe_lock(&mutex[read_ptr]);
auto lock = acquire_lock(mutex[read_ptr]);
int old_read_ptr = read_ptr;
if ( messages[read_ptr].empty() && ! ((reader && reader->Killed()) || (writer && writer->Killed())) )
{
struct timespec ts;
ts.tv_sec = time(0) + 5;
ts.tv_nsec = 0;
pthread_cond_timedwait(&has_data[read_ptr], &mutex[read_ptr], &ts);
safe_unlock(&mutex[read_ptr]);
return 0;
if ( has_data[read_ptr].wait_for(lock, std::chrono::seconds(5)) == std::cv_status::timeout )
return nullptr;
}
else if ( messages[read_ptr].empty() )
{
safe_unlock(&mutex[read_ptr]);
return 0;
}
if ( messages[read_ptr].empty() )
return nullptr;
T data = messages[read_ptr].front();
messages[read_ptr].pop();
@@ -187,15 +171,13 @@ inline T Queue<T>::Get()
read_ptr = (read_ptr + 1) % NUM_QUEUES;
++num_reads;
safe_unlock(&mutex[old_read_ptr]);
return data;
}
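
The second empty-check after the timed wait guards against spurious wakeups, which wait_for permits. The predicate overload of wait_for folds that recheck into the wait itself; an illustrative alternative for the same step, not what the commit does:

// The lambda is re-evaluated on every wakeup, so spurious wakeups fall
// back into the wait until data arrives or the five seconds elapse.
bool got_data = has_data[read_ptr].wait_for(lock, std::chrono::seconds(5),
	[&]{ return ! messages[read_ptr].empty(); });
if ( ! got_data )
	return nullptr; // timed out with the queue still empty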
template<typename T>
inline void Queue<T>::Put(T data)
{
safe_lock(&mutex[write_ptr]);
auto lock = acquire_lock(mutex[write_ptr]);
int old_write_ptr = write_ptr;
@@ -203,43 +185,59 @@ inline void Queue<T>::Put(T data)
messages[write_ptr].push(data);
if ( need_signal )
pthread_cond_signal(&has_data[write_ptr]);
write_ptr = (write_ptr + 1) % NUM_QUEUES;
++num_writes;
safe_unlock(&mutex[old_write_ptr]);
if ( need_signal )
{
lock.unlock();
has_data[old_write_ptr].notify_one();
}
}
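
The reordering in Put() is deliberate: the lock is dropped before notify_one(), so the woken reader does not wake up only to block on a mutex the writer still holds. Signaling while holding the lock is equally correct, just potentially slower. A self-contained sketch of the unlock-then-notify pattern, with hypothetical names:

static std::mutex m;
static std::condition_variable cv;
static bool have_work = false;

static void publish()
	{
		{
		std::lock_guard<std::mutex> guard(m);
		have_work = true;
		}            // release the mutex first...
	cv.notify_one(); // ...then wake the consumer
	}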
template<typename T>
inline bool Queue<T>::Ready()
{
safe_lock(&mutex[read_ptr]);
auto lock = acquire_lock(mutex[read_ptr]);
bool ret = (messages[read_ptr].size());
safe_unlock(&mutex[read_ptr]);
return ret;
}
template<typename T>
inline std::vector<std::unique_lock<std::mutex>> Queue<T>::LocksForAllQueues()
{
std::vector<std::unique_lock<std::mutex>> locks;
try
{
for ( int i = 0; i < NUM_QUEUES; i++ )
locks.emplace_back(std::unique_lock<std::mutex>(mutex[i]));
}
catch ( const std::system_error& e )
{
reporter->FatalErrorWithCore("cannot lock all mutexes: %s", e.what());
// Never gets here.
throw std::exception();
}
return std::move(locks);
}
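
LocksForAllQueues() avoids deadlock by acquiring the mutexes in ascending index order on every path, and returning the vector keeps all NUM_QUEUES locks held until the caller's scope ends. (As in acquire_lock(), the std::move on the return is redundant and can in fact inhibit copy elision.) For a fixed, small set of mutexes, C++11's std::lock offers the same deadlock-freedom without an ordering convention; a two-mutex sketch for comparison, not what the code does:

std::unique_lock<std::mutex> a(mutex[0], std::defer_lock); // construct without locking
std::unique_lock<std::mutex> b(mutex[1], std::defer_lock);
std::lock(a, b); // locks both, deadlock-free in any order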
template<typename T>
inline uint64_t Queue<T>::Size()
{
// Need to lock all queues.
for ( int i = 0; i < NUM_QUEUES; i++ )
safe_lock(&mutex[i]);
auto locks = LocksForAllQueues();
uint64_t size = 0;
for ( int i = 0; i < NUM_QUEUES; i++ )
size += messages[i].size();
for ( int i = 0; i < NUM_QUEUES; i++ )
safe_unlock(&mutex[i]);
return size;
}
@@ -248,29 +246,19 @@ inline void Queue<T>::GetStats(Stats* stats)
{
// To be safe, we lock all queues. That's probably unnecessary, but
// doesn't really hurt.
for ( int i = 0; i < NUM_QUEUES; i++ )
safe_lock(&mutex[i]);
auto locks = LocksForAllQueues();
stats->num_reads = num_reads;
stats->num_writes = num_writes;
for ( int i = 0; i < NUM_QUEUES; i++ )
safe_unlock(&mutex[i]);
}
template<typename T>
inline void Queue<T>::WakeUp()
{
for ( int i = 0; i < NUM_QUEUES; i++ )
{
safe_lock(&mutex[i]);
pthread_cond_signal(&has_data[i]);
safe_unlock(&mutex[i]);
}
has_data[i].notify_all();
}
}
#endif