Add supervisor node creation

This commit is contained in:
Jon Siwek 2019-10-16 20:10:25 -07:00
parent 7c08488dfc
commit 573e127672
5 changed files with 115 additions and 35 deletions

View file

@ -141,3 +141,9 @@ Pipe& Pipe::operator=(const Pipe& other)
status_flags[1] = other.status_flags[1];
return *this;
}
PipePair::PipePair(int flags, int status_flags, int* fds)
: pipes{Pipe(flags, flags, status_flags, status_flags, fds ? fds + 0 : nullptr),
Pipe(flags, flags, status_flags, status_flags, fds ? fds + 2 : nullptr)}
{
}

View file

@ -66,4 +66,34 @@ private:
int status_flags[2];
};
class PipePair {
public:
PipePair(int flags, int status_flags, int* fds = nullptr);
Pipe& In()
{ return pipes[swapped]; }
Pipe& Out()
{ return pipes[!swapped]; }
const Pipe& In() const
{ return pipes[swapped]; }
const Pipe& Out() const
{ return pipes[!swapped]; }
int InFD() const
{ return In().ReadFD(); }
int OutFD() const
{ return Out().WriteFD(); }
void Swap()
{ swapped = ! swapped; }
Pipe pipes[2];
bool swapped = false;
};
} // namespace bro

View file

@ -26,7 +26,7 @@ static RETSIGTYPE supervisor_sig_handler(int signo)
}
zeek::Supervisor::Supervisor(zeek::Supervisor::Config cfg,
std::unique_ptr<bro::Pipe> pipe,
std::unique_ptr<bro::PipePair> pipe,
pid_t arg_stem_pid)
: config(std::move(cfg)), stem_pid(arg_stem_pid), stem_pipe(std::move(pipe))
{
@ -128,16 +128,18 @@ void zeek::Supervisor::HandleChildSignal()
bro_strerror_r(errno, tmp, sizeof(tmp));
reporter->Error("failed to fork Zeek supervisor stem process: %s\n", tmp);
signal_flare.Fire();
// Sleep to avoid spining too fast in a revival-fail loop.
// Sleep to avoid spinning too fast in a revival-fail loop.
sleep(1);
}
else if ( stem_pid == 0 )
{
char stem_env[256];
safe_snprintf(stem_env, sizeof(stem_env), "ZEEK_STEM=%d,%d",
stem_pipe->ReadFD(), stem_pipe->WriteFD());
safe_snprintf(stem_env, sizeof(stem_env), "ZEEK_STEM=%d,%d,%d,%d",
stem_pipe->In().ReadFD(), stem_pipe->In().WriteFD(),
stem_pipe->Out().ReadFD(), stem_pipe->Out().WriteFD());
char* env[] = { stem_env, (char*)0 };
stem_pipe->UnsetFlags(FD_CLOEXEC);
stem_pipe->In().UnsetFlags(FD_CLOEXEC);
stem_pipe->Out().UnsetFlags(FD_CLOEXEC);
auto res = execle(config.zeek_exe_path.data(),
config.zeek_exe_path.data(),
(char*)0, env);
@ -150,6 +152,16 @@ void zeek::Supervisor::HandleChildSignal()
else
{
DBG_LOG(DBG_SUPERVISOR, "stem process revived, new pid: %d", stem_pid);
// Recreate the desired process hierarchy.
// TODO: probably a preferred order in which to create nodes
// e.g. logger, manager, proxy, worker
for ( const auto& n : nodes )
{
const auto& node = n.second;
std::string msg = fmt("create %s", node.name.data());
safe_write(stem_pipe->OutFD(), msg.data(), msg.size() + 1);
}
}
}
@ -161,14 +173,11 @@ void zeek::Supervisor::GetFds(iosource::FD_Set* read, iosource::FD_Set* write,
iosource::FD_Set* except)
{
read->Insert(signal_flare.FD());
read->Insert(stem_pipe->ReadFD());
read->Insert(stem_pipe->InFD());
}
double zeek::Supervisor::NextTimestamp(double* local_network_time)
{
// We're only asked for a timestamp if either (1) a FD was ready
// or (2) we're not idle (and we go idle if when Process is no-op),
// so there's no case where returning -1 to signify a skip will help.
return timer_mgr->Time();
}
@ -177,7 +186,7 @@ void zeek::Supervisor::Process()
HandleChildSignal();
char buf[256];
int bytes_read = read(stem_pipe->ReadFD(), buf, 256);
int bytes_read = read(stem_pipe->InFD(), buf, 256);
if ( bytes_read > 0 )
{
@ -185,9 +194,10 @@ void zeek::Supervisor::Process()
}
}
void zeek::Supervisor::RunStem(std::unique_ptr<bro::Pipe> pipe)
std::string zeek::Supervisor::RunStem(std::unique_ptr<bro::PipePair> pipe)
{
zeek::set_thread_name("zeek-stem");
zeek::set_thread_name("zeek.stem");
pipe->Swap();
// TODO: changing the process group here so that SIGINT to the
// supervisor doesn't also get passed to the children. i.e. supervisor
// should be in charge of initiating orderly shutdown. But calling
@ -229,7 +239,7 @@ void zeek::Supervisor::RunStem(std::unique_ptr<bro::Pipe> pipe)
{
// TODO: better way to detect loss of parent than polling ?
pollfd fds = { pipe->ReadFD(), POLLIN, 0 };
pollfd fds = { pipe->InFD(), POLLIN, 0 };
constexpr auto poll_timeout_ms = 1000;
auto res = poll(&fds, 1, poll_timeout_ms);
@ -242,11 +252,15 @@ void zeek::Supervisor::RunStem(std::unique_ptr<bro::Pipe> pipe)
if ( getppid() == 1 )
exit(0);
// TODO: periodically send node status updates back to supervisor?
// e.g. can fill in information gaps in the supervisor's node map
// for things such as node PIDs.
if ( res == 0 )
continue;
char buf[256];
int bytes_read = read(pipe->ReadFD(), buf, 256);
int bytes_read = read(pipe->InFD(), buf, 256);
if ( bytes_read == 0 )
// EOF
@ -271,10 +285,25 @@ void zeek::Supervisor::RunStem(std::unique_ptr<bro::Pipe> pipe)
if ( cmd == "create" )
{
auto res = nodes.emplace(node_name, Node{node_name});
assert(res.second);
// TODO: fork
printf("Stem creating node: %s\n", node_name.data());
assert(nodes.find(node_name) == nodes.end());
auto node_pid = fork();
if ( node_pid == -1 )
fprintf(stderr, "failed to fork Zeek node '%s': %s\n",
node_name.data(), strerror(errno));
else if ( node_pid == 0 )
{
// TODO: probably want to return the configuration the
// new node ought to use
zeek::set_thread_name(fmt("zeek.%s", node_name.data()));
return node_name;
}
Node node;
node.name = node_name;
node.pid = node_pid;
nodes.emplace(node_name, node);
printf("Stem created node: %s %d\n", node_name.data(), node_pid);
}
else if ( cmd == "destroy" )
{
@ -342,7 +371,7 @@ std::string zeek::Supervisor::Create(const RecordVal* node_val)
return fmt("node with name '%s' already exists", node.name.data());
std::string msg = fmt("create %s", node.name.data());
safe_write(stem_pipe->WriteFD(), msg.data(), msg.size() + 1);
safe_write(stem_pipe->OutFD(), msg.data(), msg.size() + 1);
nodes.emplace(node.name, node);
return "";
}
@ -355,7 +384,7 @@ bool zeek::Supervisor::Destroy(const std::string& node_name)
return false;
std::string msg = fmt("destroy %s", node_name.data());
safe_write(stem_pipe->WriteFD(), msg.data(), msg.size() + 1);
safe_write(stem_pipe->OutFD(), msg.data(), msg.size() + 1);
return true;
}
@ -367,6 +396,6 @@ bool zeek::Supervisor::Restart(const std::string& node_name)
return false;
std::string msg = fmt("restart %s", node_name.data());
safe_write(stem_pipe->WriteFD(), msg.data(), msg.size() + 1);
safe_write(stem_pipe->OutFD(), msg.data(), msg.size() + 1);
return true;
}

