Threading changes for the new loop architecture

- threading::Manager is no longer an IOSource.
- threading::MsgThread is now an IOSource. This allows threads themselves to signal when they have data to process instead of continually checking each of the threads on every loop pass.
- Make the thread heartbeat timer an actual timer and let it fire as necessary instead of checking to see if it should fire
This commit is contained in:
Tim Wojtulewicz 2019-11-26 12:54:51 -07:00
parent a159d075cf
commit 8b9160fb7e
6 changed files with 141 additions and 66 deletions

View file

@ -43,6 +43,7 @@ const char* TimerNames[] = {
"TriggerTimer", "TriggerTimer",
"ParentProcessIDCheck", "ParentProcessIDCheck",
"TimerMgrExpireTimer", "TimerMgrExpireTimer",
"ThreadHeartbeat",
}; };
const char* timer_type_to_string(TimerType type) const char* timer_type_to_string(TimerType type)

View file

@ -38,8 +38,9 @@ enum TimerType : uint8_t {
TIMER_TRIGGER, TIMER_TRIGGER,
TIMER_PPID_CHECK, TIMER_PPID_CHECK,
TIMER_TIMERMGR_EXPIRE, TIMER_TIMERMGR_EXPIRE,
TIMER_THREAD_HEARTBEAT,
}; };
const int NUM_TIMER_TYPES = int(TIMER_TIMERMGR_EXPIRE) + 1; const int NUM_TIMER_TYPES = int(TIMER_THREAD_HEARTBEAT) + 1;
extern const char* timer_type_to_string(TimerType type); extern const char* timer_type_to_string(TimerType type);

View file

