Finish implementing supervisor infrastructure

The process hierarchy and all supervisor control commands are now
working (e.g. status, create, destroy, restart), but nodes are
not currently spawned with the desired configuration parameters so
they don't yet operate as real cluster nodes (e.g. worker, logger,
manager, proxy).
This commit is contained in:
Jon Siwek 2019-10-18 17:57:20 -07:00
parent 2bc533f762
commit 773b39e52e
3 changed files with 287 additions and 198 deletions

View file

@ -7,11 +7,11 @@ export {
type Node: record { type Node: record {
# TODO: add proper config fields # TODO: add proper config fields
name: string; name: string;
pid: count &optional;
}; };
type Status: record { type Status: record {
# TODO: add proper status fields # TODO: add more status fields ?
n: count;
nodes: table[string] of Node; nodes: table[string] of Node;
}; };

View file

@ -18,35 +18,45 @@ extern "C" {
#include "setsignal.h" #include "setsignal.h"
} }
struct StemState { namespace {
StemState(std::unique_ptr<bro::PipePair> p); struct Stem {
Stem(std::unique_ptr<bro::PipePair> p);
~StemState(); ~Stem();
std::string Run(); std::string Run();
std::string Poll();
void Reap(); void Reap();
std::string Revive(); std::string Revive();
bool Spawn(zeek::Supervisor::Node* node); bool Spawn(zeek::Supervisor::Node* node);
std::vector<std::string> ExtractMessages(std::string* buffer) const;
int AliveNodeCount() const; int AliveNodeCount() const;
void KillNodes(int signal) const; void KillNodes(int signal) const;
void KillNode(const zeek::Supervisor::Node& node, int signal) const;
void Destroy(zeek::Supervisor::Node* node) const;
bool Wait(zeek::Supervisor::Node* node, int options) const;
void Shutdown(int exit_code); void Shutdown(int exit_code);
void ReportStatus(const zeek::Supervisor::Node& node) const;
std::unique_ptr<bro::Flare> signal_flare; std::unique_ptr<bro::Flare> signal_flare;
std::unique_ptr<bro::PipePair> pipe; std::unique_ptr<bro::PipePair> pipe;
std::map<std::string, zeek::Supervisor::Node> nodes; std::map<std::string, zeek::Supervisor::Node> nodes;
std::string msg_buffer; std::string msg_buffer;
bool shutting_down = false; bool shutting_down = false;
}; };
}
static StemState* stem_state = nullptr; static Stem* stem_state = nullptr;
static RETSIGTYPE stem_sigchld_handler(int signo) static RETSIGTYPE stem_sigchld_handler(int signo)
{ {
@ -73,6 +83,26 @@ static RETSIGTYPE supervisor_sig_handler(int signo)
return RETSIGVAL; return RETSIGVAL;
} }
// Splits every complete message off the front of *buffer and returns them in
// order.  Messages are delimited by a NUL byte, which is not included in the
// returned strings.  Any trailing bytes that are not yet NUL-terminated (a
// partially received message) are left in *buffer for a later call.
static std::vector<std::string> extract_messages(std::string* buffer)
	{
	std::vector<std::string> rval;
	std::string::size_type consumed = 0;

	for ( ; ; )
		{
		auto msg_end = buffer->find('\0', consumed);

		if ( msg_end == std::string::npos )
			// Don't have any full messages left
			break;

		rval.emplace_back(*buffer, consumed, msg_end - consumed);
		consumed = msg_end + 1;
		}

	// Drop all consumed bytes in one go rather than erasing from the front
	// once per message, which would shift the remaining bytes every time.
	buffer->erase(0, consumed);
	return rval;
	}
