From 36051dc9a1b3e4470130723f4849f471ae0d5cea Mon Sep 17 00:00:00 2001 From: Christian Kreibich Date: Tue, 6 Jul 2021 17:06:39 -0700 Subject: [PATCH 1/8] Add support for setting environment variables via supervisor The NodeConfig record now has a table for specifying environment variable names and values, which the supervisor sets in the created node. This also repositions the cpu_affinity member to keep the order the same in the corresponding script-layer and in-core types. Includes testcase. --- scripts/base/frameworks/supervisor/api.zeek | 2 + src/supervisor/Supervisor.cc | 45 ++++++++++++++ src/supervisor/Supervisor.h | 4 ++ .../supervisor.config-env/zeek.node.out | 3 + .../supervisor.config-env/zeek.supervisor.out | 4 ++ testing/btest/supervisor/config-env.zeek | 62 +++++++++++++++++++ 6 files changed, 120 insertions(+) create mode 100644 testing/btest/Baseline/supervisor.config-env/zeek.node.out create mode 100644 testing/btest/Baseline/supervisor.config-env/zeek.supervisor.out create mode 100644 testing/btest/supervisor/config-env.zeek diff --git a/scripts/base/frameworks/supervisor/api.zeek b/scripts/base/frameworks/supervisor/api.zeek index 78ef9cbeed..fa32d23b53 100644 --- a/scripts/base/frameworks/supervisor/api.zeek +++ b/scripts/base/frameworks/supervisor/api.zeek @@ -44,6 +44,8 @@ export { stderr_file: string &optional; ## Additional script filenames/paths that the node should load. scripts: vector of string &default = vector(); + ## Environment variables to define in the supervised node. + env: table[string] of string &default=table(); ## A cpu/core number to which the node will try to pin itself. cpu_affinity: int &optional; ## The Cluster Layout definition. Each node in the Cluster Framework diff --git a/src/supervisor/Supervisor.cc b/src/supervisor/Supervisor.cc index d751387a1e..d56f9b1083 100644 --- a/src/supervisor/Supervisor.cc +++ b/src/supervisor/Supervisor.cc @@ -1259,6 +1259,21 @@ Supervisor::NodeConfig Supervisor::NodeConfig::FromRecord(const RecordVal* node) rval.scripts.emplace_back(std::move(script)); } + auto env_table_val = node->GetField("env")->AsTableVal(); + auto env_table = env_table_val->AsTable(); + + for ( const auto& ee : *env_table ) + { + auto k = ee.GetHashKey(); + auto* v = ee.GetValue(); + + auto key = env_table_val->RecreateIndex(*k); + auto name = key->Idx(0)->AsStringVal()->ToStdString(); + auto val = v->GetVal()->AsStringVal()->ToStdString(); + + rval.env[name] = val; + } + auto cluster_table_val = node->GetField("cluster")->AsTableVal(); auto cluster_table = cluster_table_val->AsTable(); @@ -1314,6 +1329,11 @@ Supervisor::NodeConfig Supervisor::NodeConfig::FromJSON(std::string_view json) for ( auto it = scripts.Begin(); it != scripts.End(); ++it ) rval.scripts.emplace_back(it->GetString()); + auto& env = j["env"]; + + for ( auto it = env.MemberBegin(); it != env.MemberEnd(); ++it ) + rval.env[it->name.GetString()] = it->value.GetString(); + auto& cluster = j["cluster"]; for ( auto it = cluster.MemberBegin(); it != cluster.MemberEnd(); ++it ) @@ -1373,6 +1393,17 @@ RecordValPtr Supervisor::NodeConfig::ToRecord() const rval->AssignField("scripts", std::move(scripts_val)); + auto et = rt->GetFieldType("env"); + auto env_val = make_intrusive(std::move(et)); + rval->AssignField("env", env_val); + + for ( const auto& e : env ) + { + auto name = make_intrusive(e.first); + auto val = make_intrusive(e.second); + env_val->Assign(std::move(name), std::move(val)); + } + auto tt = rt->GetFieldType("cluster"); auto cluster_val = make_intrusive(std::move(tt)); rval->AssignField("cluster", cluster_val); @@ -1454,6 +1485,7 @@ bool SupervisedNode::InitCluster() const { const auto& node_name = e.first; const auto& ep = e.second; + auto key = make_intrusive(node_name); auto val = make_intrusive(cluster_node_type); @@ -1533,6 +1565,19 @@ void SupervisedNode::Init(Options* options) const node_name.data(), strerror(errno)); } + if ( ! config.env.empty() ) + { + for ( const auto& e : config.env ) + { + if ( setenv(e.first.c_str(), e.second.c_str(), true) == -1 ) + { + fprintf(stderr, "node '%s' failed to setenv: %s\n", + node_name.data(), strerror(errno)); + exit(1); + } + } + } + if ( ! config.cluster.empty() ) { if ( setenv("CLUSTER_NODE", node_name.data(), true) == -1 ) diff --git a/src/supervisor/Supervisor.h b/src/supervisor/Supervisor.h index 5dbc878207..5c05c9e1ed 100644 --- a/src/supervisor/Supervisor.h +++ b/src/supervisor/Supervisor.h @@ -190,6 +190,10 @@ public: * Additional script filename/paths that the node should load. */ std::vector scripts; + /** + * Environment variables and values to define in the node. + */ + std::map env; /** * The Cluster Layout definition. Each node in the Cluster Framework * knows about the full, static cluster topology to which it belongs. diff --git a/testing/btest/Baseline/supervisor.config-env/zeek.node.out b/testing/btest/Baseline/supervisor.config-env/zeek.node.out new file mode 100644 index 0000000000..56945e2def --- /dev/null +++ b/testing/btest/Baseline/supervisor.config-env/zeek.node.out @@ -0,0 +1,3 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +supervised node zeek_init() with env foo=bar +supervised node zeek_done() diff --git a/testing/btest/Baseline/supervisor.config-env/zeek.supervisor.out b/testing/btest/Baseline/supervisor.config-env/zeek.supervisor.out new file mode 100644 index 0000000000..b5c69a253d --- /dev/null +++ b/testing/btest/Baseline/supervisor.config-env/zeek.supervisor.out @@ -0,0 +1,4 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +supervisor zeek_init() +destroying node +supervisor zeek_done() diff --git a/testing/btest/supervisor/config-env.zeek b/testing/btest/supervisor/config-env.zeek new file mode 100644 index 0000000000..dce6593612 --- /dev/null +++ b/testing/btest/supervisor/config-env.zeek @@ -0,0 +1,62 @@ +# @TEST-PORT: BROKER_PORT +# @TEST-EXEC: btest-bg-run zeek zeek -j -b %INPUT +# @TEST-EXEC: btest-bg-wait 30 +# @TEST-EXEC: btest-diff zeek/supervisor.out +# @TEST-EXEC: btest-diff zeek/node.out + +# So the supervised node doesn't terminate right away. +redef exit_only_after_terminate=T; + +global supervisor_output_file: file; +global node_output_file: file; +global topic = "test-topic"; + +event do_destroy() + { + print supervisor_output_file, "destroying node"; + Supervisor::destroy("grault"); + } + +event zeek_init() + { + if ( Supervisor::is_supervisor() ) + { + Broker::subscribe(topic); + Broker::listen("127.0.0.1", to_port(getenv("BROKER_PORT"))); + supervisor_output_file = open("supervisor.out"); + print supervisor_output_file, "supervisor zeek_init()"; + local sn = Supervisor::NodeConfig($name="grault", $env=table( + ["foo"] = "bar" + )); + local res = Supervisor::create(sn); + + if ( res != "" ) + print supervisor_output_file, res; + } + else + { + Broker::peer("127.0.0.1", to_port(getenv("BROKER_PORT"))); + node_output_file = open("node.out"); + print node_output_file, fmt("supervised node zeek_init() with env foo=%s", getenv("foo")); + } + } + +event Broker::peer_added(endpoint: Broker::EndpointInfo, msg: string) + { + if ( Supervisor::is_supervised() ) + Broker::publish(topic, do_destroy); + } + +event Broker::peer_lost(endpoint: Broker::EndpointInfo, msg: string) + { + # Should only be run by supervisor + terminate(); + } + +event zeek_done() + { + if ( Supervisor::is_supervised() ) + print node_output_file, "supervised node zeek_done()"; + else + print supervisor_output_file, "supervisor zeek_done()"; + } From efaa9ec3be4267c32562959e51231f4cd8bccdfb Mon Sep 17 00:00:00 2001 From: Christian Kreibich Date: Tue, 6 Jul 2021 18:01:05 -0700 Subject: [PATCH 2/8] Add support for making the supervisor listen for requests The supervisor now starts listening on the configured Broker default address and port when the new boolean SupervisorControl::enable_listen is T. Listening remains disabled by default. Listening allows nodes to communicate with the supervisor via the events laid out in control.zeek, to conduct further node management. --- scripts/base/frameworks/supervisor/control.zeek | 5 +++++ scripts/base/frameworks/supervisor/main.zeek | 8 +++++++- 2 files changed, 12 insertions(+), 1 deletion(-) diff --git a/scripts/base/frameworks/supervisor/control.zeek b/scripts/base/frameworks/supervisor/control.zeek index fa20a9dba6..ed9c083cb9 100644 --- a/scripts/base/frameworks/supervisor/control.zeek +++ b/scripts/base/frameworks/supervisor/control.zeek @@ -4,6 +4,7 @@ ##! That is, it may change in various incompatible ways without warning or ##! deprecation until the stable 4.0.0 release. +@load base/frameworks/broker @load ./api module SupervisorControl; @@ -15,6 +16,10 @@ export { ## for their topic names. const topic_prefix = "zeek/supervisor" &redef; + ## When enabled, the Supervisor will listen on the configured Broker + ## :zeek:see:`Broker::default_listen_address`. + const enable_listen = F &redef; + ## Send a request to a remote Supervisor process to create a node. ## ## reqid: an arbitrary string that will be directly echoed in the response diff --git a/scripts/base/frameworks/supervisor/main.zeek b/scripts/base/frameworks/supervisor/main.zeek index f892907055..8b0161b79e 100644 --- a/scripts/base/frameworks/supervisor/main.zeek +++ b/scripts/base/frameworks/supervisor/main.zeek @@ -3,7 +3,6 @@ @load ./api @load ./control -@load base/frameworks/broker function Supervisor::status(node: string): Supervisor::Status { @@ -42,6 +41,13 @@ function Supervisor::node(): Supervisor::NodeConfig event zeek_init() &priority=10 { + if ( Supervisor::is_supervisor() && SupervisorControl::enable_listen ) + { + Broker::listen(Broker::default_listen_address, + Broker::default_port, + Broker::default_listen_retry); + } + Broker::subscribe(SupervisorControl::topic_prefix); } From 7bee79b400b8fe048397aa208290b9faab3eb0f7 Mon Sep 17 00:00:00 2001 From: Christian Kreibich Date: Wed, 7 Jul 2021 15:17:39 -0700 Subject: [PATCH 3/8] Add optional bare-mode boolean flag to Supervisor's node configuration When omitted, the node inherits the Supervisor's bare-mode status. When true/false, the new Zeek node will enable/disable bare mode, respectively. It continues to load any scripts passed at the command line and in the additional scripts list already provided in the node configuration. Includes testcase. --- scripts/base/frameworks/supervisor/api.zeek | 3 + src/supervisor/Supervisor.cc | 14 ++++ src/supervisor/Supervisor.h | 5 ++ .../zeek.bare.node.out | 4 + .../zeek.default.node.out | 4 + .../zeek.inherit.node.out | 4 + .../btest/supervisor/config-bare-mode.zeek | 74 +++++++++++++++++++ 7 files changed, 108 insertions(+) create mode 100644 testing/btest/Baseline/supervisor.config-bare-mode/zeek.bare.node.out create mode 100644 testing/btest/Baseline/supervisor.config-bare-mode/zeek.default.node.out create mode 100644 testing/btest/Baseline/supervisor.config-bare-mode/zeek.inherit.node.out create mode 100644 testing/btest/supervisor/config-bare-mode.zeek diff --git a/scripts/base/frameworks/supervisor/api.zeek b/scripts/base/frameworks/supervisor/api.zeek index fa32d23b53..01c41f6f14 100644 --- a/scripts/base/frameworks/supervisor/api.zeek +++ b/scripts/base/frameworks/supervisor/api.zeek @@ -42,6 +42,9 @@ export { stdout_file: string &optional; ## The filename/path to which the node's stderr will be redirected. stderr_file: string &optional; + ## Whether to start the node in bare mode. When left out, the node + ## inherits the bare-mode status the supervisor itself runs with. + bare_mode: bool &optional; ## Additional script filenames/paths that the node should load. scripts: vector of string &default = vector(); ## Environment variables to define in the supervised node. diff --git a/src/supervisor/Supervisor.cc b/src/supervisor/Supervisor.cc index d56f9b1083..ac16bae1ff 100644 --- a/src/supervisor/Supervisor.cc +++ b/src/supervisor/Supervisor.cc @@ -1251,6 +1251,11 @@ Supervisor::NodeConfig Supervisor::NodeConfig::FromRecord(const RecordVal* node) if ( affinity_val ) rval.cpu_affinity = affinity_val->AsInt(); + const auto& bare_mode_val = node->GetField("bare_mode"); + + if ( bare_mode_val ) + rval.bare_mode = bare_mode_val->AsBool(); + auto scripts_val = node->GetField("scripts")->AsVectorVal(); for ( auto i = 0u; i < scripts_val->Size(); ++i ) @@ -1324,6 +1329,9 @@ Supervisor::NodeConfig Supervisor::NodeConfig::FromJSON(std::string_view json) if ( auto it = j.FindMember("cpu_affinity"); it != j.MemberEnd() ) rval.cpu_affinity = it->value.GetInt(); + if ( auto it = j.FindMember("bare_mode"); it != j.MemberEnd() ) + rval.bare_mode = it->value.GetBool(); + auto& scripts = j["scripts"]; for ( auto it = scripts.Begin(); it != scripts.End(); ++it ) @@ -1385,6 +1393,9 @@ RecordValPtr Supervisor::NodeConfig::ToRecord() const if ( cpu_affinity ) rval->AssignField("cpu_affinity", *cpu_affinity); + if ( bare_mode ) + rval->AssignField("bare_mode", *bare_mode); + auto st = rt->GetFieldType("scripts"); auto scripts_val = make_intrusive(std::move(st)); @@ -1590,6 +1601,9 @@ void SupervisedNode::Init(Options* options) const options->filter_supervised_node_options(); + if ( config.bare_mode ) + options->bare_mode = *config.bare_mode; + if ( config.interface ) options->interface = *config.interface; diff --git a/src/supervisor/Supervisor.h b/src/supervisor/Supervisor.h index 5c05c9e1ed..0fb02d35b9 100644 --- a/src/supervisor/Supervisor.h +++ b/src/supervisor/Supervisor.h @@ -186,6 +186,11 @@ public: * A cpu/core number to which the node will try to pin itself. */ std::optional cpu_affinity; + /** + * Whether to start the node in bare mode. When not present, the + * node inherits the bare-mode status of the supervisor. + */ + std::optional bare_mode; /** * Additional script filename/paths that the node should load. */ diff --git a/testing/btest/Baseline/supervisor.config-bare-mode/zeek.bare.node.out b/testing/btest/Baseline/supervisor.config-bare-mode/zeek.bare.node.out new file mode 100644 index 0000000000..45c8a185df --- /dev/null +++ b/testing/btest/Baseline/supervisor.config-bare-mode/zeek.bare.node.out @@ -0,0 +1,4 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +supervised node zeek_init() +bare mode +supervised node zeek_done() diff --git a/testing/btest/Baseline/supervisor.config-bare-mode/zeek.default.node.out b/testing/btest/Baseline/supervisor.config-bare-mode/zeek.default.node.out new file mode 100644 index 0000000000..9a9a51018c --- /dev/null +++ b/testing/btest/Baseline/supervisor.config-bare-mode/zeek.default.node.out @@ -0,0 +1,4 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +supervised node zeek_init() +default mode +supervised node zeek_done() diff --git a/testing/btest/Baseline/supervisor.config-bare-mode/zeek.inherit.node.out b/testing/btest/Baseline/supervisor.config-bare-mode/zeek.inherit.node.out new file mode 100644 index 0000000000..45c8a185df --- /dev/null +++ b/testing/btest/Baseline/supervisor.config-bare-mode/zeek.inherit.node.out @@ -0,0 +1,4 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +supervised node zeek_init() +bare mode +supervised node zeek_done() diff --git a/testing/btest/supervisor/config-bare-mode.zeek b/testing/btest/supervisor/config-bare-mode.zeek new file mode 100644 index 0000000000..0e3f9c77e2 --- /dev/null +++ b/testing/btest/supervisor/config-bare-mode.zeek @@ -0,0 +1,74 @@ +# This test verifies the functionality of the bare_mode flag in NodeConfig. +# We launch two nodes, one regular, one in bare mode. Each outputs a different +# string depending on mode, and exits. We verify the resulting outputs. + +# @TEST-PORT: BROKER_PORT +# @TEST-EXEC: btest-bg-run zeek zeek -j -b %INPUT +# @TEST-EXEC: btest-bg-wait 30 +# @TEST-EXEC: btest-diff zeek/inherit/node.out +# @TEST-EXEC: btest-diff zeek/bare/node.out +# @TEST-EXEC: btest-diff zeek/default/node.out + + +# So the supervised node doesn't terminate right away. +redef exit_only_after_terminate=T; + +global node_output_file: file; +global topic = "test-topic"; + +event do_destroy(name: string) + { + Supervisor::destroy(name); + + # When no nodes are left, exit. + local status = Supervisor::status(); + if ( |status$nodes| == 0) + terminate(); + } + +event zeek_init() + { + if ( Supervisor::is_supervisor() ) + { + Broker::subscribe(topic); + Broker::listen("127.0.0.1", to_port(getenv("BROKER_PORT"))); + + # Create a node that inherits basre mode from us. + local sn = Supervisor::NodeConfig($name="inherit", $directory="inherit"); + Supervisor::create(sn); + + # Create a node that specifies bare mode. + sn = Supervisor::NodeConfig($name="bare", $directory="bare", $bare_mode=T); + Supervisor::create(sn); + + # Create a node that specifies default mode. + sn = Supervisor::NodeConfig($name="default", $directory="default", $bare_mode=F); + Supervisor::create(sn); + + } + else + { + Broker::peer("127.0.0.1", to_port(getenv("BROKER_PORT"))); + node_output_file = open("node.out"); + print node_output_file, "supervised node zeek_init()"; + +# This is only defined when we're loading init-default.zeek: +@ifdef ( Notice::Info ) + print node_output_file, "default mode"; +@else + print node_output_file, "bare mode"; +@endif + } + } + +event Broker::peer_added(endpoint: Broker::EndpointInfo, msg: string) + { + if ( Supervisor::is_supervised() ) + Broker::publish(topic, do_destroy, Supervisor::node()$name); + } + +event zeek_done() + { + if ( Supervisor::is_supervised() ) + print node_output_file, "supervised node zeek_done()"; + } From a3623bfb2d1134a918f627fd916042bcee635973 Mon Sep 17 00:00:00 2001 From: Christian Kreibich Date: Tue, 6 Jul 2021 17:17:09 -0700 Subject: [PATCH 4/8] Establish a separate init script when using the supervisor The supervisor does not require the full weight of scripts that init-default.zeek brings with it. The new file, init-supervisor.zeek, contains only what's required by the supervisor in addition to the other always-loaded init files. --- scripts/base/init-supervisor.zeek | 5 +++++ src/zeek-setup.cc | 9 ++++++++- 2 files changed, 13 insertions(+), 1 deletion(-) create mode 100644 scripts/base/init-supervisor.zeek diff --git a/scripts/base/init-supervisor.zeek b/scripts/base/init-supervisor.zeek new file mode 100644 index 0000000000..da5af4b8b4 --- /dev/null +++ b/scripts/base/init-supervisor.zeek @@ -0,0 +1,5 @@ +##! This script loads functionality needed by the supervisor. Zeek only sources +##! this when the supervisor is active (-j). Like init-default.zeek, this isn't +##! loaded in bare mode. + +@load base/frameworks/supervisor diff --git a/src/zeek-setup.cc b/src/zeek-setup.cc index 240e2f2a37..838bf86ef8 100644 --- a/src/zeek-setup.cc +++ b/src/zeek-setup.cc @@ -532,7 +532,14 @@ SetupResult setup(int argc, char** argv, Options* zopts) add_essential_input_file("base/init-frameworks-and-bifs.zeek"); if ( ! options.bare_mode ) - add_input_file("base/init-default.zeek"); + { + // The supervisor only needs to load a limited set of + // scripts, since it won't be doing traffic processing. + if ( options.supervisor_mode ) + add_input_file("base/init-supervisor.zeek"); + else + add_input_file("base/init-default.zeek"); + } add_input_file("builtin-plugins/__preload__.zeek"); add_input_file("builtin-plugins/__load__.zeek"); From c744702f945cd093a62ee733b934c5e683ad10dc Mon Sep 17 00:00:00 2001 From: Christian Kreibich Date: Tue, 6 Jul 2021 17:24:56 -0700 Subject: [PATCH 5/8] Introduce cluster controller and cluster agent scripting This is a preliminary implementation of a subset of the functionality set out in our cluster controller architecture. The controller is the central management node, existing once in any Zeek cluster. The agent is a node that runs once per instance, where an instance will commonly be a physical machine. The agent in turn manages the "data cluster", i.e. the traditional notion of a Zeek cluster with manager, worker nodes, etc. Agent and controller live in the policy folder, and are activated when loading policy/frameworks/cluster/agent and policy/frameworks/cluster/controller, respectively. Both run in nodes forked by the supervisor. When Zeek doesn't use the supervisor, they do nothing. Otherwise, boot.zeek instructs the supervisor to create the respective node, running main.zeek. Both controller and agent have their own config.zeek with relevant knobs. For both, controller/types.zeek provides common data types, and controller/log.zeek provides basic logging (without logger communication -- no such node might exist). A primitive request-tracking abstraction can be found in controller/request.zeek to track outstanding request events and their subsequent responses. --- .../frameworks/cluster/agent/__load__.zeek | 5 + .../policy/frameworks/cluster/agent/api.zeek | 33 +++ .../policy/frameworks/cluster/agent/boot.zeek | 35 +++ .../frameworks/cluster/agent/config.zeek | 85 ++++++ .../policy/frameworks/cluster/agent/main.zeek | 223 ++++++++++++++++ .../cluster/controller/__load__.zeek | 5 + .../frameworks/cluster/controller/api.zeek | 16 ++ .../frameworks/cluster/controller/boot.zeek | 29 ++ .../frameworks/cluster/controller/config.zeek | 85 ++++++ .../frameworks/cluster/controller/log.zeek | 109 ++++++++ .../frameworks/cluster/controller/main.zeek | 250 ++++++++++++++++++ .../cluster/controller/request.zeek | 86 ++++++ .../frameworks/cluster/controller/types.zeek | 80 ++++++ 13 files changed, 1041 insertions(+) create mode 100644 scripts/policy/frameworks/cluster/agent/__load__.zeek create mode 100644 scripts/policy/frameworks/cluster/agent/api.zeek create mode 100644 scripts/policy/frameworks/cluster/agent/boot.zeek create mode 100644 scripts/policy/frameworks/cluster/agent/config.zeek create mode 100644 scripts/policy/frameworks/cluster/agent/main.zeek create mode 100644 scripts/policy/frameworks/cluster/controller/__load__.zeek create mode 100644 scripts/policy/frameworks/cluster/controller/api.zeek create mode 100644 scripts/policy/frameworks/cluster/controller/boot.zeek create mode 100644 scripts/policy/frameworks/cluster/controller/config.zeek create mode 100644 scripts/policy/frameworks/cluster/controller/log.zeek create mode 100644 scripts/policy/frameworks/cluster/controller/main.zeek create mode 100644 scripts/policy/frameworks/cluster/controller/request.zeek create mode 100644 scripts/policy/frameworks/cluster/controller/types.zeek diff --git a/scripts/policy/frameworks/cluster/agent/__load__.zeek b/scripts/policy/frameworks/cluster/agent/__load__.zeek new file mode 100644 index 0000000000..1db332f544 --- /dev/null +++ b/scripts/policy/frameworks/cluster/agent/__load__.zeek @@ -0,0 +1,5 @@ +# The entry point for the cluster agent. It only runs bootstrap logic for +# launching via the Supervisor. If we're not running the Supervisor, this does +# nothing. + +@load ./boot diff --git a/scripts/policy/frameworks/cluster/agent/api.zeek b/scripts/policy/frameworks/cluster/agent/api.zeek new file mode 100644 index 0000000000..a5334fbbef --- /dev/null +++ b/scripts/policy/frameworks/cluster/agent/api.zeek @@ -0,0 +1,33 @@ +@load base/frameworks/supervisor/control +@load policy/frameworks/cluster/controller/types + +module ClusterAgent::API; + +export { + const version = 1; + + # Agent API events + + global set_configuration_request: event(reqid: string, + config: ClusterController::Types::Configuration); + global set_configuration_response: event(reqid: string, + result: ClusterController::Types::Result); + + # Notification events, agent -> controller + + # Report agent being available. + global notify_agent_hello: event(instance: string, host: addr, + api_version: count); + + # Report node state changes. + global notify_change: event(instance: string, + n: ClusterController::Types::Node, + old: ClusterController::Types::State, + new: ClusterController::Types::State); + + # Report operational error. + global notify_error: event(instance: string, msg: string, node: string &default=""); + + # Report informational message. + global notify_log: event(instance: string, msg: string, node: string &default=""); +} diff --git a/scripts/policy/frameworks/cluster/agent/boot.zeek b/scripts/policy/frameworks/cluster/agent/boot.zeek new file mode 100644 index 0000000000..3eed5f6dd9 --- /dev/null +++ b/scripts/policy/frameworks/cluster/agent/boot.zeek @@ -0,0 +1,35 @@ +@load ./config + +# The agent needs the supervisor to listen for node management requests. We +# need to tell it to do so, and we need to do so here, in the agent +# bootstrapping code, so the redef applies prior to the fork of the agent +# process itself. +redef SupervisorControl::enable_listen = T; + +event zeek_init() + { + if ( ! Supervisor::is_supervisor() ) + return; + + local epi = ClusterAgent::endpoint_info(); + local sn = Supervisor::NodeConfig($name=epi$id, $bare_mode=T, + $scripts=vector("policy/frameworks/cluster/agent/main.zeek")); + + if ( ClusterAgent::directory != "" ) + sn$directory = ClusterAgent::directory; + if ( ClusterAgent::stdout_file_suffix != "" ) + sn$stdout_file = epi$id + "." + ClusterAgent::stdout_file_suffix; + if ( ClusterAgent::stderr_file_suffix != "" ) + sn$stderr_file = epi$id + "." + ClusterAgent::stderr_file_suffix; + + # This helps Zeek run controller and agent with a minimal set of scripts. + sn$env["ZEEK_CLUSTER_MGMT_NODE"] = "AGENT"; + + local res = Supervisor::create(sn); + + if ( res != "" ) + { + print(fmt("error: supervisor could not create agent node: %s", res)); + exit(1); + } + } diff --git a/scripts/policy/frameworks/cluster/agent/config.zeek b/scripts/policy/frameworks/cluster/agent/config.zeek new file mode 100644 index 0000000000..2e836d08ab --- /dev/null +++ b/scripts/policy/frameworks/cluster/agent/config.zeek @@ -0,0 +1,85 @@ +@load policy/frameworks/cluster/controller/types + +module ClusterAgent; + +export { + # The name this agent uses to represent the cluster instance + # it manages. When the environment variable isn't set and there's, + # no redef, this falls back to "agent-". + const name = getenv("ZEEK_AGENT_NAME") &redef; + + # Agent stdout/stderr log files to produce in Zeek's working + # directory. If empty, no such logs will result. The actual + # log files have the agent's name (as per above) dot-prefixed. + const stdout_file_suffix = "agent.stdout" &redef; + const stderr_file_suffix = "agent.stderr" &redef; + + # The address and port the agent listens on. When + # undefined, falls back to configurable default values. + const listen_address = getenv("ZEEK_AGENT_ADDR") &redef; + const default_address = Broker::default_listen_address &redef; + + const listen_port = getenv("ZEEK_AGENT_PORT") &redef; + const default_port = 2151/tcp &redef; + + # The agent communicates under to following topic prefix, + # suffixed with "/" (see above): + const topic_prefix = "zeek/cluster-control/agent" &redef; + + # The coordinates of the controller. When defined, it means + # agents peer with (connect to) the controller; otherwise the + # controller knows all agents and peers with them. + const controller: Broker::NetworkInfo = [ + $address="0.0.0.0", $bound_port=0/unknown] &redef; + + # Agent and controller currently log only, not via the data cluster's + # logger. (This might get added later.) For now, this means that + # if both write to the same log file, it gets garbled. The following + # lets you specify the working directory specifically for the agent. + const directory = "" &redef; + + # Working directory for data cluster nodes. When relative, note + # that this will apply from the working directory of the agent, + # since it creates data cluster nodes. + const cluster_directory = "" &redef; + + # The following functions return the effective network endpoint + # information for this agent, in two related forms. + global instance: function(): ClusterController::Types::Instance; + global endpoint_info: function(): Broker::EndpointInfo; +} + +function instance(): ClusterController::Types::Instance + { + local epi = endpoint_info(); + return ClusterController::Types::Instance($name=epi$id, + $host=to_addr(epi$network$address), + $listen_port=epi$network$bound_port); + } + +function endpoint_info(): Broker::EndpointInfo + { + local epi: Broker::EndpointInfo; + local network: Broker::NetworkInfo; + + if ( ClusterAgent::name != "" ) + epi$id = ClusterAgent::name; + else + epi$id = fmt("agent-%s", gethostname()); + + if ( ClusterAgent::listen_address != "" ) + network$address = ClusterAgent::listen_address; + else if ( ClusterAgent::default_address != "" ) + network$address = ClusterAgent::default_address; + else + network$address = "127.0.0.1"; + + if ( ClusterAgent::listen_port != "" ) + network$bound_port = to_port(ClusterAgent::listen_port); + else + network$bound_port = ClusterAgent::default_port; + + epi$network = network; + + return epi; + } diff --git a/scripts/policy/frameworks/cluster/agent/main.zeek b/scripts/policy/frameworks/cluster/agent/main.zeek new file mode 100644 index 0000000000..1956d47d0c --- /dev/null +++ b/scripts/policy/frameworks/cluster/agent/main.zeek @@ -0,0 +1,223 @@ +@load base/frameworks/broker + +@load policy/frameworks/cluster/controller/config +@load policy/frameworks/cluster/controller/log +@load policy/frameworks/cluster/controller/request + +@load ./api + +redef ClusterController::role = ClusterController::Types::AGENT; + +# The global configuration as passed to us by the controller +global global_config: ClusterController::Types::Configuration; + +# A map to make other instance info accessible +global instances: table[string] of ClusterController::Types::Instance; + +# A map for the nodes we run on this instance, via this agent. +global nodes: table[string] of ClusterController::Types::Node; + +# The node map employed by the supervisor to describe the cluster +# topology to newly forked nodes. We refresh it when we receive +# new configurations. +global data_cluster: table[string] of Supervisor::ClusterEndpoint; + +event SupervisorControl::create_response(reqid: string, result: string) + { + local req = ClusterController::Request::lookup(reqid); + if ( ClusterController::Request::is_null(req) ) + return; + + local name = req$supervisor_state$node; + + if ( |result| > 0 ) + { + local msg = fmt("failed to create node %s: %s", name, result); + ClusterController::Log::error(msg); + event ClusterAgent::API::notify_error(ClusterAgent::name, msg, name); + } + + ClusterController::Request::finish(reqid); + } + +event SupervisorControl::destroy_response(reqid: string, result: bool) + { + local req = ClusterController::Request::lookup(reqid); + if ( ClusterController::Request::is_null(req) ) + return; + + local name = req$supervisor_state$node; + + if ( ! result ) + { + local msg = fmt("failed to destroy node %s, %s", name, reqid); + ClusterController::Log::error(msg); + event ClusterAgent::API::notify_error(ClusterAgent::name, msg, name); + } + + ClusterController::Request::finish(reqid); + } + +function supervisor_create(nc: Supervisor::NodeConfig) + { + local req = ClusterController::Request::create(); + req$supervisor_state = ClusterController::Request::SupervisorState($node = nc$name); + event SupervisorControl::create_request(req$id, nc); + ClusterController::Log::info(fmt("issued supervisor create for %s, %s", nc$name, req$id)); + } + +function supervisor_destroy(node: string) + { + local req = ClusterController::Request::create(); + req$supervisor_state = ClusterController::Request::SupervisorState($node = node); + event SupervisorControl::destroy_request(req$id, node); + ClusterController::Log::info(fmt("issued supervisor destroy for %s, %s", node, req$id)); + } + +event ClusterAgent::API::set_configuration_request(reqid: string, config: ClusterController::Types::Configuration) + { + ClusterController::Log::info(fmt("rx ClusterAgent::API::set_configuration_request %s", reqid)); + + local nodename: string; + local node: ClusterController::Types::Node; + local nc: Supervisor::NodeConfig; + local msg: string; + + # Adopt the global configuration provided. + # XXX this can later handle validation and persistence + # XXX should do this transactionally, only set when all else worked + global_config = config; + + # Refresh the instances table: + instances = table(); + for ( inst in config$instances ) + instances[inst$name] = inst; + + # Terminate existing nodes + for ( nodename in nodes ) + supervisor_destroy(nodename); + + nodes = table(); + + # Refresh the data cluster and nodes tables + + data_cluster = table(); + for ( node in config$nodes ) + { + if ( node$instance == ClusterAgent::name ) + nodes[node$name] = node; + + local cep = Supervisor::ClusterEndpoint( + $role = node$role, + $host = instances[node$instance]$host, + $p = node$p); + + if ( node?$interface ) + cep$interface = node$interface; + + data_cluster[node$name] = cep; + } + + # Apply the new configuration via the supervisor + + for ( nodename in nodes ) + { + node = nodes[nodename]; + nc = Supervisor::NodeConfig($name=nodename); + + if ( ClusterAgent::cluster_directory != "" ) + nc$directory = ClusterAgent::cluster_directory; + + if ( node?$interface ) + nc$interface = node$interface; + if ( node?$cpu_affinity ) + nc$cpu_affinity = node$cpu_affinity; + if ( node?$scripts ) + nc$scripts = node$scripts; + if ( node?$env ) + nc$env = node$env; + + # XXX could use options to enable per-node overrides for + # directory, stdout, stderr, others? + + nc$cluster = data_cluster; + supervisor_create(nc); + } + + # XXX this currently doesn not fail if any of above problems occurred, + # mainly due to the tediousness of handling the supervisor's response + # events asynchonously. The only indication of error will be + # notification events to the controller. + + local res = ClusterController::Types::Result( + $reqid = reqid, + $instance = ClusterAgent::name); + + ClusterController::Log::info(fmt("tx ClusterAgent::API::set_configuration_response %s", reqid)); + event ClusterAgent::API::set_configuration_response(reqid, res); + } + +event Broker::peer_added(peer: Broker::EndpointInfo, msg: string) + { + # This does not (cannot?) immediately verify that the new peer + # is in fact a controller, so we might send this redundantly. + # Controllers handle the hello event accordingly. + + local epi = ClusterAgent::endpoint_info(); + # XXX deal with unexpected peers, unless we're okay with it + event ClusterAgent::API::notify_agent_hello(epi$id, + to_addr(epi$network$address), ClusterAgent::API::version); + } + +event zeek_init() + { + local epi = ClusterAgent::endpoint_info(); + local agent_topic = ClusterAgent::topic_prefix + "/" + epi$id; + + # The agent needs to peer with the supervisor -- this doesn't currently + # happen automatically. The address defaults to Broker's default, which + # relies on ZEEK_DEFAULT_LISTEN_ADDR and so might just be "". Broker + # internally falls back to listening on any; we pick 127.0.0.1. + local supervisor_addr = Broker::default_listen_address; + if ( supervisor_addr == "" ) + supervisor_addr = "127.0.0.1"; + + Broker::peer(supervisor_addr, Broker::default_port, Broker::default_listen_retry); + + # Agents need receive communication targeted at it, and any responses + # from the supervisor. + Broker::subscribe(agent_topic); + Broker::subscribe(SupervisorControl::topic_prefix); + + # Auto-publish a bunch of events. Glob patterns or module-level + # auto-publish would be helpful here. + Broker::auto_publish(agent_topic, ClusterAgent::API::set_configuration_response); + Broker::auto_publish(agent_topic, ClusterAgent::API::notify_agent_hello); + Broker::auto_publish(agent_topic, ClusterAgent::API::notify_change); + Broker::auto_publish(agent_topic, ClusterAgent::API::notify_error); + Broker::auto_publish(agent_topic, ClusterAgent::API::notify_log); + + Broker::auto_publish(SupervisorControl::topic_prefix, SupervisorControl::create_request); + Broker::auto_publish(SupervisorControl::topic_prefix, SupervisorControl::create_response); + Broker::auto_publish(SupervisorControl::topic_prefix, SupervisorControl::destroy_request); + Broker::auto_publish(SupervisorControl::topic_prefix, SupervisorControl::destroy_response); + Broker::auto_publish(SupervisorControl::topic_prefix, SupervisorControl::restart_request); + Broker::auto_publish(SupervisorControl::topic_prefix, SupervisorControl::restart_response); + Broker::auto_publish(SupervisorControl::topic_prefix, SupervisorControl::stop_request); + + # Establish connectivity with the controller. + if ( ClusterAgent::controller$address != "0.0.0.0" ) + { + # We connect to the controller. + Broker::peer(ClusterAgent::controller$address, + ClusterAgent::controller$bound_port, + ClusterController::connect_retry); + } + else + { + # Controller connects to us; listen for it. + Broker::listen(cat(epi$network$address), epi$network$bound_port); + } + + ClusterController::Log::info("agent is live"); + } diff --git a/scripts/policy/frameworks/cluster/controller/__load__.zeek b/scripts/policy/frameworks/cluster/controller/__load__.zeek new file mode 100644 index 0000000000..c88fde804b --- /dev/null +++ b/scripts/policy/frameworks/cluster/controller/__load__.zeek @@ -0,0 +1,5 @@ +# The entry point for the cluster controller. It only runs bootstrap logic for +# launching via the Supervisor. If we're not running the Supervisor, this does +# nothing. + +@load ./boot diff --git a/scripts/policy/frameworks/cluster/controller/api.zeek b/scripts/policy/frameworks/cluster/controller/api.zeek new file mode 100644 index 0000000000..4d3e1ba70d --- /dev/null +++ b/scripts/policy/frameworks/cluster/controller/api.zeek @@ -0,0 +1,16 @@ +@load ./types + +module ClusterController::API; + +export { + const version = 1; + + global get_instances_request: event(reqid: string); + global get_instances_response: event(reqid: string, + instances: vector of ClusterController::Types::Instance); + + global set_configuration_request: event(reqid: string, + config: ClusterController::Types::Configuration); + global set_configuration_response: event(reqid: string, + result: ClusterController::Types::ResultVec); +} diff --git a/scripts/policy/frameworks/cluster/controller/boot.zeek b/scripts/policy/frameworks/cluster/controller/boot.zeek new file mode 100644 index 0000000000..9d23731946 --- /dev/null +++ b/scripts/policy/frameworks/cluster/controller/boot.zeek @@ -0,0 +1,29 @@ +@load ./config + +event zeek_init() + { + if ( ! Supervisor::is_supervisor() ) + return; + + local epi = ClusterController::endpoint_info(); + local sn = Supervisor::NodeConfig($name=epi$id, $bare_mode=T, + $scripts=vector("policy/frameworks/cluster/controller/main.zeek")); + + if ( ClusterController::directory != "" ) + sn$directory = ClusterController::directory; + if ( ClusterController::stdout_file != "" ) + sn$stdout_file = ClusterController::stdout_file; + if ( ClusterController::stderr_file != "" ) + sn$stderr_file = ClusterController::stderr_file; + + # This helps Zeek run controller and agent with a minimal set of scripts. + sn$env["ZEEK_CLUSTER_MGMT_NODE"] = "CONTROLLER"; + + local res = Supervisor::create(sn); + + if ( res != "" ) + { + print(fmt("error: supervisor could not create controller node: %s", res)); + exit(1); + } + } diff --git a/scripts/policy/frameworks/cluster/controller/config.zeek b/scripts/policy/frameworks/cluster/controller/config.zeek new file mode 100644 index 0000000000..36c4a0b5bd --- /dev/null +++ b/scripts/policy/frameworks/cluster/controller/config.zeek @@ -0,0 +1,85 @@ +@load policy/frameworks/cluster/agent/config + +module ClusterController; + +export { + # The name of this controller in the cluster. + # Without the environment variable and no redef, this + # falls back to "controller-". + const name = getenv("ZEEK_CONTROLLER_NAME") &redef; + + # Controller stdout/stderr log files to produce in Zeek's + # working directory. If empty, no such logs will result. + const stdout_file = "controller.stdout" &redef; + const stderr_file = "controller.stderr" &redef; + + # The address and port the controller listens on. When + # undefined, falls back to the default_address, which you can + # likewise customize. + const listen_address = getenv("ZEEK_CONTROLLER_ADDR") &redef; + const default_address = Broker::default_listen_address &redef; + + const listen_port = getenv("ZEEK_CONTROLLER_PORT") &redef; + const default_port = 2150/tcp &redef; + + # A more aggressive default retry interval (vs default 30s) + const connect_retry = 1sec &redef; + + # The controller listens for messages on this topic: + const topic = "zeek/cluster-control/controller" &redef; + + # The set of agents to interact with. When this is non-empty + # at startup, the controller contacts the agents; when it is + # empty, it waits for agents to connect. They key is a name of + # each instance. This should match the $name member of the + # instance records. + const instances: table[string] of ClusterController::Types::Instance = { } &redef; + + # The role of this node in cluster management. Agent and + # controller both redef this. Used during logging. + const role = ClusterController::Types::NONE &redef; + + # Agent and controller currently log only, not via the data cluster's + # logger. (This might get added later.) For now, this means that + # if both write to the same log file, it gets garbled. The following + # lets you specify the working directory specifically for the agent. + const directory = "" &redef; + + # The following functions return the effective network endpoint + # information for this controller, in two related forms. + global network_info: function(): Broker::NetworkInfo; + global endpoint_info: function(): Broker::EndpointInfo; +} + +function network_info(): Broker::NetworkInfo + { + local ni: Broker::NetworkInfo; + + if ( ClusterController::listen_address != "" ) + ni$address = ClusterController::listen_address; + else if ( ClusterController::default_address != "" ) + ni$address = ClusterController::default_address; + else + ni$address = "127.0.0.1"; + + if ( ClusterController::listen_port != "" ) + ni$bound_port = to_port(ClusterController::listen_port); + else + ni$bound_port = ClusterController::default_port; + + return ni; + } + +function endpoint_info(): Broker::EndpointInfo + { + local epi: Broker::EndpointInfo; + + if ( ClusterController::name != "" ) + epi$id = ClusterController::name; + else + epi$id = fmt("controller-%s", gethostname()); + + epi$network = network_info(); + + return epi; + } diff --git a/scripts/policy/frameworks/cluster/controller/log.zeek b/scripts/policy/frameworks/cluster/controller/log.zeek new file mode 100644 index 0000000000..49aeb9b282 --- /dev/null +++ b/scripts/policy/frameworks/cluster/controller/log.zeek @@ -0,0 +1,109 @@ +@load ./config + +module ClusterController::Log; + +export { + ## The cluster logging stream identifier. + redef enum Log::ID += { LOG }; + + ## A default logging policy hook for the stream. + global log_policy: Log::PolicyHook; + + type Level: enum { + DEBUG, + INFO, + WARNING, + ERROR, + }; + + ## The record type which contains the column fields of the cluster log. + type Info: record { + ## The time at which a cluster message was generated. + ts: time; + ## The name of the node that is creating the log record. + node: string; + ## Log level of this message, converted from the above Level enum + level: string; + ## The role of the node, translated from ClusterController::Types::Role. + role: string; + ## A message indicating information about cluster controller operation. + message: string; + } &log; + + global log_level = DEBUG &redef; + + global info: function(message: string); + global warning: function(message: string); + global error: function(message: string); +} + +# Enum translations to strings. This avoids those enums being reported +# with full qualifications in the logs, which is too verbose. + +global l2s: table[Level] of string = { + [DEBUG] = "DEBUG", + [INFO] = "INFO", + [WARNING] = "WARNING", + [ERROR] = "ERROR", +}; + +global r2s: table[ClusterController::Types::Role] of string = { + [ClusterController::Types::AGENT] = "AGENT", + [ClusterController::Types::CONTROLLER] = "CONTROLLER", +}; + +function debug(message: string) + { + if ( enum_to_int(log_level) > enum_to_int(DEBUG) ) + return; + + local node = Supervisor::node(); + Log::write(LOG, [$ts=network_time(), $node=node$name, $level=l2s[DEBUG], + $role=r2s[ClusterController::role], $message=message]); + } + +function info(message: string) + { + if ( enum_to_int(log_level) > enum_to_int(INFO) ) + return; + + local node = Supervisor::node(); + Log::write(LOG, [$ts=network_time(), $node=node$name, $level=l2s[INFO], + $role=r2s[ClusterController::role], $message=message]); + } + +function warning(message: string) + { + if ( enum_to_int(log_level) > enum_to_int(WARNING) ) + return; + + local node = Supervisor::node(); + Log::write(LOG, [$ts=network_time(), $node=node$name, $level=l2s[WARNING], + $role=r2s[ClusterController::role], $message=message]); + } + +function error(message: string) + { + if ( enum_to_int(log_level) > enum_to_int(ERROR) ) + return; + + local node = Supervisor::node(); + Log::write(LOG, [$ts=network_time(), $node=node$name, $level=l2s[ERROR], + $role=r2s[ClusterController::role], $message=message]); + } + +event zeek_init() + { + if ( ! Supervisor::is_supervised() ) + return; + + local node = Supervisor::node(); + + # Defining the stream outside of the stream creation call sidesteps + # the coverage.find-bro-logs test, which tries to inventory all logs. + # This log isn't yet ready for that level of scrutiny. + local stream = Log::Stream($columns=Info, $path=fmt("cluster-%s", node$name), + $policy=log_policy); + + Log::create_stream(ClusterController::Log::LOG, stream); + } diff --git a/scripts/policy/frameworks/cluster/controller/main.zeek b/scripts/policy/frameworks/cluster/controller/main.zeek new file mode 100644 index 0000000000..e82378527f --- /dev/null +++ b/scripts/policy/frameworks/cluster/controller/main.zeek @@ -0,0 +1,250 @@ +@load base/frameworks/broker + +@load policy/frameworks/cluster/agent/config +@load policy/frameworks/cluster/agent/api + +@load ./api +@load ./log +@load ./request + +redef ClusterController::role = ClusterController::Types::CONTROLLER; + +event ClusterAgent::API::notify_agent_hello(instance: string, host: addr, api_version: count) + { + # See if we already know about this agent; if not, register + # it. + # + # XXX protection against rogue agents? + + if ( instance in ClusterController::instances ) + { + # Do nothing, unless this known agent checks in with a mismatching + # API version, in which case we kick it out. + if ( api_version != ClusterController::API::version ) + { + local inst = ClusterController::instances[instance]; + if ( inst?$listen_port ) + { + # We peered with this instance, unpeer. + Broker::unpeer(cat(inst$host), inst$listen_port ); + # XXX what to do if they connected to us? + } + delete ClusterController::instances[instance]; + } + + # Update the instance name in the pointed-to record, in case it + # was previously named otherwise. Not being too picky here allows + # the user some leeway in spelling out the original config. + ClusterController::instances[instance]$name = instance; + + return; + } + + if ( api_version != ClusterController::API::version ) + { + ClusterController::Log::warning( + fmt("agent %s/%s speaks incompatible agent protocol (%s, need %s), unpeering", + instance, host, api_version, ClusterController::API::version)); + } + + ClusterController::instances[instance] = ClusterController::Types::Instance($name=instance, $host=host); + ClusterController::Log::info(fmt("instance %s/%s has checked in", instance, host)); + } + + +event ClusterAgent::API::notify_change(instance: string, n: ClusterController::Types::Node, + old: ClusterController::Types::State, + new: ClusterController::Types::State) + { + # XXX TODO + } + +event ClusterAgent::API::notify_error(instance: string, msg: string, node: string) + { + # XXX TODO + } + +event ClusterAgent::API::notify_log(instance: string, msg: string, node: string) + { + # XXX TODO + } + +event ClusterAgent::API::set_configuration_response(reqid: string, result: ClusterController::Types::Result) + { + ClusterController::Log::info(fmt("rx ClusterAgent::API::set_configuration_response %s", reqid)); + + # Retrieve state for the request we just got a response to + local areq = ClusterController::Request::lookup(reqid); + if ( ClusterController::Request::is_null(areq) ) + return; + + # Record the result and mark the request as done. This also + # marks the request as done in the parent-level request, since + # these records are stored by reference. + areq$results[0] = result; # We only have a single result here atm + areq$finished = T; + + # Update the original request from the client: + local req = ClusterController::Request::lookup(areq$parent_id); + if ( ClusterController::Request::is_null(req) ) + return; + + # If there are any requests to the agents still unfinished, + # we're not done yet. + for ( i in req$set_configuration_state$requests ) + if ( ! req$set_configuration_state$requests[i]$finished ) + return; + + # All set_configuration requests to instances are done, so respond + # back to client. We need to compose the result, aggregating + # the results we got from the requests to the agents. In the + # end we have one Result per instance requested in the + # original set_configuration_request. + # + # XXX we can likely generalize result aggregation in the request module. + for ( i in req$set_configuration_state$requests ) + { + local r = req$set_configuration_state$requests[i]; + + local success = T; + local errors: string_vec; + local instance = ""; + + for ( j in r$results ) + { + local res = r$results[j]; + instance = res$instance; + + if ( res$success ) + next; + + success = F; + errors += fmt("node %s failed: %s", res$node, res$error); + } + + req$results += ClusterController::Types::Result( + $reqid = req$id, + $instance = instance, + $success = success, + $error = join_string_vec(errors, ", ") + ); + + ClusterController::Request::finish(r$id); + } + + ClusterController::Log::info(fmt("tx ClusterController::API::set_configuration_response %s", req$id)); + event ClusterController::API::set_configuration_response(req$id, req$results); + ClusterController::Request::finish(req$id); + } + +event ClusterController::API::set_configuration_request(reqid: string, config: ClusterController::Types::Configuration) + { + ClusterController::Log::info(fmt("rx ClusterController::API::set_configuration_request %s", reqid)); + + local req = ClusterController::Request::create(reqid); + req$set_configuration_state = ClusterController::Request::SetConfigurationState(); + + # Compare new configuration to the current one and send updates + # to the instances as needed. + if ( config?$instances ) + { + # XXX properly handle instance update: connect to new instances provided + # when they are listening, accept connections from new instances that are + # not + for ( inst in config$instances ) + { + if ( inst$name !in ClusterController::instances ) + { + local res = ClusterController::Types::Result($reqid=reqid, $instance=inst$name); + res$error = fmt("instance %s is unknown, skipping", inst$name); + req$results += res; + } + } + } + + # XXX validate the configuration: + # - Are node instances among defined instances? + # - Are all names unique? + # - Are any node options understood? + # - Do node types with optional fields have required values? + # ... + + # Transmit the configuration on to the agents. They need to be aware of + # each other's location and nodes, so the data cluster nodes can connect + # (for example, so a worker on instance 1 can connect to a logger on + # instance 2). + for ( name in ClusterController::instances ) + { + local agent_topic = ClusterAgent::topic_prefix + "/" + name; + local areq = ClusterController::Request::create(); + areq$parent_id = reqid; + + # We track the requests sent off to each agent. As the + # responses come in, we can check them off as completed, + # and once all are, we respond back to the client. + req$set_configuration_state$requests += areq; + + # XXX could also broadcast just once on the agent prefix, but + # explicit request/response pairs for each agent seems cleaner. + ClusterController::Log::info(fmt("tx ClusterAgent::API::set_configuration_request %s to %s", + areq$id, name)); + Broker::publish(agent_topic, ClusterAgent::API::set_configuration_request, areq$id, config); + } + + # Response event gets sent via the agents' reponse event. + } + +event ClusterController::API::get_instances_request(reqid: string) + { + ClusterController::Log::info(fmt("rx ClusterController::API::set_instances_request %s", reqid)); + + local insts: vector of ClusterController::Types::Instance; + + for ( i in ClusterController::instances ) + insts += ClusterController::instances[i]; + + ClusterController::Log::info(fmt("tx ClusterController::API::get_instances_response %s", reqid)); + event ClusterController::API::get_instances_response(reqid, insts); + } + +event zeek_init() + { + # Controller always listens -- it needs to be able to respond + # to the Zeek client. This port is also used by the agents + # if they connect to the client. + local cni = ClusterController::network_info(); + Broker::listen(cat(cni$address), cni$bound_port); + + Broker::subscribe(ClusterAgent::topic_prefix); + Broker::subscribe(ClusterController::topic); + + Broker::auto_publish(ClusterController::topic, + ClusterController::API::get_instances_response); + Broker::auto_publish(ClusterController::topic, + ClusterController::API::set_configuration_response); + + if ( |ClusterController::instances| > 0 ) + { + # We peer with the agents -- otherwise, the agents peer + # with (i.e., connect to) us. + for ( i in ClusterController::instances ) + { + local inst = ClusterController::instances[i]; + + if ( ! inst?$listen_port ) + { + # XXX config error -- this must be there + next; + } + + Broker::peer(cat(inst$host), inst$listen_port, + ClusterController::connect_retry); + } + } + + # If ClusterController::instances is empty, agents peer with + # us and we do nothing. We'll build up state as the + # notify_agent_hello() events come int. + + ClusterController::Log::info("controller is live"); + } diff --git a/scripts/policy/frameworks/cluster/controller/request.zeek b/scripts/policy/frameworks/cluster/controller/request.zeek new file mode 100644 index 0000000000..868b84d0f0 --- /dev/null +++ b/scripts/policy/frameworks/cluster/controller/request.zeek @@ -0,0 +1,86 @@ +@load ./types + +module ClusterController::Request; + +export { + type Request: record { + id: string; + parent_id: string &optional; + }; + + # API-specific state. XXX we may be able to generalize after this + # has settled a bit more. + + # State specific to the set_configuration request/response events + type SetConfigurationState: record { + requests: vector of Request &default=vector(); + }; + + # State specific to the set_nodes request/response events + type SetNodesState: record { + requests: vector of Request &default=vector(); + }; + + # State specific to supervisor interactions + type SupervisorState: record { + node: string; + }; + + # The redef is a workaround so we can use the Request type + # while it is still being defined + redef record Request += { + results: ClusterController::Types::ResultVec &default=vector(); + finished: bool &default=F; + + set_configuration_state: SetConfigurationState &optional; + set_nodes_state: SetNodesState &optional; + supervisor_state: SupervisorState &optional; + }; + + global null_req = Request($id="", $finished=T); + + global create: function(reqid: string &default=unique_id("")): Request; + global lookup: function(reqid: string): Request; + global finish: function(reqid: string): bool; + + global is_null: function(request: Request): bool; +} + +# XXX this needs a mechanism for expiring stale requests +global requests: table[string] of Request; + +function create(reqid: string): Request + { + local ret = Request($id=reqid); + requests[reqid] = ret; + return ret; + } + +function lookup(reqid: string): Request + { + if ( reqid in requests ) + return requests[reqid]; + + return null_req; + } + +function finish(reqid: string): bool + { + if ( reqid !in requests ) + return F; + + local req = requests[reqid]; + delete requests[reqid]; + + req$finished = T; + + return T; + } + +function is_null(request: Request): bool + { + if ( request$id == "" ) + return T; + + return F; + } diff --git a/scripts/policy/frameworks/cluster/controller/types.zeek b/scripts/policy/frameworks/cluster/controller/types.zeek new file mode 100644 index 0000000000..e2e0899a88 --- /dev/null +++ b/scripts/policy/frameworks/cluster/controller/types.zeek @@ -0,0 +1,80 @@ +# Types for the Cluster Controller framework. These are used by both agent and controller. + +module ClusterController::Types; + +export { + ## Management infrastructure node type. This intentionally does not + ## include the data cluster node types (worker, logger, etc) -- those + ## continue to be managed by the cluster framework. + type Role: enum { + NONE, + AGENT, + CONTROLLER, + }; + + ## A Zeek-side option with value. + type Option: record { + name: string; # Name of option + value: string; # Value of option + }; + + ## Configuration describing a Zeek instance running a Cluster + ## Agent. Normally, there'll be one instance per cluster + ## system: a single physical system. + type Instance: record { + # Unique, human-readable instance name + name: string; + # IP address of system + host: addr; + # Agent listening port. Not needed if agents connect to controller. + listen_port: port &optional; + }; + + ## State that a Cluster Node can be in. State changes trigger an + ## API notification (see notify_change()). + type State: enum { + Running, # Running and operating normally + Stopped, # Explicitly stopped + Failed, # Failed to start; and permanently halted + Crashed, # Crashed, will be restarted, + Unknown, # State not known currently (e.g., because of lost connectivity) + }; + + ## Configuration describing a Cluster Node process. + type Node: record { + name: string; # Cluster-unique, human-readable node name + instance: string; # Name of instance where node is to run + p: port; # Port on which this node will listen + role: Supervisor::ClusterRole; # Role of the node. + state: State; # Desired, or current, run state. + scripts: vector of string &optional; # Additional Zeek scripts for node + options: set[Option] &optional; # Zeek options for node + interface: string &optional; # Interface to sniff + cpu_affinity: int &optional; # CPU/core number to pin to + env: table[string] of string &default=table(); # Custom environment vars + }; + + # Data structure capturing a cluster's complete configuration. + type Configuration: record { + id: string &default=unique_id(""); # Unique identifier for a particular configuration + + ## The instances in the cluster. + ## XXX we may be able to make this optional + instances: set[Instance]; + + ## The set of nodes in the cluster, as distributed over the instances. + nodes: set[Node]; + }; + + # Return value for request-response API event pairs + type Result: record { + reqid: string; # Request ID of operation this result refers to + instance: string; # Name of associated instance (for context) + success: bool &default=T; # True if successful + data: any &optional; # Addl data returned for successful operation + error: string &default=""; # Descriptive error on failure + node: string &optional; # Name of associated node (for context) + }; + + type ResultVec: vector of Result; +} From 04dda8b4a78d73c906abb0ee50d8dc929755d969 Mon Sep 17 00:00:00 2001 From: Christian Kreibich Date: Tue, 6 Jul 2021 18:38:51 -0700 Subject: [PATCH 6/8] Update baselines affected by cluster controller changes --- scripts/test-all-policy.zeek | 13 +++++++++++++ scripts/zeekygen/__load__.zeek | 2 ++ .../btest/Baseline/coverage.bare-mode-errors/errors | 6 +++--- .../Baseline/coverage.init-default/missing_loads | 1 + testing/btest/Baseline/plugins.hooks/output | 6 ++++++ 5 files changed, 25 insertions(+), 3 deletions(-) diff --git a/scripts/test-all-policy.zeek b/scripts/test-all-policy.zeek index 610712716f..ea36a239ef 100644 --- a/scripts/test-all-policy.zeek +++ b/scripts/test-all-policy.zeek @@ -11,6 +11,19 @@ # @load frameworks/control/controllee.zeek # @load frameworks/control/controller.zeek +@load frameworks/cluster/agent/__load__.zeek +@load frameworks/cluster/agent/api.zeek +@load frameworks/cluster/agent/boot.zeek +@load frameworks/cluster/agent/config.zeek +# @load frameworks/cluster/agent/main.zeek +@load frameworks/cluster/controller/__load__.zeek +@load frameworks/cluster/controller/api.zeek +@load frameworks/cluster/controller/boot.zeek +@load frameworks/cluster/controller/config.zeek +@load frameworks/cluster/controller/log.zeek +# @load frameworks/cluster/controller/main.zeek +@load frameworks/cluster/controller/request.zeek +@load frameworks/cluster/controller/types.zeek @load frameworks/dpd/detect-protocols.zeek @load frameworks/dpd/packet-segment-logging.zeek @load frameworks/intel/do_notice.zeek diff --git a/scripts/zeekygen/__load__.zeek b/scripts/zeekygen/__load__.zeek index 5c41a673a6..75a5f7a666 100644 --- a/scripts/zeekygen/__load__.zeek +++ b/scripts/zeekygen/__load__.zeek @@ -4,6 +4,8 @@ @load protocols/ssl/notary.zeek @load frameworks/control/controllee.zeek @load frameworks/control/controller.zeek +@load frameworks/cluster/agent/main.zeek +@load frameworks/cluster/controller/main.zeek @load frameworks/files/extract-all-files.zeek @load policy/misc/dump-events.zeek @load policy/protocols/conn/speculative-service.zeek diff --git a/testing/btest/Baseline/coverage.bare-mode-errors/errors b/testing/btest/Baseline/coverage.bare-mode-errors/errors index a8808581ef..ed8c1aa4a4 100644 --- a/testing/btest/Baseline/coverage.bare-mode-errors/errors +++ b/testing/btest/Baseline/coverage.bare-mode-errors/errors @@ -1,9 +1,9 @@ ### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. ### NOTE: This file has been sorted with diff-sort. -warning in <...>/extract-certs-pem.zeek, line 1: deprecated script loaded from <...>/__load__.zeek:10 "Remove in v5.1. Use log-certs-base64.zeek instead." +warning in <...>/extract-certs-pem.zeek, line 1: deprecated script loaded from <...>/__load__.zeek:12 "Remove in v5.1. Use log-certs-base64.zeek instead." warning in <...>/extract-certs-pem.zeek, line 1: deprecated script loaded from command line arguments "Remove in v5.1. Use log-certs-base64.zeek instead." -warning in <...>/log-ocsp.zeek, line 1: deprecated script loaded from <...>/test-all-policy.zeek:45 ("Remove in v5.1. OCSP logging is now enabled by default") -warning in <...>/log-ocsp.zeek, line 1: deprecated script loaded from <...>/test-all-policy.zeek:45 ("Remove in v5.1. OCSP logging is now enabled by default") +warning in <...>/log-ocsp.zeek, line 1: deprecated script loaded from <...>/test-all-policy.zeek:58 ("Remove in v5.1. OCSP logging is now enabled by default") +warning in <...>/log-ocsp.zeek, line 1: deprecated script loaded from <...>/test-all-policy.zeek:58 ("Remove in v5.1. OCSP logging is now enabled by default") warning in <...>/log-ocsp.zeek, line 1: deprecated script loaded from command line arguments ("Remove in v5.1. OCSP logging is now enabled by default") warning in <...>/notary.zeek, line 1: deprecated script loaded from <...>/__load__.zeek:4 ("Remove in v5.1. Please switch to other more modern approaches like SCT validation (validate-sct.zeek).") warning in <...>/notary.zeek, line 1: deprecated script loaded from command line arguments ("Remove in v5.1. Please switch to other more modern approaches like SCT validation (validate-sct.zeek).") diff --git a/testing/btest/Baseline/coverage.init-default/missing_loads b/testing/btest/Baseline/coverage.init-default/missing_loads index 79aa700647..fe23c7a04a 100644 --- a/testing/btest/Baseline/coverage.init-default/missing_loads +++ b/testing/btest/Baseline/coverage.init-default/missing_loads @@ -10,3 +10,4 @@ -./frameworks/openflow/cluster.zeek -./frameworks/packet-filter/cluster.zeek -./frameworks/sumstats/cluster.zeek +-./init-supervisor.zeek diff --git a/testing/btest/Baseline/plugins.hooks/output b/testing/btest/Baseline/plugins.hooks/output index 5a9e7de7a5..12e752da97 100644 --- a/testing/btest/Baseline/plugins.hooks/output +++ b/testing/btest/Baseline/plugins.hooks/output @@ -661,6 +661,8 @@ 0.000000 MetaHookPost CallFunction(SumStats::register_observe_plugin, , (SumStats::UNIQUE, lambda_<14393221830775341876>{ if (!SumStats::rv?$unique_vals) SumStats::rv$unique_vals = (coerce set() to set[SumStats::Observation])if (SumStats::r?$unique_max) SumStats::rv$unique_max = SumStats::r$unique_maxif (!SumStats::r?$unique_max || sizeofSumStats::rv$unique_vals <= SumStats::r$unique_max) add SumStats::rv$unique_vals[SumStats::obs]SumStats::rv$unique = sizeofSumStats::rv$unique_vals})) -> 0.000000 MetaHookPost CallFunction(SumStats::register_observe_plugin, , (SumStats::VARIANCE, lambda_<6557258612059469785>{ if (1 < SumStats::rv$num) SumStats::rv$var_s += ((SumStats::val - SumStats::rv$prev_avg) * (SumStats::val - SumStats::rv$average))SumStats::calc_variance(SumStats::rv)SumStats::rv$prev_avg = SumStats::rv$average})) -> 0.000000 MetaHookPost CallFunction(SumStats::register_observe_plugins, , ()) -> +0.000000 MetaHookPost CallFunction(Supervisor::__is_supervisor, , ()) -> +0.000000 MetaHookPost CallFunction(Supervisor::is_supervisor, , ()) -> 0.000000 MetaHookPost CallFunction(__init_primary_bifs, , ()) -> 0.000000 MetaHookPost CallFunction(__init_secondary_bifs, , ()) -> 0.000000 MetaHookPost CallFunction(current_time, , ()) -> @@ -1710,6 +1712,8 @@ 0.000000 MetaHookPre CallFunction(SumStats::register_observe_plugin, , (SumStats::UNIQUE, lambda_<14393221830775341876>{ if (!SumStats::rv?$unique_vals) SumStats::rv$unique_vals = (coerce set() to set[SumStats::Observation])if (SumStats::r?$unique_max) SumStats::rv$unique_max = SumStats::r$unique_maxif (!SumStats::r?$unique_max || sizeofSumStats::rv$unique_vals <= SumStats::r$unique_max) add SumStats::rv$unique_vals[SumStats::obs]SumStats::rv$unique = sizeofSumStats::rv$unique_vals})) 0.000000 MetaHookPre CallFunction(SumStats::register_observe_plugin, , (SumStats::VARIANCE, lambda_<6557258612059469785>{ if (1 < SumStats::rv$num) SumStats::rv$var_s += ((SumStats::val - SumStats::rv$prev_avg) * (SumStats::val - SumStats::rv$average))SumStats::calc_variance(SumStats::rv)SumStats::rv$prev_avg = SumStats::rv$average})) 0.000000 MetaHookPre CallFunction(SumStats::register_observe_plugins, , ()) +0.000000 MetaHookPre CallFunction(Supervisor::__is_supervisor, , ()) +0.000000 MetaHookPre CallFunction(Supervisor::is_supervisor, , ()) 0.000000 MetaHookPre CallFunction(__init_primary_bifs, , ()) 0.000000 MetaHookPre CallFunction(__init_secondary_bifs, , ()) 0.000000 MetaHookPre CallFunction(current_time, , ()) @@ -2758,6 +2762,8 @@ 0.000000 | HookCallFunction SumStats::register_observe_plugin(SumStats::UNIQUE, lambda_<14393221830775341876>{ if (!SumStats::rv?$unique_vals) SumStats::rv$unique_vals = (coerce set() to set[SumStats::Observation])if (SumStats::r?$unique_max) SumStats::rv$unique_max = SumStats::r$unique_maxif (!SumStats::r?$unique_max || sizeofSumStats::rv$unique_vals <= SumStats::r$unique_max) add SumStats::rv$unique_vals[SumStats::obs]SumStats::rv$unique = sizeofSumStats::rv$unique_vals}) 0.000000 | HookCallFunction SumStats::register_observe_plugin(SumStats::VARIANCE, lambda_<6557258612059469785>{ if (1 < SumStats::rv$num) SumStats::rv$var_s += ((SumStats::val - SumStats::rv$prev_avg) * (SumStats::val - SumStats::rv$average))SumStats::calc_variance(SumStats::rv)SumStats::rv$prev_avg = SumStats::rv$average}) 0.000000 | HookCallFunction SumStats::register_observe_plugins() +0.000000 | HookCallFunction Supervisor::__is_supervisor() +0.000000 | HookCallFunction Supervisor::is_supervisor() 0.000000 | HookCallFunction __init_primary_bifs() 0.000000 | HookCallFunction __init_secondary_bifs() 0.000000 | HookCallFunction current_time() From a6b0fde65ff9803200f59520bfe8a8cc595838f2 Mon Sep 17 00:00:00 2001 From: Christian Kreibich Date: Tue, 6 Jul 2021 17:47:46 -0700 Subject: [PATCH 7/8] Add zeek-client via new submodule The new module resides in auxil/zeek-client. It does not get installed unless one configures with --enable-zeek-client. --- .gitmodules | 3 +++ CMakeLists.txt | 3 ++- auxil/zeek-client | 1 + configure | 4 ++++ 4 files changed, 10 insertions(+), 1 deletion(-) create mode 160000 auxil/zeek-client diff --git a/.gitmodules b/.gitmodules index 326d21d7e1..4318212fe0 100644 --- a/.gitmodules +++ b/.gitmodules @@ -46,3 +46,6 @@ [submodule "auxil/package-manager"] path = auxil/package-manager url = https://github.com/zeek/package-manager +[submodule "auxil/zeek-client"] + path = auxil/zeek-client + url = https://github.com/zeek/zeek-client diff --git a/CMakeLists.txt b/CMakeLists.txt index 775bbecce0..b15b8cc050 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -616,11 +616,12 @@ CheckOptionalBuildSources(auxil/package-manager ZKG INSTALL_ZKG) CheckOptionalBuildSources(auxil/zeekctl ZeekControl INSTALL_ZEEKCTL) CheckOptionalBuildSources(auxil/zeek-aux Zeek-Aux INSTALL_AUX_TOOLS) CheckOptionalBuildSources(auxil/zeek-archiver ZeekArchiver INSTALL_ZEEK_ARCHIVER) +CheckOptionalBuildSources(auxil/zeek-client ZeekClient INSTALL_ZEEK_CLIENT) ######################################################################## ## Packaging Setup -if ( INSTALL_ZEEKCTL OR INSTALL_ZKG ) +if ( INSTALL_ZEEKCTL OR INSTALL_ZKG OR INSTALL_ZEEK_CLIENT ) # CPack RPM Generator may not automatically detect this set(CPACK_RPM_PACKAGE_REQUIRES "python >= ${ZEEK_PYTHON_MIN}") endif () diff --git a/auxil/zeek-client b/auxil/zeek-client new file mode 160000 index 0000000000..afe253c775 --- /dev/null +++ b/auxil/zeek-client @@ -0,0 +1 @@ +Subproject commit afe253c77591e87b2a6cf6d5682cd02caa78e9d2 diff --git a/configure b/configure index 6db1dcfc8d..f40846bc43 100755 --- a/configure +++ b/configure @@ -64,6 +64,7 @@ Usage: $0 [OPTION]... [VAR=VALUE]... --enable-static-broker build Broker statically (ignored if --with-broker is specified) --enable-static-binpac build binpac statically (ignored if --with-binpac is specified) --enable-cpp-tests build Zeek's C++ unit tests + --enable-zeek-client install the Zeek cluster management client (experimental) --disable-zeekctl don't install ZeekControl --disable-auxtools don't build or install auxiliary tools --disable-archiver don't build or install zeek-archiver tool @@ -290,6 +291,9 @@ while [ $# -ne 0 ]; do --enable-cpp-tests) append_cache_entry ENABLE_ZEEK_UNIT_TESTS BOOL true ;; + --enable-zeek-client) + append_cache_entry INSTALL_ZEEK_CLIENT BOOL true + ;; --disable-zeekctl) append_cache_entry INSTALL_ZEEKCTL BOOL false ;; From 767cf2b5187ae0bad9b55001ec552c1a9d62112d Mon Sep 17 00:00:00 2001 From: Christian Kreibich Date: Tue, 6 Jul 2021 21:29:17 -0700 Subject: [PATCH 8/8] Add a cluster controller testcase for agent-controller checkin This verifies that in a setup with the supervisor creating both controller and agent, the agent successfully checks in with the controller. --- .../zeek.controller.stdout | 2 + .../cluster/controller/agent-checkin.zeek | 60 +++++++++++++++++++ 2 files changed, 62 insertions(+) create mode 100644 testing/btest/Baseline/scripts.policy.frameworks.cluster.controller.agent-checkin/zeek.controller.stdout create mode 100644 testing/btest/scripts/policy/frameworks/cluster/controller/agent-checkin.zeek diff --git a/testing/btest/Baseline/scripts.policy.frameworks.cluster.controller.agent-checkin/zeek.controller.stdout b/testing/btest/Baseline/scripts.policy.frameworks.cluster.controller.agent-checkin/zeek.controller.stdout new file mode 100644 index 0000000000..3ce8a1f373 --- /dev/null +++ b/testing/btest/Baseline/scripts.policy.frameworks.cluster.controller.agent-checkin/zeek.controller.stdout @@ -0,0 +1,2 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +notify_agent_hello agent 127.0.0.1 1 diff --git a/testing/btest/scripts/policy/frameworks/cluster/controller/agent-checkin.zeek b/testing/btest/scripts/policy/frameworks/cluster/controller/agent-checkin.zeek new file mode 100644 index 0000000000..cf1304e3f3 --- /dev/null +++ b/testing/btest/scripts/policy/frameworks/cluster/controller/agent-checkin.zeek @@ -0,0 +1,60 @@ +# This test verifies basic agent-controller communication. We launch agent and +# controller via the supervisor, add an extra handler for the notify_agent_hello +# event that travels agent -> controller, and verify its print output in the +# controller's stdout log. + +# The following env vars is known to the controller framework +# @TEST-PORT: ZEEK_CONTROLLER_PORT +# @TEST-PORT: BROKER_PORT + +# A bit of a detour to get the port number into the agent configuration +# @TEST-EXEC: btest-bg-run zeek zeek -j %INPUT +# @TEST-EXEC: btest-bg-wait 10 +# @TEST-EXEC: btest-diff zeek/controller.stdout + +@load policy/frameworks/cluster/agent +@load policy/frameworks/cluster/controller + +redef Broker::default_port = to_port(getenv("BROKER_PORT")); + +redef ClusterController::name = "controller"; +redef ClusterAgent::name = "agent"; + +# Tell the agent where to locate the controller. +redef ClusterAgent::controller = [$address="127.0.0.1", $bound_port=to_port(getenv("ZEEK_CONTROLLER_PORT"))]; + +@if ( Supervisor::is_supervised() ) + +@load policy/frameworks/cluster/agent/api + +global logged = F; + +event zeek_init() + { + # We're using the controller to shut everything down once the + # notify_agent_hello event has arrived. The controller doesn't normally + # talk to the supervisor, so connect to it. + if ( Supervisor::node()$name == "controller" ) + { + Broker::peer(getenv("ZEEK_DEFAULT_LISTEN_ADDRESS"), Broker::default_port, Broker::default_listen_retry); + Broker::auto_publish(SupervisorControl::topic_prefix, SupervisorControl::stop_request); + } + } + +event ClusterAgent::API::notify_agent_hello(instance: string, host: addr, api_version: count) + { + if ( Supervisor::node()$name == "controller" ) + { + # On rare occasion it can happen that we log this twice, which'll need + # investigating. For now we ensure we only do so once. + if ( ! logged ) + print(fmt("notify_agent_hello %s %s %s", instance, host, api_version)); + + logged = T; + + # This takes down the whole process tree. + event SupervisorControl::stop_request(); + } + } + +@endif