Add supervisor stem process auto-revival

This commit is contained in:
Jon Siwek 2019-10-04 13:25:01 -07:00
parent 4959d438fa
commit 52f7647f25
5 changed files with 285 additions and 93 deletions

View file

@ -20,21 +20,52 @@ static void pipe_fail(int eno)
fprintf(stderr, "Pipe failure: %s", tmp); fprintf(stderr, "Pipe failure: %s", tmp);
} }
static void set_flags(int fd, int flags) static int set_flags(int fd, int flags)
{ {
auto rval = fcntl(fd, F_GETFD);
if ( flags ) if ( flags )
if ( fcntl(fd, F_SETFD, fcntl(fd, F_GETFD) | flags) == -1 ) {
rval |= flags;
if ( fcntl(fd, F_SETFD, rval) == -1 )
pipe_fail(errno); pipe_fail(errno);
} }
static void set_status_flags(int fd, int flags) return rval;
}
static int unset_flags(int fd, int flags)
{ {
auto rval = fcntl(fd, F_GETFD);
if ( flags ) if ( flags )
if ( fcntl(fd, F_SETFL, fcntl(fd, F_GETFL) | flags) == -1 ) {
rval &= ~flags;
if ( fcntl(fd, F_SETFD, rval) == -1 )
pipe_fail(errno); pipe_fail(errno);
} }
static int dup_or_fail(int fd, int flags) return rval;
}
static int set_status_flags(int fd, int flags)
{
auto rval = fcntl(fd, F_GETFL);
if ( flags )
{
rval |= flags;
if ( fcntl(fd, F_SETFL, rval) == -1 )
pipe_fail(errno);
}
return rval;
}
static int dup_or_fail(int fd, int flags, int status_flags)
{ {
int rval = dup(fd); int rval = dup(fd);
@ -42,22 +73,41 @@ static int dup_or_fail(int fd, int flags)
pipe_fail(errno); pipe_fail(errno);
set_flags(fd, flags); set_flags(fd, flags);
set_status_flags(fd, status_flags);
return rval; return rval;
} }
Pipe::Pipe(int flags0, int flags1, int status_flags0, int status_flags1) Pipe::Pipe(int flags0, int flags1, int status_flags0, int status_flags1,
int* arg_fds)
{
if ( arg_fds )
{
fds[0] = arg_fds[0];
fds[1] = arg_fds[1];
}
else
{ {
// pipe2 can set flags atomically, but not yet available everywhere. // pipe2 can set flags atomically, but not yet available everywhere.
if ( ::pipe(fds) ) if ( ::pipe(fds) )
pipe_fail(errno); pipe_fail(errno);
}
flags[0] = flags0; flags[0] = set_flags(fds[0], flags[0]);
flags[1] = flags1; flags[1] = set_flags(fds[1], flags[1]);
status_flags[0] = set_status_flags(fds[0], status_flags0);
status_flags[1] = set_status_flags(fds[1], status_flags1);
}
set_flags(fds[0], flags[0]); void Pipe::SetFlags(int arg_flags)
set_flags(fds[1], flags[1]); {
set_status_flags(fds[0], status_flags0); flags[0] = set_flags(fds[0], arg_flags);
set_status_flags(fds[1], status_flags1); flags[1] = set_flags(fds[1], arg_flags);
}
void Pipe::UnsetFlags(int arg_flags)
{
flags[0] = unset_flags(fds[0], arg_flags);
flags[1] = unset_flags(fds[1], arg_flags);
} }
Pipe::~Pipe() Pipe::~Pipe()
@ -68,10 +118,12 @@ Pipe::~Pipe()
Pipe::Pipe(const Pipe& other) Pipe::Pipe(const Pipe& other)
{ {
fds[0] = dup_or_fail(other.fds[0], other.flags[0]); fds[0] = dup_or_fail(other.fds[0], other.flags[0], other.status_flags[0]);
fds[1] = dup_or_fail(other.fds[1], other.flags[1]); fds[1] = dup_or_fail(other.fds[1], other.flags[1], other.status_flags[1]);
flags[0] = other.flags[0]; flags[0] = other.flags[0];
flags[1] = other.flags[1]; flags[1] = other.flags[1];
status_flags[0] = other.status_flags[0];
status_flags[1] = other.status_flags[1];
} }
Pipe& Pipe::operator=(const Pipe& other) Pipe& Pipe::operator=(const Pipe& other)
@ -81,9 +133,11 @@ Pipe& Pipe::operator=(const Pipe& other)
close(fds[0]); close(fds[0]);
close(fds[1]); close(fds[1]);
fds[0] = dup_or_fail(other.fds[0], other.flags[0]); fds[0] = dup_or_fail(other.fds[0], other.flags[0], other.status_flags[0]);
fds[1] = dup_or_fail(other.fds[1], other.flags[1]); fds[1] = dup_or_fail(other.fds[1], other.flags[1], other.status_flags[1]);
flags[0] = other.flags[0]; flags[0] = other.flags[0];
flags[1] = other.flags[1]; flags[1] = other.flags[1];
status_flags[0] = other.status_flags[0];
status_flags[1] = other.status_flags[1];
return *this; return *this;
} }

