diff --git a/src/logging/WriterFrontend.cc b/src/logging/WriterFrontend.cc index 577003926b..b816327e9c 100644 --- a/src/logging/WriterFrontend.cc +++ b/src/logging/WriterFrontend.cc @@ -141,9 +141,6 @@ void WriterFrontend::Stop() { FlushWriteBuffer(); SetDisable(); - - if ( backend ) - backend->Stop(); } void WriterFrontend::Init(const WriterBackend::WriterInfo& arg_info, int arg_num_fields, const Field* const * arg_fields) diff --git a/src/logging/WriterFrontend.h b/src/logging/WriterFrontend.h index 6581fb1c1b..e8f3d06d6c 100644 --- a/src/logging/WriterFrontend.h +++ b/src/logging/WriterFrontend.h @@ -50,7 +50,7 @@ public: /** * Stops all output to this writer. Calling this methods disables all - * message forwarding to the backend and stops the backend thread. + * message forwarding to the backend. * * This method must only be called from the main thread. */ diff --git a/src/threading/MsgThread.cc b/src/threading/MsgThread.cc index 45fbf6afa5..f101d0ca3c 100644 --- a/src/threading/MsgThread.cc +++ b/src/threading/MsgThread.cc @@ -152,12 +152,13 @@ MsgThread::MsgThread() : BasicThread() { cnt_sent_in = cnt_sent_out = 0; finished = false; + stopped = false; thread_mgr->AddMsgThread(this); } void MsgThread::OnStop() { - if ( finished ) + if ( stopped ) return; // Signal thread to terminate and wait until it has acknowledged. @@ -303,13 +304,8 @@ BasicInputMessage* MsgThread::RetrieveIn() void MsgThread::Run() { - while ( true ) + while ( ! finished ) { - // When requested to terminate, we only do so when - // all input has been processed. - if ( Terminating() && ! queue_in.Ready() ) - break; - BasicInputMessage* msg = RetrieveIn(); bool result = msg->Process(); @@ -318,12 +314,13 @@ void MsgThread::Run() { string s = msg->Name() + " failed, terminating thread (MsgThread)"; Error(s.c_str()); - Stop(); break; } delete msg; } + + Finished(); } void MsgThread::GetStats(Stats* stats) diff --git a/src/threading/MsgThread.h b/src/threading/MsgThread.h index 67ab9517c5..d929c1f806 100644 --- a/src/threading/MsgThread.h +++ b/src/threading/MsgThread.h @@ -293,6 +293,7 @@ private: uint64_t cnt_sent_out; // Counts message sent by child. bool finished; // Set to true by Finished message. + bool stopped; // Set to true by OnStop(). }; /** diff --git a/src/util.cc b/src/util.cc index 3cfa5fca1c..b7a4683597 100644 --- a/src/util.cc +++ b/src/util.cc @@ -1290,6 +1290,28 @@ uint64 calculate_unique_id(size_t pool) return HashKey::HashBytes(&(uid_pool[pool].key), sizeof(uid_pool[pool].key)); } +bool safe_write(int fd, const char* data, int len) + { + return true; + while ( len > 0 ) + { + int n = write(fd, data, len); + + if ( n < 0 ) + { + if ( errno == EINTR ) + continue; + + return false; + } + + data += n; + len -= n; + } + + return true; + } + void out_of_memory(const char* where) { reporter->FatalError("out of memory in %s.\n", where); diff --git a/src/util.h b/src/util.h index e4c995f45f..075c2af7c2 100644 --- a/src/util.h +++ b/src/util.h @@ -289,6 +289,11 @@ inline size_t pad_size(size_t size) #define padded_sizeof(x) (pad_size(sizeof(x))) +// Like write() but handles interrupted system calls by restarting. Returns +// true if the write was successful, otherwise sets errno. This function is +// thread-safe as long as no two threads write to the same descriptor. +extern bool safe_write(int fd, const char* data, int len); + extern void out_of_memory(const char* where); inline void* safe_realloc(void* ptr, size_t size) @@ -338,4 +343,5 @@ inline int safe_vsnprintf(char* str, size_t size, const char* format, va_list al // handed out by malloc. extern void get_memory_usage(unsigned int* total, unsigned int* malloced); + #endif diff --git a/testing/scripts/diff-canonifier b/testing/scripts/diff-canonifier index 3cb213a3f7..4d04b3372c 100755 --- a/testing/scripts/diff-canonifier +++ b/testing/scripts/diff-canonifier @@ -2,4 +2,4 @@ # # Default canonifier used with the tests in testing/btest/*. -`dirname $0`/diff-remove-timestamps +`dirname $0`/diff-remove-timestamps | grep -v XXX