Preventing writers/readers from receiving further messages after a

failure.

Once a writer/reader Do* method has returned false, no further ones
will be executed anymore. This is primarily a safety mechanism to make
it easier for writer/reader authors as otherwise they would often need
to track the failure state themselves (because with the now delayed
termination from the earlier commit, furhter messages can now still
arrive for a little bit).
This commit is contained in:
Robin Sommer 2012-07-26 17:15:10 -07:00
parent 86ae7d8b7c
commit f5862fb014
8 changed files with 63 additions and 7 deletions

13
CHANGES
View file

@ -1,4 +1,17 @@
2.0-891 | 2012-07-26 17:15:10 -0700
* Reader/writer API: preventing plugins from receiving further
messages after a failure. (Robin Sommer)
* New test for input framework that fails to find a file. (Robin
Sommer)
* Improving error handling for threads. (Robin Sommer)
* Tweaking the custom-rotate test to produce stable output. (Robin
Sommer)
2.0-884 | 2012-07-26 14:33:21 -0700
* Add comprehensive error handling for close() calls. (Jon Siwek)

View file

@ -1 +1 @@
2.0-884
2.0-891

View file

@ -191,6 +191,9 @@ void ReaderBackend::SendEntry(Value* *vals)
bool ReaderBackend::Init(const int arg_num_fields,
const threading::Field* const* arg_fields)
{
if ( Failed() )
return true;
num_fields = arg_num_fields;
fields = arg_fields;
@ -210,7 +213,9 @@ bool ReaderBackend::Init(const int arg_num_fields,
bool ReaderBackend::OnFinish(double network_time)
{
if ( ! Failed() )
DoClose();
disabled = true; // frontend disables itself when it gets the Close-message.
SendOut(new ReaderClosedMessage(frontend));
@ -231,6 +236,9 @@ bool ReaderBackend::Update()
if ( disabled )
return false;
if ( Failed() )
return true;
bool success = DoUpdate();
if ( ! success )
DisableFrontend();
@ -248,6 +256,9 @@ void ReaderBackend::DisableFrontend()
bool ReaderBackend::OnHeartbeat(double network_time, double current_time)
{
if ( Failed() )
return true;
return DoHeartbeat(network_time, current_time);
}

View file

@ -174,6 +174,9 @@ bool WriterBackend::Init(int arg_num_fields, const Field* const* arg_fields)
num_fields = arg_num_fields;
fields = arg_fields;
if ( Failed() )
return true;
if ( ! DoInit(*info, arg_num_fields, arg_fields) )
{
DisableFrontend();
@ -222,6 +225,8 @@ bool WriterBackend::Write(int arg_num_fields, int num_writes, Value*** vals)
bool success = true;
if ( ! Failed() )
{
for ( int j = 0; j < num_writes; j++ )
{
success = DoWrite(num_fields, fields, vals[j]);
@ -229,6 +234,7 @@ bool WriterBackend::Write(int arg_num_fields, int num_writes, Value*** vals)
if ( ! success )
break;
}
}
DeleteVals(num_writes, vals);
@ -244,6 +250,9 @@ bool WriterBackend::SetBuf(bool enabled)
// No change.
return true;
if ( Failed() )
return true;
buffering = enabled;
if ( ! DoSetBuf(enabled) )
@ -258,6 +267,9 @@ bool WriterBackend::SetBuf(bool enabled)
bool WriterBackend::Rotate(const char* rotated_path, double open,
double close, bool terminating)
{
if ( Failed() )
return true;
if ( ! DoRotate(rotated_path, open, close, terminating) )
{
DisableFrontend();
@ -269,6 +281,9 @@ bool WriterBackend::Rotate(const char* rotated_path, double open,
bool WriterBackend::Flush(double network_time)
{
if ( Failed() )
return true;
if ( ! DoFlush(network_time) )
{
DisableFrontend();
@ -280,11 +295,17 @@ bool WriterBackend::Flush(double network_time)
bool WriterBackend::OnFinish(double network_time)
{
if ( Failed() )
return true;
return DoFinish(network_time);
}
bool WriterBackend::OnHeartbeat(double network_time, double current_time)
{
if ( Failed() )
return true;
SendOut(new FlushWriteBufferMessage(frontend));
return DoHeartbeat(network_time, current_time);
}

View file

@ -182,6 +182,8 @@ public:
/**
* Disables the frontend that has instantiated this backend. Once
* disabled,the frontend will not send any further message over.
*
* TODO: Do we still need this method (and the corresponding message)?
*/
void DisableFrontend();

View file

@ -154,6 +154,7 @@ MsgThread::MsgThread() : BasicThread(), queue_in(this, 0), queue_out(0, this)
{
cnt_sent_in = cnt_sent_out = 0;
finished = false;
failed = false;
thread_mgr->AddMsgThread(this);
}
@ -363,6 +364,7 @@ void MsgThread::Run()
// error messages have been processed by then main
// thread).
SendOut(new KillMeMessage(this));
failed = true;
}
}

View file

@ -201,6 +201,12 @@ protected:
*/
void HeartbeatInChild();
/** Returns true if a child command has reported a failure. In that case, we'll
* be in the process of killing this thread and no further activity
* should carried out. To be called only from this child thread.
*/
bool Failed() const { return failed; }
/**
* Regulatly triggered for execution in the child thread.
*
@ -294,6 +300,7 @@ private:
uint64_t cnt_sent_out; // Counts message sent by child.
bool finished; // Set to true by Finished message.
bool failed; // Set to true when a command failed.
};
/**