diff --git a/scripts/base/frameworks/logging/main.bro b/scripts/base/frameworks/logging/main.bro index 054ad4a30b..82d3fa043b 100644 --- a/scripts/base/frameworks/logging/main.bro +++ b/scripts/base/frameworks/logging/main.bro @@ -189,6 +189,15 @@ export { ## .. bro:see:: Log::add_default_filter Log::remove_default_filter global create_stream: function(id: ID, stream: Stream) : bool; + ## Removes a logging stream completely, stopping all the threads. + ## + ## id: The ID enum to be associated with the new logging stream. + ## + ## Returns: True if a new stream was successfully removed. + ## + ## .. bro:see:: Log:create_stream + global remove_stream: function(id: ID) : bool; + ## Enables a previously disabled logging stream. Disabled streams ## will not be written to until they are enabled again. New streams ## are enabled by default. @@ -442,6 +451,12 @@ function create_stream(id: ID, stream: Stream) : bool return add_default_filter(id); } +function remove_stream(id: ID) : bool + { + delete active_streams[id]; + return __remove_stream(id); + } + function disable_stream(id: ID) : bool { delete active_streams[id]; diff --git a/src/input/Manager.cc b/src/input/Manager.cc index 7097fdfa90..933b0b594c 100644 --- a/src/input/Manager.cc +++ b/src/input/Manager.cc @@ -747,6 +747,8 @@ bool Manager::RemoveStream(Stream *i) DBG_LOG(DBG_INPUT, "Successfully queued removal of stream %s", i->name.c_str()); + i->reader->Stop(); + return true; } diff --git a/src/input/ReaderFrontend.cc b/src/input/ReaderFrontend.cc index a8528c002d..d28f410de0 100644 --- a/src/input/ReaderFrontend.cc +++ b/src/input/ReaderFrontend.cc @@ -49,6 +49,15 @@ ReaderFrontend::ReaderFrontend(const ReaderBackend::ReaderInfo& arg_info, EnumVa backend->Start(); } +void ReaderFrontend::Stop() + { + if ( backend ) + { + backend->SignalStop(); + backend = 0; // Thread manager will clean it up once it finishes. + } + } + ReaderFrontend::~ReaderFrontend() { delete [] name; diff --git a/src/input/ReaderFrontend.h b/src/input/ReaderFrontend.h index a93f7703ac..c93b9960a0 100644 --- a/src/input/ReaderFrontend.h +++ b/src/input/ReaderFrontend.h @@ -75,7 +75,7 @@ public: * the corresponding message there. This method must only be called * from the main thread. */ - void Close(); + void Stop(); /** * Disables the reader frontend. From now on, all method calls that diff --git a/src/logging.bif b/src/logging.bif index f7684b7216..1927e149c0 100644 --- a/src/logging.bif +++ b/src/logging.bif @@ -18,6 +18,12 @@ function Log::__create_stream%(id: Log::ID, stream: Log::Stream%) : bool return new Val(result, TYPE_BOOL); %} +function Log::__remove_stream%(id: Log::ID%) : bool + %{ + bool result = log_mgr->RemoveStream(id->AsEnumVal()); + return new Val(result, TYPE_BOOL); + %} + function Log::__enable_stream%(id: Log::ID%) : bool %{ bool result = log_mgr->EnableStream(id->AsEnumVal()); diff --git a/src/logging/Manager.cc b/src/logging/Manager.cc index 1de7230b71..7848f97c68 100644 --- a/src/logging/Manager.cc +++ b/src/logging/Manager.cc @@ -377,6 +377,38 @@ bool Manager::CreateStream(EnumVal* id, RecordVal* sval) return true; } +bool Manager::RemoveStream(EnumVal* id) + { + unsigned int idx = id->AsEnum(); + + if ( idx >= streams.size() || ! streams[idx] ) + return false; + + Stream* stream = streams[idx]; + + if ( ! stream ) + return false; + + for ( Stream::WriterMap::iterator i = stream->writers.begin(); i != stream->writers.end(); i++ ) + { + WriterInfo* winfo = i->second; + + DBG_LOG(DBG_LOGGING, "Removed writer '%s' from stream '%s'", + winfo->writer->Name(), stream->name.c_str()); + + winfo->writer->Stop(); + delete winfo->writer; + delete winfo; + } + + stream->writers.clear(); + delete stream; + streams[idx] = 0; + + DBG_LOG(DBG_LOGGING, "Removed logging stream '%s'", stream->name.c_str()); + return true; + } + bool Manager::EnableStream(EnumVal* id) { Stream* stream = FindStream(id); @@ -1239,25 +1271,17 @@ bool Manager::Flush(EnumVal* id) return true; } -void Manager::Terminate() +void Manager::FlushBuffers() { - // Make sure we process all the pending rotations. - - while ( rotations_pending > 0 ) - { - thread_mgr->ForceProcessing(); // A blatant layering violation ... - usleep(1000); - } - - if ( rotations_pending < 0 ) - reporter->InternalError("Negative pending log rotations: %d", rotations_pending); - + // Flush out cached entries in Frontend for ( vector::iterator s = streams.begin(); s != streams.end(); ++s ) { if ( ! *s ) continue; - Flush((*s)->id); + for ( Stream::WriterMap::iterator i = (*s)->writers.begin(); + i != (*s)->writers.end(); i++ ) + i->second->writer->FlushWriteBuffer(); } } diff --git a/src/logging/Manager.h b/src/logging/Manager.h index 90ad944bc6..bd68bc603f 100644 --- a/src/logging/Manager.h +++ b/src/logging/Manager.h @@ -47,6 +47,16 @@ public: */ bool CreateStream(EnumVal* id, RecordVal* stream); + /** + * Remove a log stream, stopping all threads. + * + * @param id The enum value corresponding the log stream. + * + * This methods corresponds directly to the internal BiF defined in + * logging.bif, which just forwards here. + */ + bool RemoveStream(EnumVal* id); + /** * Enables a log log stream. * @@ -140,10 +150,10 @@ public: bool Flush(EnumVal* id); /** - * Prepares the log manager to terminate. This will flush all log - * stream. + * Flushes all buffers that are currently held by writer frontends + * out to the threads. Does not call the thread flush operation. */ - void Terminate(); + void FlushBuffers(); /** * Returns a list of supported output formats. diff --git a/src/logging/WriterFrontend.cc b/src/logging/WriterFrontend.cc index a97f48c1ed..73cba2ff3a 100644 --- a/src/logging/WriterFrontend.cc +++ b/src/logging/WriterFrontend.cc @@ -138,6 +138,12 @@ void WriterFrontend::Stop() { FlushWriteBuffer(); SetDisable(); + + if ( backend ) + { + backend->SignalStop(); + backend = 0; // Thread manager will clean it up once it finishes. + } } void WriterFrontend::Init(int arg_num_fields, const Field* const * arg_fields) diff --git a/src/logging/WriterFrontend.h b/src/logging/WriterFrontend.h index a4a8dcd415..994c82c513 100644 --- a/src/logging/WriterFrontend.h +++ b/src/logging/WriterFrontend.h @@ -54,7 +54,8 @@ public: /** * Stops all output to this writer. Calling this methods disables all - * message forwarding to the backend. + * message forwarding to the backend and will eventually remove the + * backend thread. * * This method must only be called from the main thread. */ @@ -207,7 +208,7 @@ protected: EnumVal* stream; EnumVal* writer; - WriterBackend* backend; // The backend we have instanatiated. + WriterBackend* backend; // The backend we have instantiated. bool disabled; // True if disabled. bool initialized; // True if initialized. bool buf; // True if buffering is enabled (default). diff --git a/src/logging/writers/Ascii.cc b/src/logging/writers/Ascii.cc index 5a1c710a87..ddb63db36f 100644 --- a/src/logging/writers/Ascii.cc +++ b/src/logging/writers/Ascii.cc @@ -192,6 +192,8 @@ bool Ascii::DoFinish(double network_time) abort(); } + DoFlush(network_time); + ascii_done = true; CloseFile(network_time); diff --git a/src/main.cc b/src/main.cc index b330f36509..ff10afb1ee 100644 --- a/src/main.cc +++ b/src/main.cc @@ -330,7 +330,7 @@ void terminate_bro() mgr.Drain(); file_mgr->Terminate(); - log_mgr->Terminate(); + log_mgr->FlushBuffers(); thread_mgr->Terminate(); mgr.Drain(); diff --git a/src/threading/BasicThread.cc b/src/threading/BasicThread.cc index c708bb79ef..7f5dbfc56b 100644 --- a/src/threading/BasicThread.cc +++ b/src/threading/BasicThread.cc @@ -117,20 +117,7 @@ void BasicThread::Start() OnStart(); } -void BasicThread::PrepareStop() - { - if ( ! started ) - return; - - if ( terminating ) - return; - - DBG_LOG(DBG_THREADING, "Preparing thread %s to terminate ...", name); - - OnPrepareStop(); - } - -void BasicThread::Stop() +void BasicThread::SignalStop() { if ( ! started ) return; @@ -140,7 +127,17 @@ void BasicThread::Stop() DBG_LOG(DBG_THREADING, "Signaling thread %s to terminate ...", name); - OnStop(); + OnSignalStop(); + } + +void BasicThread::WaitForStop() + { + if ( ! started ) + return; + + DBG_LOG(DBG_THREADING, "Waiting for thread %s to terminate and process last queue items...", name); + + OnWaitForStop(); terminating = true; } @@ -150,11 +147,12 @@ void BasicThread::Join() if ( ! started ) return; + if ( ! pthread ) + return; + assert(terminating); - DBG_LOG(DBG_THREADING, "Joining thread %s ...", name); - - if ( pthread && pthread_join(pthread, 0) != 0 ) + if ( pthread_join(pthread, 0) != 0 ) reporter->FatalError("Failure joining thread %s", name); DBG_LOG(DBG_THREADING, "Joined with thread %s", name); diff --git a/src/threading/BasicThread.h b/src/threading/BasicThread.h index 100efe8851..117506c418 100644 --- a/src/threading/BasicThread.h +++ b/src/threading/BasicThread.h @@ -71,32 +71,33 @@ public: void Start(); /** - * Signals the thread to prepare for stopping. This must be called - * before Stop() and allows the thread to trigger shutting down - * without yet blocking for doing so. + * Signals the thread to prepare for stopping, but doesn't block to + * wait for that to happen. Use WaitForStop() for that. + * + * The method lets Terminating() now return true, it does however not + * force the thread to terminate. It's up to the Run() method to to + * query Terminating() and exit eventually. * * Calling this method has no effect if Start() hasn't been executed * yet. * * Only Bro's main thread must call this method. */ - void PrepareStop(); + void SignalStop(); /** - * Signals the thread to stop. The method lets Terminating() now - * return true. It does however not force the thread to terminate. - * It's up to the Run() method to to query Terminating() and exit - * eventually. + * Waits until a thread has stopped after receiving SignalStop(). * * Calling this method has no effect if Start() hasn't been executed - * yet. + * yet. If this is executed without calling SignalStop() first, + * results are undefined. * * Only Bro's main thread must call this method. */ - void Stop(); + void WaitForStop(); /** - * Returns true if Stop() has been called. + * Returns true if WaitForStop() has been called and finished. * * This method is safe to call from any thread. */ @@ -145,18 +146,19 @@ protected: virtual void OnStart() {} /** - * Executed with PrepareStop() (and before OnStop()). This is a hook - * into preparing the thread for stopping. It will be called from - * Bro's main thread before the thread has been signaled to stop. + * Executed with SignalStop(). This is a hook into preparing the + * thread for stopping. It will be called from Bro's main thread + * before the thread has been signaled to stop. */ - virtual void OnPrepareStop() {} + virtual void OnSignalStop() {} /** - * Executed with Stop() (and after OnPrepareStop()). This is a hook - * into stopping the thread. It will be called from Bro's main thread - * after the thread has been signaled to stop. + * Executed with WaitForStop(). This is a hook into waiting for the + * thread to stop. It must be overridden by derived classes and only + * return once the thread has indeed finished processing. The method + * will be called from Bro's main thread. */ - virtual void OnStop() {} + virtual void OnWaitForStop() = 0; /** * Executed with Kill(). This is a hook into killing the thread. diff --git a/src/threading/Manager.cc b/src/threading/Manager.cc index cfc44596e1..1b6cb551e2 100644 --- a/src/threading/Manager.cc +++ b/src/threading/Manager.cc @@ -32,10 +32,10 @@ void Manager::Terminate() // Signal all to stop. for ( all_thread_list::iterator i = all_threads.begin(); i != all_threads.end(); i++ ) - (*i)->PrepareStop(); + (*i)->SignalStop(); for ( all_thread_list::iterator i = all_threads.begin(); i != all_threads.end(); i++ ) - (*i)->Stop(); + (*i)->WaitForStop(); // Then join them all. for ( all_thread_list::iterator i = all_threads.begin(); i != all_threads.end(); i++ ) @@ -122,15 +122,10 @@ void Manager::Process() if ( do_beat ) t->Heartbeat(); - while ( t->HasOut() && ! t->Killed() ) + while ( t->HasOut() ) { Message* msg = t->RetrieveOut(); - - if ( ! msg ) - { - assert(t->Killed()); - break; - } + assert(msg); if ( msg->Process() ) { @@ -141,13 +136,40 @@ void Manager::Process() else { reporter->Error("%s failed, terminating thread", msg->Name()); - t->Stop(); + t->SignalStop(); } delete msg; } } + all_thread_list to_delete; + + for ( all_thread_list::iterator i = all_threads.begin(); i != all_threads.end(); i++ ) + { + BasicThread* t = *i; + + if ( ! t->Killed() ) + continue; + + to_delete.push_back(t); + } + + for ( all_thread_list::iterator i = to_delete.begin(); i != to_delete.end(); i++ ) + { + BasicThread* t = *i; + + all_threads.remove(t); + + MsgThread* mt = dynamic_cast(t); + + if ( mt ) + msg_threads.remove(mt); + + t->Join(); + delete t; + } + // fprintf(stderr, "P %.6f %.6f do_beat=%d did_process=%d next_next=%.6f\n", network_time, timer_mgr->Time(), do_beat, (int)did_process, next_beat); } diff --git a/src/threading/Manager.h b/src/threading/Manager.h index b46a06a46e..e839749a91 100644 --- a/src/threading/Manager.h +++ b/src/threading/Manager.h @@ -68,12 +68,6 @@ public: */ int NumThreads() const { return all_threads.size(); } - /** Manually triggers processing of any thread input. This can be useful - * if the main thread is waiting for a specific message from a child. - * Usually, though, one should avoid using it. - */ - void ForceProcessing() { Process(); } - /** * Signals a specific threads to terminate immediately. */ diff --git a/src/threading/MsgThread.cc b/src/threading/MsgThread.cc index 6c63c5a287..c713f65986 100644 --- a/src/threading/MsgThread.cc +++ b/src/threading/MsgThread.cc @@ -30,6 +30,20 @@ private: double network_time; }; +// Signals main thread that operations shut down. +class FinishedMessage : public OutputMessage +{ +public: + FinishedMessage(MsgThread* thread) + : OutputMessage("FinishedMessage", thread) + { } + + virtual bool Process() { + Object()->main_finished = true; + return true; + } +}; + /// Sends a heartbeat to the child thread. class HeartbeatMessage : public InputMessage { @@ -153,7 +167,8 @@ bool ReporterMessage::Process() MsgThread::MsgThread() : BasicThread(), queue_in(this, 0), queue_out(0, this) { cnt_sent_in = cnt_sent_out = 0; - finished = false; + main_finished = false; + child_finished = false; failed = false; thread_mgr->AddMsgThread(this); } @@ -161,16 +176,16 @@ MsgThread::MsgThread() : BasicThread(), queue_in(this, 0), queue_out(0, this) // Set by Bro's main signal handler. extern int signal_val; -void MsgThread::OnPrepareStop() +void MsgThread::OnSignalStop() { - if ( finished || Killed() ) + if ( main_finished || Killed() ) return; - // Signal thread to terminate and wait until it has acknowledged. + // Signal thread to terminate. SendIn(new FinishMessage(this, network_time), true); } -void MsgThread::OnStop() +void MsgThread::OnWaitForStop() { int signal_count = 0; int old_signal_val = signal_val; @@ -180,7 +195,7 @@ void MsgThread::OnStop() uint64_t last_size = 0; uint64_t cur_size = 0; - while ( ! (finished || Killed() ) ) + while ( ! main_finished ) { // Terminate if we get another kill signal. if ( signal_val == SIGTERM || signal_val == SIGINT ) @@ -205,9 +220,22 @@ void MsgThread::OnStop() signal_val = 0; } - queue_in.WakeUp(); + if ( ! Killed() ) + queue_in.WakeUp(); - usleep(1000); + while ( HasOut() ) + { + Message* msg = RetrieveOut(); + assert ( msg ); + + if ( ! msg->Process() ) + reporter->Error("%s failed during thread termination", msg->Name()); + + delete msg; + } + + if ( ! Killed() ) + usleep(1000); } signal_val = old_signal_val; @@ -237,9 +265,8 @@ void MsgThread::HeartbeatInChild() void MsgThread::Finished() { - // This is thread-safe "enough", we're the only one ever writing - // there. - finished = true; + child_finished = true; + SendOut(new FinishedMessage(this)); } void MsgThread::Info(const char* msg) @@ -344,7 +371,7 @@ BasicInputMessage* MsgThread::RetrieveIn() void MsgThread::Run() { - while ( ! (finished || Killed() ) ) + while ( ! (child_finished || Killed() ) ) { BasicInputMessage* msg = RetrieveIn(); @@ -368,10 +395,10 @@ void MsgThread::Run() } } - // In case we haven't send the finish method yet, do it now. Reading + // In case we haven't sent the finish method yet, do it now. Reading // global network_time here should be fine, it isn't changing // anymore. - if ( ! finished && ! Killed() ) + if ( ! child_finished && ! Killed() ) { OnFinish(network_time); Finished(); diff --git a/src/threading/MsgThread.h b/src/threading/MsgThread.h index d5e223d48f..c5ba5b676f 100644 --- a/src/threading/MsgThread.h +++ b/src/threading/MsgThread.h @@ -228,8 +228,8 @@ protected: * */ virtual void Run(); - virtual void OnStop(); - virtual void OnPrepareStop(); + virtual void OnWaitForStop(); + virtual void OnSignalStop(); virtual void OnKill(); private: @@ -289,7 +289,8 @@ private: */ bool MightHaveOut() { return queue_out.MaybeReady(); } - /** Flags that the child process has finished processing. Called from child. + /** Sends a message to the main thread signaling that the child process + * has finished processing. Called from child. */ void Finished(); @@ -299,7 +300,8 @@ private: uint64_t cnt_sent_in; // Counts message sent to child. uint64_t cnt_sent_out; // Counts message sent by child. - bool finished; // Set to true by Finished message. + bool main_finished; // Main thread is finished, meaning child_finished propagated back through message queue. + bool child_finished; // Child thread is finished. bool failed; // Set to true when a command failed. }; diff --git a/src/threading/Queue.h b/src/threading/Queue.h index 0ddcda29f7..5988c94042 100644 --- a/src/threading/Queue.h +++ b/src/threading/Queue.h @@ -155,14 +155,11 @@ inline Queue::~Queue() template inline T Queue::Get() { - if ( (reader && reader->Killed()) || (writer && writer->Killed()) ) - return 0; - safe_lock(&mutex[read_ptr]); int old_read_ptr = read_ptr; - if ( messages[read_ptr].empty() ) + if ( messages[read_ptr].empty() && ! ((reader && reader->Killed()) || (writer && writer->Killed())) ) { struct timespec ts; ts.tv_sec = time(0) + 5; @@ -173,6 +170,12 @@ inline T Queue::Get() return 0; } + else if ( messages[read_ptr].empty() ) + { + safe_unlock(&mutex[read_ptr]); + return 0; + } + T data = messages[read_ptr].front(); messages[read_ptr].pop();