zeek/scripts/base/frameworks/broker/main.zeek
Christian Kreibich 841a40ff88 Switch Broker's default backpressure policy to drop_oldest, bump buffer sizes
At every site where we've dug into backpressure disconnect findings, the
default values turned out to be too small. 8192, i.e. 4x the old default,
suffices at every site to eliminate premature disconnects.

With metrics now available for the send buffers regardless of backpressure
overflow policy, this also switches the default from "disconnect" to
"drop_oldest" (for both peers and websockets), meaning that peerings remain
untouched but the oldest queued message simply gets dropped when a new message
is enqueued. With this policy, the number of backpressure overflows is simply
the count of discarded messages, which users can tune (via the buffer sizes)
until it drops to zero in everyday use. Another benefit is that marginal
overflows lose fewer messages than a disconnect, which throws out an entire
buffer's worth (plus potentially more in-flight messages).
2025-04-25 10:22:35 -07:00

605 lines
21 KiB
Text

##! The Broker-based communication API and its various options.
module Broker;
export {
## Default port for native Broker communication. Where not specified
## otherwise, this is the port to connect to and listen on.
const default_port = 9999/tcp &redef;
## Default port for Broker WebSocket communication. Where not specified
## otherwise, this is the port to connect to and listen on for
## WebSocket connections.
##
## See the Broker documentation for a specification of the message
## format over WebSocket connections.
const default_port_websocket = 9997/tcp &redef;
## Default interval to retry listening on a port if it's currently in
## use already. Use of the ZEEK_DEFAULT_LISTEN_RETRY environment variable
## (set as a number of seconds) will override this option and also
## any values given to :zeek:see:`Broker::listen`.
const default_listen_retry = 30sec &redef;
## Default address on which to listen. Initialized from the
## ZEEK_DEFAULT_LISTEN_ADDRESS environment variable.
##
## .. zeek:see:: Broker::listen
const default_listen_address = getenv("ZEEK_DEFAULT_LISTEN_ADDRESS") &redef;
## Default address on which to listen for WebSocket connections.
## Initialized from the same ZEEK_DEFAULT_LISTEN_ADDRESS environment
## variable as :zeek:see:`Broker::default_listen_address`.
##
## .. zeek:see:: Broker::listen_websocket
const default_listen_address_websocket = getenv("ZEEK_DEFAULT_LISTEN_ADDRESS") &redef;
## Default interval to retry connecting to a peer if it cannot be made to
## work initially, or if it ever becomes disconnected. Use of the
## ZEEK_DEFAULT_CONNECT_RETRY environment variable (set as number of
## seconds) will override this option and also any values given to
## :zeek:see:`Broker::peer`.
const default_connect_retry = 30sec &redef;
## If true, do not use SSL for network connections. By default, SSL is
## used even if no certificates / CAs have been configured. In that case
## (which is the default) the communication will be encrypted, but not
## authenticated.
const disable_ssl = F &redef;
## Path to a file containing concatenated trusted certificates
## in PEM format. If set, Zeek will require valid certificates for
## all peers.
const ssl_cafile = "" &redef;
## Path to an OpenSSL-style directory of trusted certificates.
## If set, Zeek will require valid certificates for
## all peers.
const ssl_capath = "" &redef;
## Path to a file containing a X.509 certificate for this
## node in PEM format. If set, Zeek will require valid certificates for
## all peers.
const ssl_certificate = "" &redef;
## Passphrase to decrypt the private key specified by
## :zeek:see:`Broker::ssl_keyfile`. If set, Zeek will require valid
## certificates for all peers.
const ssl_passphrase = "" &redef;
## Path to the file containing the private key for this node's
## certificate. If set, Zeek will require valid certificates for
## all peers.
const ssl_keyfile = "" &redef;
## The number of buffered messages at the Broker/CAF layer after which
## a subscriber considers themselves congested (i.e. tune the congestion
## control mechanisms).
##
## Deprecated: this option has had no effect since v5.0 and is slated
## for removal in v8.1.
const congestion_queue_size = 200 &redef &deprecated="Remove in v8.1. Non-functional since v5.0";
## The max number of log entries per log stream to batch together when
## sending log messages to a remote logger.
const log_batch_size = 400 &redef;
## Max time to buffer log messages before sending the current set out as a
## batch.
const log_batch_interval = 1sec &redef;
## Max number of threads to use for Broker/CAF functionality. The
## ``ZEEK_BROKER_MAX_THREADS`` environment variable overrides this setting.
const max_threads = 1 &redef;
## Max number of items we buffer at most per peer. What action to take when
## the buffer reaches its maximum size is determined by
## :zeek:see:`Broker::peer_overflow_policy`.
const peer_buffer_size = 8192 &redef;
## Configures how Broker responds to peers that cannot keep up with the
## incoming message rate. Available strategies:
##
## - disconnect: drop the connection to the unresponsive peer
## - drop_newest: replace the newest message in the buffer
## - drop_oldest: remove the oldest message from the buffer, then append
##   the new one
const peer_overflow_policy = "drop_oldest" &redef;
## Same as :zeek:see:`Broker::peer_buffer_size` but for WebSocket clients.
const web_socket_buffer_size = 8192 &redef;
## Same as :zeek:see:`Broker::peer_overflow_policy` but for WebSocket clients.
const web_socket_overflow_policy = "drop_oldest" &redef;
## How frequently Zeek resets some peering/client buffer statistics,
## such as ``max_queued_recently`` in :zeek:see:`BrokerPeeringStats`.
const buffer_stats_reset_interval = 1min &redef;
## The CAF scheduling policy to use. Available options are "sharing" and
## "stealing". The "sharing" policy uses a single, global work queue along
## with mutex and condition variable used for accessing it, which may be
## better for cases that don't require much concurrency or need lower power
## consumption. The "stealing" policy uses multiple work queues protected
## by spinlocks, which may be better for use-cases that have more
## concurrency needs. E.g. may be worth testing the "stealing" policy
## along with dedicating more threads if a lot of data store processing is
## required.
const scheduler_policy = "sharing" &redef;
## Interval of time for under-utilized Broker/CAF threads to sleep
## when in "moderate" mode. Only used for the "stealing" scheduler policy.
const moderate_sleep = 16 msec &redef;
## Interval of time for under-utilized Broker/CAF threads to sleep
## when in "relaxed" mode. Only used for the "stealing" scheduler policy.
const relaxed_sleep = 64 msec &redef;
## Number of work-stealing polling attempts for Broker/CAF threads
## in "aggressive" mode. Only used for the "stealing" scheduler policy.
const aggressive_polls = 5 &redef;
## Number of work-stealing polling attempts for Broker/CAF threads
## in "moderate" mode. Only used for the "stealing" scheduler policy.
const moderate_polls = 5 &redef;
## Frequency of work-stealing polling attempts for Broker/CAF threads
## in "aggressive" mode. Only used for the "stealing" scheduler policy.
const aggressive_interval = 4 &redef;
## Frequency of work-stealing polling attempts for Broker/CAF threads
## in "moderate" mode. Only used for the "stealing" scheduler policy.
const moderate_interval = 2 &redef;
## Frequency of work-stealing polling attempts for Broker/CAF threads
## in "relaxed" mode. Only used for the "stealing" scheduler policy.
const relaxed_interval = 1 &redef;
## Forward all received messages to subscribing peers.
const forward_messages = F &redef;
## Whether calling :zeek:see:`Broker::peer` will register the Broker
## system as an I/O source that will block the process from shutting
## down. For example, set this to false when you are reading pcaps,
## but also want to initiate a Broker peering and still shutdown after
## done reading the pcap.
option peer_counts_as_iosource = T;
## The default topic prefix where logs will be published. The log's stream
## id is appended when writing to a particular stream.
const default_log_topic_prefix = "zeek/logs/" &redef;
## The default implementation for :zeek:see:`Broker::log_topic`. It
## returns :zeek:see:`Broker::default_log_topic_prefix` followed by the
## stream's ID; the *path* argument is not used.
function default_log_topic(id: Log::ID, path: string): string
{
return default_log_topic_prefix + cat(id);
}
## A function that will be called for each log entry to determine what
## broker topic string will be used for sending it to peers. The
## default implementation will return a value based on
## :zeek:see:`Broker::default_log_topic_prefix`.
##
## id: the ID associated with the log stream entry that will be sent.
##
## path: the path to which the log stream entry will be output.
##
## Returns: a string representing the broker topic to which the log
## will be sent.
const log_topic: function(id: Log::ID, path: string): string = default_log_topic &redef;
## The possible log event severity levels for Broker.
type LogSeverityLevel: enum {
## Fatal event, normal operation has most likely broken down.
LOG_CRITICAL,
## Unrecoverable event that impairs at least part of the system.
LOG_ERROR,
## Unexpected or conspicuous event that may still be recoverable.
LOG_WARNING,
## Noteworthy event during normal operation.
LOG_INFO,
## Information that might be relevant for a user to understand system behavior.
LOG_VERBOSE,
## An event that is relevant only for troubleshooting and debugging.
LOG_DEBUG,
};
## The log event severity level for the Broker log output.
const log_severity_level = LOG_WARNING &redef;
## Event severity level for also printing the Broker log output to stderr.
const log_stderr_severity_level = LOG_CRITICAL &redef;
## Error codes for Broker peering and data store operations.
type ErrorCode: enum {
## The unspecified default error code.
UNSPECIFIED = 1,
## Version incompatibility.
PEER_INCOMPATIBLE = 2,
## Referenced peer does not exist.
PEER_INVALID = 3,
## Remote peer not listening.
PEER_UNAVAILABLE = 4,
## Remote peer disconnected during the handshake.
PEER_DISCONNECT_DURING_HANDSHAKE = 5,
## A peering request timed out.
PEER_TIMEOUT = 6,
## Master with given name already exists.
MASTER_EXISTS = 7,
## Master with given name does not exist.
NO_SUCH_MASTER = 8,
## The given data store key does not exist.
NO_SUCH_KEY = 9,
## The store operation timed out.
REQUEST_TIMEOUT = 10,
## The operation expected a different type than provided.
TYPE_CLASH = 11,
## The data value cannot be used to carry out the desired operation.
INVALID_DATA = 12,
## The storage backend failed to execute the operation.
BACKEND_FAILURE = 13,
## The store's data is stale (e.g., a clone that is not, or no longer,
## synchronized with its master).
STALE_DATA = 14,
## Catch-all for a CAF-level problem.
CAF_ERROR = 100
};
## The possible states of a peer endpoint.
type PeerStatus: enum {
## The peering process is initiated.
INITIALIZING,
## Connection establishment in process.
CONNECTING,
## Connection established, peering pending.
CONNECTED,
## Successfully peered.
PEERED,
## Connection to remote peer lost.
DISCONNECTED,
## Reconnecting to peer after a lost connection.
RECONNECTING,
};
## Network-level information of a Broker endpoint.
type NetworkInfo: record {
## The IP address or hostname where the endpoint listens.
address: string &log;
## The port where the endpoint is bound to.
bound_port: port &log;
};
## Identification of a Broker endpoint.
type EndpointInfo: record {
## A unique identifier of the node.
id: string;
## Network-level information.
network: NetworkInfo &optional;
};
## Information about a peering with another endpoint.
type PeerInfo: record {
## The peer endpoint.
peer: EndpointInfo;
## The state of the peer.
status: PeerStatus;
## Whether the local node created the peering, as opposed to a
## remote establishing it by connecting to us.
is_outbound: bool;
};
## A list of :zeek:type:`Broker::PeerInfo` records.
type PeerInfos: vector of PeerInfo;
## Opaque communication data.
type Data: record {
data: opaque of Broker::Data &optional;
};
## Opaque communication data sequence.
type DataVector: vector of Broker::Data;
## Opaque event communication data.
type Event: record {
## The name of the event. Not set if invalid event or arguments.
name: string &optional;
## The arguments to the event.
args: DataVector;
};
## Opaque communication data used as a convenient way to wrap key-value
## pairs that comprise table entries.
type TableItem : record {
## The key of the table entry.
key: Broker::Data;
## The value of the table entry.
val: Broker::Data;
};
## Listen for remote connections using the native Broker protocol.
##
## a: an address string on which to accept connections, e.g.
## "127.0.0.1". An empty string refers to INADDR_ANY.
##
## p: the TCP port to listen on. The value 0 means that the OS should choose
## the next available free port.
##
## retry: If non-zero, retries listening at regular intervals if the port cannot be
## acquired immediately. 0 disables retries. If the
## ZEEK_DEFAULT_LISTEN_RETRY environment variable is set (as number
## of seconds), it overrides any value given here.
##
## Returns: the bound port or 0/? on failure.
##
## .. zeek:see:: Broker::status
global listen: function(a: string &default = default_listen_address,
p: port &default = default_port,
retry: interval &default = default_listen_retry): port;
## Listen for remote connections using WebSocket.
##
## a: an address string on which to accept connections, e.g.
## "127.0.0.1". An empty string refers to INADDR_ANY.
##
## p: the TCP port to listen on. The value 0 means that the OS should choose
## the next available free port.
##
## retry: If non-zero, retries listening at regular intervals if the port cannot be
## acquired immediately. 0 disables retries. If the
## ZEEK_DEFAULT_LISTEN_RETRY environment variable is set (as number
## of seconds), it overrides any value given here.
##
## Returns: the bound port or 0/? on failure.
##
## .. zeek:see:: Broker::status
global listen_websocket: function(a: string &default = default_listen_address_websocket,
p: port &default = default_port_websocket,
retry: interval &default = default_listen_retry): port
&deprecated="Remove in v8.1. Switch to Cluster::listen_websocket() instead.";
## Initiate a remote connection.
##
## a: an address to connect to, e.g. "localhost" or "127.0.0.1".
##
## p: the TCP port on which the remote side is listening.
##
## retry: an interval at which to retry establishing the
## connection with the remote peer if it cannot be made initially, or
## if it ever becomes disconnected. If the
## ZEEK_DEFAULT_CONNECT_RETRY environment variable is set (as number
## of seconds), it overrides any value given here.
##
## Returns: true if it's possible to try connecting with the peer and
## it's a new peer. The actual connection may not be established
## until a later point in time.
##
## .. zeek:see:: Broker::status
global peer: function(a: string, p: port &default=default_port,
retry: interval &default=default_connect_retry): bool;
## Remove a remote connection.
##
## Note that this does not terminate the connection to the peer, it
## just means that we won't exchange any further information with it
## unless peering resumes later.
##
## a: the address used in a previous successful call to :zeek:see:`Broker::peer`.
##
## p: the port used in a previous successful call to :zeek:see:`Broker::peer`.
##
## Returns: true if the arguments match a previously successful call to
## :zeek:see:`Broker::peer`.
##
## TODO: We do not have a function yet to terminate a connection.
global unpeer: function(a: string, p: port): bool;
## Whether the local node originally initiated the peering with the
## given endpoint.
##
## a: the address used in a previous successful call to :zeek:see:`Broker::peer`.
##
## p: the port used in a previous successful call to :zeek:see:`Broker::peer`.
##
## Returns: true if this node initiated the peering.
global is_outbound_peering: function(a: string, p: port): bool;
## Get a list of all peer connections.
##
## Returns: a list of all peer connections.
global peers: function(): vector of PeerInfo;
## Get a unique identifier for the local broker endpoint.
##
## Returns: a unique identifier for the local broker endpoint.
global node_id: function(): string;
## Obtain each peering's send-buffer statistics. The keys are Broker
## endpoint IDs.
##
## Returns: per-peering statistics.
global peering_stats: function(): table[string] of BrokerPeeringStats;
## Sends all pending log messages to remote peers. This normally
## doesn't need to be used except for test cases that are time-sensitive.
global flush_logs: function(): count;
## Publishes the value of an identifier to a given topic. The subscribers
## will update their local value for that identifier on receipt.
##
## topic: a topic associated with the message.
##
## id: the identifier to publish.
##
## Returns: true if the message is sent.
global publish_id: function(topic: string, id: string): bool;
## Register interest in all peer event messages that use a certain topic
## prefix. Note that subscriptions may not be altered immediately after
## calling (except during :zeek:see:`zeek_init`).
##
## topic_prefix: a prefix to match against remote message topics.
## e.g. an empty prefix matches everything and "a" matches
## "alice" and "amy" but not "bob".
##
## Returns: true if it's a new event subscription and it is now registered.
global subscribe: function(topic_prefix: string): bool;
## Unregister interest in all peer event messages that use a topic prefix.
## Note that subscriptions may not be altered immediately after calling
## (except during :zeek:see:`zeek_init`).
##
## topic_prefix: a prefix previously supplied to a successful call to
## :zeek:see:`Broker::subscribe` or :zeek:see:`Broker::forward`.
##
## Returns: true if interest in the topic prefix is no longer advertised.
global unsubscribe: function(topic_prefix: string): bool;
## Register a topic prefix subscription for events that should only be
## forwarded to any subscribing peers and not raise any event handlers
## on the receiving/forwarding node. I.e., it's the same as
## :zeek:see:`Broker::subscribe` except matching events are not raised
## on the receiver, just forwarded. Use :zeek:see:`Broker::unsubscribe`
## with the same argument to undo this operation.
##
## topic_prefix: a prefix to match against remote message topics.
## e.g. an empty prefix matches everything and "a" matches
## "alice" and "amy" but not "bob".
##
## Returns: true if a new event forwarding/subscription is now registered.
global forward: function(topic_prefix: string): bool;
## Automatically send an event to any interested peers whenever it is
## locally dispatched. (For example, using "event my_event(...);" in a
## script.)
##
## topic: a topic string associated with the event message.
## Peers advertise interest by registering a subscription to some
## prefix of this topic name.
##
## ev: a Zeek event value.
##
## Returns: true if automatic event sending is now enabled.
global auto_publish: function(topic: string, ev: any): bool &deprecated="Remove in v8.1. Switch to explicit Cluster::publish() calls. Auto-publish won't work with all cluster backends.";
## Stop automatically sending an event to peers upon local dispatch.
##
## topic: a topic originally given to :zeek:see:`Broker::auto_publish`.
##
## ev: an event originally given to :zeek:see:`Broker::auto_publish`.
##
## Returns: true if automatic events will not occur for the topic/event
## pair.
global auto_unpublish: function(topic: string, ev: any): bool &deprecated="Remove in v8.1. See Broker::auto_publish()";
}
@load base/bif/comm.bif
@load base/bif/messaging.bif
module Broker;
# Periodic log flusher: sends out any batched log writes, then re-arms
# itself so the next flush happens one Broker::log_batch_interval later.
event Broker::log_flush() &priority=10
	{
	Broker::flush_logs();

	local next_flush_in = Broker::log_batch_interval;
	schedule next_flush_in { Broker::log_flush() };
	}
