From 39f1b9e01f4b99b575a13e11ebe971dd305016c5 Mon Sep 17 00:00:00 2001 From: Bernhard Amann Date: Tue, 14 May 2013 23:45:55 -0700 Subject: [PATCH] Change thread shutdown again to also work with input framework. Seems to work, tests pass, but not really verified. Major change 1: finished flag in MsgThread was replaced by 2 flags: child_finished and main_finished. child_finished is set by child_thread and means that the processing loop is stopped immediately (no longer needed, no new input messages will be processed, if loop continues running there is an ugly delay on shutdown). (This took me a while to realize...) main_finished is set by a message that is sent back by the child to the main thread when Finished() is called (and child_finished is set). when main_finished is set, processing of output messages stops. But all messages that the child thread pushed in the queue before calling Finish() are still processed. Change 2: Logging terminate call was replaced by a smaller call that just flushes out the cache held by the main thread. This call has to be done before thread shutdown is called - otherwhise the threads will be shut down before all messages are pushed on them. (This also took me a while to realize...). Change 3: Input framework actually calls it stop methods correctly (everything was prepared, function call was missing) --- src/input/Manager.cc | 2 ++ src/input/ReaderFrontend.cc | 9 ++++++++ src/input/ReaderFrontend.h | 4 ++-- src/input/readers/Ascii.cc | 6 ++++-- src/logging/Manager.cc | 19 ++++++---------- src/logging/Manager.h | 6 +++--- src/logging/WriterFrontend.h | 2 +- src/logging/writers/Ascii.cc | 2 ++ src/main.cc | 2 +- src/threading/Manager.h | 6 ------ src/threading/MsgThread.cc | 42 ++++++++++++++++++++++++++++-------- src/threading/MsgThread.h | 6 ++++-- 12 files changed, 67 insertions(+), 39 deletions(-) diff --git a/src/input/Manager.cc b/src/input/Manager.cc index f5d0e2693c..b905958b51 100644 --- a/src/input/Manager.cc +++ b/src/input/Manager.cc @@ -742,6 +742,8 @@ bool Manager::RemoveStream(Stream *i) DBG_LOG(DBG_INPUT, "Successfully queued removal of stream %s", i->name.c_str()); + + //i->reader->Stop(); return true; } diff --git a/src/input/ReaderFrontend.cc b/src/input/ReaderFrontend.cc index a8528c002d..d28f410de0 100644 --- a/src/input/ReaderFrontend.cc +++ b/src/input/ReaderFrontend.cc @@ -49,6 +49,15 @@ ReaderFrontend::ReaderFrontend(const ReaderBackend::ReaderInfo& arg_info, EnumVa backend->Start(); } +void ReaderFrontend::Stop() + { + if ( backend ) + { + backend->SignalStop(); + backend = 0; // Thread manager will clean it up once it finishes. + } + } + ReaderFrontend::~ReaderFrontend() { delete [] name; diff --git a/src/input/ReaderFrontend.h b/src/input/ReaderFrontend.h index a93f7703ac..0b4b27ec44 100644 --- a/src/input/ReaderFrontend.h +++ b/src/input/ReaderFrontend.h @@ -67,7 +67,7 @@ public: * This method must only be called from the main thread. */ void Update(); - + /** * Finalizes reading from this stream. * @@ -75,7 +75,7 @@ public: * the corresponding message there. This method must only be called * from the main thread. */ - void Close(); + void Stop(); /** * Disables the reader frontend. From now on, all method calls that diff --git a/src/input/readers/Ascii.cc b/src/input/readers/Ascii.cc index 9a855b102b..dacebe3eb5 100644 --- a/src/input/readers/Ascii.cc +++ b/src/input/readers/Ascii.cc @@ -73,8 +73,10 @@ Ascii::Ascii(ReaderFrontend *frontend) : ReaderBackend(frontend) Ascii::~Ascii() { - DoClose(); - delete ascii; + /* printf("Destructor called\n"); + if ( file ) + DoClose(); + delete ascii; */ } void Ascii::DoClose() diff --git a/src/logging/Manager.cc b/src/logging/Manager.cc index 67e7d998ed..eba94db467 100644 --- a/src/logging/Manager.cc +++ b/src/logging/Manager.cc @@ -1268,25 +1268,18 @@ bool Manager::Flush(EnumVal* id) return true; } -void Manager::Terminate() +void Manager::FlushBuffers() { - // Make sure we process all the pending rotations. - - while ( rotations_pending > 0 ) - { - thread_mgr->ForceProcessing(); // A blatant layering violation ... - usleep(1000); - } - - if ( rotations_pending < 0 ) - reporter->InternalError("Negative pending log rotations: %d", rotations_pending); - + // Flush out cached entries in Frontend for ( vector::iterator s = streams.begin(); s != streams.end(); ++s ) { if ( ! *s ) continue; - Flush((*s)->id); + for ( Stream::WriterMap::iterator i = (*s)->writers.begin(); + i != (*s)->writers.end(); i++ ) + i->second->writer->FlushWriteBuffer(); + } } diff --git a/src/logging/Manager.h b/src/logging/Manager.h index 5b5f8014e3..bd68bc603f 100644 --- a/src/logging/Manager.h +++ b/src/logging/Manager.h @@ -150,10 +150,10 @@ public: bool Flush(EnumVal* id); /** - * Prepares the log manager to terminate. This will flush all log - * stream. + * Flushes all buffers that are currently held by writer frontends + * out to the threads. Does not call the thread flush operation. */ - void Terminate(); + void FlushBuffers(); /** * Returns a list of supported output formats. diff --git a/src/logging/WriterFrontend.h b/src/logging/WriterFrontend.h index 5bcde21a5e..994c82c513 100644 --- a/src/logging/WriterFrontend.h +++ b/src/logging/WriterFrontend.h @@ -208,7 +208,7 @@ protected: EnumVal* stream; EnumVal* writer; - WriterBackend* backend; // The backend we have instanatiated. + WriterBackend* backend; // The backend we have instantiated. bool disabled; // True if disabled. bool initialized; // True if initialized. bool buf; // True if buffering is enabled (default). diff --git a/src/logging/writers/Ascii.cc b/src/logging/writers/Ascii.cc index 5a1c710a87..ddb63db36f 100644 --- a/src/logging/writers/Ascii.cc +++ b/src/logging/writers/Ascii.cc @@ -192,6 +192,8 @@ bool Ascii::DoFinish(double network_time) abort(); } + DoFlush(network_time); + ascii_done = true; CloseFile(network_time); diff --git a/src/main.cc b/src/main.cc index 2232180c5e..57f8e62ea3 100644 --- a/src/main.cc +++ b/src/main.cc @@ -319,7 +319,7 @@ void terminate_bro() mgr.Drain(); - log_mgr->Terminate(); + log_mgr->FlushBuffers(); thread_mgr->Terminate(); mgr.Drain(); diff --git a/src/threading/Manager.h b/src/threading/Manager.h index b46a06a46e..e839749a91 100644 --- a/src/threading/Manager.h +++ b/src/threading/Manager.h @@ -68,12 +68,6 @@ public: */ int NumThreads() const { return all_threads.size(); } - /** Manually triggers processing of any thread input. This can be useful - * if the main thread is waiting for a specific message from a child. - * Usually, though, one should avoid using it. - */ - void ForceProcessing() { Process(); } - /** * Signals a specific threads to terminate immediately. */ diff --git a/src/threading/MsgThread.cc b/src/threading/MsgThread.cc index 902ea36410..b1d78485e0 100644 --- a/src/threading/MsgThread.cc +++ b/src/threading/MsgThread.cc @@ -30,6 +30,21 @@ private: double network_time; }; +// Signals main thread that operations shut down +class FinishedMessage : public OutputMessage +{ +public: + FinishedMessage(MsgThread* thread) + : OutputMessage("FinishedMessage", thread) + { } + + virtual bool Process() { + Object()->main_finished = true; + return true; + } +}; + + /// Sends a heartbeat to the child thread. class HeartbeatMessage : public InputMessage { @@ -153,7 +168,8 @@ bool ReporterMessage::Process() MsgThread::MsgThread() : BasicThread(), queue_in(this, 0), queue_out(0, this) { cnt_sent_in = cnt_sent_out = 0; - finished = false; + main_finished = false; + child_finished = false; failed = false; thread_mgr->AddMsgThread(this); } @@ -163,7 +179,7 @@ extern int signal_val; void MsgThread::OnSignalStop() { - if ( finished || Killed() ) + if ( main_finished || Killed() ) return; // Signal thread to terminate. @@ -180,7 +196,7 @@ void MsgThread::OnWaitForStop() uint64_t last_size = 0; uint64_t cur_size = 0; - while ( ! (finished || Killed() ) ) + while ( ! (main_finished || Killed() ) ) { // Terminate if we get another kill signal. if ( signal_val == SIGTERM || signal_val == SIGINT ) @@ -206,6 +222,15 @@ void MsgThread::OnWaitForStop() } queue_in.WakeUp(); + while ( HasOut() && !Killed() ) + { + Message* msg = RetrieveOut(); + assert ( msg ); + if ( !msg->Process() ) + reporter->Error("%s failed during thread termination", msg->Name()); + + delete msg; + } usleep(1000); } @@ -237,9 +262,8 @@ void MsgThread::HeartbeatInChild() void MsgThread::Finished() { - // This is thread-safe "enough", we're the only one ever writing - // there. - finished = true; + child_finished = true; + SendOut(new FinishedMessage(this)); } void MsgThread::Info(const char* msg) @@ -344,7 +368,7 @@ BasicInputMessage* MsgThread::RetrieveIn() void MsgThread::Run() { - while ( ! (finished || Killed() ) ) + while ( ! (child_finished || Killed() ) ) { BasicInputMessage* msg = RetrieveIn(); @@ -368,10 +392,10 @@ void MsgThread::Run() } } - // In case we haven't send the finish method yet, do it now. Reading + // In case we haven't sent the finish method yet, do it now. Reading // global network_time here should be fine, it isn't changing // anymore. - if ( ! finished && ! Killed() ) + if ( ! child_finished && ! Killed() ) { OnFinish(network_time); Finished(); diff --git a/src/threading/MsgThread.h b/src/threading/MsgThread.h index 4492320541..295498f58f 100644 --- a/src/threading/MsgThread.h +++ b/src/threading/MsgThread.h @@ -289,7 +289,8 @@ private: */ bool MightHaveOut() { return queue_out.MaybeReady(); } - /** Flags that the child process has finished processing. Called from child. + /** Sends a message to the main thread signaling that the child process + * has finished processing. Called from child. */ void Finished(); @@ -299,7 +300,8 @@ private: uint64_t cnt_sent_in; // Counts message sent to child. uint64_t cnt_sent_out; // Counts message sent by child. - bool finished; // Set to true by Finished message. + bool main_finished; // main thread is finished. Means child_finished propagated back through message queue. + bool child_finished; // child thread is finished bool failed; // Set to true when a command failed. };