Factor Supervisor initialization data to separate struct from Stem

This commit is contained in:
Jon Siwek 2020-06-15 17:19:26 -07:00
parent d00b3a8daa
commit 0acd5fea0c
3 changed files with 41 additions and 45 deletions

View file

@ -46,8 +46,23 @@ using namespace zeek;
std::optional<Supervisor::SupervisedNode> Supervisor::supervised_node; std::optional<Supervisor::SupervisedNode> Supervisor::supervised_node;
namespace { namespace {
struct Stem { struct Stem {
Stem(Supervisor::StemState stem_state); /**
* State used to initalialize the Stem process.
*/
struct State {
/**
* Bidirectional pipes that allow the Supervisor and Stem to talk.
*/
std::unique_ptr<zeek::detail::PipePair> pipe;
/**
* The Stem's parent process ID (i.e. PID of the Supervisor).
*/
pid_t parent_pid = 0;
};
Stem(State stem_state);
~Stem(); ~Stem();
@ -164,8 +179,8 @@ void zeek::detail::ParentProcessCheckTimer::Dispatch(double t, bool is_expire)
interval)); interval));
} }
Supervisor::Supervisor(Supervisor::Config cfg, StemState ss) Supervisor::Supervisor(Supervisor::Config cfg, StemHandle sh)
: config(std::move(cfg)), stem_pid(ss.pid), stem_pipe(std::move(ss.pipe)) : config(std::move(cfg)), stem_pid(sh.pid), stem_pipe(std::move(sh.pipe))
{ {
DBG_LOG(DBG_SUPERVISOR, "forked stem process %d", stem_pid); DBG_LOG(DBG_SUPERVISOR, "forked stem process %d", stem_pid);
setsignal(SIGCHLD, supervisor_signal_handler); setsignal(SIGCHLD, supervisor_signal_handler);
@ -426,7 +441,7 @@ size_t Supervisor::ProcessMessages()
return msgs.size(); return msgs.size();
} }
Stem::Stem(Supervisor::StemState ss) Stem::Stem(State ss)
: parent_pid(ss.parent_pid), signal_flare(new zeek::detail::Flare()), pipe(std::move(ss.pipe)) : parent_pid(ss.parent_pid), signal_flare(new zeek::detail::Flare()), pipe(std::move(ss.pipe))
{ {
zeek::set_thread_name("zeek.stem"); zeek::set_thread_name("zeek.stem");
@ -904,7 +919,7 @@ std::optional<Supervisor::SupervisedNode> Stem::Poll()
return {}; return {};
} }
std::optional<Supervisor::StemState> Supervisor::CreateStem(bool supervisor_mode) std::optional<Supervisor::StemHandle> Supervisor::CreateStem(bool supervisor_mode)
{ {
// If the Stem needs to be re-created via fork()/exec(), then the necessary // If the Stem needs to be re-created via fork()/exec(), then the necessary
// state information is communicated via ZEEK_STEM env. var. // state information is communicated via ZEEK_STEM env. var.
@ -928,42 +943,41 @@ std::optional<Supervisor::StemState> Supervisor::CreateStem(bool supervisor_mode
for ( auto i = 0; i < 4; ++i ) for ( auto i = 0; i < 4; ++i )
fds[i] = std::stoi(zeek_stem_nums[i + 1]); fds[i] = std::stoi(zeek_stem_nums[i + 1]);
StemState ss; Stem::State ss;
ss.pipe = std::make_unique<zeek::detail::PipePair>(FD_CLOEXEC, O_NONBLOCK, fds); ss.pipe = std::make_unique<zeek::detail::PipePair>(FD_CLOEXEC, O_NONBLOCK, fds);
ss.parent_pid = stem_ppid; ss.parent_pid = stem_ppid;
zeek::Supervisor::RunStem(std::move(ss));
Stem stem{std::move(ss)};
supervised_node = stem.Run();
return {}; return {};
} }
if ( ! supervisor_mode ) if ( ! supervisor_mode )
return {}; return {};
StemState ss; Stem::State ss;
ss.pipe = std::make_unique<zeek::detail::PipePair>(FD_CLOEXEC, O_NONBLOCK); ss.pipe = std::make_unique<zeek::detail::PipePair>(FD_CLOEXEC, O_NONBLOCK);
ss.parent_pid = getpid(); ss.parent_pid = getpid();
ss.pid = fork(); auto pid = fork();
if ( ss.pid == -1 ) if ( pid == -1 )
{ {
fprintf(stderr, "failed to fork Zeek supervisor stem process: %s\n", fprintf(stderr, "failed to fork Zeek supervisor stem process: %s\n",
strerror(errno)); strerror(errno));
exit(1); exit(1);
} }
if ( ss.pid == 0 ) if ( pid == 0 )
{ {
zeek::Supervisor::RunStem(std::move(ss)); Stem stem{std::move(ss)};
supervised_node = stem.Run();
return {}; return {};
} }
return std::optional<Supervisor::StemState>(std::move(ss)); StemHandle sh;
} sh.pipe = std::move(ss.pipe);
sh.pid = pid;
Supervisor::SupervisedNode Supervisor::RunStem(StemState stem_state) return std::optional<Supervisor::StemHandle>(std::move(sh));
{
Stem s(std::move(stem_state));
supervised_node = s.Run();
return *supervised_node;
} }
static BifEnum::Supervisor::ClusterRole role_str_to_enum(std::string_view r) static BifEnum::Supervisor::ClusterRole role_str_to_enum(std::string_view r)