View file

@ -13,9 +13,12 @@ public:
* @param flags1 file descriptor flags to set on write end of pipe. * @param flags1 file descriptor flags to set on write end of pipe.
* @param status_flags0 descriptor status flags to set on read end of pipe. * @param status_flags0 descriptor status flags to set on read end of pipe.
* @param status_flags1 descriptor status flags to set on write end of pipe. * @param status_flags1 descriptor status flags to set on write end of pipe.
* @param fds may be supplied to open an existing file descriptors rather
* than create ones from a new pipe. Should point to memory containing
* two consecutive file descriptors, the "read" one and then the "write" one.
*/ */
explicit Pipe(int flags0 = 0, int flags1 = 0, int status_flags0 = 0, explicit Pipe(int flags0 = 0, int flags1 = 0, int status_flags0 = 0,
int status_flags1 = 0); int status_flags1 = 0, int* fds = nullptr);
/** /**
* Close the pair of file descriptors owned by the object. * Close the pair of file descriptors owned by the object.
@ -45,9 +48,22 @@ public:
int WriteFD() const int WriteFD() const
{ return fds[1]; } { return fds[1]; }
/**
* Sets the given file descriptor flags for both the read and write end
* of the pipe.
*/
void SetFlags(int flags);
/**
* Unsets the given file descriptor flags for both the read and write end
* of the pipe.
*/
void UnsetFlags(int flags);
private: private:
int fds[2]; int fds[2];
int flags[2]; int flags[2];
int status_flags[2];
}; };
} // namespace bro } // namespace bro

View file

@ -1,6 +1,9 @@
#include <sys/types.h>
#include <sys/wait.h> #include <sys/wait.h>
#include <unistd.h>
#include <signal.h> #include <signal.h>
#include <fcntl.h>
#include "Supervisor.h" #include "Supervisor.h"
#include "Reporter.h" #include "Reporter.h"
@ -30,11 +33,6 @@ zeek::Supervisor::Supervisor(zeek::Supervisor::Config cfg,
SetIdle(true); SetIdle(true);
} }
void zeek::Supervisor::ObserveChildSignal()
{
signal_flare.Fire();
}
zeek::Supervisor::~Supervisor() zeek::Supervisor::~Supervisor()
{ {
if ( ! stem_pid ) if ( ! stem_pid )
@ -68,31 +66,24 @@ zeek::Supervisor::~Supervisor()
} }
} }
void zeek::Supervisor::GetFds(iosource::FD_Set* read, iosource::FD_Set* write, void zeek::Supervisor::ObserveChildSignal()
iosource::FD_Set* except)
{ {
read->Insert(signal_flare.FD()); signal_flare.Fire();
read->Insert(stem_pipe->ReadFD());
} }
double zeek::Supervisor::NextTimestamp(double* local_network_time) void zeek::Supervisor::HandleChildSignal()
{ {
// We're only asked for a timestamp if either (1) a FD was ready if ( ! stem_pid )
// or (2) we're not idle (and we go idle if when Process is no-op), return;
// so there's no case where returning -1 to signify a skip will help.
return timer_mgr->Time();
}
void zeek::Supervisor::Process()
{
auto child_signals = signal_flare.Extinguish(); auto child_signals = signal_flare.Extinguish();
DBG_LOG(DBG_SUPERVISOR, "process: child_signals %d, stem_pid %d", if ( ! child_signals )
return;
DBG_LOG(DBG_SUPERVISOR, "handle %d child signals, wait for stem pid %d",
child_signals, stem_pid); child_signals, stem_pid);
if ( child_signals && stem_pid )
{
DBG_LOG(DBG_SUPERVISOR, "handle child signal, wait for %d", stem_pid);
int status; int status;
auto res = waitpid(stem_pid, &status, WNOHANG); auto res = waitpid(stem_pid, &status, WNOHANG);
@ -120,16 +111,104 @@ void zeek::Supervisor::Process()
stem_pid = 0; stem_pid = 0;
} }
else else
{
reporter->Error("Supervisor failed to get exit status" reporter->Error("Supervisor failed to get exit status"
" of stem process for unknown reason"); " of stem process for unknown reason");
if ( ! stem_pid )
{
// Revive the Stem process
stem_pid = fork();
if ( stem_pid == -1 )
{
char tmp[256];
bro_strerror_r(errno, tmp, sizeof(tmp));
reporter->Error("failed to fork Zeek supervisor stem process: %s\n", tmp);
signal_flare.Fire();
// Sleep to avoid spining too fast in a revival-fail loop.
sleep(1);
}
else if ( stem_pid == 0 )
{
char stem_env[256];
safe_snprintf(stem_env, sizeof(stem_env), "ZEEK_STEM=%d,%d",
stem_pipe->ReadFD(), stem_pipe->WriteFD());
char* env[] = { stem_env, (char*)0 };
stem_pipe->UnsetFlags(FD_CLOEXEC);
auto res = execle(config.zeek_exe_path.data(),
config.zeek_exe_path.data(),
(char*)0, env);
char tmp[256];
bro_strerror_r(errno, tmp, sizeof(tmp));
fprintf(stderr, "failed to exec Zeek supervisor stem process: %s\n", tmp);
exit(1);
}
else
{
DBG_LOG(DBG_SUPERVISOR, "stem process revived, new pid: %d", stem_pid);
}
} }
// TODO: add proper handling of stem process exiting // TODO: Stem process needs a way to inform Supervisor not to revive
// In wait cases is it ok for the stem process to terminate and }
// in what cases do we need to automatically re-recreate it ?
// And how do we re-create it? It would be too late to fork() again
// because we've potentially already changed so much global state by the void zeek::Supervisor::GetFds(iosource::FD_Set* read, iosource::FD_Set* write,
// time we get there, so guess we exec() and start over completely ?. iosource::FD_Set* except)
{
read->Insert(signal_flare.FD());
read->Insert(stem_pipe->ReadFD());
}
double zeek::Supervisor::NextTimestamp(double* local_network_time)
{
// We're only asked for a timestamp if either (1) a FD was ready
// or (2) we're not idle (and we go idle if when Process is no-op),
// so there's no case where returning -1 to signify a skip will help.
return timer_mgr->Time();
}
void zeek::Supervisor::Process()
{
HandleChildSignal();
char buf[256];
int bytes_read = read(stem_pipe->ReadFD(), buf, 256);
if ( bytes_read > 0 )
{
DBG_LOG(DBG_SUPERVISOR, "read msg from Stem: %.*s", bytes_read, buf);
}
}
void zeek::Supervisor::RunStem(std::unique_ptr<bro::Pipe> pipe)
{
zeek::set_thread_name("zeek-stem");
// TODO: changing the process group here so that SIGINT to the
// supervisor doesn't also get passed to the children. i.e. supervisor
// should be in charge of initiating orderly shutdown. But calling
// just setpgid() like this is technically a race-condition -- need
// to do more work of blocking SIGINT before fork(), unblocking after,
// then also calling setpgid() from parent. And just not doing that
// until more is known whether that's the right SIGINT behavior in
// the first place.
auto res = setpgid(0, 0);
if ( res == -1 )
fprintf(stderr, "failed to set stem process group: %s\n",
strerror(errno));
for ( ; ; )
{
// TODO: make a proper I/O loop w/ message processing via pipe
// TODO: better way to detect loss of parent than polling
if ( getppid() == 1 )
exit(0);
sleep(5);
printf("Stem wakeup\n");
write(pipe->WriteFD(), "hi", 2);
} }
} }

View file

