Merge branch 'topic/christian/supervisor-node-simplification'

* topic/christian/supervisor-node-simplification:
  Remove the Supervisor's internal ClusterEndpoint struct.
  Provide a script-layer equivalent to Supervisor::__init_cluster().
This commit is contained in:
Christian Kreibich 2024-07-02 15:35:36 -07:00
commit f9af58a5c2
10 changed files with 115 additions and 182 deletions

View file

@ -1,3 +1,9 @@
7.0.0-dev.423 | 2024-07-02 15:35:36 -0700
* Remove the Supervisor's internal ClusterEndpoint struct. (Christian Kreibich, Corelight)
* Provide a script-layer equivalent to Supervisor::__init_cluster(). (Christian Kreibich, Corelight)
7.0.0-dev.420 | 2024-07-02 14:46:29 -0700
* Update NEWS file to cover JSON enhancements (Christian Kreibich, Corelight)

4
NEWS
View file

@ -151,6 +151,10 @@ Changed Functionality
it aligns with the same requirement for traditional analyzers and
enables customizing file handles for protocol-specific semantics.
- The Supervisor's API now returns NodeConfig records with a cluster table whose
ClusterEndpoints have a port value of 0/unknown, rather than 0/tcp, to
indicate that the node in question has no listening port.
Removed Functionality
---------------------

View file

@ -1 +1 @@
7.0.0-dev.420
7.0.0-dev.423

View file

@ -14,14 +14,21 @@ redef Broker::log_topic = Cluster::rr_log_topic;
# Add a cluster prefix.
@prefixes += cluster
# If this script isn't found anywhere, the cluster bombs out.
# Loading the cluster framework requires that a script by this name exists
# somewhere in the ZEEKPATH. The only thing in the file should be the
# cluster definition in the :zeek:id:`Cluster::nodes` variable.
@if ( Supervisor::is_supervised() )
# When running a supervised cluster, populate Cluster::nodes from the node table
# the Supervisor provides to new Zeek nodes. The management framework configures
# the cluster this way.
@load ./supervisor
@if ( Cluster::Supervisor::__init_cluster_nodes() && Cluster::get_node_count(Cluster::LOGGER) > 0 )
redef Cluster::manager_is_logger = F;
@endif
@endif
@if ( ! Supervisor::__init_cluster() )
# When running a supervised cluster, Cluster::nodes is instead populated
# from the internal C++-layer directly via the above BIF.
@if ( |Cluster::nodes| == 0 )
# Fall back to loading a cluster topology from cluster-layout.zeek. If Zeek
# cannot find this script in your ZEEKPATH, it will exit. The script should only
# contain the cluster definition in the :zeek:id:`Cluster::nodes` variable.
# The zeekctl tool manages this file for you.
@load cluster-layout
@endif

View file

@ -0,0 +1,57 @@
##! Cluster-related functionality specific to running under the Supervisor
##! framework.
@load base/frameworks/supervisor/api
module Cluster::Supervisor;
export {
## Populates the current node's :zeek:id:`Cluster::nodes` table from the
## cluster table of the :zeek:type:`Supervisor::NodeConfig` record that
## :zeek:id:`Supervisor::node` returns for this node.
##
## Returns: T once every entry of the supervisor-provided cluster table
##          has been added to :zeek:id:`Cluster::nodes`; F if that
##          cluster table is empty, in which case
##          :zeek:id:`Cluster::nodes` is left untouched.
global __init_cluster_nodes: function(): bool;
}
# Fills Cluster::nodes from the cluster table found in the node configuration
# that Supervisor::node() returns.
#
# Returns: F, without modifying Cluster::nodes, when that cluster table is
#          empty; T after all of its entries have been added.
function __init_cluster_nodes(): bool
	{
	local config = Supervisor::node();

	if ( |config$cluster| == 0 )
		return F;

	# Translation from the Supervisor's role enum to the cluster framework's
	# node types. Roles without an entry here end up as Cluster::NONE.
	local rolemap: table[Supervisor::ClusterRole] of Cluster::NodeType = {
		[Supervisor::LOGGER] = Cluster::LOGGER,
		[Supervisor::MANAGER] = Cluster::MANAGER,
		[Supervisor::PROXY] = Cluster::PROXY,
		[Supervisor::WORKER] = Cluster::WORKER,
	};

	local manager_name = "";
	local cnode: Cluster::Node;
	local typ: Cluster::NodeType = Cluster::NONE;

	# First pass: determine the manager's name, because every non-manager
	# node created in the second pass refers to it.
	for ( node_name, endp in config$cluster )
		{
		if ( endp$role == Supervisor::MANAGER )
			manager_name = node_name;
		}

	# Second pass: create one Cluster::Node per endpoint.
	for ( node_name, endp in config$cluster )
		{
		# Reset on every iteration. Without this, an endpoint whose role
		# is not in rolemap would silently inherit the node type of the
		# entry processed before it.
		typ = Cluster::NONE;

		if ( endp$role in rolemap )
			typ = rolemap[endp$role];

		cnode = [$node_type=typ, $ip=endp$host, $p=endp$p];

		# The pragma suppresses deprecation warnings for the access to
		# Cluster::Node's interface field.
@pragma push ignore-deprecations
		if ( endp?$interface )
			cnode$interface = endp$interface;
@pragma pop ignore-deprecations

		# Only non-manager nodes point at the manager, and only when the
		# cluster actually has one.
		if ( |manager_name| > 0 && cnode$node_type != Cluster::MANAGER )
			cnode$manager = manager_name;

		Cluster::nodes[node_name] = cnode;
		}

	return T;
	}

View file

@ -625,10 +625,9 @@ function get_nodes_request_finish(areq: Management::Request::Request)
if ( node in g_nodes )
cns$state = g_nodes[node]$state;
# The supervisor's responses use 0/tcp (not 0/unknown)
# when indicating an unused port because its internal
# serialization always assumes TCP.
if ( sns$node$cluster[node]$p != 0/tcp )
# The supervisor's responses use 0/unknown to indicate
# unused ports. (Prior to Zeek 7 this used to be 0/tcp.)
if ( sns$node$cluster[node]$p != 0/unknown )
cns$p = sns$node$cluster[node]$p;
}
else

View file

@ -18,6 +18,8 @@
#define RAPIDJSON_HAS_STDSTRING 1
#include <rapidjson/document.h>
#include <rapidjson/stringbuffer.h>
#include <rapidjson/writer.h>
extern "C" {
#include "zeek/3rdparty/setsignal.h"
@ -1243,34 +1245,9 @@ Supervisor::NodeConfig Supervisor::NodeConfig::FromRecord(const RecordVal* node)
rval.env[name] = v->GetVal()->AsStringVal()->ToStdString();
}
auto cluster_table_val = node->GetField("cluster")->AsTableVal();
auto cluster_table = cluster_table_val->AsTable();
for ( const auto& cte : *cluster_table ) {
auto k = cte.GetHashKey();
auto* v = cte.value;
auto key = cluster_table_val->RecreateIndex(*k);
auto name = key->Idx(0)->AsStringVal()->ToStdString();
auto rv = v->GetVal()->AsRecordVal();
Supervisor::ClusterEndpoint ep;
ep.role = static_cast<BifEnum::Supervisor::ClusterRole>(rv->GetFieldAs<EnumVal>("role"));
ep.host = rv->GetFieldAs<AddrVal>("host").AsString();
ep.port = rv->GetFieldAs<PortVal>("p")->Port();
const auto& iface = rv->GetField("interface");
if ( iface )
ep.interface = iface->AsStringVal()->ToStdString();
const auto& pcap_file = rv->GetField("pcap_file");
if ( pcap_file )
ep.pcap_file = pcap_file->AsStringVal()->ToStdString();
rval.cluster.emplace(name, std::move(ep));
}
auto cluster_table_val = node->GetField("cluster");
auto re = std::make_unique<RE_Matcher>("^_");
rval.cluster = cluster_table_val->ToJSON(false, re.get())->ToStdString();
return rval;
}
@ -1319,26 +1296,10 @@ Supervisor::NodeConfig Supervisor::NodeConfig::FromJSON(std::string_view json) {
auto& cluster = j["cluster"];
for ( auto it = cluster.MemberBegin(); it != cluster.MemberEnd(); ++it ) {
Supervisor::ClusterEndpoint ep;
auto key = it->name.GetString();
auto& val = it->value;
auto& role_str = val["role"];
ep.role = role_str_to_enum(role_str.GetString());
ep.host = val["host"].GetString();
ep.port = val["p"]["port"].GetInt();
if ( auto it = val.FindMember("interface"); it != val.MemberEnd() )
ep.interface = it->value.GetString();
if ( auto it = val.FindMember("pcap_file"); it != val.MemberEnd() )
ep.pcap_file = it->value.GetString();
rval.cluster.emplace(key, std::move(ep));
}
rapidjson::StringBuffer sb;
rapidjson::Writer<rapidjson::StringBuffer> writer(sb);
cluster.Accept(writer);
rval.cluster = sb.GetString();
return rval;
}
@ -1349,7 +1310,7 @@ std::string Supervisor::NodeConfig::ToJSON() const {
}
RecordValPtr Supervisor::NodeConfig::ToRecord() const {
const auto& rt = BifType::Record::Supervisor::NodeConfig;
const auto& rt = id::find_type<RecordType>("Supervisor::NodeConfig");
auto rval = make_intrusive<RecordVal>(rt);
rval->AssignField("name", name);
@ -1401,27 +1362,18 @@ RecordValPtr Supervisor::NodeConfig::ToRecord() const {
}
auto tt = rt->GetFieldType<TableType>("cluster");
auto cluster_val = make_intrusive<TableVal>(std::move(tt));
rval->AssignField("cluster", cluster_val);
for ( const auto& e : cluster ) {
auto& name = e.first;
auto& ep = e.second;
auto key = make_intrusive<StringVal>(name);
const auto& ept = BifType::Record::Supervisor::ClusterEndpoint;
auto val = make_intrusive<RecordVal>(ept);
val->AssignField("role", BifType::Enum::Supervisor::ClusterRole->GetEnumVal(ep.role));
val->AssignField("host", make_intrusive<AddrVal>(ep.host));
val->AssignField("p", val_mgr->Port(ep.port, TRANSPORT_TCP));
if ( ep.interface )
val->AssignField("interface", *ep.interface);
if ( ep.pcap_file )
val->AssignField("pcap_file", *ep.pcap_file);
cluster_val->Assign(std::move(key), std::move(val));
auto json_res = detail::ValFromJSON(cluster, tt, Func::nil);
if ( auto val = std::get_if<ValPtr>(&json_res) ) {
rval->AssignField("cluster", *val);
}
else {
// This should never happen: the JSON data comes from a table[string] of
// ClusterEndpoint and should therefore allow instantiation. Exiting
// here can be hard to debug. Other JSON code (see FromJSON()) fails
// silently when the JSON is misformatted. We just warn:
fprintf(stderr, "Could not parse %s's cluster table from '%s': %s\n", name.c_str(), cluster.c_str(),
std::get<std::string>(json_res).c_str());
rval->AssignField("cluster", make_intrusive<TableVal>(std::move(tt)));
}
return rval;
@ -1439,62 +1391,6 @@ RecordValPtr SupervisorNode::ToRecord() const {
return rval;
}
static ValPtr supervisor_role_to_cluster_node_type(BifEnum::Supervisor::ClusterRole role) {
static auto node_type = id::find_type<zeek::EnumType>("Cluster::NodeType");
switch ( role ) {
case BifEnum::Supervisor::LOGGER: return node_type->GetEnumVal(node_type->Lookup("Cluster", "LOGGER"));
case BifEnum::Supervisor::MANAGER: return node_type->GetEnumVal(node_type->Lookup("Cluster", "MANAGER"));
case BifEnum::Supervisor::PROXY: return node_type->GetEnumVal(node_type->Lookup("Cluster", "PROXY"));
case BifEnum::Supervisor::WORKER: return node_type->GetEnumVal(node_type->Lookup("Cluster", "WORKER"));
default: return node_type->GetEnumVal(node_type->Lookup("Cluster", "NONE"));
}
}
bool SupervisedNode::InitCluster() const {
if ( config.cluster.empty() )
return false;
const auto& cluster_node_type = id::find_type<RecordType>("Cluster::Node");
const auto& cluster_nodes_id = id::find("Cluster::nodes");
const auto& cluster_manager_is_logger_id = id::find("Cluster::manager_is_logger");
auto cluster_nodes = cluster_nodes_id->GetVal()->AsTableVal();
auto has_logger = false;
std::optional<std::string> manager_name;
for ( const auto& e : config.cluster ) {
if ( e.second.role == BifEnum::Supervisor::MANAGER )
manager_name = e.first;
else if ( e.second.role == BifEnum::Supervisor::LOGGER )
has_logger = true;
}
for ( const auto& e : config.cluster ) {
const auto& node_name = e.first;
const auto& ep = e.second;
auto key = make_intrusive<StringVal>(node_name);
auto val = make_intrusive<RecordVal>(cluster_node_type);
auto node_type = supervisor_role_to_cluster_node_type(ep.role);
val->AssignField("node_type", std::move(node_type));
val->AssignField("ip", make_intrusive<AddrVal>(ep.host));
val->AssignField("p", val_mgr->Port(ep.port, TRANSPORT_TCP));
// Remove in v7.1: Interface removed from Cluster::Node.
if ( ep.interface )
val->AssignField("interface", *ep.interface);
if ( manager_name && ep.role != BifEnum::Supervisor::MANAGER )
val->AssignField("manager", *manager_name);
cluster_nodes->Assign(std::move(key), std::move(val));
}
cluster_manager_is_logger_id->SetVal(val_mgr->Bool(! has_logger));
return true;
}
void SupervisedNode::Init(Options* options) const {
const auto& node_name = config.name;
@ -1546,7 +1442,7 @@ void SupervisedNode::Init(Options* options) const {
}
}
if ( ! config.cluster.empty() ) {
if ( ! config.cluster.empty() && config.cluster != "{}" ) {
if ( setenv("CLUSTER_NODE", node_name.data(), true) == -1 ) {
fprintf(stderr, "node '%s' failed to setenv: %s\n", node_name.data(), strerror(errno));
exit(1);

View file

@ -110,35 +110,6 @@ public:
std::string zeek_exe_path;
};
/**
* Configuration options that influence how a Supervised Zeek node
* integrates into the normal Zeek Cluster Framework.
*/
struct ClusterEndpoint {
/**
* The node's role within the cluster. E.g. manager, logger, worker.
*/
BifEnum::Supervisor::ClusterRole role;
/**
* The TCP port number at which the cluster node listens for connections.
*/
int port;
/**
* The host/IP at which the cluster node is listening for connections.
*/
std::string host;
/**
* The interface name from which the node read/analyze packets.
* Typically used by worker nodes.
*/
std::optional<std::string> interface;
/**
* The PCAP file name from which the node read/analyze packets.
* Typically used by worker nodes.
*/
std::optional<std::string> pcap_file;
};
/**
* Configuration options that influence behavior of a Supervised Zeek node.
*/
@ -237,11 +208,12 @@ public:
*/
std::map<std::string, std::string> env;
/**
* The Cluster Layout definition. Each node in the Cluster Framework
* knows about the full, static cluster topology to which it belongs.
* Entries in the map use node names for keys.
* The cluster layout definition. Each node in the Cluster Framework
* knows the full, static cluster topology to which it belongs. The
* layout is encoded as the JSON map resulting from ToJSON() on the
* corresponding cluster table in the script layer's NodeConfig record.
*/
std::map<std::string, ClusterEndpoint> cluster;
std::string cluster;
};
/**

View file

@ -14,7 +14,6 @@ enum ClusterRole %{
WORKER,
%}
type Supervisor::ClusterEndpoint: record;
type Supervisor::Status: record;
type Supervisor::NodeConfig: record;
type Supervisor::NodeStatus: record;
@ -66,14 +65,6 @@ function Supervisor::__restart%(node: string%): bool
return zeek::val_mgr->Bool(rval);
%}
function Supervisor::__init_cluster%(%): bool
%{
if ( zeek::Supervisor::ThisNode() )
return zeek::val_mgr->Bool(zeek::Supervisor::ThisNode()->InitCluster());
return zeek::val_mgr->Bool(false);
%}
function Supervisor::__is_supervised%(%): bool
%{
return zeek::val_mgr->Bool(zeek::Supervisor::ThisNode().has_value());

View file

@ -5,6 +5,7 @@
-./frameworks/cluster/nodes/proxy.zeek
-./frameworks/cluster/nodes/worker.zeek
-./frameworks/cluster/setup-connections.zeek
-./frameworks/cluster/supervisor.zeek
-./frameworks/intel/cluster.zeek
-./frameworks/netcontrol/cluster.zeek
-./frameworks/openflow/cluster.zeek