Improve supervisor checks for parent process termination

Comparing the parent process ID to 1 to detect loss of the parent process
was not necessarily portable, so the parent PID is now stored before
forking and then monitored for any change.
This commit is contained in:
Jon Siwek 2020-01-15 14:24:53 -08:00
parent 899a987527
commit 7ddd311583
5 changed files with 101 additions and 65 deletions

View file

@ -290,7 +290,15 @@ void net_run()
while ( iosource_mgr->Size() || while ( iosource_mgr->Size() ||
(BifConst::exit_only_after_terminate && ! terminating) ) (BifConst::exit_only_after_terminate && ! terminating) )
{ {
if ( zeek::supervised_node && getppid() == 1 ) // Note: only simple + portable way of detecting loss of parent
// process seems to be polling for change in PPID. There's platform
// specific ways if we do end up needing something more responsive
// and/or have to avoid overhead of polling, but maybe not worth
// the additional complexity:
// Linux: prctl(PR_SET_PDEATHSIG, ...)
// FreeBSD: procctl(PROC_PDEATHSIG_CTL)
// TODO: make this a proper timer
if ( zeek::supervised_node && zeek::supervised_node->parent_pid != getppid() )
zeek_terminate_loop("supervised cluster node was orphaned"); zeek_terminate_loop("supervised cluster node was orphaned");
double ts; double ts;

View file

@ -33,19 +33,19 @@ using namespace zeek;
namespace { namespace {
struct Stem { struct Stem {
Stem(std::unique_ptr<bro::PipePair> p); Stem(std::unique_ptr<bro::PipePair> p, pid_t parent_pid);
~Stem(); ~Stem();
std::optional<Supervisor::NodeConfig> Run(); std::optional<Supervisor::SupervisedNode> Run();
std::optional<Supervisor::NodeConfig> Poll(); std::optional<Supervisor::SupervisedNode> Poll();
std::optional<Supervisor::NodeConfig> Revive(); std::optional<Supervisor::SupervisedNode> Revive();
void Reap(); void Reap();
bool Spawn(Supervisor::Node* node); std::optional<Supervisor::SupervisedNode> Spawn(Supervisor::Node* node);
int AliveNodeCount() const; int AliveNodeCount() const;
@ -65,6 +65,7 @@ struct Stem {
void LogError(const char* format, ...) const __attribute__((format(printf, 2, 3))); void LogError(const char* format, ...) const __attribute__((format(printf, 2, 3)));
pid_t parent_pid;
int last_signal = -1; int last_signal = -1;
std::unique_ptr<bro::Flare> signal_flare; std::unique_ptr<bro::Flare> signal_flare;
std::unique_ptr<bro::PipePair> pipe; std::unique_ptr<bro::PipePair> pipe;
@ -260,6 +261,7 @@ void Supervisor::HandleChildSignal()
return; return;
// Revive the Stem process // Revive the Stem process
auto stem_ppid = getpid();
stem_pid = fork(); stem_pid = fork();
if ( stem_pid == -1 ) if ( stem_pid == -1 )
@ -277,7 +279,7 @@ void Supervisor::HandleChildSignal()
if ( stem_pid == 0 ) if ( stem_pid == 0 )
{ {
// Child stem process needs to exec() // Child stem process needs to exec()
auto stem_env = fmt("%d,%d,%d,%d", auto stem_env = fmt("%d,%d,%d,%d,%d", stem_ppid,
stem_pipe->In().ReadFD(), stem_pipe->In().WriteFD(), stem_pipe->In().ReadFD(), stem_pipe->In().WriteFD(),
stem_pipe->Out().ReadFD(), stem_pipe->Out().WriteFD()); stem_pipe->Out().ReadFD(), stem_pipe->Out().WriteFD());
@ -376,8 +378,8 @@ size_t Supervisor::ProcessMessages()
return msgs.size(); return msgs.size();
} }
Stem::Stem(std::unique_ptr<bro::PipePair> p) Stem::Stem(std::unique_ptr<bro::PipePair> p, pid_t ppid)
: signal_flare(new bro::Flare()), pipe(std::move(p)) : parent_pid(ppid), signal_flare(new bro::Flare()), pipe(std::move(p))
{ {
zeek::set_thread_name("zeek.stem"); zeek::set_thread_name("zeek.stem");
pipe->Swap(); pipe->Swap();
@ -486,7 +488,7 @@ void Stem::Destroy(Supervisor::Node* node) const
} }
} }
std::optional<Supervisor::NodeConfig> Stem::Revive() std::optional<Supervisor::SupervisedNode> Stem::Revive()
{ {
constexpr auto attempts_before_delay_increase = 3; constexpr auto attempts_before_delay_increase = 3;
constexpr auto delay_increase_factor = 2; constexpr auto delay_increase_factor = 2;
@ -520,8 +522,10 @@ std::optional<Supervisor::NodeConfig> Stem::Revive()
if ( node.revival_attempts % attempts_before_delay_increase == 0 ) if ( node.revival_attempts % attempts_before_delay_increase == 0 )
node.revival_delay *= delay_increase_factor; node.revival_delay *= delay_increase_factor;
if ( Spawn(&node) ) auto sn = Spawn(&node);
return node.config;
if ( sn )
return sn;
ReportStatus(node); ReportStatus(node);
} }
@ -529,15 +533,16 @@ std::optional<Supervisor::NodeConfig> Stem::Revive()
return {}; return {};
} }
bool Stem::Spawn(Supervisor::Node* node) std::optional<Supervisor::SupervisedNode> Stem::Spawn(Supervisor::Node* node)
{ {
auto ppid = getpid();
auto node_pid = fork(); auto node_pid = fork();
if ( node_pid == -1 ) if ( node_pid == -1 )
{ {
LogError("failed to fork Zeek node '%s': %s", LogError("failed to fork Zeek node '%s': %s",
node->Name().data(), strerror(errno)); node->Name().data(), strerror(errno));
return false; return {};
} }
if ( node_pid == 0 ) if ( node_pid == 0 )
@ -545,13 +550,16 @@ bool Stem::Spawn(Supervisor::Node* node)
setsignal(SIGCHLD, SIG_DFL); setsignal(SIGCHLD, SIG_DFL);
setsignal(SIGTERM, SIG_DFL); setsignal(SIGTERM, SIG_DFL);
zeek::set_thread_name(fmt("zeek.%s", node->Name().data())); zeek::set_thread_name(fmt("zeek.%s", node->Name().data()));
return true; Supervisor::SupervisedNode rval;
rval.config = node->config;
rval.parent_pid = ppid;
return rval;
} }
node->pid = node_pid; node->pid = node_pid;
node->spawn_time = std::chrono::steady_clock::now(); node->spawn_time = std::chrono::steady_clock::now();
DBG_STEM("Stem spawned node: %s (%d)", node->Name().data(), node->pid); DBG_STEM("Stem spawned node: %s (%d)", node->Name().data(), node->pid);
return false; return {};
} }
int Stem::AliveNodeCount() const int Stem::AliveNodeCount() const
@ -660,7 +668,7 @@ void Stem::LogError(const char* format, ...) const
#endif #endif
} }
std::optional<Supervisor::NodeConfig> Stem::Run() std::optional<Supervisor::SupervisedNode> Stem::Run()
{ {
for ( ; ; ) for ( ; ; )
{ {
@ -673,10 +681,12 @@ std::optional<Supervisor::NodeConfig> Stem::Run()
return {}; return {};
} }
std::optional<Supervisor::NodeConfig> Stem::Poll() std::optional<Supervisor::SupervisedNode> Stem::Poll()
{ {
pollfd fds[2] = { { pipe->InFD(), POLLIN, 0 }, pollfd fds[2] = { { pipe->InFD(), POLLIN, 0 },
{ signal_flare->FD(), POLLIN, 0} }; { signal_flare->FD(), POLLIN, 0} };
// Note: the poll timeout here is for periodically checking if the parent
// process died (see below).
constexpr auto poll_timeout_ms = 1000; constexpr auto poll_timeout_ms = 1000;
auto res = poll(fds, 2, poll_timeout_ms); auto res = poll(fds, 2, poll_timeout_ms);
@ -695,11 +705,15 @@ std::optional<Supervisor::NodeConfig> Stem::Poll()
last_signal = -1; last_signal = -1;
} }
if ( getppid() == 1 ) if ( getppid() != parent_pid )
{ {
// TODO: better way to detect loss of parent than polling ? // Note: only simple + portable way of detecting loss of parent
// e.g. prctl(PR_SET_PDEATHSIG, ...) on Linux // process seems to be polling for change in PPID. There's platform
// or procctl(PROC_PDEATHSIG_CTL) on FreeBSD // specific ways if we do end up needing something more responsive
// and/or have to avoid overhead of polling, but maybe not worth
// the additional complexity:
// Linux: prctl(PR_SET_PDEATHSIG, ...)
// FreeBSD: procctl(PROC_PDEATHSIG_CTL)
DBG_STEM("Stem suicide"); DBG_STEM("Stem suicide");
Shutdown(13); Shutdown(13);
} }
@ -761,8 +775,10 @@ std::optional<Supervisor::NodeConfig> Stem::Poll()
auto it = nodes.emplace(node_name, std::move(node_config)).first; auto it = nodes.emplace(node_name, std::move(node_config)).first;
auto& node = it->second; auto& node = it->second;
if ( Spawn(&node) ) auto sn = Spawn(&node);
return node.config;
if ( sn )
return sn;
DBG_STEM("Stem created node: %s (%d)", node.Name().data(), node.pid); DBG_STEM("Stem created node: %s (%d)", node.Name().data(), node.pid);
ReportStatus(node); ReportStatus(node);
@ -784,8 +800,10 @@ std::optional<Supervisor::NodeConfig> Stem::Poll()
DBG_STEM("Stem restarting node: %s", node_name.data()); DBG_STEM("Stem restarting node: %s", node_name.data());
Destroy(&node); Destroy(&node);
if ( Spawn(&node) ) auto sn = Spawn(&node);
return node.config;
if ( sn )
return sn;
ReportStatus(node); ReportStatus(node);
} }
@ -796,9 +814,9 @@ std::optional<Supervisor::NodeConfig> Stem::Poll()
return {}; return {};
} }
std::optional<Supervisor::NodeConfig> Supervisor::RunStem(std::unique_ptr<bro::PipePair> pipe) std::optional<Supervisor::SupervisedNode> Supervisor::RunStem(std::unique_ptr<bro::PipePair> pipe, pid_t parent_pid)
{ {
Stem s(std::move(pipe)); Stem s(std::move(pipe), parent_pid);
return s.Run(); return s.Run();
} }
@ -1028,8 +1046,11 @@ static Val* supervisor_role_to_cluster_node_type(BifEnum::Supervisor::ClusterRol
} }
} }
void Supervisor::NodeConfig::InitCluster() bool Supervisor::SupervisedNode::InitCluster()
{ {
if ( supervised_node->config.cluster.empty() )
return false;
auto cluster_node_type = global_scope()->Lookup("Cluster::Node")->AsType()->AsRecordType(); auto cluster_node_type = global_scope()->Lookup("Cluster::Node")->AsType()->AsRecordType();
auto cluster_nodes_id = global_scope()->Lookup("Cluster::nodes"); auto cluster_nodes_id = global_scope()->Lookup("Cluster::nodes");
auto cluster_manager_is_logger_id = global_scope()->Lookup("Cluster::manager_is_logger"); auto cluster_manager_is_logger_id = global_scope()->Lookup("Cluster::manager_is_logger");
@ -1037,7 +1058,7 @@ void Supervisor::NodeConfig::InitCluster()
auto has_logger = false; auto has_logger = false;
std::optional<std::string> manager_name; std::optional<std::string> manager_name;
for ( const auto& e : supervised_node->cluster ) for ( const auto& e : supervised_node->config.cluster )
{ {
if ( e.second.role == BifEnum::Supervisor::MANAGER ) if ( e.second.role == BifEnum::Supervisor::MANAGER )
manager_name = e.first; manager_name = e.first;
@ -1045,7 +1066,7 @@ void Supervisor::NodeConfig::InitCluster()
has_logger = true; has_logger = true;
} }
for ( const auto& e : supervised_node->cluster ) for ( const auto& e : supervised_node->config.cluster )
{ {
const auto& node_name = e.first; const auto& node_name = e.first;
const auto& ep = e.second; const auto& ep = e.second;
@ -1069,6 +1090,7 @@ void Supervisor::NodeConfig::InitCluster()
} }
cluster_manager_is_logger_id->SetVal(val_mgr->GetBool(! has_logger)); cluster_manager_is_logger_id->SetVal(val_mgr->GetBool(! has_logger));
return true;
} }
RecordVal* Supervisor::Status(std::string_view node_name) RecordVal* Supervisor::Status(std::string_view node_name)

