mirror of
https://github.com/zeek/zeek.git
synced 2025-10-02 14:48:21 +00:00
Add Supervisor::{stdout,stderr}_hook
These allow capturing/handling the stdout/stderr of child processes via Zeek scripts.
This commit is contained in:
parent
a06ef66edc
commit
10709c627b
8 changed files with 164 additions and 9 deletions
2
doc
2
doc
|
@ -1 +1 @@
|
||||||
Subproject commit 6fe61dbd2045fb766f8649f6c4c7b50618ddd06c
|
Subproject commit 0d990394746a323f9dcff668e5f0372c4b4d7877
|
|
@ -117,4 +117,28 @@ export {
|
||||||
## It's an error to call this function from a process other than
|
## It's an error to call this function from a process other than
|
||||||
## a supervised one.
|
## a supervised one.
|
||||||
global node: function(): NodeConfig;
|
global node: function(): NodeConfig;
|
||||||
|
|
||||||
|
## Hooks intercepted stdout stream for all supervisor's child processes.
|
||||||
|
##
|
||||||
|
## node: the name of a previously created node via
|
||||||
|
## :zeek:see:`Supervisor::create` indicating to which
|
||||||
|
## child process the stdout line is associated.
|
||||||
|
## An empty value is used to indicate the message
|
||||||
|
## came from the internal supervisor stem process
|
||||||
|
## (this should typically never happen).
|
||||||
|
##
|
||||||
|
## msg: line-buffered contents from the stdout of a child process.
|
||||||
|
global stdout_hook: hook(node: string, msg: string);
|
||||||
|
|
||||||
|
## Hooks intercepted stderr stream for all supervisor's child processes.
|
||||||
|
##
|
||||||
|
## node: the name of a previously created node via
|
||||||
|
## :zeek:see:`Supervisor::create` indicating to which
|
||||||
|
## child process the stdout line is associated.
|
||||||
|
## A empty value is used to indicate the message
|
||||||
|
## came from the internal supervisor stem process.
|
||||||
|
## (this should typically never happen).
|
||||||
|
##
|
||||||
|
## msg: line-buffered contents from the stderr of a child process.
|
||||||
|
global stderr_hook: hook(node: string, msg: string);
|
||||||
}
|
}
|
||||||
|
|
|
@ -468,6 +468,9 @@ void Supervisor::HandleChildSignal()
|
||||||
|
|
||||||
void Supervisor::InitPostScript()
|
void Supervisor::InitPostScript()
|
||||||
{
|
{
|
||||||
|
stem_stdout.hook = id::find_func("Supervisor::stdout_hook");
|
||||||
|
stem_stderr.hook = id::find_func("Supervisor::stderr_hook");
|
||||||
|
|
||||||
iosource_mgr->Register(this);
|
iosource_mgr->Register(this);
|
||||||
|
|
||||||
if ( ! iosource_mgr->RegisterFd(signal_flare.FD(), this) )
|
if ( ! iosource_mgr->RegisterFd(signal_flare.FD(), this) )
|
||||||
|
@ -498,16 +501,47 @@ void Supervisor::Process()
|
||||||
|
|
||||||
void zeek::detail::LineBufferedPipe::Emit(const char* msg) const
|
void zeek::detail::LineBufferedPipe::Emit(const char* msg) const
|
||||||
{
|
{
|
||||||
fprintf(stream, "%s%s\n", prefix.data(), msg);
|
if ( ! msg[0] )
|
||||||
|
// Skip empty lines.
|
||||||
|
return;
|
||||||
|
|
||||||
|
auto msg_start = msg;
|
||||||
|
auto do_print = true;
|
||||||
|
|
||||||
|
if ( hook )
|
||||||
|
{
|
||||||
|
auto node = "";
|
||||||
|
auto node_len = 0;
|
||||||
|
|
||||||
|
if ( msg[0] == '[' )
|
||||||
|
{
|
||||||
|
auto end = strchr(msg, ']');
|
||||||
|
|
||||||
|
if ( end )
|
||||||
|
{
|
||||||
|
node = msg + 1;
|
||||||
|
node_len = end - node;
|
||||||
|
msg = end + 1;
|
||||||
|
|
||||||
|
if ( msg[0] == ' ' )
|
||||||
|
++msg;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
auto res = hook->Invoke(make_intrusive<StringVal>(node_len, node),
|
||||||
|
make_intrusive<StringVal>(msg));
|
||||||
|
do_print = res->AsBool();
|
||||||
|
}
|
||||||
|
|
||||||
|
if ( do_print )
|
||||||
|
fprintf(stream, "%s%s\n", prefix.data(), msg_start);
|
||||||
}
|
}
|
||||||
|
|
||||||
void zeek::detail::LineBufferedPipe::Drain()
|
void zeek::detail::LineBufferedPipe::Drain()
|
||||||
{
|
{
|
||||||
while ( Process() != 0 );
|
while ( Process() != 0 );
|
||||||
|
|
||||||
if ( ! buffer.empty() )
|
|
||||||
Emit(buffer.data());
|
Emit(buffer.data());
|
||||||
|
|
||||||
buffer.clear();
|
buffer.clear();
|
||||||
pipe = nullptr;
|
pipe = nullptr;
|
||||||
}
|
}
|
||||||
|
@ -529,7 +563,6 @@ size_t zeek::detail::LineBufferedPipe::Process()
|
||||||
auto msgs = extract_msgs(&buffer, '\n');
|
auto msgs = extract_msgs(&buffer, '\n');
|
||||||
|
|
||||||
for ( const auto& msg : msgs )
|
for ( const auto& msg : msgs )
|
||||||
if ( ! msg.empty() )
|
|
||||||
Emit(msg.data());
|
Emit(msg.data());
|
||||||
|
|
||||||
return bytes_read;
|
return bytes_read;
|
||||||
|
|
|
@ -17,6 +17,7 @@
|
||||||
#include "Timer.h"
|
#include "Timer.h"
|
||||||
#include "Pipe.h"
|
#include "Pipe.h"
|
||||||
#include "Flare.h"
|
#include "Flare.h"
|
||||||
|
#include "Func.h"
|
||||||
#include "NetVar.h"
|
#include "NetVar.h"
|
||||||
#include "IntrusivePtr.h"
|
#include "IntrusivePtr.h"
|
||||||
#include "Options.h"
|
#include "Options.h"
|
||||||
|
@ -60,14 +61,21 @@ struct LineBufferedPipe {
|
||||||
void Drain();
|
void Drain();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Read lines from the pipe and emit them to associate stream.
|
* Read lines from the pipe and emit them.
|
||||||
*/
|
*/
|
||||||
size_t Process();
|
size_t Process();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Adds a prefix to given data and emits to associate stream.
|
* Emits a message: either by calling a hook, or if there is no hook
|
||||||
|
* or the hook returns true (no early "break"), printing it to the
|
||||||
|
* associated stream.
|
||||||
*/
|
*/
|
||||||
void Emit(const char* msg) const;
|
void Emit(const char* msg) const;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A hook to call when emitting messages read from the pipe.
|
||||||
|
*/
|
||||||
|
FuncPtr hook;
|
||||||
};
|
};
|
||||||
} // namespace zeek::detail
|
} // namespace zeek::detail
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,4 @@
|
||||||
|
[supervisor:STDERR] [grault] (stderr) supervised node zeek_init()
|
||||||
|
received termination signal
|
||||||
|
[supervisor:STDERR] [grault] received termination signal
|
||||||
|
[supervisor:STDERR] [grault] (stderr) supervised node zeek_done()
|
|
@ -0,0 +1,5 @@
|
||||||
|
hooked stdout of (grault): (stdout) supervised node zeek_init()
|
||||||
|
hooked stderr of (grault): (stderr) supervised node zeek_init()
|
||||||
|
hooked stdout of (grault): (stdout) supervised node zeek_done()
|
||||||
|
hooked stderr of (grault): received termination signal
|
||||||
|
hooked stderr of (grault): (stderr) supervised node zeek_done()
|
|
@ -0,0 +1,3 @@
|
||||||
|
supervisor zeek_init()
|
||||||
|
destroying node
|
||||||
|
supervisor zeek_done()
|
78
testing/btest/supervisor/output-redirect-hook.zeek
Normal file
78
testing/btest/supervisor/output-redirect-hook.zeek
Normal file
|
@ -0,0 +1,78 @@
|
||||||
|
# @TEST-PORT: BROKER_PORT
|
||||||
|
# @TEST-EXEC: btest-bg-run zeek zeek -j -b %INPUT
|
||||||
|
# @TEST-EXEC: btest-bg-wait 30
|
||||||
|
# @TEST-EXEC: btest-diff zeek/supervisor.out
|
||||||
|
# @TEST-EXEC: TEST_DIFF_CANONIFIER=$SCRIPTS/diff-sort btest-diff zeek/.stdout
|
||||||
|
# @TEST-EXEC: TEST_DIFF_CANONIFIER=$SCRIPTS/diff-sort btest-diff zeek/.stderr
|
||||||
|
|
||||||
|
# This test checks the default stdout/stderr redirection will get intercepted
|
||||||
|
# by the supervisor process and sent through the hook mechanisms
|
||||||
|
|
||||||
|
# So the supervised node doesn't terminate right away.
|
||||||
|
redef exit_only_after_terminate=T;
|
||||||
|
|
||||||
|
global supervisor_output_file: file;
|
||||||
|
global topic = "test-topic";
|
||||||
|
global stderr = open("/dev/stderr");
|
||||||
|
|
||||||
|
event do_destroy()
|
||||||
|
{
|
||||||
|
print supervisor_output_file, "destroying node";
|
||||||
|
Supervisor::destroy("grault");
|
||||||
|
}
|
||||||
|
|
||||||
|
hook Supervisor::stdout_hook(node: string, msg: string)
|
||||||
|
{
|
||||||
|
print fmt("hooked stdout of (%s): %s", node, msg);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
hook Supervisor::stderr_hook(node: string, msg: string)
|
||||||
|
{
|
||||||
|
print fmt("hooked stderr of (%s): %s", node, msg);
|
||||||
|
}
|
||||||
|
|
||||||
|
event zeek_init()
|
||||||
|
{
|
||||||
|
if ( Supervisor::is_supervisor() )
|
||||||
|
{
|
||||||
|
Broker::subscribe(topic);
|
||||||
|
Broker::listen("127.0.0.1", to_port(getenv("BROKER_PORT")));
|
||||||
|
supervisor_output_file = open("supervisor.out");
|
||||||
|
print supervisor_output_file, "supervisor zeek_init()";
|
||||||
|
local sn = Supervisor::NodeConfig($name="grault", $directory="qux");
|
||||||
|
local res = Supervisor::create(sn);
|
||||||
|
|
||||||
|
if ( res != "" )
|
||||||
|
print supervisor_output_file, res;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
Broker::peer("127.0.0.1", to_port(getenv("BROKER_PORT")));
|
||||||
|
print "(stdout) supervised node zeek_init()";
|
||||||
|
print stderr, "(stderr) supervised node zeek_init()";
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
event Broker::peer_added(endpoint: Broker::EndpointInfo, msg: string)
|
||||||
|
{
|
||||||
|
if ( Supervisor::is_supervised() )
|
||||||
|
Broker::publish(topic, do_destroy);
|
||||||
|
}
|
||||||
|
|
||||||
|
event Broker::peer_lost(endpoint: Broker::EndpointInfo, msg: string)
|
||||||
|
{
|
||||||
|
# Should only be run by supervisor
|
||||||
|
terminate();
|
||||||
|
}
|
||||||
|
|
||||||
|
event zeek_done()
|
||||||
|
{
|
||||||
|
if ( Supervisor::is_supervised() )
|
||||||
|
{
|
||||||
|
print "(stdout) supervised node zeek_done()";
|
||||||
|
print stderr, "(stderr) supervised node zeek_done()";
|
||||||
|
}
|
||||||
|
else
|
||||||
|
print supervisor_output_file, "supervisor zeek_done()";
|
||||||
|
}
|
Loading…
Add table
Add a link
Reference in a new issue