Move some supervisor structures to detail namespace

* StemHandle -> detail::SupervisorStemHandle
* SupervisedNode -> detail::SupervisedNode
* Supervisor::Node -> detail::SupervisorNode
This commit is contained in:
Jon Siwek 2020-06-16 21:35:17 -07:00
parent 9087621714
commit a46e24091a
2 changed files with 171 additions and 163 deletions

View file

@ -44,8 +44,11 @@ extern "C" {
#endif
using namespace zeek;
using zeek::detail::SupervisedNode;
using zeek::detail::SupervisorNode;
using zeek::detail::SupervisorStemHandle;
std::optional<Supervisor::SupervisedNode> Supervisor::supervised_node;
std::optional<SupervisedNode> Supervisor::supervised_node;
namespace {
@ -68,29 +71,29 @@ struct Stem {
~Stem();
Supervisor::SupervisedNode Run();
SupervisedNode Run();
std::optional<Supervisor::SupervisedNode> Poll();
std::optional<SupervisedNode> Poll();
std::optional<Supervisor::SupervisedNode> Revive();
std::optional<SupervisedNode> Revive();
void Reap();
std::variant<bool, Supervisor::SupervisedNode> Spawn(Supervisor::Node* node);
std::variant<bool, SupervisedNode> Spawn(SupervisorNode* node);
int AliveNodeCount() const;
void KillNodes(int signal);
void KillNode(Supervisor::Node* node, int signal) const;
void KillNode(SupervisorNode* node, int signal) const;
void Destroy(Supervisor::Node* node) const;
void Destroy(SupervisorNode* node) const;
bool Wait(Supervisor::Node* node, int options) const;
bool Wait(SupervisorNode* node, int options) const;
void Shutdown(int exit_code);
void ReportStatus(const Supervisor::Node& node) const;
void ReportStatus(const SupervisorNode& node) const;
void Log(std::string_view type, const char* format, va_list args) const;
@ -102,7 +105,7 @@ struct Stem {
int last_signal = -1;
std::unique_ptr<zeek::detail::Flare> signal_flare;
std::unique_ptr<zeek::detail::PipePair> pipe;
std::map<std::string, Supervisor::Node> nodes;
std::map<std::string, SupervisorNode> nodes;
std::string msg_buffer;
bool shutting_down = false;
};
@ -181,7 +184,7 @@ void zeek::detail::ParentProcessCheckTimer::Dispatch(double t, bool is_expire)
interval));
}
Supervisor::Supervisor(Supervisor::Config cfg, StemHandle sh)
Supervisor::Supervisor(Supervisor::Config cfg, SupervisorStemHandle sh)
: config(std::move(cfg)), stem_pid(sh.pid), stem_pipe(std::move(sh.pipe))
{
stem_stdout.pipe = std::move(sh.stdout_pipe);
@ -618,7 +621,7 @@ void Stem::Reap()
}
}
bool Stem::Wait(Supervisor::Node* node, int options) const
bool Stem::Wait(SupervisorNode* node, int options) const
{
if ( node->pid <= 0 )
{
@ -671,7 +674,7 @@ bool Stem::Wait(Supervisor::Node* node, int options) const
return true;
}
void Stem::KillNode(Supervisor::Node* node, int signal) const
void Stem::KillNode(SupervisorNode* node, int signal) const
{
if ( node->pid <= 0 )
{
@ -699,7 +702,7 @@ static int get_kill_signal(int attempts, int max_attempts)
return SIGKILL;
}
void Stem::Destroy(Supervisor::Node* node) const
void Stem::Destroy(SupervisorNode* node) const
{
constexpr auto max_term_attempts = 13;
constexpr auto kill_delay = 2;
@ -727,7 +730,7 @@ void Stem::Destroy(Supervisor::Node* node) const
}
}
std::optional<Supervisor::SupervisedNode> Stem::Revive()
std::optional<SupervisedNode> Stem::Revive()
{
constexpr auto attempts_before_delay_increase = 3;
constexpr auto delay_increase_factor = 2;
@ -763,8 +766,8 @@ std::optional<Supervisor::SupervisedNode> Stem::Revive()
auto spawn_res = Spawn(&node);
if ( std::holds_alternative<Supervisor::SupervisedNode>(spawn_res) )
return std::get<Supervisor::SupervisedNode>(spawn_res);
if ( std::holds_alternative<SupervisedNode>(spawn_res) )
return std::get<SupervisedNode>(spawn_res);
if ( std::get<bool>(spawn_res) )
LogError("Supervised node '%s' (PID %d) revived after premature exit",
@ -776,7 +779,7 @@ std::optional<Supervisor::SupervisedNode> Stem::Revive()
return {};
}
std::variant<bool, Supervisor::SupervisedNode> Stem::Spawn(Supervisor::Node* node)
std::variant<bool, SupervisedNode> Stem::Spawn(SupervisorNode* node)
{
auto ppid = getpid();
auto fork_res = fork_with_stdio_redirect(fmt("node %s", node->Name().data()));
@ -794,7 +797,7 @@ std::variant<bool, Supervisor::SupervisedNode> Stem::Spawn(Supervisor::Node* nod
setsignal(SIGCHLD, SIG_DFL);
setsignal(SIGTERM, SIG_DFL);
zeek::set_thread_name(fmt("zeek.%s", node->Name().data()));
Supervisor::SupervisedNode rval;
SupervisedNode rval;
rval.config = node->config;
rval.parent_pid = ppid;
return rval;
@ -877,7 +880,7 @@ void Stem::Shutdown(int exit_code)
}
}
void Stem::ReportStatus(const Supervisor::Node& node) const
void Stem::ReportStatus(const SupervisorNode& node) const
{
std::string msg = fmt("status %s %d", node.Name().data(), node.pid);
safe_write(pipe->OutFD(), msg.data(), msg.size() + 1);
@ -916,7 +919,7 @@ void Stem::LogError(const char* format, ...) const
va_end(args);
}
Supervisor::SupervisedNode Stem::Run()
SupervisedNode Stem::Run()
{
for ( ; ; )
{
@ -931,7 +934,7 @@ Supervisor::SupervisedNode Stem::Run()
return {};
}
std::optional<Supervisor::SupervisedNode> Stem::Poll()
std::optional<SupervisedNode> Stem::Poll()
{
std::map<std::string, int> node_pollfd_indices;
constexpr auto fixed_fd_count = 2;
@ -1062,8 +1065,8 @@ std::optional<Supervisor::SupervisedNode> Stem::Poll()
DBG_STEM("Stem creating node: %s (PID %d)", node.Name().data(), node.pid);
auto spawn_res = Spawn(&node);
if ( std::holds_alternative<Supervisor::SupervisedNode>(spawn_res) )
return std::get<Supervisor::SupervisedNode>(spawn_res);
if ( std::holds_alternative<SupervisedNode>(spawn_res) )
return std::get<SupervisedNode>(spawn_res);
ReportStatus(node);
}
@ -1085,8 +1088,8 @@ std::optional<Supervisor::SupervisedNode> Stem::Poll()
auto spawn_res = Spawn(&node);
if ( std::holds_alternative<Supervisor::SupervisedNode>(spawn_res) )
return std::get<Supervisor::SupervisedNode>(spawn_res);
if ( std::holds_alternative<SupervisedNode>(spawn_res) )
return std::get<SupervisedNode>(spawn_res);
ReportStatus(node);
}
@ -1097,7 +1100,7 @@ std::optional<Supervisor::SupervisedNode> Stem::Poll()
return {};
}
std::optional<Supervisor::StemHandle> Supervisor::CreateStem(bool supervisor_mode)
std::optional<SupervisorStemHandle> Supervisor::CreateStem(bool supervisor_mode)
{
// If the Stem needs to be re-created via fork()/exec(), then the necessary
// state information is communicated via ZEEK_STEM env. var.
@ -1157,12 +1160,12 @@ std::optional<Supervisor::StemHandle> Supervisor::CreateStem(bool supervisor_mod
return {};
}
StemHandle sh;
SupervisorStemHandle sh;
sh.pipe = std::move(ss.pipe);
sh.pid = pid;
sh.stdout_pipe = std::move(fork_res.stdout_pipe);
sh.stderr_pipe = std::move(fork_res.stderr_pipe);
return std::optional<Supervisor::StemHandle>(std::move(sh));
return std::optional<SupervisorStemHandle>(std::move(sh));
}
static BifEnum::Supervisor::ClusterRole role_str_to_enum(std::string_view r)
@ -1356,7 +1359,7 @@ RecordValPtr Supervisor::NodeConfig::ToRecord() const
return rval;
}
RecordValPtr Supervisor::Node::ToRecord() const
RecordValPtr SupervisorNode::ToRecord() const
{
const auto& rt = zeek::BifType::Record::Supervisor::NodeStatus;
auto rval = zeek::make_intrusive<zeek::RecordVal>(rt);
@ -1388,7 +1391,7 @@ static ValPtr supervisor_role_to_cluster_node_type(BifEnum::Supervisor::ClusterR
}
}
bool Supervisor::SupervisedNode::InitCluster() const
bool SupervisedNode::InitCluster() const
{
if ( config.cluster.empty() )
return false;
@ -1435,7 +1438,7 @@ bool Supervisor::SupervisedNode::InitCluster() const
return true;
}
void Supervisor::SupervisedNode::Init(zeek::Options* options) const
void SupervisedNode::Init(zeek::Options* options) const
{
const auto& node_name = config.name;

View file

@ -24,6 +24,11 @@
namespace zeek {
namespace detail {
struct SupervisorStemHandle;
struct SupervisedNode;
struct SupervisorNode;
/**
* A simple wrapper around a pipe to help do line-buffered output
* of a Supervisor/Stem child process' redirected stdout/stderr.
@ -185,132 +190,6 @@ public:
std::map<std::string, ClusterEndpoint> cluster;
};
/**
* State which defines a Supervised node's understanding of itself.
*/
struct SupervisedNode {
/**
* Initialize the Supervised node within the Zeek Cluster Framework.
* This function populates the "Cluster::nodes" script-layer variable
* that otherwise is expected to be populated by a
* "cluster-layout.zeek" script in other context (e.g. ZeekCtl
* generates that cluster layout).
* @return true if the supervised node is using the Cluster Framework
* else false.
*/
bool InitCluster() const;
/**
* Initialize the Supervised node.
* @param options the Zeek options to extend/modify as appropriate
* for the node's configuration.
*/
void Init(zeek::Options* options) const;
/**
* The node's configuration options.
*/
NodeConfig config;
/**
* The process ID of the supervised node's parent process (i.e. the PID
* of the Stem process).
*/
pid_t parent_pid;
};
/**
* The state of a supervised node from the Supervisor's perspective.
*/
struct Node {
/**
* Convert the node into script-layer Supervisor::NodeStatus record
* representation.
*/
RecordValPtr ToRecord() const;
/**
* @return the name of the node.
*/
const std::string& Name() const
{ return config.name; }
/**
* Create a new node state from a given configuration.
* @param arg_config the configuration to use for the node.
*/
Node(NodeConfig arg_config) : config(std::move(arg_config))
{ }
/**
* The desired configuration for the node.
*/
NodeConfig config;
/**
* Process ID of the node (positive/non-zero are valid/live PIDs).
*/
pid_t pid = 0;
/**
* Whether the node is voluntarily marked for termination by the
* Supervisor.
*/
bool killed = false;
/**
* The last exit status of the node.
*/
int exit_status = 0;
/**
* The last signal which terminated the node.
*/
int signal_number = 0;
/**
* Number of process revival attempts made after the node first died
* prematurely.
*/
int revival_attempts = 0;
/**
* How many seconds to wait until the next revival attempt for the node.
*/
int revival_delay = 1;
/**
* The time at which the node's process was last spawned.
*/
std::chrono::time_point<std::chrono::steady_clock> spawn_time;
/**
* A pipe that the Supervisor Stem can read from to obtain
* any output written to the Nodes's stdout.
*/
zeek::detail::LineBufferedPipe stdout_pipe;
/**
* A pipe that the Supervisor Stem can read from to obtain
* any output written to the Node's stdout.
*/
zeek::detail::LineBufferedPipe stderr_pipe;
};
/**
* State used to initalialize and communicate with the Stem process.
*/
struct StemHandle {
/**
* Bidirectional pipes that allow the Supervisor and Stem to talk.
*/
std::unique_ptr<zeek::detail::PipePair> pipe;
/**
* A pipe that the Supervisor can read from to obtain
* any output written to the Stem's stdout.
*/
std::unique_ptr<zeek::detail::Pipe> stdout_pipe;
/**
* A pipe that the Supervisor can read from to obtain
* any output written to the Stem's stdout.
*/
std::unique_ptr<zeek::detail::Pipe> stderr_pipe;
/**
* The Stem's process ID.
*/
pid_t pid = 0;
};
/**
* Create and run the Stem process if necessary.
* @param supervisor_mode whether Zeek was invoked with the supervisor
@ -320,24 +199,24 @@ public:
* function but a node it spawns via fork() will return from it and
* information about it is available in ThisNode().
*/
static std::optional<StemHandle> CreateStem(bool supervisor_mode);
static std::optional<detail::SupervisorStemHandle> CreateStem(bool supervisor_mode);
/**
* @return the state which describes what a supervised node should know
* about itself if this is a supervised process. If called from a process
* that is not supervised, this returns an "empty" object.
*/
static const std::optional<SupervisedNode>& ThisNode()
static const std::optional<detail::SupervisedNode>& ThisNode()
{ return supervised_node; }
using NodeMap = std::map<std::string, Node, std::less<>>;
using NodeMap = std::map<std::string, detail::SupervisorNode, std::less<>>;
/**
* Create a new Supervisor object.
* @param stem_handle information about the Stem process that was already
* created via CreateStem()
*/
Supervisor(Config cfg, StemHandle stem_handle);
Supervisor(Config cfg, detail::SupervisorStemHandle stem_handle);
/**
* Destruction also cleanly shuts down the entire supervised process tree.
@ -424,7 +303,7 @@ private:
const char* Tag() override
{ return "zeek::Supervisor"; }
static std::optional<SupervisedNode> supervised_node;
static std::optional<detail::SupervisedNode> supervised_node;
Config config;
pid_t stem_pid;
@ -438,6 +317,132 @@ private:
};
namespace detail {
/**
* State used to initalialize and talk to the Supervisor Stem process.
*/
struct SupervisorStemHandle {
/**
* Bidirectional pipes that allow the Supervisor and Stem to talk.
*/
std::unique_ptr<zeek::detail::PipePair> pipe;
/**
* A pipe that the Supervisor can read from to obtain
* any output written to the Stem's stdout.
*/
std::unique_ptr<zeek::detail::Pipe> stdout_pipe;
/**
* A pipe that the Supervisor can read from to obtain
* any output written to the Stem's stdout.
*/
std::unique_ptr<zeek::detail::Pipe> stderr_pipe;
/**
* The Stem's process ID.
*/
pid_t pid = 0;
};
/**
* State which defines a Supervised Zeek node's understanding of itself.
*/
struct SupervisedNode {
/**
* Initialize the Supervised node within the Zeek Cluster Framework.
* This function populates the "Cluster::nodes" script-layer variable
* that otherwise is expected to be populated by a
* "cluster-layout.zeek" script in other context (e.g. ZeekCtl
* generates that cluster layout).
* @return true if the supervised node is using the Cluster Framework
* else false.
*/
bool InitCluster() const;
/**
* Initialize the Supervised node.
* @param options the Zeek options to extend/modify as appropriate
* for the node's configuration.
*/
void Init(zeek::Options* options) const;
/**
* The node's configuration options.
*/
Supervisor::NodeConfig config;
/**
* The process ID of the supervised node's parent process (i.e. the PID
* of the Stem process).
*/
pid_t parent_pid;
};
/**
* The state of a supervised node from the Supervisor's perspective.
*/
struct SupervisorNode {
/**
* Convert the node into script-layer Supervisor::NodeStatus record
* representation.
*/
RecordValPtr ToRecord() const;
/**
* @return the name of the node.
*/
const std::string& Name() const
{ return config.name; }
/**
* Create a new node state from a given configuration.
* @param arg_config the configuration to use for the node.
*/
SupervisorNode(Supervisor::NodeConfig arg_config) : config(std::move(arg_config))
{ }
/**
* The desired configuration for the node.
*/
Supervisor::NodeConfig config;
/**
* Process ID of the node (positive/non-zero are valid/live PIDs).
*/
pid_t pid = 0;
/**
* Whether the node is voluntarily marked for termination by the
* Supervisor.
*/
bool killed = false;
/**
* The last exit status of the node.
*/
int exit_status = 0;
/**
* The last signal which terminated the node.
*/
int signal_number = 0;
/**
* Number of process revival attempts made after the node first died
* prematurely.
*/
int revival_attempts = 0;
/**
* How many seconds to wait until the next revival attempt for the node.
*/
int revival_delay = 1;
/**
* The time at which the node's process was last spawned.
*/
std::chrono::time_point<std::chrono::steady_clock> spawn_time;
/**
* A pipe that the Supervisor Stem can read from to obtain
* any output written to the Nodes's stdout.
*/
zeek::detail::LineBufferedPipe stdout_pipe;
/**
* A pipe that the Supervisor Stem can read from to obtain
* any output written to the Node's stdout.
*/
zeek::detail::LineBufferedPipe stderr_pipe;
};
/**
* A timer used by supervised processes to periodically check whether their
* parent (supervisor) process has died. If it has died, the supervised