From 0ff99c3df8a369f9d06910a22281a601c7da7978 Mon Sep 17 00:00:00 2001 From: Jon Siwek Date: Mon, 13 Jan 2020 20:09:05 -0800 Subject: [PATCH] Separate supervisor node config and status data structures --- scripts/base/frameworks/supervisor/api.zeek | 15 ++-- scripts/base/frameworks/supervisor/main.zeek | 4 +- src/Supervisor.cc | 93 +++++++++++--------- src/Supervisor.h | 31 +++++-- src/main.cc | 7 +- src/supervisor.bif | 7 +- 6 files changed, 93 insertions(+), 64 deletions(-) diff --git a/scripts/base/frameworks/supervisor/api.zeek b/scripts/base/frameworks/supervisor/api.zeek index 09a80d6cb5..d11d8a2da5 100644 --- a/scripts/base/frameworks/supervisor/api.zeek +++ b/scripts/base/frameworks/supervisor/api.zeek @@ -19,7 +19,7 @@ export { interface: string &optional; }; - type Node: record { + type NodeConfig: record { name: string; interface: string &optional; directory: string &optional; @@ -28,18 +28,19 @@ export { scripts: vector of string &default = vector(); cpu_affinity: int &optional; cluster: table[string] of ClusterEndpoint &default=table(); + }; - # TODO: separate node config fields from status fields ? - # TODO: add more status fields ? - pid: count &optional; + type NodeStatus: record { + node: NodeConfig; + pid: count; }; type Status: record { - nodes: table[string] of Node; + nodes: table[string] of NodeStatus; }; global status: function(nodes: string &default="all"): Status; - global create: function(node: Node): string; + global create: function(node: NodeConfig): string; global destroy: function(nodes: string): bool; global restart: function(nodes: string &default="all"): bool; @@ -51,7 +52,7 @@ export { global Supervisor::status_request: event(reqid: string, nodes: string); global Supervisor::status_response: event(reqid: string, result: Status); - global Supervisor::create_request: event(reqid: string, node: Node); + global Supervisor::create_request: event(reqid: string, node: NodeConfig); global Supervisor::create_response: event(reqid: string, result: string); global Supervisor::destroy_request: event(reqid: string, nodes: string); diff --git a/scripts/base/frameworks/supervisor/main.zeek b/scripts/base/frameworks/supervisor/main.zeek index b98fd59847..0dc93a64bc 100644 --- a/scripts/base/frameworks/supervisor/main.zeek +++ b/scripts/base/frameworks/supervisor/main.zeek @@ -28,7 +28,7 @@ event Supervisor::status_request(reqid: string, nodes: string) Broker::publish(topic, Supervisor::status_response, reqid, res); } -event Supervisor::create_request(reqid: string, node: Node) +event Supervisor::create_request(reqid: string, node: NodeConfig) { local res = Supervisor::create(node); local topic = Supervisor::topic_prefix + fmt("/create_response/%s", reqid); @@ -54,7 +54,7 @@ function Supervisor::status(nodes: string): Status return Supervisor::__status(nodes); } -function Supervisor::create(node: Node): string +function Supervisor::create(node: NodeConfig): string { return Supervisor::__create(node); } diff --git a/src/Supervisor.cc b/src/Supervisor.cc index ff26ca1f28..0ce4329861 100644 --- a/src/Supervisor.cc +++ b/src/Supervisor.cc @@ -30,11 +30,11 @@ struct Stem { ~Stem(); - Supervisor::Node* Run(); + std::optional Run(); - Supervisor::Node* Poll(); + std::optional Poll(); - Supervisor::Node* Revive(); + std::optional Revive(); void Reap(); @@ -108,7 +108,7 @@ static std::vector extract_messages(std::string* buffer) return rval; } -static std::string make_create_message(const Supervisor::Node& node) +static std::string make_create_message(const Supervisor::NodeConfig& node) { auto json_str = node.ToJSON(); return fmt("create %s %s", node.name.data(), json_str.data()); @@ -274,13 +274,13 @@ void Supervisor::HandleChildSignal() for ( const auto& n : nodes ) { const auto& node = n.second; - auto msg = make_create_message(node); + auto msg = make_create_message(node.config); safe_write(stem_pipe->OutFD(), msg.data(), msg.size() + 1); } } void Supervisor::GetFds(iosource::FD_Set* read, iosource::FD_Set* write, - iosource::FD_Set* except) + iosource::FD_Set* except) { read->Insert(signal_flare.FD()); read->Insert(stem_pipe->InFD()); @@ -378,7 +378,7 @@ bool Stem::Wait(Supervisor::Node* node, int options) const if ( res == -1 ) { fprintf(stderr, "Stem failed to get node exit status %s (%d): %s\n", - node->name.data(), node->pid, strerror(errno)); + node->Name().data(), node->pid, strerror(errno)); return false; } @@ -387,17 +387,17 @@ bool Stem::Wait(Supervisor::Node* node, int options) const node->exit_status = WEXITSTATUS(status); // TODO: may be some cases where the node is intended to exit printf("node '%s' exited with status %d\n", - node->name.data(), node->exit_status); + node->Name().data(), node->exit_status); } else if ( WIFSIGNALED(status) ) { node->signal_number = WTERMSIG(status); printf("node '%s' terminated by signal %d\n", - node->name.data(), node->signal_number); + node->Name().data(), node->signal_number); } else fprintf(stderr, "Stem failed to get node exit status %s (%d)\n", - node->name.data(), node->pid); + node->Name().data(), node->pid); node->pid = 0; return true; @@ -409,7 +409,7 @@ void Stem::KillNode(const Supervisor::Node& node, int signal) const if ( kill_res == -1 ) fprintf(stderr, "Failed to send signal to node %s: %s", - node.name.data(), strerror(errno)); + node.Name().data(), strerror(errno)); } void Stem::Destroy(Supervisor::Node* node) const @@ -428,12 +428,12 @@ void Stem::Destroy(Supervisor::Node* node) const break; printf("Stem waiting to destroy node: %s (%d)\n", - node->name.data(), node->pid); + node->Name().data(), node->pid); sleep(kill_delay); } } -Supervisor::Node* Stem::Revive() +std::optional Stem::Revive() { constexpr auto attempts_before_delay_increase = 3; constexpr auto delay_increase_factor = 2; @@ -468,7 +468,7 @@ Supervisor::Node* Stem::Revive() node.revival_delay *= delay_increase_factor; if ( Spawn(&node) ) - return new Supervisor::Node(node); + return node.config; ReportStatus(node); } @@ -483,19 +483,19 @@ bool Stem::Spawn(Supervisor::Node* node) if ( node_pid == -1 ) { fprintf(stderr, "failed to fork Zeek node '%s': %s\n", - node->name.data(), strerror(errno)); + node->Name().data(), strerror(errno)); return false; } if ( node_pid == 0 ) { - zeek::set_thread_name(fmt("zeek.%s", node->name.data())); + zeek::set_thread_name(fmt("zeek.%s", node->Name().data())); return true; } node->pid = node_pid; node->spawn_time = std::chrono::steady_clock::now(); - printf("Stem spawned node: %s (%d)\n", node->name.data(), node->pid); + printf("Stem spawned node: %s (%d)\n", node->Name().data(), node->pid); return false; } @@ -563,11 +563,11 @@ void Stem::Shutdown(int exit_code) void Stem::ReportStatus(const Supervisor::Node& node) const { - std::string msg = fmt("status %s %d", node.name.data(), node.pid); + std::string msg = fmt("status %s %d", node.Name().data(), node.pid); safe_write(pipe->OutFD(), msg.data(), msg.size() + 1); } -Supervisor::Node* Stem::Run() +std::optional Stem::Run() { for ( ; ; ) { @@ -580,7 +580,7 @@ Supervisor::Node* Stem::Run() return {}; } -Supervisor::Node* Stem::Poll() +std::optional Stem::Poll() { pollfd fds[2] = { { pipe->InFD(), POLLIN, 0 }, { signal_flare->FD(), POLLIN, 0} }; @@ -659,15 +659,16 @@ Supervisor::Node* Stem::Poll() { const auto& node_json = msg_tokens[2]; assert(nodes.find(node_name) == nodes.end()); - auto node = Supervisor::Node::FromJSON(node_json); + auto node_config = Supervisor::NodeConfig::FromJSON(node_json); + auto it = nodes.emplace(node_name, std::move(node_config)).first; + auto& node = it->second; if ( Spawn(&node) ) - return new Supervisor::Node(node); + return node.config; // TODO: get stem printfs going through standard Zeek debug.log - printf("Stem created node: %s (%d)\n", node.name.data(), node.pid); - auto it = nodes.emplace(node_name, std::move(node)).first; - ReportStatus(it->second); + printf("Stem created node: %s (%d)\n", node.Name().data(), node.pid); + ReportStatus(node); } else if ( cmd == "destroy" ) { @@ -687,7 +688,7 @@ Supervisor::Node* Stem::Poll() Destroy(&node); if ( Spawn(&node) ) - return new Supervisor::Node(node); + return node.config; ReportStatus(node); } @@ -698,7 +699,7 @@ Supervisor::Node* Stem::Poll() return {}; } -Supervisor::Node* Supervisor::RunStem(std::unique_ptr pipe) +std::optional Supervisor::RunStem(std::unique_ptr pipe) { Stem s(std::move(pipe)); return s.Run(); @@ -718,9 +719,9 @@ static BifEnum::Supervisor::ClusterRole role_str_to_enum(const std::string& r) return BifEnum::Supervisor::NONE; } -Supervisor::Node Supervisor::Node::FromRecord(const RecordVal* node) +Supervisor::NodeConfig Supervisor::NodeConfig::FromRecord(const RecordVal* node) { - Supervisor::Node rval; + Supervisor::NodeConfig rval; rval.name = node->Lookup("name")->AsString()->CheckString(); auto iface_val = node->Lookup("interface"); @@ -784,9 +785,9 @@ Supervisor::Node Supervisor::Node::FromRecord(const RecordVal* node) return rval; } -Supervisor::Node Supervisor::Node::FromJSON(std::string_view json) +Supervisor::NodeConfig Supervisor::NodeConfig::FromJSON(std::string_view json) { - Supervisor::Node rval; + Supervisor::NodeConfig rval; auto j = nlohmann::json::parse(json); rval.name = j["name"]; @@ -836,7 +837,7 @@ Supervisor::Node Supervisor::Node::FromJSON(std::string_view json) return rval; } -std::string Supervisor::Node::ToJSON() const +std::string Supervisor::NodeConfig::ToJSON() const { auto re = std::make_unique("^_"); auto node_val = ToRecord(); @@ -845,9 +846,9 @@ std::string Supervisor::Node::ToJSON() const return rval; } -IntrusivePtr Supervisor::Node::ToRecord() const +IntrusivePtr Supervisor::NodeConfig::ToRecord() const { - auto rt = BifType::Record::Supervisor::Node; + auto rt = BifType::Record::Supervisor::NodeConfig; auto rval = make_intrusive(rt); rval->Assign(rt->FieldOffset("name"), new StringVal(name)); @@ -866,14 +867,14 @@ IntrusivePtr Supervisor::Node::ToRecord() const if ( cpu_affinity ) rval->Assign(rt->FieldOffset("cpu_affinity"), val_mgr->GetInt(*cpu_affinity)); - auto st = BifType::Record::Supervisor::Node->FieldType("scripts"); + auto st = BifType::Record::Supervisor::NodeConfig->FieldType("scripts"); auto scripts_val = new VectorVal(st->AsVectorType()); rval->Assign(rt->FieldOffset("scripts"), scripts_val); for ( const auto& s : scripts ) scripts_val->Assign(scripts_val->Size(), new StringVal(s)); - auto tt = BifType::Record::Supervisor::Node->FieldType("cluster"); + auto tt = BifType::Record::Supervisor::NodeConfig->FieldType("cluster"); auto cluster_val = new TableVal(tt->AsTableType()); rval->Assign(rt->FieldOffset("cluster"), cluster_val); @@ -895,12 +896,23 @@ IntrusivePtr Supervisor::Node::ToRecord() const cluster_val->Assign(key.get(), val.detach()); } + return rval; + } + +IntrusivePtr Supervisor::Node::ToRecord() const + { + auto rt = BifType::Record::Supervisor::NodeStatus; + auto rval = make_intrusive(rt); + + rval->Assign(rt->FieldOffset("node"), config.ToRecord().detach()); + if ( pid ) rval->Assign(rt->FieldOffset("pid"), val_mgr->GetCount(pid)); return rval; } + static Val* supervisor_role_to_cluster_node_type(BifEnum::Supervisor::ClusterRole role) { static auto node_type = global_scope()->Lookup("Cluster::NodeType")->AsType()->AsEnumType(); @@ -919,7 +931,7 @@ static Val* supervisor_role_to_cluster_node_type(BifEnum::Supervisor::ClusterRol } } -void Supervisor::Node::InitCluster() +void Supervisor::NodeConfig::InitCluster() { auto cluster_node_type = global_scope()->Lookup("Cluster::Node")->AsType()->AsRecordType(); auto cluster_nodes_id = global_scope()->Lookup("Cluster::nodes"); @@ -972,8 +984,9 @@ RecordVal* Supervisor::Status(std::string_view node_name) for ( const auto& n : nodes ) { + const auto& name = n.first; const auto& node = n.second; - auto key = make_intrusive(node.name); + auto key = make_intrusive(name); auto val = node.ToRecord(); node_table_val->Assign(key.get(), val.detach()); } @@ -983,11 +996,11 @@ RecordVal* Supervisor::Status(std::string_view node_name) std::string Supervisor::Create(const RecordVal* node_val) { - auto node = Supervisor::Node::FromRecord(node_val); + auto node = Supervisor::NodeConfig::FromRecord(node_val); return Create(node); } -std::string Supervisor::Create(const Supervisor::Node& node) +std::string Supervisor::Create(const Supervisor::NodeConfig& node) { if ( node.name.find(' ') != std::string::npos ) return fmt("node names must not contain spaces: '%s'", diff --git a/src/Supervisor.h b/src/Supervisor.h index 1eeab43998..8759c9004f 100644 --- a/src/Supervisor.h +++ b/src/Supervisor.h @@ -35,11 +35,10 @@ public: std::optional interface; }; - struct Node { - static Node FromRecord(const RecordVal* node_val); - static Node FromJSON(std::string_view json); - + struct NodeConfig { static void InitCluster(); + static NodeConfig FromRecord(const RecordVal* node_val); + static NodeConfig FromJSON(std::string_view json); std::string ToJSON() const; IntrusivePtr ToRecord() const; @@ -52,7 +51,18 @@ public: std::optional cpu_affinity; std::vector scripts; std::map cluster; + }; + struct Node { + IntrusivePtr ToRecord() const; + + const std::string& Name() const + { return config.name; } + + Node(NodeConfig arg_config) : config(std::move(arg_config)) + { } + + NodeConfig config; pid_t pid = 0; int exit_status = 0; int signal_number = 0; @@ -61,7 +71,9 @@ public: std::chrono::time_point spawn_time; }; - static Node* RunStem(std::unique_ptr pipe); + static std::optional RunStem(std::unique_ptr pipe); + + using NodeMap = std::map>; Supervisor(Config cfg, std::unique_ptr stem_pipe, pid_t stem_pid); @@ -74,10 +86,13 @@ public: RecordVal* Status(std::string_view node_name); std::string Create(const RecordVal* node); - std::string Create(const Supervisor::Node& node); + std::string Create(const Supervisor::NodeConfig& node); bool Destroy(std::string_view node_name); bool Restart(std::string_view node_name); + const NodeMap& Nodes() + { return nodes; } + private: // IOSource interface overrides: @@ -99,11 +114,11 @@ private: pid_t stem_pid; std::unique_ptr stem_pipe; bro::Flare signal_flare; - std::map> nodes; + NodeMap nodes; std::string msg_buffer; }; extern Supervisor* supervisor; -extern Supervisor::Node* supervised_node; +extern std::optional supervised_node; } // namespace zeek diff --git a/src/main.cc b/src/main.cc index 9b7a15ca8d..b6bd67982a 100644 --- a/src/main.cc +++ b/src/main.cc @@ -100,7 +100,7 @@ zeekygen::Manager* zeekygen_mgr = 0; iosource::Manager* iosource_mgr = 0; bro_broker::Manager* broker_mgr = 0; zeek::Supervisor* zeek::supervisor = 0; -zeek::Supervisor::Node* zeek::supervised_node = 0; +std::optional zeek::supervised_node; std::vector zeek_script_prefixes; Stmt* stmts; @@ -286,7 +286,7 @@ struct zeek_options { * and discard the rest. * @param node the supervised-node whose Zeek options are to be modified. */ - void filter_supervised_node_options(zeek::Supervisor::Node* node) + void filter_supervised_node_options(const zeek::Supervisor::NodeConfig& node) { auto og = *this; *this = {}; @@ -752,7 +752,6 @@ void terminate_bro() delete file_mgr; // broker_mgr is deleted via iosource_mgr // supervisor is deleted via iosource_mgr - delete zeek::supervised_node; delete iosource_mgr; delete log_mgr; delete reporter; @@ -992,7 +991,7 @@ int main(int argc, char** argv) node_name.data(), strerror(errno)); } - options.filter_supervised_node_options(zeek::supervised_node); + options.filter_supervised_node_options(*zeek::supervised_node); if ( zeek::supervised_node->interface ) options.interfaces.emplace_back(*zeek::supervised_node->interface); diff --git a/src/supervisor.bif b/src/supervisor.bif index f55e99f65c..41f99c6e73 100644 --- a/src/supervisor.bif +++ b/src/supervisor.bif @@ -16,7 +16,8 @@ enum ClusterRole %{ type Supervisor::ClusterEndpoint: record; type Supervisor::Status: record; -type Supervisor::Node: record; +type Supervisor::NodeConfig: record; +type Supervisor::NodeStatus: record; function Supervisor::__status%(nodes: string%): Supervisor::Status %{ @@ -29,7 +30,7 @@ function Supervisor::__status%(nodes: string%): Supervisor::Status return zeek::supervisor->Status(nodes->CheckString()); %} -function Supervisor::__create%(node: Supervisor::Node%): string +function Supervisor::__create%(node: Supervisor::NodeConfig%): string %{ if ( ! zeek::supervisor ) { @@ -78,7 +79,7 @@ function Supervisor::__init_cluster%(%): bool function Supervisor::__is_supervised%(%): bool %{ - return val_mgr->GetBool(zeek::supervised_node != nullptr); + return val_mgr->GetBool(zeek::supervised_node.has_value()); %} function Supervisor::__is_supervisor%(%): bool