diff --git a/.gitmodules b/.gitmodules index 326d21d7e1..4318212fe0 100644 --- a/.gitmodules +++ b/.gitmodules @@ -46,3 +46,6 @@ [submodule "auxil/package-manager"] path = auxil/package-manager url = https://github.com/zeek/package-manager +[submodule "auxil/zeek-client"] + path = auxil/zeek-client + url = https://github.com/zeek/zeek-client diff --git a/CHANGES b/CHANGES index cbffd5725d..5b4791cf61 100644 --- a/CHANGES +++ b/CHANGES @@ -1,4 +1,74 @@ +4.1.0-dev.916 | 2021-07-08 16:49:55 -0700 + + * Add a cluster controller testcase for agent-controller checkin (Christian Kreibich, Corelight) + + This verifies that in a setup with the supervisor creating both controller and + agent, the agent successfully checks in with the controller. + + * Add zeek-client via new submodule (Christian Kreibich, Corelight) + + The new module resides in auxil/zeek-client. It does not get installed unless + one configures with --enable-zeek-client. + + * Introduce cluster controller and cluster agent scripting (Christian Kreibich, Corelight) + + This is a preliminary implementation of a subset of the functionality set out in + our cluster controller architecture. The controller is the central management + node, existing once in any Zeek cluster. The agent is a node that runs once per + instance, where an instance will commonly be a physical machine. The agent in + turn manages the "data cluster", i.e. the traditional notion of a Zeek cluster + with manager, worker nodes, etc. + + Agent and controller live in the policy folder, and are activated when loading + policy/frameworks/cluster/agent and policy/frameworks/cluster/controller, + respectively. Both run in nodes forked by the supervisor. When Zeek doesn't use + the supervisor, they do nothing. Otherwise, boot.zeek instructs the supervisor + to create the respective node, running main.zeek. + + Both controller and agent have their own config.zeek with relevant knobs. For + both, controller/types.zeek provides common data types, and controller/log.zeek + provides basic logging (without logger communication -- no such node might + exist). + + A primitive request-tracking abstraction can be found in controller/request.zeek + to track outstanding request events and their subsequent responses. + + * Establish a separate init script when using the supervisor (Christian Kreibich, Corelight) + + The supervisor does not require the full weight of scripts that + init-default.zeek brings with it. The new file, init-supervisor.zeek, contains + only what's required by the supervisor in addition to the other always-loaded + init files. + + * Add optional bare-mode boolean flag to Supervisor's node configuration (Christian Kreibich, Corelight) + + When omitted, the node inherits the Supervisor's bare-mode + status. When true/false, the new Zeek node will enable/disable bare + mode, respectively. It continues to load any scripts passed at the + command line and in the additional scripts list already provided in + the node configuration. + + Includes testcase. + + * Add support for making the supervisor listen for requests (Christian Kreibich, Corelight) + + The supervisor now starts listening on the configured Broker default address and + port when the new boolean SupervisorControl::enable_listen is T. Listening + remains disabled by default. Listening allows nodes to communicate with the + supervisor via the events laid out in control.zeek, to conduct further node + management. 
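    (As a minimal usage sketch -- not part of this patch, and assuming the default Broker
    listen address/port together with the SupervisorControl events laid out in control.zeek --
    a separate Zeek process could drive a listening Supervisor roughly like this; the node
    name "worker-1" and the request ID are arbitrary illustrations:

        # On the Supervisor side, opt into listening:
        redef SupervisorControl::enable_listen = T;

        # On the remote side: peer with the Supervisor, then request a node
        # once the peering is established.
        event zeek_init()
            {
            Broker::subscribe(SupervisorControl::topic_prefix);
            Broker::auto_publish(SupervisorControl::topic_prefix,
                                 SupervisorControl::create_request);
            Broker::peer("127.0.0.1", Broker::default_port, Broker::default_listen_retry);
            }

        event Broker::peer_added(endpoint: Broker::EndpointInfo, msg: string)
            {
            local nc = Supervisor::NodeConfig($name="worker-1");
            event SupervisorControl::create_request("req-1", nc);
            }

        event SupervisorControl::create_response(reqid: string, result: string)
            {
            # An empty result string indicates success.
            print fmt("create %s -> %s", reqid, result == "" ? "ok" : result);
            }
    )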
+
+  * Add support for setting environment variables via supervisor (Christian Kreibich, Corelight)
+
+    The NodeConfig record now has a table for specifying environment variable names
+    and values, which the supervisor sets in the created node.
+
+    This also repositions the cpu_affinity member to keep the order the same in
+    the corresponding script-layer and in-core types.
+
+    Includes testcase.
+
4.1.0-dev.907 | 2021-07-08 16:00:06 -0700

  * Fix reference in the logging framework docs re. Log::StreamPolicyHook (Christian Kreibich, Corelight)

diff --git a/CMakeLists.txt b/CMakeLists.txt
index 775bbecce0..b15b8cc050 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -616,11 +616,12 @@ CheckOptionalBuildSources(auxil/package-manager ZKG INSTALL_ZKG)
 CheckOptionalBuildSources(auxil/zeekctl ZeekControl INSTALL_ZEEKCTL)
 CheckOptionalBuildSources(auxil/zeek-aux Zeek-Aux INSTALL_AUX_TOOLS)
 CheckOptionalBuildSources(auxil/zeek-archiver ZeekArchiver INSTALL_ZEEK_ARCHIVER)
+CheckOptionalBuildSources(auxil/zeek-client ZeekClient INSTALL_ZEEK_CLIENT)

 ########################################################################
 ## Packaging Setup

-if ( INSTALL_ZEEKCTL OR INSTALL_ZKG )
+if ( INSTALL_ZEEKCTL OR INSTALL_ZKG OR INSTALL_ZEEK_CLIENT )
     # CPack RPM Generator may not automatically detect this
     set(CPACK_RPM_PACKAGE_REQUIRES "python >= ${ZEEK_PYTHON_MIN}")
 endif ()
diff --git a/NEWS b/NEWS
index 161fea5b44..4c739426bc 100644
--- a/NEWS
+++ b/NEWS
@@ -92,6 +92,27 @@ New Functionality
   - ``global disable_analyzer: function(tag: Files::Tag): bool;``
   - ``global analyzer_enabled: function(tag: Files::Tag): bool;``

+- The Supervisor now defaults to starting with a minimal set of Zeek
+  scripts controlled by a new init file, ``base/init-supervisor.zeek``.
+  One may still run it with a larger configuration by loading additional
+  scripts, including ``init-default.zeek``, as always. (Bare mode continues
+  to work as usual, reducing the configuration to a minimum.)
+
+  The ``NodeConfig`` record has two new members, providing additional
+  control over launched nodes. The ``env`` member allows setting environment
+  variables in the launched process. The ``bare_mode`` member, an optional
+  boolean, provides control over the bare-mode state of the new node.
+  When not provided, the node inherits the bare-mode status of the
+  Supervisor; when set, it explicitly enables or disables bare mode.
+
+- Zeek now includes an incomplete, preliminary version of the future
+  cluster controller framework. Loading ``policy/frameworks/cluster/agent``
+  and/or ``policy/frameworks/cluster/controller`` in a Zeek running with the
+  Supervisor will launch the corresponding cluster management node(s).
+  An experimental management client, ``zeek-client``, connects to the
+  controller and lets you issue commands. Installing it requires configuring
+  with ``--enable-zeek-client``. This does not yet provide a functional
+  substitute for ``zeekctl``, which users should continue to use for now.

 Changed Functionality
 ---------------------
diff --git a/VERSION b/VERSION
index 0ba658d84a..bc3fe99cf4 100644
--- a/VERSION
+++ b/VERSION
@@ -1 +1 @@
-4.1.0-dev.907
+4.1.0-dev.916
diff --git a/auxil/zeek-client b/auxil/zeek-client
new file mode 160000
index 0000000000..afe253c775
--- /dev/null
+++ b/auxil/zeek-client
@@ -0,0 +1 @@
+Subproject commit afe253c77591e87b2a6cf6d5682cd02caa78e9d2
diff --git a/configure b/configure
index 6db1dcfc8d..f40846bc43 100755
--- a/configure
+++ b/configure
@@ -64,6 +64,7 @@ Usage: $0 [OPTION]... [VAR=VALUE]...
--enable-static-broker build Broker statically (ignored if --with-broker is specified) --enable-static-binpac build binpac statically (ignored if --with-binpac is specified) --enable-cpp-tests build Zeek's C++ unit tests + --enable-zeek-client install the Zeek cluster management client (experimental) --disable-zeekctl don't install ZeekControl --disable-auxtools don't build or install auxiliary tools --disable-archiver don't build or install zeek-archiver tool @@ -290,6 +291,9 @@ while [ $# -ne 0 ]; do --enable-cpp-tests) append_cache_entry ENABLE_ZEEK_UNIT_TESTS BOOL true ;; + --enable-zeek-client) + append_cache_entry INSTALL_ZEEK_CLIENT BOOL true + ;; --disable-zeekctl) append_cache_entry INSTALL_ZEEKCTL BOOL false ;; diff --git a/scripts/base/frameworks/supervisor/api.zeek b/scripts/base/frameworks/supervisor/api.zeek index 78ef9cbeed..01c41f6f14 100644 --- a/scripts/base/frameworks/supervisor/api.zeek +++ b/scripts/base/frameworks/supervisor/api.zeek @@ -42,8 +42,13 @@ export { stdout_file: string &optional; ## The filename/path to which the node's stderr will be redirected. stderr_file: string &optional; + ## Whether to start the node in bare mode. When left out, the node + ## inherits the bare-mode status the supervisor itself runs with. + bare_mode: bool &optional; ## Additional script filenames/paths that the node should load. scripts: vector of string &default = vector(); + ## Environment variables to define in the supervised node. + env: table[string] of string &default=table(); ## A cpu/core number to which the node will try to pin itself. cpu_affinity: int &optional; ## The Cluster Layout definition. Each node in the Cluster Framework diff --git a/scripts/base/frameworks/supervisor/control.zeek b/scripts/base/frameworks/supervisor/control.zeek index fa20a9dba6..ed9c083cb9 100644 --- a/scripts/base/frameworks/supervisor/control.zeek +++ b/scripts/base/frameworks/supervisor/control.zeek @@ -4,6 +4,7 @@ ##! That is, it may change in various incompatible ways without warning or ##! deprecation until the stable 4.0.0 release. +@load base/frameworks/broker @load ./api module SupervisorControl; @@ -15,6 +16,10 @@ export { ## for their topic names. const topic_prefix = "zeek/supervisor" &redef; + ## When enabled, the Supervisor will listen on the configured Broker + ## :zeek:see:`Broker::default_listen_address`. + const enable_listen = F &redef; + ## Send a request to a remote Supervisor process to create a node. ## ## reqid: an arbitrary string that will be directly echoed in the response diff --git a/scripts/base/frameworks/supervisor/main.zeek b/scripts/base/frameworks/supervisor/main.zeek index f892907055..8b0161b79e 100644 --- a/scripts/base/frameworks/supervisor/main.zeek +++ b/scripts/base/frameworks/supervisor/main.zeek @@ -3,7 +3,6 @@ @load ./api @load ./control -@load base/frameworks/broker function Supervisor::status(node: string): Supervisor::Status { @@ -42,6 +41,13 @@ function Supervisor::node(): Supervisor::NodeConfig event zeek_init() &priority=10 { + if ( Supervisor::is_supervisor() && SupervisorControl::enable_listen ) + { + Broker::listen(Broker::default_listen_address, + Broker::default_port, + Broker::default_listen_retry); + } + Broker::subscribe(SupervisorControl::topic_prefix); } diff --git a/scripts/base/init-supervisor.zeek b/scripts/base/init-supervisor.zeek new file mode 100644 index 0000000000..da5af4b8b4 --- /dev/null +++ b/scripts/base/init-supervisor.zeek @@ -0,0 +1,5 @@ +##! This script loads functionality needed by the supervisor. 
Zeek only sources +##! this when the supervisor is active (-j). Like init-default.zeek, this isn't +##! loaded in bare mode. + +@load base/frameworks/supervisor diff --git a/scripts/policy/frameworks/cluster/agent/__load__.zeek b/scripts/policy/frameworks/cluster/agent/__load__.zeek new file mode 100644 index 0000000000..1db332f544 --- /dev/null +++ b/scripts/policy/frameworks/cluster/agent/__load__.zeek @@ -0,0 +1,5 @@ +# The entry point for the cluster agent. It only runs bootstrap logic for +# launching via the Supervisor. If we're not running the Supervisor, this does +# nothing. + +@load ./boot diff --git a/scripts/policy/frameworks/cluster/agent/api.zeek b/scripts/policy/frameworks/cluster/agent/api.zeek new file mode 100644 index 0000000000..a5334fbbef --- /dev/null +++ b/scripts/policy/frameworks/cluster/agent/api.zeek @@ -0,0 +1,33 @@ +@load base/frameworks/supervisor/control +@load policy/frameworks/cluster/controller/types + +module ClusterAgent::API; + +export { + const version = 1; + + # Agent API events + + global set_configuration_request: event(reqid: string, + config: ClusterController::Types::Configuration); + global set_configuration_response: event(reqid: string, + result: ClusterController::Types::Result); + + # Notification events, agent -> controller + + # Report agent being available. + global notify_agent_hello: event(instance: string, host: addr, + api_version: count); + + # Report node state changes. + global notify_change: event(instance: string, + n: ClusterController::Types::Node, + old: ClusterController::Types::State, + new: ClusterController::Types::State); + + # Report operational error. + global notify_error: event(instance: string, msg: string, node: string &default=""); + + # Report informational message. + global notify_log: event(instance: string, msg: string, node: string &default=""); +} diff --git a/scripts/policy/frameworks/cluster/agent/boot.zeek b/scripts/policy/frameworks/cluster/agent/boot.zeek new file mode 100644 index 0000000000..3eed5f6dd9 --- /dev/null +++ b/scripts/policy/frameworks/cluster/agent/boot.zeek @@ -0,0 +1,35 @@ +@load ./config + +# The agent needs the supervisor to listen for node management requests. We +# need to tell it to do so, and we need to do so here, in the agent +# bootstrapping code, so the redef applies prior to the fork of the agent +# process itself. +redef SupervisorControl::enable_listen = T; + +event zeek_init() + { + if ( ! Supervisor::is_supervisor() ) + return; + + local epi = ClusterAgent::endpoint_info(); + local sn = Supervisor::NodeConfig($name=epi$id, $bare_mode=T, + $scripts=vector("policy/frameworks/cluster/agent/main.zeek")); + + if ( ClusterAgent::directory != "" ) + sn$directory = ClusterAgent::directory; + if ( ClusterAgent::stdout_file_suffix != "" ) + sn$stdout_file = epi$id + "." + ClusterAgent::stdout_file_suffix; + if ( ClusterAgent::stderr_file_suffix != "" ) + sn$stderr_file = epi$id + "." + ClusterAgent::stderr_file_suffix; + + # This helps Zeek run controller and agent with a minimal set of scripts. 
+ sn$env["ZEEK_CLUSTER_MGMT_NODE"] = "AGENT"; + + local res = Supervisor::create(sn); + + if ( res != "" ) + { + print(fmt("error: supervisor could not create agent node: %s", res)); + exit(1); + } + } diff --git a/scripts/policy/frameworks/cluster/agent/config.zeek b/scripts/policy/frameworks/cluster/agent/config.zeek new file mode 100644 index 0000000000..2e836d08ab --- /dev/null +++ b/scripts/policy/frameworks/cluster/agent/config.zeek @@ -0,0 +1,85 @@ +@load policy/frameworks/cluster/controller/types + +module ClusterAgent; + +export { + # The name this agent uses to represent the cluster instance + # it manages. When the environment variable isn't set and there's, + # no redef, this falls back to "agent-". + const name = getenv("ZEEK_AGENT_NAME") &redef; + + # Agent stdout/stderr log files to produce in Zeek's working + # directory. If empty, no such logs will result. The actual + # log files have the agent's name (as per above) dot-prefixed. + const stdout_file_suffix = "agent.stdout" &redef; + const stderr_file_suffix = "agent.stderr" &redef; + + # The address and port the agent listens on. When + # undefined, falls back to configurable default values. + const listen_address = getenv("ZEEK_AGENT_ADDR") &redef; + const default_address = Broker::default_listen_address &redef; + + const listen_port = getenv("ZEEK_AGENT_PORT") &redef; + const default_port = 2151/tcp &redef; + + # The agent communicates under to following topic prefix, + # suffixed with "/" (see above): + const topic_prefix = "zeek/cluster-control/agent" &redef; + + # The coordinates of the controller. When defined, it means + # agents peer with (connect to) the controller; otherwise the + # controller knows all agents and peers with them. + const controller: Broker::NetworkInfo = [ + $address="0.0.0.0", $bound_port=0/unknown] &redef; + + # Agent and controller currently log only, not via the data cluster's + # logger. (This might get added later.) For now, this means that + # if both write to the same log file, it gets garbled. The following + # lets you specify the working directory specifically for the agent. + const directory = "" &redef; + + # Working directory for data cluster nodes. When relative, note + # that this will apply from the working directory of the agent, + # since it creates data cluster nodes. + const cluster_directory = "" &redef; + + # The following functions return the effective network endpoint + # information for this agent, in two related forms. 
+ global instance: function(): ClusterController::Types::Instance; + global endpoint_info: function(): Broker::EndpointInfo; +} + +function instance(): ClusterController::Types::Instance + { + local epi = endpoint_info(); + return ClusterController::Types::Instance($name=epi$id, + $host=to_addr(epi$network$address), + $listen_port=epi$network$bound_port); + } + +function endpoint_info(): Broker::EndpointInfo + { + local epi: Broker::EndpointInfo; + local network: Broker::NetworkInfo; + + if ( ClusterAgent::name != "" ) + epi$id = ClusterAgent::name; + else + epi$id = fmt("agent-%s", gethostname()); + + if ( ClusterAgent::listen_address != "" ) + network$address = ClusterAgent::listen_address; + else if ( ClusterAgent::default_address != "" ) + network$address = ClusterAgent::default_address; + else + network$address = "127.0.0.1"; + + if ( ClusterAgent::listen_port != "" ) + network$bound_port = to_port(ClusterAgent::listen_port); + else + network$bound_port = ClusterAgent::default_port; + + epi$network = network; + + return epi; + } diff --git a/scripts/policy/frameworks/cluster/agent/main.zeek b/scripts/policy/frameworks/cluster/agent/main.zeek new file mode 100644 index 0000000000..1956d47d0c --- /dev/null +++ b/scripts/policy/frameworks/cluster/agent/main.zeek @@ -0,0 +1,223 @@ +@load base/frameworks/broker + +@load policy/frameworks/cluster/controller/config +@load policy/frameworks/cluster/controller/log +@load policy/frameworks/cluster/controller/request + +@load ./api + +redef ClusterController::role = ClusterController::Types::AGENT; + +# The global configuration as passed to us by the controller +global global_config: ClusterController::Types::Configuration; + +# A map to make other instance info accessible +global instances: table[string] of ClusterController::Types::Instance; + +# A map for the nodes we run on this instance, via this agent. +global nodes: table[string] of ClusterController::Types::Node; + +# The node map employed by the supervisor to describe the cluster +# topology to newly forked nodes. We refresh it when we receive +# new configurations. +global data_cluster: table[string] of Supervisor::ClusterEndpoint; + +event SupervisorControl::create_response(reqid: string, result: string) + { + local req = ClusterController::Request::lookup(reqid); + if ( ClusterController::Request::is_null(req) ) + return; + + local name = req$supervisor_state$node; + + if ( |result| > 0 ) + { + local msg = fmt("failed to create node %s: %s", name, result); + ClusterController::Log::error(msg); + event ClusterAgent::API::notify_error(ClusterAgent::name, msg, name); + } + + ClusterController::Request::finish(reqid); + } + +event SupervisorControl::destroy_response(reqid: string, result: bool) + { + local req = ClusterController::Request::lookup(reqid); + if ( ClusterController::Request::is_null(req) ) + return; + + local name = req$supervisor_state$node; + + if ( ! 
result ) + { + local msg = fmt("failed to destroy node %s, %s", name, reqid); + ClusterController::Log::error(msg); + event ClusterAgent::API::notify_error(ClusterAgent::name, msg, name); + } + + ClusterController::Request::finish(reqid); + } + +function supervisor_create(nc: Supervisor::NodeConfig) + { + local req = ClusterController::Request::create(); + req$supervisor_state = ClusterController::Request::SupervisorState($node = nc$name); + event SupervisorControl::create_request(req$id, nc); + ClusterController::Log::info(fmt("issued supervisor create for %s, %s", nc$name, req$id)); + } + +function supervisor_destroy(node: string) + { + local req = ClusterController::Request::create(); + req$supervisor_state = ClusterController::Request::SupervisorState($node = node); + event SupervisorControl::destroy_request(req$id, node); + ClusterController::Log::info(fmt("issued supervisor destroy for %s, %s", node, req$id)); + } + +event ClusterAgent::API::set_configuration_request(reqid: string, config: ClusterController::Types::Configuration) + { + ClusterController::Log::info(fmt("rx ClusterAgent::API::set_configuration_request %s", reqid)); + + local nodename: string; + local node: ClusterController::Types::Node; + local nc: Supervisor::NodeConfig; + local msg: string; + + # Adopt the global configuration provided. + # XXX this can later handle validation and persistence + # XXX should do this transactionally, only set when all else worked + global_config = config; + + # Refresh the instances table: + instances = table(); + for ( inst in config$instances ) + instances[inst$name] = inst; + + # Terminate existing nodes + for ( nodename in nodes ) + supervisor_destroy(nodename); + + nodes = table(); + + # Refresh the data cluster and nodes tables + + data_cluster = table(); + for ( node in config$nodes ) + { + if ( node$instance == ClusterAgent::name ) + nodes[node$name] = node; + + local cep = Supervisor::ClusterEndpoint( + $role = node$role, + $host = instances[node$instance]$host, + $p = node$p); + + if ( node?$interface ) + cep$interface = node$interface; + + data_cluster[node$name] = cep; + } + + # Apply the new configuration via the supervisor + + for ( nodename in nodes ) + { + node = nodes[nodename]; + nc = Supervisor::NodeConfig($name=nodename); + + if ( ClusterAgent::cluster_directory != "" ) + nc$directory = ClusterAgent::cluster_directory; + + if ( node?$interface ) + nc$interface = node$interface; + if ( node?$cpu_affinity ) + nc$cpu_affinity = node$cpu_affinity; + if ( node?$scripts ) + nc$scripts = node$scripts; + if ( node?$env ) + nc$env = node$env; + + # XXX could use options to enable per-node overrides for + # directory, stdout, stderr, others? + + nc$cluster = data_cluster; + supervisor_create(nc); + } + + # XXX this currently doesn not fail if any of above problems occurred, + # mainly due to the tediousness of handling the supervisor's response + # events asynchonously. The only indication of error will be + # notification events to the controller. + + local res = ClusterController::Types::Result( + $reqid = reqid, + $instance = ClusterAgent::name); + + ClusterController::Log::info(fmt("tx ClusterAgent::API::set_configuration_response %s", reqid)); + event ClusterAgent::API::set_configuration_response(reqid, res); + } + +event Broker::peer_added(peer: Broker::EndpointInfo, msg: string) + { + # This does not (cannot?) immediately verify that the new peer + # is in fact a controller, so we might send this redundantly. + # Controllers handle the hello event accordingly. 
+ + local epi = ClusterAgent::endpoint_info(); + # XXX deal with unexpected peers, unless we're okay with it + event ClusterAgent::API::notify_agent_hello(epi$id, + to_addr(epi$network$address), ClusterAgent::API::version); + } + +event zeek_init() + { + local epi = ClusterAgent::endpoint_info(); + local agent_topic = ClusterAgent::topic_prefix + "/" + epi$id; + + # The agent needs to peer with the supervisor -- this doesn't currently + # happen automatically. The address defaults to Broker's default, which + # relies on ZEEK_DEFAULT_LISTEN_ADDR and so might just be "". Broker + # internally falls back to listening on any; we pick 127.0.0.1. + local supervisor_addr = Broker::default_listen_address; + if ( supervisor_addr == "" ) + supervisor_addr = "127.0.0.1"; + + Broker::peer(supervisor_addr, Broker::default_port, Broker::default_listen_retry); + + # Agents need receive communication targeted at it, and any responses + # from the supervisor. + Broker::subscribe(agent_topic); + Broker::subscribe(SupervisorControl::topic_prefix); + + # Auto-publish a bunch of events. Glob patterns or module-level + # auto-publish would be helpful here. + Broker::auto_publish(agent_topic, ClusterAgent::API::set_configuration_response); + Broker::auto_publish(agent_topic, ClusterAgent::API::notify_agent_hello); + Broker::auto_publish(agent_topic, ClusterAgent::API::notify_change); + Broker::auto_publish(agent_topic, ClusterAgent::API::notify_error); + Broker::auto_publish(agent_topic, ClusterAgent::API::notify_log); + + Broker::auto_publish(SupervisorControl::topic_prefix, SupervisorControl::create_request); + Broker::auto_publish(SupervisorControl::topic_prefix, SupervisorControl::create_response); + Broker::auto_publish(SupervisorControl::topic_prefix, SupervisorControl::destroy_request); + Broker::auto_publish(SupervisorControl::topic_prefix, SupervisorControl::destroy_response); + Broker::auto_publish(SupervisorControl::topic_prefix, SupervisorControl::restart_request); + Broker::auto_publish(SupervisorControl::topic_prefix, SupervisorControl::restart_response); + Broker::auto_publish(SupervisorControl::topic_prefix, SupervisorControl::stop_request); + + # Establish connectivity with the controller. + if ( ClusterAgent::controller$address != "0.0.0.0" ) + { + # We connect to the controller. + Broker::peer(ClusterAgent::controller$address, + ClusterAgent::controller$bound_port, + ClusterController::connect_retry); + } + else + { + # Controller connects to us; listen for it. + Broker::listen(cat(epi$network$address), epi$network$bound_port); + } + + ClusterController::Log::info("agent is live"); + } diff --git a/scripts/policy/frameworks/cluster/controller/__load__.zeek b/scripts/policy/frameworks/cluster/controller/__load__.zeek new file mode 100644 index 0000000000..c88fde804b --- /dev/null +++ b/scripts/policy/frameworks/cluster/controller/__load__.zeek @@ -0,0 +1,5 @@ +# The entry point for the cluster controller. It only runs bootstrap logic for +# launching via the Supervisor. If we're not running the Supervisor, this does +# nothing. 
+ +@load ./boot diff --git a/scripts/policy/frameworks/cluster/controller/api.zeek b/scripts/policy/frameworks/cluster/controller/api.zeek new file mode 100644 index 0000000000..4d3e1ba70d --- /dev/null +++ b/scripts/policy/frameworks/cluster/controller/api.zeek @@ -0,0 +1,16 @@ +@load ./types + +module ClusterController::API; + +export { + const version = 1; + + global get_instances_request: event(reqid: string); + global get_instances_response: event(reqid: string, + instances: vector of ClusterController::Types::Instance); + + global set_configuration_request: event(reqid: string, + config: ClusterController::Types::Configuration); + global set_configuration_response: event(reqid: string, + result: ClusterController::Types::ResultVec); +} diff --git a/scripts/policy/frameworks/cluster/controller/boot.zeek b/scripts/policy/frameworks/cluster/controller/boot.zeek new file mode 100644 index 0000000000..9d23731946 --- /dev/null +++ b/scripts/policy/frameworks/cluster/controller/boot.zeek @@ -0,0 +1,29 @@ +@load ./config + +event zeek_init() + { + if ( ! Supervisor::is_supervisor() ) + return; + + local epi = ClusterController::endpoint_info(); + local sn = Supervisor::NodeConfig($name=epi$id, $bare_mode=T, + $scripts=vector("policy/frameworks/cluster/controller/main.zeek")); + + if ( ClusterController::directory != "" ) + sn$directory = ClusterController::directory; + if ( ClusterController::stdout_file != "" ) + sn$stdout_file = ClusterController::stdout_file; + if ( ClusterController::stderr_file != "" ) + sn$stderr_file = ClusterController::stderr_file; + + # This helps Zeek run controller and agent with a minimal set of scripts. + sn$env["ZEEK_CLUSTER_MGMT_NODE"] = "CONTROLLER"; + + local res = Supervisor::create(sn); + + if ( res != "" ) + { + print(fmt("error: supervisor could not create controller node: %s", res)); + exit(1); + } + } diff --git a/scripts/policy/frameworks/cluster/controller/config.zeek b/scripts/policy/frameworks/cluster/controller/config.zeek new file mode 100644 index 0000000000..36c4a0b5bd --- /dev/null +++ b/scripts/policy/frameworks/cluster/controller/config.zeek @@ -0,0 +1,85 @@ +@load policy/frameworks/cluster/agent/config + +module ClusterController; + +export { + # The name of this controller in the cluster. + # Without the environment variable and no redef, this + # falls back to "controller-". + const name = getenv("ZEEK_CONTROLLER_NAME") &redef; + + # Controller stdout/stderr log files to produce in Zeek's + # working directory. If empty, no such logs will result. + const stdout_file = "controller.stdout" &redef; + const stderr_file = "controller.stderr" &redef; + + # The address and port the controller listens on. When + # undefined, falls back to the default_address, which you can + # likewise customize. + const listen_address = getenv("ZEEK_CONTROLLER_ADDR") &redef; + const default_address = Broker::default_listen_address &redef; + + const listen_port = getenv("ZEEK_CONTROLLER_PORT") &redef; + const default_port = 2150/tcp &redef; + + # A more aggressive default retry interval (vs default 30s) + const connect_retry = 1sec &redef; + + # The controller listens for messages on this topic: + const topic = "zeek/cluster-control/controller" &redef; + + # The set of agents to interact with. When this is non-empty + # at startup, the controller contacts the agents; when it is + # empty, it waits for agents to connect. They key is a name of + # each instance. This should match the $name member of the + # instance records. 
+ const instances: table[string] of ClusterController::Types::Instance = { } &redef; + + # The role of this node in cluster management. Agent and + # controller both redef this. Used during logging. + const role = ClusterController::Types::NONE &redef; + + # Agent and controller currently log only, not via the data cluster's + # logger. (This might get added later.) For now, this means that + # if both write to the same log file, it gets garbled. The following + # lets you specify the working directory specifically for the agent. + const directory = "" &redef; + + # The following functions return the effective network endpoint + # information for this controller, in two related forms. + global network_info: function(): Broker::NetworkInfo; + global endpoint_info: function(): Broker::EndpointInfo; +} + +function network_info(): Broker::NetworkInfo + { + local ni: Broker::NetworkInfo; + + if ( ClusterController::listen_address != "" ) + ni$address = ClusterController::listen_address; + else if ( ClusterController::default_address != "" ) + ni$address = ClusterController::default_address; + else + ni$address = "127.0.0.1"; + + if ( ClusterController::listen_port != "" ) + ni$bound_port = to_port(ClusterController::listen_port); + else + ni$bound_port = ClusterController::default_port; + + return ni; + } + +function endpoint_info(): Broker::EndpointInfo + { + local epi: Broker::EndpointInfo; + + if ( ClusterController::name != "" ) + epi$id = ClusterController::name; + else + epi$id = fmt("controller-%s", gethostname()); + + epi$network = network_info(); + + return epi; + } diff --git a/scripts/policy/frameworks/cluster/controller/log.zeek b/scripts/policy/frameworks/cluster/controller/log.zeek new file mode 100644 index 0000000000..49aeb9b282 --- /dev/null +++ b/scripts/policy/frameworks/cluster/controller/log.zeek @@ -0,0 +1,109 @@ +@load ./config + +module ClusterController::Log; + +export { + ## The cluster logging stream identifier. + redef enum Log::ID += { LOG }; + + ## A default logging policy hook for the stream. + global log_policy: Log::PolicyHook; + + type Level: enum { + DEBUG, + INFO, + WARNING, + ERROR, + }; + + ## The record type which contains the column fields of the cluster log. + type Info: record { + ## The time at which a cluster message was generated. + ts: time; + ## The name of the node that is creating the log record. + node: string; + ## Log level of this message, converted from the above Level enum + level: string; + ## The role of the node, translated from ClusterController::Types::Role. + role: string; + ## A message indicating information about cluster controller operation. + message: string; + } &log; + + global log_level = DEBUG &redef; + + global info: function(message: string); + global warning: function(message: string); + global error: function(message: string); +} + +# Enum translations to strings. This avoids those enums being reported +# with full qualifications in the logs, which is too verbose. 
+ +global l2s: table[Level] of string = { + [DEBUG] = "DEBUG", + [INFO] = "INFO", + [WARNING] = "WARNING", + [ERROR] = "ERROR", +}; + +global r2s: table[ClusterController::Types::Role] of string = { + [ClusterController::Types::AGENT] = "AGENT", + [ClusterController::Types::CONTROLLER] = "CONTROLLER", +}; + +function debug(message: string) + { + if ( enum_to_int(log_level) > enum_to_int(DEBUG) ) + return; + + local node = Supervisor::node(); + Log::write(LOG, [$ts=network_time(), $node=node$name, $level=l2s[DEBUG], + $role=r2s[ClusterController::role], $message=message]); + } + +function info(message: string) + { + if ( enum_to_int(log_level) > enum_to_int(INFO) ) + return; + + local node = Supervisor::node(); + Log::write(LOG, [$ts=network_time(), $node=node$name, $level=l2s[INFO], + $role=r2s[ClusterController::role], $message=message]); + } + +function warning(message: string) + { + if ( enum_to_int(log_level) > enum_to_int(WARNING) ) + return; + + local node = Supervisor::node(); + Log::write(LOG, [$ts=network_time(), $node=node$name, $level=l2s[WARNING], + $role=r2s[ClusterController::role], $message=message]); + } + +function error(message: string) + { + if ( enum_to_int(log_level) > enum_to_int(ERROR) ) + return; + + local node = Supervisor::node(); + Log::write(LOG, [$ts=network_time(), $node=node$name, $level=l2s[ERROR], + $role=r2s[ClusterController::role], $message=message]); + } + +event zeek_init() + { + if ( ! Supervisor::is_supervised() ) + return; + + local node = Supervisor::node(); + + # Defining the stream outside of the stream creation call sidesteps + # the coverage.find-bro-logs test, which tries to inventory all logs. + # This log isn't yet ready for that level of scrutiny. + local stream = Log::Stream($columns=Info, $path=fmt("cluster-%s", node$name), + $policy=log_policy); + + Log::create_stream(ClusterController::Log::LOG, stream); + } diff --git a/scripts/policy/frameworks/cluster/controller/main.zeek b/scripts/policy/frameworks/cluster/controller/main.zeek new file mode 100644 index 0000000000..afc439b49d --- /dev/null +++ b/scripts/policy/frameworks/cluster/controller/main.zeek @@ -0,0 +1,250 @@ +@load base/frameworks/broker + +@load policy/frameworks/cluster/agent/config +@load policy/frameworks/cluster/agent/api + +@load ./api +@load ./log +@load ./request + +redef ClusterController::role = ClusterController::Types::CONTROLLER; + +event ClusterAgent::API::notify_agent_hello(instance: string, host: addr, api_version: count) + { + # See if we already know about this agent; if not, register + # it. + # + # XXX protection against rogue agents? + + if ( instance in ClusterController::instances ) + { + # Do nothing, unless this known agent checks in with a mismatching + # API version, in which case we kick it out. + if ( api_version != ClusterController::API::version ) + { + local inst = ClusterController::instances[instance]; + if ( inst?$listen_port ) + { + # We peered with this instance, unpeer. + Broker::unpeer(cat(inst$host), inst$listen_port ); + # XXX what to do if they connected to us? + } + delete ClusterController::instances[instance]; + } + + # Update the instance name in the pointed-to record, in case it + # was previously named otherwise. Not being too picky here allows + # the user some leeway in spelling out the original config. 
+ ClusterController::instances[instance]$name = instance; + + return; + } + + if ( api_version != ClusterController::API::version ) + { + ClusterController::Log::warning( + fmt("agent %s/%s speaks incompatible agent protocol (%s, need %s), unpeering", + instance, host, api_version, ClusterController::API::version)); + } + + ClusterController::instances[instance] = ClusterController::Types::Instance($name=instance, $host=host); + ClusterController::Log::info(fmt("instance %s/%s has checked in", instance, host)); + } + + +event ClusterAgent::API::notify_change(instance: string, n: ClusterController::Types::Node, + old: ClusterController::Types::State, + new: ClusterController::Types::State) + { + # XXX TODO + } + +event ClusterAgent::API::notify_error(instance: string, msg: string, node: string) + { + # XXX TODO + } + +event ClusterAgent::API::notify_log(instance: string, msg: string, node: string) + { + # XXX TODO + } + +event ClusterAgent::API::set_configuration_response(reqid: string, result: ClusterController::Types::Result) + { + ClusterController::Log::info(fmt("rx ClusterAgent::API::set_configuration_response %s", reqid)); + + # Retrieve state for the request we just got a response to + local areq = ClusterController::Request::lookup(reqid); + if ( ClusterController::Request::is_null(areq) ) + return; + + # Record the result and mark the request as done. This also + # marks the request as done in the parent-level request, since + # these records are stored by reference. + areq$results[0] = result; # We only have a single result here atm + areq$finished = T; + + # Update the original request from the client: + local req = ClusterController::Request::lookup(areq$parent_id); + if ( ClusterController::Request::is_null(req) ) + return; + + # If there are any requests to the agents still unfinished, + # we're not done yet. + for ( i in req$set_configuration_state$requests ) + if ( ! req$set_configuration_state$requests[i]$finished ) + return; + + # All set_configuration requests to instances are done, so respond + # back to client. We need to compose the result, aggregating + # the results we got from the requests to the agents. In the + # end we have one Result per instance requested in the + # original set_configuration_request. + # + # XXX we can likely generalize result aggregation in the request module. 
+ for ( i in req$set_configuration_state$requests ) + { + local r = req$set_configuration_state$requests[i]; + + local success = T; + local errors: string_vec; + local instance = ""; + + for ( j in r$results ) + { + local res = r$results[j]; + instance = res$instance; + + if ( res$success ) + next; + + success = F; + errors += fmt("node %s failed: %s", res$node, res$error); + } + + req$results += ClusterController::Types::Result( + $reqid = req$id, + $instance = instance, + $success = success, + $error = join_string_vec(errors, ", ") + ); + + ClusterController::Request::finish(r$id); + } + + ClusterController::Log::info(fmt("tx ClusterController::API::set_configuration_response %s", req$id)); + event ClusterController::API::set_configuration_response(req$id, req$results); + ClusterController::Request::finish(req$id); + } + +event ClusterController::API::set_configuration_request(reqid: string, config: ClusterController::Types::Configuration) + { + ClusterController::Log::info(fmt("rx ClusterController::API::set_configuration_request %s", reqid)); + + local req = ClusterController::Request::create(reqid); + req$set_configuration_state = ClusterController::Request::SetConfigurationState(); + + # Compare new configuration to the current one and send updates + # to the instances as needed. + if ( config?$instances ) + { + # XXX properly handle instance update: connect to new instances provided + # when they are listening, accept connections from new instances that are + # not + for ( inst in config$instances ) + { + if ( inst$name !in ClusterController::instances ) + { + local res = ClusterController::Types::Result($reqid=reqid, $instance=inst$name); + res$error = fmt("instance %s is unknown, skipping", inst$name); + req$results += res; + } + } + } + + # XXX validate the configuration: + # - Are node instances among defined instances? + # - Are all names unique? + # - Are any node options understood? + # - Do node types with optional fields have required values? + # ... + + # Transmit the configuration on to the agents. They need to be aware of + # each other's location and nodes, so the data cluster nodes can connect + # (for example, so a worker on instance 1 can connect to a logger on + # instance 2). + for ( name in ClusterController::instances ) + { + local agent_topic = ClusterAgent::topic_prefix + "/" + name; + local areq = ClusterController::Request::create(); + areq$parent_id = reqid; + + # We track the requests sent off to each agent. As the + # responses come in, we can check them off as completed, + # and once all are, we respond back to the client. + req$set_configuration_state$requests += areq; + + # XXX could also broadcast just once on the agent prefix, but + # explicit request/response pairs for each agent seems cleaner. + ClusterController::Log::info(fmt("tx ClusterAgent::API::set_configuration_request %s to %s", + areq$id, name)); + Broker::publish(agent_topic, ClusterAgent::API::set_configuration_request, areq$id, config); + } + + # Response event gets sent via the agents' reponse event. 
+ } + +event ClusterController::API::get_instances_request(reqid: string) + { + ClusterController::Log::info(fmt("rx ClusterController::API::set_instances_request %s", reqid)); + + local insts: vector of ClusterController::Types::Instance; + + for ( i in ClusterController::instances ) + insts += ClusterController::instances[i]; + + ClusterController::Log::info(fmt("tx ClusterController::API::get_instances_response %s", reqid)); + event ClusterController::API::get_instances_response(reqid, insts); + } + +event zeek_init() + { + # Controller always listens -- it needs to be able to respond + # to the Zeek client. This port is also used by the agents + # if they connect to the client. + local cni = ClusterController::network_info(); + Broker::listen(cat(cni$address), cni$bound_port); + + Broker::subscribe(ClusterAgent::topic_prefix); + Broker::subscribe(ClusterController::topic); + + Broker::auto_publish(ClusterController::topic, + ClusterController::API::get_instances_response); + Broker::auto_publish(ClusterController::topic, + ClusterController::API::set_configuration_response); + + if ( |ClusterController::instances| > 0 ) + { + # We peer with the agents -- otherwise, the agents peer + # with (i.e., connect to) us. + for ( i in ClusterController::instances ) + { + local inst = ClusterController::instances[i]; + + if ( ! inst?$listen_port ) + { + # XXX config error -- this must be there + next; + } + + Broker::peer(cat(inst$host), inst$listen_port, + ClusterController::connect_retry); + } + } + + # If ClusterController::instances is empty, agents peer with + # us and we do nothing. We'll build up state as the + # notify_agent_hello() events come int. + + ClusterController::Log::info("controller is live"); + } diff --git a/scripts/policy/frameworks/cluster/controller/request.zeek b/scripts/policy/frameworks/cluster/controller/request.zeek new file mode 100644 index 0000000000..868b84d0f0 --- /dev/null +++ b/scripts/policy/frameworks/cluster/controller/request.zeek @@ -0,0 +1,86 @@ +@load ./types + +module ClusterController::Request; + +export { + type Request: record { + id: string; + parent_id: string &optional; + }; + + # API-specific state. XXX we may be able to generalize after this + # has settled a bit more. 
+ + # State specific to the set_configuration request/response events + type SetConfigurationState: record { + requests: vector of Request &default=vector(); + }; + + # State specific to the set_nodes request/response events + type SetNodesState: record { + requests: vector of Request &default=vector(); + }; + + # State specific to supervisor interactions + type SupervisorState: record { + node: string; + }; + + # The redef is a workaround so we can use the Request type + # while it is still being defined + redef record Request += { + results: ClusterController::Types::ResultVec &default=vector(); + finished: bool &default=F; + + set_configuration_state: SetConfigurationState &optional; + set_nodes_state: SetNodesState &optional; + supervisor_state: SupervisorState &optional; + }; + + global null_req = Request($id="", $finished=T); + + global create: function(reqid: string &default=unique_id("")): Request; + global lookup: function(reqid: string): Request; + global finish: function(reqid: string): bool; + + global is_null: function(request: Request): bool; +} + +# XXX this needs a mechanism for expiring stale requests +global requests: table[string] of Request; + +function create(reqid: string): Request + { + local ret = Request($id=reqid); + requests[reqid] = ret; + return ret; + } + +function lookup(reqid: string): Request + { + if ( reqid in requests ) + return requests[reqid]; + + return null_req; + } + +function finish(reqid: string): bool + { + if ( reqid !in requests ) + return F; + + local req = requests[reqid]; + delete requests[reqid]; + + req$finished = T; + + return T; + } + +function is_null(request: Request): bool + { + if ( request$id == "" ) + return T; + + return F; + } diff --git a/scripts/policy/frameworks/cluster/controller/types.zeek b/scripts/policy/frameworks/cluster/controller/types.zeek new file mode 100644 index 0000000000..e2e0899a88 --- /dev/null +++ b/scripts/policy/frameworks/cluster/controller/types.zeek @@ -0,0 +1,80 @@ +# Types for the Cluster Controller framework. These are used by both agent and controller. + +module ClusterController::Types; + +export { + ## Management infrastructure node type. This intentionally does not + ## include the data cluster node types (worker, logger, etc) -- those + ## continue to be managed by the cluster framework. + type Role: enum { + NONE, + AGENT, + CONTROLLER, + }; + + ## A Zeek-side option with value. + type Option: record { + name: string; # Name of option + value: string; # Value of option + }; + + ## Configuration describing a Zeek instance running a Cluster + ## Agent. Normally, there'll be one instance per cluster + ## system: a single physical system. + type Instance: record { + # Unique, human-readable instance name + name: string; + # IP address of system + host: addr; + # Agent listening port. Not needed if agents connect to controller. + listen_port: port &optional; + }; + + ## State that a Cluster Node can be in. State changes trigger an + ## API notification (see notify_change()). + type State: enum { + Running, # Running and operating normally + Stopped, # Explicitly stopped + Failed, # Failed to start; and permanently halted + Crashed, # Crashed, will be restarted, + Unknown, # State not known currently (e.g., because of lost connectivity) + }; + + ## Configuration describing a Cluster Node process. 
+ type Node: record { + name: string; # Cluster-unique, human-readable node name + instance: string; # Name of instance where node is to run + p: port; # Port on which this node will listen + role: Supervisor::ClusterRole; # Role of the node. + state: State; # Desired, or current, run state. + scripts: vector of string &optional; # Additional Zeek scripts for node + options: set[Option] &optional; # Zeek options for node + interface: string &optional; # Interface to sniff + cpu_affinity: int &optional; # CPU/core number to pin to + env: table[string] of string &default=table(); # Custom environment vars + }; + + # Data structure capturing a cluster's complete configuration. + type Configuration: record { + id: string &default=unique_id(""); # Unique identifier for a particular configuration + + ## The instances in the cluster. + ## XXX we may be able to make this optional + instances: set[Instance]; + + ## The set of nodes in the cluster, as distributed over the instances. + nodes: set[Node]; + }; + + # Return value for request-response API event pairs + type Result: record { + reqid: string; # Request ID of operation this result refers to + instance: string; # Name of associated instance (for context) + success: bool &default=T; # True if successful + data: any &optional; # Addl data returned for successful operation + error: string &default=""; # Descriptive error on failure + node: string &optional; # Name of associated node (for context) + }; + + type ResultVec: vector of Result; +} diff --git a/scripts/test-all-policy.zeek b/scripts/test-all-policy.zeek index 610712716f..ea36a239ef 100644 --- a/scripts/test-all-policy.zeek +++ b/scripts/test-all-policy.zeek @@ -11,6 +11,19 @@ # @load frameworks/control/controllee.zeek # @load frameworks/control/controller.zeek +@load frameworks/cluster/agent/__load__.zeek +@load frameworks/cluster/agent/api.zeek +@load frameworks/cluster/agent/boot.zeek +@load frameworks/cluster/agent/config.zeek +# @load frameworks/cluster/agent/main.zeek +@load frameworks/cluster/controller/__load__.zeek +@load frameworks/cluster/controller/api.zeek +@load frameworks/cluster/controller/boot.zeek +@load frameworks/cluster/controller/config.zeek +@load frameworks/cluster/controller/log.zeek +# @load frameworks/cluster/controller/main.zeek +@load frameworks/cluster/controller/request.zeek +@load frameworks/cluster/controller/types.zeek @load frameworks/dpd/detect-protocols.zeek @load frameworks/dpd/packet-segment-logging.zeek @load frameworks/intel/do_notice.zeek diff --git a/scripts/zeekygen/__load__.zeek b/scripts/zeekygen/__load__.zeek index 5c41a673a6..75a5f7a666 100644 --- a/scripts/zeekygen/__load__.zeek +++ b/scripts/zeekygen/__load__.zeek @@ -4,6 +4,8 @@ @load protocols/ssl/notary.zeek @load frameworks/control/controllee.zeek @load frameworks/control/controller.zeek +@load frameworks/cluster/agent/main.zeek +@load frameworks/cluster/controller/main.zeek @load frameworks/files/extract-all-files.zeek @load policy/misc/dump-events.zeek @load policy/protocols/conn/speculative-service.zeek diff --git a/src/supervisor/Supervisor.cc b/src/supervisor/Supervisor.cc index d751387a1e..ac16bae1ff 100644 --- a/src/supervisor/Supervisor.cc +++ b/src/supervisor/Supervisor.cc @@ -1251,6 +1251,11 @@ Supervisor::NodeConfig Supervisor::NodeConfig::FromRecord(const RecordVal* node) if ( affinity_val ) rval.cpu_affinity = affinity_val->AsInt(); + const auto& bare_mode_val = node->GetField("bare_mode"); + + if ( bare_mode_val ) + rval.bare_mode = bare_mode_val->AsBool(); + auto 
scripts_val = node->GetField("scripts")->AsVectorVal(); for ( auto i = 0u; i < scripts_val->Size(); ++i ) @@ -1259,6 +1264,21 @@ Supervisor::NodeConfig Supervisor::NodeConfig::FromRecord(const RecordVal* node) rval.scripts.emplace_back(std::move(script)); } + auto env_table_val = node->GetField("env")->AsTableVal(); + auto env_table = env_table_val->AsTable(); + + for ( const auto& ee : *env_table ) + { + auto k = ee.GetHashKey(); + auto* v = ee.GetValue(); + + auto key = env_table_val->RecreateIndex(*k); + auto name = key->Idx(0)->AsStringVal()->ToStdString(); + auto val = v->GetVal()->AsStringVal()->ToStdString(); + + rval.env[name] = val; + } + auto cluster_table_val = node->GetField("cluster")->AsTableVal(); auto cluster_table = cluster_table_val->AsTable(); @@ -1309,11 +1329,19 @@ Supervisor::NodeConfig Supervisor::NodeConfig::FromJSON(std::string_view json) if ( auto it = j.FindMember("cpu_affinity"); it != j.MemberEnd() ) rval.cpu_affinity = it->value.GetInt(); + if ( auto it = j.FindMember("bare_mode"); it != j.MemberEnd() ) + rval.bare_mode = it->value.GetBool(); + auto& scripts = j["scripts"]; for ( auto it = scripts.Begin(); it != scripts.End(); ++it ) rval.scripts.emplace_back(it->GetString()); + auto& env = j["env"]; + + for ( auto it = env.MemberBegin(); it != env.MemberEnd(); ++it ) + rval.env[it->name.GetString()] = it->value.GetString(); + auto& cluster = j["cluster"]; for ( auto it = cluster.MemberBegin(); it != cluster.MemberEnd(); ++it ) @@ -1365,6 +1393,9 @@ RecordValPtr Supervisor::NodeConfig::ToRecord() const if ( cpu_affinity ) rval->AssignField("cpu_affinity", *cpu_affinity); + if ( bare_mode ) + rval->AssignField("bare_mode", *bare_mode); + auto st = rt->GetFieldType("scripts"); auto scripts_val = make_intrusive(std::move(st)); @@ -1373,6 +1404,17 @@ RecordValPtr Supervisor::NodeConfig::ToRecord() const rval->AssignField("scripts", std::move(scripts_val)); + auto et = rt->GetFieldType("env"); + auto env_val = make_intrusive(std::move(et)); + rval->AssignField("env", env_val); + + for ( const auto& e : env ) + { + auto name = make_intrusive(e.first); + auto val = make_intrusive(e.second); + env_val->Assign(std::move(name), std::move(val)); + } + auto tt = rt->GetFieldType("cluster"); auto cluster_val = make_intrusive(std::move(tt)); rval->AssignField("cluster", cluster_val); @@ -1454,6 +1496,7 @@ bool SupervisedNode::InitCluster() const { const auto& node_name = e.first; const auto& ep = e.second; + auto key = make_intrusive(node_name); auto val = make_intrusive(cluster_node_type); @@ -1533,6 +1576,19 @@ void SupervisedNode::Init(Options* options) const node_name.data(), strerror(errno)); } + if ( ! config.env.empty() ) + { + for ( const auto& e : config.env ) + { + if ( setenv(e.first.c_str(), e.second.c_str(), true) == -1 ) + { + fprintf(stderr, "node '%s' failed to setenv: %s\n", + node_name.data(), strerror(errno)); + exit(1); + } + } + } + if ( ! config.cluster.empty() ) { if ( setenv("CLUSTER_NODE", node_name.data(), true) == -1 ) @@ -1545,6 +1601,9 @@ void SupervisedNode::Init(Options* options) const options->filter_supervised_node_options(); + if ( config.bare_mode ) + options->bare_mode = *config.bare_mode; + if ( config.interface ) options->interface = *config.interface; diff --git a/src/supervisor/Supervisor.h b/src/supervisor/Supervisor.h index 5dbc878207..0fb02d35b9 100644 --- a/src/supervisor/Supervisor.h +++ b/src/supervisor/Supervisor.h @@ -186,10 +186,19 @@ public: * A cpu/core number to which the node will try to pin itself. 
*/ std::optional cpu_affinity; + /** + * Whether to start the node in bare mode. When not present, the + * node inherits the bare-mode status of the supervisor. + */ + std::optional bare_mode; /** * Additional script filename/paths that the node should load. */ std::vector scripts; + /** + * Environment variables and values to define in the node. + */ + std::map env; /** * The Cluster Layout definition. Each node in the Cluster Framework * knows about the full, static cluster topology to which it belongs. diff --git a/src/zeek-setup.cc b/src/zeek-setup.cc index 240e2f2a37..838bf86ef8 100644 --- a/src/zeek-setup.cc +++ b/src/zeek-setup.cc @@ -532,7 +532,14 @@ SetupResult setup(int argc, char** argv, Options* zopts) add_essential_input_file("base/init-frameworks-and-bifs.zeek"); if ( ! options.bare_mode ) - add_input_file("base/init-default.zeek"); + { + // The supervisor only needs to load a limited set of + // scripts, since it won't be doing traffic processing. + if ( options.supervisor_mode ) + add_input_file("base/init-supervisor.zeek"); + else + add_input_file("base/init-default.zeek"); + } add_input_file("builtin-plugins/__preload__.zeek"); add_input_file("builtin-plugins/__load__.zeek"); diff --git a/testing/btest/Baseline/coverage.bare-mode-errors/errors b/testing/btest/Baseline/coverage.bare-mode-errors/errors index a8808581ef..ed8c1aa4a4 100644 --- a/testing/btest/Baseline/coverage.bare-mode-errors/errors +++ b/testing/btest/Baseline/coverage.bare-mode-errors/errors @@ -1,9 +1,9 @@ ### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. ### NOTE: This file has been sorted with diff-sort. -warning in <...>/extract-certs-pem.zeek, line 1: deprecated script loaded from <...>/__load__.zeek:10 "Remove in v5.1. Use log-certs-base64.zeek instead." +warning in <...>/extract-certs-pem.zeek, line 1: deprecated script loaded from <...>/__load__.zeek:12 "Remove in v5.1. Use log-certs-base64.zeek instead." warning in <...>/extract-certs-pem.zeek, line 1: deprecated script loaded from command line arguments "Remove in v5.1. Use log-certs-base64.zeek instead." -warning in <...>/log-ocsp.zeek, line 1: deprecated script loaded from <...>/test-all-policy.zeek:45 ("Remove in v5.1. OCSP logging is now enabled by default") -warning in <...>/log-ocsp.zeek, line 1: deprecated script loaded from <...>/test-all-policy.zeek:45 ("Remove in v5.1. OCSP logging is now enabled by default") +warning in <...>/log-ocsp.zeek, line 1: deprecated script loaded from <...>/test-all-policy.zeek:58 ("Remove in v5.1. OCSP logging is now enabled by default") +warning in <...>/log-ocsp.zeek, line 1: deprecated script loaded from <...>/test-all-policy.zeek:58 ("Remove in v5.1. OCSP logging is now enabled by default") warning in <...>/log-ocsp.zeek, line 1: deprecated script loaded from command line arguments ("Remove in v5.1. OCSP logging is now enabled by default") warning in <...>/notary.zeek, line 1: deprecated script loaded from <...>/__load__.zeek:4 ("Remove in v5.1. Please switch to other more modern approaches like SCT validation (validate-sct.zeek).") warning in <...>/notary.zeek, line 1: deprecated script loaded from command line arguments ("Remove in v5.1. 
Please switch to other more modern approaches like SCT validation (validate-sct.zeek).") diff --git a/testing/btest/Baseline/coverage.init-default/missing_loads b/testing/btest/Baseline/coverage.init-default/missing_loads index 79aa700647..fe23c7a04a 100644 --- a/testing/btest/Baseline/coverage.init-default/missing_loads +++ b/testing/btest/Baseline/coverage.init-default/missing_loads @@ -10,3 +10,4 @@ -./frameworks/openflow/cluster.zeek -./frameworks/packet-filter/cluster.zeek -./frameworks/sumstats/cluster.zeek +-./init-supervisor.zeek diff --git a/testing/btest/Baseline/plugins.hooks/output b/testing/btest/Baseline/plugins.hooks/output index 5a9e7de7a5..12e752da97 100644 --- a/testing/btest/Baseline/plugins.hooks/output +++ b/testing/btest/Baseline/plugins.hooks/output @@ -661,6 +661,8 @@ 0.000000 MetaHookPost CallFunction(SumStats::register_observe_plugin, , (SumStats::UNIQUE, lambda_<14393221830775341876>{ if (!SumStats::rv?$unique_vals) SumStats::rv$unique_vals = (coerce set() to set[SumStats::Observation])if (SumStats::r?$unique_max) SumStats::rv$unique_max = SumStats::r$unique_maxif (!SumStats::r?$unique_max || sizeofSumStats::rv$unique_vals <= SumStats::r$unique_max) add SumStats::rv$unique_vals[SumStats::obs]SumStats::rv$unique = sizeofSumStats::rv$unique_vals})) -> 0.000000 MetaHookPost CallFunction(SumStats::register_observe_plugin, , (SumStats::VARIANCE, lambda_<6557258612059469785>{ if (1 < SumStats::rv$num) SumStats::rv$var_s += ((SumStats::val - SumStats::rv$prev_avg) * (SumStats::val - SumStats::rv$average))SumStats::calc_variance(SumStats::rv)SumStats::rv$prev_avg = SumStats::rv$average})) -> 0.000000 MetaHookPost CallFunction(SumStats::register_observe_plugins, , ()) -> +0.000000 MetaHookPost CallFunction(Supervisor::__is_supervisor, , ()) -> +0.000000 MetaHookPost CallFunction(Supervisor::is_supervisor, , ()) -> 0.000000 MetaHookPost CallFunction(__init_primary_bifs, , ()) -> 0.000000 MetaHookPost CallFunction(__init_secondary_bifs, , ()) -> 0.000000 MetaHookPost CallFunction(current_time, , ()) -> @@ -1710,6 +1712,8 @@ 0.000000 MetaHookPre CallFunction(SumStats::register_observe_plugin, , (SumStats::UNIQUE, lambda_<14393221830775341876>{ if (!SumStats::rv?$unique_vals) SumStats::rv$unique_vals = (coerce set() to set[SumStats::Observation])if (SumStats::r?$unique_max) SumStats::rv$unique_max = SumStats::r$unique_maxif (!SumStats::r?$unique_max || sizeofSumStats::rv$unique_vals <= SumStats::r$unique_max) add SumStats::rv$unique_vals[SumStats::obs]SumStats::rv$unique = sizeofSumStats::rv$unique_vals})) 0.000000 MetaHookPre CallFunction(SumStats::register_observe_plugin, , (SumStats::VARIANCE, lambda_<6557258612059469785>{ if (1 < SumStats::rv$num) SumStats::rv$var_s += ((SumStats::val - SumStats::rv$prev_avg) * (SumStats::val - SumStats::rv$average))SumStats::calc_variance(SumStats::rv)SumStats::rv$prev_avg = SumStats::rv$average})) 0.000000 MetaHookPre CallFunction(SumStats::register_observe_plugins, , ()) +0.000000 MetaHookPre CallFunction(Supervisor::__is_supervisor, , ()) +0.000000 MetaHookPre CallFunction(Supervisor::is_supervisor, , ()) 0.000000 MetaHookPre CallFunction(__init_primary_bifs, , ()) 0.000000 MetaHookPre CallFunction(__init_secondary_bifs, , ()) 0.000000 MetaHookPre CallFunction(current_time, , ()) @@ -2758,6 +2762,8 @@ 0.000000 | HookCallFunction SumStats::register_observe_plugin(SumStats::UNIQUE, lambda_<14393221830775341876>{ if (!SumStats::rv?$unique_vals) SumStats::rv$unique_vals = (coerce set() to set[SumStats::Observation])if 
(SumStats::r?$unique_max) SumStats::rv$unique_max = SumStats::r$unique_maxif (!SumStats::r?$unique_max || sizeofSumStats::rv$unique_vals <= SumStats::r$unique_max) add SumStats::rv$unique_vals[SumStats::obs]SumStats::rv$unique = sizeofSumStats::rv$unique_vals}) 0.000000 | HookCallFunction SumStats::register_observe_plugin(SumStats::VARIANCE, lambda_<6557258612059469785>{ if (1 < SumStats::rv$num) SumStats::rv$var_s += ((SumStats::val - SumStats::rv$prev_avg) * (SumStats::val - SumStats::rv$average))SumStats::calc_variance(SumStats::rv)SumStats::rv$prev_avg = SumStats::rv$average}) 0.000000 | HookCallFunction SumStats::register_observe_plugins() +0.000000 | HookCallFunction Supervisor::__is_supervisor() +0.000000 | HookCallFunction Supervisor::is_supervisor() 0.000000 | HookCallFunction __init_primary_bifs() 0.000000 | HookCallFunction __init_secondary_bifs() 0.000000 | HookCallFunction current_time() diff --git a/testing/btest/Baseline/scripts.policy.frameworks.cluster.controller.agent-checkin/zeek.controller.stdout b/testing/btest/Baseline/scripts.policy.frameworks.cluster.controller.agent-checkin/zeek.controller.stdout new file mode 100644 index 0000000000..3ce8a1f373 --- /dev/null +++ b/testing/btest/Baseline/scripts.policy.frameworks.cluster.controller.agent-checkin/zeek.controller.stdout @@ -0,0 +1,2 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +notify_agent_hello agent 127.0.0.1 1 diff --git a/testing/btest/Baseline/supervisor.config-bare-mode/zeek.bare.node.out b/testing/btest/Baseline/supervisor.config-bare-mode/zeek.bare.node.out new file mode 100644 index 0000000000..45c8a185df --- /dev/null +++ b/testing/btest/Baseline/supervisor.config-bare-mode/zeek.bare.node.out @@ -0,0 +1,4 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +supervised node zeek_init() +bare mode +supervised node zeek_done() diff --git a/testing/btest/Baseline/supervisor.config-bare-mode/zeek.default.node.out b/testing/btest/Baseline/supervisor.config-bare-mode/zeek.default.node.out new file mode 100644 index 0000000000..9a9a51018c --- /dev/null +++ b/testing/btest/Baseline/supervisor.config-bare-mode/zeek.default.node.out @@ -0,0 +1,4 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +supervised node zeek_init() +default mode +supervised node zeek_done() diff --git a/testing/btest/Baseline/supervisor.config-bare-mode/zeek.inherit.node.out b/testing/btest/Baseline/supervisor.config-bare-mode/zeek.inherit.node.out new file mode 100644 index 0000000000..45c8a185df --- /dev/null +++ b/testing/btest/Baseline/supervisor.config-bare-mode/zeek.inherit.node.out @@ -0,0 +1,4 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +supervised node zeek_init() +bare mode +supervised node zeek_done() diff --git a/testing/btest/Baseline/supervisor.config-env/zeek.node.out b/testing/btest/Baseline/supervisor.config-env/zeek.node.out new file mode 100644 index 0000000000..56945e2def --- /dev/null +++ b/testing/btest/Baseline/supervisor.config-env/zeek.node.out @@ -0,0 +1,3 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. 
+supervised node zeek_init() with env foo=bar +supervised node zeek_done() diff --git a/testing/btest/Baseline/supervisor.config-env/zeek.supervisor.out b/testing/btest/Baseline/supervisor.config-env/zeek.supervisor.out new file mode 100644 index 0000000000..b5c69a253d --- /dev/null +++ b/testing/btest/Baseline/supervisor.config-env/zeek.supervisor.out @@ -0,0 +1,4 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +supervisor zeek_init() +destroying node +supervisor zeek_done() diff --git a/testing/btest/scripts/policy/frameworks/cluster/controller/agent-checkin.zeek b/testing/btest/scripts/policy/frameworks/cluster/controller/agent-checkin.zeek new file mode 100644 index 0000000000..cf1304e3f3 --- /dev/null +++ b/testing/btest/scripts/policy/frameworks/cluster/controller/agent-checkin.zeek @@ -0,0 +1,60 @@ +# This test verifies basic agent-controller communication. We launch agent and +# controller via the supervisor, add an extra handler for the notify_agent_hello +# event that travels agent -> controller, and verify its print output in the +# controller's stdout log. + +# The following env vars are known to the controller framework +# @TEST-PORT: ZEEK_CONTROLLER_PORT +# @TEST-PORT: BROKER_PORT + +# A bit of a detour to get the port number into the agent configuration +# @TEST-EXEC: btest-bg-run zeek zeek -j %INPUT +# @TEST-EXEC: btest-bg-wait 10 +# @TEST-EXEC: btest-diff zeek/controller.stdout + +@load policy/frameworks/cluster/agent +@load policy/frameworks/cluster/controller + +redef Broker::default_port = to_port(getenv("BROKER_PORT")); + +redef ClusterController::name = "controller"; +redef ClusterAgent::name = "agent"; + +# Tell the agent where to locate the controller. +redef ClusterAgent::controller = [$address="127.0.0.1", $bound_port=to_port(getenv("ZEEK_CONTROLLER_PORT"))]; + +@if ( Supervisor::is_supervised() ) + +@load policy/frameworks/cluster/agent/api + +global logged = F; + +event zeek_init() + { + # We're using the controller to shut everything down once the + # notify_agent_hello event has arrived. The controller doesn't normally + # talk to the supervisor, so connect to it. + if ( Supervisor::node()$name == "controller" ) + { + Broker::peer(getenv("ZEEK_DEFAULT_LISTEN_ADDRESS"), Broker::default_port, Broker::default_listen_retry); + Broker::auto_publish(SupervisorControl::topic_prefix, SupervisorControl::stop_request); + } + } + +event ClusterAgent::API::notify_agent_hello(instance: string, host: addr, api_version: count) + { + if ( Supervisor::node()$name == "controller" ) + { + # On rare occasion it can happen that we log this twice, which'll need + # investigating. For now we ensure we only do so once. + if ( ! logged ) + print(fmt("notify_agent_hello %s %s %s", instance, host, api_version)); + + logged = T; + + # This takes down the whole process tree. + event SupervisorControl::stop_request(); + } + } + +@endif diff --git a/testing/btest/supervisor/config-bare-mode.zeek b/testing/btest/supervisor/config-bare-mode.zeek new file mode 100644 index 0000000000..0e3f9c77e2 --- /dev/null +++ b/testing/btest/supervisor/config-bare-mode.zeek @@ -0,0 +1,74 @@ +# This test verifies the functionality of the bare_mode flag in NodeConfig. +# We launch three nodes (inherit, bare, default); each outputs a string that +# reflects its mode and exits. We verify the resulting outputs.
+ +# @TEST-PORT: BROKER_PORT +# @TEST-EXEC: btest-bg-run zeek zeek -j -b %INPUT +# @TEST-EXEC: btest-bg-wait 30 +# @TEST-EXEC: btest-diff zeek/inherit/node.out +# @TEST-EXEC: btest-diff zeek/bare/node.out +# @TEST-EXEC: btest-diff zeek/default/node.out + + +# So the supervised node doesn't terminate right away. +redef exit_only_after_terminate=T; + +global node_output_file: file; +global topic = "test-topic"; + +event do_destroy(name: string) + { + Supervisor::destroy(name); + + # When no nodes are left, exit. + local status = Supervisor::status(); + if ( |status$nodes| == 0 ) + terminate(); + } + +event zeek_init() + { + if ( Supervisor::is_supervisor() ) + { + Broker::subscribe(topic); + Broker::listen("127.0.0.1", to_port(getenv("BROKER_PORT"))); + + # Create a node that inherits bare mode from us. + local sn = Supervisor::NodeConfig($name="inherit", $directory="inherit"); + Supervisor::create(sn); + + # Create a node that specifies bare mode. + sn = Supervisor::NodeConfig($name="bare", $directory="bare", $bare_mode=T); + Supervisor::create(sn); + + # Create a node that specifies default mode. + sn = Supervisor::NodeConfig($name="default", $directory="default", $bare_mode=F); + Supervisor::create(sn); + + } + else + { + Broker::peer("127.0.0.1", to_port(getenv("BROKER_PORT"))); + node_output_file = open("node.out"); + print node_output_file, "supervised node zeek_init()"; + +# This is only defined when we're loading init-default.zeek: +@ifdef ( Notice::Info ) + print node_output_file, "default mode"; +@else + print node_output_file, "bare mode"; +@endif + } + } + +event Broker::peer_added(endpoint: Broker::EndpointInfo, msg: string) + { + if ( Supervisor::is_supervised() ) + Broker::publish(topic, do_destroy, Supervisor::node()$name); + } + +event zeek_done() + { + if ( Supervisor::is_supervised() ) + print node_output_file, "supervised node zeek_done()"; + } diff --git a/testing/btest/supervisor/config-env.zeek b/testing/btest/supervisor/config-env.zeek new file mode 100644 index 0000000000..dce6593612 --- /dev/null +++ b/testing/btest/supervisor/config-env.zeek @@ -0,0 +1,62 @@ +# @TEST-PORT: BROKER_PORT +# @TEST-EXEC: btest-bg-run zeek zeek -j -b %INPUT +# @TEST-EXEC: btest-bg-wait 30 +# @TEST-EXEC: btest-diff zeek/supervisor.out +# @TEST-EXEC: btest-diff zeek/node.out + +# So the supervised node doesn't terminate right away.
+redef exit_only_after_terminate=T; + +global supervisor_output_file: file; +global node_output_file: file; +global topic = "test-topic"; + +event do_destroy() + { + print supervisor_output_file, "destroying node"; + Supervisor::destroy("grault"); + } + +event zeek_init() + { + if ( Supervisor::is_supervisor() ) + { + Broker::subscribe(topic); + Broker::listen("127.0.0.1", to_port(getenv("BROKER_PORT"))); + supervisor_output_file = open("supervisor.out"); + print supervisor_output_file, "supervisor zeek_init()"; + local sn = Supervisor::NodeConfig($name="grault", $env=table( + ["foo"] = "bar" + )); + local res = Supervisor::create(sn); + + if ( res != "" ) + print supervisor_output_file, res; + } + else + { + Broker::peer("127.0.0.1", to_port(getenv("BROKER_PORT"))); + node_output_file = open("node.out"); + print node_output_file, fmt("supervised node zeek_init() with env foo=%s", getenv("foo")); + } + } + +event Broker::peer_added(endpoint: Broker::EndpointInfo, msg: string) + { + if ( Supervisor::is_supervised() ) + Broker::publish(topic, do_destroy); + } + +event Broker::peer_lost(endpoint: Broker::EndpointInfo, msg: string) + { + # Should only be run by supervisor + terminate(); + } + +event zeek_done() + { + if ( Supervisor::is_supervised() ) + print node_output_file, "supervised node zeek_done()"; + else + print supervisor_output_file, "supervisor zeek_done()"; + }
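For reference, a minimal sketch (not part of the patch) of how a supervisor-side script might use the new NodeConfig members exercised by the tests above; the node name "worker-1" and the FOO variable are purely illustrative:

# Minimal sketch: create one supervised node in bare mode with a custom
# environment variable, mirroring the Supervisor::NodeConfig usage in the tests.
event zeek_init()
	{
	if ( ! Supervisor::is_supervisor() )
		return;

	# Hypothetical node name and env var; bare_mode and env are the new fields.
	local cfg = Supervisor::NodeConfig($name="worker-1",
	                                   $bare_mode=T,
	                                   $env=table(["FOO"] = "bar"));

	# Supervisor::create returns an empty string on success, else an error message.
	local res = Supervisor::create(cfg);

	if ( res != "" )
		print fmt("failed to create node: %s", res);
	}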