diff --git a/src/threading/BasicThread.cc b/src/threading/BasicThread.cc index 273a192de3..f7bd2afbcd 100644 --- a/src/threading/BasicThread.cc +++ b/src/threading/BasicThread.cc @@ -7,6 +7,8 @@ using namespace threading; +uint64_t BasicThread::thread_counter = 0; + BasicThread::BasicThread(const string& arg_name) { started = false; @@ -16,9 +18,7 @@ BasicThread::BasicThread(const string& arg_name) buf = 0; buf_len = 1024; - char tmp[128]; - snprintf(tmp, sizeof(tmp), "%s@%p", arg_name.c_str(), this); - name = string(tmp); + name = Fmt("%s@%d", arg_name.c_str(), ++thread_counter); thread_mgr->AddThread(this); } @@ -53,8 +53,15 @@ const char* BasicThread::Fmt(const char* format, ...) void BasicThread::Start() { - if ( sem_init(&terminate, 0, 0) != 0 ) - reporter->FatalError("Cannot create terminate semaphore for thread %s", name.c_str()); + if ( started ) + return; + + if ( pthread_mutex_init(&terminate, 0) != 0 ) + reporter->FatalError("Cannot create terminate mutex for thread %s", name.c_str()); + + // We use this like a binary semaphore and acquire it immediately. + if ( pthread_mutex_lock(&terminate) != 0 ) + reporter->FatalError("Cannot aquire terminate mutex for thread %s", name.c_str()); if ( pthread_create(&pthread, 0, BasicThread::launcher, this) != 0 ) reporter->FatalError("Cannot create thread %s", name.c_str()); @@ -76,8 +83,9 @@ void BasicThread::Stop() DBG_LOG(DBG_THREADING, "Signaling thread %s to terminate ...", name.c_str()); - // Signal that it's ok for the thread to exit now. - if ( sem_post(&terminate) != 0 ) + // Signal that it's ok for the thread to exit now by unlocking the + // mutex. + if ( pthread_mutex_unlock(&terminate) != 0 ) reporter->FatalError("Failure flagging terminate condition for thread %s", name.c_str()); terminating = true; @@ -98,7 +106,7 @@ void BasicThread::Join() if ( pthread_join(pthread, 0) != 0 ) reporter->FatalError("Failure joining thread %s", name.c_str()); - sem_destroy(&terminate); + pthread_mutex_destroy(&terminate); DBG_LOG(DBG_THREADING, "Done with thread %s", name.c_str()); @@ -120,9 +128,8 @@ void* BasicThread::launcher(void *arg) thread->Run(); // Wait until somebody actually wants us to terminate. - - if ( sem_wait(&thread->terminate) != 0 ) - reporter->FatalError("Failure flagging terminate condition for thread %s", thread->Name().c_str()); + if ( pthread_mutex_lock(&thread->terminate) != 0 ) + reporter->FatalError("Failure acquiring terminate mutex at end of thread %s", thread->Name().c_str()); return 0; } diff --git a/src/threading/BasicThread.h b/src/threading/BasicThread.h index 30a11b4505..df5665c464 100644 --- a/src/threading/BasicThread.h +++ b/src/threading/BasicThread.h @@ -14,50 +14,133 @@ namespace threading { class Manager; +/** + * Base class for all threads. + * + * This class encapsulates all the OS-level thread handling. All thread + * instances are automatically added to the threading::Manager for management. The + * manager also takes care of deleting them (which must not be done + * manually). + */ class BasicThread { public: + /** + * Creates a new thread object. Instantiating the object does however + * not yet start the actual OS thread, that requires calling Start(). + * + * Only Bro's main thread may create new thread instances. + * + * @param name A descriptive name for thread the thread. This may + * show up in messages to the user. + */ BasicThread(const string& name); // Managed by manager, must not delete otherwise. - virtual ~BasicThread(); + /** + * Returns a descriptive name for the thread. This is the name passed + * into the constructor. + * + * This method is safe to call from any thread. + */ const string& Name() const { return name; } - void Start(); // Spawns the thread and enters Run(). - void Stop(); // Signals the thread to terminate. + /** + * Starts the thread. Calling this methods will spawn a new OS thread + * executing Run(). Note that one can't restart a thread after a + * Stop(), doing so will be ignored. + * + * Only Bro's main thread must call this method. + */ + void Start(); + /** + * Signals the thread to stop. The method lets Terminating() now + * return true. It does however not force the thread to terminate. + * It's up to the Run() method to to query Terminating() and exit + * eventually. + * + * Calling this method has no effect if Start() hasn't been executed + * yet. + * + * Only Bro's main thread must call this method. + */ + void Stop(); + + /** + * Returns true if Terminate() has been called. + * + * This method is safe to call from any thread. + */ bool Terminating() const { return terminating; } - // A thread-safe version of fmt(). + /** + * A version of fmt() that the thread can safely use. + * + * This is safe to call from Run() but must not be used from any + * other thread than the current one. + */ const char* Fmt(const char* format, ...); protected: - virtual void Run() = 0; - - virtual void OnStart() {} - virtual void OnStop() {} - -private: friend class Manager; + /** + * Entry point for the thread. This must be overridden by derived + * classes and will execute in a separate thread once Start() is + * called. The thread will not terminate before this method finishes. + * An implementation should regularly check Terminating() to see if + * exiting has been requested. + */ + virtual void Run() = 0; + + /** + * Executed with Start(). This is a hook into starting the thread. It + * will be called from Bro's main thread after the OS thread has been + * started. + */ + virtual void OnStart() {} + + /** + * Executed with Stop(). This is a hook into stopping the thread. It + * will be called from Bro's main thread after the thread has been + * signaled to stop. + */ + virtual void OnStop() {} + + /** + * Destructor. This will be called by the manager. + * + * Only Bro's main thread may delete thread instances. + * + */ + virtual ~BasicThread(); + + /** + * Waits until the thread's Run() method has finished and then joins + * it. This is called from the threading::Manager. + */ + void Join(); + +private: + // pthread entry function. static void* launcher(void *arg); - // Used from the ThreadMgr. - void Join(); // Waits until the thread has terminated and then joins it. - + string name; + pthread_t pthread; bool started; // Set to to true once running. bool terminating; // Set to to true to signal termination. - string name; - pthread_t pthread; - sem_t terminate; + // Used as a semaphore to tell the pthread thread when it may + // terminate. + pthread_mutex_t terminate; // For implementing Fmt(). char* buf; unsigned int buf_len; + + static uint64_t thread_counter; }; } -extern threading::Manager* thread_mgr; - #endif diff --git a/src/threading/Manager.h b/src/threading/Manager.h index aa7292ee81..2c4f88fa1e 100644 --- a/src/threading/Manager.h +++ b/src/threading/Manager.h @@ -11,25 +11,78 @@ namespace threading { +/** + * The thread manager coordinates all child threads. Once a BasicThread is + * instantitated, it gets addedd to the manager, which will delete it later + * once it has terminated. + * + * In addition to basic threads, the manager also provides additional + * functionality specific to MsgThread instances. In particular, it polls + * their outgoing message queue on a regular basis and feeds data sent into + * the rest of Bro. It also triggers the regular heartbeats. + */ class Manager : public IOSource { public: + /** + * Constructor. Only a single instance of the manager must be + * created. + */ Manager(); + + /** + * Destructir. + */ ~Manager(); + /** + * Terminates the manager's processor. The method signals all threads + * to terminates and wait for them to do so. It then joins them and + * returns to the caller. Afterwards, no more thread instances may be + * created. + */ void Terminate(); protected: friend class BasicThread; friend class MsgThread; + /** + * Registers a new basic thread with the manager. This is + * automatically called by the thread's constructor. + * + * @param thread The thread. + */ void AddThread(BasicThread* thread); + + /** + * Registers a new message thread with the manager. This is + * automatically called by the thread's constructor. This must be + * called \a in \a addition to AddThread(BasicThread* thread). The + * MsgThread constructor makes sure to do so. + * + * @param thread The thread. + */ void AddMsgThread(MsgThread* thread); - // IOSource interface. + /** + * Part of the IOSource interface. + */ virtual void GetFds(int* read, int* write, int* except); + + /** + * Part of the IOSource interface. + */ virtual double NextTimestamp(double* network_time); + + /** + * Part of the IOSource interface. + */ virtual void Process(); + + /** + * Part of the IOSource interface. + */ virtual const char* Tag() { return "threading::Manager"; } private: @@ -41,12 +94,16 @@ private: typedef std::list msg_thread_list; msg_thread_list msg_threads; - bool did_process; - double next_beat; + bool did_process; // True if the last Process() found some work to do. + double next_beat; // Timestamp when the next heartbeat will be sent. }; } +/** + * A singleton instance of the thread manager. All methods must only be + * called from Bro's main thread. + */ extern threading::Manager* thread_mgr; #endif diff --git a/src/threading/MsgThread.cc b/src/threading/MsgThread.cc index d78c7533a3..455c177df6 100644 --- a/src/threading/MsgThread.cc +++ b/src/threading/MsgThread.cc @@ -6,25 +6,13 @@ using namespace threading; -static void strreplace(const string& s, const string& o, const string& n) - { - string r = s; - - while ( true ) - { - size_t i = r.find(o); - - if ( i == std::string::npos ) - break; - - r.replace(i, o.size(), n); - } - } - namespace threading { -// Standard messages. +////// Messages. +// Signals child thread to terminate. This is actually a no-op; its only +// purpose is unblock the current read operation so that the child's Run() +// methods can check the termination status. class TerminateMessage : public InputMessage { public: @@ -33,6 +21,22 @@ public: virtual bool Process() { return true; } }; +/// Sends a heartbeat to the child thread. +class HeartbeatMessage : public InputMessage +{ +public: + HeartbeatMessage(MsgThread* thread, double arg_network_time, double arg_current_time) + : InputMessage("Heartbeat", thread) + { network_time = arg_network_time; current_time = arg_current_time; } + + virtual bool Process() { return Object()->DoHeartbeat(network_time, current_time); } + +private: + double network_time; + double current_time; +}; + +// A message from the child to be passed on to the Reporter. class ReporterMessage : public OutputMessage { public: @@ -52,21 +56,8 @@ private: Type type; }; -class HeartbeatMessage : public InputMessage -{ -public: - HeartbeatMessage(MsgThread* thread, double arg_network_time, double arg_current_time) - : InputMessage("Heartbeat", thread) - { network_time = arg_network_time; current_time = arg_current_time; } - - virtual bool Process() { return Object()->DoHeartbeat(network_time, current_time); } - -private: - double network_time; - double current_time; -}; - #ifdef DEBUG +// A debug message from the child to be passed on to the DebugLogger. class DebugMessage : public OutputMessage { public: @@ -77,8 +68,7 @@ public: virtual bool Process() { string s = Object()->Name() + ": " + msg; - strreplace(s, "%", "%%"); - debug_logger.Log(stream, s.c_str()); + debug_logger.Log(stream, "%s", s.c_str()); return true; } private: @@ -89,7 +79,7 @@ private: } -// Methods. +////// Methods. Message::~Message() { diff --git a/src/threading/MsgThread.h b/src/threading/MsgThread.h index 2e976c1773..8f37041bb6 100644 --- a/src/threading/MsgThread.h +++ b/src/threading/MsgThread.h @@ -15,121 +15,332 @@ class BasicInputMessage; class BasicOutputMessage; class HeartbeatMessage; +/** + * A specialized thread that provides bi-directional message passing between + * Bro's main thread and the child thread. Messages are instances of + * BasicInputMessage and BasicOutputMessage for message sent \a to the child + * thread and received \a from the child thread, respectively. + * + * The thread's Run() method implements main loop that processes incoming + * messages until Terminating() indicates that execution should stop. Once + * that happens, the thread stops accepting any new messages, finishes + * processes all remaining ones still in the queue, and then exits. + */ class MsgThread : public BasicThread { public: + /** + * Constructor. It automatically registers the thread with the + * threading::Manager. + * + * Only Bro's main thread may instantiate a new thread. + * + * @param name A descriptive name. This is passed on to BasicThread(). + */ MsgThread(const string& name); + /** + * Sends a message to the child thread. The message will be proceesed + * once the thread has retrieved it from its incoming queue. + * + * Only the main thread may call this method. + * + * @param msg The message. + */ void SendIn(BasicInputMessage* msg) { return SendIn(msg, false); } + + /** + * Sends a message from the child thread to the main thread. + * + * Only the child thread may call this method. + * + * @param msg The mesasge. + */ void SendOut(BasicOutputMessage* msg) { return SendOut(msg, false); } - BasicOutputMessage* RetrieveOut(); - - // Report an informational message, nothing that needs specific - // attention. + /** + * Reports an informational message from the child thread. The main + * thread will pass this to the Reporter once received. + * + * Only the child thread may call this method. + * + * @param msg The message. It will be prefixed with the thread's name. + */ void Info(const char* msg); - // Report a warning that may indicate a problem. + /** + * Reports a warning from the child thread that may indicate a + * problem. The main thread will pass this to the Reporter once + * received. + * + * Only the child thread may call this method. + * + * @param msg The message. It will be prefixed with the thread's name. + */ void Warning(const char* msg); - // Report a non-fatal error. Processing proceeds normally after the error - // has been reported. + /** + * Reports a non-fatal error from the child thread. The main thread + * will pass this to the Reporter once received. Processing proceeds + * normally after the error has been reported. + * + * Only the child thread may call this method. + * + * @param msg The message. It will be prefixed with the thread's name. + */ void Error(const char* msg); - // Report a fatal error. Bro will terminate after the message has been - // reported. + /** + * Reports a fatal error from the child thread. The main thread will + * pass this to the Reporter once received. Bro will terminate after + * the message has been reported. + * + * Only the child thread may call this method. + * + * @param msg The message. It will be prefixed with the thread's name. + */ void FatalError(const char* msg); - // Report a fatal error. Bro will terminate after the message has been - // reported and always generate a core dump. + /** + * Reports a fatal error from the child thread. The main thread will + * pass this to the Reporter once received. Bro will terminate with a + * core dump after the message has been reported. + * + * Only the child thread may call this method. + * + * @param msg The message. It will be prefixed with the thread's name. + */ void FatalErrorWithCore(const char* msg); - // Report about a potential internal problem. Bro will continue - // normally. + /** + * Reports a potential internal problem from the child thread. The + * main thread will pass this to the Reporter once received. Bro will + * continue normally. + * + * Only the child thread may call this method. + * + * @param msg The message. It will be prefixed with the thread's name. + */ void InternalWarning(const char* msg); - // Report an internal program error. Bro will terminate with a core - // dump after the message has been reported. + /** + * Reports an internal program error from the child thread. The main + * thread will pass this to the Reporter once received. Bro will + * terminate with a core dump after the message has been reported. + * + * Only the child thread may call this method. + * + * @param msg The message. It will be prefixed with the thread's name. + */ void InternalError(const char* msg); #ifdef DEBUG - // Records a debug message for the given stream. + /** + * Records a debug message for the given stream from the child + * thread. The main thread will pass this to the DebugLogger once + * received. + * + * Only the child thread may call this method. + * + * @param msg The message. It will be prefixed with the thread's name. + */ void Debug(DebugStream stream, const char* msg); #endif - void Heartbeat(); - + /** + * Statistics about inter-thread communication. + */ struct Stats { - uint64_t sent_in; - uint64_t sent_out; - uint64_t pending_in; - uint64_t pending_out; + uint64_t sent_in; //! Number of messages sent to the child thread. + uint64_t sent_out; //! Number of messages sent from the child thread to the main thread + uint64_t pending_in; //! Number of messages sent to the child but not yet processed. + uint64_t pending_out; //! Number of messages sent from the child but not yet processed by the main thread. }; + /** + * Returns statistics about the inter-thread communication. + * + * @param stats A pointer to a structure that will be filled with + * current numbers. + */ void GetStats(Stats* stats); protected: + friend class Manager; friend class HeartbeatMessage; + /** + * Pops a message sent by the child from the child-to-main queue. + * + * This is method is called regularly by the threading::Manager. + * + * @return The message, wth ownership passed to caller. Returns null + * if the queue is empty. + */ + BasicOutputMessage* RetrieveOut(); + + /** + * Triggers a heartbeat message being sent to the client thread. + * + * This is method is called regularly by the threading::Manager. + */ + void Heartbeat(); + + /** + * Overriden from BasicThread. + * + */ virtual void Run(); virtual void OnStop(); virtual bool DoHeartbeat(double network_time, double current_time) { return true; } private: - friend class Manager; - + /** + * Pops a message sent by the main thread from the main-to-chold + * queue. + * + * Must only be called by the child thread. + * + * @return The message, wth ownership passed to caller. Returns null + * if the queue is empty. + */ BasicInputMessage* RetrieveIn(); + /** + * Queues a message for the child. + * + * Must only be called by the main thread. + * + * @param msg The message. + * + * @param force: If true, the message will be queued even when we're already + * Terminating(). Normally, the message would be discarded in that + * case. + */ void SendIn(BasicInputMessage* msg, bool force); + + /** + * Queues a message for the main thread. + * + * Must only be called by the child thread. + * + * @param msg The message. + * + * @param force: If true, the message will be queued even when we're already + * Terminating(). Normally, the message would be discarded in that + * case. + */ void SendOut(BasicOutputMessage* msg, bool force); + /** + * Returns true if there's at least one message pending for the child + * thread. + */ bool HasIn() { return queue_in.Ready(); } + + /** + * Returns true if there's at least one message pending for the main + * thread. + */ bool HasOut() { return queue_out.Ready(); } - Queue_ queue_in; - Queue_ queue_out; + Queue queue_in; + Queue queue_out; - uint64_t cnt_sent_in; - uint64_t cnt_sent_out; + uint64_t cnt_sent_in; // Counts message sent to child. + uint64_t cnt_sent_out; // Counts message sent by child. }; +/** + * Base class for all message between Bro's main process and a MsgThread. + */ class Message { public: + /** + * Destructor. + */ virtual ~Message(); + /** + * Returns a descriptive name for the message's general type. This is + * what's passed into the constructor and used mainly for debugging + * purposes. + */ const string& Name() const { return name; } + /** + * Callback that must be overriden for processing a message. + */ virtual bool Process() = 0; // Thread will be terminated if returngin false. protected: + /** + * Constructor. + * + * @param arg_name A descriptive name for the type of message. Used + * mainly for debugging purposes. + */ Message(const string& arg_name) { name = arg_name; } private: string name; }; +/** + * Base class for messages sent from Bro's main thread to a child MsgThread. + */ class BasicInputMessage : public Message { protected: + /** + * Constructor. + * + * @param name A descriptive name for the type of message. Used + * mainly for debugging purposes. + */ BasicInputMessage(const string& name) : Message(name) {} }; +/** + * Base class for messages sent from a child MsgThread to Bro's main thread. + */ class BasicOutputMessage : public Message { protected: + /** + * Constructor. + * + * @param name A descriptive name for the type of message. Used + * mainly for debugging purposes. + */ BasicOutputMessage(const string& name) : Message(name) {} }; +/** + * A paremeterized InputMessage that stores a pointer to an argument object. + * Normally, the objects will be used from the Process() callback. + */ template class InputMessage : public BasicInputMessage { public: + /** + * Returns the objects passed to the constructor. + */ O* Object() const { return object; } protected: + /** + * Constructor. + * + * @param name: A descriptive name for the type of message. Used + * mainly for debugging purposes. + * + * @param arg_object: An object to store with the message. + */ InputMessage(const string& name, O* arg_object) : BasicInputMessage(name) { object = arg_object; } @@ -137,13 +348,28 @@ private: O* object; }; +/** + * A paremeterized OututMessage that stores a pointer to an argument object. + * Normally, the objects will be used from the Process() callback. + */ template class OutputMessage : public BasicOutputMessage { public: + /** + * Returns the objects passed to the constructor. + */ O* Object() const { return object; } protected: + /** + * Constructor. + * + * @param name A descriptive name for the type of message. Used + * mainly for debugging purposes. + * + * @param arg_object An object to store with the message. + */ OutputMessage(const string& name, O* arg_object) : BasicOutputMessage(name) { object = arg_object; } diff --git a/src/threading/Queue.h b/src/threading/Queue.h index 49859dc051..add7019f9c 100644 --- a/src/threading/Queue.h +++ b/src/threading/Queue.h @@ -9,23 +9,53 @@ #include "Reporter.h" +#undef Queue // Defined elsewhere unfortunately. + namespace threading { /** - * Just a simple threaded queue wrapper class. Uses multiple queues and reads / writes in rotary fashion in an attempt to limit contention. - * Due to locking granularity, bulk put / get is no faster than single put / get as long as FIFO guarantee is required. + * A thread-safe single-reader single-writer queue. + * + * The implementation uses multiple queues and reads/writes in rotary fashion + * in an attempt to limit contention. + * + * All Queue instances must be instantiated by Bro's main thread. + * + * TODO: Unclear how critical performance is for this qeueue. We could like;y + * optimize it further if helpful. */ - template -class Queue_ +class Queue { public: - Queue_(); - ~Queue_(); + /** + * Constructor. + */ + Queue(); + /** + * Destructor. + */ + ~Queue(); + + /** + * Retrieves one elment. + */ T Get(); + + /** + * Queues one element. + */ void Put(T data); + + /** + * Returns true if the next Get() operation will succeed. + */ bool Ready(); + + /** + * Returns the number of queued items not yet retrieved. + */ uint64_t Size(); private: @@ -37,7 +67,7 @@ private: int read_ptr; // Where the next operation will read from int write_ptr; // Where the next operation will write to - uint64_t size; + uint64_t size; // Current queue size. }; inline static void safe_lock(pthread_mutex_t* mutex) @@ -53,7 +83,7 @@ inline static void safe_unlock(pthread_mutex_t* mutex) } template -inline Queue_::Queue_() +inline Queue::Queue() { read_ptr = 0; write_ptr = 0; @@ -69,7 +99,7 @@ inline Queue_::Queue_() } template -inline Queue_::~Queue_() +inline Queue::~Queue() { for( int i = 0; i < NUM_QUEUES; ++i ) { @@ -79,7 +109,7 @@ inline Queue_::~Queue_() } template -inline T Queue_::Get() +inline T Queue::Get() { safe_lock(&mutex[read_ptr]); @@ -100,7 +130,7 @@ inline T Queue_::Get() } template -inline void Queue_::Put(T data) +inline void Queue::Put(T data) { safe_lock(&mutex[write_ptr]); @@ -121,7 +151,7 @@ inline void Queue_::Put(T data) template -inline bool Queue_::Ready() +inline bool Queue::Ready() { safe_lock(&mutex[read_ptr]); @@ -133,7 +163,7 @@ inline bool Queue_::Ready() } template -inline uint64_t Queue_::Size() +inline uint64_t Queue::Size() { safe_lock(&mutex[read_ptr]);