View file

@ -34,7 +34,6 @@ public:
}; };
struct NodeConfig { struct NodeConfig {
static void InitCluster();
static NodeConfig FromRecord(const RecordVal* node_val); static NodeConfig FromRecord(const RecordVal* node_val);
static NodeConfig FromJSON(std::string_view json); static NodeConfig FromJSON(std::string_view json);
@ -51,6 +50,13 @@ public:
std::map<std::string, ClusterEndpoint> cluster; std::map<std::string, ClusterEndpoint> cluster;
}; };
// State returned to a process that the Stem forked to run as a supervised
// node: the node's configuration plus the PID of the process that forked it.
struct SupervisedNode {
// Sets up script-layer cluster state (e.g. Cluster::manager_is_logger) from
// the global supervised_node's config.cluster map. Returns false right away
// when that map is empty; otherwise returns true at the end.
static bool InitCluster();
// Configuration of the node this process is running as.
NodeConfig config;
// PID of the forking process, captured with getpid() just before fork().
// The main run loop compares it against getppid() to notice being orphaned.
pid_t parent_pid;
};
struct Node { struct Node {
IntrusivePtr<RecordVal> ToRecord() const; IntrusivePtr<RecordVal> ToRecord() const;
@ -69,7 +75,8 @@ public:
std::chrono::time_point<std::chrono::steady_clock> spawn_time; std::chrono::time_point<std::chrono::steady_clock> spawn_time;
}; };
static std::optional<NodeConfig> RunStem(std::unique_ptr<bro::PipePair> pipe); static std::optional<SupervisedNode> RunStem(std::unique_ptr<bro::PipePair> pipe,
pid_t parent_pid);
using NodeMap = std::map<std::string, Node, std::less<>>; using NodeMap = std::map<std::string, Node, std::less<>>;
@ -120,6 +127,6 @@ private:
}; };
extern Supervisor* supervisor; extern Supervisor* supervisor;
extern std::optional<Supervisor::NodeConfig> supervised_node; extern std::optional<Supervisor::SupervisedNode> supervised_node;
} // namespace zeek } // namespace zeek

View file

@ -100,7 +100,7 @@ zeekygen::Manager* zeekygen_mgr = 0;
iosource::Manager* iosource_mgr = 0; iosource::Manager* iosource_mgr = 0;
bro_broker::Manager* broker_mgr = 0; bro_broker::Manager* broker_mgr = 0;
zeek::Supervisor* zeek::supervisor = 0; zeek::Supervisor* zeek::supervisor = 0;
std::optional<zeek::Supervisor::NodeConfig> zeek::supervised_node; std::optional<zeek::Supervisor::SupervisedNode> zeek::supervised_node;
std::vector<std::string> zeek_script_prefixes; std::vector<std::string> zeek_script_prefixes;
Stmt* stmts; Stmt* stmts;
@ -328,55 +328,53 @@ struct zeek_options {
static void init_supervised_node(zeek_options* options) static void init_supervised_node(zeek_options* options)
{ {
const auto& node_name = zeek::supervised_node->name; const auto& config = zeek::supervised_node->config;
const auto& node_name = config.name;
if ( zeek::supervised_node->directory ) if ( config.directory )
{ {
if ( chdir(zeek::supervised_node->directory->data()) ) if ( chdir(config.directory->data()) )
{ {
fprintf(stderr, "node '%s' failed to chdir to %s: %s\n", fprintf(stderr, "node '%s' failed to chdir to %s: %s\n",
node_name.data(), node_name.data(), config.directory->data(),
zeek::supervised_node->directory->data(),
strerror(errno)); strerror(errno));
exit(1); exit(1);
} }
} }
if ( zeek::supervised_node->stderr_file ) if ( config.stderr_file )
{ {
auto fd = open(zeek::supervised_node->stderr_file->data(), auto fd = open(config.stderr_file->data(),
O_WRONLY | O_CREAT | O_TRUNC | O_APPEND | O_CLOEXEC, O_WRONLY | O_CREAT | O_TRUNC | O_APPEND | O_CLOEXEC,
0600); 0600);
if ( fd == -1 || dup2(fd, STDERR_FILENO) == -1 ) if ( fd == -1 || dup2(fd, STDERR_FILENO) == -1 )
{ {
fprintf(stderr, "node '%s' failed to create stderr file %s: %s\n", fprintf(stderr, "node '%s' failed to create stderr file %s: %s\n",
node_name.data(), node_name.data(), config.stderr_file->data(),
zeek::supervised_node->stderr_file->data(),
strerror(errno)); strerror(errno));
exit(1); exit(1);
} }
} }
if ( zeek::supervised_node->stdout_file ) if ( config.stdout_file )
{ {
auto fd = open(zeek::supervised_node->stdout_file->data(), auto fd = open(config.stdout_file->data(),
O_WRONLY | O_CREAT | O_TRUNC | O_APPEND | O_CLOEXEC, O_WRONLY | O_CREAT | O_TRUNC | O_APPEND | O_CLOEXEC,
0600); 0600);
if ( fd == -1 || dup2(fd, STDOUT_FILENO) == -1 ) if ( fd == -1 || dup2(fd, STDOUT_FILENO) == -1 )
{ {
fprintf(stderr, "node '%s' failed to create stdout file %s: %s\n", fprintf(stderr, "node '%s' failed to create stdout file %s: %s\n",
node_name.data(), node_name.data(), config.stdout_file->data(),
zeek::supervised_node->stdout_file->data(),
strerror(errno)); strerror(errno));
exit(1); exit(1);
} }
} }
if ( zeek::supervised_node->cpu_affinity ) if ( config.cpu_affinity )
{ {
auto res = zeek::set_affinity(*zeek::supervised_node->cpu_affinity); auto res = zeek::set_affinity(*config.cpu_affinity);
if ( ! res ) if ( ! res )
fprintf(stderr, "node '%s' failed to set CPU affinity: %s\n", fprintf(stderr, "node '%s' failed to set CPU affinity: %s\n",
@ -385,10 +383,10 @@ static void init_supervised_node(zeek_options* options)
options->filter_supervised_node_options(); options->filter_supervised_node_options();
if ( zeek::supervised_node->interface ) if ( config.interface )
options->interfaces.emplace_back(*zeek::supervised_node->interface); options->interfaces.emplace_back(*config.interface);
if ( ! zeek::supervised_node->cluster.empty() ) if ( ! config.cluster.empty() )
{ {
if ( setenv("CLUSTER_NODE", node_name.data(), true) == -1 ) if ( setenv("CLUSTER_NODE", node_name.data(), true) == -1 )
{ {
@ -398,7 +396,7 @@ static void init_supervised_node(zeek_options* options)
} }
} }
for ( const auto& s : zeek::supervised_node->scripts ) for ( const auto& s : config.scripts )
options->scripts_to_load.emplace_back(s); options->scripts_to_load.emplace_back(s);
} }
@ -974,27 +972,30 @@ int main(int argc, char** argv)
if ( zeek_stem_env ) if ( zeek_stem_env )
{ {
std::vector<std::string> fd_strings; std::vector<std::string> zeek_stem_nums;
tokenize_string(zeek_stem_env, ",", &fd_strings); tokenize_string(zeek_stem_env, ",", &zeek_stem_nums);
if ( fd_strings.size() != 4 ) if ( zeek_stem_nums.size() != 5 )
{ {
fprintf(stderr, "invalid ZEEK_STEM environment variable value: '%s'\n", fprintf(stderr, "invalid ZEEK_STEM environment variable value: '%s'\n",
zeek_stem_env); zeek_stem_env);
exit(1); exit(1);
} }
pid_t stem_ppid = std::stoi(zeek_stem_nums[0]);
int fds[4]; int fds[4];
for ( auto i = 0; i < 4; ++i ) for ( auto i = 0; i < 4; ++i )
fds[i] = std::stoi(fd_strings[i]); fds[i] = std::stoi(zeek_stem_nums[i + 1]);
supervisor_pipe.reset(new bro::PipePair{FD_CLOEXEC, O_NONBLOCK, fds}); supervisor_pipe.reset(new bro::PipePair{FD_CLOEXEC, O_NONBLOCK, fds});
zeek::supervised_node = zeek::Supervisor::RunStem(std::move(supervisor_pipe)); zeek::supervised_node = zeek::Supervisor::RunStem(std::move(supervisor_pipe),
stem_ppid);
} }
else if ( options.supervisor_mode ) else if ( options.supervisor_mode )
{ {
supervisor_pipe.reset(new bro::PipePair{FD_CLOEXEC, O_NONBLOCK}); supervisor_pipe.reset(new bro::PipePair{FD_CLOEXEC, O_NONBLOCK});
auto stem_ppid = getpid();
stem_pid = fork(); stem_pid = fork();
if ( stem_pid == -1 ) if ( stem_pid == -1 )
@ -1005,7 +1006,8 @@ int main(int argc, char** argv)
} }
if ( stem_pid == 0 ) if ( stem_pid == 0 )
zeek::supervised_node = zeek::Supervisor::RunStem(std::move(supervisor_pipe)); zeek::supervised_node = zeek::Supervisor::RunStem(std::move(supervisor_pipe),
stem_ppid);
} }
if ( zeek::supervised_node ) if ( zeek::supervised_node )

View file

@ -68,11 +68,8 @@ function Supervisor::__restart%(node: string%): bool
function Supervisor::__init_cluster%(%): bool function Supervisor::__init_cluster%(%): bool
%{ %{
if ( zeek::supervised_node && ! zeek::supervised_node->cluster.empty() ) if ( zeek::supervised_node )
{ return val_mgr->GetBool(zeek::supervised_node->InitCluster());
zeek::supervised_node->InitCluster();
return val_mgr->GetBool(true);
}
return val_mgr->GetBool(false); return val_mgr->GetBool(false);
%} %}