View file

@ -235,17 +235,13 @@ public:
}; };
/** /**
* State used to initalialize the Stem process. * State used to initalialize and communicate with the Stem process.
*/ */
struct StemState { struct StemHandle {
/** /**
* Bidirectional pipes that allow the Supervisor and Stem to talk. * Bidirectional pipes that allow the Supervisor and Stem to talk.
*/ */
std::unique_ptr<zeek::detail::PipePair> pipe; std::unique_ptr<zeek::detail::PipePair> pipe;
/**
* The Stem's parent process ID (i.e. PID of the Supervisor).
*/
pid_t parent_pid = 0;
/** /**
* The Stem's process ID. * The Stem's process ID.
*/ */
@ -261,7 +257,7 @@ public:
* function but a node it spawns via fork() will return from it and * function but a node it spawns via fork() will return from it and
* information about it is available in ThisNode(). * information about it is available in ThisNode().
*/ */
static std::optional<StemState> CreateStem(bool supervisor_mode); static std::optional<StemHandle> CreateStem(bool supervisor_mode);
/** /**
* @return the state which describes what a supervised node should know * @return the state which describes what a supervised node should know
@ -275,10 +271,10 @@ public:
/** /**
* Create a new Supervisor object. * Create a new Supervisor object.
* @param stem_state information about the Stem process that was already * @param stem_handle information about the Stem process that was already
* created via CreateStem() * created via CreateStem()
*/ */
Supervisor(Config cfg, StemState stem_state); Supervisor(Config cfg, StemHandle stem_handle);
/** /**
* Destruction also cleanly shuts down the entire supervised process tree. * Destruction also cleanly shuts down the entire supervised process tree.
@ -365,19 +361,6 @@ private:
const char* Tag() override const char* Tag() override
{ return "zeek::Supervisor"; } { return "zeek::Supervisor"; }
/**
* Run the Stem process. The Stem process will receive instructions from
* the Supervisor to manipulate the process hierarchy and it's in charge
* of directly monitoring for whether any nodes die premature and need
* to be revived.
* @param pipe bidirectional pipes that allow the Supervisor and Stem
* process to communicate.
* @param pid the Stem's parent process ID (i.e. the PID of the Supervisor)
* @return state which describes what a supervised node should know about
* itself. I.e. this function only returns from a fork()'d child process.
*/
static SupervisedNode RunStem(StemState stem_state);
static std::optional<SupervisedNode> supervised_node; static std::optional<SupervisedNode> supervised_node;
Config config; Config config;

View file

@ -415,7 +415,7 @@ zeek::detail::SetupResult zeek::detail::setup(int argc, char** argv,
exit(context.run()); exit(context.run());
} }
auto stem_state = zeek::Supervisor::CreateStem(options.supervisor_mode); auto stem = zeek::Supervisor::CreateStem(options.supervisor_mode);
if ( zeek::Supervisor::ThisNode() ) if ( zeek::Supervisor::ThisNode() )
zeek::Supervisor::ThisNode()->Init(&options); zeek::Supervisor::ThisNode()->Init(&options);
@ -487,8 +487,7 @@ zeek::detail::SetupResult zeek::detail::setup(int argc, char** argv,
zeek::Supervisor::Config cfg = {}; zeek::Supervisor::Config cfg = {};
cfg.zeek_exe_path = zeek_exe_path; cfg.zeek_exe_path = zeek_exe_path;
options.filter_supervisor_options(); options.filter_supervisor_options();
zeek::supervisor_mgr = new zeek::Supervisor(std::move(cfg), zeek::supervisor_mgr = new zeek::Supervisor(std::move(cfg), std::move(*stem));
std::move(*stem_state));
} }
const char* seed_load_file = zeekenv("ZEEK_SEED_FILE"); const char* seed_load_file = zeekenv("ZEEK_SEED_FILE");