diff --git a/CHANGES b/CHANGES index 3fe0fa2b73..44a3edc3c6 100644 --- a/CHANGES +++ b/CHANGES @@ -1,4 +1,17 @@ +2.0-891 | 2012-07-26 17:15:10 -0700 + + * Reader/writer API: preventing plugins from receiving further + messages after a failure. (Robin Sommer) + + * New test for input framework that fails to find a file. (Robin + Sommer) + + * Improving error handling for threads. (Robin Sommer) + + * Tweaking the custom-rotate test to produce stable output. (Robin + Sommer) + 2.0-884 | 2012-07-26 14:33:21 -0700 * Add comprehensive error handling for close() calls. (Jon Siwek) diff --git a/VERSION b/VERSION index ced5c78870..b97bde7b8d 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -2.0-884 +2.0-891 diff --git a/src/input/ReaderBackend.cc b/src/input/ReaderBackend.cc index 88a78c3cd7..81060be7d5 100644 --- a/src/input/ReaderBackend.cc +++ b/src/input/ReaderBackend.cc @@ -191,6 +191,9 @@ void ReaderBackend::SendEntry(Value* *vals) bool ReaderBackend::Init(const int arg_num_fields, const threading::Field* const* arg_fields) { + if ( Failed() ) + return true; + num_fields = arg_num_fields; fields = arg_fields; @@ -210,7 +213,9 @@ bool ReaderBackend::Init(const int arg_num_fields, bool ReaderBackend::OnFinish(double network_time) { - DoClose(); + if ( ! Failed() ) + DoClose(); + disabled = true; // frontend disables itself when it gets the Close-message. SendOut(new ReaderClosedMessage(frontend)); @@ -231,6 +236,9 @@ bool ReaderBackend::Update() if ( disabled ) return false; + if ( Failed() ) + return true; + bool success = DoUpdate(); if ( ! success ) DisableFrontend(); @@ -248,6 +256,9 @@ void ReaderBackend::DisableFrontend() bool ReaderBackend::OnHeartbeat(double network_time, double current_time) { + if ( Failed() ) + return true; + return DoHeartbeat(network_time, current_time); } diff --git a/src/logging/WriterBackend.cc b/src/logging/WriterBackend.cc index 2933062eff..afdc4b99c5 100644 --- a/src/logging/WriterBackend.cc +++ b/src/logging/WriterBackend.cc @@ -174,6 +174,9 @@ bool WriterBackend::Init(int arg_num_fields, const Field* const* arg_fields) num_fields = arg_num_fields; fields = arg_fields; + if ( Failed() ) + return true; + if ( ! DoInit(*info, arg_num_fields, arg_fields) ) { DisableFrontend(); @@ -222,12 +225,15 @@ bool WriterBackend::Write(int arg_num_fields, int num_writes, Value*** vals) bool success = true; - for ( int j = 0; j < num_writes; j++ ) + if ( ! Failed() ) { - success = DoWrite(num_fields, fields, vals[j]); + for ( int j = 0; j < num_writes; j++ ) + { + success = DoWrite(num_fields, fields, vals[j]); - if ( ! success ) - break; + if ( ! success ) + break; + } } DeleteVals(num_writes, vals); @@ -244,6 +250,9 @@ bool WriterBackend::SetBuf(bool enabled) // No change. return true; + if ( Failed() ) + return true; + buffering = enabled; if ( ! DoSetBuf(enabled) ) @@ -258,6 +267,9 @@ bool WriterBackend::SetBuf(bool enabled) bool WriterBackend::Rotate(const char* rotated_path, double open, double close, bool terminating) { + if ( Failed() ) + return true; + if ( ! DoRotate(rotated_path, open, close, terminating) ) { DisableFrontend(); @@ -269,6 +281,9 @@ bool WriterBackend::Rotate(const char* rotated_path, double open, bool WriterBackend::Flush(double network_time) { + if ( Failed() ) + return true; + if ( ! DoFlush(network_time) ) { DisableFrontend(); @@ -280,11 +295,17 @@ bool WriterBackend::Flush(double network_time) bool WriterBackend::OnFinish(double network_time) { + if ( Failed() ) + return true; + return DoFinish(network_time); } bool WriterBackend::OnHeartbeat(double network_time, double current_time) { + if ( Failed() ) + return true; + SendOut(new FlushWriteBufferMessage(frontend)); return DoHeartbeat(network_time, current_time); } diff --git a/src/logging/WriterBackend.h b/src/logging/WriterBackend.h index d5f2be225e..77dbe71f45 100644 --- a/src/logging/WriterBackend.h +++ b/src/logging/WriterBackend.h @@ -182,6 +182,8 @@ public: /** * Disables the frontend that has instantiated this backend. Once * disabled,the frontend will not send any further message over. + * + * TODO: Do we still need this method (and the corresponding message)? */ void DisableFrontend(); diff --git a/src/logging/writers/Ascii.cc b/src/logging/writers/Ascii.cc index 0ccdd1f569..c471b3db0c 100644 --- a/src/logging/writers/Ascii.cc +++ b/src/logging/writers/Ascii.cc @@ -92,7 +92,7 @@ void Ascii::CloseFile(double t) bool Ascii::DoInit(const WriterInfo& info, int num_fields, const Field* const * fields) { - assert(! fd); + assert(! fd); string path = info.path; diff --git a/src/threading/MsgThread.cc b/src/threading/MsgThread.cc index e0f3fd8b0c..6c63c5a287 100644 --- a/src/threading/MsgThread.cc +++ b/src/threading/MsgThread.cc @@ -154,6 +154,7 @@ MsgThread::MsgThread() : BasicThread(), queue_in(this, 0), queue_out(0, this) { cnt_sent_in = cnt_sent_out = 0; finished = false; + failed = false; thread_mgr->AddMsgThread(this); } @@ -363,6 +364,7 @@ void MsgThread::Run() // error messages have been processed by then main // thread). SendOut(new KillMeMessage(this)); + failed = true; } } diff --git a/src/threading/MsgThread.h b/src/threading/MsgThread.h index da505de6be..e3e7c8500f 100644 --- a/src/threading/MsgThread.h +++ b/src/threading/MsgThread.h @@ -201,6 +201,12 @@ protected: */ void HeartbeatInChild(); + /** Returns true if a child command has reported a failure. In that case, we'll + * be in the process of killing this thread and no further activity + * should carried out. To be called only from this child thread. + */ + bool Failed() const { return failed; } + /** * Regulatly triggered for execution in the child thread. * @@ -294,6 +300,7 @@ private: uint64_t cnt_sent_out; // Counts message sent by child. bool finished; // Set to true by Finished message. + bool failed; // Set to true when a command failed. }; /**