zeek::Supervisor::Supervisor(zeek::Supervisor::Config cfg, zeek::Supervisor::Supervisor(zeek::Supervisor::Config cfg,
std::unique_ptr<bro::PipePair> pipe, std::unique_ptr<bro::PipePair> pipe,
pid_t arg_stem_pid) pid_t arg_stem_pid)
@ -242,12 +272,31 @@ void zeek::Supervisor::Process()
int bytes_read = read(stem_pipe->InFD(), buf, 256); int bytes_read = read(stem_pipe->InFD(), buf, 256);
if ( bytes_read > 0 ) if ( bytes_read > 0 )
msg_buffer.append(buf, bytes_read);
auto msgs = extract_messages(&msg_buffer);
for ( auto& msg : msgs )
{ {
DBG_LOG(DBG_SUPERVISOR, "read msg from Stem: %.*s", bytes_read, buf); DBG_LOG(DBG_SUPERVISOR, "read msg from Stem: %s", msg.data());
std::vector<std::string> msg_tokens;
tokenize_string(msg, " ", &msg_tokens);
const auto& type = msg_tokens[0];
if ( type == "status" )
{
const auto& name = msg_tokens[1];
auto it = nodes.find(name);
if ( it != nodes.end() )
it->second.pid = std::stoi(msg_tokens[2]);
}
else
reporter->Error("Supervisor got unknown msg: %s", msg.data());
} }
} }
StemState::StemState(std::unique_ptr<bro::PipePair> p) Stem::Stem(std::unique_ptr<bro::PipePair> p)
: signal_flare(new bro::Flare()), pipe(std::move(p)) : signal_flare(new bro::Flare()), pipe(std::move(p))
{ {
zeek::set_thread_name("zeek.stem"); zeek::set_thread_name("zeek.stem");
@ -255,15 +304,30 @@ StemState::StemState(std::unique_ptr<bro::PipePair> p)
stem_state = this; stem_state = this;
setsignal(SIGCHLD, stem_sigchld_handler); setsignal(SIGCHLD, stem_sigchld_handler);
setsignal(SIGTERM, stem_sigterm_handler); setsignal(SIGTERM, stem_sigterm_handler);
// TODO: changing the process group here so that SIGINT to the
// supervisor doesn't also get passed to the children. i.e. supervisor
// should be in charge of initiating orderly shutdown. But calling
// just setpgid() like this is technically a race-condition -- need
// to do more work of blocking SIGINT before fork(), unblocking after,
// then also calling setpgid() from parent. And just not doing that
// until more is known whether that's the right SIGINT behavior in
// the first place.
auto res = setpgid(0, 0);
if ( res == -1 )
fprintf(stderr, "failed to set stem process group: %s\n",
strerror(errno));
} }
StemState::~StemState() Stem::~Stem()
{ {
setsignal(SIGCHLD, SIG_DFL); setsignal(SIGCHLD, SIG_DFL);
setsignal(SIGTERM, SIG_DFL); setsignal(SIGTERM, SIG_DFL);
} }
void StemState::Reap() void Stem::Reap()
{ {
for ( auto& n : nodes ) for ( auto& n : nodes )
{ {
@ -272,42 +336,78 @@ void StemState::Reap()
if ( ! node.pid ) if ( ! node.pid )
continue; continue;
int status; Wait(&node, WNOHANG);
auto res = waitpid(node.pid, &status, WNOHANG);
if ( res == 0 )
// It's still alive.
continue;
if ( res == -1 )
{
fprintf(stderr, "Stem failed to get node exit status %s (%d): %s\n",
node.name.data(), node.pid, strerror(errno));
continue;
}
if ( WIFEXITED(status) )
{
node.exit_status = WEXITSTATUS(status);
// TODO: may be some cases where the node is intended to exit
printf("node '%s' exited with status %d\n",
node.name.data(), node.exit_status);
}
else if ( WIFSIGNALED(status) )
{
node.signal_number = WTERMSIG(status);
printf("node '%s' terminated by signal %d\n",
node.name.data(), node.signal_number);
}
else
fprintf(stderr, "Stem failed to get node exit status %s (%d)\n",
node.name.data(), node.pid);
node.pid = 0;
} }
} }
std::string StemState::Revive() bool Stem::Wait(zeek::Supervisor::Node* node, int options) const
{
int status;
auto res = waitpid(node->pid, &status, options);
if ( res == 0 )
// It's still alive.
return false;
if ( res == -1 )
{
fprintf(stderr, "Stem failed to get node exit status %s (%d): %s\n",
node->name.data(), node->pid, strerror(errno));
return false;
}
if ( WIFEXITED(status) )
{
node->exit_status = WEXITSTATUS(status);
// TODO: may be some cases where the node is intended to exit
printf("node '%s' exited with status %d\n",
node->name.data(), node->exit_status);
}
else if ( WIFSIGNALED(status) )
{
node->signal_number = WTERMSIG(status);
printf("node '%s' terminated by signal %d\n",
node->name.data(), node->signal_number);
}
else
fprintf(stderr, "Stem failed to get node exit status %s (%d)\n",
node->name.data(), node->pid);
node->pid = 0;
return true;
}
// Sends `signal` to the process of the given node and reports a failure of
// kill() on stderr.  Nodes without a live process (pid not positive) are
// skipped.
void Stem::KillNode(const zeek::Supervisor::Node& node, int signal) const
	{
	if ( node.pid <= 0 )
		// A pid of 0 marks a node that has no running process.  Passing 0
		// (or a negative value) to kill() would target a whole process
		// group -- including the stem itself -- rather than a single node.
		return;

	auto kill_res = kill(node.pid, signal);

	if ( kill_res == -1 )
		fprintf(stderr, "Failed to send signal to node %s: %s\n",
		        node.name.data(), strerror(errno));
	}
