Address supervisor code re-factoring feedback from Robin

This commit is contained in:
Jon Siwek 2020-01-21 18:55:59 -08:00
parent 172456fac0
commit 718879735e
11 changed files with 425 additions and 293 deletions

View file

@ -247,6 +247,7 @@ set(MAIN_SRCS
NetVar.cc
Obj.cc
OpaqueVal.cc
Options.cc
PacketFilter.cc
Pipe.cc
PolicyFile.cc

View file

@ -27,14 +27,14 @@ public:
/**
* Put the object in the "ready" state.
* @param signal_safe whether to skip error-reporting functionality that
* is not async-signal-safe
* is not async-signal-safe (errors still abort the process regardless)
*/
void Fire(bool signal_safe = false);
/**
* Take the object out of the "ready" state.
* @param signal_safe whether to skip error-reporting functionality that
* is not async-signal-safe
* is not async-signal-safe (errors still abort the process regardless)
* @return number of bytes read from the pipe, corresponds to the number
* of times Fire() was called.
*/

52
src/Options.cc Normal file
View file

@ -0,0 +1,52 @@
// See the file "COPYING" in the main distribution directory for copyright.
#include "Options.h"
void zeek::Options::filter_supervisor_options()
{
pcap_filter = {};
interfaces = {};
pcap_files = {};
signature_files = {};
pcap_output_file = {};
}
void zeek::Options::filter_supervised_node_options()
{
auto og = *this;
*this = {};
debug_log_streams = og.debug_log_streams;
debug_script_tracing_file = og.debug_script_tracing_file;
script_code_to_exec = og.script_code_to_exec;
script_prefixes = og.script_prefixes;
signature_re_level = og.signature_re_level;
ignore_checksums = og.ignore_checksums;
use_watchdog = og.use_watchdog;
pseudo_realtime = og.pseudo_realtime;
dns_mode = og.dns_mode;
bare_mode = og.bare_mode;
perftools_check_leaks = og.perftools_check_leaks;
perftools_profile = og.perftools_profile;
pcap_filter = og.pcap_filter;
signature_files = og.signature_files;
// TODO: These are likely to be handled in a node-specific or
// use-case-specific way. e.g. interfaces is already handled for the
// "cluster" use-case, but don't have supervised-pcap-reading
// functionality yet.
/* interfaces = og.interfaces; */
/* pcap_files = og.pcap_files; */
pcap_output_file = og.pcap_output_file;
random_seed_input_file = og.random_seed_input_file;
random_seed_output_file = og.random_seed_output_file;
process_status_file = og.process_status_file;
plugins_to_load = og.plugins_to_load;
scripts_to_load = og.scripts_to_load;
script_options_to_set = og.script_options_to_set;
}

77
src/Options.h Normal file
View file

@ -0,0 +1,77 @@
// See the file "COPYING" in the main distribution directory for copyright.
#pragma once
#include <optional>
#include <string>
#include <vector>
#include "DNS_Mgr.h"
namespace zeek {
/**
* Options that define general Zeek processing behavior, usually determined
* from command-line arguments.
*/
struct Options {
/**
* Unset options that aren't meant to be used by the supervisor, but may
* make sense for supervised nodes to inherit (as opposed to flagging
* as an error an exiting outright if used in supervisor-mode).
*/
void filter_supervisor_options();
/**
* Inherit certain options set in the original supervisor parent process
* and discard the rest.
*/
void filter_supervised_node_options();
bool print_version = false;
bool print_usage = false;
bool print_execution_time = false;
bool print_signature_debug_info = false;
int print_plugins = 0;
std::optional<std::string> debug_log_streams;
std::optional<std::string> debug_script_tracing_file;
std::optional<std::string> identifier_to_print;
std::optional<std::string> script_code_to_exec;
std::vector<std::string> script_prefixes = { "" }; // "" = "no prefix"
int signature_re_level = 4;
bool ignore_checksums = false;
bool use_watchdog = false;
double pseudo_realtime = 0;
DNS_MgrMode dns_mode = DNS_DEFAULT;
bool supervisor_mode = false;
bool parse_only = false;
bool bare_mode = false;
bool debug_scripts = false;
bool perftools_check_leaks = false;
bool perftools_profile = false;
bool run_unit_tests = false;
std::vector<std::string> doctest_args;
std::optional<std::string> pcap_filter;
std::vector<std::string> interfaces;
std::vector<std::string> pcap_files;
std::vector<std::string> signature_files;
std::optional<std::string> pcap_output_file;
std::optional<std::string> random_seed_input_file;
std::optional<std::string> random_seed_output_file;
std::optional<std::string> process_status_file;
std::optional<std::string> zeekygen_config_file;
std::string libidmef_dtd_file = "idmef-message.dtd";
std::set<std::string> plugins_to_load;
std::vector<std::string> scripts_to_load;
std::vector<std::string> script_options_to_set;
};
} // namespace zeek

View file

@ -66,32 +66,72 @@ private:
int status_flags[2];
};
/**
* A pair of pipes that can be used for bi-directinoal IPC.
*/
class PipePair {
public:
/**
* Create a pair of pipes
* @param flags file descriptor flags to set on pipes
* @status_flags descriptor status flags to set on pipes
* @fds may be supplied to open existing file descriptors rather
* than create ones from a new pair of pipes. Should point to memory
* containing four consecutive file descriptors, "read" end and "write" end
* of the first pipe followed by the "read" end and "write" end of the
* second pipe.
*/
PipePair(int flags, int status_flags, int* fds = nullptr);
/**
* @return the pipe used for receiving input
*/
Pipe& In()
{ return pipes[swapped]; }
/**
* @return the pipe used for sending output
*/
Pipe& Out()
{ return pipes[!swapped]; }
/**
* @return the pipe used for receiving input
*/
const Pipe& In() const
{ return pipes[swapped]; }
/**
* @return the pipe used for sending output
*/
const Pipe& Out() const
{ return pipes[!swapped]; }
/**
* @return a file descriptor that may used for receiving messages by
* polling/reading it.
*/
int InFD() const
{ return In().ReadFD(); }
/**
* @return a file descriptor that may be used for sending messages by
* writing to it.
*/
int OutFD() const
{ return Out().WriteFD(); }
/**
* Swaps the meaning of the pipes in the pair. E.g. call this after
* fork()'ing so that the child process uses the right pipe for
* reading/writing.
*/
void Swap()
{ swapped = ! swapped; }
private:
Pipe pipes[2];
bool swapped = false;
};

