Separate supervisor node config and status data structures

This commit is contained in:
Jon Siwek 2020-01-13 20:09:05 -08:00
parent 520c6e3ebf
commit 0ff99c3df8
6 changed files with 93 additions and 64 deletions

View file

@ -19,7 +19,7 @@ export {
interface: string &optional;
};
type Node: record {
type NodeConfig: record {
name: string;
interface: string &optional;
directory: string &optional;
@ -28,18 +28,19 @@ export {
scripts: vector of string &default = vector();
cpu_affinity: int &optional;
cluster: table[string] of ClusterEndpoint &default=table();
};
# TODO: separate node config fields from status fields ?
# TODO: add more status fields ?
pid: count &optional;
type NodeStatus: record {
node: NodeConfig;
pid: count;
};
type Status: record {
nodes: table[string] of Node;
nodes: table[string] of NodeStatus;
};
global status: function(nodes: string &default="all"): Status;
global create: function(node: Node): string;
global create: function(node: NodeConfig): string;
global destroy: function(nodes: string): bool;
global restart: function(nodes: string &default="all"): bool;
@ -51,7 +52,7 @@ export {
global Supervisor::status_request: event(reqid: string, nodes: string);
global Supervisor::status_response: event(reqid: string, result: Status);
global Supervisor::create_request: event(reqid: string, node: Node);
global Supervisor::create_request: event(reqid: string, node: NodeConfig);
global Supervisor::create_response: event(reqid: string, result: string);
global Supervisor::destroy_request: event(reqid: string, nodes: string);

View file

@ -28,7 +28,7 @@ event Supervisor::status_request(reqid: string, nodes: string)
Broker::publish(topic, Supervisor::status_response, reqid, res);
}
event Supervisor::create_request(reqid: string, node: Node)
event Supervisor::create_request(reqid: string, node: NodeConfig)
{
local res = Supervisor::create(node);
local topic = Supervisor::topic_prefix + fmt("/create_response/%s", reqid);
@ -54,7 +54,7 @@ function Supervisor::status(nodes: string): Status
return Supervisor::__status(nodes);
}
function Supervisor::create(node: Node): string
function Supervisor::create(node: NodeConfig): string
{
return Supervisor::__create(node);
}

View file

@ -30,11 +30,11 @@ struct Stem {
~Stem();
Supervisor::Node* Run();
std::optional<Supervisor::NodeConfig> Run();
Supervisor::Node* Poll();
std::optional<Supervisor::NodeConfig> Poll();
Supervisor::Node* Revive();
std::optional<Supervisor::NodeConfig> Revive();
void Reap();
@ -108,7 +108,7 @@ static std::vector<std::string> extract_messages(std::string* buffer)
return rval;
}
static std::string make_create_message(const Supervisor::Node& node)
static std::string make_create_message(const Supervisor::NodeConfig& node)
{
auto json_str = node.ToJSON();
return fmt("create %s %s", node.name.data(), json_str.data());
@ -274,7 +274,7 @@ void Supervisor::HandleChildSignal()
for ( const auto& n : nodes )
{
const auto& node = n.second;
auto msg = make_create_message(node);
auto msg = make_create_message(node.config);
safe_write(stem_pipe->OutFD(), msg.data(), msg.size() + 1);
}
}
@ -378,7 +378,7 @@ bool Stem::Wait(Supervisor::Node* node, int options) const
if ( res == -1 )
{
fprintf(stderr, "Stem failed to get node exit status %s (%d): %s\n",
node->name.data(), node->pid, strerror(errno));
node->Name().data(), node->pid, strerror(errno));
return false;
}
@ -387,17 +387,17 @@ bool Stem::Wait(Supervisor::Node* node, int options) const
node->exit_status = WEXITSTATUS(status);
// TODO: may be some cases where the node is intended to exit
printf("node '%s' exited with status %d\n",
node->name.data(), node->exit_status);
node->Name().data(), node->exit_status);
}
else if ( WIFSIGNALED(status) )
{
node->signal_number = WTERMSIG(status);
printf("node '%s' terminated by signal %d\n",
node->name.data(), node->signal_number);
node->Name().data(), node->signal_number);
}
else
fprintf(stderr, "Stem failed to get node exit status %s (%d)\n",
node->name.data(), node->pid);
node->Name().data(), node->pid);
node->pid = 0;
return true;
@ -409,7 +409,7 @@ void Stem::KillNode(const Supervisor::Node& node, int signal) const
if ( kill_res == -1 )
fprintf(stderr, "Failed to send signal to node %s: %s",
node.name.data(), strerror(errno));
node.Name().data(), strerror(errno));
}
void Stem::Destroy(Supervisor::Node* node) const
@ -428,12 +428,12 @@ void Stem::Destroy(Supervisor::Node* node) const
break;
printf("Stem waiting to destroy node: %s (%d)\n",
node->name.data(), node->pid);
node->Name().data(), node->pid);
sleep(kill_delay);
}
}
Supervisor::Node* Stem::Revive()
std::optional<Supervisor::NodeConfig> Stem::Revive()
{
constexpr auto attempts_before_delay_increase = 3;
constexpr auto delay_increase_factor = 2;
@ -468,7 +468,7 @@ Supervisor::Node* Stem::Revive()
node.revival_delay *= delay_increase_factor;
if ( Spawn(&node) )
return new Supervisor::Node(node);
return node.config;
ReportStatus(node);
}
@ -483,19 +483,19 @@ bool Stem::Spawn(Supervisor::Node* node)
if ( node_pid == -1 )
{
fprintf(stderr, "failed to fork Zeek node '%s': %s\n",
node->name.data(), strerror(errno));
node->Name().data(), strerror(errno));
return false;
}
if ( node_pid == 0 )
{
zeek::set_thread_name(fmt("zeek.%s", node->name.data()));
zeek::set_thread_name(fmt("zeek.%s", node->Name().data()));
return true;
}
node->pid = node_pid;
node->spawn_time = std::chrono::steady_clock::now();
printf("Stem spawned node: %s (%d)\n", node->name.data(), node->pid);
printf("Stem spawned node: %s (%d)\n", node->Name().data(), node->pid);
return false;
}
@ -563,11 +563,11 @@ void Stem::Shutdown(int exit_code)
void Stem::ReportStatus(const Supervisor::Node& node) const
{
std::string msg = fmt("status %s %d", node.name.data(), node.pid);
std::string msg = fmt("status %s %d", node.Name().data(), node.pid);
safe_write(pipe->OutFD(), msg.data(), msg.size() + 1);
}
Supervisor::Node* Stem::Run()
std::optional<Supervisor::NodeConfig> Stem::Run()
{
for ( ; ; )
{
@ -580,7 +580,7 @@ Supervisor::Node* Stem::Run()
return {};
}
Supervisor::Node* Stem::Poll()
std::optional<Supervisor::NodeConfig> Stem::Poll()
{
pollfd fds[2] = { { pipe->InFD(), POLLIN, 0 },
{ signal_flare->FD(), POLLIN, 0} };
@ -659,15 +659,16 @@ Supervisor::Node* Stem::Poll()
{
const auto& node_json = msg_tokens[2];
assert(nodes.find(node_name) == nodes.end());
auto node = Supervisor::Node::FromJSON(node_json);
auto node_config = Supervisor::NodeConfig::FromJSON(node_json);
auto it = nodes.emplace(node_name, std::move(node_config)).first;
auto& node = it->second;
if ( Spawn(&node) )
return new Supervisor::Node(node);
return node.config;
// TODO: get stem printfs going through standard Zeek debug.log
printf("Stem created node: %s (%d)\n", node.name.data(), node.pid);
auto it = nodes.emplace(node_name, std::move(node)).first;
ReportStatus(it->second);
printf("Stem created node: %s (%d)\n", node.Name().data(), node.pid);
ReportStatus(node);
}
else if ( cmd == "destroy" )
{
@ -687,7 +688,7 @@ Supervisor::Node* Stem::Poll()
Destroy(&node);
if ( Spawn(&node) )
return new Supervisor::Node(node);
return node.config;
ReportStatus(node);
}
@ -698,7 +699,7 @@ Supervisor::Node* Stem::Poll()
return {};
}
Supervisor::Node* Supervisor::RunStem(std::unique_ptr<bro::PipePair> pipe)
std::optional<Supervisor::NodeConfig> Supervisor::RunStem(std::unique_ptr<bro::PipePair> pipe)
{
Stem s(std::move(pipe));
return s.Run();
@ -718,9 +719,9 @@ static BifEnum::Supervisor::ClusterRole role_str_to_enum(const std::string& r)
return BifEnum::Supervisor::NONE;
}
Supervisor::Node Supervisor::Node::FromRecord(const RecordVal* node)
Supervisor::NodeConfig Supervisor::NodeConfig::FromRecord(const RecordVal* node)
{
Supervisor::Node rval;
Supervisor::NodeConfig rval;
rval.name = node->Lookup("name")->AsString()->CheckString();
auto iface_val = node->Lookup("interface");
@ -784,9 +785,9 @@ Supervisor::Node Supervisor::Node::FromRecord(const RecordVal* node)
return rval;
}
Supervisor::Node Supervisor::Node::FromJSON(std::string_view json)
Supervisor::NodeConfig Supervisor::NodeConfig::FromJSON(std::string_view json)
{
Supervisor::Node rval;
Supervisor::NodeConfig rval;
auto j = nlohmann::json::parse(json);
rval.name = j["name"];
@ -836,7 +837,7 @@ Supervisor::Node Supervisor::Node::FromJSON(std::string_view json)
return rval;
}
std::string Supervisor::Node::ToJSON() const
std::string Supervisor::NodeConfig::ToJSON() const
{
auto re = std::make_unique<RE_Matcher>("^_");
auto node_val = ToRecord();
@ -845,9 +846,9 @@ std::string Supervisor::Node::ToJSON() const
return rval;
}
IntrusivePtr<RecordVal> Supervisor::Node::ToRecord() const
IntrusivePtr<RecordVal> Supervisor::NodeConfig::ToRecord() const
{
auto rt = BifType::Record::Supervisor::Node;
auto rt = BifType::Record::Supervisor::NodeConfig;
auto rval = make_intrusive<RecordVal>(rt);
rval->Assign(rt->FieldOffset("name"), new StringVal(name));
@ -866,14 +867,14 @@ IntrusivePtr<RecordVal> Supervisor::Node::ToRecord() const
if ( cpu_affinity )
rval->Assign(rt->FieldOffset("cpu_affinity"), val_mgr->GetInt(*cpu_affinity));
auto st = BifType::Record::Supervisor::Node->FieldType("scripts");
auto st = BifType::Record::Supervisor::NodeConfig->FieldType("scripts");
auto scripts_val = new VectorVal(st->AsVectorType());
rval->Assign(rt->FieldOffset("scripts"), scripts_val);
for ( const auto& s : scripts )
scripts_val->Assign(scripts_val->Size(), new StringVal(s));
auto tt = BifType::Record::Supervisor::Node->FieldType("cluster");
auto tt = BifType::Record::Supervisor::NodeConfig->FieldType("cluster");
auto cluster_val = new TableVal(tt->AsTableType());
rval->Assign(rt->FieldOffset("cluster"), cluster_val);
@ -895,12 +896,23 @@ IntrusivePtr<RecordVal> Supervisor::Node::ToRecord() const
cluster_val->Assign(key.get(), val.detach());
}
return rval;
}
IntrusivePtr<RecordVal> Supervisor::Node::ToRecord() const
{
auto rt = BifType::Record::Supervisor::NodeStatus;
auto rval = make_intrusive<RecordVal>(rt);
rval->Assign(rt->FieldOffset("node"), config.ToRecord().detach());
if ( pid )
rval->Assign(rt->FieldOffset("pid"), val_mgr->GetCount(pid));
return rval;
}
static Val* supervisor_role_to_cluster_node_type(BifEnum::Supervisor::ClusterRole role)
{
static auto node_type = global_scope()->Lookup("Cluster::NodeType")->AsType()->AsEnumType();
@ -919,7 +931,7 @@ static Val* supervisor_role_to_cluster_node_type(BifEnum::Supervisor::ClusterRol
}
}
void Supervisor::Node::InitCluster()
void Supervisor::NodeConfig::InitCluster()
{
auto cluster_node_type = global_scope()->Lookup("Cluster::Node")->AsType()->AsRecordType();
auto cluster_nodes_id = global_scope()->Lookup("Cluster::nodes");
@ -972,8 +984,9 @@ RecordVal* Supervisor::Status(std::string_view node_name)
for ( const auto& n : nodes )
{
const auto& name = n.first;
const auto& node = n.second;
auto key = make_intrusive<StringVal>(node.name);
auto key = make_intrusive<StringVal>(name);
auto val = node.ToRecord();
node_table_val->Assign(key.get(), val.detach());
}
@ -983,11 +996,11 @@ RecordVal* Supervisor::Status(std::string_view node_name)
std::string Supervisor::Create(const RecordVal* node_val)
{
auto node = Supervisor::Node::FromRecord(node_val);
auto node = Supervisor::NodeConfig::FromRecord(node_val);
return Create(node);
}
std::string Supervisor::Create(const Supervisor::Node& node)
std::string Supervisor::Create(const Supervisor::NodeConfig& node)
{
if ( node.name.find(' ') != std::string::npos )
return fmt("node names must not contain spaces: '%s'",

View file

@ -35,11 +35,10 @@ public:
std::optional<std::string> interface;
};
struct Node {
static Node FromRecord(const RecordVal* node_val);
static Node FromJSON(std::string_view json);
struct NodeConfig {
static void InitCluster();
static NodeConfig FromRecord(const RecordVal* node_val);
static NodeConfig FromJSON(std::string_view json);
std::string ToJSON() const;
IntrusivePtr<RecordVal> ToRecord() const;
@ -52,7 +51,18 @@ public:
std::optional<int> cpu_affinity;
std::vector<std::string> scripts;
std::map<std::string, ClusterEndpoint> cluster;
};
struct Node {
IntrusivePtr<RecordVal> ToRecord() const;
const std::string& Name() const
{ return config.name; }
Node(NodeConfig arg_config) : config(std::move(arg_config))
{ }
NodeConfig config;
pid_t pid = 0;
int exit_status = 0;
int signal_number = 0;
@ -61,7 +71,9 @@ public:
std::chrono::time_point<std::chrono::steady_clock> spawn_time;
};
static Node* RunStem(std::unique_ptr<bro::PipePair> pipe);
static std::optional<NodeConfig> RunStem(std::unique_ptr<bro::PipePair> pipe);
using NodeMap = std::map<std::string, Node, std::less<>>;
Supervisor(Config cfg, std::unique_ptr<bro::PipePair> stem_pipe, pid_t stem_pid);
@ -74,10 +86,13 @@ public:
RecordVal* Status(std::string_view node_name);
std::string Create(const RecordVal* node);
std::string Create(const Supervisor::Node& node);
std::string Create(const Supervisor::NodeConfig& node);
bool Destroy(std::string_view node_name);
bool Restart(std::string_view node_name);
const NodeMap& Nodes()
{ return nodes; }
private:
// IOSource interface overrides:
@ -99,11 +114,11 @@ private:
pid_t stem_pid;
std::unique_ptr<bro::PipePair> stem_pipe;
bro::Flare signal_flare;
std::map<std::string, Node, std::less<>> nodes;
NodeMap nodes;
std::string msg_buffer;
};
extern Supervisor* supervisor;
extern Supervisor::Node* supervised_node;
extern std::optional<Supervisor::NodeConfig> supervised_node;
} // namespace zeek

View file

@ -100,7 +100,7 @@ zeekygen::Manager* zeekygen_mgr = 0;
iosource::Manager* iosource_mgr = 0;
bro_broker::Manager* broker_mgr = 0;
zeek::Supervisor* zeek::supervisor = 0;
zeek::Supervisor::Node* zeek::supervised_node = 0;
std::optional<zeek::Supervisor::NodeConfig> zeek::supervised_node;
std::vector<std::string> zeek_script_prefixes;
Stmt* stmts;
@ -286,7 +286,7 @@ struct zeek_options {
* and discard the rest.
* @param node the supervised-node whose Zeek options are to be modified.
*/
void filter_supervised_node_options(zeek::Supervisor::Node* node)
void filter_supervised_node_options(const zeek::Supervisor::NodeConfig& node)
{
auto og = *this;
*this = {};
@ -752,7 +752,6 @@ void terminate_bro()
delete file_mgr;
// broker_mgr is deleted via iosource_mgr
// supervisor is deleted via iosource_mgr
delete zeek::supervised_node;
delete iosource_mgr;
delete log_mgr;
delete reporter;
@ -992,7 +991,7 @@ int main(int argc, char** argv)
node_name.data(), strerror(errno));
}
options.filter_supervised_node_options(zeek::supervised_node);
options.filter_supervised_node_options(*zeek::supervised_node);
if ( zeek::supervised_node->interface )
options.interfaces.emplace_back(*zeek::supervised_node->interface);

View file

@ -16,7 +16,8 @@ enum ClusterRole %{
type Supervisor::ClusterEndpoint: record;
type Supervisor::Status: record;
type Supervisor::Node: record;
type Supervisor::NodeConfig: record;
type Supervisor::NodeStatus: record;
function Supervisor::__status%(nodes: string%): Supervisor::Status
%{
@ -29,7 +30,7 @@ function Supervisor::__status%(nodes: string%): Supervisor::Status
return zeek::supervisor->Status(nodes->CheckString());
%}
function Supervisor::__create%(node: Supervisor::Node%): string
function Supervisor::__create%(node: Supervisor::NodeConfig%): string
%{
if ( ! zeek::supervisor )
{
@ -78,7 +79,7 @@ function Supervisor::__init_cluster%(%): bool
function Supervisor::__is_supervised%(%): bool
%{
return val_mgr->GetBool(zeek::supervised_node != nullptr);
return val_mgr->GetBool(zeek::supervised_node.has_value());
%}
function Supervisor::__is_supervisor%(%): bool