Documenting the threading/* classes.

Also switching from semaphores to mutexes as the former don't seem to
be fully supported on MacOS.
This commit is contained in:
Robin Sommer 2012-01-31 23:47:33 -08:00
parent 2df3738d1a
commit a428645b2a
6 changed files with 499 additions and 106 deletions

View file

@ -7,6 +7,8 @@
using namespace threading;
uint64_t BasicThread::thread_counter = 0;
BasicThread::BasicThread(const string& arg_name)
{
started = false;
@ -16,9 +18,7 @@ BasicThread::BasicThread(const string& arg_name)
buf = 0;
buf_len = 1024;
char tmp[128];
snprintf(tmp, sizeof(tmp), "%s@%p", arg_name.c_str(), this);
name = string(tmp);
name = Fmt("%s@%d", arg_name.c_str(), ++thread_counter);
thread_mgr->AddThread(this);
}
@ -53,8 +53,15 @@ const char* BasicThread::Fmt(const char* format, ...)
void BasicThread::Start()
{
if ( sem_init(&terminate, 0, 0) != 0 )
reporter->FatalError("Cannot create terminate semaphore for thread %s", name.c_str());
if ( started )
return;
if ( pthread_mutex_init(&terminate, 0) != 0 )
reporter->FatalError("Cannot create terminate mutex for thread %s", name.c_str());
// We use this like a binary semaphore and acquire it immediately.
if ( pthread_mutex_lock(&terminate) != 0 )
reporter->FatalError("Cannot aquire terminate mutex for thread %s", name.c_str());
if ( pthread_create(&pthread, 0, BasicThread::launcher, this) != 0 )
reporter->FatalError("Cannot create thread %s", name.c_str());
@ -76,8 +83,9 @@ void BasicThread::Stop()
DBG_LOG(DBG_THREADING, "Signaling thread %s to terminate ...", name.c_str());
// Signal that it's ok for the thread to exit now.
if ( sem_post(&terminate) != 0 )
// Signal that it's ok for the thread to exit now by unlocking the
// mutex.
if ( pthread_mutex_unlock(&terminate) != 0 )
reporter->FatalError("Failure flagging terminate condition for thread %s", name.c_str());
terminating = true;
@ -98,7 +106,7 @@ void BasicThread::Join()
if ( pthread_join(pthread, 0) != 0 )
reporter->FatalError("Failure joining thread %s", name.c_str());
sem_destroy(&terminate);
pthread_mutex_destroy(&terminate);
DBG_LOG(DBG_THREADING, "Done with thread %s", name.c_str());
@ -120,9 +128,8 @@ void* BasicThread::launcher(void *arg)
thread->Run();
// Wait until somebody actually wants us to terminate.
if ( sem_wait(&thread->terminate) != 0 )
reporter->FatalError("Failure flagging terminate condition for thread %s", thread->Name().c_str());
if ( pthread_mutex_lock(&thread->terminate) != 0 )
reporter->FatalError("Failure acquiring terminate mutex at end of thread %s", thread->Name().c_str());
return 0;
}

View file

@ -14,50 +14,133 @@ namespace threading {
class Manager;
/**
* Base class for all threads.
*
* This class encapsulates all the OS-level thread handling. All thread
* instances are automatically added to the threading::Manager for management. The
* manager also takes care of deleting them (which must not be done
* manually).
*/
class BasicThread
{
public:
/**
* Creates a new thread object. Instantiating the object does however
* not yet start the actual OS thread, that requires calling Start().
*
* Only Bro's main thread may create new thread instances.
*
* @param name A descriptive name for thread the thread. This may
* show up in messages to the user.
*/
BasicThread(const string& name); // Managed by manager, must not delete otherwise.
virtual ~BasicThread();
/**
* Returns a descriptive name for the thread. This is the name passed
* into the constructor.
*
* This method is safe to call from any thread.
*/
const string& Name() const { return name; }
void Start(); // Spawns the thread and enters Run().
void Stop(); // Signals the thread to terminate.
/**
* Starts the thread. Calling this methods will spawn a new OS thread
* executing Run(). Note that one can't restart a thread after a
* Stop(), doing so will be ignored.
*
* Only Bro's main thread must call this method.
*/
void Start();
/**
* Signals the thread to stop. The method lets Terminating() now
* return true. It does however not force the thread to terminate.
* It's up to the Run() method to to query Terminating() and exit
* eventually.
*
* Calling this method has no effect if Start() hasn't been executed
* yet.
*
* Only Bro's main thread must call this method.
*/
void Stop();
/**
* Returns true if Terminate() has been called.
*
* This method is safe to call from any thread.
*/
bool Terminating() const { return terminating; }
// A thread-safe version of fmt().
/**
* A version of fmt() that the thread can safely use.
*
* This is safe to call from Run() but must not be used from any
* other thread than the current one.
*/
const char* Fmt(const char* format, ...);
protected:
virtual void Run() = 0;
virtual void OnStart() {}
virtual void OnStop() {}
private:
friend class Manager;
/**
* Entry point for the thread. This must be overridden by derived
* classes and will execute in a separate thread once Start() is
* called. The thread will not terminate before this method finishes.
* An implementation should regularly check Terminating() to see if
* exiting has been requested.
*/
virtual void Run() = 0;
/**
* Executed with Start(). This is a hook into starting the thread. It
* will be called from Bro's main thread after the OS thread has been
* started.
*/
virtual void OnStart() {}
/**
* Executed with Stop(). This is a hook into stopping the thread. It
* will be called from Bro's main thread after the thread has been
* signaled to stop.
*/
virtual void OnStop() {}
/**
* Destructor. This will be called by the manager.
*
* Only Bro's main thread may delete thread instances.
*
*/
virtual ~BasicThread();
/**
* Waits until the thread's Run() method has finished and then joins
* it. This is called from the threading::Manager.
*/
void Join();
private:
// pthread entry function.
static void* launcher(void *arg);
// Used from the ThreadMgr.
void Join(); // Waits until the thread has terminated and then joins it.
string name;
pthread_t pthread;
bool started; // Set to to true once running.
bool terminating; // Set to to true to signal termination.
string name;
pthread_t pthread;
sem_t terminate;
// Used as a semaphore to tell the pthread thread when it may
// terminate.
pthread_mutex_t terminate;
// For implementing Fmt().
char* buf;
unsigned int buf_len;
static uint64_t thread_counter;
};
}
extern threading::Manager* thread_mgr;
#endif

