Merge branch 'topic/seth/input-thread-behavior'

* topic/seth/input-thread-behavior:
  Minor documentation fixes.
  Ascii reader error changes - fix small bugs
  Tiny fix to correct a warning message.
  Input's ascii reader is now more resilient.
  Another resilient Ascii reader checkpoint.
  In progress on ascii writer behavior change.
This commit is contained in:
Seth Hall 2017-03-09 12:23:02 -05:00
commit b9c8bae0fd
13 changed files with 292 additions and 36 deletions

10
CHANGES
View file

@ -1,4 +1,14 @@
2.5-101 | 2017-03-09 12:20:11 -0500
* Input's framework's ascii reader is now more resilient.
By default, the ASCII reader does not fail on errors anymore.
If there is a problem parsing a line, a reporter warning is
written and parsing continues. If the file is missing or can't
be read, the input thread just tries again on the next heartbeat.
(Seth Hall, Johanna Amann)
2.5-92 | 2017-03-03 10:44:14 -0800 2.5-92 | 2017-03-03 10:44:14 -0800
* Move most threading to C++11 primitives (mostly). (Johanna Amann) * Move most threading to C++11 primitives (mostly). (Johanna Amann)

View file

@ -1 +1 @@
2.5-92 2.5-101

View file

@ -18,4 +18,33 @@ export {
## String to use for an unset &optional field. ## String to use for an unset &optional field.
const unset_field = Input::unset_field &redef; const unset_field = Input::unset_field &redef;
## Fail on invalid lines. If set to false, the ascii
## input reader will jump over invalid lines, reporting
## warnings in reporter.log. If set to true, errors in
## input lines will be handled as fatal errors for the
## reader thread; reading will abort immediately and
## an error will be logged to reporter.log.
## Individual readers can use a different value using
## the $config table.
## fail_on_invalid_lines = T was the default behavior
## until Bro 2.6.
const fail_on_invalid_lines = F &redef;
## Fail on file read problems. If set to true, the ascii
## input reader will fail when encountering any problems
## while reading a file different from invalid lines.
## Examples of such problems are permission problems, or
## missing files.
## When set to false, these problems will be ignored. This
## has an especially big effect for the REREAD mode, which will
## seamlessly recover from read errors when a file is
## only temporarily inaccessible. For MANUAL or STREAM files,
## errors will most likely still be fatal since no automatic
## re-reading of the file is attempted.
## Individual readers can use a different value using
## the $config table.
## fail_on_file_problem = T was the default behavior
## until Bro 2.6.
const fail_on_file_problem = F &redef;
} }

View file

