diff --git a/doc b/doc index 6fe61dbd20..0d99039474 160000 --- a/doc +++ b/doc @@ -1 +1 @@ -Subproject commit 6fe61dbd2045fb766f8649f6c4c7b50618ddd06c +Subproject commit 0d990394746a323f9dcff668e5f0372c4b4d7877 diff --git a/scripts/base/frameworks/supervisor/api.zeek b/scripts/base/frameworks/supervisor/api.zeek index 7bd967c79e..2a93148ac3 100644 --- a/scripts/base/frameworks/supervisor/api.zeek +++ b/scripts/base/frameworks/supervisor/api.zeek @@ -117,4 +117,28 @@ export { ## It's an error to call this function from a process other than ## a supervised one. global node: function(): NodeConfig; + + ## Hooks intercepted stdout stream for all supervisor's child processes. + ## + ## node: the name of a previously created node via + ## :zeek:see:`Supervisor::create` indicating to which + ## child process the stdout line is associated. + ## An empty value is used to indicate the message + ## came from the internal supervisor stem process + ## (this should typically never happen). + ## + ## msg: line-buffered contents from the stdout of a child process. + global stdout_hook: hook(node: string, msg: string); + + ## Hooks intercepted stderr stream for all supervisor's child processes. + ## + ## node: the name of a previously created node via + ## :zeek:see:`Supervisor::create` indicating to which + ## child process the stdout line is associated. + ## A empty value is used to indicate the message + ## came from the internal supervisor stem process. + ## (this should typically never happen). + ## + ## msg: line-buffered contents from the stderr of a child process. + global stderr_hook: hook(node: string, msg: string); } diff --git a/src/supervisor/Supervisor.cc b/src/supervisor/Supervisor.cc index d2aa958eb5..9766b2e9b8 100644 --- a/src/supervisor/Supervisor.cc +++ b/src/supervisor/Supervisor.cc @@ -468,6 +468,9 @@ void Supervisor::HandleChildSignal() void Supervisor::InitPostScript() { + stem_stdout.hook = id::find_func("Supervisor::stdout_hook"); + stem_stderr.hook = id::find_func("Supervisor::stderr_hook"); + iosource_mgr->Register(this); if ( ! iosource_mgr->RegisterFd(signal_flare.FD(), this) ) @@ -498,16 +501,47 @@ void Supervisor::Process() void zeek::detail::LineBufferedPipe::Emit(const char* msg) const { - fprintf(stream, "%s%s\n", prefix.data(), msg); + if ( ! msg[0] ) + // Skip empty lines. + return; + + auto msg_start = msg; + auto do_print = true; + + if ( hook ) + { + auto node = ""; + auto node_len = 0; + + if ( msg[0] == '[' ) + { + auto end = strchr(msg, ']'); + + if ( end ) + { + node = msg + 1; + node_len = end - node; + msg = end + 1; + + if ( msg[0] == ' ' ) + ++msg; + } + } + + auto res = hook->Invoke(make_intrusive(node_len, node), + make_intrusive(msg)); + do_print = res->AsBool(); + } + + if ( do_print ) + fprintf(stream, "%s%s\n", prefix.data(), msg_start); } void zeek::detail::LineBufferedPipe::Drain() { while ( Process() != 0 ); - if ( ! buffer.empty() ) - Emit(buffer.data()); - + Emit(buffer.data()); buffer.clear(); pipe = nullptr; } @@ -529,8 +563,7 @@ size_t zeek::detail::LineBufferedPipe::Process() auto msgs = extract_msgs(&buffer, '\n'); for ( const auto& msg : msgs ) - if ( ! msg.empty() ) - Emit(msg.data()); + Emit(msg.data()); return bytes_read; } diff --git a/src/supervisor/Supervisor.h b/src/supervisor/Supervisor.h index 3c3f777e24..6bf932224b 100644 --- a/src/supervisor/Supervisor.h +++ b/src/supervisor/Supervisor.h @@ -17,6 +17,7 @@ #include "Timer.h" #include "Pipe.h" #include "Flare.h" +#include "Func.h" #include "NetVar.h" #include "IntrusivePtr.h" #include "Options.h" @@ -60,14 +61,21 @@ struct LineBufferedPipe { void Drain(); /** - * Read lines from the pipe and emit them to associate stream. + * Read lines from the pipe and emit them. */ size_t Process(); /** - * Adds a prefix to given data and emits to associate stream. + * Emits a message: either by calling a hook, or if there is no hook + * or the hook returns true (no early "break"), printing it to the + * associated stream. */ void Emit(const char* msg) const; + + /** + * A hook to call when emitting messages read from the pipe. + */ + FuncPtr hook; }; } // namespace zeek::detail diff --git a/testing/btest/Baseline/supervisor.output-redirect-hook/zeek..stderr b/testing/btest/Baseline/supervisor.output-redirect-hook/zeek..stderr new file mode 100644 index 0000000000..0bb6ef5bec --- /dev/null +++ b/testing/btest/Baseline/supervisor.output-redirect-hook/zeek..stderr @@ -0,0 +1,4 @@ +[supervisor:STDERR] [grault] (stderr) supervised node zeek_init() +received termination signal +[supervisor:STDERR] [grault] received termination signal +[supervisor:STDERR] [grault] (stderr) supervised node zeek_done() diff --git a/testing/btest/Baseline/supervisor.output-redirect-hook/zeek..stdout b/testing/btest/Baseline/supervisor.output-redirect-hook/zeek..stdout new file mode 100644 index 0000000000..f4925ffea4 --- /dev/null +++ b/testing/btest/Baseline/supervisor.output-redirect-hook/zeek..stdout @@ -0,0 +1,5 @@ +hooked stdout of (grault): (stdout) supervised node zeek_init() +hooked stderr of (grault): (stderr) supervised node zeek_init() +hooked stdout of (grault): (stdout) supervised node zeek_done() +hooked stderr of (grault): received termination signal +hooked stderr of (grault): (stderr) supervised node zeek_done() diff --git a/testing/btest/Baseline/supervisor.output-redirect-hook/zeek.supervisor.out b/testing/btest/Baseline/supervisor.output-redirect-hook/zeek.supervisor.out new file mode 100644 index 0000000000..295c7211d6 --- /dev/null +++ b/testing/btest/Baseline/supervisor.output-redirect-hook/zeek.supervisor.out @@ -0,0 +1,3 @@ +supervisor zeek_init() +destroying node +supervisor zeek_done() diff --git a/testing/btest/supervisor/output-redirect-hook.zeek b/testing/btest/supervisor/output-redirect-hook.zeek new file mode 100644 index 0000000000..81cb0f6f54 --- /dev/null +++ b/testing/btest/supervisor/output-redirect-hook.zeek @@ -0,0 +1,78 @@ +# @TEST-PORT: BROKER_PORT +# @TEST-EXEC: btest-bg-run zeek zeek -j -b %INPUT +# @TEST-EXEC: btest-bg-wait 30 +# @TEST-EXEC: btest-diff zeek/supervisor.out +# @TEST-EXEC: TEST_DIFF_CANONIFIER=$SCRIPTS/diff-sort btest-diff zeek/.stdout +# @TEST-EXEC: TEST_DIFF_CANONIFIER=$SCRIPTS/diff-sort btest-diff zeek/.stderr + +# This test checks the default stdout/stderr redirection will get intercepted +# by the supervisor process and sent through the hook mechanisms + +# So the supervised node doesn't terminate right away. +redef exit_only_after_terminate=T; + +global supervisor_output_file: file; +global topic = "test-topic"; +global stderr = open("/dev/stderr"); + +event do_destroy() + { + print supervisor_output_file, "destroying node"; + Supervisor::destroy("grault"); + } + +hook Supervisor::stdout_hook(node: string, msg: string) + { + print fmt("hooked stdout of (%s): %s", node, msg); + break; + } + +hook Supervisor::stderr_hook(node: string, msg: string) + { + print fmt("hooked stderr of (%s): %s", node, msg); + } + +event zeek_init() + { + if ( Supervisor::is_supervisor() ) + { + Broker::subscribe(topic); + Broker::listen("127.0.0.1", to_port(getenv("BROKER_PORT"))); + supervisor_output_file = open("supervisor.out"); + print supervisor_output_file, "supervisor zeek_init()"; + local sn = Supervisor::NodeConfig($name="grault", $directory="qux"); + local res = Supervisor::create(sn); + + if ( res != "" ) + print supervisor_output_file, res; + } + else + { + Broker::peer("127.0.0.1", to_port(getenv("BROKER_PORT"))); + print "(stdout) supervised node zeek_init()"; + print stderr, "(stderr) supervised node zeek_init()"; + } + } + +event Broker::peer_added(endpoint: Broker::EndpointInfo, msg: string) + { + if ( Supervisor::is_supervised() ) + Broker::publish(topic, do_destroy); + } + +event Broker::peer_lost(endpoint: Broker::EndpointInfo, msg: string) + { + # Should only be run by supervisor + terminate(); + } + +event zeek_done() + { + if ( Supervisor::is_supervised() ) + { + print "(stdout) supervised node zeek_done()"; + print stderr, "(stderr) supervised node zeek_done()"; + } + else + print supervisor_output_file, "supervisor zeek_done()"; + }