From 38e1dc9ca47d97508276a2f7192c5353bb8e6837 Mon Sep 17 00:00:00 2001 From: Robin Sommer Date: Thu, 14 Mar 2013 14:51:10 -0700 Subject: [PATCH 1/7] Support for cleaning up threads that have terminated. Once a BasicThread leaves its run() method, a thread is now marked for cleaning up, and the ThreadMgr will soon join it to release the OS resources. Also, adding a function Log::remove_stream() that remove a logging stream, stopping all writer threads that are associated with it. Note, however, that removing a *filter* from a stream still doesn't clean up any threads. The problem is that because of the output paths potentially being created dynamically it's unclear if the writer thread will still be needed in the future. We could add clean writers up with timeouts, but that doesn't sound great either. So for now, the only way to sure clean up logging threads is to remove the entire stream. Also note that cleanup doesn't work with input threads yet, which don't seem to terminate (at least in the case I tried). --- scripts/base/frameworks/logging/main.bro | 16 ++++++++++++ src/logging.bif | 6 +++++ src/logging/Manager.cc | 32 ++++++++++++++++++++++++ src/logging/Manager.h | 10 ++++++++ src/logging/WriterFrontend.cc | 7 ++++++ src/logging/WriterFrontend.h | 3 ++- src/threading/BasicThread.cc | 7 +++--- src/threading/Manager.cc | 27 ++++++++++++++++++++ 8 files changed, 104 insertions(+), 4 deletions(-) diff --git a/scripts/base/frameworks/logging/main.bro b/scripts/base/frameworks/logging/main.bro index 054ad4a30b..1126686c13 100644 --- a/scripts/base/frameworks/logging/main.bro +++ b/scripts/base/frameworks/logging/main.bro @@ -189,6 +189,15 @@ export { ## .. bro:see:: Log::add_default_filter Log::remove_default_filter global create_stream: function(id: ID, stream: Stream) : bool; + ## Removes a logging stream completely, stopping all the threads. + ## + ## id: The ID enum to be associated with the new logging stream. + ## + ## Returns: True if a new stream was successfully removed. + ## + ## .. bro:see:: Log:create_stream + global remove_stream: function(id: ID) : bool; + ## Enables a previously disabled logging stream. Disabled streams ## will not be written to until they are enabled again. New streams ## are enabled by default. @@ -442,6 +451,13 @@ function create_stream(id: ID, stream: Stream) : bool return add_default_filter(id); } +function remove_stream(id: ID) : bool + { + delete active_streams[id]; + + return __remove_stream(id); + } + function disable_stream(id: ID) : bool { delete active_streams[id]; diff --git a/src/logging.bif b/src/logging.bif index f5d3e8e3e6..cf97c59cd3 100644 --- a/src/logging.bif +++ b/src/logging.bif @@ -18,6 +18,12 @@ function Log::__create_stream%(id: Log::ID, stream: Log::Stream%) : bool return new Val(result, TYPE_BOOL); %} +function Log::__remove_stream%(id: Log::ID%) : bool + %{ + bool result = log_mgr->RemoveStream(id->AsEnumVal()); + return new Val(result, TYPE_BOOL); + %} + function Log::__enable_stream%(id: Log::ID%) : bool %{ bool result = log_mgr->EnableStream(id->AsEnumVal()); diff --git a/src/logging/Manager.cc b/src/logging/Manager.cc index 1ab83d84ba..67e7d998ed 100644 --- a/src/logging/Manager.cc +++ b/src/logging/Manager.cc @@ -374,6 +374,38 @@ bool Manager::CreateStream(EnumVal* id, RecordVal* sval) return true; } +bool Manager::RemoveStream(EnumVal* id) + { + unsigned int idx = id->AsEnum(); + + if ( idx >= streams.size() || ! streams[idx] ) + return false; + + Stream* stream = streams[idx]; + + if ( ! stream ) + return false; + + for ( Stream::WriterMap::iterator i = stream->writers.begin(); i != stream->writers.end(); i++ ) + { + WriterInfo* winfo = i->second; + + DBG_LOG(DBG_LOGGING, "Removed writer '%s' from stream '%s'", + winfo->writer->Name(), stream->name.c_str()); + + winfo->writer->Stop(); + delete winfo->writer; + delete winfo; + } + + stream->writers.clear(); + delete stream; + streams[idx] = 0; + + DBG_LOG(DBG_LOGGING, "Removed logging stream '%s'", stream->name.c_str()); + return true; + } + bool Manager::EnableStream(EnumVal* id) { Stream* stream = FindStream(id); diff --git a/src/logging/Manager.h b/src/logging/Manager.h index 90ad944bc6..5b5f8014e3 100644 --- a/src/logging/Manager.h +++ b/src/logging/Manager.h @@ -47,6 +47,16 @@ public: */ bool CreateStream(EnumVal* id, RecordVal* stream); + /** + * Remove a log stream, stopping all threads. + * + * @param id The enum value corresponding the log stream. + * + * This methods corresponds directly to the internal BiF defined in + * logging.bif, which just forwards here. + */ + bool RemoveStream(EnumVal* id); + /** * Enables a log log stream. * diff --git a/src/logging/WriterFrontend.cc b/src/logging/WriterFrontend.cc index a97f48c1ed..9ed9f802d3 100644 --- a/src/logging/WriterFrontend.cc +++ b/src/logging/WriterFrontend.cc @@ -138,6 +138,13 @@ void WriterFrontend::Stop() { FlushWriteBuffer(); SetDisable(); + + if ( backend ) + { + backend->PrepareStop(); + backend->Stop(); + backend = 0; // Thread manager will clean it up once it finishes. + } } void WriterFrontend::Init(int arg_num_fields, const Field* const * arg_fields) diff --git a/src/logging/WriterFrontend.h b/src/logging/WriterFrontend.h index a4a8dcd415..5bcde21a5e 100644 --- a/src/logging/WriterFrontend.h +++ b/src/logging/WriterFrontend.h @@ -54,7 +54,8 @@ public: /** * Stops all output to this writer. Calling this methods disables all - * message forwarding to the backend. + * message forwarding to the backend and will eventually remove the + * backend thread. * * This method must only be called from the main thread. */ diff --git a/src/threading/BasicThread.cc b/src/threading/BasicThread.cc index c708bb79ef..b7ec094309 100644 --- a/src/threading/BasicThread.cc +++ b/src/threading/BasicThread.cc @@ -150,11 +150,12 @@ void BasicThread::Join() if ( ! started ) return; + if ( ! pthread ) + return; + assert(terminating); - DBG_LOG(DBG_THREADING, "Joining thread %s ...", name); - - if ( pthread && pthread_join(pthread, 0) != 0 ) + if ( pthread_join(pthread, 0) != 0 ) reporter->FatalError("Failure joining thread %s", name); DBG_LOG(DBG_THREADING, "Joined with thread %s", name); diff --git a/src/threading/Manager.cc b/src/threading/Manager.cc index cfc44596e1..59eff4fd99 100644 --- a/src/threading/Manager.cc +++ b/src/threading/Manager.cc @@ -148,6 +148,33 @@ void Manager::Process() } } + all_thread_list to_delete; + + for ( all_thread_list::iterator i = all_threads.begin(); i != all_threads.end(); i++ ) + { + BasicThread* t = *i; + + if ( ! t->Killed() ) + continue; + + to_delete.push_back(t); + } + + for ( all_thread_list::iterator i = to_delete.begin(); i != to_delete.end(); i++ ) + { + BasicThread* t = *i; + + all_threads.remove(t); + + MsgThread* mt = dynamic_cast(t); + + if ( mt ) + msg_threads.remove(mt); + + t->Join(); + delete t; + } + // fprintf(stderr, "P %.6f %.6f do_beat=%d did_process=%d next_next=%.6f\n", network_time, timer_mgr->Time(), do_beat, (int)did_process, next_beat); } From d11bd56b5d809460327cfc06b52b8c44b5a56f4b Mon Sep 17 00:00:00 2001 From: Robin Sommer Date: Fri, 15 Mar 2013 17:54:20 -0700 Subject: [PATCH 2/7] Changing semantics of thread stop methods. PrepareStop() is now SignalStop() and just signals a thread that it should terminate. After that's called, WaitForStop() (formerly Stop()) wait for it to actually finish processing. When stopping writers during operation, we now no longer wait for them to finish. --- src/logging/WriterFrontend.cc | 3 +-- src/threading/BasicThread.cc | 30 +++++++++++++------------- src/threading/BasicThread.h | 40 ++++++++++++++++++----------------- src/threading/Manager.cc | 6 +++--- src/threading/MsgThread.cc | 6 +++--- src/threading/MsgThread.h | 4 ++-- 6 files changed, 45 insertions(+), 44 deletions(-) diff --git a/src/logging/WriterFrontend.cc b/src/logging/WriterFrontend.cc index 9ed9f802d3..73cba2ff3a 100644 --- a/src/logging/WriterFrontend.cc +++ b/src/logging/WriterFrontend.cc @@ -141,8 +141,7 @@ void WriterFrontend::Stop() if ( backend ) { - backend->PrepareStop(); - backend->Stop(); + backend->SignalStop(); backend = 0; // Thread manager will clean it up once it finishes. } } diff --git a/src/threading/BasicThread.cc b/src/threading/BasicThread.cc index b7ec094309..09b6e95d7a 100644 --- a/src/threading/BasicThread.cc +++ b/src/threading/BasicThread.cc @@ -117,20 +117,7 @@ void BasicThread::Start() OnStart(); } -void BasicThread::PrepareStop() - { - if ( ! started ) - return; - - if ( terminating ) - return; - - DBG_LOG(DBG_THREADING, "Preparing thread %s to terminate ...", name); - - OnPrepareStop(); - } - -void BasicThread::Stop() +void BasicThread::SignalStop() { if ( ! started ) return; @@ -140,7 +127,20 @@ void BasicThread::Stop() DBG_LOG(DBG_THREADING, "Signaling thread %s to terminate ...", name); - OnStop(); + OnSignalStop(); + } + +void BasicThread::WaitForStop() + { + if ( ! started ) + return; + + if ( terminating ) + return; + + DBG_LOG(DBG_THREADING, "Waiting for thread %s to terminate ...", name); + + OnWaitForStop(); terminating = true; } diff --git a/src/threading/BasicThread.h b/src/threading/BasicThread.h index 100efe8851..cb0108219c 100644 --- a/src/threading/BasicThread.h +++ b/src/threading/BasicThread.h @@ -71,32 +71,33 @@ public: void Start(); /** - * Signals the thread to prepare for stopping. This must be called - * before Stop() and allows the thread to trigger shutting down - * without yet blocking for doing so. + * Signals the thread to prepare for stopping, but doesn't block to + * wait for that to happen. Use WaitForStop() for that. * + * The method lets Terminating() now return true, it does however not + * force the thread to terminate. It's up to the Run() method to to + * query Terminating() and exit eventually. + * * Calling this method has no effect if Start() hasn't been executed * yet. * * Only Bro's main thread must call this method. */ - void PrepareStop(); + void SignalStop(); /** - * Signals the thread to stop. The method lets Terminating() now - * return true. It does however not force the thread to terminate. - * It's up to the Run() method to to query Terminating() and exit - * eventually. + * Waits until a thread has stopped after receiving SignalStop(). * * Calling this method has no effect if Start() hasn't been executed - * yet. + * yet. If this is executed without calling SignalStop() first, + * results are undefined. * * Only Bro's main thread must call this method. */ - void Stop(); + void WaitForStop(); /** - * Returns true if Stop() has been called. + * Returns true if WaitForStop() has been called and finished. * * This method is safe to call from any thread. */ @@ -145,18 +146,19 @@ protected: virtual void OnStart() {} /** - * Executed with PrepareStop() (and before OnStop()). This is a hook - * into preparing the thread for stopping. It will be called from - * Bro's main thread before the thread has been signaled to stop. + * Executed with SignalStop(). This is a hook into preparing the + * thread for stopping. It will be called from Bro's main thread + * before the thread has been signaled to stop. */ - virtual void OnPrepareStop() {} + virtual void OnSignalStop() {} /** - * Executed with Stop() (and after OnPrepareStop()). This is a hook - * into stopping the thread. It will be called from Bro's main thread - * after the thread has been signaled to stop. + * Executed with WaitForStop(). This is a hook into waiting for the + * thread to stop. It must be overridden by derived classes and only + * return once the thread has indeed finished processing. The method + * will be called from Bro's main thread. */ - virtual void OnStop() {} + virtual void OnWaitForStop() = 0; /** * Executed with Kill(). This is a hook into killing the thread. diff --git a/src/threading/Manager.cc b/src/threading/Manager.cc index 59eff4fd99..39a6bdce7d 100644 --- a/src/threading/Manager.cc +++ b/src/threading/Manager.cc @@ -32,10 +32,10 @@ void Manager::Terminate() // Signal all to stop. for ( all_thread_list::iterator i = all_threads.begin(); i != all_threads.end(); i++ ) - (*i)->PrepareStop(); + (*i)->SignalStop(); for ( all_thread_list::iterator i = all_threads.begin(); i != all_threads.end(); i++ ) - (*i)->Stop(); + (*i)->WaitForStop(); // Then join them all. for ( all_thread_list::iterator i = all_threads.begin(); i != all_threads.end(); i++ ) @@ -141,7 +141,7 @@ void Manager::Process() else { reporter->Error("%s failed, terminating thread", msg->Name()); - t->Stop(); + t->SignalStop(); } delete msg; diff --git a/src/threading/MsgThread.cc b/src/threading/MsgThread.cc index 6c63c5a287..902ea36410 100644 --- a/src/threading/MsgThread.cc +++ b/src/threading/MsgThread.cc @@ -161,16 +161,16 @@ MsgThread::MsgThread() : BasicThread(), queue_in(this, 0), queue_out(0, this) // Set by Bro's main signal handler. extern int signal_val; -void MsgThread::OnPrepareStop() +void MsgThread::OnSignalStop() { if ( finished || Killed() ) return; - // Signal thread to terminate and wait until it has acknowledged. + // Signal thread to terminate. SendIn(new FinishMessage(this, network_time), true); } -void MsgThread::OnStop() +void MsgThread::OnWaitForStop() { int signal_count = 0; int old_signal_val = signal_val; diff --git a/src/threading/MsgThread.h b/src/threading/MsgThread.h index d5e223d48f..4492320541 100644 --- a/src/threading/MsgThread.h +++ b/src/threading/MsgThread.h @@ -228,8 +228,8 @@ protected: * */ virtual void Run(); - virtual void OnStop(); - virtual void OnPrepareStop(); + virtual void OnWaitForStop(); + virtual void OnSignalStop(); virtual void OnKill(); private: From 39f1b9e01f4b99b575a13e11ebe971dd305016c5 Mon Sep 17 00:00:00 2001 From: Bernhard Amann Date: Tue, 14 May 2013 23:45:55 -0700 Subject: [PATCH 3/7] Change thread shutdown again to also work with input framework. Seems to work, tests pass, but not really verified. Major change 1: finished flag in MsgThread was replaced by 2 flags: child_finished and main_finished. child_finished is set by child_thread and means that the processing loop is stopped immediately (no longer needed, no new input messages will be processed, if loop continues running there is an ugly delay on shutdown). (This took me a while to realize...) main_finished is set by a message that is sent back by the child to the main thread when Finished() is called (and child_finished is set). when main_finished is set, processing of output messages stops. But all messages that the child thread pushed in the queue before calling Finish() are still processed. Change 2: Logging terminate call was replaced by a smaller call that just flushes out the cache held by the main thread. This call has to be done before thread shutdown is called - otherwhise the threads will be shut down before all messages are pushed on them. (This also took me a while to realize...). Change 3: Input framework actually calls it stop methods correctly (everything was prepared, function call was missing) --- src/input/Manager.cc | 2 ++ src/input/ReaderFrontend.cc | 9 ++++++++ src/input/ReaderFrontend.h | 4 ++-- src/input/readers/Ascii.cc | 6 ++++-- src/logging/Manager.cc | 19 ++++++---------- src/logging/Manager.h | 6 +++--- src/logging/WriterFrontend.h | 2 +- src/logging/writers/Ascii.cc | 2 ++ src/main.cc | 2 +- src/threading/Manager.h | 6 ------ src/threading/MsgThread.cc | 42 ++++++++++++++++++++++++++++-------- src/threading/MsgThread.h | 6 ++++-- 12 files changed, 67 insertions(+), 39 deletions(-) diff --git a/src/input/Manager.cc b/src/input/Manager.cc index f5d0e2693c..b905958b51 100644 --- a/src/input/Manager.cc +++ b/src/input/Manager.cc @@ -742,6 +742,8 @@ bool Manager::RemoveStream(Stream *i) DBG_LOG(DBG_INPUT, "Successfully queued removal of stream %s", i->name.c_str()); + + //i->reader->Stop(); return true; } diff --git a/src/input/ReaderFrontend.cc b/src/input/ReaderFrontend.cc index a8528c002d..d28f410de0 100644 --- a/src/input/ReaderFrontend.cc +++ b/src/input/ReaderFrontend.cc @@ -49,6 +49,15 @@ ReaderFrontend::ReaderFrontend(const ReaderBackend::ReaderInfo& arg_info, EnumVa backend->Start(); } +void ReaderFrontend::Stop() + { + if ( backend ) + { + backend->SignalStop(); + backend = 0; // Thread manager will clean it up once it finishes. + } + } + ReaderFrontend::~ReaderFrontend() { delete [] name; diff --git a/src/input/ReaderFrontend.h b/src/input/ReaderFrontend.h index a93f7703ac..0b4b27ec44 100644 --- a/src/input/ReaderFrontend.h +++ b/src/input/ReaderFrontend.h @@ -67,7 +67,7 @@ public: * This method must only be called from the main thread. */ void Update(); - + /** * Finalizes reading from this stream. * @@ -75,7 +75,7 @@ public: * the corresponding message there. This method must only be called * from the main thread. */ - void Close(); + void Stop(); /** * Disables the reader frontend. From now on, all method calls that diff --git a/src/input/readers/Ascii.cc b/src/input/readers/Ascii.cc index 9a855b102b..dacebe3eb5 100644 --- a/src/input/readers/Ascii.cc +++ b/src/input/readers/Ascii.cc @@ -73,8 +73,10 @@ Ascii::Ascii(ReaderFrontend *frontend) : ReaderBackend(frontend) Ascii::~Ascii() { - DoClose(); - delete ascii; + /* printf("Destructor called\n"); + if ( file ) + DoClose(); + delete ascii; */ } void Ascii::DoClose() diff --git a/src/logging/Manager.cc b/src/logging/Manager.cc index 67e7d998ed..eba94db467 100644 --- a/src/logging/Manager.cc +++ b/src/logging/Manager.cc @@ -1268,25 +1268,18 @@ bool Manager::Flush(EnumVal* id) return true; } -void Manager::Terminate() +void Manager::FlushBuffers() { - // Make sure we process all the pending rotations. - - while ( rotations_pending > 0 ) - { - thread_mgr->ForceProcessing(); // A blatant layering violation ... - usleep(1000); - } - - if ( rotations_pending < 0 ) - reporter->InternalError("Negative pending log rotations: %d", rotations_pending); - + // Flush out cached entries in Frontend for ( vector::iterator s = streams.begin(); s != streams.end(); ++s ) { if ( ! *s ) continue; - Flush((*s)->id); + for ( Stream::WriterMap::iterator i = (*s)->writers.begin(); + i != (*s)->writers.end(); i++ ) + i->second->writer->FlushWriteBuffer(); + } } diff --git a/src/logging/Manager.h b/src/logging/Manager.h index 5b5f8014e3..bd68bc603f 100644 --- a/src/logging/Manager.h +++ b/src/logging/Manager.h @@ -150,10 +150,10 @@ public: bool Flush(EnumVal* id); /** - * Prepares the log manager to terminate. This will flush all log - * stream. + * Flushes all buffers that are currently held by writer frontends + * out to the threads. Does not call the thread flush operation. */ - void Terminate(); + void FlushBuffers(); /** * Returns a list of supported output formats. diff --git a/src/logging/WriterFrontend.h b/src/logging/WriterFrontend.h index 5bcde21a5e..994c82c513 100644 --- a/src/logging/WriterFrontend.h +++ b/src/logging/WriterFrontend.h @@ -208,7 +208,7 @@ protected: EnumVal* stream; EnumVal* writer; - WriterBackend* backend; // The backend we have instanatiated. + WriterBackend* backend; // The backend we have instantiated. bool disabled; // True if disabled. bool initialized; // True if initialized. bool buf; // True if buffering is enabled (default). diff --git a/src/logging/writers/Ascii.cc b/src/logging/writers/Ascii.cc index 5a1c710a87..ddb63db36f 100644 --- a/src/logging/writers/Ascii.cc +++ b/src/logging/writers/Ascii.cc @@ -192,6 +192,8 @@ bool Ascii::DoFinish(double network_time) abort(); } + DoFlush(network_time); + ascii_done = true; CloseFile(network_time); diff --git a/src/main.cc b/src/main.cc index 2232180c5e..57f8e62ea3 100644 --- a/src/main.cc +++ b/src/main.cc @@ -319,7 +319,7 @@ void terminate_bro() mgr.Drain(); - log_mgr->Terminate(); + log_mgr->FlushBuffers(); thread_mgr->Terminate(); mgr.Drain(); diff --git a/src/threading/Manager.h b/src/threading/Manager.h index b46a06a46e..e839749a91 100644 --- a/src/threading/Manager.h +++ b/src/threading/Manager.h @@ -68,12 +68,6 @@ public: */ int NumThreads() const { return all_threads.size(); } - /** Manually triggers processing of any thread input. This can be useful - * if the main thread is waiting for a specific message from a child. - * Usually, though, one should avoid using it. - */ - void ForceProcessing() { Process(); } - /** * Signals a specific threads to terminate immediately. */ diff --git a/src/threading/MsgThread.cc b/src/threading/MsgThread.cc index 902ea36410..b1d78485e0 100644 --- a/src/threading/MsgThread.cc +++ b/src/threading/MsgThread.cc @@ -30,6 +30,21 @@ private: double network_time; }; +// Signals main thread that operations shut down +class FinishedMessage : public OutputMessage +{ +public: + FinishedMessage(MsgThread* thread) + : OutputMessage("FinishedMessage", thread) + { } + + virtual bool Process() { + Object()->main_finished = true; + return true; + } +}; + + /// Sends a heartbeat to the child thread. class HeartbeatMessage : public InputMessage { @@ -153,7 +168,8 @@ bool ReporterMessage::Process() MsgThread::MsgThread() : BasicThread(), queue_in(this, 0), queue_out(0, this) { cnt_sent_in = cnt_sent_out = 0; - finished = false; + main_finished = false; + child_finished = false; failed = false; thread_mgr->AddMsgThread(this); } @@ -163,7 +179,7 @@ extern int signal_val; void MsgThread::OnSignalStop() { - if ( finished || Killed() ) + if ( main_finished || Killed() ) return; // Signal thread to terminate. @@ -180,7 +196,7 @@ void MsgThread::OnWaitForStop() uint64_t last_size = 0; uint64_t cur_size = 0; - while ( ! (finished || Killed() ) ) + while ( ! (main_finished || Killed() ) ) { // Terminate if we get another kill signal. if ( signal_val == SIGTERM || signal_val == SIGINT ) @@ -206,6 +222,15 @@ void MsgThread::OnWaitForStop() } queue_in.WakeUp(); + while ( HasOut() && !Killed() ) + { + Message* msg = RetrieveOut(); + assert ( msg ); + if ( !msg->Process() ) + reporter->Error("%s failed during thread termination", msg->Name()); + + delete msg; + } usleep(1000); } @@ -237,9 +262,8 @@ void MsgThread::HeartbeatInChild() void MsgThread::Finished() { - // This is thread-safe "enough", we're the only one ever writing - // there. - finished = true; + child_finished = true; + SendOut(new FinishedMessage(this)); } void MsgThread::Info(const char* msg) @@ -344,7 +368,7 @@ BasicInputMessage* MsgThread::RetrieveIn() void MsgThread::Run() { - while ( ! (finished || Killed() ) ) + while ( ! (child_finished || Killed() ) ) { BasicInputMessage* msg = RetrieveIn(); @@ -368,10 +392,10 @@ void MsgThread::Run() } } - // In case we haven't send the finish method yet, do it now. Reading + // In case we haven't sent the finish method yet, do it now. Reading // global network_time here should be fine, it isn't changing // anymore. - if ( ! finished && ! Killed() ) + if ( ! child_finished && ! Killed() ) { OnFinish(network_time); Finished(); diff --git a/src/threading/MsgThread.h b/src/threading/MsgThread.h index 4492320541..295498f58f 100644 --- a/src/threading/MsgThread.h +++ b/src/threading/MsgThread.h @@ -289,7 +289,8 @@ private: */ bool MightHaveOut() { return queue_out.MaybeReady(); } - /** Flags that the child process has finished processing. Called from child. + /** Sends a message to the main thread signaling that the child process + * has finished processing. Called from child. */ void Finished(); @@ -299,7 +300,8 @@ private: uint64_t cnt_sent_in; // Counts message sent to child. uint64_t cnt_sent_out; // Counts message sent by child. - bool finished; // Set to true by Finished message. + bool main_finished; // main thread is finished. Means child_finished propagated back through message queue. + bool child_finished; // child thread is finished bool failed; // Set to true when a command failed. }; From b947394990720032ac7f374f7c9d1902ed4485b9 Mon Sep 17 00:00:00 2001 From: Bernhard Amann Date: Wed, 15 May 2013 00:49:57 -0700 Subject: [PATCH 4/7] spoke to soon (forgot to comment in line again). Now it should work. However - this commit changes a basic assumption of the threading queue. This basic assumption is, that nothing can be read out of the out-queue of a dead thread. I think that reading out of the queue of a dead thread makes perfect sense (when the thread shuts down, pushes the rest of its work on the queue and says bye, and wants the main thread to pick it up afterwards) - however, I guess one can be of a differing opinion here. In any case, it makes stuff a bit easier to understand - in my opinion. It took me a while to find out why the messages disappear in thin air and never arrive in the main thread ;) --- src/input/Manager.cc | 2 +- src/input/readers/Ascii.cc | 6 ++---- src/threading/Manager.cc | 9 ++------- src/threading/MsgThread.cc | 2 +- src/threading/Queue.h | 7 +++---- 5 files changed, 9 insertions(+), 17 deletions(-) diff --git a/src/input/Manager.cc b/src/input/Manager.cc index b905958b51..e32f758620 100644 --- a/src/input/Manager.cc +++ b/src/input/Manager.cc @@ -743,7 +743,7 @@ bool Manager::RemoveStream(Stream *i) DBG_LOG(DBG_INPUT, "Successfully queued removal of stream %s", i->name.c_str()); - //i->reader->Stop(); + i->reader->Stop(); return true; } diff --git a/src/input/readers/Ascii.cc b/src/input/readers/Ascii.cc index dacebe3eb5..a2f36497ab 100644 --- a/src/input/readers/Ascii.cc +++ b/src/input/readers/Ascii.cc @@ -73,10 +73,8 @@ Ascii::Ascii(ReaderFrontend *frontend) : ReaderBackend(frontend) Ascii::~Ascii() { - /* printf("Destructor called\n"); - if ( file ) - DoClose(); - delete ascii; */ + DoClose(); + delete ascii; } void Ascii::DoClose() diff --git a/src/threading/Manager.cc b/src/threading/Manager.cc index 39a6bdce7d..1b6cb551e2 100644 --- a/src/threading/Manager.cc +++ b/src/threading/Manager.cc @@ -122,15 +122,10 @@ void Manager::Process() if ( do_beat ) t->Heartbeat(); - while ( t->HasOut() && ! t->Killed() ) + while ( t->HasOut() ) { Message* msg = t->RetrieveOut(); - - if ( ! msg ) - { - assert(t->Killed()); - break; - } + assert(msg); if ( msg->Process() ) { diff --git a/src/threading/MsgThread.cc b/src/threading/MsgThread.cc index b1d78485e0..3a68c4195f 100644 --- a/src/threading/MsgThread.cc +++ b/src/threading/MsgThread.cc @@ -333,7 +333,7 @@ void MsgThread::SendOut(BasicOutputMessage* msg, bool force) { delete msg; return; - } + } queue_out.Put(msg); diff --git a/src/threading/Queue.h b/src/threading/Queue.h index 0ddcda29f7..4ca8f9cd92 100644 --- a/src/threading/Queue.h +++ b/src/threading/Queue.h @@ -155,14 +155,11 @@ inline Queue::~Queue() template inline T Queue::Get() { - if ( (reader && reader->Killed()) || (writer && writer->Killed()) ) - return 0; - safe_lock(&mutex[read_ptr]); int old_read_ptr = read_ptr; - if ( messages[read_ptr].empty() ) + if ( messages[read_ptr].empty() && !( (reader && reader->Killed()) || (writer && writer->Killed())) ) { struct timespec ts; ts.tv_sec = time(0) + 5; @@ -172,6 +169,8 @@ inline T Queue::Get() safe_unlock(&mutex[read_ptr]); return 0; } + else if ( messages[read_ptr].empty() ) + return 0; T data = messages[read_ptr].front(); messages[read_ptr].pop(); From 808639fc5fb81d949b533aeeaff484fb78b1d469 Mon Sep 17 00:00:00 2001 From: Bernhard Amann Date: Wed, 15 May 2013 00:56:27 -0700 Subject: [PATCH 5/7] it is apparently getting a bit late for changes at important code... --- src/threading/Queue.h | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/threading/Queue.h b/src/threading/Queue.h index 4ca8f9cd92..4393cd6737 100644 --- a/src/threading/Queue.h +++ b/src/threading/Queue.h @@ -170,7 +170,10 @@ inline T Queue::Get() return 0; } else if ( messages[read_ptr].empty() ) + { + safe_unlock(&mutex[read_ptr]); return 0; + } T data = messages[read_ptr].front(); messages[read_ptr].pop(); From 37566e73d8bf2310686b44ea45ea533a48b0700c Mon Sep 17 00:00:00 2001 From: Bernhard Amann Date: Wed, 15 May 2013 15:36:34 -0700 Subject: [PATCH 6/7] hopefully finally fix last interesting race-condition --- src/threading/MsgThread.cc | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/src/threading/MsgThread.cc b/src/threading/MsgThread.cc index 3a68c4195f..1e52d61427 100644 --- a/src/threading/MsgThread.cc +++ b/src/threading/MsgThread.cc @@ -196,7 +196,7 @@ void MsgThread::OnWaitForStop() uint64_t last_size = 0; uint64_t cur_size = 0; - while ( ! (main_finished || Killed() ) ) + while ( ! main_finished ) { // Terminate if we get another kill signal. if ( signal_val == SIGTERM || signal_val == SIGINT ) @@ -221,8 +221,10 @@ void MsgThread::OnWaitForStop() signal_val = 0; } - queue_in.WakeUp(); - while ( HasOut() && !Killed() ) + if ( !Killed() ) + queue_in.WakeUp(); + + while ( HasOut() ) { Message* msg = RetrieveOut(); assert ( msg ); From 7bf456c11a08898c010b0148dec564b22ab4afc3 Mon Sep 17 00:00:00 2001 From: Bernhard Amann Date: Wed, 15 May 2013 15:55:38 -0700 Subject: [PATCH 7/7] and just to be really sure - always make threads go through OnWaitForStop --- src/threading/BasicThread.cc | 5 +---- src/threading/MsgThread.cc | 3 ++- 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/src/threading/BasicThread.cc b/src/threading/BasicThread.cc index 09b6e95d7a..7f5dbfc56b 100644 --- a/src/threading/BasicThread.cc +++ b/src/threading/BasicThread.cc @@ -135,10 +135,7 @@ void BasicThread::WaitForStop() if ( ! started ) return; - if ( terminating ) - return; - - DBG_LOG(DBG_THREADING, "Waiting for thread %s to terminate ...", name); + DBG_LOG(DBG_THREADING, "Waiting for thread %s to terminate and process last queue items...", name); OnWaitForStop(); diff --git a/src/threading/MsgThread.cc b/src/threading/MsgThread.cc index 1e52d61427..553641c282 100644 --- a/src/threading/MsgThread.cc +++ b/src/threading/MsgThread.cc @@ -234,7 +234,8 @@ void MsgThread::OnWaitForStop() delete msg; } - usleep(1000); + if ( !Killed() ) + usleep(1000); } signal_val = old_signal_val;