@ -61,6 +61,8 @@ void Ascii::DoClose()
bool Ascii::DoInit(const ReaderInfo& info, int num_fields, const Field* const* fields) bool Ascii::DoInit(const ReaderInfo& info, int num_fields, const Field* const* fields)
{ {
suppress_warnings = false;
separator.assign( (const char*) BifConst::InputAscii::separator->Bytes(), separator.assign( (const char*) BifConst::InputAscii::separator->Bytes(),
BifConst::InputAscii::separator->Len()); BifConst::InputAscii::separator->Len());
@ -73,6 +75,9 @@ bool Ascii::DoInit(const ReaderInfo& info, int num_fields, const Field* const* f
unset_field.assign( (const char*) BifConst::InputAscii::unset_field->Bytes(), unset_field.assign( (const char*) BifConst::InputAscii::unset_field->Bytes(),
BifConst::InputAscii::unset_field->Len()); BifConst::InputAscii::unset_field->Len());
fail_on_invalid_lines = BifConst::InputAscii::fail_on_invalid_lines;
fail_on_file_problem = BifConst::InputAscii::fail_on_file_problem;
// Set per-filter configuration options. // Set per-filter configuration options.
for ( ReaderInfo::config_map::const_iterator i = info.config.begin(); i != info.config.end(); i++ ) for ( ReaderInfo::config_map::const_iterator i = info.config.begin(); i != info.config.end(); i++ )
{ {
@ -87,6 +92,12 @@ bool Ascii::DoInit(const ReaderInfo& info, int num_fields, const Field* const* f
else if ( strcmp(i->first, "unset_field") == 0 ) else if ( strcmp(i->first, "unset_field") == 0 )
unset_field.assign(i->second); unset_field.assign(i->second);
else if ( strcmp(i->first, "fail_on_invalid_lines") == 0 )
fail_on_invalid_lines = (strncmp(i->second, "T", 1) == 0);
else if ( strcmp(i->first, "fail_on_file_problem") == 0 )
fail_on_file_problem = (strncmp(i->second, "T", 1) == 0);
} }
if ( separator.size() != 1 ) if ( separator.size() != 1 )
@ -98,26 +109,51 @@ bool Ascii::DoInit(const ReaderInfo& info, int num_fields, const Field* const* f
formatter::Ascii::SeparatorInfo sep_info(separator, set_separator, unset_field, empty_field); formatter::Ascii::SeparatorInfo sep_info(separator, set_separator, unset_field, empty_field);
formatter = unique_ptr<threading::formatter::Formatter>(new formatter::Ascii(this, sep_info)); formatter = unique_ptr<threading::formatter::Formatter>(new formatter::Ascii(this, sep_info));
file.open(info.source); return DoUpdate();
}
void Ascii::FailWarn(bool is_error, const char *msg, bool suppress_future)
{
if ( is_error )
Error(msg);
else
{
// suppress error message when we are already in error mode.
// There is no reason to repeat it every second.
if ( ! suppress_warnings )
Warning(msg);
if ( suppress_future )
suppress_warnings = true;
}
}
bool Ascii::OpenFile()
{
if ( file.is_open() )
return true;
file.open(Info().source);
if ( ! file.is_open() ) if ( ! file.is_open() )
{ {
Error(Fmt("Init: cannot open %s", info.source)); FailWarn(fail_on_file_problem, Fmt("Init: cannot open %s", Info().source), true);
return false;
return ! fail_on_file_problem;
} }
if ( ReadHeader(false) == false ) if ( ReadHeader(false) == false )
{ {
Error(Fmt("Init: cannot open %s; headers are incorrect", info.source)); FailWarn(fail_on_file_problem, Fmt("Init: cannot open %s; problem reading file header", Info().source), true);
file.close(); file.close();
return false; return ! fail_on_file_problem;
} }
DoUpdate(); suppress_warnings = false;
return true; return true;
} }
bool Ascii::ReadHeader(bool useCached) bool Ascii::ReadHeader(bool useCached)
{ {
// try to read the header line... // try to read the header line...
@ -128,7 +164,8 @@ bool Ascii::ReadHeader(bool useCached)
{ {
if ( ! GetLine(line) ) if ( ! GetLine(line) )
{ {
Error("could not read first line"); FailWarn(fail_on_file_problem, Fmt("Could not read input data file %s; first line could not be read",
Info().source), true);
return false; return false;
} }
@ -170,8 +207,9 @@ bool Ascii::ReadHeader(bool useCached)
continue; continue;
} }
Error(Fmt("Did not find requested field %s in input data file %s.", FailWarn(fail_on_file_problem, Fmt("Did not find requested field %s in input data file %s.",
field->name, Info().source)); field->name, Info().source), true);
return false; return false;
} }
@ -182,8 +220,9 @@ bool Ascii::ReadHeader(bool useCached)
map<string, uint32_t>::iterator fit2 = ifields.find(field->secondary_name); map<string, uint32_t>::iterator fit2 = ifields.find(field->secondary_name);
if ( fit2 == ifields.end() ) if ( fit2 == ifields.end() )
{ {
Error(Fmt("Could not find requested port type field %s in input data file.", FailWarn(fail_on_file_problem, Fmt("Could not find requested port type field %s in input data file.",
field->secondary_name)); field->secondary_name), true);
return false; return false;
} }
@ -193,7 +232,6 @@ bool Ascii::ReadHeader(bool useCached)
columnMap.push_back(f); columnMap.push_back(f);
} }
// well, that seems to have worked... // well, that seems to have worked...
return true; return true;
} }
@ -224,6 +262,9 @@ bool Ascii::GetLine(string& str)
// read the entire file and send appropriate thingies back to InputMgr // read the entire file and send appropriate thingies back to InputMgr
bool Ascii::DoUpdate() bool Ascii::DoUpdate()
{ {
if ( ! OpenFile() )
return ! fail_on_file_problem;
switch ( Info().mode ) { switch ( Info().mode ) {
case MODE_REREAD: case MODE_REREAD:
{ {
@ -231,8 +272,10 @@ bool Ascii::DoUpdate()
struct stat sb; struct stat sb;
if ( stat(Info().source, &sb) == -1 ) if ( stat(Info().source, &sb) == -1 )
{ {
Error(Fmt("Could not get stat for %s", Info().source)); FailWarn(fail_on_file_problem, Fmt("Could not get stat for %s", Info().source), true);
return false;
file.close();
return ! fail_on_file_problem;
} }
if ( sb.st_mtime <= mtime ) // no change if ( sb.st_mtime <= mtime ) // no change
@ -254,8 +297,10 @@ bool Ascii::DoUpdate()
if ( Info().mode == MODE_STREAM ) if ( Info().mode == MODE_STREAM )
{ {
file.clear(); // remove end of file evil bits file.clear(); // remove end of file evil bits
if ( !ReadHeader(true) ) if ( ! ReadHeader(true) )
return false; // header reading failed {
return ! fail_on_file_problem; // header reading failed
}
break; break;
} }
@ -263,17 +308,7 @@ bool Ascii::DoUpdate()
file.close(); file.close();
} }
file.open(Info().source); OpenFile();
if ( ! file.is_open() )
{
Error(Fmt("cannot open %s", Info().source));
return false;
}
if ( ReadHeader(false) == false )
{
return false;
}
break; break;
} }
@ -327,14 +362,23 @@ bool Ascii::DoUpdate()
if ( (*fit).position > pos || (*fit).secondary_position > pos ) if ( (*fit).position > pos || (*fit).secondary_position > pos )
{ {
Error(Fmt("Not enough fields in line %s. Found %d fields, want positions %d and %d", FailWarn(fail_on_invalid_lines, Fmt("Not enough fields in line %s. Found %d fields, want positions %d and %d",
line.c_str(), pos, (*fit).position, (*fit).secondary_position)); line.c_str(), pos, (*fit).position, (*fit).secondary_position));
for ( int i = 0; i < fpos; i++ ) if ( fail_on_invalid_lines )
delete fields[i]; {
for ( int i = 0; i < fpos; i++ )
delete fields[i];
delete [] fields; delete [] fields;
return false;
return false;
}
else
{
error = true;
break;
}
} }
Value* val = formatter->ParseValue(stringfields[(*fit).position], (*fit).name, (*fit).type, (*fit).subtype); Value* val = formatter->ParseValue(stringfields[(*fit).position], (*fit).name, (*fit).type, (*fit).subtype);
@ -390,6 +434,9 @@ bool Ascii::DoUpdate()
bool Ascii::DoHeartbeat(double network_time, double current_time) bool Ascii::DoHeartbeat(double network_time, double current_time)
{ {
if ( ! OpenFile() )
return ! fail_on_file_problem;
switch ( Info().mode ) switch ( Info().mode )
{ {
case MODE_MANUAL: case MODE_MANUAL:

View file

@ -55,6 +55,11 @@ protected:
private: private:
bool ReadHeader(bool useCached); bool ReadHeader(bool useCached);
bool GetLine(string& str); bool GetLine(string& str);
bool OpenFile();
// Call Warning or Error, depending on the is_error boolean.
// In case of a warning, setting suppress_future to true will suppress all future warnings
// (by setting suppress_warnings to true, until suppress_warnings is set back to false)
void FailWarn(bool is_error, const char *msg, bool suppress_future = false);
ifstream file; ifstream file;
time_t mtime; time_t mtime;
@ -70,6 +75,12 @@ private:
string set_separator; string set_separator;
string empty_field; string empty_field;
string unset_field; string unset_field;
bool fail_on_invalid_lines;
bool fail_on_file_problem;
// this is an internal indicator in case the read is currently in a failed state
// it's used to suppress duplicate error messages.
bool suppress_warnings;
std::unique_ptr<threading::formatter::Formatter> formatter; std::unique_ptr<threading::formatter::Formatter> formatter;
}; };

View file

@ -5,3 +5,5 @@ const separator: string;
const set_separator: string; const set_separator: string;
const empty_field: string; const empty_field: string;
const unset_field: string; const unset_field: string;
const fail_on_invalid_lines: bool;
const fail_on_file_problem: bool;

View file

@ -0,0 +1,26 @@
{
[-43] = [b=T, e=SSH::LOG, c=21, p=123/unknown, sn=10.0.0.0/24, a=1.2.3.4, d=3.14, t=1315801931.273616, iv=100.0, s=hurz, ns=4242 HOHOHO, sc={
2,
4,
1,
3
}, ss={
BB,
AA,
CC
}, se={
}, vc=[10, 20, 30], ve=[]],
[-42] = [b=T, e=SSH::LOG, c=21, p=123/unknown, sn=10.0.0.0/24, a=1.2.3.4, d=3.14, t=1315801931.273616, iv=100.0, s=hurz, ns=4242, sc={
2,
4,
1,
3
}, ss={
BB,
AA,
CC
}, se={
}, vc=[10, 20, 30], ve=[]]
}

View file

@ -0,0 +1,8 @@
warning: ../does-not-exist.dat/Input::READER_ASCII: Init: cannot open ../does-not-exist.dat
warning: ../does-not-exist.dat/Input::READER_ASCII: Init: cannot open ../does-not-exist.dat
warning: ../does-not-exist.dat/Input::READER_ASCII: Init: cannot open ../does-not-exist.dat
error: ../does-not-exist.dat/Input::READER_ASCII: Init: cannot open ../does-not-exist.dat
error: ../does-not-exist.dat/Input::READER_ASCII: Init failed
error: ../does-not-exist.dat/Input::READER_ASCII: terminating thread
warning: ../does-not-exist.dat/Input::READER_ASCII: Could not get stat for ../does-not-exist.dat
received termination signal

View file

@ -0,0 +1,5 @@
now it does
and more!
now it does
and more!
Streaming still works

View file

@ -0,0 +1,67 @@
# @TEST-EXEC: btest-bg-run bro bro -b %INPUT
# @TEST-EXEC: btest-bg-wait 10
# @TEST-EXEC: btest-diff out
redef exit_only_after_terminate = T;
redef InputAscii::fail_on_invalid_lines = F;
@TEST-START-FILE input.log
#separator \x09
#path ssh
#fields b i e c p sn a d t iv s sc ss se vc ve ns
#types bool int enum count port subnet addr double time interval string table table table vector vector string
T -42 SSH::LOG 21 123 10.0.0.0/24 1.2.3.4 3.14 1315801931.273616 100.000000 hurz 2,4,1,3 CC,AA,BB EMPTY 10,20,30
T -42 SSH::LOG 21 123 10.0.0.0/24 1.2.3.4 3.14 1315801931.273616 100.000000 hurz 2,4,1,3 CC,AA,BB EMPTY 10,20,30 EMPTY 4242
T -43 SSH::LOG 21 123 10.0.0.0/24 1.2.3.4 3.14 1315801931.273616 100.000000 hurz 2,4,1,3 CC,AA,BB EMPTY 10,20,30 EMPTY 4242 HOHOHO
T -41
@TEST-END-FILE
@load base/protocols/ssh
global outfile: file;
redef InputAscii::empty_field = "EMPTY";
module A;
type Idx: record {
i: int;
};
type Val: record {
b: bool;
e: Log::ID;
c: count;
p: port;
sn: subnet;
a: addr;
d: double;
t: time;
iv: interval;
s: string;
ns: string;
sc: set[count];
ss: set[string];
se: set[string];
vc: vector of int;
ve: vector of int;
};
global servers: table[int] of Val = table();
global servers2: table[int] of Val = table();
event bro_init()
{
outfile = open("../out");
# first read in the old stuff into the table...
Input::add_table([$source="../input.log", $name="ssh", $idx=Idx, $val=Val, $destination=servers]);
Input::add_table([$source="../input.log", $name="ssh2", $idx=Idx, $val=Val, $destination=servers2, $config=table(["fail_on_invalid_lines"] = "T")]);
}
event Input::end_of_data(name: string, source:string)
{
print outfile, servers;
Input::remove("ssh");
close(outfile);
terminate();
}

View file

@ -13,6 +13,7 @@
@TEST-END-FILE @TEST-END-FILE
redef exit_only_after_terminate = T; redef exit_only_after_terminate = T;
redef InputAscii::fail_on_invalid_lines = T;
global outfile: file; global outfile: file;

View file

@ -0,0 +1,49 @@
# This tests files that don't exist initially and then do later during
# runtime to make sure the ascii reader is resilient to files missing.
# It does a second test at the same time which configures the old
# failing behavior.
# @TEST-EXEC: btest-bg-run bro bro %INPUT
# @TEST-EXEC: sleep 2; cp does-exist.dat does-not-exist.dat
# @TEST-EXEC: sleep 2; mv does-not-exist.dat does-not-exist-again.dat; echo "Streaming still works" >> does-not-exist-again.dat
# @TEST-EXEC: btest-bg-wait -k 3
# @TEST-EXEC: btest-diff bro/.stdout
# @TEST-EXEC: btest-diff bro/.stderr
@TEST-START-FILE does-exist.dat
#separator \x09
#fields line
#types string
now it does
and more!
@TEST-END-FILE
redef exit_only_after_terminate = T;
@load base/frameworks/input
module A;
type Val: record {
line: string;
};
event line(description: Input::EventDescription, tpe: Input::Event, v: Val)
{
print v$line;
}
event line2(description: Input::EventDescription, tpe: Input::Event, v: Val)
{
print "DONT PRINT THIS LINE";
}
event bro_init()
{
Input::add_event([$source="../does-not-exist.dat", $name="input", $reader=Input::READER_ASCII, $mode=Input::REREAD, $fields=Val, $ev=line, $want_record=T]);
Input::add_event([$source="../does-not-exist.dat", $name="inputstream", $reader=Input::READER_ASCII, $mode=Input::STREAM, $fields=Val, $ev=line, $want_record=T]);
Input::add_event([$source="../does-not-exist.dat", $name="inputmanual", $reader=Input::READER_ASCII, $mode=Input::MANUAL, $fields=Val, $ev=line, $want_record=T]);
Input::add_event([$source="../does-not-exist.dat", $name="input2", $reader=Input::READER_ASCII, $mode=Input::REREAD, $fields=Val, $ev=line2, $want_record=T,
$config=table(["fail_on_file_problem"] = "T")]);
}

View file

@ -3,6 +3,7 @@
# @TEST-EXEC: btest-diff bro/.stderr # @TEST-EXEC: btest-diff bro/.stderr
redef exit_only_after_terminate = T; redef exit_only_after_terminate = T;
redef InputAscii::fail_on_file_problem = T;
global outfile: file; global outfile: file;
global try: count; global try: count;