diff --git a/src/Pipe.cc b/src/Pipe.cc index 97ccd7f3c5..9957e25dce 100644 --- a/src/Pipe.cc +++ b/src/Pipe.cc @@ -141,3 +141,9 @@ Pipe& Pipe::operator=(const Pipe& other) status_flags[1] = other.status_flags[1]; return *this; } + +PipePair::PipePair(int flags, int status_flags, int* fds) + : pipes{Pipe(flags, flags, status_flags, status_flags, fds ? fds + 0 : nullptr), + Pipe(flags, flags, status_flags, status_flags, fds ? fds + 2 : nullptr)} + { + } diff --git a/src/Pipe.h b/src/Pipe.h index ad09d58b3f..18786c75ad 100644 --- a/src/Pipe.h +++ b/src/Pipe.h @@ -66,4 +66,34 @@ private: int status_flags[2]; }; +class PipePair { +public: + + PipePair(int flags, int status_flags, int* fds = nullptr); + + Pipe& In() + { return pipes[swapped]; } + + Pipe& Out() + { return pipes[!swapped]; } + + const Pipe& In() const + { return pipes[swapped]; } + + const Pipe& Out() const + { return pipes[!swapped]; } + + int InFD() const + { return In().ReadFD(); } + + int OutFD() const + { return Out().WriteFD(); } + + void Swap() + { swapped = ! swapped; } + + Pipe pipes[2]; + bool swapped = false; +}; + } // namespace bro diff --git a/src/Supervisor.cc b/src/Supervisor.cc index 3493ac301f..030b28b256 100644 --- a/src/Supervisor.cc +++ b/src/Supervisor.cc @@ -26,7 +26,7 @@ static RETSIGTYPE supervisor_sig_handler(int signo) } zeek::Supervisor::Supervisor(zeek::Supervisor::Config cfg, - std::unique_ptr pipe, + std::unique_ptr pipe, pid_t arg_stem_pid) : config(std::move(cfg)), stem_pid(arg_stem_pid), stem_pipe(std::move(pipe)) { @@ -128,16 +128,18 @@ void zeek::Supervisor::HandleChildSignal() bro_strerror_r(errno, tmp, sizeof(tmp)); reporter->Error("failed to fork Zeek supervisor stem process: %s\n", tmp); signal_flare.Fire(); - // Sleep to avoid spining too fast in a revival-fail loop. + // Sleep to avoid spinning too fast in a revival-fail loop. sleep(1); } else if ( stem_pid == 0 ) { char stem_env[256]; - safe_snprintf(stem_env, sizeof(stem_env), "ZEEK_STEM=%d,%d", - stem_pipe->ReadFD(), stem_pipe->WriteFD()); + safe_snprintf(stem_env, sizeof(stem_env), "ZEEK_STEM=%d,%d,%d,%d", + stem_pipe->In().ReadFD(), stem_pipe->In().WriteFD(), + stem_pipe->Out().ReadFD(), stem_pipe->Out().WriteFD()); char* env[] = { stem_env, (char*)0 }; - stem_pipe->UnsetFlags(FD_CLOEXEC); + stem_pipe->In().UnsetFlags(FD_CLOEXEC); + stem_pipe->Out().UnsetFlags(FD_CLOEXEC); auto res = execle(config.zeek_exe_path.data(), config.zeek_exe_path.data(), (char*)0, env); @@ -150,6 +152,16 @@ void zeek::Supervisor::HandleChildSignal() else { DBG_LOG(DBG_SUPERVISOR, "stem process revived, new pid: %d", stem_pid); + // Recreate the desired process hierarchy. + + // TODO: probably a preferred order in which to create nodes + // e.g. logger, manager, proxy, worker + for ( const auto& n : nodes ) + { + const auto& node = n.second; + std::string msg = fmt("create %s", node.name.data()); + safe_write(stem_pipe->OutFD(), msg.data(), msg.size() + 1); + } } } @@ -161,14 +173,11 @@ void zeek::Supervisor::GetFds(iosource::FD_Set* read, iosource::FD_Set* write, iosource::FD_Set* except) { read->Insert(signal_flare.FD()); - read->Insert(stem_pipe->ReadFD()); + read->Insert(stem_pipe->InFD()); } double zeek::Supervisor::NextTimestamp(double* local_network_time) { - // We're only asked for a timestamp if either (1) a FD was ready - // or (2) we're not idle (and we go idle if when Process is no-op), - // so there's no case where returning -1 to signify a skip will help. return timer_mgr->Time(); } @@ -177,7 +186,7 @@ void zeek::Supervisor::Process() HandleChildSignal(); char buf[256]; - int bytes_read = read(stem_pipe->ReadFD(), buf, 256); + int bytes_read = read(stem_pipe->InFD(), buf, 256); if ( bytes_read > 0 ) { @@ -185,9 +194,10 @@ void zeek::Supervisor::Process() } } -void zeek::Supervisor::RunStem(std::unique_ptr pipe) +std::string zeek::Supervisor::RunStem(std::unique_ptr pipe) { - zeek::set_thread_name("zeek-stem"); + zeek::set_thread_name("zeek.stem"); + pipe->Swap(); // TODO: changing the process group here so that SIGINT to the // supervisor doesn't also get passed to the children. i.e. supervisor // should be in charge of initiating orderly shutdown. But calling @@ -229,7 +239,7 @@ void zeek::Supervisor::RunStem(std::unique_ptr pipe) { // TODO: better way to detect loss of parent than polling ? - pollfd fds = { pipe->ReadFD(), POLLIN, 0 }; + pollfd fds = { pipe->InFD(), POLLIN, 0 }; constexpr auto poll_timeout_ms = 1000; auto res = poll(&fds, 1, poll_timeout_ms); @@ -242,11 +252,15 @@ void zeek::Supervisor::RunStem(std::unique_ptr pipe) if ( getppid() == 1 ) exit(0); + // TODO: periodically send node status updates back to supervisor? + // e.g. can fill in information gaps in the supervisor's node map + // for things such as node PIDs. + if ( res == 0 ) continue; char buf[256]; - int bytes_read = read(pipe->ReadFD(), buf, 256); + int bytes_read = read(pipe->InFD(), buf, 256); if ( bytes_read == 0 ) // EOF @@ -271,10 +285,25 @@ void zeek::Supervisor::RunStem(std::unique_ptr pipe) if ( cmd == "create" ) { - auto res = nodes.emplace(node_name, Node{node_name}); - assert(res.second); - // TODO: fork - printf("Stem creating node: %s\n", node_name.data()); + assert(nodes.find(node_name) == nodes.end()); + auto node_pid = fork(); + + if ( node_pid == -1 ) + fprintf(stderr, "failed to fork Zeek node '%s': %s\n", + node_name.data(), strerror(errno)); + else if ( node_pid == 0 ) + { + // TODO: probably want to return the configuration the + // new node ought to use + zeek::set_thread_name(fmt("zeek.%s", node_name.data())); + return node_name; + } + + Node node; + node.name = node_name; + node.pid = node_pid; + nodes.emplace(node_name, node); + printf("Stem created node: %s %d\n", node_name.data(), node_pid); } else if ( cmd == "destroy" ) { @@ -342,7 +371,7 @@ std::string zeek::Supervisor::Create(const RecordVal* node_val) return fmt("node with name '%s' already exists", node.name.data()); std::string msg = fmt("create %s", node.name.data()); - safe_write(stem_pipe->WriteFD(), msg.data(), msg.size() + 1); + safe_write(stem_pipe->OutFD(), msg.data(), msg.size() + 1); nodes.emplace(node.name, node); return ""; } @@ -355,7 +384,7 @@ bool zeek::Supervisor::Destroy(const std::string& node_name) return false; std::string msg = fmt("destroy %s", node_name.data()); - safe_write(stem_pipe->WriteFD(), msg.data(), msg.size() + 1); + safe_write(stem_pipe->OutFD(), msg.data(), msg.size() + 1); return true; } @@ -367,6 +396,6 @@ bool zeek::Supervisor::Restart(const std::string& node_name) return false; std::string msg = fmt("restart %s", node_name.data()); - safe_write(stem_pipe->WriteFD(), msg.data(), msg.size() + 1); + safe_write(stem_pipe->OutFD(), msg.data(), msg.size() + 1); return true; } diff --git a/src/Supervisor.h b/src/Supervisor.h index 6ab2d15564..2bca83f4de 100644 --- a/src/Supervisor.h +++ b/src/Supervisor.h @@ -17,7 +17,7 @@ namespace zeek { class Supervisor : public iosource::IOSource { public: - static void RunStem(std::unique_ptr pipe); + static std::string RunStem(std::unique_ptr pipe); struct Config { int num_workers = 1; @@ -27,9 +27,10 @@ public: struct Node { std::string name; + pid_t pid = 0; }; - Supervisor(Config cfg, std::unique_ptr stem_pipe, pid_t stem_pid); + Supervisor(Config cfg, std::unique_ptr stem_pipe, pid_t stem_pid); ~Supervisor(); @@ -60,7 +61,7 @@ private: Config config; pid_t stem_pid; - std::unique_ptr stem_pipe; + std::unique_ptr stem_pipe; bro::Flare signal_flare; std::map nodes; }; diff --git a/src/main.cc b/src/main.cc index aeae9230fe..08618a719b 100644 --- a/src/main.cc +++ b/src/main.cc @@ -740,12 +740,12 @@ int main(int argc, char** argv) bool use_supervisor = options.supervised_workers > 0; pid_t stem_pid = 0; - std::unique_ptr supervisor_pipe; + std::unique_ptr supervisor_pipe; + std::string stem_spawn = ""; if ( use_supervisor ) { - supervisor_pipe.reset(new bro::Pipe{FD_CLOEXEC, FD_CLOEXEC, - O_NONBLOCK, O_NONBLOCK}); + supervisor_pipe.reset(new bro::PipePair{FD_CLOEXEC, O_NONBLOCK}); stem_pid = fork(); if ( stem_pid == -1 ) @@ -756,7 +756,7 @@ int main(int argc, char** argv) } if ( stem_pid == 0 ) - zeek::Supervisor::RunStem(std::move(supervisor_pipe)); + stem_spawn = zeek::Supervisor::RunStem(std::move(supervisor_pipe)); } auto zeek_stem_env = getenv("ZEEK_STEM"); @@ -766,20 +766,34 @@ int main(int argc, char** argv) std::vector fd_strings; tokenize_string(zeek_stem_env, ",", &fd_strings); - if ( fd_strings.size() != 2 ) + if ( fd_strings.size() != 4 ) { fprintf(stderr, "invalid ZEEK_STEM environment variable value: '%s'\n", zeek_stem_env); exit(1); } - int fds[2]; - fds[0] = std::stoi(fd_strings[0]); - fds[1] = std::stoi(fd_strings[1]); + int fds[4]; - supervisor_pipe.reset(new bro::Pipe{FD_CLOEXEC, FD_CLOEXEC, - O_NONBLOCK, O_NONBLOCK, fds}); - zeek::Supervisor::RunStem(std::move(supervisor_pipe)); + for ( auto i = 0; i < 4; ++i ) + fds[i] = std::stoi(fd_strings[i]); + + supervisor_pipe.reset(new bro::PipePair{FD_CLOEXEC, O_NONBLOCK, fds}); + stem_spawn = zeek::Supervisor::RunStem(std::move(supervisor_pipe)); + } + + if ( ! stem_spawn.empty() ) + { + for ( ; ; ) + { + // TODO: this no-op loop is here just to test the process hierarchy + printf("node wakeup: %s\n", stem_spawn.data()); + sleep(2); + + // TODO: this re-parenting check needs to go somewhere proper + if ( getppid() == 1 ) + exit(0); + } } std::set_new_handler(bro_new_handler);