Exec module changes/fixes.

- Give Dir::monitor() a param for the polling interval, so different
  dirs can be monitored at different frequencies.

- Fix race in Exec::run() when reading extra output files produced by
  a process -- it was possible for Exec::run() to return before all
  extra output files had been fully read.

- Add test cases.
This commit is contained in:
Jon Siwek 2013-07-23 14:16:39 -05:00
parent 325f0c2a3f
commit 73eb87a41e
10 changed files with 299 additions and 42 deletions

View file

@ -14,6 +14,8 @@ export {
## If additional files are required to be read in as part of the output
## of the command they can be defined here.
read_files: set[string] &optional;
# The unique id for tracking executors.
uid: string &default=unique_id("");
};
type Result: record {
@ -44,14 +46,11 @@ export {
const tmp_dir = "/tmp" &redef;
}
redef record Command += {
# The unique id for tracking executors.
uid: string &optional;
};
# Indexed by command uid.
global results: table[string] of Result;
global pending_commands: set[string];
global pending_files: table[string] of set[string];
global results: table[string] of Result = table();
global finished_commands: set[string];
global currently_tracked_files: set[string] = set();
type OneLine: record {
s: string;
is_stderr: bool;
@ -96,39 +95,63 @@ event Exec::file_line(description: Input::EventDescription, tpe: Input::Event, s
result$files[track_file][|result$files[track_file]|] = s;
}
event Input::end_of_data(name: string, source:string)
{
local parts = split1(name, /_/);
name = parts[1];
if ( name !in pending_commands || |parts| < 2 )
return;
local track_file = parts[2];
Input::remove(name);
if ( name !in pending_files )
delete pending_commands[name];
else
{
delete pending_files[name][track_file];
if ( |pending_files[name]| == 0 )
delete pending_commands[name];
system(fmt("rm \"%s\"", str_shell_escape(track_file)));
}
}
event InputRaw::process_finished(name: string, source:string, exit_code:count, signal_exit:bool)
{
if ( name !in pending_commands )
return;
Input::remove(name);
results[name]$exit_code = exit_code;
results[name]$signal_exit = signal_exit;
Input::remove(name);
# Indicate to the "when" async watcher that this command is done.
add finished_commands[name];
}
event Exec::start_watching_file(uid: string, read_file: string)
{
Input::add_event([$source=fmt("%s", read_file),
$name=fmt("%s_%s", uid, read_file),
$reader=Input::READER_RAW,
$mode=Input::STREAM,
$want_record=F,
$fields=FileLine,
$ev=Exec::file_line]);
if ( name !in pending_files || |pending_files[name]| == 0 )
# No extra files to read, command is done.
delete pending_commands[name];
else
for ( read_file in pending_files[name] )
Input::add_event([$source=fmt("%s", read_file),
$name=fmt("%s_%s", name, read_file),
$reader=Input::READER_RAW,
$want_record=F,
$fields=FileLine,
$ev=Exec::file_line]);
}
function run(cmd: Command): Result
{
cmd$uid = unique_id("");
add pending_commands[cmd$uid];
results[cmd$uid] = [];
if ( cmd?$read_files )
{
for ( read_file in cmd$read_files )
{
add currently_tracked_files[read_file];
system(fmt("touch \"%s\" 2>/dev/null", str_shell_escape(read_file)));
schedule 1msec { Exec::start_watching_file(cmd$uid, read_file) };
if ( cmd$uid !in pending_files )
pending_files[cmd$uid] = set();
add pending_files[cmd$uid][read_file];
}
}
@ -144,9 +167,8 @@ function run(cmd: Command): Result
$want_record=F,
$config=config_strings]);
return when ( cmd$uid in finished_commands )
return when ( cmd$uid !in pending_commands )
{
delete finished_commands[cmd$uid];
local result = results[cmd$uid];
delete results[cmd$uid];
return result;
@ -155,9 +177,8 @@ function run(cmd: Command): Result
event bro_done()
{
# We are punting here and just deleting any files that haven't been processed yet.
for ( fname in currently_tracked_files )
{
system(fmt("rm \"%s\"", str_shell_escape(fname)));
}
# We are punting here and just deleting any unprocessed files.
for ( uid in pending_files )
for ( fname in pending_files[uid] )
system(fmt("rm \"%s\"", str_shell_escape(fname)));
}