mirror of
https://github.com/zeek/zeek.git
synced 2025-10-15 04:58:21 +00:00

* topic/christian/management-config-validation:
Management framework: bump external cluster testsuite
Management framework: bump zeek-client
Management framework: add config validation
Management framework: improvements to port auto-enumeration
Management framework: control output-to-console in Supervisor
Management framework: handle no-instances corner case in set-config correctly
(cherry picked from commit 4deacefa4c)
122 lines
3.3 KiB
Text
122 lines
3.3 KiB
Text
##! This module provides functionality the Management framework places directly
|
|
##! in the Supervisor.
|
|
|
|
@load base/utils/paths
|
|
@load base/utils/queue
|
|
|
|
@load policy/frameworks/management/types
|
|
@load policy/frameworks/management/node/config
|
|
|
|
@load ./api
|
|
@load ./config
|
|
|
|
module Management::Supervisor;
|
|
|
|
# Per-node stdout/stderr state: recent output lines plus optional output files.
|
|
type NodeOutputStreams: record {
|
|
# Queues buffering the node's stdout and stderr lines. They are created
|
|
# with max_len set to Management::Supervisor::output_max_lines.
|
|
stdout: Queue::Queue;
|
|
stderr: Queue::Queue;
|
|
|
|
# Optional files that received output lines also get printed to. Unset unless opened at stream creation.
|
|
stdout_file: file &optional;
|
|
stderr_file: file &optional;
|
|
};
|
|
|
|
# This tracks output state for the current nodes, keyed by node name.
# Entries get created on a node's first stdout/stderr line and are
# replaced whenever a Supervisor::node_status event arrives for the node.
|
|
global g_outputs: table[string] of NodeOutputStreams;
|
|
|
|
# Creates fresh output state for the given node: two line queues bounded by
# Management::Supervisor::output_max_lines, plus stdout/stderr files when the
# Supervisor reports the node and the respective file name is non-empty.
#
# node: the name of the node to build state for.
#
# Returns: the new NodeOutputStreams record. Its file fields remain unset
# when the Supervisor's status does not list the node.
function make_node_output_streams(node: string): NodeOutputStreams
	{
	local streams = NodeOutputStreams(
	    $stdout=Queue::init([$max_len = Management::Supervisor::output_max_lines]),
	    $stderr=Queue::init([$max_len = Management::Supervisor::output_max_lines]));

	local sup_status = Supervisor::status(node);

	if ( node in sup_status$nodes )
		{
		local node_state = sup_status$nodes[node];

		# Use "." as the base directory when the node has none set.
		local out_dir = node_state$node?$directory ? node_state$node$directory : ".";

		if ( Management::Node::stdout_file != "" )
			streams$stdout_file = open(build_path(out_dir, Management::Node::stdout_file));

		if ( Management::Node::stderr_file != "" )
			streams$stderr_file = open(build_path(out_dir, Management::Node::stderr_file));
		}

	return streams;
	}
|
|
|
|
# Processes one line of stdout from a supervised node: mirrors it to the
# node's stdout file (if any) and records it in the node's stdout queue.
hook Supervisor::stdout_hook(node: string, msg: string)
	{
	# First output from this node: set up its state lazily.
	if ( node !in g_outputs )
		g_outputs[node] = make_node_output_streams(node);

	local streams = g_outputs[node];

	if ( streams?$stdout_file )
		{
		# Flushing on every line is heavy-handed, but otherwise output
		# that has already happened may not yet be visible in the file,
		# which is confusing. (The Supervisor's built-in file
		# redirection does this too.)
		print streams$stdout_file, msg;
		flush_all();
		}

	# Remember the line in the window of recent stdout output.
	Queue::put(streams$stdout, msg);

	# With print_stdout disabled, break out of the hook. Presumably this
	# keeps the Supervisor from echoing the line itself -- confirm against
	# the Supervisor's hook documentation.
	if ( ! print_stdout )
		break;
	}
|
|
|
|
# Processes one line of stderr from a supervised node: mirrors it to the
# node's stderr file (if any) and records it in the node's stderr queue.
hook Supervisor::stderr_hook(node: string, msg: string)
	{
	# First output from this node: set up its state lazily.
	if ( node !in g_outputs )
		g_outputs[node] = make_node_output_streams(node);

	local streams = g_outputs[node];

	if ( streams?$stderr_file )
		{
		# Write the line out and flush right away so that it shows up
		# in the file without delay.
		print streams$stderr_file, msg;
		flush_all();
		}

	# Remember the line in the window of recent stderr output.
	Queue::put(streams$stderr, msg);

	# With print_stderr disabled, break out of the hook. Presumably this
	# keeps the Supervisor from echoing the line itself -- confirm against
	# the Supervisor's hook documentation.
	if ( ! print_stderr )
		break;
	}
|
|
|
|
# Handles the Supervisor's report that a node started or restarted. Any
# output gathered during the node's previous run is published in a
# notify_node_exit event, the old output files get closed, and the node's
# output state is reset.
event Supervisor::node_status(node: string, pid: count)
	{
	if ( node in g_outputs )
		{
		local prev = g_outputs[node];
		local out_lines: vector of string;
		local err_lines: vector of string;

		Queue::get_vector(prev$stdout, out_lines);
		Queue::get_vector(prev$stderr, err_lines);

		# Only report when the previous run produced any output at all.
		if ( ! ( |out_lines| == 0 && |err_lines| == 0 ) )
			{
			local exit_outputs = Management::NodeOutputs(
			    $stdout = join_string_vec(out_lines, "\n"),
			    $stderr = join_string_vec(err_lines, "\n"));

			Broker::publish(topic_prefix, Management::Supervisor::API::notify_node_exit, node, exit_outputs);
			}

		# Release the previous run's files before new state replaces them.
		if ( prev?$stdout_file )
			close(prev$stdout_file);

		if ( prev?$stderr_file )
			close(prev$stderr_file);
		}

	# Start over with fresh state, whether or not the node was known before.
	g_outputs[node] = make_node_output_streams(node);
	}
|