diff --git a/scripts/base/frameworks/logging/main.bro b/scripts/base/frameworks/logging/main.bro index 054ad4a30b..1126686c13 100644 --- a/scripts/base/frameworks/logging/main.bro +++ b/scripts/base/frameworks/logging/main.bro @@ -189,6 +189,15 @@ export { ## .. bro:see:: Log::add_default_filter Log::remove_default_filter global create_stream: function(id: ID, stream: Stream) : bool; + ## Removes a logging stream completely, stopping all the threads. + ## + ## id: The ID enum to be associated with the new logging stream. + ## + ## Returns: True if a new stream was successfully removed. + ## + ## .. bro:see:: Log:create_stream + global remove_stream: function(id: ID) : bool; + ## Enables a previously disabled logging stream. Disabled streams ## will not be written to until they are enabled again. New streams ## are enabled by default. @@ -442,6 +451,13 @@ function create_stream(id: ID, stream: Stream) : bool return add_default_filter(id); } +function remove_stream(id: ID) : bool + { + delete active_streams[id]; + + return __remove_stream(id); + } + function disable_stream(id: ID) : bool { delete active_streams[id]; diff --git a/src/logging.bif b/src/logging.bif index f5d3e8e3e6..cf97c59cd3 100644 --- a/src/logging.bif +++ b/src/logging.bif @@ -18,6 +18,12 @@ function Log::__create_stream%(id: Log::ID, stream: Log::Stream%) : bool return new Val(result, TYPE_BOOL); %} +function Log::__remove_stream%(id: Log::ID%) : bool + %{ + bool result = log_mgr->RemoveStream(id->AsEnumVal()); + return new Val(result, TYPE_BOOL); + %} + function Log::__enable_stream%(id: Log::ID%) : bool %{ bool result = log_mgr->EnableStream(id->AsEnumVal()); diff --git a/src/logging/Manager.cc b/src/logging/Manager.cc index 1ab83d84ba..67e7d998ed 100644 --- a/src/logging/Manager.cc +++ b/src/logging/Manager.cc @@ -374,6 +374,38 @@ bool Manager::CreateStream(EnumVal* id, RecordVal* sval) return true; } +bool Manager::RemoveStream(EnumVal* id) + { + unsigned int idx = id->AsEnum(); + + if ( idx >= streams.size() || ! streams[idx] ) + return false; + + Stream* stream = streams[idx]; + + if ( ! stream ) + return false; + + for ( Stream::WriterMap::iterator i = stream->writers.begin(); i != stream->writers.end(); i++ ) + { + WriterInfo* winfo = i->second; + + DBG_LOG(DBG_LOGGING, "Removed writer '%s' from stream '%s'", + winfo->writer->Name(), stream->name.c_str()); + + winfo->writer->Stop(); + delete winfo->writer; + delete winfo; + } + + stream->writers.clear(); + delete stream; + streams[idx] = 0; + + DBG_LOG(DBG_LOGGING, "Removed logging stream '%s'", stream->name.c_str()); + return true; + } + bool Manager::EnableStream(EnumVal* id) { Stream* stream = FindStream(id); diff --git a/src/logging/Manager.h b/src/logging/Manager.h index 90ad944bc6..5b5f8014e3 100644 --- a/src/logging/Manager.h +++ b/src/logging/Manager.h @@ -47,6 +47,16 @@ public: */ bool CreateStream(EnumVal* id, RecordVal* stream); + /** + * Remove a log stream, stopping all threads. + * + * @param id The enum value corresponding the log stream. + * + * This methods corresponds directly to the internal BiF defined in + * logging.bif, which just forwards here. + */ + bool RemoveStream(EnumVal* id); + /** * Enables a log log stream. * diff --git a/src/logging/WriterFrontend.cc b/src/logging/WriterFrontend.cc index a97f48c1ed..73cba2ff3a 100644 --- a/src/logging/WriterFrontend.cc +++ b/src/logging/WriterFrontend.cc @@ -138,6 +138,12 @@ void WriterFrontend::Stop() { FlushWriteBuffer(); SetDisable(); + + if ( backend ) + { + backend->SignalStop(); + backend = 0; // Thread manager will clean it up once it finishes. + } } void WriterFrontend::Init(int arg_num_fields, const Field* const * arg_fields) diff --git a/src/logging/WriterFrontend.h b/src/logging/WriterFrontend.h index a4a8dcd415..5bcde21a5e 100644 --- a/src/logging/WriterFrontend.h +++ b/src/logging/WriterFrontend.h @@ -54,7 +54,8 @@ public: /** * Stops all output to this writer. Calling this methods disables all - * message forwarding to the backend. + * message forwarding to the backend and will eventually remove the + * backend thread. * * This method must only be called from the main thread. */ diff --git a/src/threading/BasicThread.cc b/src/threading/BasicThread.cc index c708bb79ef..09b6e95d7a 100644 --- a/src/threading/BasicThread.cc +++ b/src/threading/BasicThread.cc @@ -117,20 +117,7 @@ void BasicThread::Start() OnStart(); } -void BasicThread::PrepareStop() - { - if ( ! started ) - return; - - if ( terminating ) - return; - - DBG_LOG(DBG_THREADING, "Preparing thread %s to terminate ...", name); - - OnPrepareStop(); - } - -void BasicThread::Stop() +void BasicThread::SignalStop() { if ( ! started ) return; @@ -140,7 +127,20 @@ void BasicThread::Stop() DBG_LOG(DBG_THREADING, "Signaling thread %s to terminate ...", name); - OnStop(); + OnSignalStop(); + } + +void BasicThread::WaitForStop() + { + if ( ! started ) + return; + + if ( terminating ) + return; + + DBG_LOG(DBG_THREADING, "Waiting for thread %s to terminate ...", name); + + OnWaitForStop(); terminating = true; } @@ -150,11 +150,12 @@ void BasicThread::Join() if ( ! started ) return; + if ( ! pthread ) + return; + assert(terminating); - DBG_LOG(DBG_THREADING, "Joining thread %s ...", name); - - if ( pthread && pthread_join(pthread, 0) != 0 ) + if ( pthread_join(pthread, 0) != 0 ) reporter->FatalError("Failure joining thread %s", name); DBG_LOG(DBG_THREADING, "Joined with thread %s", name); diff --git a/src/threading/BasicThread.h b/src/threading/BasicThread.h index 100efe8851..cb0108219c 100644 --- a/src/threading/BasicThread.h +++ b/src/threading/BasicThread.h @@ -71,32 +71,33 @@ public: void Start(); /** - * Signals the thread to prepare for stopping. This must be called - * before Stop() and allows the thread to trigger shutting down - * without yet blocking for doing so. + * Signals the thread to prepare for stopping, but doesn't block to + * wait for that to happen. Use WaitForStop() for that. * + * The method lets Terminating() now return true, it does however not + * force the thread to terminate. It's up to the Run() method to to + * query Terminating() and exit eventually. + * * Calling this method has no effect if Start() hasn't been executed * yet. * * Only Bro's main thread must call this method. */ - void PrepareStop(); + void SignalStop(); /** - * Signals the thread to stop. The method lets Terminating() now - * return true. It does however not force the thread to terminate. - * It's up to the Run() method to to query Terminating() and exit - * eventually. + * Waits until a thread has stopped after receiving SignalStop(). * * Calling this method has no effect if Start() hasn't been executed - * yet. + * yet. If this is executed without calling SignalStop() first, + * results are undefined. * * Only Bro's main thread must call this method. */ - void Stop(); + void WaitForStop(); /** - * Returns true if Stop() has been called. + * Returns true if WaitForStop() has been called and finished. * * This method is safe to call from any thread. */ @@ -145,18 +146,19 @@ protected: virtual void OnStart() {} /** - * Executed with PrepareStop() (and before OnStop()). This is a hook - * into preparing the thread for stopping. It will be called from - * Bro's main thread before the thread has been signaled to stop. + * Executed with SignalStop(). This is a hook into preparing the + * thread for stopping. It will be called from Bro's main thread + * before the thread has been signaled to stop. */ - virtual void OnPrepareStop() {} + virtual void OnSignalStop() {} /** - * Executed with Stop() (and after OnPrepareStop()). This is a hook - * into stopping the thread. It will be called from Bro's main thread - * after the thread has been signaled to stop. + * Executed with WaitForStop(). This is a hook into waiting for the + * thread to stop. It must be overridden by derived classes and only + * return once the thread has indeed finished processing. The method + * will be called from Bro's main thread. */ - virtual void OnStop() {} + virtual void OnWaitForStop() = 0; /** * Executed with Kill(). This is a hook into killing the thread. diff --git a/src/threading/Manager.cc b/src/threading/Manager.cc index cfc44596e1..39a6bdce7d 100644 --- a/src/threading/Manager.cc +++ b/src/threading/Manager.cc @@ -32,10 +32,10 @@ void Manager::Terminate() // Signal all to stop. for ( all_thread_list::iterator i = all_threads.begin(); i != all_threads.end(); i++ ) - (*i)->PrepareStop(); + (*i)->SignalStop(); for ( all_thread_list::iterator i = all_threads.begin(); i != all_threads.end(); i++ ) - (*i)->Stop(); + (*i)->WaitForStop(); // Then join them all. for ( all_thread_list::iterator i = all_threads.begin(); i != all_threads.end(); i++ ) @@ -141,13 +141,40 @@ void Manager::Process() else { reporter->Error("%s failed, terminating thread", msg->Name()); - t->Stop(); + t->SignalStop(); } delete msg; } } + all_thread_list to_delete; + + for ( all_thread_list::iterator i = all_threads.begin(); i != all_threads.end(); i++ ) + { + BasicThread* t = *i; + + if ( ! t->Killed() ) + continue; + + to_delete.push_back(t); + } + + for ( all_thread_list::iterator i = to_delete.begin(); i != to_delete.end(); i++ ) + { + BasicThread* t = *i; + + all_threads.remove(t); + + MsgThread* mt = dynamic_cast(t); + + if ( mt ) + msg_threads.remove(mt); + + t->Join(); + delete t; + } + // fprintf(stderr, "P %.6f %.6f do_beat=%d did_process=%d next_next=%.6f\n", network_time, timer_mgr->Time(), do_beat, (int)did_process, next_beat); } diff --git a/src/threading/MsgThread.cc b/src/threading/MsgThread.cc index 6c63c5a287..902ea36410 100644 --- a/src/threading/MsgThread.cc +++ b/src/threading/MsgThread.cc @@ -161,16 +161,16 @@ MsgThread::MsgThread() : BasicThread(), queue_in(this, 0), queue_out(0, this) // Set by Bro's main signal handler. extern int signal_val; -void MsgThread::OnPrepareStop() +void MsgThread::OnSignalStop() { if ( finished || Killed() ) return; - // Signal thread to terminate and wait until it has acknowledged. + // Signal thread to terminate. SendIn(new FinishMessage(this, network_time), true); } -void MsgThread::OnStop() +void MsgThread::OnWaitForStop() { int signal_count = 0; int old_signal_val = signal_val; diff --git a/src/threading/MsgThread.h b/src/threading/MsgThread.h index d5e223d48f..4492320541 100644 --- a/src/threading/MsgThread.h +++ b/src/threading/MsgThread.h @@ -228,8 +228,8 @@ protected: * */ virtual void Run(); - virtual void OnStop(); - virtual void OnPrepareStop(); + virtual void OnWaitForStop(); + virtual void OnSignalStop(); virtual void OnKill(); private: