From 773b39e52ed655a90651bed2b482b83f0a26a509 Mon Sep 17 00:00:00 2001 From: Jon Siwek Date: Fri, 18 Oct 2019 17:57:20 -0700 Subject: [PATCH] Finish implementing supervisor infrastructure The process hierarchy and all supervisor control commands are now working (e.g. status, create, destroy, restart), but nodes are not currently spawned with the desired configuration parameters so they don't yet operate as real cluster nodes (e.g. worker, logger, manager, proxy). --- scripts/base/frameworks/supervisor/api.zeek | 4 +- src/Supervisor.cc | 480 ++++++++++++-------- src/Supervisor.h | 1 + 3 files changed, 287 insertions(+), 198 deletions(-) diff --git a/scripts/base/frameworks/supervisor/api.zeek b/scripts/base/frameworks/supervisor/api.zeek index 25b1f274d5..9ebc9b7fa8 100644 --- a/scripts/base/frameworks/supervisor/api.zeek +++ b/scripts/base/frameworks/supervisor/api.zeek @@ -7,11 +7,11 @@ export { type Node: record { # TODO: add proper config fields name: string; + pid: count &optional; }; type Status: record { - # TODO: add proper status fields - n: count; + # TODO: add more status fields ? nodes: table[string] of Node; }; diff --git a/src/Supervisor.cc b/src/Supervisor.cc index a6b7d7002a..008d1a309e 100644 --- a/src/Supervisor.cc +++ b/src/Supervisor.cc @@ -18,35 +18,45 @@ extern "C" { #include "setsignal.h" } -struct StemState { - StemState(std::unique_ptr p); +namespace { +struct Stem { + Stem(std::unique_ptr p); - ~StemState(); + ~Stem(); std::string Run(); + std::string Poll(); + void Reap(); std::string Revive(); bool Spawn(zeek::Supervisor::Node* node); - std::vector ExtractMessages(std::string* buffer) const; - int AliveNodeCount() const; void KillNodes(int signal) const; + void KillNode(const zeek::Supervisor::Node& node, int signal) const; + + void Destroy(zeek::Supervisor::Node* node) const; + + bool Wait(zeek::Supervisor::Node* node, int options) const; + void Shutdown(int exit_code); + void ReportStatus(const zeek::Supervisor::Node& node) const; + std::unique_ptr signal_flare; std::unique_ptr pipe; std::map nodes; std::string msg_buffer; bool shutting_down = false; }; +} -static StemState* stem_state = nullptr; +static Stem* stem_state = nullptr; static RETSIGTYPE stem_sigchld_handler(int signo) { @@ -73,6 +83,26 @@ static RETSIGTYPE supervisor_sig_handler(int signo) return RETSIGVAL; } +static std::vector extract_messages(std::string* buffer) + { + std::vector rval; + + for ( ; ; ) + { + auto msg_end = buffer->find('\0'); + + if ( msg_end == std::string::npos ) + // Don't have any full messages left + break; + + auto msg = buffer->substr(0, msg_end); + rval.emplace_back(std::move(msg)); + buffer->erase(0, msg_end + 1); + } + + return rval; + } + zeek::Supervisor::Supervisor(zeek::Supervisor::Config cfg, std::unique_ptr pipe, pid_t arg_stem_pid) @@ -242,12 +272,31 @@ void zeek::Supervisor::Process() int bytes_read = read(stem_pipe->InFD(), buf, 256); if ( bytes_read > 0 ) + msg_buffer.append(buf, bytes_read); + + auto msgs = extract_messages(&msg_buffer); + + for ( auto& msg : msgs ) { - DBG_LOG(DBG_SUPERVISOR, "read msg from Stem: %.*s", bytes_read, buf); + DBG_LOG(DBG_SUPERVISOR, "read msg from Stem: %s", msg.data()); + std::vector msg_tokens; + tokenize_string(msg, " ", &msg_tokens); + const auto& type = msg_tokens[0]; + + if ( type == "status" ) + { + const auto& name = msg_tokens[1]; + auto it = nodes.find(name); + + if ( it != nodes.end() ) + it->second.pid = std::stoi(msg_tokens[2]); + } + else + reporter->Error("Supervisor got unknown msg: %s", msg.data()); } } -StemState::StemState(std::unique_ptr p) +Stem::Stem(std::unique_ptr p) : signal_flare(new bro::Flare()), pipe(std::move(p)) { zeek::set_thread_name("zeek.stem"); @@ -255,15 +304,30 @@ StemState::StemState(std::unique_ptr p) stem_state = this; setsignal(SIGCHLD, stem_sigchld_handler); setsignal(SIGTERM, stem_sigterm_handler); + + // TODO: changing the process group here so that SIGINT to the + // supervisor doesn't also get passed to the children. i.e. supervisor + // should be in charge of initiating orderly shutdown. But calling + // just setpgid() like this is technically a race-condition -- need + // to do more work of blocking SIGINT before fork(), unblocking after, + // then also calling setpgid() from parent. And just not doing that + // until more is known whether that's the right SIGINT behavior in + // the first place. + auto res = setpgid(0, 0); + + if ( res == -1 ) + fprintf(stderr, "failed to set stem process group: %s\n", + strerror(errno)); + } -StemState::~StemState() +Stem::~Stem() { setsignal(SIGCHLD, SIG_DFL); setsignal(SIGTERM, SIG_DFL); } -void StemState::Reap() +void Stem::Reap() { for ( auto& n : nodes ) { @@ -272,42 +336,78 @@ void StemState::Reap() if ( ! node.pid ) continue; - int status; - auto res = waitpid(node.pid, &status, WNOHANG); - - if ( res == 0 ) - // It's still alive. - continue; - - if ( res == -1 ) - { - fprintf(stderr, "Stem failed to get node exit status %s (%d): %s\n", - node.name.data(), node.pid, strerror(errno)); - continue; - } - - if ( WIFEXITED(status) ) - { - node.exit_status = WEXITSTATUS(status); - // TODO: may be some cases where the node is intended to exit - printf("node '%s' exited with status %d\n", - node.name.data(), node.exit_status); - } - else if ( WIFSIGNALED(status) ) - { - node.signal_number = WTERMSIG(status); - printf("node '%s' terminated by signal %d\n", - node.name.data(), node.signal_number); - } - else - fprintf(stderr, "Stem failed to get node exit status %s (%d)\n", - node.name.data(), node.pid); - - node.pid = 0; + Wait(&node, WNOHANG); } } -std::string StemState::Revive() +bool Stem::Wait(zeek::Supervisor::Node* node, int options) const + { + int status; + auto res = waitpid(node->pid, &status, options); + + if ( res == 0 ) + // It's still alive. + return false; + + if ( res == -1 ) + { + fprintf(stderr, "Stem failed to get node exit status %s (%d): %s\n", + node->name.data(), node->pid, strerror(errno)); + return false; + } + + if ( WIFEXITED(status) ) + { + node->exit_status = WEXITSTATUS(status); + // TODO: may be some cases where the node is intended to exit + printf("node '%s' exited with status %d\n", + node->name.data(), node->exit_status); + } + else if ( WIFSIGNALED(status) ) + { + node->signal_number = WTERMSIG(status); + printf("node '%s' terminated by signal %d\n", + node->name.data(), node->signal_number); + } + else + fprintf(stderr, "Stem failed to get node exit status %s (%d)\n", + node->name.data(), node->pid); + + node->pid = 0; + return true; + } + +void Stem::KillNode(const zeek::Supervisor::Node& node, int signal) const + { + auto kill_res = kill(node.pid, signal); + + if ( kill_res == -1 ) + fprintf(stderr, "Failed to send signal to node %s: %s", + node.name.data(), strerror(errno)); + } + +void Stem::Destroy(zeek::Supervisor::Node* node) const + { + constexpr auto max_term_attempts = 13; + constexpr auto kill_delay = 2; + auto kill_attempts = 0; + + for ( ; ; ) + { + auto sig = kill_attempts++ < max_term_attempts ? SIGTERM : SIGKILL; + KillNode(*node, sig); + usleep(10); + + if ( Wait(node, WNOHANG) ) + break; + + printf("Stem waiting to destroy node: %s (%d)\n", + node->name.data(), node->pid); + sleep(kill_delay); + } + } + +std::string Stem::Revive() { constexpr auto attempts_before_delay_increase = 3; constexpr auto delay_increase_factor = 2; @@ -343,12 +443,14 @@ std::string StemState::Revive() if ( Spawn(&node) ) return node.name; + + ReportStatus(node); } return ""; } -bool StemState::Spawn(zeek::Supervisor::Node* node) +bool Stem::Spawn(zeek::Supervisor::Node* node) { auto node_pid = fork(); @@ -371,27 +473,7 @@ bool StemState::Spawn(zeek::Supervisor::Node* node) return false; } -std::vector StemState::ExtractMessages(std::string* buffer) const - { - std::vector rval; - - for ( ; ; ) - { - auto msg_end = buffer->find('\0'); - - if ( msg_end == std::string::npos ) - // Don't have any full messages left - break; - - auto msg = buffer->substr(0, msg_end); - rval.emplace_back(std::move(msg)); - buffer->erase(0, msg_end + 1); - } - - return rval; - } - -int StemState::AliveNodeCount() const +int Stem::AliveNodeCount() const { auto rval = 0; @@ -402,28 +484,16 @@ int StemState::AliveNodeCount() const return rval; } -void StemState::KillNodes(int signal) const +void Stem::KillNodes(int signal) const { for ( const auto& n : nodes ) - { - const auto& node = n.second; - auto kill_res = kill(node.pid, signal); - - if ( kill_res == -1 ) - { - char tmp[256]; - bro_strerror_r(errno, tmp, sizeof(tmp)); - fprintf(stderr, "Failed to send signal to node %s: %s", - node.name.data(), tmp); - } - } + KillNode(n.second, signal); } -void StemState::Shutdown(int exit_code) +void Stem::Shutdown(int exit_code) { constexpr auto max_term_attempts = 13; constexpr auto kill_delay = 2; - auto kill_attempts = 0; for ( ; ; ) @@ -431,6 +501,7 @@ void StemState::Shutdown(int exit_code) auto sig = kill_attempts++ < max_term_attempts ? SIGTERM : SIGKILL; printf("Stem killed nodes with signal %d\n", sig); KillNodes(sig); + usleep(10); Reap(); auto nodes_alive = AliveNodeCount(); @@ -459,136 +530,149 @@ void StemState::Shutdown(int exit_code) } } -std::string StemState::Run() +void Stem::ReportStatus(const zeek::Supervisor::Node& node) const { - // TODO: changing the process group here so that SIGINT to the - // supervisor doesn't also get passed to the children. i.e. supervisor - // should be in charge of initiating orderly shutdown. But calling - // just setpgid() like this is technically a race-condition -- need - // to do more work of blocking SIGINT before fork(), unblocking after, - // then also calling setpgid() from parent. And just not doing that - // until more is known whether that's the right SIGINT behavior in - // the first place. - auto res = setpgid(0, 0); - - if ( res == -1 ) - fprintf(stderr, "failed to set stem process group: %s\n", - strerror(errno)); + std::string msg = fmt("status %s %d", node.name.data(), node.pid); + safe_write(pipe->OutFD(), msg.data(), msg.size() + 1); + } +std::string Stem::Run() + { for ( ; ; ) { - pollfd fds[2] = { { pipe->InFD(), POLLIN, 0 }, - { signal_flare->FD(), POLLIN, 0} }; - constexpr auto poll_timeout_ms = 1000; - auto res = poll(fds, 2, poll_timeout_ms); + auto new_node_name = Poll(); - if ( res < 0 ) + if ( ! new_node_name.empty() ) + return new_node_name; + } + + return ""; + } + +std::string Stem::Poll() + { + pollfd fds[2] = { { pipe->InFD(), POLLIN, 0 }, + { signal_flare->FD(), POLLIN, 0} }; + constexpr auto poll_timeout_ms = 1000; + auto res = poll(fds, 2, poll_timeout_ms); + + if ( res < 0 ) + { + if ( errno != EINTR ) { - if ( errno != EINTR ) - { - fprintf(stderr, "poll() failed: %s\n", strerror(errno)); - continue; - } + fprintf(stderr, "Stem poll() failed: %s\n", strerror(errno)); + return {}; } + } - if ( getppid() == 1 ) - { - // TODO: better way to detect loss of parent than polling ? - printf("Stem suicide\n"); - Shutdown(13); - } + if ( getppid() == 1 ) + { + // TODO: better way to detect loss of parent than polling ? + // e.g. prctl(PR_SET_PDEATHSIG, ...) on Linux + // or procctl(PROC_PDEATHSIG_CTL) on FreeBSD + printf("Stem suicide\n"); + Shutdown(13); + } + auto new_node_name = Revive(); + + if ( ! new_node_name.empty() ) + return new_node_name; + + if ( res == 0 ) + return {}; + + if ( signal_flare->Extinguish() ) + { + if ( shutting_down ) + Shutdown(0); + + Reap(); auto new_node_name = Revive(); if ( ! new_node_name.empty() ) return new_node_name; - - // TODO: periodically send node status updates back to supervisor? - // e.g. can fill in information gaps in the supervisor's node map - // for things such as node PIDs. - - if ( res == 0 ) - continue; - - if ( signal_flare->Extinguish() ) - { - if ( shutting_down ) - Shutdown(0); - - Reap(); - auto new_node_name = Revive(); - - if ( ! new_node_name.empty() ) - return new_node_name; - } - - if ( ! fds[0].revents ) - continue; - - char buf[256]; - int bytes_read = read(pipe->InFD(), buf, 256); - - if ( bytes_read == 0 ) - // EOF - exit(0); - - if ( bytes_read < 0 ) - { - fprintf(stderr, "read() failed: %s\n", strerror(errno)); - continue; - } - - msg_buffer.append(buf, bytes_read); - auto msgs = ExtractMessages(&msg_buffer); - - for ( auto& msg : msgs ) - { - // TODO: improve message format ... - std::vector msg_tokens; - tokenize_string(std::move(msg), " ", &msg_tokens); - const auto& cmd = msg_tokens[0]; - const auto& node_name = msg_tokens[1]; - - if ( cmd == "create" ) - { - assert(nodes.find(node_name) == nodes.end()); - zeek::Supervisor::Node node; - node.name = node_name; - - if ( Spawn(&node) ) - // TODO: probably want to return the full configuration the - // new node ought to use - return node.name; - - // TODO: get stem printfs going through standard Zeek debug.log - printf("Stem created node: %s (%d)\n", node.name.data(), node.pid); - nodes.emplace(node_name, std::move(node)); - } - else if ( cmd == "destroy" ) - { - auto res = nodes.erase(node_name); - assert(res > 0 ); - printf("Stem destroying node: %s\n", node_name.data()); - // TODO: kill - } - else if ( cmd == "restart" ) - { - auto it = nodes.find(node_name); - assert(it != nodes.end()); - printf("Stem restarting node: %s\n", node_name.data()); - // TODO: re-use logic for destroy then create - } - else - fprintf(stderr, "unknown supervisor message: %s", cmd.data()); - } } + if ( ! fds[0].revents ) + return {}; + + char buf[256]; + int bytes_read = read(pipe->InFD(), buf, 256); + + if ( bytes_read == 0 ) + { + // EOF, supervisor must have exited + printf("Stem EOF\n"); + Shutdown(14); + } + + if ( bytes_read < 0 ) + { + fprintf(stderr, "Stem read() failed: %s\n", strerror(errno)); + return {}; + } + + msg_buffer.append(buf, bytes_read); + auto msgs = extract_messages(&msg_buffer); + + for ( auto& msg : msgs ) + { + // TODO: improve message format ... + std::vector msg_tokens; + tokenize_string(std::move(msg), " ", &msg_tokens); + const auto& cmd = msg_tokens[0]; + const auto& node_name = msg_tokens[1]; + + if ( cmd == "create" ) + { + assert(nodes.find(node_name) == nodes.end()); + zeek::Supervisor::Node node; + node.name = node_name; + + if ( Spawn(&node) ) + // TODO: probably want to return the full configuration the + // new node ought to use + return node.name; + + // TODO: get stem printfs going through standard Zeek debug.log + printf("Stem created node: %s (%d)\n", node.name.data(), node.pid); + auto it = nodes.emplace(node_name, std::move(node)).first; + ReportStatus(it->second); + } + else if ( cmd == "destroy" ) + { + auto it = nodes.find(node_name); + assert(it != nodes.end()); + auto& node = it->second; + printf("Stem destroying node: %s\n", node_name.data()); + Destroy(&node); + nodes.erase(it); + } + else if ( cmd == "restart" ) + { + auto it = nodes.find(node_name); + assert(it != nodes.end()); + auto& node = it->second; + printf("Stem restarting node: %s\n", node_name.data()); + Destroy(&node); + + if ( Spawn(&node) ) + return node.name; + + ReportStatus(node); + } + else + fprintf(stderr, "unknown supervisor message: %s", cmd.data()); + } + + return {}; } std::string zeek::Supervisor::RunStem(std::unique_ptr pipe) { - StemState ss(std::move(pipe)); - return ss.Run(); + Stem s(std::move(pipe)); + return s.Run(); } static zeek::Supervisor::Node node_val_to_struct(const RecordVal* node) @@ -602,20 +686,20 @@ static RecordVal* node_struct_to_val(const zeek::Supervisor::Node& node) { auto rval = new RecordVal(BifType::Record::Supervisor::Node); rval->Assign(0, new StringVal(node.name)); + + if ( node.pid ) + rval->Assign(1, val_mgr->GetCount(node.pid)); + return rval; } RecordVal* zeek::Supervisor::Status(const std::string& node_name) { // TODO: handle node classes - // TODO: return real status information - static auto count = 0; auto rval = new RecordVal(BifType::Record::Supervisor::Status); - rval->Assign(0, val_mgr->GetCount(count++)); - auto tt = BifType::Record::Supervisor::Status->FieldType("nodes"); auto node_table_val = new TableVal(tt->AsTableType()); - rval->Assign(1, node_table_val); + rval->Assign(0, node_table_val); for ( const auto& n : nodes ) { @@ -633,6 +717,10 @@ std::string zeek::Supervisor::Create(const RecordVal* node_val) { auto node = node_val_to_struct(node_val); + if ( node.name.find(' ') != std::string::npos ) + return fmt("node names must not contain spaces: '%s'", + node.name.data()); + if ( nodes.find(node.name) != nodes.end() ) return fmt("node with name '%s' already exists", node.name.data()); diff --git a/src/Supervisor.h b/src/Supervisor.h index a9209f5471..5402c23f44 100644 --- a/src/Supervisor.h +++ b/src/Supervisor.h @@ -70,6 +70,7 @@ private: std::unique_ptr stem_pipe; bro::Flare signal_flare; std::map nodes; + std::string msg_buffer; }; extern Supervisor* supervisor;