mirror of
https://github.com/zeek/zeek.git
synced 2025-10-05 08:08:19 +00:00
Improving error handling for threads.
If a thread command fails (like the input framework not finding a file), that now (1) no longer hangs Bro, and (2) even allows for propagating error messages back before the thread is stops. (Actually, the thread doesn't really "stop"; the thread manager keeps threads around independent of their success; but it no longer polls them for input.) Closes #858.
This commit is contained in:
parent
ef3b75129f
commit
743fc1680d
3 changed files with 39 additions and 14 deletions
|
@ -80,8 +80,10 @@ double Manager::NextTimestamp(double* network_time)
|
||||||
|
|
||||||
for ( msg_thread_list::iterator i = msg_threads.begin(); i != msg_threads.end(); i++ )
|
for ( msg_thread_list::iterator i = msg_threads.begin(); i != msg_threads.end(); i++ )
|
||||||
{
|
{
|
||||||
if ( (*i)->MightHaveOut() )
|
MsgThread* t = *i;
|
||||||
return timer_mgr->Time();
|
|
||||||
|
if ( (*i)->MightHaveOut() && ! t->Killed() )
|
||||||
|
return timer_mgr->Time();
|
||||||
}
|
}
|
||||||
|
|
||||||
return -1.0;
|
return -1.0;
|
||||||
|
@ -95,6 +97,12 @@ void Manager::KillThreads()
|
||||||
(*i)->Kill();
|
(*i)->Kill();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void Manager::KillThread(BasicThread* thread)
|
||||||
|
{
|
||||||
|
DBG_LOG(DBG_THREADING, "Killing thread %s ...", thread->Name());
|
||||||
|
thread->Kill();
|
||||||
|
}
|
||||||
|
|
||||||
void Manager::Process()
|
void Manager::Process()
|
||||||
{
|
{
|
||||||
bool do_beat = false;
|
bool do_beat = false;
|
||||||
|
@ -114,7 +122,7 @@ void Manager::Process()
|
||||||
if ( do_beat )
|
if ( do_beat )
|
||||||
t->Heartbeat();
|
t->Heartbeat();
|
||||||
|
|
||||||
while ( t->HasOut() )
|
while ( t->HasOut() && ! t->Killed() )
|
||||||
{
|
{
|
||||||
Message* msg = t->RetrieveOut();
|
Message* msg = t->RetrieveOut();
|
||||||
|
|
||||||
|
|
|
@ -74,6 +74,16 @@ public:
|
||||||
*/
|
*/
|
||||||
void ForceProcessing() { Process(); }
|
void ForceProcessing() { Process(); }
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Signals a specific threads to terminate immediately.
|
||||||
|
*/
|
||||||
|
void KillThread(BasicThread* thread);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Signals all threads to terminate immediately.
|
||||||
|
*/
|
||||||
|
void KillThreads();
|
||||||
|
|
||||||
protected:
|
protected:
|
||||||
friend class BasicThread;
|
friend class BasicThread;
|
||||||
friend class MsgThread;
|
friend class MsgThread;
|
||||||
|
@ -106,13 +116,6 @@ protected:
|
||||||
*/
|
*/
|
||||||
virtual double NextTimestamp(double* network_time);
|
virtual double NextTimestamp(double* network_time);
|
||||||
|
|
||||||
/**
|
|
||||||
* Kills all thread immediately. Note that this may cause race conditions
|
|
||||||
* if a child thread currently holds a lock that might block somebody
|
|
||||||
* else.
|
|
||||||
*/
|
|
||||||
virtual void KillThreads();
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Part of the IOSource interface.
|
* Part of the IOSource interface.
|
||||||
*/
|
*/
|
||||||
|
|
|
@ -70,6 +70,16 @@ private:
|
||||||
Type type;
|
Type type;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
// A message from the the child to the main process, requesting suicide.
|
||||||
|
class KillMeMessage : public OutputMessage<MsgThread>
|
||||||
|
{
|
||||||
|
public:
|
||||||
|
KillMeMessage(MsgThread* thread)
|
||||||
|
: OutputMessage<MsgThread>("ReporterMessage", thread) {}
|
||||||
|
|
||||||
|
virtual bool Process() { thread_mgr->KillThread(Object()); return true; }
|
||||||
|
};
|
||||||
|
|
||||||
#ifdef DEBUG
|
#ifdef DEBUG
|
||||||
// A debug message from the child to be passed on to the DebugLogger.
|
// A debug message from the child to be passed on to the DebugLogger.
|
||||||
class DebugMessage : public OutputMessage<MsgThread>
|
class DebugMessage : public OutputMessage<MsgThread>
|
||||||
|
@ -346,16 +356,20 @@ void MsgThread::Run()
|
||||||
|
|
||||||
if ( ! result )
|
if ( ! result )
|
||||||
{
|
{
|
||||||
string s = Fmt("%s failed, terminating thread (MsgThread)", Name());
|
Error("terminating thread");
|
||||||
Error(s.c_str());
|
|
||||||
break;
|
// This will eventually kill this thread, but only
|
||||||
|
// after all other outgoing messages (in particular
|
||||||
|
// error messages have been processed by then main
|
||||||
|
// thread).
|
||||||
|
SendOut(new KillMeMessage(this));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// In case we haven't send the finish method yet, do it now. Reading
|
// In case we haven't send the finish method yet, do it now. Reading
|
||||||
// global network_time here should be fine, it isn't changing
|
// global network_time here should be fine, it isn't changing
|
||||||
// anymore.
|
// anymore.
|
||||||
if ( ! finished )
|
if ( ! finished && ! Killed() )
|
||||||
{
|
{
|
||||||
OnFinish(network_time);
|
OnFinish(network_time);
|
||||||
Finished();
|
Finished();
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue