mirror of
https://github.com/zeek/zeek.git
synced 2025-10-02 14:48:21 +00:00
Add Supervisor documentation
Minor additions/changes to improve API I noticed along the way
This commit is contained in:
parent
1972190b89
commit
8247c42368
11 changed files with 405 additions and 34 deletions
2
doc
2
doc
|
@ -1 +1 @@
|
||||||
Subproject commit 7192dbedf3ca9ce49294057262074f0e888177f3
|
Subproject commit 0cb30512c52990fcdb1e93b5219f65c9b3d18dce
|
|
@ -1,9 +1,9 @@
|
||||||
##! The Zeek process supervision API.
|
##! The Zeek process supervision API.
|
||||||
# TODO: add proper docs
|
|
||||||
|
|
||||||
module Supervisor;
|
module Supervisor;
|
||||||
|
|
||||||
export {
|
export {
|
||||||
|
## The role a supervised-node will play in Zeek's Cluster Framework.
|
||||||
type ClusterRole: enum {
|
type ClusterRole: enum {
|
||||||
NONE,
|
NONE,
|
||||||
LOGGER,
|
LOGGER,
|
||||||
|
@ -12,52 +12,178 @@ export {
|
||||||
WORKER,
|
WORKER,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
## Describes configuration of a supervised-node within Zeek's Cluster
|
||||||
|
## Framework.
|
||||||
type ClusterEndpoint: record {
|
type ClusterEndpoint: record {
|
||||||
|
## The role a supervised-node will play in Zeek's Cluster Framework.
|
||||||
role: ClusterRole;
|
role: ClusterRole;
|
||||||
|
## The host/IP at which the cluster node runs.
|
||||||
host: addr;
|
host: addr;
|
||||||
|
## The TCP port at which the cluster node listens for connections.
|
||||||
p: port;
|
p: port;
|
||||||
|
## The interface name from which the node will read/analyze packets.
|
||||||
|
## Typically used by worker nodes.
|
||||||
interface: string &optional;
|
interface: string &optional;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
## Configuration options that influence behavior of a supervised Zeek node.
|
||||||
type NodeConfig: record {
|
type NodeConfig: record {
|
||||||
|
## The name of the supervised node. These are unique within a given
|
||||||
|
## supervised process tree and typically human-readable.
|
||||||
name: string;
|
name: string;
|
||||||
|
## The interface name from which the node will read/analyze packets.
|
||||||
interface: string &optional;
|
interface: string &optional;
|
||||||
|
## The working directory that the node should use.
|
||||||
directory: string &optional;
|
directory: string &optional;
|
||||||
|
## The filename/path to which the node's stdout will be redirected.
|
||||||
stdout_file: string &optional;
|
stdout_file: string &optional;
|
||||||
|
## The filename/path to which the node's stderr will be redirected.
|
||||||
stderr_file: string &optional;
|
stderr_file: string &optional;
|
||||||
|
## Additional script filenames/paths that the node should load.
|
||||||
scripts: vector of string &default = vector();
|
scripts: vector of string &default = vector();
|
||||||
|
## A cpu/core number to which the node will try to pin itself.
|
||||||
cpu_affinity: int &optional;
|
cpu_affinity: int &optional;
|
||||||
|
## The Cluster Layout definition. Each node in the Cluster Framework
|
||||||
|
## knows about the full, static cluster topology to which it belongs.
|
||||||
|
## Entries use node names for keys. The Supervisor framework will
|
||||||
|
## automatically translate this table into the right Cluster Framework
|
||||||
|
## configuration when spawning supervised-nodes. E.g. it will
|
||||||
|
## populate the both the CLUSTER_NODE environment variable and
|
||||||
|
## :zeek:see:`Cluster::nodes` table.
|
||||||
cluster: table[string] of ClusterEndpoint &default=table();
|
cluster: table[string] of ClusterEndpoint &default=table();
|
||||||
};
|
};
|
||||||
|
|
||||||
|
## The current status of a supervised node.
|
||||||
type NodeStatus: record {
|
type NodeStatus: record {
|
||||||
|
## The desired node configuration.
|
||||||
node: NodeConfig;
|
node: NodeConfig;
|
||||||
|
## The current or last known process ID of the node. This may not
|
||||||
|
## be initialized if the process has not yet started.
|
||||||
pid: int &optional;
|
pid: int &optional;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
## The current status of a set of supervised nodes.
|
||||||
type Status: record {
|
type Status: record {
|
||||||
|
## The status of supervised nodes, keyed by node names.
|
||||||
nodes: table[string] of NodeStatus;
|
nodes: table[string] of NodeStatus;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
## Create a new supervised node process.
|
||||||
|
## It's an error to call this from a process other than a Supervisor.
|
||||||
|
##
|
||||||
|
## node: the desired configuration for the new supervised node process.
|
||||||
|
##
|
||||||
|
## Returns: an empty string on success or description of the error/failure.
|
||||||
global create: function(node: NodeConfig): string;
|
global create: function(node: NodeConfig): string;
|
||||||
|
|
||||||
|
## Retrieve current status of a supervised node process.
|
||||||
|
## It's an error to call this from a process other than a Supervisor.
|
||||||
|
##
|
||||||
|
## node: the name of the node to get the status of or an empty string
|
||||||
|
## to mean "all nodes".
|
||||||
|
##
|
||||||
|
## Returns: the current status of a set of nodes.
|
||||||
global status: function(node: string &default=""): Status;
|
global status: function(node: string &default=""): Status;
|
||||||
|
|
||||||
|
## Restart a supervised node process by destroying (killing) and
|
||||||
|
## re-recreating it.
|
||||||
|
## It's an error to call this from a process other than a Supervisor.
|
||||||
|
##
|
||||||
|
## node: the name of the node to restart or an empty string to mean
|
||||||
|
## "all nodes".
|
||||||
|
##
|
||||||
|
## Returns: true on success.
|
||||||
global restart: function(node: string &default=""): bool;
|
global restart: function(node: string &default=""): bool;
|
||||||
|
|
||||||
|
## Destroy and remove a supervised node process.
|
||||||
|
## It's an error to call this from a process other than a Supervisor.
|
||||||
|
##
|
||||||
|
## node: the name of the node to destroy or an empty string to mean
|
||||||
|
## "all nodes".
|
||||||
|
##
|
||||||
|
## Returns: true on success.
|
||||||
global destroy: function(node: string &default=""): bool;
|
global destroy: function(node: string &default=""): bool;
|
||||||
|
|
||||||
|
## Returns: true if this is the Supervisor process.
|
||||||
global is_supervisor: function(): bool;
|
global is_supervisor: function(): bool;
|
||||||
|
|
||||||
|
## Returns: true if this is a supervised node process.
|
||||||
global is_supervised: function(): bool;
|
global is_supervised: function(): bool;
|
||||||
|
|
||||||
|
## Returns: the node configuration if this is a supervised node.
|
||||||
|
## It's an error to call this function from a process other than
|
||||||
|
## a supervised one.
|
||||||
|
global node: function(): NodeConfig;
|
||||||
|
|
||||||
|
## Send a request to a remote Supervisor process to create a node.
|
||||||
|
##
|
||||||
|
## reqid: an arbitrary string that will be directly echoed in the response
|
||||||
|
##
|
||||||
|
## node: the desired configuration for the new supervised node process.
|
||||||
global Supervisor::create_request: event(reqid: string, node: NodeConfig);
|
global Supervisor::create_request: event(reqid: string, node: NodeConfig);
|
||||||
|
|
||||||
|
## Handle a response from a Supervisor process that received
|
||||||
|
## :zeek:see:`Supervisor::create_request`.
|
||||||
|
##
|
||||||
|
## reqid: an arbitrary string matching the value in the original request.
|
||||||
|
##
|
||||||
|
## result: the return value of the remote call to
|
||||||
|
## :zeek:see:`Supervisor::create`.
|
||||||
global Supervisor::create_response: event(reqid: string, result: string);
|
global Supervisor::create_response: event(reqid: string, result: string);
|
||||||
|
|
||||||
|
## Send a request to a remote Supervisor process to retrieve node status.
|
||||||
|
##
|
||||||
|
## reqid: an arbitrary string that will be directly echoed in the response
|
||||||
|
##
|
||||||
|
## node: the name of the node to get status of or empty string to mean "all
|
||||||
|
## nodes".
|
||||||
global Supervisor::status_request: event(reqid: string, node: string);
|
global Supervisor::status_request: event(reqid: string, node: string);
|
||||||
|
|
||||||
|
## Handle a response from a Supervisor process that received
|
||||||
|
## :zeek:see:`Supervisor::status_request`.
|
||||||
|
##
|
||||||
|
## reqid: an arbitrary string matching the value in the original request.
|
||||||
|
##
|
||||||
|
## result: the return value of the remote call to
|
||||||
|
## :zeek:see:`Supervisor::status`.
|
||||||
global Supervisor::status_response: event(reqid: string, result: Status);
|
global Supervisor::status_response: event(reqid: string, result: Status);
|
||||||
|
|
||||||
|
## Send a request to a remote Supervisor process to restart a node.
|
||||||
|
##
|
||||||
|
## reqid: an arbitrary string that will be directly echoed in the response
|
||||||
|
##
|
||||||
|
## node: the name of the node to restart or empty string to mean "all
|
||||||
|
## nodes".
|
||||||
global Supervisor::restart_request: event(reqid: string, node: string);
|
global Supervisor::restart_request: event(reqid: string, node: string);
|
||||||
|
|
||||||
|
## Handle a response from a Supervisor process that received
|
||||||
|
## :zeek:see:`Supervisor::restart_request`.
|
||||||
|
##
|
||||||
|
## reqid: an arbitrary string matching the value in the original request.
|
||||||
|
##
|
||||||
|
## result: the return value of the remote call to
|
||||||
|
## :zeek:see:`Supervisor::restart`.
|
||||||
global Supervisor::restart_response: event(reqid: string, result: bool);
|
global Supervisor::restart_response: event(reqid: string, result: bool);
|
||||||
|
|
||||||
|
## Send a request to a remote Supervisor process to destroy a node.
|
||||||
|
##
|
||||||
|
## reqid: an arbitrary string that will be directly echoed in the response
|
||||||
|
##
|
||||||
|
## node: the name of the node to destory or empty string to mean "all
|
||||||
|
## nodes".
|
||||||
global Supervisor::destroy_request: event(reqid: string, node: string);
|
global Supervisor::destroy_request: event(reqid: string, node: string);
|
||||||
|
|
||||||
|
## Handle a response from a Supervisor process that received
|
||||||
|
## :zeek:see:`Supervisor::destroy_request`.
|
||||||
|
##
|
||||||
|
## reqid: an arbitrary string matching the value in the original request.
|
||||||
|
##
|
||||||
|
## result: the return value of the remote call to
|
||||||
|
## :zeek:see:`Supervisor::destroy`.
|
||||||
global Supervisor::destroy_response: event(reqid: string, result: bool);
|
global Supervisor::destroy_response: event(reqid: string, result: bool);
|
||||||
|
|
||||||
|
## Send a request to a remote Supervisor to stop and shutdown its
|
||||||
|
## process tree. There is no response to this message as the Supervisor
|
||||||
|
## simply terminates on receipt.
|
||||||
global Supervisor::stop_request: event();
|
global Supervisor::stop_request: event();
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,6 +1,5 @@
|
||||||
##! Implements Zeek process supervision configuration options and default
|
##! Implements Zeek process supervision configuration options and default
|
||||||
##! behavior.
|
##! behavior.
|
||||||
# TODO: add proper docs
|
|
||||||
|
|
||||||
@load ./api
|
@load ./api
|
||||||
@load base/frameworks/broker
|
@load base/frameworks/broker
|
||||||
|
@ -8,6 +7,10 @@
|
||||||
module Supervisor;
|
module Supervisor;
|
||||||
|
|
||||||
export {
|
export {
|
||||||
|
## The Broker topic prefix to use when subscribing to Supervisor API
|
||||||
|
## requests and when publishing Supervisor API responses. If you are
|
||||||
|
## publishing Supervisor requests, this is also the prefix string to use
|
||||||
|
## for their topic names.
|
||||||
const topic_prefix = "zeek/supervisor" &redef;
|
const topic_prefix = "zeek/supervisor" &redef;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -78,3 +81,8 @@ function is_supervised(): bool
|
||||||
{
|
{
|
||||||
return Supervisor::__is_supervised();
|
return Supervisor::__is_supervised();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
function node(): NodeConfig
|
||||||
|
{
|
||||||
|
return Supervisor::__node();
|
||||||
|
}
|
||||||
|
|
|
@ -38,7 +38,7 @@ struct Stem {
|
||||||
|
|
||||||
~Stem();
|
~Stem();
|
||||||
|
|
||||||
std::optional<Supervisor::SupervisedNode> Run();
|
Supervisor::SupervisedNode Run();
|
||||||
|
|
||||||
std::optional<Supervisor::SupervisedNode> Poll();
|
std::optional<Supervisor::SupervisedNode> Poll();
|
||||||
|
|
||||||
|
@ -707,16 +707,18 @@ void Stem::LogError(const char* format, ...) const
|
||||||
va_end(args);
|
va_end(args);
|
||||||
}
|
}
|
||||||
|
|
||||||
std::optional<Supervisor::SupervisedNode> Stem::Run()
|
Supervisor::SupervisedNode Stem::Run()
|
||||||
{
|
{
|
||||||
for ( ; ; )
|
for ( ; ; )
|
||||||
{
|
{
|
||||||
auto new_node = Poll();
|
auto new_node = Poll();
|
||||||
|
|
||||||
if ( new_node )
|
if ( new_node )
|
||||||
return new_node;
|
return *new_node;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Shouldn't be reached.
|
||||||
|
assert(false);
|
||||||
return {};
|
return {};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -854,7 +856,7 @@ std::optional<Supervisor::SupervisedNode> Stem::Poll()
|
||||||
return {};
|
return {};
|
||||||
}
|
}
|
||||||
|
|
||||||
std::optional<Supervisor::SupervisedNode> Supervisor::RunStem(std::unique_ptr<bro::PipePair> pipe, pid_t parent_pid)
|
Supervisor::SupervisedNode Supervisor::RunStem(std::unique_ptr<bro::PipePair> pipe, pid_t parent_pid)
|
||||||
{
|
{
|
||||||
Stem s(std::move(pipe), parent_pid);
|
Stem s(std::move(pipe), parent_pid);
|
||||||
return s.Run();
|
return s.Run();
|
||||||
|
|
264
src/Supervisor.h
264
src/Supervisor.h
|
@ -20,98 +20,296 @@
|
||||||
|
|
||||||
namespace zeek {
|
namespace zeek {
|
||||||
|
|
||||||
class ParentProcessCheckTimer : public Timer {
|
/**
|
||||||
public:
|
* A Supervisor object manages a tree of persistent Zeek processes. If any
|
||||||
|
* child process dies it will be re-created with its original configuration.
|
||||||
ParentProcessCheckTimer(double t, double arg_interval);
|
* The Supervisor process itself actually only manages a single child process,
|
||||||
|
* called the Stem process. That Stem is created via a fork() just after the
|
||||||
void Dispatch(double t, int is_expire) override;
|
* command-line arguments have been parsed. The Stem process is used as the
|
||||||
|
* baseline image for spawning and supervising further Zeek child nodes since
|
||||||
protected:
|
* it has the purest global state without having to risk an exec() using an
|
||||||
|
* on-disk binary that's changed in the meantime from the original Supervisor's
|
||||||
double interval;
|
* version of the Zeek binary. However, if the Stem process itself dies
|
||||||
};
|
* prematurely, the Supervisor will have to fork() and exec() to revive it (and
|
||||||
|
* then the revived Stem will re-spawn its own children). Any node in the tree
|
||||||
|
* will self-terminate if it detects its parent has died and that detection is
|
||||||
|
* done via polling for change in parent process ID.
|
||||||
|
*/
|
||||||
class Supervisor : public iosource::IOSource {
|
class Supervisor : public iosource::IOSource {
|
||||||
public:
|
public:
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Configuration options that change Supervisor behavior.
|
||||||
|
*/
|
||||||
struct Config {
|
struct Config {
|
||||||
|
/**
|
||||||
|
* The filesystem path of the Zeek binary/executable. This is used
|
||||||
|
* if the Stem process ever dies and we need to fork() and exec() to
|
||||||
|
* re-create it.
|
||||||
|
*/
|
||||||
std::string zeek_exe_path;
|
std::string zeek_exe_path;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Configuration options that influence how a Supervised Zeek node
|
||||||
|
* integrates into the normal Zeek Cluster Framework.
|
||||||
|
*/
|
||||||
struct ClusterEndpoint {
|
struct ClusterEndpoint {
|
||||||
|
/**
|
||||||
|
* The node's role within the cluster. E.g. manager, logger, worker.
|
||||||
|
*/
|
||||||
BifEnum::Supervisor::ClusterRole role;
|
BifEnum::Supervisor::ClusterRole role;
|
||||||
|
/**
|
||||||
|
* The host/IP at which the cluster node is listening for connections.
|
||||||
|
*/
|
||||||
std::string host;
|
std::string host;
|
||||||
|
/**
|
||||||
|
* The TCP port number at which the cluster node listens for connections.
|
||||||
|
*/
|
||||||
int port;
|
int port;
|
||||||
|
/**
|
||||||
|
* The interface name from which the node read/analyze packets.
|
||||||
|
* Typically used by worker nodes.
|
||||||
|
*/
|
||||||
std::optional<std::string> interface;
|
std::optional<std::string> interface;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Configuration options that influence behavior of a Supervised Zeek node.
|
||||||
|
*/
|
||||||
struct NodeConfig {
|
struct NodeConfig {
|
||||||
|
/**
|
||||||
|
* Create configuration from script-layer record value.
|
||||||
|
* @param node_val the script-layer record value to convert.
|
||||||
|
*/
|
||||||
static NodeConfig FromRecord(const RecordVal* node_val);
|
static NodeConfig FromRecord(const RecordVal* node_val);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create configuration from JSON representation.
|
||||||
|
* @param json the JSON string to convert.
|
||||||
|
*/
|
||||||
static NodeConfig FromJSON(std::string_view json);
|
static NodeConfig FromJSON(std::string_view json);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Convert this object into JSON respresentation.
|
||||||
|
* @return the JSON string representing the node config.
|
||||||
|
*/
|
||||||
std::string ToJSON() const;
|
std::string ToJSON() const;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Convert his object into script-layer record value.
|
||||||
|
* @return the script-layer record value representing the node config.
|
||||||
|
*/
|
||||||
IntrusivePtr<RecordVal> ToRecord() const;
|
IntrusivePtr<RecordVal> ToRecord() const;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The name of the supervised Zeek node. These are unique within
|
||||||
|
* a given supervised process tree and typically human-readable.
|
||||||
|
*/
|
||||||
std::string name;
|
std::string name;
|
||||||
|
/**
|
||||||
|
* The interface name from which the node should read/analyze packets.
|
||||||
|
*/
|
||||||
std::optional<std::string> interface;
|
std::optional<std::string> interface;
|
||||||
|
/**
|
||||||
|
* The working directory that should be used by the node.
|
||||||
|
*/
|
||||||
std::optional<std::string> directory;
|
std::optional<std::string> directory;
|
||||||
|
/**
|
||||||
|
* The filename/path to which the node's stdout will be redirected.
|
||||||
|
*/
|
||||||
std::optional<std::string> stdout_file;
|
std::optional<std::string> stdout_file;
|
||||||
|
/**
|
||||||
|
* The filename/path to which the node's stderr will be redirected.
|
||||||
|
*/
|
||||||
std::optional<std::string> stderr_file;
|
std::optional<std::string> stderr_file;
|
||||||
|
/**
|
||||||
|
* A cpu/core number to which the node will try to pin itself.
|
||||||
|
*/
|
||||||
std::optional<int> cpu_affinity;
|
std::optional<int> cpu_affinity;
|
||||||
|
/**
|
||||||
|
* Additional script filename/paths that the node should load.
|
||||||
|
*/
|
||||||
std::vector<std::string> scripts;
|
std::vector<std::string> scripts;
|
||||||
|
/**
|
||||||
|
* The Cluster Layout definition. Each node in the Cluster Framework
|
||||||
|
* knows about the full, static cluster topology to which it belongs.
|
||||||
|
* Entries in the map use node names for keys.
|
||||||
|
*/
|
||||||
std::map<std::string, ClusterEndpoint> cluster;
|
std::map<std::string, ClusterEndpoint> cluster;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
/**
|
||||||
|
* State which defines a Supervised node's understanding of itself.
|
||||||
|
*/
|
||||||
struct SupervisedNode {
|
struct SupervisedNode {
|
||||||
|
/**
|
||||||
|
* Initialize the Supervised node within the Zeek Cluster Framework.
|
||||||
|
* This function populates the "Cluster::nodes" script-layer variable
|
||||||
|
* that otherwise is expected to be populated by a
|
||||||
|
* "cluster-layout.zeek" script in other context (e.g. ZeekCtl
|
||||||
|
* generates that cluster layout).
|
||||||
|
*/
|
||||||
static bool InitCluster();
|
static bool InitCluster();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The node's configuration options.
|
||||||
|
*/
|
||||||
NodeConfig config;
|
NodeConfig config;
|
||||||
|
/**
|
||||||
|
* The process ID of the supervised node's parent process (i.e. the PID
|
||||||
|
* of the Stem process).
|
||||||
|
*/
|
||||||
pid_t parent_pid;
|
pid_t parent_pid;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The state of a supervised node from the Supervisor's perspective.
|
||||||
|
*/
|
||||||
struct Node {
|
struct Node {
|
||||||
|
/**
|
||||||
|
* Convert the node into script-layer Supervisor::NodeStatus record
|
||||||
|
* representation.
|
||||||
|
*/
|
||||||
IntrusivePtr<RecordVal> ToRecord() const;
|
IntrusivePtr<RecordVal> ToRecord() const;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return the name of the node.
|
||||||
|
*/
|
||||||
const std::string& Name() const
|
const std::string& Name() const
|
||||||
{ return config.name; }
|
{ return config.name; }
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create a new node state from a given configuration.
|
||||||
|
* @param arg_config the configuration to use for the node.
|
||||||
|
*/
|
||||||
Node(NodeConfig arg_config) : config(std::move(arg_config))
|
Node(NodeConfig arg_config) : config(std::move(arg_config))
|
||||||
{ }
|
{ }
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The desired configuration for the node.
|
||||||
|
*/
|
||||||
NodeConfig config;
|
NodeConfig config;
|
||||||
|
/**
|
||||||
|
* Process ID of the node (positive/non-zero are valid/live PIDs).
|
||||||
|
*/
|
||||||
pid_t pid = 0;
|
pid_t pid = 0;
|
||||||
|
/**
|
||||||
|
* Whether the node is voluntarily marked for termination by the
|
||||||
|
* Supervisor.
|
||||||
|
*/
|
||||||
bool killed = false;
|
bool killed = false;
|
||||||
|
/**
|
||||||
|
* The last exit status of the node.
|
||||||
|
*/
|
||||||
int exit_status = 0;
|
int exit_status = 0;
|
||||||
|
/**
|
||||||
|
* The last signal which terminated the node.
|
||||||
|
*/
|
||||||
int signal_number = 0;
|
int signal_number = 0;
|
||||||
|
/**
|
||||||
|
* Number of process revival attempts made after the node first died
|
||||||
|
* prematurely.
|
||||||
|
*/
|
||||||
int revival_attempts = 0;
|
int revival_attempts = 0;
|
||||||
|
/**
|
||||||
|
* How many seconds to wait until the next revival attempt for the node.
|
||||||
|
*/
|
||||||
int revival_delay = 1;
|
int revival_delay = 1;
|
||||||
|
/**
|
||||||
|
* The time at which the node's process was last spawned.
|
||||||
|
*/
|
||||||
std::chrono::time_point<std::chrono::steady_clock> spawn_time;
|
std::chrono::time_point<std::chrono::steady_clock> spawn_time;
|
||||||
};
|
};
|
||||||
|
|
||||||
static std::optional<SupervisedNode> RunStem(std::unique_ptr<bro::PipePair> pipe,
|
/**
|
||||||
pid_t parent_pid);
|
* Run the Stem process. The Stem process will receive instructions from
|
||||||
|
* the Supervisor to manipulate the process hierarchy and it's in charge
|
||||||
|
* of directly monitoring for whether any nodes die premature and need
|
||||||
|
* to be revived.
|
||||||
|
* @param pipe bidirectional pipes that allow the Supervisor and Stem
|
||||||
|
* process to communicate.
|
||||||
|
* @param pid the Stem's parent process ID (i.e. the PID of the Supervisor)
|
||||||
|
* @return state which describes what a supervised node should know about
|
||||||
|
* itself. I.e. this function only returns from a fork()'d child process.
|
||||||
|
*/
|
||||||
|
static SupervisedNode RunStem(std::unique_ptr<bro::PipePair> pipe,
|
||||||
|
pid_t parent_pid);
|
||||||
|
|
||||||
using NodeMap = std::map<std::string, Node, std::less<>>;
|
using NodeMap = std::map<std::string, Node, std::less<>>;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create a new Supervisor object.
|
||||||
|
* @param stem_pipe bidirectional pipe that allow the Supervisor and Stem
|
||||||
|
* process to communicate.
|
||||||
|
* @param stem_pid the Stem's process ID.
|
||||||
|
*/
|
||||||
Supervisor(Config cfg, std::unique_ptr<bro::PipePair> stem_pipe, pid_t stem_pid);
|
Supervisor(Config cfg, std::unique_ptr<bro::PipePair> stem_pipe, pid_t stem_pid);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Destruction also cleanly shuts down the entire supervised process tree.
|
||||||
|
*/
|
||||||
~Supervisor();
|
~Supervisor();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return the process ID of the Stem.
|
||||||
|
*/
|
||||||
pid_t StemPID() const
|
pid_t StemPID() const
|
||||||
{ return stem_pid; }
|
{ return stem_pid; }
|
||||||
|
|
||||||
void ObserveChildSignal(int signo);
|
/**
|
||||||
|
* @return the state of currently supervised processes. The map uses
|
||||||
RecordVal* Status(std::string_view node_name);
|
* node names for keys.
|
||||||
std::string Create(const RecordVal* node);
|
*/
|
||||||
std::string Create(const Supervisor::NodeConfig& node);
|
|
||||||
bool Destroy(std::string_view node_name);
|
|
||||||
bool Restart(std::string_view node_name);
|
|
||||||
|
|
||||||
const NodeMap& Nodes()
|
const NodeMap& Nodes()
|
||||||
{ return nodes; }
|
{ return nodes; }
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Retrieve current status of a supervised node.
|
||||||
|
* @param node_name the name of the node for which to retrieve status
|
||||||
|
* or an empty string to mean "all nodes".
|
||||||
|
* @return script-layer Supervisor::Status record value describing the
|
||||||
|
* status of a node or set of nodes.
|
||||||
|
*/
|
||||||
|
RecordVal* Status(std::string_view node_name);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create a new supervised node.
|
||||||
|
* @param node the script-layer Supervisor::NodeConfig value that
|
||||||
|
* describes the desired node configuration
|
||||||
|
* @return an empty string on success or description of the error/failure
|
||||||
|
*/
|
||||||
|
std::string Create(const RecordVal* node);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create a new supervised node.
|
||||||
|
* @param node the desired node configuration
|
||||||
|
* @return an empty string on success or description of the error/failure
|
||||||
|
*/
|
||||||
|
std::string Create(const Supervisor::NodeConfig& node);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Destroys and removes a supervised node.
|
||||||
|
* @param node_name the name of the node to destroy or an empty string
|
||||||
|
* to mean "all nodes"
|
||||||
|
* @return true on success
|
||||||
|
*/
|
||||||
|
bool Destroy(std::string_view node_name);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Restart a supervised node process (by destroying and re-recreating).
|
||||||
|
* @param node_name the name of the node to restart or an empty string
|
||||||
|
* to mean "all nodes"
|
||||||
|
* @return true on success
|
||||||
|
*/
|
||||||
|
bool Restart(std::string_view node_name);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Not meant for public use. For use in a signal handler to tell the
|
||||||
|
* Supervisor a child process (i.e. the Stem) potentially died.
|
||||||
|
*/
|
||||||
|
void ObserveChildSignal(int signo);
|
||||||
|
|
||||||
private:
|
private:
|
||||||
|
|
||||||
// IOSource interface overrides:
|
// IOSource interface overrides:
|
||||||
|
@ -140,6 +338,28 @@ private:
|
||||||
std::string msg_buffer;
|
std::string msg_buffer;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A timer used by supervised processes to periodically check whether their
|
||||||
|
* parent (supervisor) process has died. If it has died, the supervised
|
||||||
|
* process self-terminates.
|
||||||
|
*/
|
||||||
|
class ParentProcessCheckTimer : public Timer {
|
||||||
|
public:
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create a timer to check for parent process death.
|
||||||
|
* @param t the time at which to trigger the timer's check.
|
||||||
|
* @param interval number of seconds to wait before checking again.
|
||||||
|
*/
|
||||||
|
ParentProcessCheckTimer(double t, double interval);
|
||||||
|
|
||||||
|
protected:
|
||||||
|
|
||||||
|
void Dispatch(double t, int is_expire) override;
|
||||||
|
|
||||||
|
double interval;
|
||||||
|
};
|
||||||
|
|
||||||
extern Supervisor* supervisor;
|
extern Supervisor* supervisor;
|
||||||
extern std::optional<Supervisor::SupervisedNode> supervised_node;
|
extern std::optional<Supervisor::SupervisedNode> supervised_node;
|
||||||
|
|
||||||
|
|
|
@ -79,6 +79,21 @@ function Supervisor::__is_supervised%(%): bool
|
||||||
return val_mgr->GetBool(zeek::supervised_node.has_value());
|
return val_mgr->GetBool(zeek::supervised_node.has_value());
|
||||||
%}
|
%}
|
||||||
|
|
||||||
|
function Supervisor::__node%(%): Supervisor::NodeConfig
|
||||||
|
%{
|
||||||
|
if ( ! zeek::supervised_node )
|
||||||
|
{
|
||||||
|
builtin_error("not a supervised process");
|
||||||
|
auto rt = BifType::Record::Supervisor::NodeConfig;
|
||||||
|
auto rval = make_intrusive<RecordVal>(rt);
|
||||||
|
rval->Assign(rt->FieldOffset("name"), new StringVal("<invalid>"));
|
||||||
|
return rval.detach();
|
||||||
|
}
|
||||||
|
|
||||||
|
auto rval = zeek::supervised_node->config.ToRecord();
|
||||||
|
return rval.detach();
|
||||||
|
%}
|
||||||
|
|
||||||
function Supervisor::__is_supervisor%(%): bool
|
function Supervisor::__is_supervisor%(%): bool
|
||||||
%{
|
%{
|
||||||
return val_mgr->GetBool(zeek::supervisor != nullptr);
|
return val_mgr->GetBool(zeek::supervisor != nullptr);
|
||||||
|
|
|
@ -1,2 +1,2 @@
|
||||||
supervised node zeek_init(), logger-1, Cluster::LOGGER
|
supervised node zeek_init(), logger-1, Cluster::LOGGER
|
||||||
supervised node zeek_done(), logger-1
|
supervised node zeek_done(), logger-1, logger-1
|
||||||
|
|
|
@ -1,2 +1,2 @@
|
||||||
supervised node zeek_init(), manager, Cluster::MANAGER
|
supervised node zeek_init(), manager, Cluster::MANAGER
|
||||||
supervised node zeek_done(), manager
|
supervised node zeek_done(), manager, manager
|
||||||
|
|
|
@ -1,2 +1,2 @@
|
||||||
supervised node zeek_init(), proxy-1, Cluster::PROXY
|
supervised node zeek_init(), proxy-1, Cluster::PROXY
|
||||||
supervised node zeek_done(), proxy-1
|
supervised node zeek_done(), proxy-1, proxy-1
|
||||||
|
|
|
@ -1,2 +1,2 @@
|
||||||
supervised node zeek_init(), worker-1, Cluster::WORKER
|
supervised node zeek_init(), worker-1, Cluster::WORKER
|
||||||
supervised node zeek_done(), worker-1
|
supervised node zeek_done(), worker-1, worker-1
|
||||||
|
|
|
@ -83,7 +83,7 @@ event Broker::peer_lost(endpoint: Broker::EndpointInfo, msg: string)
|
||||||
event zeek_done()
|
event zeek_done()
|
||||||
{
|
{
|
||||||
if ( Supervisor::is_supervised() )
|
if ( Supervisor::is_supervised() )
|
||||||
print "supervised node zeek_done()", Cluster::node;
|
print "supervised node zeek_done()", Cluster::node, Supervisor::node()$name;
|
||||||
else
|
else
|
||||||
print supervisor_output_file, "supervisor zeek_done()";
|
print supervisor_output_file, "supervisor zeek_done()";
|
||||||
}
|
}
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue