diff --git a/src/logging/WriterBackend.cc b/src/logging/WriterBackend.cc index 68b0b506a1..a284c56201 100644 --- a/src/logging/WriterBackend.cc +++ b/src/logging/WriterBackend.cc @@ -162,9 +162,7 @@ bool WriterBackend::Init(const WriterInfo& arg_info, int arg_num_fields, const F num_fields = arg_num_fields; fields = arg_fields; - string name = Fmt("%s/%s", info.path.c_str(), frontend_name.c_str()); - - SetName(name); + SetName(frontend->Name()); if ( ! DoInit(arg_info, arg_num_fields, arg_fields) ) { diff --git a/src/logging/writers/Ascii.cc b/src/logging/writers/Ascii.cc index c1f307fb4e..20963d1535 100644 --- a/src/logging/writers/Ascii.cc +++ b/src/logging/writers/Ascii.cc @@ -169,6 +169,7 @@ bool Ascii::DoFinish(double network_time) ascii_done = true; CloseFile(network_time); + return true; } diff --git a/src/threading/BasicThread.cc b/src/threading/BasicThread.cc index 075581e9db..e7fb3f4c84 100644 --- a/src/threading/BasicThread.cc +++ b/src/threading/BasicThread.cc @@ -125,7 +125,7 @@ void BasicThread::Join() DBG_LOG(DBG_THREADING, "Joining thread %s ...", name.c_str()); - if ( pthread_join(pthread, 0) != 0 ) + if ( pthread && pthread_join(pthread, 0) != 0 ) reporter->FatalError("Failure joining thread %s", name.c_str()); DBG_LOG(DBG_THREADING, "Done with thread %s", name.c_str()); @@ -135,13 +135,13 @@ void BasicThread::Join() void BasicThread::Kill() { + terminating = true; + if ( ! (started && pthread) ) return; - // I believe this is safe to call from a signal handler ... Not error - // checking so that killing doesn't bail out if we have already - // terminated. - pthread_kill(pthread, SIGKILL); + pthread = 0; + pthread_kill(pthread, SIGTERM); } void* BasicThread::launcher(void *arg) diff --git a/src/threading/Manager.cc b/src/threading/Manager.cc index 8e0610a056..f1f9307b03 100644 --- a/src/threading/Manager.cc +++ b/src/threading/Manager.cc @@ -83,6 +83,14 @@ double Manager::NextTimestamp(double* network_time) return -1.0; } +void Manager::KillThreads() + { + DBG_LOG(DBG_THREADING, "Killing threads ..."); + + for ( all_thread_list::iterator i = all_threads.begin(); i != all_threads.end(); i++ ) + (*i)->Kill(); + } + void Manager::Process() { bool do_beat = false; diff --git a/src/threading/Manager.h b/src/threading/Manager.h index 1c7914fcde..be81c69ba0 100644 --- a/src/threading/Manager.h +++ b/src/threading/Manager.h @@ -106,6 +106,13 @@ protected: */ virtual double NextTimestamp(double* network_time); + /** + * Kills all thread immediately. Note that this may cause race conditions + * if a child thread currently holds a lock that might block somebody + * else. + */ + virtual void KillThreads(); + /** * Part of the IOSource interface. */ diff --git a/src/threading/MsgThread.cc b/src/threading/MsgThread.cc index f101d0ca3c..3913624654 100644 --- a/src/threading/MsgThread.cc +++ b/src/threading/MsgThread.cc @@ -156,6 +156,9 @@ MsgThread::MsgThread() : BasicThread() thread_mgr->AddMsgThread(this); } +// Set by Bro's main signal handler. +extern int signal_val; + void MsgThread::OnStop() { if ( stopped ) @@ -164,13 +167,31 @@ void MsgThread::OnStop() // Signal thread to terminate and wait until it has acknowledged. SendIn(new FinishMessage(this, network_time), true); + int old_signal_val = signal_val; + signal_val = 0; + int cnt = 0; + bool aborted = 0; + while ( ! finished ) { - if ( ++cnt % 2000 == 0 ) // Insurance against broken threads ... + // Terminate if we get another kill signal. + if ( signal_val == SIGTERM || signal_val == SIGINT ) { - reporter->Warning("thread %s has not yet terminated ...", Name().c_str()); - fprintf(stderr, "warning: thread %s has not yet terminated ...", Name().c_str()); + // Abort all threads here so that we won't hang next + // on another one. + fprintf(stderr, "received signal while waiting for thread %s, aborting all ...\n", Name().c_str()); + thread_mgr->KillThreads(); + aborted = true; + break; + } + + if ( ++cnt % 10000 == 0 ) // Insurance against broken threads ... + { + fprintf(stderr, "killing thread %s ...\n", Name().c_str()); + Kill(); + aborted = true; + break; } usleep(1000); @@ -178,8 +199,11 @@ void MsgThread::OnStop() Finished(); + signal_val = old_signal_val; + // One more message to make sure the current queue read operation unblocks. - SendIn(new UnblockMessage(this), true); + if ( ! aborted ) + SendIn(new UnblockMessage(this), true); } void MsgThread::Heartbeat()