# Kick off the periodic Broker::log_flush cycle; each run re-schedules itself.
event zeek_init()
	{
	local first_flush_in = Broker::log_batch_interval;
	schedule first_flush_in { Broker::log_flush() };
	}
# Timer-driven re-attempt of Broker::listen(), scheduled by listen() after a
# failed bind.
event retry_listen(a: string, p: port, retry: interval)
	{
	# The resulting port is ignored here: if this attempt fails as well,
	# listen() schedules another retry_listen itself (when retries are enabled).
	Broker::listen(a, p, retry);
	}
# Definition of Broker::listen() for the native Broker protocol.
function listen(a: string, p: port, retry: interval): port
	{
	local result = __listen(a, p, Broker::NATIVE);

	# Retries are only considered when __listen reports 0/tcp; any other
	# result is handed back to the caller unchanged.
	if ( result != 0/tcp )
		return result;

	# When set, the environment variable takes precedence over the caller's
	# retry interval.
	local delay = retry;
	local env_retry = getenv("ZEEK_DEFAULT_LISTEN_RETRY");

	if ( env_retry != "" )
		delay = double_to_interval(to_double(env_retry));

	# A zero interval disables retrying.
	if ( delay != 0secs )
		schedule delay { retry_listen(a, p, delay) };

	return result;
	}
