mirror of
https://github.com/zeek/zeek.git
synced 2025-10-13 20:18:20 +00:00
Integrate Supervisor code review suggestions
This commit is contained in:
parent
10709c627b
commit
7669f560d1
7 changed files with 61 additions and 54 deletions
|
@ -30,8 +30,8 @@ function supervisor_rotation_format_func(ri: Log::RotationFmtInfo): Log::Rotatio
|
||||||
{
|
{
|
||||||
local open_str = strftime(Log::default_rotation_date_format, ri$open);
|
local open_str = strftime(Log::default_rotation_date_format, ri$open);
|
||||||
local close_str = strftime(Log::default_rotation_date_format, ri$open);
|
local close_str = strftime(Log::default_rotation_date_format, ri$open);
|
||||||
local prefix = fmt("%s__%s__%s__", ri$path, open_str, close_str);
|
local base = fmt("%s__%s__%s__", ri$path, open_str, close_str);
|
||||||
local rval = Log::RotationPath($file_prefix=prefix);
|
local rval = Log::RotationPath($file_basename=base);
|
||||||
return rval;
|
return rval;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -154,19 +154,19 @@ export {
|
||||||
## just-in-time, as the log rotation is about to happen. If it
|
## just-in-time, as the log rotation is about to happen. If it
|
||||||
## cannot be created, an error is emitted and the rotation process
|
## cannot be created, an error is emitted and the rotation process
|
||||||
## tries to proceed with rotation inside the working directory. When
|
## tries to proceed with rotation inside the working directory. When
|
||||||
## setting this field, beware that renaming files across systems will
|
## setting this field, beware that renaming files across file systems
|
||||||
## generally fail.
|
## will generally fail.
|
||||||
dir: string &default = default_rotation_dir;
|
dir: string &default = default_rotation_dir;
|
||||||
|
|
||||||
## A prefix to use for the the rotated log. Log writers may later
|
## A base name to use for the the rotated log. Log writers may later
|
||||||
## append a file extension of their choosing to this user-chosen
|
## append a file extension of their choosing to this user-chosen
|
||||||
## prefix (e.g. if using the default ASCII writer and you want
|
## base (e.g. if using the default ASCII writer and you want
|
||||||
## rotated files of the format "foo-<date>.log", then this prefix
|
## rotated files of the format "foo-<date>.log", then this basename
|
||||||
## can be set to "foo-<date>" and the ".log" is added later (there's
|
## can be set to "foo-<date>" and the ".log" is added later (there's
|
||||||
## also generally means of customizing the file extension, too,
|
## also generally means of customizing the file extension, too,
|
||||||
## like the ``ZEEK_LOG_SUFFIX`` environment variable or
|
## like the ``ZEEK_LOG_SUFFIX`` environment variable or
|
||||||
## writer-dependent configuration options.
|
## writer-dependent configuration options.
|
||||||
file_prefix: string;
|
file_basename: string;
|
||||||
};
|
};
|
||||||
|
|
||||||
## A function that one may use to customize log file rotation paths.
|
## A function that one may use to customize log file rotation paths.
|
||||||
|
@ -633,12 +633,12 @@ function Log::rotation_format_func(ri: Log::RotationFmtInfo): Log::RotationPath
|
||||||
default_rotation_postprocessors[WRITER_ASCII] == default_ascii_rotation_postprocessor_func)
|
default_rotation_postprocessors[WRITER_ASCII] == default_ascii_rotation_postprocessor_func)
|
||||||
{
|
{
|
||||||
open_str = strftime(Log::default_rotation_date_format, ri$open);
|
open_str = strftime(Log::default_rotation_date_format, ri$open);
|
||||||
rval = RotationPath($file_prefix=fmt("%s.%s", ri$path, open_str));
|
rval = RotationPath($file_basename=fmt("%s.%s", ri$path, open_str));
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
open_str = strftime("%y-%m-%d_%H.%M.%S", ri$open);
|
open_str = strftime("%y-%m-%d_%H.%M.%S", ri$open);
|
||||||
rval = RotationPath($file_prefix=fmt("%s-%s", ri$path, open_str));
|
rval = RotationPath($file_basename=fmt("%s-%s", ri$path, open_str));
|
||||||
}
|
}
|
||||||
|
|
||||||
return rval;
|
return rval;
|
||||||
|
|
|
@ -118,7 +118,9 @@ export {
|
||||||
## a supervised one.
|
## a supervised one.
|
||||||
global node: function(): NodeConfig;
|
global node: function(): NodeConfig;
|
||||||
|
|
||||||
## Hooks intercepted stdout stream for all supervisor's child processes.
|
## Hooks into the stdout stream for all supervisor's child processes.
|
||||||
|
## If a hook terminates with `break`, that will suppress output to the
|
||||||
|
## associated stream.
|
||||||
##
|
##
|
||||||
## node: the name of a previously created node via
|
## node: the name of a previously created node via
|
||||||
## :zeek:see:`Supervisor::create` indicating to which
|
## :zeek:see:`Supervisor::create` indicating to which
|
||||||
|
@ -130,7 +132,9 @@ export {
|
||||||
## msg: line-buffered contents from the stdout of a child process.
|
## msg: line-buffered contents from the stdout of a child process.
|
||||||
global stdout_hook: hook(node: string, msg: string);
|
global stdout_hook: hook(node: string, msg: string);
|
||||||
|
|
||||||
## Hooks intercepted stderr stream for all supervisor's child processes.
|
## Hooks into the stderr stream for all supervisor's child processes.
|
||||||
|
## If a hook terminates with `break`, that will suppress output to the
|
||||||
|
## associated stream.
|
||||||
##
|
##
|
||||||
## node: the name of a previously created node via
|
## node: the name of a previously created node via
|
||||||
## :zeek:see:`Supervisor::create` indicating to which
|
## :zeek:see:`Supervisor::create` indicating to which
|
||||||
|
|
|
@ -1505,8 +1505,8 @@ std::string Manager::FormatRotationPath(zeek::EnumValPtr writer,
|
||||||
{
|
{
|
||||||
auto ri = zeek::make_intrusive<zeek::RecordVal>(zeek::BifType::Record::Log::RotationFmtInfo);
|
auto ri = zeek::make_intrusive<zeek::RecordVal>(zeek::BifType::Record::Log::RotationFmtInfo);
|
||||||
ri->Assign(0, std::move(writer));
|
ri->Assign(0, std::move(writer));
|
||||||
ri->Assign<zeek::TimeVal>(2, open);
|
|
||||||
ri->Assign<zeek::StringVal>(1, path.size(), path.data());
|
ri->Assign<zeek::StringVal>(1, path.size(), path.data());
|
||||||
|
ri->Assign<zeek::TimeVal>(2, open);
|
||||||
ri->Assign<zeek::TimeVal>(3, close);
|
ri->Assign<zeek::TimeVal>(3, close);
|
||||||
ri->Assign(4, zeek::val_mgr->Bool(terminating));
|
ri->Assign(4, zeek::val_mgr->Bool(terminating));
|
||||||
ri->Assign<zeek::Val>(5, std::move(postprocessor));
|
ri->Assign<zeek::Val>(5, std::move(postprocessor));
|
||||||
|
|
|
@ -81,11 +81,10 @@ struct LeftoverLog {
|
||||||
std::string error;
|
std::string error;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Return the name of the log without file extension (this is
|
* Return the "path" (logging framework parlance) of the log without the
|
||||||
* what's called "path" in other logging framework parlance).
|
* file extension. E.g. the "path" of "conn.log" is just "conn".
|
||||||
* E.g. the name of "conn.log" is just "conn".
|
|
||||||
*/
|
*/
|
||||||
std::string Name() const
|
std::string Path() const
|
||||||
{ return filename.substr(0, filename.size() - extension.size()); }
|
{ return filename.substr(0, filename.size() - extension.size()); }
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -97,7 +96,6 @@ struct LeftoverLog {
|
||||||
|
|
||||||
static std::optional<LeftoverLog> parse_shadow_log(const std::string& fname)
|
static std::optional<LeftoverLog> parse_shadow_log(const std::string& fname)
|
||||||
{
|
{
|
||||||
char errbuf[512];
|
|
||||||
auto sfname = shadow_file_prefix + fname;
|
auto sfname = shadow_file_prefix + fname;
|
||||||
|
|
||||||
LeftoverLog rval;
|
LeftoverLog rval;
|
||||||
|
@ -108,8 +106,8 @@ static std::optional<LeftoverLog> parse_shadow_log(const std::string& fname)
|
||||||
|
|
||||||
if ( ! sf_stream )
|
if ( ! sf_stream )
|
||||||
{
|
{
|
||||||
bro_strerror_r(errno, errbuf, sizeof(errbuf));
|
rval.error = fmt("Failed to open %s: %s",
|
||||||
rval.error = "Failed to open " + rval.shadow_filename + ": " + errbuf;
|
rval.shadow_filename.data(), strerror(errno));
|
||||||
return rval;
|
return rval;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -119,6 +117,7 @@ static std::optional<LeftoverLog> parse_shadow_log(const std::string& fname)
|
||||||
|
|
||||||
auto sf_content = std::make_unique<char[]>(sf_len);
|
auto sf_content = std::make_unique<char[]>(sf_len);
|
||||||
auto bytes_read = fread(sf_content.get(), 1, sf_len, sf_stream);
|
auto bytes_read = fread(sf_content.get(), 1, sf_len, sf_stream);
|
||||||
|
fclose(sf_stream);
|
||||||
|
|
||||||
if ( bytes_read != static_cast<size_t>(sf_len) )
|
if ( bytes_read != static_cast<size_t>(sf_len) )
|
||||||
{
|
{
|
||||||
|
@ -126,17 +125,14 @@ static std::optional<LeftoverLog> parse_shadow_log(const std::string& fname)
|
||||||
return rval;
|
return rval;
|
||||||
}
|
}
|
||||||
|
|
||||||
fclose(sf_stream);
|
|
||||||
std::string_view sf_view(sf_content.get(), sf_len);
|
std::string_view sf_view(sf_content.get(), sf_len);
|
||||||
auto sf_lines = tokenize_string(sf_view, '\n');
|
auto sf_lines = tokenize_string(sf_view, '\n');
|
||||||
|
|
||||||
if ( sf_lines.size() < 2 )
|
if ( sf_lines.size() < 2 )
|
||||||
{
|
{
|
||||||
snprintf(errbuf, sizeof(errbuf),
|
rval.error = fmt("Found leftover log, '%s', but the associated shadow "
|
||||||
"Found leftover log, '%s', but the associated shadow file, "
|
" file, '%s', required to process it is invalid",
|
||||||
"'%s', required to process it is invalid",
|
|
||||||
rval.filename.data(), rval.shadow_filename.data());
|
rval.filename.data(), rval.shadow_filename.data());
|
||||||
rval.error = errbuf;
|
|
||||||
return rval;
|
return rval;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -148,8 +144,8 @@ static std::optional<LeftoverLog> parse_shadow_log(const std::string& fname)
|
||||||
// Use shadow file's modification time as creation time.
|
// Use shadow file's modification time as creation time.
|
||||||
if ( stat(rval.shadow_filename.data(), &st) != 0 )
|
if ( stat(rval.shadow_filename.data(), &st) != 0 )
|
||||||
{
|
{
|
||||||
bro_strerror_r(errno, errbuf, sizeof(errbuf));
|
rval.error = fmt("Failed to stat %s: %s",
|
||||||
rval.error = "Failed to stat " + rval.shadow_filename + ": " + errbuf;
|
rval.shadow_filename.data(), strerror(errno));
|
||||||
return rval;
|
return rval;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -158,8 +154,8 @@ static std::optional<LeftoverLog> parse_shadow_log(const std::string& fname)
|
||||||
// Use log file's modification time for closing time.
|
// Use log file's modification time for closing time.
|
||||||
if ( stat(rval.filename.data(), &st) != 0 )
|
if ( stat(rval.filename.data(), &st) != 0 )
|
||||||
{
|
{
|
||||||
bro_strerror_r(errno, errbuf, sizeof(errbuf));
|
rval.error = fmt("Failed to stat %s: %s",
|
||||||
rval.error = "Failed to stat " + rval.filename + ": " + errbuf;
|
rval.filename.data(), strerror(errno));
|
||||||
return rval;
|
return rval;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -755,21 +751,21 @@ void Ascii::RotateLeftoverLogs()
|
||||||
if ( func )
|
if ( func )
|
||||||
ppf = std::move(func);
|
ppf = std::move(func);
|
||||||
else
|
else
|
||||||
reporter->Warning("Could no postprocess log '%s' with intended "
|
reporter->Warning("Could not postprocess log '%s' with intended "
|
||||||
"postprocessor function '%s', proceeding "
|
"postprocessor function '%s', proceeding "
|
||||||
" with the default function",
|
" with the default function",
|
||||||
ll.filename.data(), ll.post_proc_func.data());
|
ll.filename.data(), ll.post_proc_func.data());
|
||||||
}
|
}
|
||||||
|
|
||||||
auto rotation_path = log_mgr->FormatRotationPath(
|
auto rotation_path = log_mgr->FormatRotationPath(
|
||||||
writer_val, ll.Name(), ll.open_time, ll.close_time, false, ppf);
|
writer_val, ll.Path(), ll.open_time, ll.close_time, false, ppf);
|
||||||
|
|
||||||
rotation_path += ll.extension;
|
rotation_path += ll.extension;
|
||||||
|
|
||||||
auto rot_info = zeek::make_intrusive<zeek::RecordVal>(rot_info_type);
|
auto rot_info = zeek::make_intrusive<zeek::RecordVal>(rot_info_type);
|
||||||
rot_info->Assign(0, writer_val);
|
rot_info->Assign(0, writer_val);
|
||||||
rot_info->Assign<zeek::StringVal>(1, rotation_path);
|
rot_info->Assign<zeek::StringVal>(1, rotation_path);
|
||||||
rot_info->Assign<zeek::StringVal>(2, ll.Name());
|
rot_info->Assign<zeek::StringVal>(2, ll.Path());
|
||||||
rot_info->Assign<zeek::TimeVal>(3, ll.open_time);
|
rot_info->Assign<zeek::TimeVal>(3, ll.open_time);
|
||||||
rot_info->Assign<zeek::TimeVal>(4, ll.close_time);
|
rot_info->Assign<zeek::TimeVal>(4, ll.close_time);
|
||||||
rot_info->Assign(5, zeek::val_mgr->False());
|
rot_info->Assign(5, zeek::val_mgr->False());
|
||||||
|
|
|
@ -13,6 +13,7 @@
|
||||||
#include <cstdarg>
|
#include <cstdarg>
|
||||||
#include <sstream>
|
#include <sstream>
|
||||||
#include <variant>
|
#include <variant>
|
||||||
|
#include <utility>
|
||||||
|
|
||||||
#include "iosource/Manager.h"
|
#include "iosource/Manager.h"
|
||||||
#include "ZeekString.h"
|
#include "ZeekString.h"
|
||||||
|
@ -79,6 +80,13 @@ struct Stem {
|
||||||
|
|
||||||
void Reap();
|
void Reap();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This performs fork() to initialize the supervised-node structure.
|
||||||
|
* There's three possible outcomes:
|
||||||
|
* - return value is SupervisedNode: we are the child process
|
||||||
|
* - return value is True: we are the parent and fork() succeeded
|
||||||
|
* - return value is False: we are the parent and fork() failed
|
||||||
|
*/
|
||||||
std::variant<bool, SupervisedNode> Spawn(SupervisorNode* node);
|
std::variant<bool, SupervisedNode> Spawn(SupervisorNode* node);
|
||||||
|
|
||||||
int AliveNodeCount() const;
|
int AliveNodeCount() const;
|
||||||
|
@ -154,6 +162,21 @@ static std::vector<std::string> extract_msgs(std::string* buffer, char delim)
|
||||||
return rval;
|
return rval;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static std::pair<int, std::vector<std::string>>
|
||||||
|
read_msgs(int fd, std::string* buffer, char delim)
|
||||||
|
{
|
||||||
|
constexpr auto buf_size = 256;
|
||||||
|
char buf[buf_size];
|
||||||
|
|
||||||
|
int bytes_read = read(fd, buf, buf_size);
|
||||||
|
|
||||||
|
if ( bytes_read <= 0 )
|
||||||
|
return {bytes_read, {}};
|
||||||
|
|
||||||
|
buffer->append(buf, bytes_read);
|
||||||
|
return {bytes_read, extract_msgs(buffer, delim)};
|
||||||
|
}
|
||||||
|
|
||||||
static std::string make_create_message(const Supervisor::NodeConfig& node)
|
static std::string make_create_message(const Supervisor::NodeConfig& node)
|
||||||
{
|
{
|
||||||
auto json_str = node.ToJSON();
|
auto json_str = node.ToJSON();
|
||||||
|
@ -551,17 +574,11 @@ size_t zeek::detail::LineBufferedPipe::Process()
|
||||||
if ( ! pipe )
|
if ( ! pipe )
|
||||||
return 0;
|
return 0;
|
||||||
|
|
||||||
char buf[256];
|
auto [bytes_read, msgs] = read_msgs(pipe->ReadFD(), &buffer, '\n');
|
||||||
|
|
||||||
int bytes_read = read(pipe->ReadFD(), buf, 256);
|
|
||||||
|
|
||||||
if ( bytes_read <= 0 )
|
if ( bytes_read <= 0 )
|
||||||
return 0;
|
return 0;
|
||||||
|
|
||||||
buffer.append(buf, bytes_read);
|
|
||||||
|
|
||||||
auto msgs = extract_msgs(&buffer, '\n');
|
|
||||||
|
|
||||||
for ( const auto& msg : msgs )
|
for ( const auto& msg : msgs )
|
||||||
Emit(msg.data());
|
Emit(msg.data());
|
||||||
|
|
||||||
|
@ -570,13 +587,7 @@ size_t zeek::detail::LineBufferedPipe::Process()
|
||||||
|
|
||||||
size_t Supervisor::ProcessMessages()
|
size_t Supervisor::ProcessMessages()
|
||||||
{
|
{
|
||||||
char buf[256];
|
auto [bytes_read, msgs] = read_msgs(stem_pipe->InFD(), &msg_buffer, '\0');
|
||||||
int bytes_read = read(stem_pipe->InFD(), buf, 256);
|
|
||||||
|
|
||||||
if ( bytes_read > 0 )
|
|
||||||
msg_buffer.append(buf, bytes_read);
|
|
||||||
|
|
||||||
auto msgs = extract_msgs(&msg_buffer, '\0');
|
|
||||||
|
|
||||||
for ( auto& msg : msgs )
|
for ( auto& msg : msgs )
|
||||||
{
|
{
|
||||||
|
@ -1061,8 +1072,7 @@ std::optional<SupervisedNode> Stem::Poll()
|
||||||
// No messages from supervisor to process, so return early.
|
// No messages from supervisor to process, so return early.
|
||||||
return {};
|
return {};
|
||||||
|
|
||||||
char buf[256];
|
auto [bytes_read, msgs] = read_msgs(pipe->InFD(), &msg_buffer, '\0');
|
||||||
int bytes_read = read(pipe->InFD(), buf, 256);
|
|
||||||
|
|
||||||
if ( bytes_read == 0 )
|
if ( bytes_read == 0 )
|
||||||
{
|
{
|
||||||
|
@ -1077,9 +1087,6 @@ std::optional<SupervisedNode> Stem::Poll()
|
||||||
return {};
|
return {};
|
||||||
}
|
}
|
||||||
|
|
||||||
msg_buffer.append(buf, bytes_read);
|
|
||||||
auto msgs = extract_msgs(&msg_buffer, '\0');
|
|
||||||
|
|
||||||
for ( auto& msg : msgs )
|
for ( auto& msg : msgs )
|
||||||
{
|
{
|
||||||
std::vector<std::string> msg_tokens;
|
std::vector<std::string> msg_tokens;
|
||||||
|
|
|
@ -22,8 +22,8 @@ function my_rotation_format_func(ri: Log::RotationFmtInfo): Log::RotationPath
|
||||||
{
|
{
|
||||||
local open_str = strftime(Log::default_rotation_date_format, ri$open);
|
local open_str = strftime(Log::default_rotation_date_format, ri$open);
|
||||||
local close_str = strftime(Log::default_rotation_date_format, ri$open);
|
local close_str = strftime(Log::default_rotation_date_format, ri$open);
|
||||||
local prefix =fmt("%s__%s__%s__", ri$path, open_str, close_str);
|
local base = fmt("%s__%s__%s__", ri$path, open_str, close_str);
|
||||||
local rval = Log::RotationPath($file_prefix=prefix);
|
local rval = Log::RotationPath($file_basename=base);
|
||||||
return rval;
|
return rval;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue