Add skeleton logic for handling supevisor control messages

This commit is contained in:
Jon Siwek 2019-10-16 15:12:02 -07:00
parent e46cf88435
commit 7c08488dfc
5 changed files with 170 additions and 34 deletions

View file

@ -4,18 +4,19 @@
module Supervisor;
export {
type Status: record {
# TODO: add proper status fields
n: count;
};
type NodeConfig: record {
# TODO: add proper config field
type Node: record {
# TODO: add proper config fields
name: string;
};
type Status: record {
# TODO: add proper status fields
n: count;
nodes: table[string] of Node;
};
global status: function(nodes: string &default="all"): Status;
global create: function(config: NodeConfig): string;
global create: function(node: Node): string;
global destroy: function(nodes: string): bool;
global restart: function(nodes: string &default="all"): bool;
@ -24,7 +25,7 @@ export {
global Supervisor::status_request: event(id: count, nodes: string);
global Supervisor::status_response: event(id: count, result: Status);
global Supervisor::create_request: event(id: count, config: NodeConfig);
global Supervisor::create_request: event(id: count, node: Node);
global Supervisor::create_response: event(id: count, result: string);
global Supervisor::destroy_request: event(id: count, nodes: string);

View file

@ -28,9 +28,9 @@ event Supervisor::status_request(id: count, nodes: string)
Broker::publish(topic, Supervisor::status_response, id, res);
}
event Supervisor::create_request(id: count, config: NodeConfig)
event Supervisor::create_request(id: count, node: Node)
{
local res = Supervisor::create(config);
local res = Supervisor::create(node);
local topic = Supervisor::topic_prefix + "/create_response";
Broker::publish(topic, Supervisor::create_response, id, res);
}
@ -54,9 +54,9 @@ function Supervisor::status(nodes: string): Status
return Supervisor::__status(nodes);
}
function create(config: NodeConfig): string
function create(node: Node): string
{
return Supervisor::__create(config);
return Supervisor::__create(node);
}
function destroy(nodes: string): bool

View file

@ -4,6 +4,7 @@
#include <unistd.h>
#include <signal.h>
#include <fcntl.h>
#include <poll.h>
#include "Supervisor.h"
#include "Reporter.h"
@ -201,43 +202,171 @@ void zeek::Supervisor::RunStem(std::unique_ptr<bro::Pipe> pipe)
fprintf(stderr, "failed to set stem process group: %s\n",
strerror(errno));
std::string msg_buffer;
std::map<std::string, Node> nodes;
auto extract_messages = [](std::string* buf) -> std::vector<std::string>
{
std::vector<std::string> rval;
for ( ; ; )
{
// TODO: make a proper I/O loop w/ message processing via pipe
// TODO: better way to detect loss of parent than polling
auto msg_end = buf->find('\0');
if ( msg_end == std::string::npos )
// Don't have a full message yet
break;
auto msg = buf->substr(0, msg_end);
rval.emplace_back(std::move(msg));
buf->erase(0, msg_end + 1);
}
return rval;
};
for ( ; ; )
{
// TODO: better way to detect loss of parent than polling ?
pollfd fds = { pipe->ReadFD(), POLLIN, 0 };
constexpr auto poll_timeout_ms = 1000;
auto res = poll(&fds, 1, poll_timeout_ms);
if ( res < 0 )
{
fprintf(stderr, "poll() failed: %s\n", strerror(errno));
continue;
}
if ( getppid() == 1 )
exit(0);
sleep(5);
printf("Stem wakeup\n");
write(pipe->WriteFD(), "hi", 2);
if ( res == 0 )
continue;
char buf[256];
int bytes_read = read(pipe->ReadFD(), buf, 256);
if ( bytes_read == 0 )
// EOF
exit(0);
if ( bytes_read < 0 )
{
fprintf(stderr, "read() failed: %s\n", strerror(errno));
continue;
}
msg_buffer.append(buf, bytes_read);
auto msgs = extract_messages(&msg_buffer);
for ( auto& msg : msgs )
{
// TODO: improve message format ...
std::vector<std::string> msg_tokens;
tokenize_string(std::move(msg), " ", &msg_tokens);
const auto& cmd = msg_tokens[0];
const auto& node_name = msg_tokens[1];
if ( cmd == "create" )
{
auto res = nodes.emplace(node_name, Node{node_name});
assert(res.second);
// TODO: fork
printf("Stem creating node: %s\n", node_name.data());
}
else if ( cmd == "destroy" )
{
auto res = nodes.erase(node_name);
assert(res > 0 );
printf("Stem destroying node: %s\n", node_name.data());
// TODO: kill
}
else if ( cmd == "restart" )
{
auto it = nodes.find(node_name);
assert(it != nodes.end());
printf("Stem restarting node: %s\n", node_name.data());
// TODO: re-use logic for destroy then create
}
else
fprintf(stderr, "unknown supervisor message: %s", cmd.data());
}
}
}
RecordVal* zeek::Supervisor::Status(const std::string& nodes)
static zeek::Supervisor::Node node_val_to_struct(const RecordVal* node)
{
zeek::Supervisor::Node rval;
rval.name = node->Lookup("name")->AsString()->CheckString();
return rval;
}
static RecordVal* node_struct_to_val(const zeek::Supervisor::Node& node)
{
auto rval = new RecordVal(BifType::Record::Supervisor::Node);
rval->Assign(0, new StringVal(node.name));
return rval;
}
RecordVal* zeek::Supervisor::Status(const std::string& node_name)
{
// TODO: handle node classes
// TODO: return real status information
static auto count = 0;
auto rval = new RecordVal(BifType::Record::Supervisor::Status);
rval->Assign(0, val_mgr->GetCount(count++));
auto tt = BifType::Record::Supervisor::Status->FieldType("nodes");
auto node_table_val = new TableVal(tt->AsTableType());
rval->Assign(1, node_table_val);
for ( const auto& n : nodes )
{
const auto& node = n.second;
auto key = new StringVal(node.name);
auto val = node_struct_to_val(node);
node_table_val->Assign(key, val);
Unref(key);
}
return rval;
}
std::string zeek::Supervisor::Create(const RecordVal* node_config)
std::string zeek::Supervisor::Create(const RecordVal* node_val)
{
// TODO: return error msg on fail, or empty on success
auto node = node_val_to_struct(node_val);
if ( nodes.find(node.name) != nodes.end() )
return fmt("node with name '%s' already exists", node.name.data());
std::string msg = fmt("create %s", node.name.data());
safe_write(stem_pipe->WriteFD(), msg.data(), msg.size() + 1);
nodes.emplace(node.name, node);
return "";
}
bool zeek::Supervisor::Destroy(const std::string& nodes)
bool zeek::Supervisor::Destroy(const std::string& node_name)
{
// TODO: return true if a matching node exists
// TODO: handle node classes
if ( ! nodes.erase(node_name) )
return false;
std::string msg = fmt("destroy %s", node_name.data());
safe_write(stem_pipe->WriteFD(), msg.data(), msg.size() + 1);
return true;
}
bool zeek::Supervisor::Restart(const std::string& nodes)
bool zeek::Supervisor::Restart(const std::string& node_name)
{
// TODO: return true if a matching node exists
// TODO: handle node classes
if ( nodes.find(node_name) == nodes.end() )
return false;
std::string msg = fmt("restart %s", node_name.data());
safe_write(stem_pipe->WriteFD(), msg.data(), msg.size() + 1);
return true;
}

