zeek/scripts/base/frameworks/cluster/main.zeek
Seth Hall cd330c801d
Apply suggestions from code review
Co-authored-by: Jon Siwek <jsiwek@corelight.com>
2020-10-13 16:48:15 -04:00

469 lines
16 KiB
Text

##! A framework for establishing and controlling a cluster of Zeek instances.
##! In order to use the cluster framework, a script named
##! ``cluster-layout.zeek`` must exist somewhere in Zeek's script search path
##! which has a cluster definition of the :zeek:id:`Cluster::nodes` variable.
##! The ``CLUSTER_NODE`` environment variable or :zeek:id:`Cluster::node`
##! must also be sent and the cluster framework loaded as a package like
##! ``@load base/frameworks/cluster``.
@load base/frameworks/control
@load base/frameworks/broker
module Cluster;
export {
## Whether to distribute log messages among available logging nodes.
const enable_round_robin_logging = T &redef;
## The topic name used for exchanging messages that are relevant to
## logger nodes in a cluster. Used with broker-enabled cluster communication.
const logger_topic = "zeek/cluster/logger" &redef;
## The topic name used for exchanging messages that are relevant to
## manager nodes in a cluster. Used with broker-enabled cluster communication.
const manager_topic = "zeek/cluster/manager" &redef;
## The topic name used for exchanging messages that are relevant to
## proxy nodes in a cluster. Used with broker-enabled cluster communication.
const proxy_topic = "zeek/cluster/proxy" &redef;
## The topic name used for exchanging messages that are relevant to
## worker nodes in a cluster. Used with broker-enabled cluster communication.
const worker_topic = "zeek/cluster/worker" &redef;
## The topic name used for exchanging messages that are relevant to
## time machine nodes in a cluster. Used with broker-enabled cluster communication.
const time_machine_topic = "zeek/cluster/time_machine" &redef;
## The topic prefix used for exchanging messages that are relevant to
## a named node in a cluster. Used with broker-enabled cluster communication.
const node_topic_prefix = "zeek/cluster/node/" &redef;
## The topic prefix used for exchanging messages that are relevant to
## a unique node in a cluster. Used with broker-enabled cluster communication.
const nodeid_topic_prefix = "zeek/cluster/nodeid/" &redef;
## Name of the node on which master data stores will be created if no other
## has already been specified by the user in :zeek:see:`Cluster::stores`.
## An empty value means "use whatever name corresponds to the manager
## node".
const default_master_node = "" &redef;
## The type of data store backend that will be used for all data stores if
## no other has already been specified by the user in :zeek:see:`Cluster::stores`.
const default_backend = Broker::MEMORY &redef;
## The type of persistent data store backend that will be used for all data
## stores if no other has already been specified by the user in
## :zeek:see:`Cluster::stores`. This will be used when script authors call
## :zeek:see:`Cluster::create_store` with the *persistent* argument set true.
const default_persistent_backend = Broker::SQLITE &redef;
## Setting a default dir will, for persistent backends that have not
## been given an explicit file path via :zeek:see:`Cluster::stores`,
## automatically create a path within this dir that is based on the name of
## the data store.
const default_store_dir = "" &redef;
## Information regarding a cluster-enabled data store.
type StoreInfo: record {
## The name of the data store.
name: string &optional;
## The store handle.
store: opaque of Broker::Store &optional;
## The name of the cluster node on which the master version of the data
## store resides.
master_node: string &default=default_master_node;
## Whether the data store is the master version or a clone.
master: bool &default=F;
## The type of backend used for storing data.
backend: Broker::BackendType &default=default_backend;
## Parameters used for configuring the backend.
options: Broker::BackendOptions &default=Broker::BackendOptions();
## A resync/reconnect interval to pass through to
## :zeek:see:`Broker::create_clone`.
clone_resync_interval: interval &default=Broker::default_clone_resync_interval;
## A staleness duration to pass through to
## :zeek:see:`Broker::create_clone`.
clone_stale_interval: interval &default=Broker::default_clone_stale_interval;
## A mutation buffer interval to pass through to
## :zeek:see:`Broker::create_clone`.
clone_mutation_buffer_interval: interval &default=Broker::default_clone_mutation_buffer_interval;
};
## A table of cluster-enabled data stores that have been created, indexed
## by their name. This table will be populated automatically by
## :zeek:see:`Cluster::create_store`, but if you need to customize
## the options related to a particular data store, you may redef this
## table. Calls to :zeek:see:`Cluster::create_store` will first check
## the table for an entry of the same name and, if found, will use the
## predefined options there when setting up the store.
global stores: table[string] of StoreInfo &default=StoreInfo() &redef;
## Sets up a cluster-enabled data store. They will also still properly
## function for uses that are not operating a cluster.
##
## name: the name of the data store to create.
##
## persistent: whether the data store must be persistent.
##
## Returns: the store's information. For master stores, the store will be
## ready to use immediately. For clones, the store field will not
## be set until the node containing the master store has connected.
global create_store: function(name: string, persistent: bool &default=F): StoreInfo;
## The cluster logging stream identifier.
redef enum Log::ID += { LOG };
## A default logging policy hook for the stream.
global log_policy: Log::PolicyHook;
## The record type which contains the column fields of the cluster log.
type Info: record {
## The time at which a cluster message was generated.
ts: time;
## The name of the node that is creating the log record.
node: string;
## A message indicating information about the cluster's operation.
message: string;
} &log;
## Types of nodes that are allowed to participate in the cluster
## configuration.
type NodeType: enum {
## A dummy node type indicating the local node is not operating
## within a cluster.
NONE,
## A node type which is allowed to view/manipulate the configuration
## of other nodes in the cluster.
CONTROL,
## A node type responsible for log management.
LOGGER,
## A node type responsible for policy management.
MANAGER,
## A node type for relaying worker node communication and synchronizing
## worker node state.
PROXY,
## The node type doing all the actual traffic analysis.
WORKER,
## A node acting as a traffic recorder using the
## `Time Machine <https://www.zeek.org/community/time-machine.html>`_
## software.
TIME_MACHINE,
};
## Record type to indicate a node in a cluster.
type Node: record {
## Identifies the type of cluster node in this node's configuration.
node_type: NodeType;
## The IP address of the cluster node.
ip: addr;
## If the *ip* field is a non-global IPv6 address, this field
## can specify a particular :rfc:`4007` ``zone_id``.
zone_id: string &default="";
## The port that this node will listen on for peer connections.
## A value of ``0/unknown`` means the node is not pre-configured to listen.
p: port &default=0/unknown;
## Identifier for the interface a worker is sniffing.
interface: string &optional;
## Name of the manager node this node uses. For workers and proxies.
manager: string &optional;
## Name of a time machine node with which this node connects.
time_machine: string &optional;
## A unique identifier assigned to the node by the broker framework.
## This field is only set while a node is connected.
id: string &optional;
};
## This function can be called at any time to determine if the cluster
## framework is being enabled for this run.
##
## Returns: True if :zeek:id:`Cluster::node` has been set.
global is_enabled: function(): bool;
## This function can be called at any time to determine what type of
## cluster node the current Zeek instance is going to be acting as.
## If :zeek:id:`Cluster::is_enabled` returns false, then
## :zeek:enum:`Cluster::NONE` is returned.
##
## Returns: The :zeek:type:`Cluster::NodeType` the calling node acts as.
global local_node_type: function(): NodeType;
## This gives the value for the number of workers currently connected to,
## and it's maintained internally by the cluster framework. It's
## primarily intended for use by managers to find out how many workers
## should be responding to requests.
global worker_count: count = 0;
## The cluster layout definition. This should be placed into a filter
## named cluster-layout.zeek somewhere in the ZEEKPATH. It will be
## automatically loaded if the CLUSTER_NODE environment variable is set.
## Note that ZeekControl handles all of this automatically.
## The table is typically indexed by node names/labels (e.g. "manager"
## or "worker-1").
const nodes: table[string] of Node = {} &redef;
## Indicates whether or not the manager will act as the logger and receive
## logs. This value should be set in the cluster-layout.zeek script (the
## value should be true only if no logger is specified in Cluster::nodes).
## Note that ZeekControl handles this automatically.
const manager_is_logger = T &redef;
## This is usually supplied on the command line for each instance
## of the cluster that is started up.
const node = getenv("CLUSTER_NODE") &redef;
## Interval for retrying failed connections between cluster nodes.
## If set, the ZEEK_DEFAULT_CONNECT_RETRY (given in number of seconds)
## environment variable overrides this option.
const retry_interval = 1min &redef;
## When using broker-enabled cluster framework, nodes broadcast this event
## to exchange their user-defined name along with a string that uniquely
## identifies it for the duration of its lifetime. This string may change
## if the node dies and has to reconnect later.
global hello: event(name: string, id: string);
## When using broker-enabled cluster framework, this event will be emitted
## locally whenever a cluster node connects or reconnects.
global node_up: event(name: string, id: string);
## When using broker-enabled cluster framework, this event will be emitted
## locally whenever a connected cluster node becomes disconnected.
global node_down: event(name: string, id: string);
## Write a message to the cluster logging stream.
global log: function(msg: string);
## Retrieve the topic associated with a specific node in the cluster.
##
## name: the name of the cluster node (e.g. "manager").
##
## Returns: a topic string that may used to send a message exclusively to
## a given cluster node.
global node_topic: function(name: string): string;
## Retrieve the topic associated with a specific node in the cluster.
##
## id: the id of the cluster node (from :zeek:see:`Broker::EndpointInfo`
## or :zeek:see:`Broker::node_id`.
##
## Returns: a topic string that may used to send a message exclusively to
## a given cluster node.
global nodeid_topic: function(id: string): string;
}
global active_worker_ids: set[string] = set();
type NamedNode: record {
name: string;
node: Node;
};
function nodes_with_type(node_type: NodeType): vector of NamedNode
{
local rval: vector of NamedNode = vector();
local names: vector of string = vector();
for ( name in Cluster::nodes )
names += name;
names = sort(names, strcmp);
for ( i in names )
{
name = names[i];
local n = Cluster::nodes[name];
if ( n$node_type != node_type )
next;
rval += NamedNode($name=name, $node=n);
}
return rval;
}
function is_enabled(): bool
{
return (node != "");
}
function local_node_type(): NodeType
{
if ( ! is_enabled() )
return NONE;
if ( node !in nodes )
return NONE;
return nodes[node]$node_type;
}
function node_topic(name: string): string
{
return node_topic_prefix + name + "/";
}
function nodeid_topic(id: string): string
{
return nodeid_topic_prefix + id + "/";
}
event Cluster::hello(name: string, id: string) &priority=10
{
if ( name !in nodes )
{
Reporter::error(fmt("Got Cluster::hello msg from unexpected node: %s", name));
return;
}
local n = nodes[name];
if ( n?$id )
{
if ( n$id != id )
Reporter::error(fmt("Got Cluster::hello msg from duplicate node:%s",
name));
}
else
event Cluster::node_up(name, id);
n$id = id;
Cluster::log(fmt("got hello from %s (%s)", name, id));
if ( n$node_type == WORKER )
{
add active_worker_ids[id];
worker_count = |active_worker_ids|;
}
}
event Broker::peer_added(endpoint: Broker::EndpointInfo, msg: string) &priority=10
{
if ( ! Cluster::is_enabled() )
return;
local e = Broker::make_event(Cluster::hello, node, Broker::node_id());
Broker::publish(nodeid_topic(endpoint$id), e);
}
event Broker::peer_lost(endpoint: Broker::EndpointInfo, msg: string) &priority=10
{
for ( node_name, n in nodes )
{
if ( n?$id && n$id == endpoint$id )
{
Cluster::log(fmt("node down: %s", node_name));
delete n$id;
if ( n$node_type == WORKER )
{
delete active_worker_ids[endpoint$id];
worker_count = |active_worker_ids|;
}
event Cluster::node_down(node_name, endpoint$id);
break;
}
}
}
event zeek_init() &priority=5
{
# If a node is given, but it's an unknown name we need to fail.
if ( node != "" && node !in nodes )
{
Reporter::error(fmt("'%s' is not a valid node in the Cluster::nodes configuration", node));
terminate();
}
Log::create_stream(Cluster::LOG, [$columns=Info, $path="cluster", $policy=log_policy]);
}
function create_store(name: string, persistent: bool &default=F): Cluster::StoreInfo
{
local info = stores[name];
info$name = name;
if ( Cluster::default_store_dir != "" )
{
local default_options = Broker::BackendOptions();
local path = Cluster::default_store_dir + "/" + name;
if ( info$options$sqlite$path == default_options$sqlite$path )
info$options$sqlite$path = path + ".sqlite";
if ( info$options$rocksdb$path == default_options$rocksdb$path )
info$options$rocksdb$path = path + ".rocksdb";
}
if ( persistent )
{
switch ( info$backend ) {
case Broker::MEMORY:
info$backend = Cluster::default_persistent_backend;
break;
case Broker::SQLITE:
fallthrough;
case Broker::ROCKSDB:
# no-op: user already asked for a specific persistent backend.
break;
default:
Reporter::error(fmt("unhandled data store type: %s", info$backend));
break;
}
}
if ( ! Cluster::is_enabled() )
{
if ( info?$store )
{
Reporter::warning(fmt("duplicate cluster store creation for %s", name));
return info;
}
info$store = Broker::create_master(name, info$backend, info$options);
info$master = T;
stores[name] = info;
return info;
}
if ( info$master_node == "" )
{
local mgr_nodes = nodes_with_type(Cluster::MANAGER);
if ( |mgr_nodes| == 0 )
Reporter::fatal(fmt("empty master node name for cluster store " +
"'%s', but there's no manager node to default",
name));
info$master_node = mgr_nodes[0]$name;
}
else if ( info$master_node !in Cluster::nodes )
Reporter::fatal(fmt("master node '%s' for cluster store '%s' does not exist",
info$master_node, name));
if ( Cluster::node == info$master_node )
{
info$store = Broker::create_master(name, info$backend, info$options);
info$master = T;
stores[name] = info;
Cluster::log(fmt("created master store: %s", name));
return info;
}
info$master = F;
stores[name] = info;
info$store = Broker::create_clone(info$name,
info$clone_resync_interval,
info$clone_stale_interval,
info$clone_mutation_buffer_interval);
Cluster::log(fmt("created clone store: %s", info$name));
return info;
}
function log(msg: string)
{
Log::write(Cluster::LOG, [$ts = network_time(), $node = node, $message = msg]);
}