@ -16,9 +16,12 @@ namespace zeek {
class Supervisor : public iosource::IOSource { class Supervisor : public iosource::IOSource {
public: public:
static void RunStem(std::unique_ptr<bro::Pipe> pipe);
struct Config { struct Config {
int num_workers = 1; int num_workers = 1;
std::vector<std::string> pcaps; std::vector<std::string> pcaps;
std::string zeek_exe_path;
}; };
Supervisor(Config cfg, std::unique_ptr<bro::Pipe> stem_pipe, pid_t stem_pid); Supervisor(Config cfg, std::unique_ptr<bro::Pipe> stem_pipe, pid_t stem_pid);
@ -40,6 +43,8 @@ private:
void Process() override; void Process() override;
void HandleChildSignal();
const char* Tag() override const char* Tag() override
{ return "zeek::Supervisor"; } { return "zeek::Supervisor"; }

View file

@ -679,8 +679,48 @@ static std::vector<std::string> get_script_signature_files()
return rval; return rval;
} }
static std::string get_exe_path(std::string invocation)
{
if ( invocation.empty() )
return "";
if ( invocation[0] == '/' )
// Absolute path
return invocation;
if ( invocation.find('/') != std::string::npos )
{
// Relative path
char cwd[PATH_MAX];
if ( ! getcwd(cwd, sizeof(cwd)) )
{
fprintf(stderr, "failed to get current directory: %s\n",
strerror(errno));
exit(1);
}
return std::string(cwd) + "/" + invocation;
}
auto path = getenv("PATH");
if ( ! path )
return "";
return find_file(invocation, path);
}
int main(int argc, char** argv) int main(int argc, char** argv)
{ {
auto zeek_exe_path = get_exe_path(argv[0]);
if ( zeek_exe_path.empty() )
{
fprintf(stderr, "failed to get path to executable '%s'", argv[0]);
exit(1);
}
bro_argc = argc; bro_argc = argc;
bro_argv = new char* [argc]; bro_argv = new char* [argc];
@ -716,33 +756,30 @@ int main(int argc, char** argv)
} }
if ( stem_pid == 0 ) if ( stem_pid == 0 )
{ zeek::Supervisor::RunStem(std::move(supervisor_pipe));
zeek::set_thread_name("zeek-stem");
// TODO: changing the process group here so that SIGINT to the
// supervisor doesn't also get passed to the children. i.e. supervisor
// should be in charge of initiating orderly shutdown. But calling
// just setpgid() like this is technically a race-condition -- need
// to do more work of blocking SIGINT before fork(), unblocking after,
// then also calling setpgid() from parent. And just not doing that
// until more is known whether that's the right SIGINT behavior in
// the first place.
auto res = setpgid(0, 0);
if ( res == -1 )
fprintf(stderr, "failed to set stem process group: %s\n",
strerror(errno));
for ( ; ; )
{
// TODO: make a proper I/O loop w/ message processing via pipe
// TODO: better way to detect loss of parent than polling
if ( getppid() == 1 )
exit(0);
sleep(1);
} }
auto zeek_stem_env = getenv("ZEEK_STEM");
if ( zeek_stem_env )
{
std::vector<std::string> fd_strings;
tokenize_string(zeek_stem_env, ",", &fd_strings);
if ( fd_strings.size() != 2 )
{
fprintf(stderr, "invalid ZEEK_STEM environment variable value: '%s'\n",
zeek_stem_env);
exit(1);
} }
int fds[2];
fds[0] = std::stoi(fd_strings[0]);
fds[1] = std::stoi(fd_strings[1]);
supervisor_pipe.reset(new bro::Pipe{FD_CLOEXEC, FD_CLOEXEC,
O_NONBLOCK, O_NONBLOCK, fds});
zeek::Supervisor::RunStem(std::move(supervisor_pipe));
} }
std::set_new_handler(bro_new_handler); std::set_new_handler(bro_new_handler);
@ -823,6 +860,7 @@ int main(int argc, char** argv)
zeek::Supervisor::Config cfg = {}; zeek::Supervisor::Config cfg = {};
cfg.pcaps = options.pcap_files; cfg.pcaps = options.pcap_files;
cfg.num_workers = options.supervised_workers; cfg.num_workers = options.supervised_workers;
cfg.zeek_exe_path = zeek_exe_path;
zeek::supervisor = new zeek::Supervisor(std::move(cfg), zeek::supervisor = new zeek::Supervisor(std::move(cfg),
std::move(supervisor_pipe), std::move(supervisor_pipe),
stem_pid); stem_pid);