View file

@ -6,6 +6,7 @@
#include <vector>
#include <utility>
#include <memory>
#include <map>
#include "iosource/IOSource.h"
#include "Pipe.h"
@ -24,6 +25,10 @@ public:
std::string zeek_exe_path;
};
struct Node {
std::string name;
};
Supervisor(Config cfg, std::unique_ptr<bro::Pipe> stem_pipe, pid_t stem_pid);
~Supervisor();
@ -33,10 +38,10 @@ public:
void ObserveChildSignal();
RecordVal* Status(const std::string& nodes);
std::string Create(const RecordVal* node_config);
bool Destroy(const std::string& nodes);
bool Restart(const std::string& nodes);
RecordVal* Status(const std::string& node_name);
std::string Create(const RecordVal* node);
bool Destroy(const std::string& node_name);
bool Restart(const std::string& node_name);
private:
@ -57,6 +62,7 @@ private:
pid_t stem_pid;
std::unique_ptr<bro::Pipe> stem_pipe;
bro::Flare signal_flare;
std::map<std::string, Node> nodes;
};
extern Supervisor* supervisor;

View file

@ -7,16 +7,16 @@
module Supervisor;
type Supervisor::Status: record;
type Supervisor::NodeConfig: record;
type Supervisor::Node: record;
function Supervisor::__status%(nodes: string%): Supervisor::Status
%{
return zeek::supervisor->Status(nodes->CheckString());
%}
function Supervisor::__create%(config: Supervisor::NodeConfig%): string
function Supervisor::__create%(node: Supervisor::Node%): string
%{
auto rval = zeek::supervisor->Create(config->AsRecordVal());
auto rval = zeek::supervisor->Create(node->AsRecordVal());
return new StringVal(rval);
%}