Input's ascii reader is now more resilient.

By default, the ASCII reader does not fail on errors anymore.
If there is a problem parsing a line, a reporter warning is
written and parsing continues.  If the file is missing or can't
be read, the input thread just tries again on the next heartbeat.

Options have been added to recreate the previous behavior...

const InputAscii::fail_on_invalid_lines: bool;
and
const InputAscii::fail_on_file_problem: bool;

They are both set to `F` by default which makes the input readers
resilient to failure.
This commit is contained in:
Seth Hall 2017-02-23 23:13:12 -05:00
parent 2b15ec1069
commit 75744d22bc
8 changed files with 127 additions and 32 deletions

View file

@ -61,7 +61,6 @@ void Ascii::DoClose()
bool Ascii::DoInit(const ReaderInfo& info, int num_fields, const Field* const* fields)
{
continue_on_failure = true;
is_failed = false;
separator.assign( (const char*) BifConst::InputAscii::separator->Bytes(),
@ -76,6 +75,9 @@ bool Ascii::DoInit(const ReaderInfo& info, int num_fields, const Field* const* f
unset_field.assign( (const char*) BifConst::InputAscii::unset_field->Bytes(),
BifConst::InputAscii::unset_field->Len());
fail_on_invalid_lines = BifConst::InputAscii::fail_on_invalid_lines;
fail_on_file_problem = BifConst::InputAscii::fail_on_file_problem;
// Set per-filter configuration options.
for ( ReaderInfo::config_map::const_iterator i = info.config.begin(); i != info.config.end(); i++ )
{
@ -90,6 +92,12 @@ bool Ascii::DoInit(const ReaderInfo& info, int num_fields, const Field* const* f
else if ( strcmp(i->first, "unset_field") == 0 )
unset_field.assign(i->second);
else if ( strcmp(i->first, "fail_on_invalid_lines") == 0 )
fail_on_invalid_lines = (strncmp(i->second, "T", 1) == 0);
else if ( strcmp(i->first, "fail_on_file_problem") == 0 )
fail_on_file_problem = (strncmp(i->second, "T", 1) == 0);
}
if ( separator.size() != 1 )
@ -101,34 +109,40 @@ bool Ascii::DoInit(const ReaderInfo& info, int num_fields, const Field* const* f
formatter::Ascii::SeparatorInfo sep_info(separator, set_separator, unset_field, empty_field);
formatter = unique_ptr<threading::formatter::Formatter>(new formatter::Ascii(this, sep_info));
OpenFile();
if ( is_failed )
return continue_on_failure;
DoUpdate();
return true;
}
void Ascii::FailWarn(bool is_error, const char *msg)
{
if ( is_error )
Error(msg);
else
Warning(msg);
}
bool Ascii::OpenFile()
{
if ( file.is_open() && ! is_failed )
return true;
file.open(Info().source);
if ( ! file.is_open() )
{
if ( ! is_failed )
Warning(Fmt("Init: cannot open %s", Info().source));
FailWarn(fail_on_file_problem, Fmt("Init: cannot open %s", Info().source));
is_failed = true;
return continue_on_failure;
return !fail_on_file_problem;
}
if ( ReadHeader(false) == false )
{
if ( ! is_failed )
Warning(Fmt("Init: cannot open %s; headers are incorrect", Info().source));
FailWarn(fail_on_file_problem, Fmt("Init: cannot open %s; headers are incorrect", Info().source));
file.close();
is_failed = true;
return continue_on_failure;
return !fail_on_file_problem;
}
is_failed = false;
@ -141,7 +155,7 @@ bool Ascii::ReadHeader(bool useCached)
string line;
map<string, uint32_t> ifields;
if ( headerline == "" )
if ( ! useCached )
{
if ( ! GetLine(line) )
return false;
@ -184,11 +198,12 @@ bool Ascii::ReadHeader(bool useCached)
continue;
}
Warning(Fmt("Did not find requested field %s in input data file %s.",
field->name, Info().source));
if ( ! is_failed )
FailWarn(fail_on_file_problem, Fmt("Did not find requested field %s in input data file %s.",
field->name, Info().source));
is_failed = true;
return continue_on_failure;
return !fail_on_file_problem;
}
FieldMapping f(field->name, field->type, field->subtype, ifields[field->name]);
@ -198,11 +213,12 @@ bool Ascii::ReadHeader(bool useCached)
map<string, uint32_t>::iterator fit2 = ifields.find(field->secondary_name);
if ( fit2 == ifields.end() )
{
Warning(Fmt("Could not find requested port type field %s in input data file.",
field->secondary_name));
if ( ! is_failed )
FailWarn(fail_on_file_problem, Fmt("Could not find requested port type field %s in input data file.",
field->secondary_name));
is_failed = true;
return continue_on_failure;
return !fail_on_file_problem;
}
f.secondary_position = ifields[field->secondary_name];
@ -241,6 +257,9 @@ bool Ascii::GetLine(string& str)
// read the entire file and send appropriate thingies back to InputMgr
bool Ascii::DoUpdate()
{
if ( ! OpenFile() )
return !fail_on_file_problem;
switch ( Info().mode ) {
case MODE_REREAD:
{
@ -248,10 +267,11 @@ bool Ascii::DoUpdate()
struct stat sb;
if ( stat(Info().source, &sb) == -1 )
{
Warning(Fmt("Could not get stat for %s", Info().source));
if ( ! is_failed )
FailWarn(fail_on_file_problem, Fmt("Could not get stat for %s", Info().source));
file.close();
is_failed = true;
return continue_on_failure;
return !fail_on_file_problem;
}
if ( sb.st_mtime <= mtime ) // no change
@ -276,7 +296,7 @@ bool Ascii::DoUpdate()
if ( !ReadHeader(true) )
{
is_failed = true;
return continue_on_failure; // header reading failed
return !fail_on_file_problem; // header reading failed
}
break;
@ -339,7 +359,7 @@ bool Ascii::DoUpdate()
if ( (*fit).position > pos || (*fit).secondary_position > pos )
{
Warning(Fmt("Not enough fields in line %s. Found %d fields, want positions %d and %d",
FailWarn(fail_on_invalid_lines, Fmt("Not enough fields in line %s. Found %d fields, want positions %d and %d",
line.c_str(), pos, (*fit).position, (*fit).secondary_position));
for ( int i = 0; i < fpos; i++ )
@ -347,8 +367,15 @@ bool Ascii::DoUpdate()
delete [] fields;
is_failed = true;
return continue_on_failure;
if ( fail_on_invalid_lines )
{
return false;
}
else
{
error = true;
break;
}
}
Value* val = formatter->ParseValue(stringfields[(*fit).position], (*fit).name, (*fit).type, (*fit).subtype);
@ -404,11 +431,8 @@ bool Ascii::DoUpdate()
bool Ascii::DoHeartbeat(double network_time, double current_time)
{
if ( ! file.is_open() )
OpenFile();
if ( is_failed )
return continue_on_failure;
if ( ! OpenFile() )
return !fail_on_file_problem;
switch ( Info().mode )
{