# Timer-driven re-attempt of the deprecated Broker::listen_websocket(),
# scheduled by listen_websocket() after a failed bind.
event retry_listen_websocket(a: string, p: port, retry: interval)
	{
	# listen_websocket() carries &deprecated; silence that warning for this
	# internal call only.
@pragma push ignore-deprecations
	Broker::listen_websocket(a, p, retry);
@pragma pop ignore-deprecations
	}
# Definition of the (deprecated) Broker::listen_websocket().
function listen_websocket(a: string, p: port, retry: interval): port
	{
	local result = __listen(a, p, Broker::WEBSOCKET);

	# Retries are only considered when __listen reports 0/tcp; any other
	# result is handed back to the caller unchanged.
	if ( result != 0/tcp )
		return result;

	# When set, the environment variable takes precedence over the caller's
	# retry interval.
	local delay = retry;
	local env_retry = getenv("ZEEK_DEFAULT_LISTEN_RETRY");

	if ( env_retry != "" )
		delay = double_to_interval(to_double(env_retry));

	# A zero interval disables retrying.
	if ( delay != 0secs )
		schedule delay { retry_listen_websocket(a, p, delay) };

	return result;
	}
# Definition of Broker::peer(): a pass-through to the __peer BIF.
function peer(a: string, p: port, retry: interval): bool
	{
	local accepted = __peer(a, p, retry);
	return accepted;
	}
