diff --git a/src/Pipe.cc b/src/Pipe.cc index 7276571402..97ccd7f3c5 100644 --- a/src/Pipe.cc +++ b/src/Pipe.cc @@ -20,21 +20,52 @@ static void pipe_fail(int eno) fprintf(stderr, "Pipe failure: %s", tmp); } -static void set_flags(int fd, int flags) +static int set_flags(int fd, int flags) { + auto rval = fcntl(fd, F_GETFD); + if ( flags ) - if ( fcntl(fd, F_SETFD, fcntl(fd, F_GETFD) | flags) == -1 ) + { + rval |= flags; + + if ( fcntl(fd, F_SETFD, rval) == -1 ) pipe_fail(errno); + } + + return rval; } -static void set_status_flags(int fd, int flags) +static int unset_flags(int fd, int flags) { + auto rval = fcntl(fd, F_GETFD); + if ( flags ) - if ( fcntl(fd, F_SETFL, fcntl(fd, F_GETFL) | flags) == -1 ) + { + rval &= ~flags; + + if ( fcntl(fd, F_SETFD, rval) == -1 ) pipe_fail(errno); + } + + return rval; } -static int dup_or_fail(int fd, int flags) +static int set_status_flags(int fd, int flags) + { + auto rval = fcntl(fd, F_GETFL); + + if ( flags ) + { + rval |= flags; + + if ( fcntl(fd, F_SETFL, rval) == -1 ) + pipe_fail(errno); + } + + return rval; + } + +static int dup_or_fail(int fd, int flags, int status_flags) { int rval = dup(fd); @@ -42,22 +73,41 @@ static int dup_or_fail(int fd, int flags) pipe_fail(errno); - set_flags(fd, flags); + set_flags(rval, flags); // descriptor flags are per-fd and not copied by dup(): apply them to the duplicate + set_status_flags(rval, status_flags); return rval; } -Pipe::Pipe(int flags0, int flags1, int status_flags0, int status_flags1) +Pipe::Pipe(int flags0, int flags1, int status_flags0, int status_flags1, + int* arg_fds) { - // pipe2 can set flags atomically, but not yet available everywhere. - if ( ::pipe(fds) ) - pipe_fail(errno); + if ( arg_fds ) + { + fds[0] = arg_fds[0]; + fds[1] = arg_fds[1]; + } + else + { + // pipe2 can set flags atomically, but not yet available everywhere.
+ if ( ::pipe(fds) ) + pipe_fail(errno); + } - flags[0] = flags0; - flags[1] = flags1; + flags[0] = set_flags(fds[0], flags0); + flags[1] = set_flags(fds[1], flags1); + status_flags[0] = set_status_flags(fds[0], status_flags0); + status_flags[1] = set_status_flags(fds[1], status_flags1); + } - set_flags(fds[0], flags[0]); - set_flags(fds[1], flags[1]); - set_status_flags(fds[0], status_flags0); - set_status_flags(fds[1], status_flags1); +void Pipe::SetFlags(int arg_flags) + { + flags[0] = set_flags(fds[0], arg_flags); + flags[1] = set_flags(fds[1], arg_flags); + } + +void Pipe::UnsetFlags(int arg_flags) + { + flags[0] = unset_flags(fds[0], arg_flags); + flags[1] = unset_flags(fds[1], arg_flags); } Pipe::~Pipe() @@ -68,10 +118,12 @@ Pipe::~Pipe() Pipe::Pipe(const Pipe& other) { - fds[0] = dup_or_fail(other.fds[0], other.flags[0]); - fds[1] = dup_or_fail(other.fds[1], other.flags[1]); + fds[0] = dup_or_fail(other.fds[0], other.flags[0], other.status_flags[0]); + fds[1] = dup_or_fail(other.fds[1], other.flags[1], other.status_flags[1]); flags[0] = other.flags[0]; flags[1] = other.flags[1]; + status_flags[0] = other.status_flags[0]; + status_flags[1] = other.status_flags[1]; } Pipe& Pipe::operator=(const Pipe& other) @@ -81,9 +133,11 @@ Pipe& Pipe::operator=(const Pipe& other) close(fds[0]); close(fds[1]); - fds[0] = dup_or_fail(other.fds[0], other.flags[0]); - fds[1] = dup_or_fail(other.fds[1], other.flags[1]); + fds[0] = dup_or_fail(other.fds[0], other.flags[0], other.status_flags[0]); + fds[1] = dup_or_fail(other.fds[1], other.flags[1], other.status_flags[1]); flags[0] = other.flags[0]; flags[1] = other.flags[1]; + status_flags[0] = other.status_flags[0]; + status_flags[1] = other.status_flags[1]; return *this; } diff --git a/src/Pipe.h b/src/Pipe.h index eed32bac01..ad09d58b3f 100644 --- a/src/Pipe.h +++ b/src/Pipe.h @@ -13,9 +13,12 @@ public: * @param flags1 file descriptor flags to set on write end of pipe.
* @param status_flags0 descriptor status flags to set on read end of pipe. * @param status_flags1 descriptor status flags to set on write end of pipe. + * @param fds may be supplied to adopt existing file descriptors rather + * than create ones from a new pipe. Should point to memory containing + * two consecutive file descriptors, the "read" one and then the "write" one. */ explicit Pipe(int flags0 = 0, int flags1 = 0, int status_flags0 = 0, - int status_flags1 = 0); + int status_flags1 = 0, int* fds = nullptr); /** * Close the pair of file descriptors owned by the object. @@ -45,9 +48,22 @@ public: int WriteFD() const { return fds[1]; } + /** + * Sets the given file descriptor flags for both the read and write end + * of the pipe. + */ + void SetFlags(int flags); + + /** + * Unsets the given file descriptor flags for both the read and write end + * of the pipe. + */ + void UnsetFlags(int flags); + private: int fds[2]; int flags[2]; + int status_flags[2]; }; } // namespace bro diff --git a/src/Supervisor.cc b/src/Supervisor.cc index 2d9cc64e18..f63a8e350e 100644 --- a/src/Supervisor.cc +++ b/src/Supervisor.cc @@ -1,6 +1,9 @@ +#include #include +#include #include +#include #include "Supervisor.h" #include "Reporter.h" @@ -30,11 +33,6 @@ zeek::Supervisor::Supervisor(zeek::Supervisor::Config cfg, SetIdle(true); } -void zeek::Supervisor::ObserveChildSignal() - { - signal_flare.Fire(); - } - zeek::Supervisor::~Supervisor() { if ( ! stem_pid ) @@ -68,6 +66,94 @@ zeek::Supervisor::~Supervisor() } } +void zeek::Supervisor::ObserveChildSignal() + { + signal_flare.Fire(); + } + +void zeek::Supervisor::HandleChildSignal() + { + if ( ! stem_pid ) + return; + + auto child_signals = signal_flare.Extinguish(); + + if ( !
child_signals ) + return; + + DBG_LOG(DBG_SUPERVISOR, "handle %d child signals, wait for stem pid %d", + child_signals, stem_pid); + + int status; + auto res = waitpid(stem_pid, &status, WNOHANG); + + if ( res == 0 ) + { + DBG_LOG(DBG_SUPERVISOR, "false alarm, stem process still lives"); + } + else if ( res == -1 ) + { + char tmp[256]; + bro_strerror_r(errno, tmp, sizeof(tmp)); + reporter->Error("Supervisor failed to get exit status" + " of stem process: %s", tmp); + } + else if ( WIFEXITED(status) ) + { + DBG_LOG(DBG_SUPERVISOR, "stem process exited with status %d", + WEXITSTATUS(status)); + stem_pid = 0; + } + else if ( WIFSIGNALED(status) ) + { + DBG_LOG(DBG_SUPERVISOR, "stem process terminated by signal %d", + WTERMSIG(status)); + stem_pid = 0; + } + else + reporter->Error("Supervisor failed to get exit status" + " of stem process for unknown reason"); + + if ( ! stem_pid ) + { + // Revive the Stem process + stem_pid = fork(); + + if ( stem_pid == -1 ) + { + char tmp[256]; + bro_strerror_r(errno, tmp, sizeof(tmp)); + reporter->Error("failed to fork Zeek supervisor stem process: %s\n", tmp); + signal_flare.Fire(); + // Sleep to avoid spinning too fast in a revival-fail loop.
+ sleep(1); + } + else if ( stem_pid == 0 ) + { + char stem_env[256]; + safe_snprintf(stem_env, sizeof(stem_env), "ZEEK_STEM=%d,%d", + stem_pipe->ReadFD(), stem_pipe->WriteFD()); + char* env[] = { stem_env, (char*)0 }; + stem_pipe->UnsetFlags(FD_CLOEXEC); + auto res = execle(config.zeek_exe_path.data(), + config.zeek_exe_path.data(), + (char*)0, env); + + char tmp[256]; + bro_strerror_r(errno, tmp, sizeof(tmp)); + fprintf(stderr, "failed to exec Zeek supervisor stem process: %s\n", tmp); + exit(1); + } + else + { + DBG_LOG(DBG_SUPERVISOR, "stem process revived, new pid: %d", stem_pid); + } + } + + // TODO: Stem process needs a way to inform Supervisor not to revive + } + + void zeek::Supervisor::GetFds(iosource::FD_Set* read, iosource::FD_Set* write, iosource::FD_Set* except) { @@ -85,51 +171,44 @@ double zeek::Supervisor::NextTimestamp(double* local_network_time) void zeek::Supervisor::Process() { - auto child_signals = signal_flare.Extinguish(); + HandleChildSignal(); - DBG_LOG(DBG_SUPERVISOR, "process: child_signals %d, stem_pid %d", - child_signals, stem_pid); + char buf[256]; + int bytes_read = read(stem_pipe->ReadFD(), buf, 256); - if ( child_signals && stem_pid ) + if ( bytes_read > 0 ) { - DBG_LOG(DBG_SUPERVISOR, "handle child signal, wait for %d", stem_pid); - int status; - auto res = waitpid(stem_pid, &status, WNOHANG); - - if ( res == 0 ) - { - DBG_LOG(DBG_SUPERVISOR, "false alarm, stem process still lives"); - } - else if ( res == -1 ) - { - char tmp[256]; - bro_strerror_r(errno, tmp, sizeof(tmp)); - reporter->Error("Supervisor failed to get exit status" - " of stem process: %s", tmp); - } - else if ( WIFEXITED(status) ) - { - DBG_LOG(DBG_SUPERVISOR, "stem process exited with status %d", - WEXITSTATUS(status)); - stem_pid = 0; - } - else if ( WIFSIGNALED(status) ) - { - DBG_LOG(DBG_SUPERVISOR, "stem process terminated by signal %d", - WTERMSIG(status)); - stem_pid = 0; - } - else - { - reporter->Error("Supervisor failed to get exit status" - " of stem
process for unknown reason"); - } - - // TODO: add proper handling of stem process exiting - // In wait cases is it ok for the stem process to terminate and - // in what cases do we need to automatically re-recreate it ? - // And how do we re-create it? It would be too late to fork() again - // because we've potentially already changed so much global state by the - // time we get there, so guess we exec() and start over completely ?. + DBG_LOG(DBG_SUPERVISOR, "read msg from Stem: %.*s", bytes_read, buf); + } + } + +void zeek::Supervisor::RunStem(std::unique_ptr pipe) + { + zeek::set_thread_name("zeek-stem"); + // TODO: changing the process group here so that SIGINT to the + // supervisor doesn't also get passed to the children. i.e. supervisor + // should be in charge of initiating orderly shutdown. But calling + // just setpgid() like this is technically a race-condition -- need + // to do more work of blocking SIGINT before fork(), unblocking after, + // then also calling setpgid() from parent. And just not doing that + // until more is known whether that's the right SIGINT behavior in + // the first place.
+ auto res = setpgid(0, 0); + + if ( res == -1 ) + fprintf(stderr, "failed to set stem process group: %s\n", + strerror(errno)); + + for ( ; ; ) + { + // TODO: make a proper I/O loop w/ message processing via pipe + // TODO: better way to detect loss of parent than polling + + if ( getppid() == 1 ) + exit(0); + + sleep(5); + printf("Stem wakeup\n"); + write(pipe->WriteFD(), "hi", 2); } } diff --git a/src/Supervisor.h b/src/Supervisor.h index aa647c5aed..7d553d3b68 100644 --- a/src/Supervisor.h +++ b/src/Supervisor.h @@ -16,9 +16,12 @@ namespace zeek { class Supervisor : public iosource::IOSource { public: + static void RunStem(std::unique_ptr pipe); + struct Config { int num_workers = 1; std::vector pcaps; + std::string zeek_exe_path; }; Supervisor(Config cfg, std::unique_ptr stem_pipe, pid_t stem_pid); @@ -40,6 +43,8 @@ private: void Process() override; + void HandleChildSignal(); + const char* Tag() override { return "zeek::Supervisor"; } diff --git a/src/main.cc b/src/main.cc index a76355f3a3..aeae9230fe 100644 --- a/src/main.cc +++ b/src/main.cc @@ -679,8 +679,48 @@ static std::vector get_script_signature_files() return rval; } +static std::string get_exe_path(std::string invocation) + { + if ( invocation.empty() ) + return ""; + + if ( invocation[0] == '/' ) + // Absolute path + return invocation; + + if ( invocation.find('/') != std::string::npos ) + { + // Relative path + char cwd[PATH_MAX]; + + if ( ! getcwd(cwd, sizeof(cwd)) ) + { + fprintf(stderr, "failed to get current directory: %s\n", + strerror(errno)); + exit(1); + } + + return std::string(cwd) + "/" + invocation; + } + + auto path = getenv("PATH"); + + if ( !
path ) + return ""; + + return find_file(invocation, path); + } + int main(int argc, char** argv) { + auto zeek_exe_path = get_exe_path(argv[0]); + + if ( zeek_exe_path.empty() ) + { + fprintf(stderr, "failed to get path to executable '%s'\n", argv[0]); + exit(1); + } + bro_argc = argc; bro_argv = new char* [argc]; @@ -716,33 +756,30 @@ int main(int argc, char** argv) } if ( stem_pid == 0 ) + zeek::Supervisor::RunStem(std::move(supervisor_pipe)); + } + + auto zeek_stem_env = getenv("ZEEK_STEM"); + + if ( zeek_stem_env ) + { + std::vector fd_strings; + tokenize_string(zeek_stem_env, ",", &fd_strings); + + if ( fd_strings.size() != 2 ) { - zeek::set_thread_name("zeek-stem"); - // TODO: changing the process group here so that SIGINT to the - // supervisor doesn't also get passed to the children. i.e. supervisor - // should be in charge of initiating orderly shutdown. But calling - // just setpgid() like this is technically a race-condition -- need - // to do more work of blocking SIGINT before fork(), unblocking after, - // then also calling setpgid() from parent. And just not doing that - // until more is known whether that's the right SIGINT behavior in - // the first place.
- auto res = setpgid(0, 0); - - if ( res == -1 ) - fprintf(stderr, "failed to set stem process group: %s\n", - strerror(errno)); - - for ( ; ; ) - { - // TODO: make a proper I/O loop w/ message processing via pipe - // TODO: better way to detect loss of parent than polling - - if ( getppid() == 1 ) - exit(0); - - sleep(1); - } + fprintf(stderr, "invalid ZEEK_STEM environment variable value: '%s'\n", + zeek_stem_env); + exit(1); } + + int fds[2]; + fds[0] = std::stoi(fd_strings[0]); + fds[1] = std::stoi(fd_strings[1]); + + supervisor_pipe.reset(new bro::Pipe{FD_CLOEXEC, FD_CLOEXEC, + O_NONBLOCK, O_NONBLOCK, fds}); + zeek::Supervisor::RunStem(std::move(supervisor_pipe)); } std::set_new_handler(bro_new_handler); @@ -823,6 +860,7 @@ int main(int argc, char** argv) zeek::Supervisor::Config cfg = {}; cfg.pcaps = options.pcap_files; cfg.num_workers = options.supervised_workers; + cfg.zeek_exe_path = zeek_exe_path; zeek::supervisor = new zeek::Supervisor(std::move(cfg), std::move(supervisor_pipe), stem_pid);