mirror of
https://github.com/zeek/zeek.git
synced 2025-10-02 14:48:21 +00:00
Remove the Supervisor's internal ClusterEndpoint struct.
This eliminates one place in which we currently need to mirror changes to the script-land Cluster::Node record. Instead of keeping an exact in-core equivalent, the Supervisor now treats the data structure as opaque, and stores the whole cluster table as a JSON string. We may replace the script-layer Supervisor::ClusterEndpoint in the future, using Cluster::Node directly. But that's a more invasive change that will affect how people invoke Supervisor::create() and similars. Relying on JSON for serialization has the side-effect of removing the Supervisor's earlier quirk of using 0/tcp, not 0/unknown, to indicate unused ports in the Supervisor::ClusterEndpoint record.
This commit is contained in:
parent
a98ec6b08b
commit
737b1a2013
7 changed files with 51 additions and 181 deletions
4
NEWS
4
NEWS
|
@ -151,6 +151,10 @@ Changed Functionality
|
|||
it aligns with the same requirement for traditional analyzers and
|
||||
enables customizing file handles for protocol-specific semantics.
|
||||
|
||||
- The Supervisor's API now returns NodeConfig records with a cluster table whose
|
||||
ClusterEndpoints have a port value of 0/unknown, rather than 0/tcp, to
|
||||
indicate that the node in question has no listening port.
|
||||
|
||||
Removed Functionality
|
||||
---------------------
|
||||
|
||||
|
|
|
@ -14,14 +14,21 @@ redef Broker::log_topic = Cluster::rr_log_topic;
|
|||
# Add a cluster prefix.
|
||||
@prefixes += cluster
|
||||
|
||||
# If this script isn't found anywhere, the cluster bombs out.
|
||||
# Loading the cluster framework requires that a script by this name exists
|
||||
# somewhere in the ZEEKPATH. The only thing in the file should be the
|
||||
# cluster definition in the :zeek:id:`Cluster::nodes` variable.
|
||||
@if ( Supervisor::is_supervised() )
|
||||
# When running a supervised cluster, populate Cluster::nodes from the node table
|
||||
# the Supervisor provides to new Zeek nodes. The management framework configures
|
||||
# the cluster this way.
|
||||
@load ./supervisor
|
||||
@if ( Cluster::Supervisor::__init_cluster_nodes() && Cluster::get_node_count(Cluster::LOGGER) > 0 )
|
||||
redef Cluster::manager_is_logger = F;
|
||||
@endif
|
||||
@endif
|
||||
|
||||
@if ( ! Supervisor::__init_cluster() )
|
||||
# When running a supervised cluster, Cluster::nodes is instead populated
|
||||
# from the internal C++-layer directly via the above BIF.
|
||||
@if ( |Cluster::nodes| == 0 )
|
||||
# Fall back to loading a cluster topology from cluster-layout.zeek. If Zeek
|
||||
# cannot find this script in your ZEEKPATH, it will exit. The script should only
|
||||
# contain the cluster definition in the :zeek:id:`Cluster::nodes` variable.
|
||||
# The zeekctl tool manages this file for you.
|
||||
@load cluster-layout
|
||||
@endif
|
||||
|
||||
|
|
|
@ -625,10 +625,9 @@ function get_nodes_request_finish(areq: Management::Request::Request)
|
|||
if ( node in g_nodes )
|
||||
cns$state = g_nodes[node]$state;
|
||||
|
||||
# The supervisor's responses use 0/tcp (not 0/unknown)
|
||||
# when indicating an unused port because its internal
|
||||
# serialization always assumes TCP.
|
||||
if ( sns$node$cluster[node]$p != 0/tcp )
|
||||
# The supervisor's responses use 0/unknown to indicate
|
||||
# unused ports. (Prior to Zeek 7 this used to be 0/tcp.)
|
||||
if ( sns$node$cluster[node]$p != 0/unknown )
|
||||
cns$p = sns$node$cluster[node]$p;
|
||||
}
|
||||
else
|
||||
|
|
|
@ -18,6 +18,8 @@
|
|||
|
||||
#define RAPIDJSON_HAS_STDSTRING 1
|
||||
#include <rapidjson/document.h>
|
||||
#include <rapidjson/stringbuffer.h>
|
||||
#include <rapidjson/writer.h>
|
||||
|
||||
extern "C" {
|
||||
#include "zeek/3rdparty/setsignal.h"
|
||||
|
@ -1243,34 +1245,9 @@ Supervisor::NodeConfig Supervisor::NodeConfig::FromRecord(const RecordVal* node)
|
|||
rval.env[name] = v->GetVal()->AsStringVal()->ToStdString();
|
||||
}
|
||||
|
||||
auto cluster_table_val = node->GetField("cluster")->AsTableVal();
|
||||
auto cluster_table = cluster_table_val->AsTable();
|
||||
|
||||
for ( const auto& cte : *cluster_table ) {
|
||||
auto k = cte.GetHashKey();
|
||||
auto* v = cte.value;
|
||||
|
||||
auto key = cluster_table_val->RecreateIndex(*k);
|
||||
auto name = key->Idx(0)->AsStringVal()->ToStdString();
|
||||
auto rv = v->GetVal()->AsRecordVal();
|
||||
|
||||
Supervisor::ClusterEndpoint ep;
|
||||
ep.role = static_cast<BifEnum::Supervisor::ClusterRole>(rv->GetFieldAs<EnumVal>("role"));
|
||||
ep.host = rv->GetFieldAs<AddrVal>("host").AsString();
|
||||
ep.port = rv->GetFieldAs<PortVal>("p")->Port();
|
||||
|
||||
const auto& iface = rv->GetField("interface");
|
||||
|
||||
if ( iface )
|
||||
ep.interface = iface->AsStringVal()->ToStdString();
|
||||
|
||||
const auto& pcap_file = rv->GetField("pcap_file");
|
||||
|
||||
if ( pcap_file )
|
||||
ep.pcap_file = pcap_file->AsStringVal()->ToStdString();
|
||||
|
||||
rval.cluster.emplace(name, std::move(ep));
|
||||
}
|
||||
auto cluster_table_val = node->GetField("cluster");
|
||||
auto re = std::make_unique<RE_Matcher>("^_");
|
||||
rval.cluster = cluster_table_val->ToJSON(false, re.get())->ToStdString();
|
||||
|
||||
return rval;
|
||||
}
|
||||
|
@ -1319,26 +1296,10 @@ Supervisor::NodeConfig Supervisor::NodeConfig::FromJSON(std::string_view json) {
|
|||
|
||||
auto& cluster = j["cluster"];
|
||||
|
||||
for ( auto it = cluster.MemberBegin(); it != cluster.MemberEnd(); ++it ) {
|
||||
Supervisor::ClusterEndpoint ep;
|
||||
|
||||
auto key = it->name.GetString();
|
||||
auto& val = it->value;
|
||||
|
||||
auto& role_str = val["role"];
|
||||
ep.role = role_str_to_enum(role_str.GetString());
|
||||
|
||||
ep.host = val["host"].GetString();
|
||||
ep.port = val["p"]["port"].GetInt();
|
||||
|
||||
if ( auto it = val.FindMember("interface"); it != val.MemberEnd() )
|
||||
ep.interface = it->value.GetString();
|
||||
|
||||
if ( auto it = val.FindMember("pcap_file"); it != val.MemberEnd() )
|
||||
ep.pcap_file = it->value.GetString();
|
||||
|
||||
rval.cluster.emplace(key, std::move(ep));
|
||||
}
|
||||
rapidjson::StringBuffer sb;
|
||||
rapidjson::Writer<rapidjson::StringBuffer> writer(sb);
|
||||
cluster.Accept(writer);
|
||||
rval.cluster = sb.GetString();
|
||||
|
||||
return rval;
|
||||
}
|
||||
|
@ -1349,7 +1310,7 @@ std::string Supervisor::NodeConfig::ToJSON() const {
|
|||
}
|
||||
|
||||
RecordValPtr Supervisor::NodeConfig::ToRecord() const {
|
||||
const auto& rt = BifType::Record::Supervisor::NodeConfig;
|
||||
const auto& rt = id::find_type<RecordType>("Supervisor::NodeConfig");
|
||||
auto rval = make_intrusive<RecordVal>(rt);
|
||||
rval->AssignField("name", name);
|
||||
|
||||
|
@ -1401,27 +1362,18 @@ RecordValPtr Supervisor::NodeConfig::ToRecord() const {
|
|||
}
|
||||
|
||||
auto tt = rt->GetFieldType<TableType>("cluster");
|
||||
auto cluster_val = make_intrusive<TableVal>(std::move(tt));
|
||||
rval->AssignField("cluster", cluster_val);
|
||||
|
||||
for ( const auto& e : cluster ) {
|
||||
auto& name = e.first;
|
||||
auto& ep = e.second;
|
||||
auto key = make_intrusive<StringVal>(name);
|
||||
const auto& ept = BifType::Record::Supervisor::ClusterEndpoint;
|
||||
auto val = make_intrusive<RecordVal>(ept);
|
||||
|
||||
val->AssignField("role", BifType::Enum::Supervisor::ClusterRole->GetEnumVal(ep.role));
|
||||
val->AssignField("host", make_intrusive<AddrVal>(ep.host));
|
||||
val->AssignField("p", val_mgr->Port(ep.port, TRANSPORT_TCP));
|
||||
|
||||
if ( ep.interface )
|
||||
val->AssignField("interface", *ep.interface);
|
||||
|
||||
if ( ep.pcap_file )
|
||||
val->AssignField("pcap_file", *ep.pcap_file);
|
||||
|
||||
cluster_val->Assign(std::move(key), std::move(val));
|
||||
auto json_res = detail::ValFromJSON(cluster, tt, Func::nil);
|
||||
if ( auto val = std::get_if<ValPtr>(&json_res) ) {
|
||||
rval->AssignField("cluster", *val);
|
||||
}
|
||||
else {
|
||||
// This should never happen: the JSON data comes from a table[string] of
|
||||
// ClusterEndpoint and should therefore allow instantiation. Exiting
|
||||
// here can be hard to debug. Other JSON code (see FromJSON()) fails
|
||||
// silently when the JSON is misformatted. We just warn:
|
||||
fprintf(stderr, "Could not parse %s's cluster table from '%s': %s\n", name.c_str(), cluster.c_str(),
|
||||
std::get<std::string>(json_res).c_str());
|
||||
rval->AssignField("cluster", make_intrusive<TableVal>(std::move(tt)));
|
||||
}
|
||||
|
||||
return rval;
|
||||
|
@ -1439,62 +1391,6 @@ RecordValPtr SupervisorNode::ToRecord() const {
|
|||
return rval;
|
||||
}
|
||||
|
||||
static ValPtr supervisor_role_to_cluster_node_type(BifEnum::Supervisor::ClusterRole role) {
|
||||
static auto node_type = id::find_type<zeek::EnumType>("Cluster::NodeType");
|
||||
|
||||
switch ( role ) {
|
||||
case BifEnum::Supervisor::LOGGER: return node_type->GetEnumVal(node_type->Lookup("Cluster", "LOGGER"));
|
||||
case BifEnum::Supervisor::MANAGER: return node_type->GetEnumVal(node_type->Lookup("Cluster", "MANAGER"));
|
||||
case BifEnum::Supervisor::PROXY: return node_type->GetEnumVal(node_type->Lookup("Cluster", "PROXY"));
|
||||
case BifEnum::Supervisor::WORKER: return node_type->GetEnumVal(node_type->Lookup("Cluster", "WORKER"));
|
||||
default: return node_type->GetEnumVal(node_type->Lookup("Cluster", "NONE"));
|
||||
}
|
||||
}
|
||||
|
||||
bool SupervisedNode::InitCluster() const {
|
||||
if ( config.cluster.empty() )
|
||||
return false;
|
||||
|
||||
const auto& cluster_node_type = id::find_type<RecordType>("Cluster::Node");
|
||||
const auto& cluster_nodes_id = id::find("Cluster::nodes");
|
||||
const auto& cluster_manager_is_logger_id = id::find("Cluster::manager_is_logger");
|
||||
auto cluster_nodes = cluster_nodes_id->GetVal()->AsTableVal();
|
||||
auto has_logger = false;
|
||||
std::optional<std::string> manager_name;
|
||||
|
||||
for ( const auto& e : config.cluster ) {
|
||||
if ( e.second.role == BifEnum::Supervisor::MANAGER )
|
||||
manager_name = e.first;
|
||||
else if ( e.second.role == BifEnum::Supervisor::LOGGER )
|
||||
has_logger = true;
|
||||
}
|
||||
|
||||
for ( const auto& e : config.cluster ) {
|
||||
const auto& node_name = e.first;
|
||||
const auto& ep = e.second;
|
||||
|
||||
auto key = make_intrusive<StringVal>(node_name);
|
||||
auto val = make_intrusive<RecordVal>(cluster_node_type);
|
||||
|
||||
auto node_type = supervisor_role_to_cluster_node_type(ep.role);
|
||||
val->AssignField("node_type", std::move(node_type));
|
||||
val->AssignField("ip", make_intrusive<AddrVal>(ep.host));
|
||||
val->AssignField("p", val_mgr->Port(ep.port, TRANSPORT_TCP));
|
||||
|
||||
// Remove in v7.1: Interface removed from Cluster::Node.
|
||||
if ( ep.interface )
|
||||
val->AssignField("interface", *ep.interface);
|
||||
|
||||
if ( manager_name && ep.role != BifEnum::Supervisor::MANAGER )
|
||||
val->AssignField("manager", *manager_name);
|
||||
|
||||
cluster_nodes->Assign(std::move(key), std::move(val));
|
||||
}
|
||||
|
||||
cluster_manager_is_logger_id->SetVal(val_mgr->Bool(! has_logger));
|
||||
return true;
|
||||
}
|
||||
|
||||
void SupervisedNode::Init(Options* options) const {
|
||||
const auto& node_name = config.name;
|
||||
|
||||
|
@ -1546,7 +1442,7 @@ void SupervisedNode::Init(Options* options) const {
|
|||
}
|
||||
}
|
||||
|
||||
if ( ! config.cluster.empty() ) {
|
||||
if ( ! config.cluster.empty() && config.cluster != "{}" ) {
|
||||
if ( setenv("CLUSTER_NODE", node_name.data(), true) == -1 ) {
|
||||
fprintf(stderr, "node '%s' failed to setenv: %s\n", node_name.data(), strerror(errno));
|
||||
exit(1);
|
||||
|
|
|
@ -110,35 +110,6 @@ public:
|
|||
std::string zeek_exe_path;
|
||||
};
|
||||
|
||||
/**
|
||||
* Configuration options that influence how a Supervised Zeek node
|
||||
* integrates into the normal Zeek Cluster Framework.
|
||||
*/
|
||||
struct ClusterEndpoint {
|
||||
/**
|
||||
* The node's role within the cluster. E.g. manager, logger, worker.
|
||||
*/
|
||||
BifEnum::Supervisor::ClusterRole role;
|
||||
/**
|
||||
* The TCP port number at which the cluster node listens for connections.
|
||||
*/
|
||||
int port;
|
||||
/**
|
||||
* The host/IP at which the cluster node is listening for connections.
|
||||
*/
|
||||
std::string host;
|
||||
/**
|
||||
* The interface name from which the node read/analyze packets.
|
||||
* Typically used by worker nodes.
|
||||
*/
|
||||
std::optional<std::string> interface;
|
||||
/**
|
||||
* The PCAP file name from which the node read/analyze packets.
|
||||
* Typically used by worker nodes.
|
||||
*/
|
||||
std::optional<std::string> pcap_file;
|
||||
};
|
||||
|
||||
/**
|
||||
* Configuration options that influence behavior of a Supervised Zeek node.
|
||||
*/
|
||||
|
@ -237,11 +208,12 @@ public:
|
|||
*/
|
||||
std::map<std::string, std::string> env;
|
||||
/**
|
||||
* The Cluster Layout definition. Each node in the Cluster Framework
|
||||
* knows about the full, static cluster topology to which it belongs.
|
||||
* Entries in the map use node names for keys.
|
||||
* The cluster layout definition. Each node in the Cluster Framework
|
||||
* knows the full, static cluster topology to which it belongs. The
|
||||
* layout is encoded as the JSON map resulting from ToJSON() on the
|
||||
* corresponding cluster table in the script layer's NodeConfig record.
|
||||
*/
|
||||
std::map<std::string, ClusterEndpoint> cluster;
|
||||
std::string cluster;
|
||||
};
|
||||
|
||||
/**
|
||||
|
|
|
@ -14,7 +14,6 @@ enum ClusterRole %{
|
|||
WORKER,
|
||||
%}
|
||||
|
||||
type Supervisor::ClusterEndpoint: record;
|
||||
type Supervisor::Status: record;
|
||||
type Supervisor::NodeConfig: record;
|
||||
type Supervisor::NodeStatus: record;
|
||||
|
@ -66,14 +65,6 @@ function Supervisor::__restart%(node: string%): bool
|
|||
return zeek::val_mgr->Bool(rval);
|
||||
%}
|
||||
|
||||
function Supervisor::__init_cluster%(%): bool
|
||||
%{
|
||||
if ( zeek::Supervisor::ThisNode() )
|
||||
return zeek::val_mgr->Bool(zeek::Supervisor::ThisNode()->InitCluster());
|
||||
|
||||
return zeek::val_mgr->Bool(false);
|
||||
%}
|
||||
|
||||
function Supervisor::__is_supervised%(%): bool
|
||||
%{
|
||||
return zeek::val_mgr->Bool(zeek::Supervisor::ThisNode().has_value());
|
||||
|
|
|
@ -5,6 +5,7 @@
|
|||
-./frameworks/cluster/nodes/proxy.zeek
|
||||
-./frameworks/cluster/nodes/worker.zeek
|
||||
-./frameworks/cluster/setup-connections.zeek
|
||||
-./frameworks/cluster/supervisor.zeek
|
||||
-./frameworks/intel/cluster.zeek
|
||||
-./frameworks/netcontrol/cluster.zeek
|
||||
-./frameworks/openflow/cluster.zeek
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue