mirror of
https://github.com/zeek/zeek.git
synced 2025-10-13 12:08:20 +00:00
Updates to use new input framework mechanism to execute command line programs.
This commit is contained in:
parent
f2ac938603
commit
035b668f73
1 changed files with 60 additions and 100 deletions
|
@ -23,6 +23,8 @@ export {
|
|||
type Result: record {
|
||||
## Exit code from the program.
|
||||
exit_code: count &default=0;
|
||||
## True if the command was terminated with a signal.
|
||||
signal_exit: bool &default=F;
|
||||
## Each line of standard out.
|
||||
stdout: vector of string &optional;
|
||||
## Each line of standard error.
|
||||
|
@ -41,39 +43,45 @@ export {
|
|||
## returns: A record representing the full results from the
|
||||
## external program execution.
|
||||
global run: function(cmd: Command): Result;
|
||||
|
||||
## The system directory for temp files.
|
||||
const tmp_dir = "/tmp" &redef;
|
||||
}
|
||||
|
||||
redef record Command += {
|
||||
# The prefix name for tracking temp files.
|
||||
prefix_name: string &optional;
|
||||
# The unique id for tracking executors.
|
||||
uid: string &optional;
|
||||
};
|
||||
|
||||
global results: table[string] of Result = table();
|
||||
global finished_commands: set[string];
|
||||
global tmp_files: set[string] = set();
|
||||
global currently_tracked_files: set[string] = set();
|
||||
type OneLine: record {
|
||||
s: string;
|
||||
is_stderr: bool;
|
||||
};
|
||||
|
||||
type OneLine: record { line: string; };
|
||||
type FileLine: record {
|
||||
s: string;
|
||||
};
|
||||
|
||||
event Exec::stdout_line(description: Input::EventDescription, tpe: Input::Event, s: string)
|
||||
event Exec::line(description: Input::EventDescription, tpe: Input::Event, s: string, is_stderr: bool)
|
||||
{
|
||||
local name = sub(description$name, /_[^_]*$/, "");
|
||||
|
||||
local result = results[name];
|
||||
if ( ! results[name]?$stdout )
|
||||
result$stdout = vector(s);
|
||||
local result = results[description$name];
|
||||
if ( is_stderr )
|
||||
{
|
||||
if ( ! result?$stderr )
|
||||
result$stderr = vector(s);
|
||||
else
|
||||
result$stderr[|result$stderr|] = s;
|
||||
}
|
||||
else
|
||||
result$stdout[|result$stdout|] = s;
|
||||
}
|
||||
|
||||
event Exec::stderr_line(description: Input::EventDescription, tpe: Input::Event, s: string)
|
||||
{
|
||||
local name = sub(description$name, /_[^_]*$/, "");
|
||||
|
||||
local result = results[name];
|
||||
if ( ! results[name]?$stderr )
|
||||
result$stderr = vector(s);
|
||||
else
|
||||
result$stderr[|result$stderr|] = s;
|
||||
{
|
||||
if ( ! result?$stdout )
|
||||
result$stdout = vector(s);
|
||||
else
|
||||
result$stdout[|result$stdout|] = s;
|
||||
}
|
||||
}
|
||||
|
||||
event Exec::file_line(description: Input::EventDescription, tpe: Input::Event, s: string)
|
||||
|
@ -92,107 +100,59 @@ event Exec::file_line(description: Input::EventDescription, tpe: Input::Event, s
|
|||
result$files[track_file][|result$files[track_file]|] = s;
|
||||
}
|
||||
|
||||
event Exec::cleanup_and_do_callback(name: string)
|
||||
event InputRaw::process_finished(name: string, source:string, exit_code:count, signal_exit:bool)
|
||||
{
|
||||
Input::remove(fmt("%s_stdout", name));
|
||||
system(fmt("rm %s_stdout", name));
|
||||
delete tmp_files[fmt("%s_stdout", name)];
|
||||
|
||||
Input::remove(fmt("%s_stderr", name));
|
||||
system(fmt("rm %s_stderr", name));
|
||||
delete tmp_files[fmt("%s_stderr", name)];
|
||||
|
||||
Input::remove(fmt("%s_done", name));
|
||||
system(fmt("rm %s_done", name));
|
||||
delete tmp_files[fmt("%s_done", name)];
|
||||
results[name]$exit_code = exit_code;
|
||||
results[name]$signal_exit = signal_exit;
|
||||
|
||||
Input::remove(name);
|
||||
# Indicate to the "when" async watcher that this command is done.
|
||||
add finished_commands[name];
|
||||
}
|
||||
|
||||
event Exec::run_done(description: Input::EventDescription, tpe: Input::Event, s: string)
|
||||
event Exec::start_watching_file(uid: string, read_file: string)
|
||||
{
|
||||
local name = sub(description$name, /_[^_]*$/, "");
|
||||
|
||||
if ( /^exit_code:/ in s )
|
||||
results[name]$exit_code = to_count(split1(s, /:/)[2]);
|
||||
else if ( s == "done" )
|
||||
# Wait one second to allow all threads to read all of their input
|
||||
# and forward it.
|
||||
schedule 1sec { Exec::cleanup_and_do_callback(name) };
|
||||
}
|
||||
|
||||
event Exec::start_watching_files(cmd: Command)
|
||||
{
|
||||
Input::add_event([$source=fmt("%s_done", cmd$prefix_name),
|
||||
$name=fmt("%s_done", cmd$prefix_name),
|
||||
Input::add_event([$source=fmt("%s", read_file),
|
||||
$name=fmt("%s_%s", uid, read_file),
|
||||
$reader=Input::READER_RAW,
|
||||
$mode=Input::STREAM,
|
||||
$want_record=F,
|
||||
$fields=OneLine,
|
||||
$ev=Exec::run_done]);
|
||||
|
||||
Input::add_event([$source=fmt("%s_stdout", cmd$prefix_name),
|
||||
$name=fmt("%s_stdout", cmd$prefix_name),
|
||||
$reader=Input::READER_RAW,
|
||||
$mode=Input::STREAM,
|
||||
$want_record=F,
|
||||
$fields=OneLine,
|
||||
$ev=Exec::stdout_line]);
|
||||
|
||||
Input::add_event([$source=fmt("%s_stderr", cmd$prefix_name),
|
||||
$name=fmt("%s_stderr", cmd$prefix_name),
|
||||
$reader=Input::READER_RAW,
|
||||
$mode=Input::STREAM,
|
||||
$want_record=F,
|
||||
$fields=OneLine,
|
||||
$ev=Exec::stderr_line]);
|
||||
|
||||
if ( cmd?$read_files )
|
||||
{
|
||||
for ( read_file in cmd$read_files )
|
||||
{
|
||||
Input::add_event([$source=fmt("%s", read_file),
|
||||
$name=fmt("%s_%s", cmd$prefix_name, read_file),
|
||||
$reader=Input::READER_RAW,
|
||||
$mode=Input::STREAM,
|
||||
$want_record=F,
|
||||
$fields=OneLine,
|
||||
$ev=Exec::file_line]);
|
||||
}
|
||||
}
|
||||
$fields=FileLine,
|
||||
$ev=Exec::file_line]);
|
||||
}
|
||||
|
||||
function run(cmd: Command): Result
|
||||
{
|
||||
cmd$prefix_name = "/tmp/bro-exec-" + unique_id("");
|
||||
system(fmt("touch %s_done %s_stdout %s_stderr 2>/dev/null", cmd$prefix_name, cmd$prefix_name, cmd$prefix_name));
|
||||
add tmp_files[fmt("%s_done", cmd$prefix_name)];
|
||||
add tmp_files[fmt("%s_stdout", cmd$prefix_name)];
|
||||
add tmp_files[fmt("%s_stderr", cmd$prefix_name)];
|
||||
cmd$uid = unique_id("");
|
||||
results[cmd$uid] = [];
|
||||
|
||||
if ( cmd?$read_files )
|
||||
{
|
||||
for ( read_file in cmd$read_files )
|
||||
{
|
||||
system(fmt("touch %s 2>/dev/null", read_file));
|
||||
add tmp_files[read_file];
|
||||
add currently_tracked_files[read_file];
|
||||
system(fmt("touch \"%s\" 2>/dev/null", str_shell_escape(read_file)));
|
||||
schedule 1msec { Exec::start_watching_file(cmd$uid, read_file) };
|
||||
}
|
||||
}
|
||||
|
||||
piped_exec(fmt("%s 2>> %s_stderr 1>> %s_stdout; echo \"exit_code:${?}\" >> %s_done; echo \"done\" >> %s_done",
|
||||
cmd$cmd, cmd$prefix_name, cmd$prefix_name, cmd$prefix_name, cmd$prefix_name),
|
||||
cmd$stdin);
|
||||
local config_strings: table[string] of string = {
|
||||
["stdin"] = cmd$stdin,
|
||||
["read_stderr"] = "1",
|
||||
};
|
||||
Input::add_event([$name=cmd$uid,
|
||||
$source=fmt("%s |", cmd$cmd),
|
||||
$reader=Input::READER_RAW,
|
||||
$fields=Exec::OneLine,
|
||||
$ev=Exec::line,
|
||||
$want_record=F,
|
||||
$config=config_strings]);
|
||||
|
||||
results[cmd$prefix_name] = [];
|
||||
|
||||
schedule 1msec { Exec::start_watching_files(cmd) };
|
||||
|
||||
return when ( cmd$prefix_name in finished_commands )
|
||||
return when ( cmd$uid in finished_commands )
|
||||
{
|
||||
delete finished_commands[cmd$prefix_name];
|
||||
local result = results[cmd$prefix_name];
|
||||
delete results[cmd$prefix_name];
|
||||
delete finished_commands[cmd$uid];
|
||||
local result = results[cmd$uid];
|
||||
delete results[cmd$uid];
|
||||
return result;
|
||||
}
|
||||
}
|
||||
|
@ -200,7 +160,7 @@ function run(cmd: Command): Result
|
|||
event bro_done()
|
||||
{
|
||||
# We are punting here and just deleting any files that haven't been processed yet.
|
||||
for ( fname in tmp_files )
|
||||
for ( fname in currently_tracked_files )
|
||||
{
|
||||
system(fmt("rm \"%s\"", str_shell_escape(fname)));
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue