diff --git a/scripts/base/frameworks/supervisor/api.zeek b/scripts/base/frameworks/supervisor/api.zeek index 25b1f274d5..9ebc9b7fa8 100644 --- a/scripts/base/frameworks/supervisor/api.zeek +++ b/scripts/base/frameworks/supervisor/api.zeek @@ -7,11 +7,11 @@ export { type Node: record { # TODO: add proper config fields name: string; + pid: count &optional; }; type Status: record { - # TODO: add proper status fields - n: count; + # TODO: add more status fields ? nodes: table[string] of Node; }; diff --git a/src/Supervisor.cc b/src/Supervisor.cc index a6b7d7002a..008d1a309e 100644 --- a/src/Supervisor.cc +++ b/src/Supervisor.cc @@ -18,35 +18,45 @@ extern "C" { #include "setsignal.h" } -struct StemState { - StemState(std::unique_ptr p); +namespace { +struct Stem { + Stem(std::unique_ptr p); - ~StemState(); + ~Stem(); std::string Run(); + std::string Poll(); + void Reap(); std::string Revive(); bool Spawn(zeek::Supervisor::Node* node); - std::vector ExtractMessages(std::string* buffer) const; - int AliveNodeCount() const; void KillNodes(int signal) const; + void KillNode(const zeek::Supervisor::Node& node, int signal) const; + + void Destroy(zeek::Supervisor::Node* node) const; + + bool Wait(zeek::Supervisor::Node* node, int options) const; + void Shutdown(int exit_code); + void ReportStatus(const zeek::Supervisor::Node& node) const; + std::unique_ptr signal_flare; std::unique_ptr pipe; std::map nodes; std::string msg_buffer; bool shutting_down = false; }; +} -static StemState* stem_state = nullptr; +static Stem* stem_state = nullptr; static RETSIGTYPE stem_sigchld_handler(int signo) { @@ -73,6 +83,26 @@ static RETSIGTYPE supervisor_sig_handler(int signo) return RETSIGVAL; } +static std::vector extract_messages(std::string* buffer) + { + std::vector rval; + + for ( ; ; ) + { + auto msg_end = buffer->find('\0'); + + if ( msg_end == std::string::npos ) + // Don't have any full messages left + break; + + auto msg = buffer->substr(0, msg_end); + rval.emplace_back(std::move(msg)); + buffer->erase(0, msg_end + 1); + } + + return rval; + } + zeek::Supervisor::Supervisor(zeek::Supervisor::Config cfg, std::unique_ptr pipe, pid_t arg_stem_pid) @@ -242,12 +272,31 @@ void zeek::Supervisor::Process() int bytes_read = read(stem_pipe->InFD(), buf, 256); if ( bytes_read > 0 ) + msg_buffer.append(buf, bytes_read); + + auto msgs = extract_messages(&msg_buffer); + + for ( auto& msg : msgs ) { - DBG_LOG(DBG_SUPERVISOR, "read msg from Stem: %.*s", bytes_read, buf); + DBG_LOG(DBG_SUPERVISOR, "read msg from Stem: %s", msg.data()); + std::vector msg_tokens; + tokenize_string(msg, " ", &msg_tokens); + const auto& type = msg_tokens[0]; + + if ( type == "status" ) + { + const auto& name = msg_tokens[1]; + auto it = nodes.find(name); + + if ( it != nodes.end() ) + it->second.pid = std::stoi(msg_tokens[2]); + } + else + reporter->Error("Supervisor got unknown msg: %s", msg.data()); } } -StemState::StemState(std::unique_ptr p) +Stem::Stem(std::unique_ptr p) : signal_flare(new bro::Flare()), pipe(std::move(p)) { zeek::set_thread_name("zeek.stem"); @@ -255,15 +304,30 @@ StemState::StemState(std::unique_ptr p) stem_state = this; setsignal(SIGCHLD, stem_sigchld_handler); setsignal(SIGTERM, stem_sigterm_handler); + + // TODO: changing the process group here so that SIGINT to the + // supervisor doesn't also get passed to the children. i.e. supervisor + // should be in charge of initiating orderly shutdown. But calling + // just setpgid() like this is technically a race-condition -- need + // to do more work of blocking SIGINT before fork(), unblocking after, + // then also calling setpgid() from parent. And just not doing that + // until more is known whether that's the right SIGINT behavior in + // the first place. + auto res = setpgid(0, 0); + + if ( res == -1 ) + fprintf(stderr, "failed to set stem process group: %s\n", + strerror(errno)); + } -StemState::~StemState() +Stem::~Stem() { setsignal(SIGCHLD, SIG_DFL); setsignal(SIGTERM, SIG_DFL); } -void StemState::Reap() +void Stem::Reap() { for ( auto& n : nodes ) { @@ -272,42 +336,78 @@ void StemState::Reap() if ( ! node.pid ) continue; - int status; - auto res = waitpid(node.pid, &status, WNOHANG); - - if ( res == 0 ) - // It's still alive. - continue; - - if ( res == -1 ) - { - fprintf(stderr, "Stem failed to get node exit status %s (%d): %s\n", - node.name.data(), node.pid, strerror(errno)); - continue; - } - - if ( WIFEXITED(status) ) - { - node.exit_status = WEXITSTATUS(status); - // TODO: may be some cases where the node is intended to exit - printf("node '%s' exited with status %d\n", - node.name.data(), node.exit_status); - } - else if ( WIFSIGNALED(status) ) - { - node.signal_number = WTERMSIG(status); - printf("node '%s' terminated by signal %d\n", - node.name.data(), node.signal_number); - } - else - fprintf(stderr, "Stem failed to get node exit status %s (%d)\n", - node.name.data(), node.pid); - - node.pid = 0; + Wait(&node, WNOHANG); } } -std::string StemState::Revive() +bool Stem::Wait(zeek::Supervisor::Node* node, int options) const + { + int status; + auto res = waitpid(node->pid, &status, options); + + if ( res == 0 ) + // It's still alive. + return false; + + if ( res == -1 ) + { + fprintf(stderr, "Stem failed to get node exit status %s (%d): %s\n", + node->name.data(), node->pid, strerror(errno)); + return false; + } + + if ( WIFEXITED(status) ) + { + node->exit_status = WEXITSTATUS(status); + // TODO: may be some cases where the node is intended to exit + printf("node '%s' exited with status %d\n", + node->name.data(), node->exit_status); + } + else if ( WIFSIGNALED(status) ) + { + node->signal_number = WTERMSIG(status); + printf("node '%s' terminated by signal %d\n", + node->name.data(), node->signal_number); + } + else + fprintf(stderr, "Stem failed to get node exit status %s (%d)\n", + node->name.data(), node->pid); + + node->pid = 0; + return true; + } + +void Stem::KillNode(const zeek::Supervisor::Node& node, int signal) const + { + auto kill_res = kill(node.pid, signal); + + if ( kill_res == -1 ) + fprintf(stderr, "Failed to send signal to node %s: %s", + node.name.data(), strerror(errno)); + } + +void Stem::Destroy(zeek::Supervisor::Node* node) const + { + constexpr auto max_term_attempts = 13; + constexpr auto kill_delay = 2; + auto kill_attempts = 0; + + for ( ; ; ) + { + auto sig = kill_attempts++ < max_term_attempts ? SIGTERM : SIGKILL; + KillNode(*node, sig); + usleep(10); + + if ( Wait(node, WNOHANG) ) + break; + + printf("Stem waiting to destroy node: %s (%d)\n", + node->name.data(), node->pid); + sleep(kill_delay); + } + } + +std::string Stem::Revive() { constexpr auto attempts_before_delay_increase = 3; constexpr auto delay_increase_factor = 2; @@ -343,12 +443,14 @@ std::string StemState::Revive() if ( Spawn(&node) ) return node.name; + + ReportStatus(node); } return ""; } -bool StemState::Spawn(zeek::Supervisor::Node* node) +bool Stem::Spawn(zeek::Supervisor::Node* node) { auto node_pid = fork(); @@ -371,27 +473,7 @@ bool StemState::Spawn(zeek::Supervisor::Node* node) return false; } -std::vector StemState::ExtractMessages(std::string* buffer) const - { - std::vector rval; - - for ( ; ; ) - { - auto msg_end = buffer->find('\0'); - - if ( msg_end == std::string::npos ) - // Don't have any full messages left - break; - - auto msg = buffer->substr(0, msg_end); - rval.emplace_back(std::move(msg)); - buffer->erase(0, msg_end + 1); - } - - return rval; - } - -int StemState::AliveNodeCount() const +int Stem::AliveNodeCount() const { auto rval = 0; @@ -402,28 +484,16 @@ int StemState::AliveNodeCount() const return rval; } -void StemState::KillNodes(int signal) const +void Stem::KillNodes(int signal) const { for ( const auto& n : nodes ) - { - const auto& node = n.second; - auto kill_res = kill(node.pid, signal); - - if ( kill_res == -1 ) - { - char tmp[256]; - bro_strerror_r(errno, tmp, sizeof(tmp)); - fprintf(stderr, "Failed to send signal to node %s: %s", - node.name.data(), tmp); - } - } + KillNode(n.second, signal); } -void StemState::Shutdown(int exit_code) +void Stem::Shutdown(int exit_code) { constexpr auto max_term_attempts = 13; constexpr auto kill_delay = 2; - auto kill_attempts = 0; for ( ; ; ) @@ -431,6 +501,7 @@ void StemState::Shutdown(int exit_code) auto sig = kill_attempts++ < max_term_attempts ? SIGTERM : SIGKILL; printf("Stem killed nodes with signal %d\n", sig); KillNodes(sig); + usleep(10); Reap(); auto nodes_alive = AliveNodeCount(); @@ -459,136 +530,149 @@ void StemState::Shutdown(int exit_code) } } -std::string StemState::Run() +void Stem::ReportStatus(const zeek::Supervisor::Node& node) const { - // TODO: changing the process group here so that SIGINT to the - // supervisor doesn't also get passed to the children. i.e. supervisor - // should be in charge of initiating orderly shutdown. But calling - // just setpgid() like this is technically a race-condition -- need - // to do more work of blocking SIGINT before fork(), unblocking after, - // then also calling setpgid() from parent. And just not doing that - // until more is known whether that's the right SIGINT behavior in - // the first place. - auto res = setpgid(0, 0); - - if ( res == -1 ) - fprintf(stderr, "failed to set stem process group: %s\n", - strerror(errno)); + std::string msg = fmt("status %s %d", node.name.data(), node.pid); + safe_write(pipe->OutFD(), msg.data(), msg.size() + 1); + } +std::string Stem::Run() + { for ( ; ; ) { - pollfd fds[2] = { { pipe->InFD(), POLLIN, 0 }, - { signal_flare->FD(), POLLIN, 0} }; - constexpr auto poll_timeout_ms = 1000; - auto res = poll(fds, 2, poll_timeout_ms); + auto new_node_name = Poll(); - if ( res < 0 ) + if ( ! new_node_name.empty() ) + return new_node_name; + } + + return ""; + } + +std::string Stem::Poll() + { + pollfd fds[2] = { { pipe->InFD(), POLLIN, 0 }, + { signal_flare->FD(), POLLIN, 0} }; + constexpr auto poll_timeout_ms = 1000; + auto res = poll(fds, 2, poll_timeout_ms); + + if ( res < 0 ) + { + if ( errno != EINTR ) { - if ( errno != EINTR ) - { - fprintf(stderr, "poll() failed: %s\n", strerror(errno)); - continue; - } + fprintf(stderr, "Stem poll() failed: %s\n", strerror(errno)); + return {}; } + } - if ( getppid() == 1 ) - { - // TODO: better way to detect loss of parent than polling ? - printf("Stem suicide\n"); - Shutdown(13); - } + if ( getppid() == 1 ) + { + // TODO: better way to detect loss of parent than polling ? + // e.g. prctl(PR_SET_PDEATHSIG, ...) on Linux + // or procctl(PROC_PDEATHSIG_CTL) on FreeBSD + printf("Stem suicide\n"); + Shutdown(13); + } + auto new_node_name = Revive(); + + if ( ! new_node_name.empty() ) + return new_node_name; + + if ( res == 0 ) + return {}; + + if ( signal_flare->Extinguish() ) + { + if ( shutting_down ) + Shutdown(0); + + Reap(); auto new_node_name = Revive(); if ( ! new_node_name.empty() ) return new_node_name; - - // TODO: periodically send node status updates back to supervisor? - // e.g. can fill in information gaps in the supervisor's node map - // for things such as node PIDs. - - if ( res == 0 ) - continue; - - if ( signal_flare->Extinguish() ) - { - if ( shutting_down ) - Shutdown(0); - - Reap(); - auto new_node_name = Revive(); - - if ( ! new_node_name.empty() ) - return new_node_name; - } - - if ( ! fds[0].revents ) - continue; - - char buf[256]; - int bytes_read = read(pipe->InFD(), buf, 256); - - if ( bytes_read == 0 ) - // EOF - exit(0); - - if ( bytes_read < 0 ) - { - fprintf(stderr, "read() failed: %s\n", strerror(errno)); - continue; - } - - msg_buffer.append(buf, bytes_read); - auto msgs = ExtractMessages(&msg_buffer); - - for ( auto& msg : msgs ) - { - // TODO: improve message format ... - std::vector msg_tokens; - tokenize_string(std::move(msg), " ", &msg_tokens); - const auto& cmd = msg_tokens[0]; - const auto& node_name = msg_tokens[1]; - - if ( cmd == "create" ) - { - assert(nodes.find(node_name) == nodes.end()); - zeek::Supervisor::Node node; - node.name = node_name; - - if ( Spawn(&node) ) - // TODO: probably want to return the full configuration the - // new node ought to use - return node.name; - - // TODO: get stem printfs going through standard Zeek debug.log - printf("Stem created node: %s (%d)\n", node.name.data(), node.pid); - nodes.emplace(node_name, std::move(node)); - } - else if ( cmd == "destroy" ) - { - auto res = nodes.erase(node_name); - assert(res > 0 ); - printf("Stem destroying node: %s\n", node_name.data()); - // TODO: kill - } - else if ( cmd == "restart" ) - { - auto it = nodes.find(node_name); - assert(it != nodes.end()); - printf("Stem restarting node: %s\n", node_name.data()); - // TODO: re-use logic for destroy then create - } - else - fprintf(stderr, "unknown supervisor message: %s", cmd.data()); - } } + if ( ! fds[0].revents ) + return {}; + + char buf[256]; + int bytes_read = read(pipe->InFD(), buf, 256); + + if ( bytes_read == 0 ) + { + // EOF, supervisor must have exited + printf("Stem EOF\n"); + Shutdown(14); + } + + if ( bytes_read < 0 ) + { + fprintf(stderr, "Stem read() failed: %s\n", strerror(errno)); + return {}; + } + + msg_buffer.append(buf, bytes_read); + auto msgs = extract_messages(&msg_buffer); + + for ( auto& msg : msgs ) + { + // TODO: improve message format ... + std::vector msg_tokens; + tokenize_string(std::move(msg), " ", &msg_tokens); + const auto& cmd = msg_tokens[0]; + const auto& node_name = msg_tokens[1]; + + if ( cmd == "create" ) + { + assert(nodes.find(node_name) == nodes.end()); + zeek::Supervisor::Node node; + node.name = node_name; + + if ( Spawn(&node) ) + // TODO: probably want to return the full configuration the + // new node ought to use + return node.name; + + // TODO: get stem printfs going through standard Zeek debug.log + printf("Stem created node: %s (%d)\n", node.name.data(), node.pid); + auto it = nodes.emplace(node_name, std::move(node)).first; + ReportStatus(it->second); + } + else if ( cmd == "destroy" ) + { + auto it = nodes.find(node_name); + assert(it != nodes.end()); + auto& node = it->second; + printf("Stem destroying node: %s\n", node_name.data()); + Destroy(&node); + nodes.erase(it); + } + else if ( cmd == "restart" ) + { + auto it = nodes.find(node_name); + assert(it != nodes.end()); + auto& node = it->second; + printf("Stem restarting node: %s\n", node_name.data()); + Destroy(&node); + + if ( Spawn(&node) ) + return node.name; + + ReportStatus(node); + } + else + fprintf(stderr, "unknown supervisor message: %s", cmd.data()); + } + + return {}; } std::string zeek::Supervisor::RunStem(std::unique_ptr pipe) { - StemState ss(std::move(pipe)); - return ss.Run(); + Stem s(std::move(pipe)); + return s.Run(); } static zeek::Supervisor::Node node_val_to_struct(const RecordVal* node) @@ -602,20 +686,20 @@ static RecordVal* node_struct_to_val(const zeek::Supervisor::Node& node) { auto rval = new RecordVal(BifType::Record::Supervisor::Node); rval->Assign(0, new StringVal(node.name)); + + if ( node.pid ) + rval->Assign(1, val_mgr->GetCount(node.pid)); + return rval; } RecordVal* zeek::Supervisor::Status(const std::string& node_name) { // TODO: handle node classes - // TODO: return real status information - static auto count = 0; auto rval = new RecordVal(BifType::Record::Supervisor::Status); - rval->Assign(0, val_mgr->GetCount(count++)); - auto tt = BifType::Record::Supervisor::Status->FieldType("nodes"); auto node_table_val = new TableVal(tt->AsTableType()); - rval->Assign(1, node_table_val); + rval->Assign(0, node_table_val); for ( const auto& n : nodes ) { @@ -633,6 +717,10 @@ std::string zeek::Supervisor::Create(const RecordVal* node_val) { auto node = node_val_to_struct(node_val); + if ( node.name.find(' ') != std::string::npos ) + return fmt("node names must not contain spaces: '%s'", + node.name.data()); + if ( nodes.find(node.name) != nodes.end() ) return fmt("node with name '%s' already exists", node.name.data()); diff --git a/src/Supervisor.h b/src/Supervisor.h index a9209f5471..5402c23f44 100644 --- a/src/Supervisor.h +++ b/src/Supervisor.h @@ -70,6 +70,7 @@ private: std::unique_ptr stem_pipe; bro::Flare signal_flare; std::map nodes; + std::string msg_buffer; }; extern Supervisor* supervisor;