Cleanup minor Supervisor TODOs

Mainly, this makes the default parameter of the restart/destroy/status
API calls operate on all nodes.
This commit is contained in:
Jon Siwek 2020-01-14 13:41:46 -08:00
parent 80b3aef486
commit 4d712d6203
5 changed files with 178 additions and 137 deletions

View file

@ -39,25 +39,25 @@ export {
nodes: table[string] of NodeStatus; nodes: table[string] of NodeStatus;
}; };
global status: function(nodes: string &default="all"): Status;
global create: function(node: NodeConfig): string; global create: function(node: NodeConfig): string;
global destroy: function(nodes: string): bool; global status: function(node: string &default=""): Status;
global restart: function(nodes: string &default="all"): bool; global restart: function(node: string &default=""): bool;
global destroy: function(node: string &default=""): bool;
global is_supervisor: function(): bool; global is_supervisor: function(): bool;
global is_supervised: function(): bool; global is_supervised: function(): bool;
global Supervisor::stop_request: event();
global Supervisor::status_request: event(reqid: string, nodes: string);
global Supervisor::status_response: event(reqid: string, result: Status);
global Supervisor::create_request: event(reqid: string, node: NodeConfig); global Supervisor::create_request: event(reqid: string, node: NodeConfig);
global Supervisor::create_response: event(reqid: string, result: string); global Supervisor::create_response: event(reqid: string, result: string);
global Supervisor::destroy_request: event(reqid: string, nodes: string); global Supervisor::status_request: event(reqid: string, node: string);
global Supervisor::status_response: event(reqid: string, result: Status);
global Supervisor::restart_request: event(reqid: string, node: string);
global Supervisor::restart_response: event(reqid: string, result: bool);
global Supervisor::destroy_request: event(reqid: string, node: string);
global Supervisor::destroy_response: event(reqid: string, result: bool); global Supervisor::destroy_response: event(reqid: string, result: bool);
global Supervisor::restart_request: event(reqid: string, nodes: string); global Supervisor::stop_request: event();
global Supervisor::restart_response: event(reqid: string, result: bool);
} }

View file

@ -21,9 +21,9 @@ event Supervisor::stop_request()
terminate(); terminate();
} }
event Supervisor::status_request(reqid: string, nodes: string) event Supervisor::status_request(reqid: string, node: string)
{ {
local res = Supervisor::status(nodes); local res = Supervisor::status(node);
local topic = Supervisor::topic_prefix + fmt("/status_response/%s", reqid); local topic = Supervisor::topic_prefix + fmt("/status_response/%s", reqid);
Broker::publish(topic, Supervisor::status_response, reqid, res); Broker::publish(topic, Supervisor::status_response, reqid, res);
} }
@ -35,23 +35,23 @@ event Supervisor::create_request(reqid: string, node: NodeConfig)
Broker::publish(topic, Supervisor::create_response, reqid, res); Broker::publish(topic, Supervisor::create_response, reqid, res);
} }
event Supervisor::destroy_request(reqid: string, nodes: string) event Supervisor::destroy_request(reqid: string, node: string)
{ {
local res = Supervisor::destroy(nodes); local res = Supervisor::destroy(node);
local topic = Supervisor::topic_prefix + fmt("/destroy_response/%s", reqid); local topic = Supervisor::topic_prefix + fmt("/destroy_response/%s", reqid);
Broker::publish(topic, Supervisor::destroy_response, reqid, res); Broker::publish(topic, Supervisor::destroy_response, reqid, res);
} }
event Supervisor::restart_request(reqid: string, nodes: string) event Supervisor::restart_request(reqid: string, node: string)
{ {
local res = Supervisor::restart(nodes); local res = Supervisor::restart(node);
local topic = Supervisor::topic_prefix + fmt("/restart_response/%s", reqid); local topic = Supervisor::topic_prefix + fmt("/restart_response/%s", reqid);
Broker::publish(topic, Supervisor::restart_response, reqid, res); Broker::publish(topic, Supervisor::restart_response, reqid, res);
} }
function Supervisor::status(nodes: string): Status function Supervisor::status(node: string): Status
{ {
return Supervisor::__status(nodes); return Supervisor::__status(node);
} }
function Supervisor::create(node: NodeConfig): string function Supervisor::create(node: NodeConfig): string
@ -59,14 +59,14 @@ function Supervisor::create(node: NodeConfig): string
return Supervisor::__create(node); return Supervisor::__create(node);
} }
function Supervisor::destroy(nodes: string): bool function Supervisor::destroy(node: string): bool
{ {
return Supervisor::__destroy(nodes); return Supervisor::__destroy(node);
} }
function Supervisor::restart(nodes: string): bool function Supervisor::restart(node: string): bool
{ {
return Supervisor::__restart(nodes); return Supervisor::__restart(node);
} }
function is_supervisor(): bool function is_supervisor(): bool