@ -1,33 +1,44 @@
#include <sys/socket.h>
#include <unistd.h>
#include "Manager.h" #include "Manager.h"
#include "NetVar.h" #include "NetVar.h"
#include "iosource/Manager.h"
using namespace threading; using namespace threading;
void HeartbeatTimer::Dispatch(double t, int is_expire)
{
if ( is_expire )
return;
thread_mgr->SendHeartbeats();
thread_mgr->StartHeartbeatTimer();
}
Manager::Manager() Manager::Manager()
{ {
DBG_LOG(DBG_THREADING, "Creating thread manager ..."); DBG_LOG(DBG_THREADING, "Creating thread manager ...");
did_process = true; did_process = true;
next_beat = 0; next_beat = 0;
terminating = false;
SetIdle(true);
} }
Manager::~Manager() Manager::~Manager()
{ {
if ( all_threads.size() ) if ( all_threads.size() )
Terminate(); Terminate();
if ( heartbeat_timer )
delete heartbeat_timer;
} }
void Manager::Terminate() void Manager::Terminate()
{ {
DBG_LOG(DBG_THREADING, "Terminating thread manager ..."); DBG_LOG(DBG_THREADING, "Terminating thread manager ...");
terminating = true;
// First process remaining thread output for the message threads. // First process remaining thread output for the message threads.
do Process(); while ( did_process ); do Flush(); while ( did_process );
// Signal all to stop. // Signal all to stop.
@ -46,17 +57,15 @@ void Manager::Terminate()
all_threads.clear(); all_threads.clear();
msg_threads.clear(); msg_threads.clear();
SetIdle(true);
SetClosed(true);
terminating = false;
} }
void Manager::AddThread(BasicThread* thread) void Manager::AddThread(BasicThread* thread)
{ {
DBG_LOG(DBG_THREADING, "Adding thread %s ...", thread->Name()); DBG_LOG(DBG_THREADING, "Adding thread %s ...", thread->Name());
all_threads.push_back(thread); all_threads.push_back(thread);
SetIdle(false);
if ( ! heartbeat_timer )
StartHeartbeatTimer();
} }
void Manager::AddMsgThread(MsgThread* thread) void Manager::AddMsgThread(MsgThread* thread)
@ -65,34 +74,6 @@ void Manager::AddMsgThread(MsgThread* thread)
msg_threads.push_back(thread); msg_threads.push_back(thread);
} }
void Manager::GetFds(iosource::FD_Set* read, iosource::FD_Set* write,
iosource::FD_Set* except)
{
}
double Manager::NextTimestamp(double* network_time)
{
// fprintf(stderr, "N %.6f %.6f did_process=%d next_next=%.6f\n", ::network_time, timer_mgr->Time(), (int)did_process, next_beat);
if ( ::network_time && (did_process || ::network_time > next_beat || ! next_beat) )
// If we had something to process last time (or out heartbeat
// is due or not set yet), we want to check for more asap.
return timer_mgr->Time();
for ( msg_thread_list::iterator i = msg_threads.begin(); i != msg_threads.end(); i++ )
{
MsgThread* t = *i;
if ( t->MightHaveOut() || t->Killed() )
// Even if the thread doesn't have output, it may be killed/done,
// which should also signify that processing is needed. The
// "processing" in that case is joining the thread and deleting it.
return timer_mgr->Time();
}
return -1.0;
}
void Manager::KillThreads() void Manager::KillThreads()
{ {
DBG_LOG(DBG_THREADING, "Killing threads ..."); DBG_LOG(DBG_THREADING, "Killing threads ...");
@ -107,7 +88,46 @@ void Manager::KillThread(BasicThread* thread)
thread->Kill(); thread->Kill();
} }
void Manager::Process() void Manager::SendHeartbeats()
{
for ( MsgThread* thread : msg_threads )
thread->Heartbeat();
// Since this is a regular timer, this is also an ideal place to check whether we have
// and dead threads and to delete them.
all_thread_list to_delete;
for ( all_thread_list::iterator i = all_threads.begin(); i != all_threads.end(); i++ )
{
BasicThread* t = *i;
if ( t->Killed() )
to_delete.push_back(t);
}
for ( all_thread_list::iterator i = to_delete.begin(); i != to_delete.end(); i++ )
{
BasicThread* t = *i;
t->WaitForStop();
all_threads.remove(t);
MsgThread* mt = dynamic_cast<MsgThread *>(t);
if ( mt )
msg_threads.remove(mt);
t->Join();
delete t;
}
}
void Manager::StartHeartbeatTimer()
{
heartbeat_timer = new HeartbeatTimer(network_time + BifConst::Threading::heartbeat_interval);
timer_mgr->Add(heartbeat_timer);
}
void Manager::Flush()
{ {
bool do_beat = false; bool do_beat = false;
@ -192,5 +212,3 @@ const threading::Manager::msg_stats_list& threading::Manager::GetMsgThreadStats(
return stats; return stats;
} }

View file

@ -3,13 +3,25 @@
#include <list> #include <list>
#include "iosource/IOSource.h"
#include "BasicThread.h" #include "BasicThread.h"
#include "MsgThread.h" #include "MsgThread.h"
#include "Timer.h"
namespace threading { namespace threading {
class HeartbeatTimer : public Timer {
public:
HeartbeatTimer(double t) : Timer(t, TIMER_THREAD_HEARTBEAT) {}
virtual ~HeartbeatTimer() {}
void Dispatch(double t, int is_expire);
protected:
void Init();
int do_expire;
};
/** /**
* The thread manager coordinates all child threads. Once a BasicThread is * The thread manager coordinates all child threads. Once a BasicThread is
* instantitated, it gets addedd to the manager, which will delete it later * instantitated, it gets addedd to the manager, which will delete it later
@ -20,7 +32,7 @@ namespace threading {
* their outgoing message queue on a regular basis and feeds data sent into * their outgoing message queue on a regular basis and feeds data sent into
* the rest of Bro. It also triggers the regular heartbeats. * the rest of Bro. It also triggers the regular heartbeats.
*/ */
class Manager : public iosource::IOSource class Manager
{ {
public: public:
/** /**
@ -30,9 +42,9 @@ public:
Manager(); Manager();
/** /**
* Destructir. * Destructor.
*/ */
~Manager() override; ~Manager();
/** /**
* Terminates the manager's processor. The method signals all threads * Terminates the manager's processor. The method signals all threads
@ -80,6 +92,7 @@ public:
protected: protected:
friend class BasicThread; friend class BasicThread;
friend class MsgThread; friend class MsgThread;
friend class HeartbeatTimer;
/** /**
* Registers a new basic thread with the manager. This is * Registers a new basic thread with the manager. This is
@ -99,26 +112,17 @@ protected:
*/ */
void AddMsgThread(MsgThread* thread); void AddMsgThread(MsgThread* thread);
/** void Flush();
* Part of the IOSource interface.
*/
void GetFds(iosource::FD_Set* read, iosource::FD_Set* write,
iosource::FD_Set* except) override;
/** /**
* Part of the IOSource interface. * Sends heartbeat messages to all active message threads.
*/ */
double NextTimestamp(double* network_time) override; void SendHeartbeats();
/** /**
* Part of the IOSource interface. * Sets up a timer to periodically send heartbeat messages to all threads.
*/ */
void Process() override; void StartHeartbeatTimer();
/**
* Part of the IOSource interface.
*/
const char* Tag() override { return "threading::Manager"; }
private: private:
typedef std::list<BasicThread*> all_thread_list; typedef std::list<BasicThread*> all_thread_list;
@ -132,6 +136,8 @@ private:
bool terminating; // True if we are in Terminate(). bool terminating; // True if we are in Terminate().
msg_stats_list stats; msg_stats_list stats;
HeartbeatTimer* heartbeat_timer = nullptr;
}; };
} }

View file

@ -1,11 +1,12 @@
#include <unistd.h>
#include <signal.h>
#include <fcntl.h>
#include "DebugLogger.h" #include "DebugLogger.h"
#include "MsgThread.h" #include "MsgThread.h"
#include "Manager.h" #include "Manager.h"
#include "iosource/Manager.h"
#include <unistd.h>
#include <signal.h>
using namespace threading; using namespace threading;
@ -179,6 +180,17 @@ MsgThread::MsgThread() : BasicThread(), queue_in(this, 0), queue_out(0, this)
child_sent_finish = false; child_sent_finish = false;
failed = false; failed = false;
thread_mgr->AddMsgThread(this); thread_mgr->AddMsgThread(this);
iosource_mgr->RegisterFd(flare.FD(), this);
SetClosed(false);
}
MsgThread::~MsgThread()
{
// Unregister this thread from the iosource manager so it doesn't wake
// up the main poll anymore.
iosource_mgr->UnregisterFd(flare.FD());
} }
// Set by Bro's main signal handler. // Set by Bro's main signal handler.
@ -252,6 +264,8 @@ void MsgThread::OnWaitForStop()
void MsgThread::OnKill() void MsgThread::OnKill()
{ {
SetClosed(true);
// Send a message to unblock the reader if its currently waiting for // Send a message to unblock the reader if its currently waiting for
// input. This is just an optimization to make it terminate more // input. This is just an optimization to make it terminate more
// quickly, even without the message it will eventually time out. // quickly, even without the message it will eventually time out.
@ -344,6 +358,8 @@ void MsgThread::SendOut(BasicOutputMessage* msg, bool force)
queue_out.Put(msg); queue_out.Put(msg);
++cnt_sent_out; ++cnt_sent_out;
flare.Fire();
} }
BasicOutputMessage* MsgThread::RetrieveOut() BasicOutputMessage* MsgThread::RetrieveOut()
@ -418,3 +434,21 @@ void MsgThread::GetStats(Stats* stats)
queue_out.GetStats(&stats->queue_out_stats); queue_out.GetStats(&stats->queue_out_stats);
} }
void MsgThread::Process()
{
flare.Extinguish();
while ( HasOut() )
{
Message* msg = RetrieveOut();
assert(msg);
if ( ! msg->Process() )
{
reporter->Error("%s failed, terminating thread", msg->Name());
SignalStop();
}
delete msg;
}
}