// Terminates the process of the given node and blocks until it was reaped.
//
// The node is sent SIGTERM for the first `max_term_attempts` attempts and
// SIGKILL for every attempt after that.  After each signal the node is
// polled via Wait() with WNOHANG; if it was not reaped yet, the next attempt
// follows after `kill_delay` seconds.  A node that has no running process
// (pid not positive) is left alone.
void Stem::Destroy(zeek::Supervisor::Node* node) const
	{
	constexpr auto max_term_attempts = 13;
	constexpr auto kill_delay = 2;
	auto kill_attempts = 0;

	if ( node->pid <= 0 )
		// Nothing to kill or reap.  Going on with a pid of 0 would signal
		// the stem's entire process group and let waitpid() pick up the
		// exit status of some other child on behalf of this node.
		return;

	for ( ; ; )
		{
		auto sig = kill_attempts++ < max_term_attempts ? SIGTERM : SIGKILL;
		KillNode(*node, sig);
		// Give the process a brief moment to exit before polling for it.
		usleep(10);

		if ( Wait(node, WNOHANG) )
			break;

		printf("Stem waiting to destroy node: %s (%d)\n",
		       node->name.data(), node->pid);
		sleep(kill_delay);
		}
	}
std::string Stem::Revive()
{ {
constexpr auto attempts_before_delay_increase = 3; constexpr auto attempts_before_delay_increase = 3;
constexpr auto delay_increase_factor = 2; constexpr auto delay_increase_factor = 2;
@ -343,12 +443,14 @@ std::string StemState::Revive()
if ( Spawn(&node) ) if ( Spawn(&node) )
return node.name; return node.name;
ReportStatus(node);
} }
return ""; return "";
} }
bool StemState::Spawn(zeek::Supervisor::Node* node) bool Stem::Spawn(zeek::Supervisor::Node* node)
{ {
auto node_pid = fork(); auto node_pid = fork();
@ -371,27 +473,7 @@ bool StemState::Spawn(zeek::Supervisor::Node* node)
return false; return false;
} }
std::vector<std::string> StemState::ExtractMessages(std::string* buffer) const int Stem::AliveNodeCount() const
{
std::vector<std::string> rval;
for ( ; ; )
{
auto msg_end = buffer->find('\0');
if ( msg_end == std::string::npos )
// Don't have any full messages left
break;
auto msg = buffer->substr(0, msg_end);
rval.emplace_back(std::move(msg));
buffer->erase(0, msg_end + 1);
}
return rval;
}
int StemState::AliveNodeCount() const
{ {
auto rval = 0; auto rval = 0;
@ -402,28 +484,16 @@ int StemState::AliveNodeCount() const
return rval; return rval;
} }
void StemState::KillNodes(int signal) const void Stem::KillNodes(int signal) const
{ {
for ( const auto& n : nodes ) for ( const auto& n : nodes )
{ KillNode(n.second, signal);
const auto& node = n.second;
auto kill_res = kill(node.pid, signal);
if ( kill_res == -1 )
{
char tmp[256];
bro_strerror_r(errno, tmp, sizeof(tmp));
fprintf(stderr, "Failed to send signal to node %s: %s",
node.name.data(), tmp);
}
}
} }
void StemState::Shutdown(int exit_code) void Stem::Shutdown(int exit_code)
{ {
constexpr auto max_term_attempts = 13; constexpr auto max_term_attempts = 13;
constexpr auto kill_delay = 2; constexpr auto kill_delay = 2;
auto kill_attempts = 0; auto kill_attempts = 0;
for ( ; ; ) for ( ; ; )
@ -431,6 +501,7 @@ void StemState::Shutdown(int exit_code)
auto sig = kill_attempts++ < max_term_attempts ? SIGTERM : SIGKILL; auto sig = kill_attempts++ < max_term_attempts ? SIGTERM : SIGKILL;
printf("Stem killed nodes with signal %d\n", sig); printf("Stem killed nodes with signal %d\n", sig);
KillNodes(sig); KillNodes(sig);
usleep(10);
Reap(); Reap();
auto nodes_alive = AliveNodeCount(); auto nodes_alive = AliveNodeCount();
@ -459,136 +530,149 @@ void StemState::Shutdown(int exit_code)
} }
} }
std::string StemState::Run() void Stem::ReportStatus(const zeek::Supervisor::Node& node) const
{ {
// TODO: changing the process group here so that SIGINT to the std::string msg = fmt("status %s %d", node.name.data(), node.pid);
// supervisor doesn't also get passed to the children. i.e. supervisor safe_write(pipe->OutFD(), msg.data(), msg.size() + 1);
// should be in charge of initiating orderly shutdown. But calling }
// just setpgid() like this is technically a race-condition -- need
// to do more work of blocking SIGINT before fork(), unblocking after,
// then also calling setpgid() from parent. And just not doing that
// until more is known whether that's the right SIGINT behavior in
// the first place.
auto res = setpgid(0, 0);
if ( res == -1 )
fprintf(stderr, "failed to set stem process group: %s\n",
strerror(errno));
std::string Stem::Run()
{
for ( ; ; ) for ( ; ; )
{ {
pollfd fds[2] = { { pipe->InFD(), POLLIN, 0 }, auto new_node_name = Poll();
{ signal_flare->FD(), POLLIN, 0} };
constexpr auto poll_timeout_ms = 1000;
auto res = poll(fds, 2, poll_timeout_ms);
if ( res < 0 ) if ( ! new_node_name.empty() )
return new_node_name;
}
return "";
}
std::string Stem::Poll()
{
pollfd fds[2] = { { pipe->InFD(), POLLIN, 0 },
{ signal_flare->FD(), POLLIN, 0} };
constexpr auto poll_timeout_ms = 1000;
auto res = poll(fds, 2, poll_timeout_ms);
if ( res < 0 )
{
if ( errno != EINTR )
{ {
if ( errno != EINTR ) fprintf(stderr, "Stem poll() failed: %s\n", strerror(errno));
{ return {};
fprintf(stderr, "poll() failed: %s\n", strerror(errno));
continue;
}
} }
}
if ( getppid() == 1 ) if ( getppid() == 1 )
{ {
// TODO: better way to detect loss of parent than polling ? // TODO: better way to detect loss of parent than polling ?
printf("Stem suicide\n"); // e.g. prctl(PR_SET_PDEATHSIG, ...) on Linux
Shutdown(13); // or procctl(PROC_PDEATHSIG_CTL) on FreeBSD
} printf("Stem suicide\n");
Shutdown(13);
}
auto new_node_name = Revive();
if ( ! new_node_name.empty() )
return new_node_name;
if ( res == 0 )
return {};
if ( signal_flare->Extinguish() )
{
if ( shutting_down )
Shutdown(0);
Reap();
auto new_node_name = Revive(); auto new_node_name = Revive();
if ( ! new_node_name.empty() ) if ( ! new_node_name.empty() )
return new_node_name; return new_node_name;
// TODO: periodically send node status updates back to supervisor?
// e.g. can fill in information gaps in the supervisor's node map
// for things such as node PIDs.
if ( res == 0 )
continue;
if ( signal_flare->Extinguish() )
{
if ( shutting_down )
Shutdown(0);
Reap();
auto new_node_name = Revive();
if ( ! new_node_name.empty() )
return new_node_name;
}
if ( ! fds[0].revents )
continue;
char buf[256];
int bytes_read = read(pipe->InFD(), buf, 256);
if ( bytes_read == 0 )
// EOF
exit(0);
if ( bytes_read < 0 )
{
fprintf(stderr, "read() failed: %s\n", strerror(errno));
continue;
}
msg_buffer.append(buf, bytes_read);
auto msgs = ExtractMessages(&msg_buffer);
for ( auto& msg : msgs )
{
// TODO: improve message format ...
std::vector<std::string> msg_tokens;
tokenize_string(std::move(msg), " ", &msg_tokens);
const auto& cmd = msg_tokens[0];
const auto& node_name = msg_tokens[1];
if ( cmd == "create" )
{
assert(nodes.find(node_name) == nodes.end());
zeek::Supervisor::Node node;
node.name = node_name;
if ( Spawn(&node) )
// TODO: probably want to return the full configuration the
// new node ought to use
return node.name;
// TODO: get stem printfs going through standard Zeek debug.log
printf("Stem created node: %s (%d)\n", node.name.data(), node.pid);
nodes.emplace(node_name, std::move(node));
}
else if ( cmd == "destroy" )
{
auto res = nodes.erase(node_name);
assert(res > 0 );
printf("Stem destroying node: %s\n", node_name.data());
// TODO: kill
}
else if ( cmd == "restart" )
{
auto it = nodes.find(node_name);
assert(it != nodes.end());
printf("Stem restarting node: %s\n", node_name.data());
// TODO: re-use logic for destroy then create
}
else
fprintf(stderr, "unknown supervisor message: %s", cmd.data());
}
} }
if ( ! fds[0].revents )
return {};
char buf[256];
int bytes_read = read(pipe->InFD(), buf, 256);
if ( bytes_read == 0 )
{
// EOF, supervisor must have exited
printf("Stem EOF\n");
Shutdown(14);
}
if ( bytes_read < 0 )
{
fprintf(stderr, "Stem read() failed: %s\n", strerror(errno));
return {};
}
msg_buffer.append(buf, bytes_read);
auto msgs = extract_messages(&msg_buffer);
for ( auto& msg : msgs )
{
// TODO: improve message format ...
std::vector<std::string> msg_tokens;
tokenize_string(std::move(msg), " ", &msg_tokens);
const auto& cmd = msg_tokens[0];
const auto& node_name = msg_tokens[1];
if ( cmd == "create" )
{
assert(nodes.find(node_name) == nodes.end());
zeek::Supervisor::Node node;
node.name = node_name;
if ( Spawn(&node) )
// TODO: probably want to return the full configuration the
// new node ought to use
return node.name;
// TODO: get stem printfs going through standard Zeek debug.log
printf("Stem created node: %s (%d)\n", node.name.data(), node.pid);
auto it = nodes.emplace(node_name, std::move(node)).first;
ReportStatus(it->second);
}
else if ( cmd == "destroy" )
{
auto it = nodes.find(node_name);
assert(it != nodes.end());
auto& node = it->second;
printf("Stem destroying node: %s\n", node_name.data());
Destroy(&node);
nodes.erase(it);
}
else if ( cmd == "restart" )
{
auto it = nodes.find(node_name);
assert(it != nodes.end());
auto& node = it->second;
printf("Stem restarting node: %s\n", node_name.data());
Destroy(&node);
if ( Spawn(&node) )
return node.name;
ReportStatus(node);
}
else
fprintf(stderr, "unknown supervisor message: %s", cmd.data());
}
return {};
} }
std::string zeek::Supervisor::RunStem(std::unique_ptr<bro::PipePair> pipe) std::string zeek::Supervisor::RunStem(std::unique_ptr<bro::PipePair> pipe)
{ {
StemState ss(std::move(pipe)); Stem s(std::move(pipe));
return ss.Run(); return s.Run();
} }
static zeek::Supervisor::Node node_val_to_struct(const RecordVal* node) static zeek::Supervisor::Node node_val_to_struct(const RecordVal* node)
@ -602,20 +686,20 @@ static RecordVal* node_struct_to_val(const zeek::Supervisor::Node& node)
{ {
auto rval = new RecordVal(BifType::Record::Supervisor::Node); auto rval = new RecordVal(BifType::Record::Supervisor::Node);
rval->Assign(0, new StringVal(node.name)); rval->Assign(0, new StringVal(node.name));
if ( node.pid )
rval->Assign(1, val_mgr->GetCount(node.pid));
return rval; return rval;
} }
RecordVal* zeek::Supervisor::Status(const std::string& node_name) RecordVal* zeek::Supervisor::Status(const std::string& node_name)
{ {
// TODO: handle node classes // TODO: handle node classes
// TODO: return real status information
static auto count = 0;
auto rval = new RecordVal(BifType::Record::Supervisor::Status); auto rval = new RecordVal(BifType::Record::Supervisor::Status);
rval->Assign(0, val_mgr->GetCount(count++));
auto tt = BifType::Record::Supervisor::Status->FieldType("nodes"); auto tt = BifType::Record::Supervisor::Status->FieldType("nodes");
auto node_table_val = new TableVal(tt->AsTableType()); auto node_table_val = new TableVal(tt->AsTableType());
rval->Assign(1, node_table_val); rval->Assign(0, node_table_val);
for ( const auto& n : nodes ) for ( const auto& n : nodes )
{ {
@ -633,6 +717,10 @@ std::string zeek::Supervisor::Create(const RecordVal* node_val)
{ {
auto node = node_val_to_struct(node_val); auto node = node_val_to_struct(node_val);
if ( node.name.find(' ') != std::string::npos )
return fmt("node names must not contain spaces: '%s'",
node.name.data());
if ( nodes.find(node.name) != nodes.end() ) if ( nodes.find(node.name) != nodes.end() )
return fmt("node with name '%s' already exists", node.name.data()); return fmt("node with name '%s' already exists", node.name.data());

View file

@ -70,6 +70,7 @@ private:
std::unique_ptr<bro::PipePair> stem_pipe; std::unique_ptr<bro::PipePair> stem_pipe;
bro::Flare signal_flare; bro::Flare signal_flare;
std::map<std::string, Node> nodes; std::map<std::string, Node> nodes;
std::string msg_buffer;
}; };
extern Supervisor* supervisor; extern Supervisor* supervisor;