View file

@ -11,25 +11,78 @@
namespace threading {
/**
* The thread manager coordinates all child threads. Once a BasicThread is
* instantitated, it gets addedd to the manager, which will delete it later
* once it has terminated.
*
* In addition to basic threads, the manager also provides additional
* functionality specific to MsgThread instances. In particular, it polls
* their outgoing message queue on a regular basis and feeds data sent into
* the rest of Bro. It also triggers the regular heartbeats.
*/
class Manager : public IOSource
{
public:
/**
* Constructor. Only a single instance of the manager must be
* created.
*/
Manager();
/**
* Destructir.
*/
~Manager();
/**
* Terminates the manager's processor. The method signals all threads
* to terminates and wait for them to do so. It then joins them and
* returns to the caller. Afterwards, no more thread instances may be
* created.
*/
void Terminate();
protected:
friend class BasicThread;
friend class MsgThread;
/**
* Registers a new basic thread with the manager. This is
* automatically called by the thread's constructor.
*
* @param thread The thread.
*/
void AddThread(BasicThread* thread);
/**
* Registers a new message thread with the manager. This is
* automatically called by the thread's constructor. This must be
* called \a in \a addition to AddThread(BasicThread* thread). The
* MsgThread constructor makes sure to do so.
*
* @param thread The thread.
*/
void AddMsgThread(MsgThread* thread);
// IOSource interface.
/**
* Part of the IOSource interface.
*/
virtual void GetFds(int* read, int* write, int* except);
/**
* Part of the IOSource interface.
*/
virtual double NextTimestamp(double* network_time);
/**
* Part of the IOSource interface.
*/
virtual void Process();
/**
* Part of the IOSource interface.
*/
virtual const char* Tag() { return "threading::Manager"; }
private:
@ -41,12 +94,16 @@ private:
typedef std::list<MsgThread*> msg_thread_list;
msg_thread_list msg_threads;
bool did_process;
double next_beat;
bool did_process; // True if the last Process() found some work to do.
double next_beat; // Timestamp when the next heartbeat will be sent.
};
}
/**
* A singleton instance of the thread manager. All methods must only be
* called from Bro's main thread.
*/
extern threading::Manager* thread_mgr;
#endif

