zeek/scripts/base/frameworks/cluster/main.zeek
Christian Kreibich 68fadd0464 Lower listen/connect retry intervals in Broker and the cluster framework to 1sec
The former defaults (30sec, 1min) can slow down cluster startup and recovery
considerably, and other systems have more aggressive intervals still.
2025-04-25 10:22:35 -07:00

688 lines
23 KiB
Text

##! A framework for establishing and controlling a cluster of Zeek instances.
##! In order to use the cluster framework, a script named
##! ``cluster-layout.zeek`` must exist somewhere in Zeek's script search path
##! which has a cluster definition of the :zeek:id:`Cluster::nodes` variable.
##! The ``CLUSTER_NODE`` environment variable or :zeek:id:`Cluster::node`
##! must also be sent and the cluster framework loaded as a package like
##! ``@load base/frameworks/cluster``.
##!
##! .. warning::
##!
##! The file ``cluster-layout.zeek`` should only contain the definition
##! of :zeek:id:`Cluster::nodes`. Specifically, avoid loading other Zeek
##! scripts or using :zeek:see:`redef` for anything but :zeek:id:`Cluster::nodes`.
##!
##! Due to ``cluster-layout.zeek`` being loaded very early, it is easy to
##! introduce circular loading issues.
@load base/frameworks/control
@load base/frameworks/broker
module Cluster;
export {
## Whether to distribute log messages among available logging nodes.
const enable_round_robin_logging = T &redef;
## The topic name used for exchanging messages that are relevant to
## logger nodes in a cluster. Used with broker-enabled cluster communication.
const logger_topic = "zeek/cluster/logger" &redef;
## The topic name used for exchanging messages that are relevant to
## manager nodes in a cluster. Used with broker-enabled cluster communication.
const manager_topic = "zeek/cluster/manager" &redef;
## The topic name used for exchanging messages that are relevant to
## proxy nodes in a cluster. Used with broker-enabled cluster communication.
const proxy_topic = "zeek/cluster/proxy" &redef;
## The topic name used for exchanging messages that are relevant to
## worker nodes in a cluster. Used with broker-enabled cluster communication.
const worker_topic = "zeek/cluster/worker" &redef;
## A set of topic names to be used for broadcasting messages that are
## relevant to all nodes in a cluster. Currently, there is not a common
## topic to broadcast to, because enabling implicit Broker forwarding would
## cause a routing loop for this topic.
const broadcast_topics = {
logger_topic,
manager_topic,
proxy_topic,
worker_topic,
};
## The topic prefix used for exchanging messages that are relevant to
## a named node in a cluster. Used with broker-enabled cluster communication.
const node_topic_prefix = "zeek/cluster/node/" &redef;
## The topic prefix used for exchanging messages that are relevant to
## a unique node in a cluster. Used with broker-enabled cluster communication.
const nodeid_topic_prefix = "zeek/cluster/nodeid/" &redef;
## Name of the node on which master data stores will be created if no other
## has already been specified by the user in :zeek:see:`Cluster::stores`.
## An empty value means "use whatever name corresponds to the manager
## node".
const default_master_node = "" &redef;
## The type of data store backend that will be used for all data stores if
## no other has already been specified by the user in :zeek:see:`Cluster::stores`.
const default_backend = Broker::MEMORY &redef;
## The type of persistent data store backend that will be used for all data
## stores if no other has already been specified by the user in
## :zeek:see:`Cluster::stores`. This will be used when script authors call
## :zeek:see:`Cluster::create_store` with the *persistent* argument set true.
const default_persistent_backend = Broker::SQLITE &redef;
## The default maximum queue size for WebSocket event dispatcher instances.
##
## If the maximum queue size is reached, events from external WebSocket
## clients will be stalled and processed once the queue has been drained.
##
## An internal metric named ``cluster_onloop_queue_stalls`` and
## labeled with a ``WebSocketEventDispatcher:<host>:<port>`` tag
## is incremented when the maximum queue size is reached.
const default_websocket_max_event_queue_size = 32 &redef;
## Setting a default dir will, for persistent backends that have not
## been given an explicit file path via :zeek:see:`Cluster::stores`,
## automatically create a path within this dir that is based on the name of
## the data store.
const default_store_dir = "" &redef;
## Information regarding a cluster-enabled data store.
type StoreInfo: record {
## The name of the data store.
name: string &optional;
## The store handle.
store: opaque of Broker::Store &optional;
## The name of the cluster node on which the master version of the data
## store resides.
master_node: string &default=default_master_node;
## Whether the data store is the master version or a clone.
master: bool &default=F;
## The type of backend used for storing data.
backend: Broker::BackendType &default=default_backend;
## Parameters used for configuring the backend.
options: Broker::BackendOptions &default=Broker::BackendOptions();
## A resync/reconnect interval to pass through to
## :zeek:see:`Broker::create_clone`.
clone_resync_interval: interval &default=Broker::default_clone_resync_interval;
## A staleness duration to pass through to
## :zeek:see:`Broker::create_clone`.
clone_stale_interval: interval &default=Broker::default_clone_stale_interval;
## A mutation buffer interval to pass through to
## :zeek:see:`Broker::create_clone`.
clone_mutation_buffer_interval: interval &default=Broker::default_clone_mutation_buffer_interval;
};
## A table of cluster-enabled data stores that have been created, indexed
## by their name. This table will be populated automatically by
## :zeek:see:`Cluster::create_store`, but if you need to customize
## the options related to a particular data store, you may redef this
## table. Calls to :zeek:see:`Cluster::create_store` will first check
## the table for an entry of the same name and, if found, will use the
## predefined options there when setting up the store.
global stores: table[string] of StoreInfo &default=StoreInfo() &redef;
## Sets up a cluster-enabled data store. They will also still properly
## function for uses that are not operating a cluster.
##
## name: the name of the data store to create.
##
## persistent: whether the data store must be persistent.
##
## Returns: the store's information. For master stores, the store will be
## ready to use immediately. For clones, the store field will not
## be set until the node containing the master store has connected.
global create_store: function(name: string, persistent: bool &default=F): StoreInfo;
## The cluster logging stream identifier.
redef enum Log::ID += { LOG };
## A default logging policy hook for the stream.
global log_policy: Log::PolicyHook;
## The record type which contains the column fields of the cluster log.
type Info: record {
## The time at which a cluster message was generated.
ts: time;
## The name of the node that is creating the log record.
node: string;
## A message indicating information about the cluster's operation.
message: string;
} &log;
## Types of nodes that are allowed to participate in the cluster
## configuration.
type NodeType: enum {
## A dummy node type indicating the local node is not operating
## within a cluster.
NONE,
## A node type which is allowed to view/manipulate the configuration
## of other nodes in the cluster.
CONTROL,
## A node type responsible for log management.
LOGGER,
## A node type responsible for policy management.
MANAGER,
## A node type for relaying worker node communication and synchronizing
## worker node state.
PROXY,
## The node type doing all the actual traffic analysis.
WORKER,
};
## Record type to indicate a node in a cluster.
type Node: record {
## Identifies the type of cluster node in this node's configuration.
node_type: NodeType;
## The IP address of the cluster node.
ip: addr;
## If the *ip* field is a non-global IPv6 address, this field
## can specify a particular :rfc:`4007` ``zone_id``.
zone_id: string &default="";
## The port that this node will listen on for peer connections.
## A value of ``0/unknown`` means the node is not pre-configured to listen.
p: port &default=0/unknown;
## Name of the manager node this node uses. For workers and proxies.
manager: string &optional;
## A unique identifier assigned to the node by the broker framework.
## This field is only set while a node is connected.
id: string &optional;
## The port used to expose metrics to Prometheus. Setting this in a cluster
## configuration will override the setting for Telemetry::metrics_port for
## the node.
metrics_port: port &optional;
};
## Record to represent a cluster node including its name.
type NamedNode: record {
name: string;
node: Node;
};
## This function can be called at any time to determine if the cluster
## framework is being enabled for this run.
##
## Returns: True if :zeek:id:`Cluster::node` has been set.
global is_enabled: function(): bool;
## This function can be called at any time to determine what type of
## cluster node the current Zeek instance is going to be acting as.
## If :zeek:id:`Cluster::is_enabled` returns false, then
## :zeek:enum:`Cluster::NONE` is returned.
##
## Returns: The :zeek:type:`Cluster::NodeType` the calling node acts as.
global local_node_type: function(): NodeType;
## This function can be called at any time to determine the configured
## metrics port for Prometheus being used by current Zeek instance. If
## :zeek:id:`Cluster::is_enabled` returns false or the node isn't found,
## ``0/unknown`` is returned.
##
## Returns: The metrics port used by the calling node.
global local_node_metrics_port: function(): port;
## The cluster layout definition. This should be placed into a filter
## named cluster-layout.zeek somewhere in the ZEEKPATH. It will be
## automatically loaded if the CLUSTER_NODE environment variable is set.
## Note that ZeekControl handles all of this automatically.
## The table is typically indexed by node names/labels (e.g. "manager"
## or "worker-1").
const nodes: table[string] of Node = {} &redef;
## Returns the number of nodes defined in the cluster layout for a given
## node type.
global get_node_count: function(node_type: NodeType): count;
## Returns the number of nodes per type, the calling node is currently
## connected to. This is primarily intended for use by the manager to find
## out how many nodes should be responding to requests.
global get_active_node_count: function(node_type: NodeType): count;
## Indicates whether or not the manager will act as the logger and receive
## logs. This value should be set in the cluster-layout.zeek script (the
## value should be true only if no logger is specified in Cluster::nodes).
## Note that ZeekControl handles this automatically.
const manager_is_logger = T &redef;
## This is usually supplied on the command line for each instance
## of the cluster that is started up.
const node = getenv("CLUSTER_NODE") &redef;
## Function returning this node's identifier.
##
## By default this is :zeek:see:`Broker::node_id`, but can be
## redefined by other cluster backends. This identifier should be
## a short lived identifier that resets when a node is restarted.
global node_id: function(): string = Broker::node_id &redef;
## Interval for retrying failed connections between cluster nodes.
## If set, the ZEEK_DEFAULT_CONNECT_RETRY (given in number of seconds)
## environment variable overrides this option.
const retry_interval = 1sec &redef;
## When using broker-enabled cluster framework, nodes broadcast this event
## to exchange their user-defined name along with a string that uniquely
## identifies it for the duration of its lifetime. This string may change
## if the node dies and has to reconnect later.
global hello: event(name: string, id: string);
## When using broker-enabled cluster framework, this event will be emitted
## locally whenever a cluster node connects or reconnects.
global node_up: event(name: string, id: string);
## When using broker-enabled cluster framework, this event will be emitted
## locally whenever a connected cluster node becomes disconnected.
global node_down: event(name: string, id: string);
## Write a message to the cluster logging stream.
global log: function(msg: string);
## Retrieve the topic associated with a specific node in the cluster.
##
## name: the name of the cluster node (e.g. "manager").
##
## Returns: a topic string that may used to send a message exclusively to
## a given cluster node.
global node_topic: function(name: string): string &redef;
## Retrieve the topic associated with a specific node in the cluster.
##
## id: the id of the cluster node (from :zeek:see:`Broker::EndpointInfo`
## or :zeek:see:`Broker::node_id`.
##
## Returns: a topic string that may used to send a message exclusively to
## a given cluster node.
global nodeid_topic: function(id: string): string &redef;
## Retrieve the cluster-level naming of a node based on its node ID,
## a backend-specific identifier.
##
## id: the node ID of a peer.
##
## Returns: the :zeek:see:`Cluster::NamedNode` for the requested node, if
## known, otherwise a "null" instance with an empty name field.
global nodeid_to_node: function(id: string): NamedNode;
## Initialize the cluster backend.
##
## Cluster backends usually invoke this from a :zeek:see:`zeek_init` handler.
##
## Returns: T on success, else F.
global init: function(): bool;
## Subscribe to the given topic.
##
## topic: The topic to subscribe to.
##
## Returns: T on success, else F.
global subscribe: function(topic: string): bool;
## Unsubscribe from the given topic.
##
## topic: The topic to unsubscribe from.
##
## Returns: T on success, else F.
global unsubscribe: function(topic: string): bool;
## An event instance for cluster pub/sub.
##
## See :zeek:see:`Cluster::publish` and :zeek:see:`Cluster::make_event`.
type Event: record {
## The event handler to be invoked on the remote node.
ev: any;
## The arguments for the event.
args: vector of any;
};
## The TLS options for a WebSocket server.
##
## If cert_file and key_file are set, TLS is enabled. If both
## are unset, TLS is disabled. Any other combination is an error.
type WebSocketTLSOptions: record {
## The cert file to use.
cert_file: string &optional;
## The key file to use.
key_file: string &optional;
## Expect peers to send client certificates.
enable_peer_verification: bool &default=F;
## The CA certificate or CA bundle used for peer verification.
## Empty will use the implementations's default when
## ``enable_peer_verification`` is T.
ca_file: string &default="";
## The ciphers to use. Empty will use the implementation's defaults.
ciphers: string &default="";
};
## WebSocket server options to pass to :zeek:see:`Cluster::listen_websocket`.
type WebSocketServerOptions: record {
## The host address to listen on.
listen_host: string;
## The port the WebSocket server is supposed to listen on.
listen_port: port;
## The maximum event queue size for this server.
max_event_queue_size: count &default=default_websocket_max_event_queue_size;
## The TLS options used for this WebSocket server. By default,
## TLS is disabled. See also :zeek:see:`Cluster::WebSocketTLSOptions`.
tls_options: WebSocketTLSOptions &default=WebSocketTLSOptions();
};
## Start listening on a WebSocket address.
##
## options: The server :zeek:see:`Cluster::WebSocketServerOptions` to use.
##
## Returns: T on success, else F.
global listen_websocket: function(options: WebSocketServerOptions): bool;
## Network information of an endpoint.
type NetworkInfo: record {
## The IP address or hostname where the endpoint listens.
address: string;
## The port where the endpoint is bound to.
bound_port: port;
};
## Information about a WebSocket endpoint.
type EndpointInfo: record {
id: string;
network: NetworkInfo;
};
}
# Needs declaration of Cluster::Event type.
@load base/bif/cluster.bif
@load base/bif/plugins/Zeek_Cluster_WebSocket.events.bif.zeek
# Track active nodes per type.
global active_node_ids: table[NodeType] of set[string];
function nodes_with_type(node_type: NodeType): vector of NamedNode
{
local rval: vector of NamedNode = vector();
for ( name, n in Cluster::nodes )
{
if ( n$node_type != node_type )
next;
rval += NamedNode($name=name, $node=n);
}
return sort(rval, function(n1: NamedNode, n2: NamedNode): int
{ return strcmp(n1$name, n2$name); });
}
function get_node_count(node_type: NodeType): count
{
local cnt = 0;
for ( _, n in nodes )
{
if ( n$node_type == node_type )
++cnt;
}
return cnt;
}
function get_active_node_count(node_type: NodeType): count
{
return node_type in active_node_ids ? |active_node_ids[node_type]| : 0;
}
function is_enabled(): bool
{
return (node != "");
}
function local_node_type(): NodeType
{
if ( ! is_enabled() )
return NONE;
if ( node !in nodes )
return NONE;
return nodes[node]$node_type;
}
function local_node_metrics_port(): port
{
if ( ! is_enabled() )
return 0/unknown;
if ( node !in nodes )
return 0/unknown;
if ( ! nodes[node]?$metrics_port )
return 0/unknown;
return nodes[node]$metrics_port;
}
function node_topic(name: string): string
{
return node_topic_prefix + name + "/";
}
function nodeid_topic(id: string): string
{
return nodeid_topic_prefix + id + "/";
}
function nodeid_to_node(id: string): NamedNode
{
for ( name, n in nodes )
{
if ( n?$id && n$id == id )
return NamedNode($name=name, $node=n);
}
return NamedNode($name="", $node=[$node_type=NONE, $ip=0.0.0.0]);
}
event Cluster::hello(name: string, id: string) &priority=10
{
if ( name !in nodes )
{
Reporter::error(fmt("Got Cluster::hello msg from unexpected node: %s", name));
return;
}
local n = nodes[name];
if ( n?$id )
{
if ( n$id != id )
Reporter::error(fmt("Got Cluster::hello msg from duplicate node:%s",
name));
}
else
event Cluster::node_up(name, id);
n$id = id;
Cluster::log(fmt("got hello from %s (%s)", name, id));
if ( n$node_type !in active_node_ids )
active_node_ids[n$node_type] = set();
add active_node_ids[n$node_type][id];
}
event Broker::peer_added(endpoint: Broker::EndpointInfo, msg: string) &priority=10
{
if ( ! Cluster::is_enabled() )
return;
local e = Broker::make_event(Cluster::hello, node, Cluster::node_id());
Broker::publish(nodeid_topic(endpoint$id), e);
}
event Broker::peer_lost(endpoint: Broker::EndpointInfo, msg: string) &priority=10
{
for ( node_name, n in nodes )
{
if ( n?$id && n$id == endpoint$id )
{
event Cluster::node_down(node_name, endpoint$id);
break;
}
}
}
event node_down(name: string, id: string) &priority=10
{
local found = F;
for ( node_name, n in nodes )
{
if ( n?$id && n$id == id )
{
Cluster::log(fmt("node down: %s", node_name));
delete n$id;
delete active_node_ids[n$node_type][id];
found = T;
break;
}
}
if ( ! found )
Reporter::error(fmt("No node found in Cluster::node_down() node:%s id:%s",
name, id));
}
event zeek_init() &priority=5
{
# If a node is given, but it's an unknown name we need to fail.
if ( node != "" && node !in nodes )
{
Reporter::error(fmt("'%s' is not a valid node in the Cluster::nodes configuration", node));
terminate();
}
Log::create_stream(Cluster::LOG, [$columns=Info, $path="cluster", $policy=log_policy]);
}
function create_store(name: string, persistent: bool &default=F): Cluster::StoreInfo
{
local info = stores[name];
info$name = name;
if ( Cluster::default_store_dir != "" )
{
local default_options = Broker::BackendOptions();
local path = Cluster::default_store_dir + "/" + name;
if ( info$options$sqlite$path == default_options$sqlite$path )
info$options$sqlite$path = path + ".sqlite";
}
if ( persistent )
{
switch ( info$backend ) {
case Broker::MEMORY:
info$backend = Cluster::default_persistent_backend;
break;
case Broker::SQLITE:
# no-op: user already asked for a specific persistent backend.
break;
default:
Reporter::error(fmt("unhandled data store type: %s", info$backend));
break;
}
}
if ( ! Cluster::is_enabled() )
{
if ( info?$store )
{
Reporter::warning(fmt("duplicate cluster store creation for %s", name));
return info;
}
info$store = Broker::create_master(name, info$backend, info$options);
info$master = T;
stores[name] = info;
return info;
}
if ( info$master_node == "" )
{
local mgr_nodes = nodes_with_type(Cluster::MANAGER);
if ( |mgr_nodes| == 0 )
Reporter::fatal(fmt("empty master node name for cluster store " +
"'%s', but there's no manager node to default",
name));
info$master_node = mgr_nodes[0]$name;
}
else if ( info$master_node !in Cluster::nodes )
Reporter::fatal(fmt("master node '%s' for cluster store '%s' does not exist",
info$master_node, name));
if ( Cluster::node == info$master_node )
{
info$store = Broker::create_master(name, info$backend, info$options);
info$master = T;
stores[name] = info;
Cluster::log(fmt("created master store: %s", name));
return info;
}
info$master = F;
stores[name] = info;
info$store = Broker::create_clone(info$name,
info$clone_resync_interval,
info$clone_stale_interval,
info$clone_mutation_buffer_interval);
Cluster::log(fmt("created clone store: %s", info$name));
return info;
}
function log(msg: string)
{
Log::write(Cluster::LOG, [$ts = network_time(), $node = node, $message = msg]);
}
function init(): bool
{
return Cluster::Backend::__init(Cluster::node_id());
}
function subscribe(topic: string): bool
{
return Cluster::__subscribe(topic);
}
function unsubscribe(topic: string): bool
{
return Cluster::__unsubscribe(topic);
}
function listen_websocket(options: WebSocketServerOptions): bool
{
return Cluster::__listen_websocket(options);
}
event websocket_client_added(endpoint: EndpointInfo, subscriptions: string_vec)
{
local msg = fmt("WebSocket client '%s' (%s:%d) subscribed to %s",
endpoint$id, endpoint$network$address, endpoint$network$bound_port, subscriptions);
Cluster::log(msg);
}
event websocket_client_lost(endpoint: EndpointInfo)
{
local msg = fmt("WebSocket client '%s' (%s:%d) gone",
endpoint$id, endpoint$network$address, endpoint$network$bound_port);
Cluster::log(msg);
}
# If a backend reports an error, propagate it via a reporter error message.
event Cluster::Backend::error(tag: string, message: string)
{
local msg = fmt("Cluster::Backend::error: %s (%s)", tag, message);
Reporter::error(msg);
}