View file

@ -17,7 +17,7 @@ namespace zeek {
class Supervisor : public iosource::IOSource {
public:
static void RunStem(std::unique_ptr<bro::Pipe> pipe);
static std::string RunStem(std::unique_ptr<bro::PipePair> pipe);
struct Config {
int num_workers = 1;
@ -27,9 +27,10 @@ public:
struct Node {
std::string name;
pid_t pid = 0;
};
Supervisor(Config cfg, std::unique_ptr<bro::Pipe> stem_pipe, pid_t stem_pid);
Supervisor(Config cfg, std::unique_ptr<bro::PipePair> stem_pipe, pid_t stem_pid);
~Supervisor();
@ -60,7 +61,7 @@ private:
Config config;
pid_t stem_pid;
std::unique_ptr<bro::Pipe> stem_pipe;
std::unique_ptr<bro::PipePair> stem_pipe;
bro::Flare signal_flare;
std::map<std::string, Node> nodes;
};

View file

@ -740,12 +740,12 @@ int main(int argc, char** argv)
bool use_supervisor = options.supervised_workers > 0;
pid_t stem_pid = 0;
std::unique_ptr<bro::Pipe> supervisor_pipe;
std::unique_ptr<bro::PipePair> supervisor_pipe;
std::string stem_spawn = "";
if ( use_supervisor )
{
supervisor_pipe.reset(new bro::Pipe{FD_CLOEXEC, FD_CLOEXEC,
O_NONBLOCK, O_NONBLOCK});
supervisor_pipe.reset(new bro::PipePair{FD_CLOEXEC, O_NONBLOCK});
stem_pid = fork();
if ( stem_pid == -1 )
@ -756,7 +756,7 @@ int main(int argc, char** argv)
}
if ( stem_pid == 0 )
zeek::Supervisor::RunStem(std::move(supervisor_pipe));
stem_spawn = zeek::Supervisor::RunStem(std::move(supervisor_pipe));
}
auto zeek_stem_env = getenv("ZEEK_STEM");
@ -766,20 +766,34 @@ int main(int argc, char** argv)
std::vector<std::string> fd_strings;
tokenize_string(zeek_stem_env, ",", &fd_strings);
if ( fd_strings.size() != 2 )
if ( fd_strings.size() != 4 )
{
fprintf(stderr, "invalid ZEEK_STEM environment variable value: '%s'\n",
zeek_stem_env);
exit(1);
}
int fds[2];
fds[0] = std::stoi(fd_strings[0]);
fds[1] = std::stoi(fd_strings[1]);
int fds[4];
supervisor_pipe.reset(new bro::Pipe{FD_CLOEXEC, FD_CLOEXEC,
O_NONBLOCK, O_NONBLOCK, fds});
zeek::Supervisor::RunStem(std::move(supervisor_pipe));
for ( auto i = 0; i < 4; ++i )
fds[i] = std::stoi(fd_strings[i]);
supervisor_pipe.reset(new bro::PipePair{FD_CLOEXEC, O_NONBLOCK, fds});
stem_spawn = zeek::Supervisor::RunStem(std::move(supervisor_pipe));
}
if ( ! stem_spawn.empty() )
{
for ( ; ; )
{
// TODO: this no-op loop is here just to test the process hierarchy
printf("node wakeup: %s\n", stem_spawn.data());
sleep(2);
// TODO: this re-parenting check needs to go somewhere proper
if ( getppid() == 1 )
exit(0);
}
}
std::set_new_handler(bro_new_handler);