View file

@ -6,25 +6,13 @@
using namespace threading;
static void strreplace(const string& s, const string& o, const string& n)
{
string r = s;
while ( true )
{
size_t i = r.find(o);
if ( i == std::string::npos )
break;
r.replace(i, o.size(), n);
}
}
namespace threading {
// Standard messages.
////// Messages.
// Signals child thread to terminate. This is actually a no-op; its only
// purpose is unblock the current read operation so that the child's Run()
// methods can check the termination status.
class TerminateMessage : public InputMessage<MsgThread>
{
public:
@ -33,6 +21,22 @@ public:
virtual bool Process() { return true; }
};
/// Sends a heartbeat to the child thread.
class HeartbeatMessage : public InputMessage<MsgThread>
{
public:
HeartbeatMessage(MsgThread* thread, double arg_network_time, double arg_current_time)
: InputMessage<MsgThread>("Heartbeat", thread)
{ network_time = arg_network_time; current_time = arg_current_time; }
virtual bool Process() { return Object()->DoHeartbeat(network_time, current_time); }
private:
double network_time;
double current_time;
};
// A message from the child to be passed on to the Reporter.
class ReporterMessage : public OutputMessage<MsgThread>
{
public:
@ -52,21 +56,8 @@ private:
Type type;
};
class HeartbeatMessage : public InputMessage<MsgThread>
{
public:
HeartbeatMessage(MsgThread* thread, double arg_network_time, double arg_current_time)
: InputMessage<MsgThread>("Heartbeat", thread)
{ network_time = arg_network_time; current_time = arg_current_time; }
virtual bool Process() { return Object()->DoHeartbeat(network_time, current_time); }
private:
double network_time;
double current_time;
};
#ifdef DEBUG
// A debug message from the child to be passed on to the DebugLogger.
class DebugMessage : public OutputMessage<MsgThread>
{
public:
@ -77,8 +68,7 @@ public:
virtual bool Process()
{
string s = Object()->Name() + ": " + msg;
strreplace(s, "%", "%%");
debug_logger.Log(stream, s.c_str());
debug_logger.Log(stream, "%s", s.c_str());
return true;
}
private:
@ -89,7 +79,7 @@ private:
}
// Methods.
////// Methods.
Message::~Message()
{

View file

@ -15,121 +15,332 @@ class BasicInputMessage;
class BasicOutputMessage;
class HeartbeatMessage;
/**
* A specialized thread that provides bi-directional message passing between
* Bro's main thread and the child thread. Messages are instances of
* BasicInputMessage and BasicOutputMessage for message sent \a to the child
* thread and received \a from the child thread, respectively.
*
* The thread's Run() method implements main loop that processes incoming
* messages until Terminating() indicates that execution should stop. Once
* that happens, the thread stops accepting any new messages, finishes
* processes all remaining ones still in the queue, and then exits.
*/
class MsgThread : public BasicThread
{
public:
/**
* Constructor. It automatically registers the thread with the
* threading::Manager.
*
* Only Bro's main thread may instantiate a new thread.
*
* @param name A descriptive name. This is passed on to BasicThread().
*/
MsgThread(const string& name);
/**
* Sends a message to the child thread. The message will be proceesed
* once the thread has retrieved it from its incoming queue.
*
* Only the main thread may call this method.
*
* @param msg The message.
*/
void SendIn(BasicInputMessage* msg) { return SendIn(msg, false); }
/**
* Sends a message from the child thread to the main thread.
*
* Only the child thread may call this method.
*
* @param msg The mesasge.
*/
void SendOut(BasicOutputMessage* msg) { return SendOut(msg, false); }
BasicOutputMessage* RetrieveOut();
// Report an informational message, nothing that needs specific
// attention.
/**
* Reports an informational message from the child thread. The main
* thread will pass this to the Reporter once received.
*
* Only the child thread may call this method.
*
* @param msg The message. It will be prefixed with the thread's name.
*/
void Info(const char* msg);
// Report a warning that may indicate a problem.
/**
* Reports a warning from the child thread that may indicate a
* problem. The main thread will pass this to the Reporter once
* received.
*
* Only the child thread may call this method.
*
* @param msg The message. It will be prefixed with the thread's name.
*/
void Warning(const char* msg);
// Report a non-fatal error. Processing proceeds normally after the error
// has been reported.
/**
* Reports a non-fatal error from the child thread. The main thread
* will pass this to the Reporter once received. Processing proceeds
* normally after the error has been reported.
*
* Only the child thread may call this method.
*
* @param msg The message. It will be prefixed with the thread's name.
*/
void Error(const char* msg);
// Report a fatal error. Bro will terminate after the message has been
// reported.
/**
* Reports a fatal error from the child thread. The main thread will
* pass this to the Reporter once received. Bro will terminate after
* the message has been reported.
*
* Only the child thread may call this method.
*
* @param msg The message. It will be prefixed with the thread's name.
*/
void FatalError(const char* msg);
// Report a fatal error. Bro will terminate after the message has been
// reported and always generate a core dump.
/**
* Reports a fatal error from the child thread. The main thread will
* pass this to the Reporter once received. Bro will terminate with a
* core dump after the message has been reported.
*
* Only the child thread may call this method.
*
* @param msg The message. It will be prefixed with the thread's name.
*/
void FatalErrorWithCore(const char* msg);
// Report about a potential internal problem. Bro will continue
// normally.
/**
* Reports a potential internal problem from the child thread. The
* main thread will pass this to the Reporter once received. Bro will
* continue normally.
*
* Only the child thread may call this method.
*
* @param msg The message. It will be prefixed with the thread's name.
*/
void InternalWarning(const char* msg);
// Report an internal program error. Bro will terminate with a core
// dump after the message has been reported.
/**
* Reports an internal program error from the child thread. The main
* thread will pass this to the Reporter once received. Bro will
* terminate with a core dump after the message has been reported.
*
* Only the child thread may call this method.
*
* @param msg The message. It will be prefixed with the thread's name.
*/
void InternalError(const char* msg);
#ifdef DEBUG
// Records a debug message for the given stream.
/**
* Records a debug message for the given stream from the child
* thread. The main thread will pass this to the DebugLogger once
* received.
*
* Only the child thread may call this method.
*
* @param msg The message. It will be prefixed with the thread's name.
*/
void Debug(DebugStream stream, const char* msg);
#endif
void Heartbeat();
/**
* Statistics about inter-thread communication.
*/
struct Stats
{
uint64_t sent_in;
uint64_t sent_out;
uint64_t pending_in;
uint64_t pending_out;
uint64_t sent_in; //! Number of messages sent to the child thread.
uint64_t sent_out; //! Number of messages sent from the child thread to the main thread
uint64_t pending_in; //! Number of messages sent to the child but not yet processed.
uint64_t pending_out; //! Number of messages sent from the child but not yet processed by the main thread.
};
/**
* Returns statistics about the inter-thread communication.
*
* @param stats A pointer to a structure that will be filled with
* current numbers.
*/
void GetStats(Stats* stats);
protected:
friend class Manager;
friend class HeartbeatMessage;
/**
* Pops a message sent by the child from the child-to-main queue.
*
* This is method is called regularly by the threading::Manager.
*
* @return The message, wth ownership passed to caller. Returns null
* if the queue is empty.
*/
BasicOutputMessage* RetrieveOut();
/**
* Triggers a heartbeat message being sent to the client thread.
*
* This is method is called regularly by the threading::Manager.
*/
void Heartbeat();
/**
* Overriden from BasicThread.
*
*/
virtual void Run();
virtual void OnStop();
virtual bool DoHeartbeat(double network_time, double current_time) { return true; }
private:
friend class Manager;
/**
* Pops a message sent by the main thread from the main-to-chold
* queue.
*
* Must only be called by the child thread.
*
* @return The message, wth ownership passed to caller. Returns null
* if the queue is empty.
*/
BasicInputMessage* RetrieveIn();
/**
* Queues a message for the child.
*
* Must only be called by the main thread.
*
* @param msg The message.
*
* @param force: If true, the message will be queued even when we're already
* Terminating(). Normally, the message would be discarded in that
* case.
*/
void SendIn(BasicInputMessage* msg, bool force);
/**
* Queues a message for the main thread.
*
* Must only be called by the child thread.
*
* @param msg The message.
*
* @param force: If true, the message will be queued even when we're already
* Terminating(). Normally, the message would be discarded in that
* case.
*/
void SendOut(BasicOutputMessage* msg, bool force);
/**
* Returns true if there's at least one message pending for the child
* thread.
*/
bool HasIn() { return queue_in.Ready(); }
/**
* Returns true if there's at least one message pending for the main
* thread.
*/
bool HasOut() { return queue_out.Ready(); }
Queue_<BasicInputMessage *> queue_in;
Queue_<BasicOutputMessage *> queue_out;
Queue<BasicInputMessage *> queue_in;
Queue<BasicOutputMessage *> queue_out;
uint64_t cnt_sent_in;
uint64_t cnt_sent_out;
uint64_t cnt_sent_in; // Counts message sent to child.
uint64_t cnt_sent_out; // Counts message sent by child.
};
/**
* Base class for all message between Bro's main process and a MsgThread.
*/
class Message
{
public:
/**
* Destructor.
*/
virtual ~Message();
/**
* Returns a descriptive name for the message's general type. This is
* what's passed into the constructor and used mainly for debugging
* purposes.
*/
const string& Name() const { return name; }
/**
* Callback that must be overriden for processing a message.
*/
virtual bool Process() = 0; // Thread will be terminated if returngin false.
protected:
/**
* Constructor.
*
* @param arg_name A descriptive name for the type of message. Used
* mainly for debugging purposes.
*/
Message(const string& arg_name) { name = arg_name; }
private:
string name;
};
/**
* Base class for messages sent from Bro's main thread to a child MsgThread.
*/
class BasicInputMessage : public Message
{
protected:
/**
* Constructor.
*
* @param name A descriptive name for the type of message. Used
* mainly for debugging purposes.
*/
BasicInputMessage(const string& name) : Message(name) {}
};
/**
* Base class for messages sent from a child MsgThread to Bro's main thread.
*/
class BasicOutputMessage : public Message
{
protected:
/**
* Constructor.
*
* @param name A descriptive name for the type of message. Used
* mainly for debugging purposes.
*/
BasicOutputMessage(const string& name) : Message(name) {}
};
/**
* A paremeterized InputMessage that stores a pointer to an argument object.
* Normally, the objects will be used from the Process() callback.
*/
template<typename O>
class InputMessage : public BasicInputMessage
{
public:
/**
* Returns the objects passed to the constructor.
*/
O* Object() const { return object; }
protected:
/**
* Constructor.
*
* @param name: A descriptive name for the type of message. Used
* mainly for debugging purposes.
*
* @param arg_object: An object to store with the message.
*/
InputMessage(const string& name, O* arg_object) : BasicInputMessage(name)
{ object = arg_object; }
@ -137,13 +348,28 @@ private:
O* object;
};
/**
* A paremeterized OututMessage that stores a pointer to an argument object.
* Normally, the objects will be used from the Process() callback.
*/
template<typename O>
class OutputMessage : public BasicOutputMessage
{
public:
/**
* Returns the objects passed to the constructor.
*/
O* Object() const { return object; }
protected:
/**
* Constructor.
*
* @param name A descriptive name for the type of message. Used
* mainly for debugging purposes.
*
* @param arg_object An object to store with the message.
*/
OutputMessage(const string& name, O* arg_object) : BasicOutputMessage(name)
{ object = arg_object; }

View file

@ -9,23 +9,53 @@
#include "Reporter.h"
#undef Queue // Defined elsewhere unfortunately.
namespace threading {
/**
* Just a simple threaded queue wrapper class. Uses multiple queues and reads / writes in rotary fashion in an attempt to limit contention.
* Due to locking granularity, bulk put / get is no faster than single put / get as long as FIFO guarantee is required.
* A thread-safe single-reader single-writer queue.
*
* The implementation uses multiple queues and reads/writes in rotary fashion
* in an attempt to limit contention.
*
* All Queue instances must be instantiated by Bro's main thread.
*
* TODO: Unclear how critical performance is for this qeueue. We could like;y
* optimize it further if helpful.
*/
template<typename T>
class Queue_
class Queue
{
public:
Queue_();
~Queue_();
/**
* Constructor.
*/
Queue();
/**
* Destructor.
*/
~Queue();
/**
* Retrieves one elment.
*/
T Get();
/**
* Queues one element.
*/
void Put(T data);
/**
* Returns true if the next Get() operation will succeed.
*/
bool Ready();
/**
* Returns the number of queued items not yet retrieved.
*/
uint64_t Size();
private:
@ -37,7 +67,7 @@ private:
int read_ptr; // Where the next operation will read from
int write_ptr; // Where the next operation will write to
uint64_t size;
uint64_t size; // Current queue size.
};
inline static void safe_lock(pthread_mutex_t* mutex)
@ -53,7 +83,7 @@ inline static void safe_unlock(pthread_mutex_t* mutex)
}
template<typename T>
inline Queue_<T>::Queue_()
inline Queue<T>::Queue()
{
read_ptr = 0;
write_ptr = 0;
@ -69,7 +99,7 @@ inline Queue_<T>::Queue_()
}
template<typename T>
inline Queue_<T>::~Queue_()
inline Queue<T>::~Queue()
{
for( int i = 0; i < NUM_QUEUES; ++i )
{
@ -79,7 +109,7 @@ inline Queue_<T>::~Queue_()
}
template<typename T>
inline T Queue_<T>::Get()
inline T Queue<T>::Get()
{
safe_lock(&mutex[read_ptr]);
@ -100,7 +130,7 @@ inline T Queue_<T>::Get()
}
template<typename T>
inline void Queue_<T>::Put(T data)
inline void Queue<T>::Put(T data)
{
safe_lock(&mutex[write_ptr]);
@ -121,7 +151,7 @@ inline void Queue_<T>::Put(T data)
template<typename T>
inline bool Queue_<T>::Ready()
inline bool Queue<T>::Ready()
{
safe_lock(&mutex[read_ptr]);
@ -133,7 +163,7 @@ inline bool Queue_<T>::Ready()
}
template<typename T>
inline uint64_t Queue_<T>::Size()
inline uint64_t Queue<T>::Size()
{
safe_lock(&mutex[read_ptr]);