diff --git a/src/input/readers/raw/Plugin.cc b/src/input/readers/raw/Plugin.cc index c7af84e34e..e16a233fe6 100644 --- a/src/input/readers/raw/Plugin.cc +++ b/src/input/readers/raw/Plugin.cc @@ -8,7 +8,6 @@ using namespace plugin::Bro_RawReader; Plugin::Plugin() { - init = false; } plugin::Configuration Plugin::Configure() @@ -23,21 +22,14 @@ plugin::Configuration Plugin::Configure() void Plugin::InitPreScript() { - if ( pthread_mutex_init(&fork_mutex, 0) != 0 ) - reporter->FatalError("cannot initialize raw reader's mutex"); - - init = true; } void Plugin::Done() { - pthread_mutex_destroy(&fork_mutex); - init = false; } -pthread_mutex_t* Plugin::ForkMutex() +std::unique_lock Plugin::ForkMutex() { - assert(init); - return &fork_mutex; + return std::unique_lock(fork_mutex, std::defer_lock); } diff --git a/src/input/readers/raw/Plugin.h b/src/input/readers/raw/Plugin.h index 59a5dfd2be..7b621c9440 100644 --- a/src/input/readers/raw/Plugin.h +++ b/src/input/readers/raw/Plugin.h @@ -1,8 +1,9 @@ -// See the file in the main distribution directory for copyright. +// See the file in the main distribution directory for copyright. #include "plugin/Plugin.h" #include "Raw.h" +#include namespace plugin { namespace Bro_RawReader { @@ -16,11 +17,10 @@ public: virtual void InitPreScript(); virtual void Done(); - pthread_mutex_t * ForkMutex(); + std::unique_lock ForkMutex(); private: - bool init; - pthread_mutex_t fork_mutex; + std::mutex fork_mutex; }; diff --git a/src/input/readers/raw/Raw.cc b/src/input/readers/raw/Raw.cc index cfa7b72602..c892c207cc 100644 --- a/src/input/readers/raw/Raw.cc +++ b/src/input/readers/raw/Raw.cc @@ -96,24 +96,16 @@ bool Raw::SetFDFlags(int fd, int cmd, int flags) } -bool Raw::LockForkMutex() +std::unique_lock Raw::AcquireForkMutex() { - int res = pthread_mutex_lock(plugin::Bro_RawReader::plugin.ForkMutex()); - if ( res == 0 ) - return true; - - Error(Fmt("cannot lock fork mutex: %d", res)); - return false; + auto lock = plugin::Bro_RawReader::plugin.ForkMutex(); + try { + lock.lock(); + } catch(const std::system_error& e) { + reporter->FatalErrorWithCore("cannot lock fork mutex: %s", e.what()); } -bool Raw::UnlockForkMutex() - { - int res = pthread_mutex_unlock(plugin::Bro_RawReader::plugin.ForkMutex()); - if ( res == 0 ) - return true; - - Error(Fmt("cannot unlock fork mutex: %d", res)); - return false; + return lock; } bool Raw::Execute() @@ -126,12 +118,10 @@ bool Raw::Execute() // never crops up... ("never" meaning I haven't seen in it in // hundreds of tests using 50+ threads where before I'd see the issue // w/ just 2 threads ~33% of the time). - if ( ! LockForkMutex() ) - return false; + auto lock = AcquireForkMutex(); if ( pipe(pipes) != 0 || pipe(pipes+2) || pipe(pipes+4) ) { - UnlockForkMutex(); Error(Fmt("Could not open pipe: %d", errno)); return false; } @@ -139,7 +129,6 @@ bool Raw::Execute() childpid = fork(); if ( childpid < 0 ) { - UnlockForkMutex(); Error(Fmt("Could not create child process: %d", errno)); return false; } @@ -208,8 +197,7 @@ bool Raw::Execute() } } - if ( ! UnlockForkMutex() ) - return false; + lock.unlock(); ClosePipeEnd(stdout_out); diff --git a/src/input/readers/raw/Raw.h b/src/input/readers/raw/Raw.h index 2a166ae322..ec50ade8fe 100644 --- a/src/input/readers/raw/Raw.h +++ b/src/input/readers/raw/Raw.h @@ -4,8 +4,8 @@ #define INPUT_READERS_RAW_H #include -#include #include +#include #include "input/ReaderBackend.h" @@ -37,8 +37,7 @@ protected: private: void ClosePipeEnd(int i); bool SetFDFlags(int fd, int cmd, int flags); - bool LockForkMutex(); - bool UnlockForkMutex(); + std::unique_lock AcquireForkMutex(); bool OpenInput(); bool CloseInput(); @@ -87,7 +86,6 @@ private: }; static const int block_size; - static pthread_mutex_t fork_mutex; }; } diff --git a/src/threading/BasicThread.cc b/src/threading/BasicThread.cc index 86d7d7b560..ea6c8c433b 100644 --- a/src/threading/BasicThread.cc +++ b/src/threading/BasicThread.cc @@ -5,6 +5,7 @@ #include "bro-config.h" #include "BasicThread.h" #include "Manager.h" +#include "pthread.h" #ifdef HAVE_LINUX #include @@ -21,7 +22,6 @@ BasicThread::BasicThread() started = false; terminating = false; killed = false; - pthread = 0; buf_len = STD_FMT_BUF_LEN; buf = (char*) safe_malloc(buf_len); @@ -50,6 +50,7 @@ void BasicThread::SetName(const char* arg_name) void BasicThread::SetOSName(const char* arg_name) { + static_assert(std::is_same::value, "libstdc++ doesn't use pthread_t"); #ifdef HAVE_LINUX prctl(PR_SET_NAME, arg_name, 0, 0, 0); @@ -60,7 +61,7 @@ void BasicThread::SetOSName(const char* arg_name) #endif #ifdef FREEBSD - pthread_set_name_np(pthread_self(), arg_name, arg_name); + pthread_set_name_np(thread.native_handle(), arg_name, arg_name); #endif } @@ -108,9 +109,7 @@ void BasicThread::Start() started = true; - int err = pthread_create(&pthread, 0, BasicThread::launcher, this); - if ( err != 0 ) - reporter->FatalError("Cannot create thread %s: %s", name, Strerror(err)); + thread = std::thread(&BasicThread::launcher, this); DBG_LOG(DBG_THREADING, "Started thread %s", name); @@ -147,17 +146,18 @@ void BasicThread::Join() if ( ! started ) return; - if ( ! pthread ) + if ( ! thread.joinable() ) return; assert(terminating); - if ( pthread_join(pthread, 0) != 0 ) - reporter->FatalError("Failure joining thread %s", name); + try { + thread.join(); + } catch(const std::system_error& e) { + reporter->FatalError("Failure joining thread %s with error %s", name, e.what()); + } DBG_LOG(DBG_THREADING, "Joined with thread %s", name); - - pthread = 0; } void BasicThread::Kill() @@ -180,6 +180,7 @@ void BasicThread::Done() void* BasicThread::launcher(void *arg) { + static_assert(std::is_same::value, "libstdc++ doesn't use pthread_t"); BasicThread* thread = (BasicThread *)arg; // Block signals in thread. We handle signals only in the main diff --git a/src/threading/BasicThread.h b/src/threading/BasicThread.h index 6386e5ae66..ea829fce54 100644 --- a/src/threading/BasicThread.h +++ b/src/threading/BasicThread.h @@ -2,8 +2,7 @@ #ifndef THREADING_BASICTHREAD_H #define THREADING_BASICTHREAD_H -#include -#include +#include #include "util.h" @@ -35,6 +34,9 @@ public: */ BasicThread(); + BasicThread(BasicThread const&) = delete; + BasicThread& operator =(BasicThread const&) = delete; + /** * Returns a descriptive name for the thread. If not set via * SetName(). If not set, a default name is choosen automatically. @@ -192,11 +194,11 @@ protected: void Done(); private: - // pthread entry function. + // thread entry function. static void* launcher(void *arg); const char* name; - pthread_t pthread; + std::thread thread; bool started; // Set to to true once running. bool terminating; // Set to to true to signal termination. bool killed; // Set to true once forcefully killed. diff --git a/src/threading/MsgThread.h b/src/threading/MsgThread.h index 96da68e1d0..480ab2974c 100644 --- a/src/threading/MsgThread.h +++ b/src/threading/MsgThread.h @@ -2,8 +2,6 @@ #ifndef THREADING_MSGTHREAD_H #define THREADING_MSGTHREAD_H -#include - #include "DebugLogger.h" #include "BasicThread.h" diff --git a/src/threading/Queue.h b/src/threading/Queue.h index 6d21bfd998..48b41ad81a 100644 --- a/src/threading/Queue.h +++ b/src/threading/Queue.h @@ -1,7 +1,8 @@ #ifndef THREADING_QUEUE_H #define THREADING_QUEUE_H -#include +#include +#include #include #include #include @@ -22,7 +23,7 @@ namespace threading { * * All Queue instances must be instantiated by Bro's main thread. * - * TODO: Unclear how critical performance is for this qeueue. We could like;y + * TODO: Unclear how critical performance is for this qeueue. We could likely * optimize it further if helpful. */ template @@ -71,9 +72,10 @@ public: */ bool MaybeReady() { return (num_reads != num_writes); } - /** Wake up the reader if it's currently blocked for input. This is - primarily to give it a chance to check termination quickly. - **/ + /** + * Wake up the reader if it's currently blocked for input. This is + * primarily to give it a chance to check termination quickly. + */ void WakeUp(); /** @@ -94,14 +96,15 @@ public: * Returns statistics about the queue's usage. * * @param stats A pointer to a structure that will be filled with - * current numbers. */ + * current numbers. + */ void GetStats(Stats* stats); private: static const int NUM_QUEUES = 8; - pthread_mutex_t mutex[NUM_QUEUES]; // Mutex protected shared accesses. - pthread_cond_t has_data[NUM_QUEUES]; // Signals when data becomes available + std::mutex mutex[NUM_QUEUES]; // Mutex protected shared accesses. + std::condition_variable has_data[NUM_QUEUES]; // Signals when data becomes available std::queue messages[NUM_QUEUES]; // Actually holds the queued messages int read_ptr; // Where the next operation will read from @@ -115,17 +118,17 @@ private: uint64_t num_writes; }; -inline static void safe_lock(pthread_mutex_t* mutex) +inline static std::unique_lock safe_lock(std::mutex& m) { - int res = pthread_mutex_lock(mutex); - if ( res != 0 ) - reporter->FatalErrorWithCore("cannot lock mutex: %d(%s)", res, strerror(res)); + std::unique_lock lock(m, std::defer_lock); + + try { + lock.lock(); + } catch(const std::system_error& e) { + reporter->FatalErrorWithCore("cannot lock mutex: %s", e.what()); } -inline static void safe_unlock(pthread_mutex_t* mutex) - { - if ( pthread_mutex_unlock(mutex) != 0 ) - reporter->FatalErrorWithCore("cannot unlock mutex"); + return lock; } template @@ -136,50 +139,28 @@ inline Queue::Queue(BasicThread* arg_reader, BasicThread* arg_writer) num_reads = num_writes = 0; reader = arg_reader; writer = arg_writer; - - for( int i = 0; i < NUM_QUEUES; ++i ) - { - if ( pthread_cond_init(&has_data[i], 0) != 0 ) - reporter->FatalError("cannot init queue condition variable"); - - if ( pthread_mutex_init(&mutex[i], 0) != 0 ) - reporter->FatalError("cannot init queue mutex"); - } } template inline Queue::~Queue() { - for( int i = 0; i < NUM_QUEUES; ++i ) - { - pthread_cond_destroy(&has_data[i]); - pthread_mutex_destroy(&mutex[i]); - } } template inline T Queue::Get() { - safe_lock(&mutex[read_ptr]); + auto lock = safe_lock(mutex[read_ptr]); int old_read_ptr = read_ptr; if ( messages[read_ptr].empty() && ! ((reader && reader->Killed()) || (writer && writer->Killed())) ) { - struct timespec ts; - ts.tv_sec = time(0) + 5; - ts.tv_nsec = 0; - - pthread_cond_timedwait(&has_data[read_ptr], &mutex[read_ptr], &ts); - safe_unlock(&mutex[read_ptr]); - return 0; + if ( has_data[read_ptr].wait_for(lock, std::chrono::seconds(5)) == std::cv_status::timeout ) + return nullptr; } - else if ( messages[read_ptr].empty() ) - { - safe_unlock(&mutex[read_ptr]); - return 0; - } + if ( messages[read_ptr].empty() ) + return nullptr; T data = messages[read_ptr].front(); messages[read_ptr].pop(); @@ -187,15 +168,13 @@ inline T Queue::Get() read_ptr = (read_ptr + 1) % NUM_QUEUES; ++num_reads; - safe_unlock(&mutex[old_read_ptr]); - return data; } template inline void Queue::Put(T data) { - safe_lock(&mutex[write_ptr]); + auto lock = safe_lock(mutex[write_ptr]); int old_write_ptr = write_ptr; @@ -203,25 +182,24 @@ inline void Queue::Put(T data) messages[write_ptr].push(data); - if ( need_signal ) - pthread_cond_signal(&has_data[write_ptr]); - write_ptr = (write_ptr + 1) % NUM_QUEUES; ++num_writes; - safe_unlock(&mutex[old_write_ptr]); + if ( need_signal ) + { + lock.unlock(); + has_data[old_write_ptr].notify_one(); + } } template inline bool Queue::Ready() { - safe_lock(&mutex[read_ptr]); + auto lock = safe_lock(mutex[read_ptr]); bool ret = (messages[read_ptr].size()); - safe_unlock(&mutex[read_ptr]); - return ret; } @@ -229,17 +207,15 @@ template inline uint64_t Queue::Size() { // Need to lock all queues. + std::vector> locks; for ( int i = 0; i < NUM_QUEUES; i++ ) - safe_lock(&mutex[i]); + locks.push_back(safe_lock(mutex[i])); uint64_t size = 0; for ( int i = 0; i < NUM_QUEUES; i++ ) size += messages[i].size(); - for ( int i = 0; i < NUM_QUEUES; i++ ) - safe_unlock(&mutex[i]); - return size; } @@ -248,29 +224,21 @@ inline void Queue::GetStats(Stats* stats) { // To be safe, we look all queues. That's probably unneccessary, but // doesn't really hurt. + std::vector> locks; for ( int i = 0; i < NUM_QUEUES; i++ ) - safe_lock(&mutex[i]); + locks.push_back(safe_lock(mutex[i])); stats->num_reads = num_reads; stats->num_writes = num_writes; - - for ( int i = 0; i < NUM_QUEUES; i++ ) - safe_unlock(&mutex[i]); } template inline void Queue::WakeUp() { for ( int i = 0; i < NUM_QUEUES; i++ ) - { - safe_lock(&mutex[i]); - pthread_cond_signal(&has_data[i]); - safe_unlock(&mutex[i]); - } + has_data[i].notify_all(); } } - #endif -