View file

@ -148,7 +148,6 @@ Supervisor::~Supervisor()
DBG_LOG(DBG_SUPERVISOR, "shutdown, killing stem process %d", stem_pid); DBG_LOG(DBG_SUPERVISOR, "shutdown, killing stem process %d", stem_pid);
// TODO: is signal the best way to trigger shutdown of descendant processes?
auto kill_res = kill(stem_pid, SIGTERM); auto kill_res = kill(stem_pid, SIGTERM);
if ( kill_res == -1 ) if ( kill_res == -1 )
@ -232,7 +231,6 @@ void Supervisor::HandleChildSignal()
return; return;
// Revive the Stem process // Revive the Stem process
// TODO: Stem process needs a way to inform Supervisor not to revive
stem_pid = fork(); stem_pid = fork();
if ( stem_pid == -1 ) if ( stem_pid == -1 )
@ -280,10 +278,13 @@ void Supervisor::HandleChildSignal()
DBG_LOG(DBG_SUPERVISOR, "stem process revived, new pid: %d", stem_pid); DBG_LOG(DBG_SUPERVISOR, "stem process revived, new pid: %d", stem_pid);
// Parent supervisor process resends node configurations to recreate // Parent supervisor process resends node configurations to recreate
// the desired process hierarchy // the desired process hierarchy.
// TODO: probably a preferred order in which to create nodes // Note: there's probably a preferred order in which to create nodes.
// e.g. logger, manager, proxy, worker // E.g. logger, manager, proxy, worker. However, fully synchronizing
// a startup order like that is slow and complicated: essentially have
// to wait for each process to start up and reach the point just after
// it starts listening (and maybe that never happens for some error case).
for ( const auto& n : nodes ) for ( const auto& n : nodes )
{ {
const auto& node = n.second; const auto& node = n.second;
@ -355,14 +356,16 @@ Stem::Stem(std::unique_ptr<bro::PipePair> p)
setsignal(SIGCHLD, stem_sig_handler); setsignal(SIGCHLD, stem_sig_handler);
setsignal(SIGTERM, stem_sig_handler); setsignal(SIGTERM, stem_sig_handler);
// TODO: changing the process group here so that SIGINT to the // Note: changing the process group here so that SIGINT to the supervisor
// supervisor doesn't also get passed to the children. i.e. supervisor // doesn't also get passed to the children. I.e. the supervisor should be
// should be in charge of initiating orderly shutdown. But calling // in charge of initiating orderly shutdown of the process tree.
// just setpgid() like this is technically a race-condition -- need // Technically calling setpgid() like this is a race-condition (if we get a
// to do more work of blocking SIGINT before fork(), unblocking after, // SIGINT in between the fork() and setpgid() calls), but we can treat that as
// then also calling setpgid() from parent. And just not doing that // mostly harmless since the only affected node in the process tree at
// until more is known whether that's the right SIGINT behavior in // that point will be this Stem process and the Supervisor *should* do the
// the first place. // right thing if it also sees SIGINT with the Stem already having exited
// (since that same type of situation with the Stem dying prematurely can
// happen for any arbitrary reason, not just for SIGINT).
auto res = setpgid(0, 0); auto res = setpgid(0, 0);
if ( res == -1 ) if ( res == -1 )
@ -407,7 +410,6 @@ bool Stem::Wait(Supervisor::Node* node, int options) const
if ( WIFEXITED(status) ) if ( WIFEXITED(status) )
{ {
node->exit_status = WEXITSTATUS(status); node->exit_status = WEXITSTATUS(status);
// TODO: may be some cases where the node is intended to exit
DBG_STEM("node '%s' exited with status %d", DBG_STEM("node '%s' exited with status %d",
node->Name().data(), node->exit_status); node->Name().data(), node->exit_status);
} }
@ -709,7 +711,6 @@ std::optional<Supervisor::NodeConfig> Stem::Poll()
for ( auto& msg : msgs ) for ( auto& msg : msgs )
{ {
// TODO: improve message format ...
std::vector<std::string> msg_tokens; std::vector<std::string> msg_tokens;
tokenize_string(std::move(msg), " ", &msg_tokens, 2); tokenize_string(std::move(msg), " ", &msg_tokens, 2);
const auto& cmd = msg_tokens[0]; const auto& cmd = msg_tokens[0];
@ -1035,16 +1036,31 @@ void Supervisor::NodeConfig::InitCluster()
RecordVal* Supervisor::Status(std::string_view node_name) RecordVal* Supervisor::Status(std::string_view node_name)
{ {
// TODO: handle node classes
auto rval = new RecordVal(BifType::Record::Supervisor::Status); auto rval = new RecordVal(BifType::Record::Supervisor::Status);
auto tt = BifType::Record::Supervisor::Status->FieldType("nodes"); auto tt = BifType::Record::Supervisor::Status->FieldType("nodes");
auto node_table_val = new TableVal(tt->AsTableType()); auto node_table_val = new TableVal(tt->AsTableType());
rval->Assign(0, node_table_val); rval->Assign(0, node_table_val);
for ( const auto& n : nodes ) if ( node_name.empty() )
{ {
const auto& name = n.first; for ( const auto& n : nodes )
const auto& node = n.second; {
const auto& name = n.first;
const auto& node = n.second;
auto key = make_intrusive<StringVal>(name);
auto val = node.ToRecord();
node_table_val->Assign(key.get(), val.detach());
}
}
else
{
auto it = nodes.find(node_name);
if ( it == nodes.end() )
return rval;
const auto& name = it->first;
const auto& node = it->second;
auto key = make_intrusive<StringVal>(name); auto key = make_intrusive<StringVal>(name);
auto val = node.ToRecord(); auto val = node.ToRecord();
node_table_val->Assign(key.get(), val.detach()); node_table_val->Assign(key.get(), val.detach());
@ -1061,6 +1077,9 @@ std::string Supervisor::Create(const RecordVal* node_val)
std::string Supervisor::Create(const Supervisor::NodeConfig& node) std::string Supervisor::Create(const Supervisor::NodeConfig& node)
{ {
if ( node.name.empty() )
return "node names must not be an empty string";
if ( node.name.find(' ') != std::string::npos ) if ( node.name.find(' ') != std::string::npos )
return fmt("node names must not contain spaces: '%s'", return fmt("node names must not contain spaces: '%s'",
node.name.data()); node.name.data());
@ -1085,7 +1104,22 @@ std::string Supervisor::Create(const Supervisor::NodeConfig& node)
bool Supervisor::Destroy(std::string_view node_name) bool Supervisor::Destroy(std::string_view node_name)
{ {
// TODO: handle node classes auto send_destroy_msg = [this](std::string_view name)
{
std::stringstream ss;
ss << "destroy " << name;
std::string msg = ss.str();
safe_write(stem_pipe->OutFD(), msg.data(), msg.size() + 1);
};
if ( node_name.empty() )
{
for ( const auto& n : nodes )
send_destroy_msg(n.first);
nodes.clear();
return true;
}
auto it = nodes.find(node_name); auto it = nodes.find(node_name);
@ -1093,24 +1127,31 @@ bool Supervisor::Destroy(std::string_view node_name)
return false; return false;
nodes.erase(it); nodes.erase(it);
send_destroy_msg(node_name);
std::stringstream ss;
ss << "destroy " << node_name;
std::string msg = ss.str();
safe_write(stem_pipe->OutFD(), msg.data(), msg.size() + 1);
return true; return true;
} }
bool Supervisor::Restart(std::string_view node_name) bool Supervisor::Restart(std::string_view node_name)
{ {
// TODO: handle node classes auto send_restart_msg = [this](std::string_view name)
{
std::stringstream ss;
ss << "restart " << name;
std::string msg = ss.str();
safe_write(stem_pipe->OutFD(), msg.data(), msg.size() + 1);
};
if ( node_name.empty() )
{
for ( const auto& n : nodes )
send_restart_msg(n.first);
return true;
}
if ( nodes.find(node_name) == nodes.end() ) if ( nodes.find(node_name) == nodes.end() )
return false; return false;
std::stringstream ss; send_restart_msg(node_name);
ss << "restart " << node_name;
std::string msg = ss.str();
safe_write(stem_pipe->OutFD(), msg.data(), msg.size() + 1);
return true; return true;
} }

View file

@ -284,9 +284,8 @@ struct zeek_options {
/** /**
* Inherit certain options set in the original supervisor parent process * Inherit certain options set in the original supervisor parent process
* and discard the rest. * and discard the rest.
* @param node the supervised-node whose Zeek options are to be modified.
*/ */
void filter_supervised_node_options(const zeek::Supervisor::NodeConfig& node) void filter_supervised_node_options()
{ {
auto og = *this; auto og = *this;
*this = {}; *this = {};
@ -327,6 +326,82 @@ struct zeek_options {
} }
}; };
/**
 * Applies the configuration of the current supervised node
 * (zeek::supervised_node) to this process and to the given options.
 * In order: changes the working directory, redirects stderr and stdout to
 * files, sets CPU affinity, filters the command-line options, then adds the
 * node's interface, exports CLUSTER_NODE and appends the node's scripts.
 * A failure to chdir, to redirect stderr/stdout, or to setenv prints a
 * message to stderr and terminates the process with exit(1); a failure to
 * set CPU affinity is only reported.
 * zeek::supervised_node is dereferenced unconditionally, so the caller must
 * only call this when it is set.
 * @param options the Zeek options to modify for this supervised node.
 */
static void init_supervised_node(zeek_options* options)
{
const auto& node_name = zeek::supervised_node->name;
// The working directory is changed first, so relative stderr/stdout file
// paths opened below resolve against the new directory.
if ( zeek::supervised_node->directory )
{
if ( chdir(zeek::supervised_node->directory->data()) )
{
fprintf(stderr, "node '%s' failed to chdir to %s: %s\n",
node_name.data(),
zeek::supervised_node->directory->data(),
strerror(errno));
exit(1);
}
}
// Redirect stderr to the configured file: the file is created if missing
// (mode 0600), truncated if it exists, and written in append mode.
if ( zeek::supervised_node->stderr_file )
{
auto fd = open(zeek::supervised_node->stderr_file->data(),
O_WRONLY | O_CREAT | O_TRUNC | O_APPEND | O_CLOEXEC,
0600);
// NOTE(review): fd is not closed after a successful dup2(), so the
// original descriptor stays open in this process -- confirm intended.
if ( fd == -1 || dup2(fd, STDERR_FILENO) == -1 )
{
fprintf(stderr, "node '%s' failed to create stderr file %s: %s\n",
node_name.data(),
zeek::supervised_node->stderr_file->data(),
strerror(errno));
exit(1);
}
}
// Same redirection for stdout. Errors are still reported via stderr,
// which at this point may already be redirected to the stderr file.
if ( zeek::supervised_node->stdout_file )
{
auto fd = open(zeek::supervised_node->stdout_file->data(),
O_WRONLY | O_CREAT | O_TRUNC | O_APPEND | O_CLOEXEC,
0600);
if ( fd == -1 || dup2(fd, STDOUT_FILENO) == -1 )
{
fprintf(stderr, "node '%s' failed to create stdout file %s: %s\n",
node_name.data(),
zeek::supervised_node->stdout_file->data(),
strerror(errno));
exit(1);
}
}
// CPU affinity failure is non-fatal: it is reported and startup continues.
if ( zeek::supervised_node->cpu_affinity )
{
auto res = zeek::set_affinity(*zeek::supervised_node->cpu_affinity);
// NOTE(review): assumes zeek::set_affinity() leaves a meaningful errno
// on failure -- confirm against its implementation.
if ( ! res )
fprintf(stderr, "node '%s' failed to set CPU affinity: %s\n",
node_name.data(), strerror(errno));
}
// Keep only the options inherited from the supervisor parent process.
// The node-specific interface and scripts are added after this call,
// presumably so that the filtering does not discard them.
options->filter_supervised_node_options();
if ( zeek::supervised_node->interface )
options->interfaces.emplace_back(*zeek::supervised_node->interface);
// For a node with a non-empty cluster configuration, export its name as
// CLUSTER_NODE, overwriting any existing value of that variable.
if ( ! zeek::supervised_node->cluster.empty() )
{
if ( setenv("CLUSTER_NODE", node_name.data(), true) == -1 )
{
fprintf(stderr, "node '%s' failed to setenv: %s\n",
node_name.data(), strerror(errno));
exit(1);
}
}
// Queue the node's own scripts for loading.
for ( const auto& s : zeek::supervised_node->scripts )
options->scripts_to_load.emplace_back(s);
}
static std::vector<const char*> to_cargs(const std::vector<std::string>& args) static std::vector<const char*> to_cargs(const std::vector<std::string>& args)
{ {
std::vector<const char*> rval; std::vector<const char*> rval;
@ -933,82 +1008,7 @@ int main(int argc, char** argv)
} }
if ( zeek::supervised_node ) if ( zeek::supervised_node )
{ init_supervised_node(&options);
// TODO: probably all of this block could move to a new
// zeek::supervised_node->Init(options) method
const auto& node_name = zeek::supervised_node->name;
if ( zeek::supervised_node->directory )
{
if ( chdir(zeek::supervised_node->directory->data()) )
{
fprintf(stderr, "node '%s' failed to chdir to %s: %s\n",
node_name.data(),
zeek::supervised_node->directory->data(),
strerror(errno));
exit(1);
}
}
if ( zeek::supervised_node->stderr_file )
{
auto fd = open(zeek::supervised_node->stderr_file->data(),
O_WRONLY | O_CREAT | O_TRUNC | O_APPEND | O_CLOEXEC,
0600);
if ( fd == -1 || dup2(fd, STDERR_FILENO) == -1 )
{
fprintf(stderr, "node '%s' failed to create stderr file %s: %s\n",
node_name.data(),
zeek::supervised_node->stderr_file->data(),
strerror(errno));
exit(1);
}
}
if ( zeek::supervised_node->stdout_file )
{
auto fd = open(zeek::supervised_node->stdout_file->data(),
O_WRONLY | O_CREAT | O_TRUNC | O_APPEND | O_CLOEXEC,
0600);
if ( fd == -1 || dup2(fd, STDOUT_FILENO) == -1 )
{
fprintf(stderr, "node '%s' failed to create stdout file %s: %s\n",
node_name.data(),
zeek::supervised_node->stdout_file->data(),
strerror(errno));
exit(1);
}
}
if ( zeek::supervised_node->cpu_affinity )
{
auto res = zeek::set_affinity(*zeek::supervised_node->cpu_affinity);
if ( ! res )
fprintf(stderr, "node '%s' failed to set CPU affinity: %s\n",
node_name.data(), strerror(errno));
}
options.filter_supervised_node_options(*zeek::supervised_node);
if ( zeek::supervised_node->interface )
options.interfaces.emplace_back(*zeek::supervised_node->interface);
if ( ! zeek::supervised_node->cluster.empty() )
{
if ( setenv("CLUSTER_NODE", node_name.data(), true) == -1 )
{
fprintf(stderr, "node '%s' failed to setenv: %s\n",
node_name.data(), strerror(errno));
exit(1);
}
}
for ( const auto& s : zeek::supervised_node->scripts )
options.scripts_to_load.emplace_back(s);
}
double time_start = current_time(true); double time_start = current_time(true);

View file

@ -19,7 +19,7 @@ type Supervisor::Status: record;
type Supervisor::NodeConfig: record; type Supervisor::NodeConfig: record;
type Supervisor::NodeStatus: record; type Supervisor::NodeStatus: record;
function Supervisor::__status%(nodes: string%): Supervisor::Status function Supervisor::__status%(node: string%): Supervisor::Status
%{ %{
if ( ! zeek::supervisor ) if ( ! zeek::supervisor )
{ {
@ -27,7 +27,7 @@ function Supervisor::__status%(nodes: string%): Supervisor::Status
return new RecordVal(BifType::Record::Supervisor::Status); return new RecordVal(BifType::Record::Supervisor::Status);
} }
return zeek::supervisor->Status(nodes->CheckString()); return zeek::supervisor->Status(node->CheckString());
%} %}
function Supervisor::__create%(node: Supervisor::NodeConfig%): string function Supervisor::__create%(node: Supervisor::NodeConfig%): string
@ -42,7 +42,7 @@ function Supervisor::__create%(node: Supervisor::NodeConfig%): string
return new StringVal(rval); return new StringVal(rval);
%} %}
function Supervisor::__destroy%(nodes: string%): bool function Supervisor::__destroy%(node: string%): bool
%{ %{
if ( ! zeek::supervisor ) if ( ! zeek::supervisor )
{ {
@ -50,11 +50,11 @@ function Supervisor::__destroy%(nodes: string%): bool
return val_mgr->GetBool(false); return val_mgr->GetBool(false);
} }
auto rval = zeek::supervisor->Destroy(nodes->CheckString()); auto rval = zeek::supervisor->Destroy(node->CheckString());
return val_mgr->GetBool(rval); return val_mgr->GetBool(rval);
%} %}
function Supervisor::__restart%(nodes: string%): bool function Supervisor::__restart%(node: string%): bool
%{ %{
if ( ! zeek::supervisor ) if ( ! zeek::supervisor )
{ {
@ -62,7 +62,7 @@ function Supervisor::__restart%(nodes: string%): bool
return val_mgr->GetBool(false); return val_mgr->GetBool(false);
} }
auto rval = zeek::supervisor->Restart(nodes->CheckString()); auto rval = zeek::supervisor->Restart(node->CheckString());
return val_mgr->GetBool(rval); return val_mgr->GetBool(rval);
%} %}