View file

@ -1,3 +1,4 @@
// See the file "COPYING" in the main distribution directory for copyright.
#include <sys/types.h>
#include <sys/wait.h>
@ -17,6 +18,7 @@
#include "NetVar.h"
#include "zeek-config.h"
#include "util.h"
#include "zeek-affinity.h"
#define RAPIDJSON_HAS_STDSTRING 1
#include "3rdparty/rapidjson/include/rapidjson/document.h"
@ -33,9 +35,11 @@ extern "C" {
using namespace zeek;
std::optional<Supervisor::SupervisedNode> Supervisor::supervised_node;
namespace {
struct Stem {
Stem(std::unique_ptr<bro::PipePair> p, pid_t parent_pid);
Stem(Supervisor::StemState stem_state);
~Stem();
@ -98,7 +102,7 @@ static RETSIGTYPE stem_signal_handler(int signo)
static RETSIGTYPE supervisor_signal_handler(int signo)
{
supervisor->ObserveChildSignal(signo);
supervisor_mgr->ObserveChildSignal(signo);
return RETSIGVAL;
}
@ -143,7 +147,7 @@ void ParentProcessCheckTimer::Dispatch(double t, int is_expire)
// Linux: prctl(PR_SET_PDEATHSIG, ...)
// FreeBSD: procctl(PROC_PDEATHSIG_CTL)
// Also note the Stem process has its own polling loop with similar logic.
if ( zeek::supervised_node->parent_pid != getppid() )
if ( zeek::Supervisor::ThisNode()->parent_pid != getppid() )
zeek_terminate_loop("supervised node was orphaned");
if ( ! is_expire )
@ -151,10 +155,8 @@ void ParentProcessCheckTimer::Dispatch(double t, int is_expire)
interval));
}
Supervisor::Supervisor(Supervisor::Config cfg,
std::unique_ptr<bro::PipePair> pipe,
pid_t arg_stem_pid)
: config(std::move(cfg)), stem_pid(arg_stem_pid), stem_pipe(std::move(pipe))
Supervisor::Supervisor(Supervisor::Config cfg, StemState ss)
: config(std::move(cfg)), stem_pid(ss.pid), stem_pipe(std::move(ss.pipe))
{
DBG_LOG(DBG_SUPERVISOR, "forked stem process %d", stem_pid);
setsignal(SIGCHLD, supervisor_signal_handler);
@ -179,8 +181,7 @@ Supervisor::Supervisor(Supervisor::Config cfg,
fprintf(stderr, "Supervisor stem died early by signal %d\n",
WTERMSIG(status));
else
fprintf(stderr, "Supervisor stem died early for unknown reason\n",
WTERMSIG(status));
fprintf(stderr, "Supervisor stem died early for unknown reason\n");
}
exit(1);
@ -411,8 +412,8 @@ size_t Supervisor::ProcessMessages()
return msgs.size();
}
Stem::Stem(std::unique_ptr<bro::PipePair> p, pid_t ppid)
: parent_pid(ppid), signal_flare(new bro::Flare()), pipe(std::move(p))
Stem::Stem(Supervisor::StemState ss)
: parent_pid(ss.parent_pid), signal_flare(new bro::Flare()), pipe(std::move(ss.pipe))
{
zeek::set_thread_name("zeek.stem");
pipe->Swap();
@ -857,10 +858,66 @@ std::optional<Supervisor::SupervisedNode> Stem::Poll()
return {};
}
Supervisor::SupervisedNode Supervisor::RunStem(std::unique_ptr<bro::PipePair> pipe, pid_t parent_pid)
std::optional<Supervisor::StemState> Supervisor::CreateStem(bool supervisor_mode)
{
Stem s(std::move(pipe), parent_pid);
return s.Run();
// If the Stem needs to be re-created via fork()/exec(), then the necessary
// state information is communicated via ZEEK_STEM env. var.
auto zeek_stem_env = getenv("ZEEK_STEM");
if ( zeek_stem_env )
{
std::vector<std::string> zeek_stem_nums;
tokenize_string(zeek_stem_env, ",", &zeek_stem_nums);
if ( zeek_stem_nums.size() != 5 )
{
fprintf(stderr, "invalid ZEEK_STEM environment variable value: '%s'\n",
zeek_stem_env);
exit(1);
}
pid_t stem_ppid = std::stoi(zeek_stem_nums[0]);
int fds[4];
for ( auto i = 0; i < 4; ++i )
fds[i] = std::stoi(zeek_stem_nums[i + 1]);
StemState ss;
ss.pipe = std::make_unique<bro::PipePair>(FD_CLOEXEC, O_NONBLOCK, fds);
ss.parent_pid = stem_ppid;
zeek::Supervisor::RunStem(std::move(ss));
return {};
}
if ( ! supervisor_mode )
return {};
StemState ss;
ss.pipe = std::make_unique<bro::PipePair>(FD_CLOEXEC, O_NONBLOCK);
ss.parent_pid = getpid();
ss.pid = fork();
if ( ss.pid == -1 )
{
fprintf(stderr, "failed to fork Zeek supervisor stem process: %s\n",
strerror(errno));
exit(1);
}
if ( ss.pid == 0 )
{
zeek::Supervisor::RunStem(std::move(ss));
return {};
}
return std::optional<Supervisor::StemState>(std::move(ss));
}
Supervisor::SupervisedNode Supervisor::RunStem(StemState stem_state)
{
Stem s(std::move(stem_state));
supervised_node = s.Run();
return *supervised_node;
}
static BifEnum::Supervisor::ClusterRole role_str_to_enum(std::string_view r)
@ -908,7 +965,7 @@ Supervisor::NodeConfig Supervisor::NodeConfig::FromRecord(const RecordVal* node)
auto scripts_val = node->Lookup("scripts")->AsVectorVal();
for ( auto i = 0; i < scripts_val->Size(); ++i )
for ( auto i = 0u; i < scripts_val->Size(); ++i )
{
auto script = scripts_val->Lookup(i)->AsStringVal()->ToStdString();
rval.scripts.emplace_back(std::move(script));
@ -1088,9 +1145,9 @@ static Val* supervisor_role_to_cluster_node_type(BifEnum::Supervisor::ClusterRol
}
}
bool Supervisor::SupervisedNode::InitCluster()
bool Supervisor::SupervisedNode::InitCluster() const
{
if ( supervised_node->config.cluster.empty() )
if ( config.cluster.empty() )
return false;
auto cluster_node_type = global_scope()->Lookup("Cluster::Node")->AsType()->AsRecordType();
@ -1100,7 +1157,7 @@ bool Supervisor::SupervisedNode::InitCluster()
auto has_logger = false;
std::optional<std::string> manager_name;
for ( const auto& e : supervised_node->config.cluster )
for ( const auto& e : config.cluster )
{
if ( e.second.role == BifEnum::Supervisor::MANAGER )
manager_name = e.first;
@ -1108,7 +1165,7 @@ bool Supervisor::SupervisedNode::InitCluster()
has_logger = true;
}
for ( const auto& e : supervised_node->config.cluster )
for ( const auto& e : config.cluster )
{
const auto& node_name = e.first;
const auto& ep = e.second;
@ -1135,6 +1192,79 @@ bool Supervisor::SupervisedNode::InitCluster()
return true;
}
void Supervisor::SupervisedNode::Init(zeek::Options* options) const
{
const auto& node_name = config.name;
if ( config.directory )
{
if ( chdir(config.directory->data()) )
{
fprintf(stderr, "node '%s' failed to chdir to %s: %s\n",
node_name.data(), config.directory->data(),
strerror(errno));
exit(1);
}
}
if ( config.stderr_file )
{
auto fd = open(config.stderr_file->data(),
O_WRONLY | O_CREAT | O_TRUNC | O_APPEND | O_CLOEXEC,
0600);
if ( fd == -1 || dup2(fd, STDERR_FILENO) == -1 )
{
fprintf(stderr, "node '%s' failed to create stderr file %s: %s\n",
node_name.data(), config.stderr_file->data(),
strerror(errno));
exit(1);
}
}
if ( config.stdout_file )
{
auto fd = open(config.stdout_file->data(),
O_WRONLY | O_CREAT | O_TRUNC | O_APPEND | O_CLOEXEC,
0600);
if ( fd == -1 || dup2(fd, STDOUT_FILENO) == -1 )
{
fprintf(stderr, "node '%s' failed to create stdout file %s: %s\n",
node_name.data(), config.stdout_file->data(),
strerror(errno));
exit(1);
}
}
if ( config.cpu_affinity )
{
auto res = zeek::set_affinity(*config.cpu_affinity);
if ( ! res )
fprintf(stderr, "node '%s' failed to set CPU affinity: %s\n",
node_name.data(), strerror(errno));
}
if ( ! config.cluster.empty() )
{
if ( setenv("CLUSTER_NODE", node_name.data(), true) == -1 )
{
fprintf(stderr, "node '%s' failed to setenv: %s\n",
node_name.data(), strerror(errno));
exit(1);
}
}
options->filter_supervised_node_options();
if ( config.interface )
options->interfaces.emplace_back(*config.interface);
for ( const auto& s : config.scripts )
options->scripts_to_load.emplace_back(s);
}
RecordVal* Supervisor::Status(std::string_view node_name)
{
auto rval = new RecordVal(BifType::Record::Supervisor::Status);

View file

@ -1,3 +1,5 @@
// See the file "COPYING" in the main distribution directory for copyright.
#pragma once
#include <sys/types.h>
@ -17,6 +19,7 @@
#include "Flare.h"
#include "NetVar.h"
#include "IntrusivePtr.h"
#include "Options.h"
namespace zeek {
@ -149,8 +152,17 @@ public:
* that otherwise is expected to be populated by a
* "cluster-layout.zeek" script in other context (e.g. ZeekCtl
* generates that cluster layout).
* @return true if the supervised node is using the Cluster Framework
* else false.
*/
static bool InitCluster();
bool InitCluster() const;
/**
* Initialize the Supervised node.
* @param options the Zeek options to extend/modify as appropriate
* for the node's configuration.
*/
void Init(zeek::Options* options) const;
/**
* The node's configuration options.
@ -223,28 +235,50 @@ public:
};
/**
* Run the Stem process. The Stem process will receive instructions from
* the Supervisor to manipulate the process hierarchy and it's in charge
* of directly monitoring for whether any nodes die premature and need
* to be revived.
* @param pipe bidirectional pipes that allow the Supervisor and Stem
* process to communicate.
* @param pid the Stem's parent process ID (i.e. the PID of the Supervisor)
* @return state which describes what a supervised node should know about
* itself. I.e. this function only returns from a fork()'d child process.
* State used to initalialize the Stem process.
*/
static SupervisedNode RunStem(std::unique_ptr<bro::PipePair> pipe,
pid_t parent_pid);
struct StemState {
/**
* Bidirectional pipes that allow the Supervisor and Stem to talk.
*/
std::unique_ptr<bro::PipePair> pipe;
/**
* The Stem's parent process ID (i.e. PID of the Supervisor).
*/
pid_t parent_pid = 0;
/**
* The Stem's process ID.
*/
pid_t pid = 0;
};
/**
* Create and run the Stem process if necessary.
* @param supervisor_mode whether Zeek was invoked with the supervisor
* mode specified as command-line argument/option.
* @return state that defines the Stem process if called from the
* Supervisor process. The Stem process itself will not return from this,
* function but a node it spawns via fork() will return from it and
* information about it is available in ThisNode().
*/
static std::optional<StemState> CreateStem(bool supervisor_mode);
/**
* @return the state which describes what a supervised node should know
* about itself if this is a supervised process. If called from a process
* that is not supervised, this returns an "empty" object.
*/
static const std::optional<SupervisedNode>& ThisNode()
{ return supervised_node; }
using NodeMap = std::map<std::string, Node, std::less<>>;
/**
* Create a new Supervisor object.
* @param stem_pipe bidirectional pipe that allow the Supervisor and Stem
* process to communicate.
* @param stem_pid the Stem's process ID.
* @param stem_state information about the Stem process that was already
* created via CreateStem()
*/
Supervisor(Config cfg, std::unique_ptr<bro::PipePair> stem_pipe, pid_t stem_pid);
Supervisor(Config cfg, StemState stem_state);
/**
* Destruction also cleanly shuts down the entire supervised process tree.
@ -329,6 +363,21 @@ private:
const char* Tag() override
{ return "zeek::Supervisor"; }
/**
* Run the Stem process. The Stem process will receive instructions from
* the Supervisor to manipulate the process hierarchy and it's in charge
* of directly monitoring for whether any nodes die premature and need
* to be revived.
* @param pipe bidirectional pipes that allow the Supervisor and Stem
* process to communicate.
* @param pid the Stem's parent process ID (i.e. the PID of the Supervisor)
* @return state which describes what a supervised node should know about
* itself. I.e. this function only returns from a fork()'d child process.
*/
static SupervisedNode RunStem(StemState stem_state);
static std::optional<SupervisedNode> supervised_node;
Config config;
pid_t stem_pid;
std::unique_ptr<bro::PipePair> stem_pipe;
@ -360,7 +409,6 @@ protected:
double interval;
};
extern Supervisor* supervisor;
extern std::optional<Supervisor::SupervisedNode> supervised_node;
extern Supervisor* supervisor_mgr;
} // namespace zeek

View file

@ -23,6 +23,7 @@ extern "C" {
#include <openssl/ssl.h>
#include <openssl/err.h>
#include "Options.h"
#include "bsd-getopt-long.h"
#include "input.h"
#include "DNS_Mgr.h"
@ -78,8 +79,6 @@ extern "C" {
#include "setsignal.h"
};
#include "zeek-affinity.h"
#ifdef USE_PERFTOOLS_DEBUG
HeapLeakChecker* heap_checker = 0;
int perftools_leaks = 0;
@ -99,8 +98,7 @@ file_analysis::Manager* file_mgr = 0;
zeekygen::Manager* zeekygen_mgr = 0;
iosource::Manager* iosource_mgr = 0;
bro_broker::Manager* broker_mgr = 0;
zeek::Supervisor* zeek::supervisor = 0;
std::optional<zeek::Supervisor::SupervisedNode> zeek::supervised_node;
zeek::Supervisor* zeek::supervisor_mgr = 0;
std::vector<std::string> zeek_script_prefixes;
Stmt* stmts;
@ -197,7 +195,7 @@ static void usage(const char* prog, int code = 1)
fprintf(stderr, " -M|--mem-profile | record heap [perftools]\n");
#endif
fprintf(stderr, " --pseudo-realtime[=<speedup>] | enable pseudo-realtime for performance evaluation (default 1)\n");
fprintf(stderr, " -j|--jobs[=<worker count>] | enable supervisor mode with N workers (default 1)\n");
fprintf(stderr, " -j|--jobs | enable supervisor mode\n");
#ifdef USE_IDMEF
fprintf(stderr, " -n|--idmef-dtd <idmef-msg.dtd> | specify path to IDMEF DTD file\n");
@ -221,185 +219,6 @@ static void usage(const char* prog, int code = 1)
exit(code);
}
struct zeek_options {
bool print_version = false;
bool print_usage = false;
bool print_execution_time = false;
bool print_signature_debug_info = false;
int print_plugins = 0;
std::optional<std::string> debug_log_streams;
std::optional<std::string> debug_script_tracing_file;
std::optional<std::string> identifier_to_print;
std::optional<std::string> script_code_to_exec;
std::vector<std::string> script_prefixes = { "" }; // "" = "no prefix"
int signature_re_level = 4;
bool ignore_checksums = false;
bool use_watchdog = false;
double pseudo_realtime = 0;
DNS_MgrMode dns_mode = DNS_DEFAULT;
bool supervisor_mode = false;
bool parse_only = false;
bool bare_mode = false;
bool debug_scripts = false;
bool perftools_check_leaks = false;
bool perftools_profile = false;
bool run_unit_tests = false;
std::vector<std::string> doctest_args;
std::optional<std::string> pcap_filter;
std::vector<std::string> interfaces;
std::vector<std::string> pcap_files;
std::vector<std::string> signature_files;
std::optional<std::string> pcap_output_file;
std::optional<std::string> random_seed_input_file;
std::optional<std::string> random_seed_output_file;
std::optional<std::string> process_status_file;
std::optional<std::string> zeekygen_config_file;
std::string libidmef_dtd_file = "idmef-message.dtd";
std::set<std::string> plugins_to_load;
std::vector<std::string> scripts_to_load;
std::vector<std::string> script_options_to_set;
/**
* Unset options that aren't meant to be used by the supervisor, but may
* make sense for supervised nodes to inherit (as opposed to flagging
* as an error an exiting outright if used in supervisor-mode).
*/
void filter_supervisor_options()
{
pcap_filter = {};
interfaces = {};
pcap_files = {};
signature_files = {};
pcap_output_file = {};
}
/**
* Inherit certain options set in the original supervisor parent process
* and discard the rest.
*/
void filter_supervised_node_options()
{
auto og = *this;
*this = {};
debug_log_streams = og.debug_log_streams;
debug_script_tracing_file = og.debug_script_tracing_file;
script_code_to_exec = og.script_code_to_exec;
script_prefixes = og.script_prefixes;
signature_re_level = og.signature_re_level;
ignore_checksums = og.ignore_checksums;
use_watchdog = og.use_watchdog;
pseudo_realtime = og.pseudo_realtime;
dns_mode = og.dns_mode;
bare_mode = og.bare_mode;
perftools_check_leaks = og.perftools_check_leaks;
perftools_profile = og.perftools_profile;
pcap_filter = og.pcap_filter;
signature_files = og.signature_files;
// TODO: These are likely to be handled in a node-specific or
// use-case-specific way. e.g. interfaces is already handled for the
// "cluster" use-case, but don't have supervised-pcap-reading
// functionality yet.
/* interfaces = og.interfaces; */
/* pcap_files = og.pcap_files; */
pcap_output_file = og.pcap_output_file;
random_seed_input_file = og.random_seed_input_file;
random_seed_output_file = og.random_seed_output_file;
process_status_file = og.process_status_file;
plugins_to_load = og.plugins_to_load;
scripts_to_load = og.scripts_to_load;
script_options_to_set = og.script_options_to_set;
}
};
static void init_supervised_node(zeek_options* options)
{
const auto& config = zeek::supervised_node->config;
const auto& node_name = config.name;
if ( config.directory )
{
if ( chdir(config.directory->data()) )
{
fprintf(stderr, "node '%s' failed to chdir to %s: %s\n",
node_name.data(), config.directory->data(),
strerror(errno));
exit(1);
}
}
if ( config.stderr_file )
{
auto fd = open(config.stderr_file->data(),
O_WRONLY | O_CREAT | O_TRUNC | O_APPEND | O_CLOEXEC,
0600);
if ( fd == -1 || dup2(fd, STDERR_FILENO) == -1 )
{
fprintf(stderr, "node '%s' failed to create stderr file %s: %s\n",
node_name.data(), config.stderr_file->data(),
strerror(errno));
exit(1);
}
}
if ( config.stdout_file )
{
auto fd = open(config.stdout_file->data(),
O_WRONLY | O_CREAT | O_TRUNC | O_APPEND | O_CLOEXEC,
0600);
if ( fd == -1 || dup2(fd, STDOUT_FILENO) == -1 )
{
fprintf(stderr, "node '%s' failed to create stdout file %s: %s\n",
node_name.data(), config.stdout_file->data(),
strerror(errno));
exit(1);
}
}
if ( config.cpu_affinity )
{
auto res = zeek::set_affinity(*config.cpu_affinity);
if ( ! res )
fprintf(stderr, "node '%s' failed to set CPU affinity: %s\n",
node_name.data(), strerror(errno));
}
options->filter_supervised_node_options();
if ( config.interface )
options->interfaces.emplace_back(*config.interface);
if ( ! config.cluster.empty() )
{
if ( setenv("CLUSTER_NODE", node_name.data(), true) == -1 )
{
fprintf(stderr, "node '%s' failed to setenv: %s\n",
node_name.data(), strerror(errno));
exit(1);
}
}
for ( const auto& s : config.scripts )
options->scripts_to_load.emplace_back(s);
}
static std::vector<const char*> to_cargs(const std::vector<std::string>& args)
{
std::vector<const char*> rval;
@ -411,9 +230,9 @@ static std::vector<const char*> to_cargs(const std::vector<std::string>& args)
return rval;
}
static zeek_options parse_cmdline(int argc, char** argv)
static zeek::Options parse_cmdline(int argc, char** argv)
{
zeek_options rval = {};
zeek::Options rval;
// When running unit tests, the first argument on the command line must be
// --test, followed by doctest options. Optionally, users can use "--" as
@ -522,7 +341,7 @@ static zeek_options parse_cmdline(int argc, char** argv)
// getopt may permute the array, so need yet another array
auto zargs = std::make_unique<char*[]>(zeek_args.size());
for ( auto i = 0; i < zeek_args.size(); ++i )
for ( auto i = 0u; i < zeek_args.size(); ++i )
zargs[i] = zeek_args[i].data();
while ( (op = getopt_long(zeek_args.size(), zargs.get(), opts, long_opts, &long_optsind)) != EOF )
@ -671,7 +490,7 @@ static zeek_options parse_cmdline(int argc, char** argv)
// Process remaining arguments. X=Y arguments indicate script
// variable/parameter assignments. X::Y arguments indicate plugins to
// activate/query. The remainder are treated as scripts to load.
while ( optind < zeek_args.size() )
while ( optind < static_cast<int>(zeek_args.size()) )
{
if ( strchr(zargs[optind], '=') )
rval.script_options_to_set.emplace_back(zargs[optind++]);
@ -1017,52 +836,10 @@ int main(int argc, char** argv)
return context.run();
}
pid_t stem_pid = 0;
std::unique_ptr<bro::PipePair> supervisor_pipe;
auto zeek_stem_env = getenv("ZEEK_STEM");
auto stem_state = zeek::Supervisor::CreateStem(options.supervisor_mode);
if ( zeek_stem_env )
{
std::vector<std::string> zeek_stem_nums;
tokenize_string(zeek_stem_env, ",", &zeek_stem_nums);
if ( zeek_stem_nums.size() != 5 )
{
fprintf(stderr, "invalid ZEEK_STEM environment variable value: '%s'\n",
zeek_stem_env);
exit(1);
}
pid_t stem_ppid = std::stoi(zeek_stem_nums[0]);
int fds[4];
for ( auto i = 0; i < 4; ++i )
fds[i] = std::stoi(zeek_stem_nums[i + 1]);
supervisor_pipe.reset(new bro::PipePair{FD_CLOEXEC, O_NONBLOCK, fds});
zeek::supervised_node = zeek::Supervisor::RunStem(std::move(supervisor_pipe),
stem_ppid);
}
else if ( options.supervisor_mode )
{
supervisor_pipe.reset(new bro::PipePair{FD_CLOEXEC, O_NONBLOCK});
auto stem_ppid = getpid();
stem_pid = fork();
if ( stem_pid == -1 )
{
fprintf(stderr, "failed to fork Zeek supervisor stem process: %s\n",
strerror(errno));
exit(1);
}
if ( stem_pid == 0 )
zeek::supervised_node = zeek::Supervisor::RunStem(std::move(supervisor_pipe),
stem_ppid);
}
if ( zeek::supervised_node )
init_supervised_node(&options);
if ( zeek::Supervisor::ThisNode() )
zeek::Supervisor::ThisNode()->Init(&options);
double time_start = current_time(true);
@ -1134,9 +911,8 @@ int main(int argc, char** argv)
zeek::Supervisor::Config cfg = {};
cfg.zeek_exe_path = zeek_exe_path;
options.filter_supervisor_options();
zeek::supervisor = new zeek::Supervisor(std::move(cfg),
std::move(supervisor_pipe),
stem_pid);
zeek::supervisor_mgr = new zeek::Supervisor(std::move(cfg),
std::move(*stem_state));
}
const char* seed_load_file = zeekenv("ZEEK_SEED_FILE");
@ -1191,7 +967,7 @@ int main(int argc, char** argv)
options.interfaces.size() == 0 &&
! options.identifier_to_print &&
! command_line_policy && ! options.print_plugins &&
! options.supervisor_mode && ! zeek::supervised_node )
! options.supervisor_mode && ! zeek::Supervisor::ThisNode() )
add_input_file("-");
for ( const auto& script_option : options.script_options_to_set )
@ -1510,8 +1286,8 @@ int main(int argc, char** argv)
iosource_mgr->Register(thread_mgr, true);
if ( zeek::supervisor )
iosource_mgr->Register(zeek::supervisor);
if ( zeek::supervisor_mgr )
iosource_mgr->Register(zeek::supervisor_mgr);
if ( iosource_mgr->Size() > 0 ||
have_pending_timers ||
@ -1532,7 +1308,7 @@ int main(int argc, char** argv)
#endif
if ( zeek::supervised_node )
if ( zeek::Supervisor::ThisNode() )
timer_mgr->Add(new zeek::ParentProcessCheckTimer(1, 1));
double time_net_start = current_time(true);;

View file

@ -21,67 +21,67 @@ type Supervisor::NodeStatus: record;
function Supervisor::__status%(node: string%): Supervisor::Status
%{
if ( ! zeek::supervisor )
if ( ! zeek::supervisor_mgr )
{
builtin_error("supervisor mode not enabled");
return new RecordVal(BifType::Record::Supervisor::Status);
}
return zeek::supervisor->Status(node->CheckString());
return zeek::supervisor_mgr->Status(node->CheckString());
%}
function Supervisor::__create%(node: Supervisor::NodeConfig%): string
%{
if ( ! zeek::supervisor )
if ( ! zeek::supervisor_mgr )
{
builtin_error("supervisor mode not enabled");
return new StringVal("supervisor mode not enabled");
}
auto rval = zeek::supervisor->Create(node->AsRecordVal());
auto rval = zeek::supervisor_mgr->Create(node->AsRecordVal());
return new StringVal(rval);
%}
function Supervisor::__destroy%(node: string%): bool
%{
if ( ! zeek::supervisor )
if ( ! zeek::supervisor_mgr )
{
builtin_error("supervisor mode not enabled");
return val_mgr->GetBool(false);
}
auto rval = zeek::supervisor->Destroy(node->CheckString());
auto rval = zeek::supervisor_mgr->Destroy(node->CheckString());
return val_mgr->GetBool(rval);
%}
function Supervisor::__restart%(node: string%): bool
%{
if ( ! zeek::supervisor )
if ( ! zeek::supervisor_mgr )
{
builtin_error("supervisor mode not enabled");
return val_mgr->GetBool(false);
}
auto rval = zeek::supervisor->Restart(node->CheckString());
auto rval = zeek::supervisor_mgr->Restart(node->CheckString());
return val_mgr->GetBool(rval);
%}
function Supervisor::__init_cluster%(%): bool
%{
if ( zeek::supervised_node )
return val_mgr->GetBool(zeek::supervised_node->InitCluster());
if ( zeek::Supervisor::ThisNode() )
return val_mgr->GetBool(zeek::Supervisor::ThisNode()->InitCluster());
return val_mgr->GetBool(false);
%}
function Supervisor::__is_supervised%(%): bool
%{
return val_mgr->GetBool(zeek::supervised_node.has_value());
return val_mgr->GetBool(zeek::Supervisor::ThisNode().has_value());
%}
function Supervisor::__node%(%): Supervisor::NodeConfig
%{
if ( ! zeek::supervised_node )
if ( ! zeek::Supervisor::ThisNode() )
{
builtin_error("not a supervised process");
auto rt = BifType::Record::Supervisor::NodeConfig;
@ -90,11 +90,11 @@ function Supervisor::__node%(%): Supervisor::NodeConfig
return rval.detach();
}
auto rval = zeek::supervised_node->config.ToRecord();
auto rval = zeek::Supervisor::ThisNode()->config.ToRecord();
return rval.detach();
%}
function Supervisor::__is_supervisor%(%): bool
%{
return val_mgr->GetBool(zeek::supervisor != nullptr);
return val_mgr->GetBool(zeek::supervisor_mgr != nullptr);
%}

View file

@ -1,5 +1,11 @@
// See the file "COPYING" in the main distribution directory for copyright.
// This is all in its own source file primarily because the Linux
// implementation uses the _GNU_SOURCE feature test macro which must be
// defined before including any header file and lumping this together with
// other util functions makes that requirement less apparent and less
// self-contained.
#if defined(__linux__)
#if !defined(_GNU_SOURCE)

View file

@ -1,5 +1,7 @@
// See the file "COPYING" in the main distribution directory for copyright.
#pragma once
namespace zeek {
/**