Another resilient Ascii reader checkpoint.

This works correctly now (as a prototype at least). If a file
disappears, the thread complains once and once the file reappears
the thread will once again begin watching it.
This commit is contained in:
Seth Hall 2017-02-21 23:35:29 -05:00
parent b0d812812f
commit 2b15ec1069
2 changed files with 19 additions and 45 deletions

View file

@ -64,8 +64,6 @@ bool Ascii::DoInit(const ReaderInfo& info, int num_fields, const Field* const* f
continue_on_failure = true; continue_on_failure = true;
is_failed = false; is_failed = false;
filename = info.source;
separator.assign( (const char*) BifConst::InputAscii::separator->Bytes(), separator.assign( (const char*) BifConst::InputAscii::separator->Bytes(),
BifConst::InputAscii::separator->Len()); BifConst::InputAscii::separator->Len());
@ -103,11 +101,10 @@ bool Ascii::DoInit(const ReaderInfo& info, int num_fields, const Field* const* f
formatter::Ascii::SeparatorInfo sep_info(separator, set_separator, unset_field, empty_field); formatter::Ascii::SeparatorInfo sep_info(separator, set_separator, unset_field, empty_field);
formatter = unique_ptr<threading::formatter::Formatter>(new formatter::Ascii(this, sep_info)); formatter = unique_ptr<threading::formatter::Formatter>(new formatter::Ascii(this, sep_info));
if ( ! OpenFile() ) OpenFile();
{
is_failed = true; if ( is_failed )
return continue_on_failure; return continue_on_failure;
}
DoUpdate(); DoUpdate();
@ -116,22 +113,25 @@ bool Ascii::DoInit(const ReaderInfo& info, int num_fields, const Field* const* f
bool Ascii::OpenFile() bool Ascii::OpenFile()
{ {
file.open(filename); file.open(Info().source);
if ( ! file.is_open() ) if ( ! file.is_open() )
{ {
Error(Fmt("Init: cannot open %s", filename.c_str())); if ( ! is_failed )
Warning(Fmt("Init: cannot open %s", Info().source));
is_failed = true; is_failed = true;
return continue_on_failure; return continue_on_failure;
} }
if ( ReadHeader(false) == false ) if ( ReadHeader(false) == false )
{ {
Error(Fmt("Init: cannot open %s; headers are incorrect", filename.c_str())); if ( ! is_failed )
Warning(Fmt("Init: cannot open %s; headers are incorrect", Info().source));
file.close(); file.close();
is_failed = true; is_failed = true;
return continue_on_failure; return continue_on_failure;
} }
is_failed = false;
return true; return true;
} }
@ -141,14 +141,10 @@ bool Ascii::ReadHeader(bool useCached)
string line; string line;
map<string, uint32_t> ifields; map<string, uint32_t> ifields;
if ( ! useCached ) if ( headerline == "" )
{ {
if ( ! GetLine(line) ) if ( ! GetLine(line) )
{ return false;
Error("could not read first line");
is_failed = true;
return continue_on_failure;
}
headerline = line; headerline = line;
} }
@ -188,7 +184,7 @@ bool Ascii::ReadHeader(bool useCached)
continue; continue;
} }
Error(Fmt("Did not find requested field %s in input data file %s.", Warning(Fmt("Did not find requested field %s in input data file %s.",
field->name, Info().source)); field->name, Info().source));
is_failed = true; is_failed = true;
@ -202,7 +198,7 @@ bool Ascii::ReadHeader(bool useCached)
map<string, uint32_t>::iterator fit2 = ifields.find(field->secondary_name); map<string, uint32_t>::iterator fit2 = ifields.find(field->secondary_name);
if ( fit2 == ifields.end() ) if ( fit2 == ifields.end() )
{ {
Error(Fmt("Could not find requested port type field %s in input data file.", Warning(Fmt("Could not find requested port type field %s in input data file.",
field->secondary_name)); field->secondary_name));
is_failed = true; is_failed = true;
@ -215,7 +211,6 @@ bool Ascii::ReadHeader(bool useCached)
columnMap.push_back(f); columnMap.push_back(f);
} }
// well, that seems to have worked... // well, that seems to have worked...
return true; return true;
} }
@ -246,12 +241,6 @@ bool Ascii::GetLine(string& str)
// read the entire file and send appropriate thingies back to InputMgr // read the entire file and send appropriate thingies back to InputMgr
bool Ascii::DoUpdate() bool Ascii::DoUpdate()
{ {
if ( is_failed )
if ( ! OpenFile() )
{
printf("do updates after failure?!\n");
}
switch ( Info().mode ) { switch ( Info().mode ) {
case MODE_REREAD: case MODE_REREAD:
{ {
@ -259,7 +248,8 @@ bool Ascii::DoUpdate()
struct stat sb; struct stat sb;
if ( stat(Info().source, &sb) == -1 ) if ( stat(Info().source, &sb) == -1 )
{ {
Error(Fmt("Could not get stat for %s", Info().source)); Warning(Fmt("Could not get stat for %s", Info().source));
file.close();
is_failed = true; is_failed = true;
return continue_on_failure; return continue_on_failure;
} }
@ -295,19 +285,7 @@ bool Ascii::DoUpdate()
file.close(); file.close();
} }
file.open(Info().source); OpenFile();
if ( ! file.is_open() )
{
Error(Fmt("cannot open %s", Info().source));
is_failed = true;
return continue_on_failure;
}
if ( ReadHeader(false) == false )
{
is_failed = true;
return continue_on_failure;
}
break; break;
} }
@ -361,7 +339,7 @@ bool Ascii::DoUpdate()
if ( (*fit).position > pos || (*fit).secondary_position > pos ) if ( (*fit).position > pos || (*fit).secondary_position > pos )
{ {
Error(Fmt("Not enough fields in line %s. Found %d fields, want positions %d and %d", Warning(Fmt("Not enough fields in line %s. Found %d fields, want positions %d and %d",
line.c_str(), pos, (*fit).position, (*fit).secondary_position)); line.c_str(), pos, (*fit).position, (*fit).secondary_position));
for ( int i = 0; i < fpos; i++ ) for ( int i = 0; i < fpos; i++ )
@ -426,14 +404,11 @@ bool Ascii::DoUpdate()
bool Ascii::DoHeartbeat(double network_time, double current_time) bool Ascii::DoHeartbeat(double network_time, double current_time)
{ {
printf("heartbeat\n");
is_failed = false;
if ( ! file.is_open() ) if ( ! file.is_open() )
OpenFile(); OpenFile();
//if ( is_failed ) if ( is_failed )
// return continue_on_failure; return continue_on_failure;
switch ( Info().mode ) switch ( Info().mode )
{ {

View file

@ -57,7 +57,6 @@ private:
bool GetLine(string& str); bool GetLine(string& str);
bool OpenFile(); bool OpenFile();
string filename;
ifstream file; ifstream file;
time_t mtime; time_t mtime;