Move threading to c++11 primitives (mostly).

This moves all threading code in Bro from pthreads to the c++11
primitives, which make for shorter, easier to use, and less error-prone
code.

pthreads is still used in 2 places in Bro currently. BasicThread uses
two bits of functionality that are not available using the c++ API
(setting thread names & setting signal masks). Since all c++
implementations that I am aware of still use an underlying pthreads
implementation, we just use native_handle to access the underlying
pthreads implementation for these cases. I do not expect this to lead to
problems in the forseable future. If we ever encounter a platform where
a different thread architecture is used, we might have to change that
around.

This code is guarded by static_asserts, so we will notice if a platform
uses a different implementation.

sqlite also uses pthreads directly.
This commit is contained in:
Johanna Amann 2017-03-02 08:53:38 -08:00
parent 1c973f4adf
commit 9341ff801c
8 changed files with 70 additions and 123 deletions

View file

@ -8,7 +8,6 @@ using namespace plugin::Bro_RawReader;
Plugin::Plugin() Plugin::Plugin()
{ {
init = false;
} }
plugin::Configuration Plugin::Configure() plugin::Configuration Plugin::Configure()
@ -23,21 +22,14 @@ plugin::Configuration Plugin::Configure()
void Plugin::InitPreScript() void Plugin::InitPreScript()
{ {
if ( pthread_mutex_init(&fork_mutex, 0) != 0 )
reporter->FatalError("cannot initialize raw reader's mutex");
init = true;
} }
void Plugin::Done() void Plugin::Done()
{ {
pthread_mutex_destroy(&fork_mutex);
init = false;
} }
pthread_mutex_t* Plugin::ForkMutex() std::unique_lock<std::mutex> Plugin::ForkMutex()
{ {
assert(init); return std::unique_lock<std::mutex>(fork_mutex, std::defer_lock);
return &fork_mutex;
} }

View file

@ -1,8 +1,9 @@
// See the file in the main distribution directory for copyright. // See the file in the main distribution directory for copyright.
#include "plugin/Plugin.h" #include "plugin/Plugin.h"
#include "Raw.h" #include "Raw.h"
#include <mutex>
namespace plugin { namespace plugin {
namespace Bro_RawReader { namespace Bro_RawReader {
@ -16,11 +17,10 @@ public:
virtual void InitPreScript(); virtual void InitPreScript();
virtual void Done(); virtual void Done();
pthread_mutex_t * ForkMutex(); std::unique_lock<std::mutex> ForkMutex();
private: private:
bool init; std::mutex fork_mutex;
pthread_mutex_t fork_mutex;
}; };

View file

@ -96,24 +96,16 @@ bool Raw::SetFDFlags(int fd, int cmd, int flags)
} }
bool Raw::LockForkMutex() std::unique_lock<std::mutex> Raw::AcquireForkMutex()
{ {
int res = pthread_mutex_lock(plugin::Bro_RawReader::plugin.ForkMutex()); auto lock = plugin::Bro_RawReader::plugin.ForkMutex();
if ( res == 0 ) try {
return true; lock.lock();
} catch(const std::system_error& e) {
Error(Fmt("cannot lock fork mutex: %d", res)); reporter->FatalErrorWithCore("cannot lock fork mutex: %s", e.what());
return false;
} }
bool Raw::UnlockForkMutex() return lock;
{
int res = pthread_mutex_unlock(plugin::Bro_RawReader::plugin.ForkMutex());
if ( res == 0 )
return true;
Error(Fmt("cannot unlock fork mutex: %d", res));
return false;
} }
bool Raw::Execute() bool Raw::Execute()
@ -126,12 +118,10 @@ bool Raw::Execute()
// never crops up... ("never" meaning I haven't seen in it in // never crops up... ("never" meaning I haven't seen in it in
// hundreds of tests using 50+ threads where before I'd see the issue // hundreds of tests using 50+ threads where before I'd see the issue
// w/ just 2 threads ~33% of the time). // w/ just 2 threads ~33% of the time).
if ( ! LockForkMutex() ) auto lock = AcquireForkMutex();
return false;
if ( pipe(pipes) != 0 || pipe(pipes+2) || pipe(pipes+4) ) if ( pipe(pipes) != 0 || pipe(pipes+2) || pipe(pipes+4) )
{ {
UnlockForkMutex();
Error(Fmt("Could not open pipe: %d", errno)); Error(Fmt("Could not open pipe: %d", errno));
return false; return false;
} }
@ -139,7 +129,6 @@ bool Raw::Execute()
childpid = fork(); childpid = fork();
if ( childpid < 0 ) if ( childpid < 0 )
{ {
UnlockForkMutex();
Error(Fmt("Could not create child process: %d", errno)); Error(Fmt("Could not create child process: %d", errno));
return false; return false;
} }
@ -208,8 +197,7 @@ bool Raw::Execute()
} }
} }
if ( ! UnlockForkMutex() ) lock.unlock();
return false;
ClosePipeEnd(stdout_out); ClosePipeEnd(stdout_out);