# Definition of Broker::unpeer(): a pass-through to the __unpeer BIF.
function unpeer(a: string, p: port): bool
	{
	local matched = __unpeer(a, p);
	return matched;
	}
# Definition of Broker::is_outbound_peering(): a pass-through to the
# __is_outbound_peering BIF.
function is_outbound_peering(a: string, p: port): bool
	{
	local outbound = __is_outbound_peering(a, p);
	return outbound;
	}
# Definition of Broker::peers(): a pass-through to the __peers BIF.
function peers(): vector of PeerInfo
	{
	local peer_list = __peers();
	return peer_list;
	}
# Definition of Broker::node_id(): a pass-through to the __node_id BIF.
function node_id(): string
	{
	local endpoint_id = __node_id();
	return endpoint_id;
	}
# Definition of Broker::peering_stats(): a pass-through to the
# __peering_stats BIF.
function peering_stats(): table[string] of BrokerPeeringStats
	{
	local stats = __peering_stats();
	return stats;
	}
# Definition of Broker::flush_logs(): a pass-through to the __flush_logs BIF.
function flush_logs(): count
	{
	local result = __flush_logs();
	return result;
	}
# Definition of Broker::publish_id(): a pass-through to the __publish_id BIF.
function publish_id(topic: string, id: string): bool
	{
	local sent = __publish_id(topic, id);
	return sent;
	}
# Definition of Broker::subscribe(): a pass-through to the __subscribe BIF.
function subscribe(topic_prefix: string): bool
	{
	local registered = __subscribe(topic_prefix);
	return registered;
	}
# Definition of Broker::forward(): a pass-through to the __forward BIF.
function forward(topic_prefix: string): bool
	{
	local registered = __forward(topic_prefix);
	return registered;
	}
# Definition of Broker::unsubscribe(): a pass-through to the __unsubscribe BIF.
function unsubscribe(topic_prefix: string): bool
	{
	local withdrawn = __unsubscribe(topic_prefix);
	return withdrawn;
	}
# Definition of the deprecated Broker::auto_publish(): a pass-through to the
# __auto_publish BIF.
function auto_publish(topic: string, ev: any): bool
	{
	local enabled = __auto_publish(topic, ev);
	return enabled;
	}
# Definition of the deprecated Broker::auto_unpublish(): a pass-through to the
# __auto_unpublish BIF.
function auto_unpublish(topic: string, ev: any): bool
	{
	local disabled = __auto_unpublish(topic, ev);
	return disabled;
	}