diff --git a/src/Timer.cc b/src/Timer.cc index a16ee3bf33..686eca9e68 100644 --- a/src/Timer.cc +++ b/src/Timer.cc @@ -43,6 +43,7 @@ const char* TimerNames[] = { "TriggerTimer", "ParentProcessIDCheck", "TimerMgrExpireTimer", + "ThreadHeartbeat", }; const char* timer_type_to_string(TimerType type) diff --git a/src/Timer.h b/src/Timer.h index 8f73041416..6ad7cda003 100644 --- a/src/Timer.h +++ b/src/Timer.h @@ -38,8 +38,9 @@ enum TimerType : uint8_t { TIMER_TRIGGER, TIMER_PPID_CHECK, TIMER_TIMERMGR_EXPIRE, + TIMER_THREAD_HEARTBEAT, }; -const int NUM_TIMER_TYPES = int(TIMER_TIMERMGR_EXPIRE) + 1; +const int NUM_TIMER_TYPES = int(TIMER_THREAD_HEARTBEAT) + 1; extern const char* timer_type_to_string(TimerType type); diff --git a/src/threading/Manager.cc b/src/threading/Manager.cc index b07ab0d3d5..a8d721b76c 100644 --- a/src/threading/Manager.cc +++ b/src/threading/Manager.cc @@ -1,33 +1,44 @@ +#include +#include #include "Manager.h" #include "NetVar.h" +#include "iosource/Manager.h" using namespace threading; +void HeartbeatTimer::Dispatch(double t, int is_expire) + { + if ( is_expire ) + return; + + thread_mgr->SendHeartbeats(); + thread_mgr->StartHeartbeatTimer(); + } + Manager::Manager() { DBG_LOG(DBG_THREADING, "Creating thread manager ..."); did_process = true; next_beat = 0; - terminating = false; - SetIdle(true); } Manager::~Manager() { if ( all_threads.size() ) Terminate(); + + if ( heartbeat_timer ) + delete heartbeat_timer; } void Manager::Terminate() { DBG_LOG(DBG_THREADING, "Terminating thread manager ..."); - terminating = true; - // First process remaining thread output for the message threads. - do Process(); while ( did_process ); + do Flush(); while ( did_process ); // Signal all to stop. @@ -46,17 +57,15 @@ void Manager::Terminate() all_threads.clear(); msg_threads.clear(); - - SetIdle(true); - SetClosed(true); - terminating = false; } void Manager::AddThread(BasicThread* thread) { DBG_LOG(DBG_THREADING, "Adding thread %s ...", thread->Name()); all_threads.push_back(thread); - SetIdle(false); + + if ( ! heartbeat_timer ) + StartHeartbeatTimer(); } void Manager::AddMsgThread(MsgThread* thread) @@ -65,34 +74,6 @@ void Manager::AddMsgThread(MsgThread* thread) msg_threads.push_back(thread); } -void Manager::GetFds(iosource::FD_Set* read, iosource::FD_Set* write, - iosource::FD_Set* except) - { - } - -double Manager::NextTimestamp(double* network_time) - { -// fprintf(stderr, "N %.6f %.6f did_process=%d next_next=%.6f\n", ::network_time, timer_mgr->Time(), (int)did_process, next_beat); - - if ( ::network_time && (did_process || ::network_time > next_beat || ! next_beat) ) - // If we had something to process last time (or out heartbeat - // is due or not set yet), we want to check for more asap. - return timer_mgr->Time(); - - for ( msg_thread_list::iterator i = msg_threads.begin(); i != msg_threads.end(); i++ ) - { - MsgThread* t = *i; - - if ( t->MightHaveOut() || t->Killed() ) - // Even if the thread doesn't have output, it may be killed/done, - // which should also signify that processing is needed. The - // "processing" in that case is joining the thread and deleting it. - return timer_mgr->Time(); - } - - return -1.0; - } - void Manager::KillThreads() { DBG_LOG(DBG_THREADING, "Killing threads ..."); @@ -107,7 +88,46 @@ void Manager::KillThread(BasicThread* thread) thread->Kill(); } -void Manager::Process() +void Manager::SendHeartbeats() + { + for ( MsgThread* thread : msg_threads ) + thread->Heartbeat(); + + // Since this is a regular timer, this is also an ideal place to check whether we have + // and dead threads and to delete them. + all_thread_list to_delete; + for ( all_thread_list::iterator i = all_threads.begin(); i != all_threads.end(); i++ ) + { + BasicThread* t = *i; + + if ( t->Killed() ) + to_delete.push_back(t); + } + + for ( all_thread_list::iterator i = to_delete.begin(); i != to_delete.end(); i++ ) + { + BasicThread* t = *i; + t->WaitForStop(); + + all_threads.remove(t); + + MsgThread* mt = dynamic_cast(t); + + if ( mt ) + msg_threads.remove(mt); + + t->Join(); + delete t; + } + } + +void Manager::StartHeartbeatTimer() + { + heartbeat_timer = new HeartbeatTimer(network_time + BifConst::Threading::heartbeat_interval); + timer_mgr->Add(heartbeat_timer); + } + +void Manager::Flush() { bool do_beat = false; @@ -192,5 +212,3 @@ const threading::Manager::msg_stats_list& threading::Manager::GetMsgThreadStats( return stats; } - - diff --git a/src/threading/Manager.h b/src/threading/Manager.h index bf2cb429b4..132d2238c4 100644 --- a/src/threading/Manager.h +++ b/src/threading/Manager.h @@ -3,13 +3,25 @@ #include -#include "iosource/IOSource.h" - #include "BasicThread.h" #include "MsgThread.h" +#include "Timer.h" namespace threading { +class HeartbeatTimer : public Timer { +public: + HeartbeatTimer(double t) : Timer(t, TIMER_THREAD_HEARTBEAT) {} + virtual ~HeartbeatTimer() {} + + void Dispatch(double t, int is_expire); + +protected: + + void Init(); + int do_expire; +}; + /** * The thread manager coordinates all child threads. Once a BasicThread is * instantitated, it gets addedd to the manager, which will delete it later @@ -20,7 +32,7 @@ namespace threading { * their outgoing message queue on a regular basis and feeds data sent into * the rest of Bro. It also triggers the regular heartbeats. */ -class Manager : public iosource::IOSource +class Manager { public: /** @@ -30,9 +42,9 @@ public: Manager(); /** - * Destructir. + * Destructor. */ - ~Manager() override; + ~Manager(); /** * Terminates the manager's processor. The method signals all threads @@ -80,6 +92,7 @@ public: protected: friend class BasicThread; friend class MsgThread; + friend class HeartbeatTimer; /** * Registers a new basic thread with the manager. This is @@ -99,26 +112,17 @@ protected: */ void AddMsgThread(MsgThread* thread); - /** - * Part of the IOSource interface. - */ - void GetFds(iosource::FD_Set* read, iosource::FD_Set* write, - iosource::FD_Set* except) override; + void Flush(); /** - * Part of the IOSource interface. + * Sends heartbeat messages to all active message threads. */ - double NextTimestamp(double* network_time) override; + void SendHeartbeats(); /** - * Part of the IOSource interface. + * Sets up a timer to periodically send heartbeat messages to all threads. */ - void Process() override; - - /** - * Part of the IOSource interface. - */ - const char* Tag() override { return "threading::Manager"; } + void StartHeartbeatTimer(); private: typedef std::list all_thread_list; @@ -132,6 +136,8 @@ private: bool terminating; // True if we are in Terminate(). msg_stats_list stats; + + HeartbeatTimer* heartbeat_timer = nullptr; }; } diff --git a/src/threading/MsgThread.cc b/src/threading/MsgThread.cc index 01f90921e8..41267147d0 100644 --- a/src/threading/MsgThread.cc +++ b/src/threading/MsgThread.cc @@ -1,11 +1,12 @@ +#include +#include +#include #include "DebugLogger.h" #include "MsgThread.h" #include "Manager.h" - -#include -#include +#include "iosource/Manager.h" using namespace threading; @@ -179,6 +180,17 @@ MsgThread::MsgThread() : BasicThread(), queue_in(this, 0), queue_out(0, this) child_sent_finish = false; failed = false; thread_mgr->AddMsgThread(this); + + iosource_mgr->RegisterFd(flare.FD(), this); + + SetClosed(false); + } + +MsgThread::~MsgThread() + { + // Unregister this thread from the iosource manager so it doesn't wake + // up the main poll anymore. + iosource_mgr->UnregisterFd(flare.FD()); } // Set by Bro's main signal handler. @@ -252,6 +264,8 @@ void MsgThread::OnWaitForStop() void MsgThread::OnKill() { + SetClosed(true); + // Send a message to unblock the reader if its currently waiting for // input. This is just an optimization to make it terminate more // quickly, even without the message it will eventually time out. @@ -344,6 +358,8 @@ void MsgThread::SendOut(BasicOutputMessage* msg, bool force) queue_out.Put(msg); ++cnt_sent_out; + + flare.Fire(); } BasicOutputMessage* MsgThread::RetrieveOut() @@ -418,3 +434,21 @@ void MsgThread::GetStats(Stats* stats) queue_out.GetStats(&stats->queue_out_stats); } +void MsgThread::Process() + { + flare.Extinguish(); + + while ( HasOut() ) + { + Message* msg = RetrieveOut(); + assert(msg); + + if ( ! msg->Process() ) + { + reporter->Error("%s failed, terminating thread", msg->Name()); + SignalStop(); + } + + delete msg; + } + } diff --git a/src/threading/MsgThread.h b/src/threading/MsgThread.h index 1f5ec0e017..b920410e73 100644 --- a/src/threading/MsgThread.h +++ b/src/threading/MsgThread.h @@ -5,6 +5,8 @@ #include "BasicThread.h" #include "Queue.h" +#include "iosource/IOSource.h" +#include "Flare.h" namespace threading { @@ -23,7 +25,7 @@ class HeartbeatMessage; * that happens, the thread stops accepting any new messages, finishes * processes all remaining ones still in the queue, and then exits. */ -class MsgThread : public BasicThread +class MsgThread : public BasicThread, public iosource::IOSource { public: /** @@ -34,6 +36,11 @@ public: */ MsgThread(); + /** + * Destructor. + */ + virtual ~MsgThread(); + /** * Sends a message to the child thread. The message will be proceesed * once the thread has retrieved it from its incoming queue. @@ -175,6 +182,13 @@ public: */ void GetStats(Stats* stats); + /** + * Overridden from iosource::IOSource. + */ + void Process() override; + const char* Tag() override { return Name(); } + double GetNextTimeout() override { return -1; } + protected: friend class Manager; friend class HeartbeatMessage; @@ -229,7 +243,6 @@ protected: /** * Overriden from BasicThread. - * */ void Run() override; void OnWaitForStop() override; @@ -308,6 +321,8 @@ private: bool child_finished; // Child thread is finished. bool child_sent_finish; // Child thread asked to be finished. bool failed; // Set to true when a command failed. + + bro::Flare flare; }; /**