View file

@ -4,8 +4,8 @@
#define INPUT_READERS_RAW_H #define INPUT_READERS_RAW_H
#include <vector> #include <vector>
#include <pthread.h>
#include <memory> #include <memory>
#include <mutex>
#include "input/ReaderBackend.h" #include "input/ReaderBackend.h"
@ -37,8 +37,7 @@ protected:
private: private:
void ClosePipeEnd(int i); void ClosePipeEnd(int i);
bool SetFDFlags(int fd, int cmd, int flags); bool SetFDFlags(int fd, int cmd, int flags);
bool LockForkMutex(); std::unique_lock<std::mutex> AcquireForkMutex();
bool UnlockForkMutex();
bool OpenInput(); bool OpenInput();
bool CloseInput(); bool CloseInput();
@ -87,7 +86,6 @@ private:
}; };
static const int block_size; static const int block_size;
static pthread_mutex_t fork_mutex;
}; };
} }

View file

@ -5,6 +5,7 @@
#include "bro-config.h" #include "bro-config.h"
#include "BasicThread.h" #include "BasicThread.h"
#include "Manager.h" #include "Manager.h"
#include "pthread.h"
#ifdef HAVE_LINUX #ifdef HAVE_LINUX
#include <sys/prctl.h> #include <sys/prctl.h>
@ -21,7 +22,6 @@ BasicThread::BasicThread()
started = false; started = false;
terminating = false; terminating = false;
killed = false; killed = false;
pthread = 0;
buf_len = STD_FMT_BUF_LEN; buf_len = STD_FMT_BUF_LEN;
buf = (char*) safe_malloc(buf_len); buf = (char*) safe_malloc(buf_len);
@ -50,6 +50,7 @@ void BasicThread::SetName(const char* arg_name)
void BasicThread::SetOSName(const char* arg_name) void BasicThread::SetOSName(const char* arg_name)
{ {
static_assert(std::is_same<std::thread::native_handle_type, pthread_t>::value, "libstdc++ doesn't use pthread_t");
#ifdef HAVE_LINUX #ifdef HAVE_LINUX
prctl(PR_SET_NAME, arg_name, 0, 0, 0); prctl(PR_SET_NAME, arg_name, 0, 0, 0);
@ -60,7 +61,7 @@ void BasicThread::SetOSName(const char* arg_name)
#endif #endif
#ifdef FREEBSD #ifdef FREEBSD
pthread_set_name_np(pthread_self(), arg_name, arg_name); pthread_set_name_np(thread.native_handle(), arg_name, arg_name);
#endif #endif
} }
@ -108,9 +109,7 @@ void BasicThread::Start()
started = true; started = true;
int err = pthread_create(&pthread, 0, BasicThread::launcher, this); thread = std::thread(&BasicThread::launcher, this);
if ( err != 0 )
reporter->FatalError("Cannot create thread %s: %s", name, Strerror(err));
DBG_LOG(DBG_THREADING, "Started thread %s", name); DBG_LOG(DBG_THREADING, "Started thread %s", name);
@ -147,17 +146,18 @@ void BasicThread::Join()
if ( ! started ) if ( ! started )
return; return;
if ( ! pthread ) if ( ! thread.joinable() )
return; return;
assert(terminating); assert(terminating);
if ( pthread_join(pthread, 0) != 0 ) try {
reporter->FatalError("Failure joining thread %s", name); thread.join();
} catch(const std::system_error& e) {
reporter->FatalError("Failure joining thread %s with error %s", name, e.what());
}
DBG_LOG(DBG_THREADING, "Joined with thread %s", name); DBG_LOG(DBG_THREADING, "Joined with thread %s", name);
pthread = 0;
} }
void BasicThread::Kill() void BasicThread::Kill()
@ -180,6 +180,7 @@ void BasicThread::Done()
void* BasicThread::launcher(void *arg) void* BasicThread::launcher(void *arg)
{ {
static_assert(std::is_same<std::thread::native_handle_type, pthread_t>::value, "libstdc++ doesn't use pthread_t");
BasicThread* thread = (BasicThread *)arg; BasicThread* thread = (BasicThread *)arg;
// Block signals in thread. We handle signals only in the main // Block signals in thread. We handle signals only in the main

View file

@ -2,8 +2,7 @@
#ifndef THREADING_BASICTHREAD_H #ifndef THREADING_BASICTHREAD_H
#define THREADING_BASICTHREAD_H #define THREADING_BASICTHREAD_H
#include <pthread.h> #include <thread>
#include <semaphore.h>
#include "util.h" #include "util.h"
@ -35,6 +34,9 @@ public:
*/ */
BasicThread(); BasicThread();
BasicThread(BasicThread const&) = delete;
BasicThread& operator =(BasicThread const&) = delete;
/** /**
* Returns a descriptive name for the thread. If not set via * Returns a descriptive name for the thread. If not set via
* SetName(). If not set, a default name is choosen automatically. * SetName(). If not set, a default name is choosen automatically.
@ -192,11 +194,11 @@ protected:
void Done(); void Done();
private: private:
// pthread entry function. // thread entry function.
static void* launcher(void *arg); static void* launcher(void *arg);
const char* name; const char* name;
pthread_t pthread; std::thread thread;
bool started; // Set to to true once running. bool started; // Set to to true once running.
bool terminating; // Set to to true to signal termination. bool terminating; // Set to to true to signal termination.
bool killed; // Set to true once forcefully killed. bool killed; // Set to true once forcefully killed.

View file

@ -2,8 +2,6 @@
#ifndef THREADING_MSGTHREAD_H #ifndef THREADING_MSGTHREAD_H
#define THREADING_MSGTHREAD_H #define THREADING_MSGTHREAD_H
#include <pthread.h>
#include "DebugLogger.h" #include "DebugLogger.h"
#include "BasicThread.h" #include "BasicThread.h"

View file

@ -1,7 +1,8 @@
#ifndef THREADING_QUEUE_H #ifndef THREADING_QUEUE_H
#define THREADING_QUEUE_H #define THREADING_QUEUE_H
#include <pthread.h> #include <mutex>
#include <condition_variable>
#include <queue> #include <queue>
#include <deque> #include <deque>
#include <stdint.h> #include <stdint.h>
@ -22,7 +23,7 @@ namespace threading {
* *
* All Queue instances must be instantiated by Bro's main thread. * All Queue instances must be instantiated by Bro's main thread.
* *
* TODO: Unclear how critical performance is for this qeueue. We could like;y * TODO: Unclear how critical performance is for this qeueue. We could likely
* optimize it further if helpful. * optimize it further if helpful.
*/ */
template<typename T> template<typename T>
@ -71,9 +72,10 @@ public:
*/ */
bool MaybeReady() { return (num_reads != num_writes); } bool MaybeReady() { return (num_reads != num_writes); }
/** Wake up the reader if it's currently blocked for input. This is /**
primarily to give it a chance to check termination quickly. * Wake up the reader if it's currently blocked for input. This is
**/ * primarily to give it a chance to check termination quickly.
*/
void WakeUp(); void WakeUp();
/** /**
@ -94,14 +96,15 @@ public:
* Returns statistics about the queue's usage. * Returns statistics about the queue's usage.
* *
* @param stats A pointer to a structure that will be filled with * @param stats A pointer to a structure that will be filled with
* current numbers. */ * current numbers.
*/
void GetStats(Stats* stats); void GetStats(Stats* stats);
private: private:
static const int NUM_QUEUES = 8; static const int NUM_QUEUES = 8;
pthread_mutex_t mutex[NUM_QUEUES]; // Mutex protected shared accesses. std::mutex mutex[NUM_QUEUES]; // Mutex protected shared accesses.
pthread_cond_t has_data[NUM_QUEUES]; // Signals when data becomes available std::condition_variable has_data[NUM_QUEUES]; // Signals when data becomes available
std::queue<T> messages[NUM_QUEUES]; // Actually holds the queued messages std::queue<T> messages[NUM_QUEUES]; // Actually holds the queued messages
int read_ptr; // Where the next operation will read from int read_ptr; // Where the next operation will read from
@ -115,17 +118,17 @@ private:
uint64_t num_writes; uint64_t num_writes;
}; };
inline static void safe_lock(pthread_mutex_t* mutex) inline static std::unique_lock<std::mutex> safe_lock(std::mutex& m)
{ {
int res = pthread_mutex_lock(mutex); std::unique_lock<std::mutex> lock(m, std::defer_lock);
if ( res != 0 )
reporter->FatalErrorWithCore("cannot lock mutex: %d(%s)", res, strerror(res)); try {
lock.lock();
} catch(const std::system_error& e) {
reporter->FatalErrorWithCore("cannot lock mutex: %s", e.what());
} }
inline static void safe_unlock(pthread_mutex_t* mutex) return lock;
{
if ( pthread_mutex_unlock(mutex) != 0 )
reporter->FatalErrorWithCore("cannot unlock mutex");
} }
template<typename T> template<typename T>
@ -136,50 +139,28 @@ inline Queue<T>::Queue(BasicThread* arg_reader, BasicThread* arg_writer)
num_reads = num_writes = 0; num_reads = num_writes = 0;
reader = arg_reader; reader = arg_reader;
writer = arg_writer; writer = arg_writer;
for( int i = 0; i < NUM_QUEUES; ++i )
{
if ( pthread_cond_init(&has_data[i], 0) != 0 )
reporter->FatalError("cannot init queue condition variable");
if ( pthread_mutex_init(&mutex[i], 0) != 0 )
reporter->FatalError("cannot init queue mutex");
}
} }
template<typename T> template<typename T>
inline Queue<T>::~Queue() inline Queue<T>::~Queue()
{ {
for( int i = 0; i < NUM_QUEUES; ++i )
{
pthread_cond_destroy(&has_data[i]);
pthread_mutex_destroy(&mutex[i]);
}
} }
template<typename T> template<typename T>
inline T Queue<T>::Get() inline T Queue<T>::Get()
{ {
safe_lock(&mutex[read_ptr]); auto lock = safe_lock(mutex[read_ptr]);
int old_read_ptr = read_ptr; int old_read_ptr = read_ptr;
if ( messages[read_ptr].empty() && ! ((reader && reader->Killed()) || (writer && writer->Killed())) ) if ( messages[read_ptr].empty() && ! ((reader && reader->Killed()) || (writer && writer->Killed())) )
{ {
struct timespec ts; if ( has_data[read_ptr].wait_for(lock, std::chrono::seconds(5)) == std::cv_status::timeout )
ts.tv_sec = time(0) + 5; return nullptr;
ts.tv_nsec = 0;
pthread_cond_timedwait(&has_data[read_ptr], &mutex[read_ptr], &ts);
safe_unlock(&mutex[read_ptr]);
return 0;
} }
else if ( messages[read_ptr].empty() ) if ( messages[read_ptr].empty() )
{ return nullptr;
safe_unlock(&mutex[read_ptr]);
return 0;
}
T data = messages[read_ptr].front(); T data = messages[read_ptr].front();
messages[read_ptr].pop(); messages[read_ptr].pop();
@ -187,15 +168,13 @@ inline T Queue<T>::Get()
read_ptr = (read_ptr + 1) % NUM_QUEUES; read_ptr = (read_ptr + 1) % NUM_QUEUES;
++num_reads; ++num_reads;
safe_unlock(&mutex[old_read_ptr]);
return data; return data;
} }
template<typename T> template<typename T>
inline void Queue<T>::Put(T data) inline void Queue<T>::Put(T data)
{ {
safe_lock(&mutex[write_ptr]); auto lock = safe_lock(mutex[write_ptr]);
int old_write_ptr = write_ptr; int old_write_ptr = write_ptr;
@ -203,25 +182,24 @@ inline void Queue<T>::Put(T data)
messages[write_ptr].push(data); messages[write_ptr].push(data);
if ( need_signal )
pthread_cond_signal(&has_data[write_ptr]);
write_ptr = (write_ptr + 1) % NUM_QUEUES; write_ptr = (write_ptr + 1) % NUM_QUEUES;
++num_writes; ++num_writes;
safe_unlock(&mutex[old_write_ptr]); if ( need_signal )
{
lock.unlock();
has_data[old_write_ptr].notify_one();
}
} }
template<typename T> template<typename T>
inline bool Queue<T>::Ready() inline bool Queue<T>::Ready()
{ {
safe_lock(&mutex[read_ptr]); auto lock = safe_lock(mutex[read_ptr]);
bool ret = (messages[read_ptr].size()); bool ret = (messages[read_ptr].size());
safe_unlock(&mutex[read_ptr]);
return ret; return ret;
} }
@ -229,17 +207,15 @@ template<typename T>
inline uint64_t Queue<T>::Size() inline uint64_t Queue<T>::Size()
{ {
// Need to lock all queues. // Need to lock all queues.
std::vector<std::unique_lock<std::mutex>> locks;
for ( int i = 0; i < NUM_QUEUES; i++ ) for ( int i = 0; i < NUM_QUEUES; i++ )
safe_lock(&mutex[i]); locks.push_back(safe_lock(mutex[i]));
uint64_t size = 0; uint64_t size = 0;
for ( int i = 0; i < NUM_QUEUES; i++ ) for ( int i = 0; i < NUM_QUEUES; i++ )
size += messages[i].size(); size += messages[i].size();
for ( int i = 0; i < NUM_QUEUES; i++ )
safe_unlock(&mutex[i]);
return size; return size;
} }
@ -248,29 +224,21 @@ inline void Queue<T>::GetStats(Stats* stats)
{ {
// To be safe, we look all queues. That's probably unneccessary, but // To be safe, we look all queues. That's probably unneccessary, but
// doesn't really hurt. // doesn't really hurt.
std::vector<std::unique_lock<std::mutex>> locks;
for ( int i = 0; i < NUM_QUEUES; i++ ) for ( int i = 0; i < NUM_QUEUES; i++ )
safe_lock(&mutex[i]); locks.push_back(safe_lock(mutex[i]));
stats->num_reads = num_reads; stats->num_reads = num_reads;
stats->num_writes = num_writes; stats->num_writes = num_writes;
for ( int i = 0; i < NUM_QUEUES; i++ )
safe_unlock(&mutex[i]);
} }
template<typename T> template<typename T>
inline void Queue<T>::WakeUp() inline void Queue<T>::WakeUp()
{ {
for ( int i = 0; i < NUM_QUEUES; i++ ) for ( int i = 0; i < NUM_QUEUES; i++ )
{ has_data[i].notify_all();
safe_lock(&mutex[i]);
pthread_cond_signal(&has_data[i]);
safe_unlock(&mutex[i]);
}
} }
} }
#endif #endif