diff --git a/scripts/base/frameworks/supervisor/api.zeek b/scripts/base/frameworks/supervisor/api.zeek index 31480bda51..25b1f274d5 100644 --- a/scripts/base/frameworks/supervisor/api.zeek +++ b/scripts/base/frameworks/supervisor/api.zeek @@ -4,18 +4,19 @@ module Supervisor; export { - type Status: record { - # TODO: add proper status fields - n: count; - }; - - type NodeConfig: record { - # TODO: add proper config field + type Node: record { + # TODO: add proper config fields name: string; }; + type Status: record { + # TODO: add proper status fields + n: count; + nodes: table[string] of Node; + }; + global status: function(nodes: string &default="all"): Status; - global create: function(config: NodeConfig): string; + global create: function(node: Node): string; global destroy: function(nodes: string): bool; global restart: function(nodes: string &default="all"): bool; @@ -24,7 +25,7 @@ export { global Supervisor::status_request: event(id: count, nodes: string); global Supervisor::status_response: event(id: count, result: Status); - global Supervisor::create_request: event(id: count, config: NodeConfig); + global Supervisor::create_request: event(id: count, node: Node); global Supervisor::create_response: event(id: count, result: string); global Supervisor::destroy_request: event(id: count, nodes: string); diff --git a/scripts/base/frameworks/supervisor/main.zeek b/scripts/base/frameworks/supervisor/main.zeek index 3dc0651003..ee5abe0818 100644 --- a/scripts/base/frameworks/supervisor/main.zeek +++ b/scripts/base/frameworks/supervisor/main.zeek @@ -28,9 +28,9 @@ event Supervisor::status_request(id: count, nodes: string) Broker::publish(topic, Supervisor::status_response, id, res); } -event Supervisor::create_request(id: count, config: NodeConfig) +event Supervisor::create_request(id: count, node: Node) { - local res = Supervisor::create(config); + local res = Supervisor::create(node); local topic = Supervisor::topic_prefix + "/create_response"; Broker::publish(topic, Supervisor::create_response, id, res); } @@ -54,9 +54,9 @@ function Supervisor::status(nodes: string): Status return Supervisor::__status(nodes); } -function create(config: NodeConfig): string +function create(node: Node): string { - return Supervisor::__create(config); + return Supervisor::__create(node); } function destroy(nodes: string): bool diff --git a/src/Supervisor.cc b/src/Supervisor.cc index d6b7620abd..3493ac301f 100644 --- a/src/Supervisor.cc +++ b/src/Supervisor.cc @@ -4,6 +4,7 @@ #include #include #include +#include #include "Supervisor.h" #include "Reporter.h" @@ -201,43 +202,171 @@ void zeek::Supervisor::RunStem(std::unique_ptr pipe) fprintf(stderr, "failed to set stem process group: %s\n", strerror(errno)); + std::string msg_buffer; + std::map nodes; + + auto extract_messages = [](std::string* buf) -> std::vector + { + std::vector rval; + + for ( ; ; ) + { + auto msg_end = buf->find('\0'); + + if ( msg_end == std::string::npos ) + // Don't have a full message yet + break; + + auto msg = buf->substr(0, msg_end); + rval.emplace_back(std::move(msg)); + buf->erase(0, msg_end + 1); + } + + return rval; + }; + for ( ; ; ) { - // TODO: make a proper I/O loop w/ message processing via pipe - // TODO: better way to detect loss of parent than polling + // TODO: better way to detect loss of parent than polling ? + + pollfd fds = { pipe->ReadFD(), POLLIN, 0 }; + constexpr auto poll_timeout_ms = 1000; + auto res = poll(&fds, 1, poll_timeout_ms); + + if ( res < 0 ) + { + fprintf(stderr, "poll() failed: %s\n", strerror(errno)); + continue; + } if ( getppid() == 1 ) exit(0); - sleep(5); - printf("Stem wakeup\n"); - write(pipe->WriteFD(), "hi", 2); + if ( res == 0 ) + continue; + + char buf[256]; + int bytes_read = read(pipe->ReadFD(), buf, 256); + + if ( bytes_read == 0 ) + // EOF + exit(0); + + if ( bytes_read < 0 ) + { + fprintf(stderr, "read() failed: %s\n", strerror(errno)); + continue; + } + + msg_buffer.append(buf, bytes_read); + auto msgs = extract_messages(&msg_buffer); + + for ( auto& msg : msgs ) + { + // TODO: improve message format ... + std::vector msg_tokens; + tokenize_string(std::move(msg), " ", &msg_tokens); + const auto& cmd = msg_tokens[0]; + const auto& node_name = msg_tokens[1]; + + if ( cmd == "create" ) + { + auto res = nodes.emplace(node_name, Node{node_name}); + assert(res.second); + // TODO: fork + printf("Stem creating node: %s\n", node_name.data()); + } + else if ( cmd == "destroy" ) + { + auto res = nodes.erase(node_name); + assert(res > 0 ); + printf("Stem destroying node: %s\n", node_name.data()); + // TODO: kill + } + else if ( cmd == "restart" ) + { + auto it = nodes.find(node_name); + assert(it != nodes.end()); + printf("Stem restarting node: %s\n", node_name.data()); + // TODO: re-use logic for destroy then create + } + else + fprintf(stderr, "unknown supervisor message: %s", cmd.data()); + } } } -RecordVal* zeek::Supervisor::Status(const std::string& nodes) +static zeek::Supervisor::Node node_val_to_struct(const RecordVal* node) { + zeek::Supervisor::Node rval; + rval.name = node->Lookup("name")->AsString()->CheckString(); + return rval; + } + +static RecordVal* node_struct_to_val(const zeek::Supervisor::Node& node) + { + auto rval = new RecordVal(BifType::Record::Supervisor::Node); + rval->Assign(0, new StringVal(node.name)); + return rval; + } + +RecordVal* zeek::Supervisor::Status(const std::string& node_name) + { + // TODO: handle node classes // TODO: return real status information static auto count = 0; auto rval = new RecordVal(BifType::Record::Supervisor::Status); rval->Assign(0, val_mgr->GetCount(count++)); + + auto tt = BifType::Record::Supervisor::Status->FieldType("nodes"); + auto node_table_val = new TableVal(tt->AsTableType()); + rval->Assign(1, node_table_val); + + for ( const auto& n : nodes ) + { + const auto& node = n.second; + auto key = new StringVal(node.name); + auto val = node_struct_to_val(node); + node_table_val->Assign(key, val); + Unref(key); + } + return rval; } -std::string zeek::Supervisor::Create(const RecordVal* node_config) +std::string zeek::Supervisor::Create(const RecordVal* node_val) { - // TODO: return error msg on fail, or empty on success + auto node = node_val_to_struct(node_val); + + if ( nodes.find(node.name) != nodes.end() ) + return fmt("node with name '%s' already exists", node.name.data()); + + std::string msg = fmt("create %s", node.name.data()); + safe_write(stem_pipe->WriteFD(), msg.data(), msg.size() + 1); + nodes.emplace(node.name, node); return ""; } -bool zeek::Supervisor::Destroy(const std::string& nodes) +bool zeek::Supervisor::Destroy(const std::string& node_name) { - // TODO: return true if a matching node exists - return false; + // TODO: handle node classes + + if ( ! nodes.erase(node_name) ) + return false; + + std::string msg = fmt("destroy %s", node_name.data()); + safe_write(stem_pipe->WriteFD(), msg.data(), msg.size() + 1); + return true; } -bool zeek::Supervisor::Restart(const std::string& nodes) +bool zeek::Supervisor::Restart(const std::string& node_name) { - // TODO: return true if a matching node exists - return false; + // TODO: handle node classes + + if ( nodes.find(node_name) == nodes.end() ) + return false; + + std::string msg = fmt("restart %s", node_name.data()); + safe_write(stem_pipe->WriteFD(), msg.data(), msg.size() + 1); + return true; } diff --git a/src/Supervisor.h b/src/Supervisor.h index 0713b1f5cd..6ab2d15564 100644 --- a/src/Supervisor.h +++ b/src/Supervisor.h @@ -6,6 +6,7 @@ #include #include #include +#include #include "iosource/IOSource.h" #include "Pipe.h" @@ -24,6 +25,10 @@ public: std::string zeek_exe_path; }; + struct Node { + std::string name; + }; + Supervisor(Config cfg, std::unique_ptr stem_pipe, pid_t stem_pid); ~Supervisor(); @@ -33,10 +38,10 @@ public: void ObserveChildSignal(); - RecordVal* Status(const std::string& nodes); - std::string Create(const RecordVal* node_config); - bool Destroy(const std::string& nodes); - bool Restart(const std::string& nodes); + RecordVal* Status(const std::string& node_name); + std::string Create(const RecordVal* node); + bool Destroy(const std::string& node_name); + bool Restart(const std::string& node_name); private: @@ -57,6 +62,7 @@ private: pid_t stem_pid; std::unique_ptr stem_pipe; bro::Flare signal_flare; + std::map nodes; }; extern Supervisor* supervisor; diff --git a/src/supervisor.bif b/src/supervisor.bif index cac895e774..a6188d4687 100644 --- a/src/supervisor.bif +++ b/src/supervisor.bif @@ -7,16 +7,16 @@ module Supervisor; type Supervisor::Status: record; -type Supervisor::NodeConfig: record; +type Supervisor::Node: record; function Supervisor::__status%(nodes: string%): Supervisor::Status %{ return zeek::supervisor->Status(nodes->CheckString()); %} -function Supervisor::__create%(config: Supervisor::NodeConfig%): string +function Supervisor::__create%(node: Supervisor::Node%): string %{ - auto rval = zeek::supervisor->Create(config->AsRecordVal()); + auto rval = zeek::supervisor->Create(node->AsRecordVal()); return new StringVal(rval); %}