View file

@ -5,6 +5,8 @@
#include "BasicThread.h" #include "BasicThread.h"
#include "Queue.h" #include "Queue.h"
#include "iosource/IOSource.h"
#include "Flare.h"
namespace threading { namespace threading {
@ -23,7 +25,7 @@ class HeartbeatMessage;
* that happens, the thread stops accepting any new messages, finishes * that happens, the thread stops accepting any new messages, finishes
* processes all remaining ones still in the queue, and then exits. * processes all remaining ones still in the queue, and then exits.
*/ */
class MsgThread : public BasicThread class MsgThread : public BasicThread, public iosource::IOSource
{ {
public: public:
/** /**
@ -34,6 +36,11 @@ public:
*/ */
MsgThread(); MsgThread();
/**
* Destructor.
*/
virtual ~MsgThread();
/** /**
* Sends a message to the child thread. The message will be proceesed * Sends a message to the child thread. The message will be proceesed
* once the thread has retrieved it from its incoming queue. * once the thread has retrieved it from its incoming queue.
@ -175,6 +182,13 @@ public:
*/ */
void GetStats(Stats* stats); void GetStats(Stats* stats);
/**
* Overridden from iosource::IOSource.
*/
void Process() override;
const char* Tag() override { return Name(); }
double GetNextTimeout() override { return -1; }
protected: protected:
friend class Manager; friend class Manager;
friend class HeartbeatMessage; friend class HeartbeatMessage;
@ -229,7 +243,6 @@ protected:
/** /**
* Overriden from BasicThread. * Overriden from BasicThread.
*
*/ */
void Run() override; void Run() override;
void OnWaitForStop() override; void OnWaitForStop() override;
@ -308,6 +321,8 @@ private:
bool child_finished; // Child thread is finished. bool child_finished; // Child thread is finished.
bool child_sent_finish; // Child thread asked to be finished. bool child_sent_finish; // Child thread asked to be finished.
bool failed; // Set to true when a command failed. bool failed; // Set to true when a command failed.
bro::Flare flare;
}; };
/** /**