##! The Broker-based communication API and its various options. module Broker; export { ## Default port for native Broker communication. Where not specified ## otherwise, this is the port to connect to and listen on. const default_port = 9999/tcp &redef; ## Default port for Broker WebSocket communication. Where not specified ## otherwise, this is the port to connect to and listen on for ## WebSocket connections. ## ## See the Broker documentation for a specification of the message ## format over WebSocket connections. const default_port_websocket = 9997/tcp &redef; ## Default interval to retry listening on a port if it's currently in ## use already. Use of the ZEEK_DEFAULT_LISTEN_RETRY environment variable ## (set as a number of seconds) will override this option and also ## any values given to :zeek:see:`Broker::listen`. const default_listen_retry = 1sec &redef; ## Default address on which to listen. ## ## .. zeek:see:: Broker::listen const default_listen_address = getenv("ZEEK_DEFAULT_LISTEN_ADDRESS") &redef; ## Default address on which to listen for WebSocket connections. ## ## .. zeek:see:: Cluster::listen_websocket const default_listen_address_websocket = getenv("ZEEK_DEFAULT_LISTEN_ADDRESS") &redef; ## Default interval to retry connecting to a peer if it cannot be made to ## work initially, or if it ever becomes disconnected. Use of the ## ZEEK_DEFAULT_CONNECT_RETRY environment variable (set as number of ## seconds) will override this option and also any values given to ## :zeek:see:`Broker::peer`. const default_connect_retry = 1sec &redef; ## If true, do not use SSL for network connections. By default, SSL will ## even be used if no certificates / CAs have been configured. In that case ## (which is the default) the communication will be encrypted, but not ## authenticated. const disable_ssl = F &redef; ## Path to a file containing concatenated trusted certificates ## in PEM format. If set, Zeek will require valid certificates for ## all peers. const ssl_cafile = "" &redef; ## Path to an OpenSSL-style directory of trusted certificates. ## If set, Zeek will require valid certificates for ## all peers. const ssl_capath = "" &redef; ## Path to a file containing a X.509 certificate for this ## node in PEM format. If set, Zeek will require valid certificates for ## all peers. const ssl_certificate = "" &redef; ## Passphrase to decrypt the private key specified by ## :zeek:see:`Broker::ssl_keyfile`. If set, Zeek will require valid ## certificates for all peers. const ssl_passphrase = "" &redef; ## Path to the file containing the private key for this node's ## certificate. If set, Zeek will require valid certificates for ## all peers. const ssl_keyfile = "" &redef; ## The max number of log entries per log stream to batch together when ## sending log messages to a remote logger. const log_batch_size = 400 &redef; ## Max time to buffer log messages before sending the current set out as a ## batch. const log_batch_interval = 1sec &redef; ## Max number of threads to use for Broker/CAF functionality. The ## ``ZEEK_BROKER_MAX_THREADS`` environment variable overrides this setting. const max_threads = 1 &redef; ## Max number of items we buffer at most per peer. What action to take when ## the buffer reaches its maximum size is determined by ## :zeek:see:`Broker::peer_overflow_policy`. const peer_buffer_size = 8192 &redef; ## Configures how Broker responds to peers that cannot keep up with the ## incoming message rate. Available strategies: ## - disconnect: drop the connection to the unresponsive peer ## - drop_newest: replace the newest message in the buffer ## - drop_oldest: removed the olsted message from the buffer, then append const peer_overflow_policy = "drop_oldest" &redef; ## Same as :zeek:see:`Broker::peer_buffer_size` but for WebSocket clients. const web_socket_buffer_size = 8192 &redef; ## Same as :zeek:see:`Broker::peer_overflow_policy` but for WebSocket clients. const web_socket_overflow_policy = "drop_oldest" &redef; ## How frequently Zeek resets some peering/client buffer statistics, ## such as ``max_queued_recently`` in :zeek:see:`BrokerPeeringStats`. const buffer_stats_reset_interval = 1min &redef; ## The CAF scheduling policy to use. Available options are "sharing" and ## "stealing". The "sharing" policy uses a single, global work queue along ## with mutex and condition variable used for accessing it, which may be ## better for cases that don't require much concurrency or need lower power ## consumption. The "stealing" policy uses multiple work queues protected ## by spinlocks, which may be better for use-cases that have more ## concurrency needs. E.g. may be worth testing the "stealing" policy ## along with dedicating more threads if a lot of data store processing is ## required. const scheduler_policy = "sharing" &redef; ## Interval of time for under-utilized Broker/CAF threads to sleep ## when in "moderate" mode. Only used for the "stealing" scheduler policy. const moderate_sleep = 16 msec &redef; ## Interval of time for under-utilized Broker/CAF threads to sleep ## when in "relaxed" mode. Only used for the "stealing" scheduler policy. const relaxed_sleep = 64 msec &redef; ## Number of work-stealing polling attempts for Broker/CAF threads ## in "aggressive" mode. Only used for the "stealing" scheduler policy. const aggressive_polls = 5 &redef; ## Number of work-stealing polling attempts for Broker/CAF threads ## in "moderate" mode. Only used for the "stealing" scheduler policy. const moderate_polls = 5 &redef; ## Frequency of work-stealing polling attempts for Broker/CAF threads ## in "aggressive" mode. Only used for the "stealing" scheduler policy. const aggressive_interval = 4 &redef; ## Frequency of work-stealing polling attempts for Broker/CAF threads ## in "moderate" mode. Only used for the "stealing" scheduler policy. const moderate_interval = 2 &redef; ## Frequency of work-stealing polling attempts for Broker/CAF threads ## in "relaxed" mode. Only used for the "stealing" scheduler policy. const relaxed_interval = 1 &redef; ## Forward all received messages to subscribing peers. const forward_messages = F &redef; ## Whether calling :zeek:see:`Broker::peer` will register the Broker ## system as an I/O source that will block the process from shutting ## down. For example, set this to false when you are reading pcaps, ## but also want to initiate a Broker peering and still shutdown after ## done reading the pcap. option peer_counts_as_iosource = T; ## The default topic prefix where logs will be published. The log's stream ## id is appended when writing to a particular stream. const default_log_topic_prefix = "zeek/logs/" &redef; ## The default implementation for :zeek:see:`Broker::log_topic`. function default_log_topic(id: Log::ID, path: string): string { return default_log_topic_prefix + cat(id); } ## A function that will be called for each log entry to determine what ## broker topic string will be used for sending it to peers. The ## default implementation will return a value based on ## :zeek:see:`Broker::default_log_topic_prefix`. ## ## id: the ID associated with the log stream entry that will be sent. ## ## path: the path to which the log stream entry will be output. ## ## Returns: a string representing the broker topic to which the log ## will be sent. const log_topic: function(id: Log::ID, path: string): string = default_log_topic &redef; ## The possible log event severity levels for Broker. type LogSeverityLevel: enum { ## Fatal event, normal operation has most likely broken down. LOG_CRITICAL, ## Unrecoverable event that imparts at least part of the system. LOG_ERROR, ## Unexpected or conspicuous event that may still be recoverable. LOG_WARNING, ## Noteworthy event during normal operation. LOG_INFO, ## Information that might be relevant for a user to understand system behavior. LOG_VERBOSE, ## An event that is relevant only for troubleshooting and debugging. LOG_DEBUG, }; ## The log event severity level for the Broker log output. const log_severity_level = LOG_WARNING &redef; ## Event severity level for also printing the Broker log output to stderr. const log_stderr_severity_level = LOG_CRITICAL &redef; type ErrorCode: enum { ## The unspecified default error code. UNSPECIFIED = 1, ## Version incompatibility. PEER_INCOMPATIBLE = 2, ## Referenced peer does not exist. PEER_INVALID = 3, ## Remote peer not listening. PEER_UNAVAILABLE = 4, ## Remote peer disconnected during the handshake. PEER_DISCONNECT_DURING_HANDSHAKE = 5, ## A peering request timed out. PEER_TIMEOUT = 6, ## Master with given name already exists. MASTER_EXISTS = 7, ## Master with given name does not exist. NO_SUCH_MASTER = 8, ## The given data store key does not exist. NO_SUCH_KEY = 9, ## The store operation timed out. REQUEST_TIMEOUT = 10, ## The operation expected a different type than provided. TYPE_CLASH = 11, ## The data value cannot be used to carry out the desired operation. INVALID_DATA = 12, ## The storage backend failed to execute the operation. BACKEND_FAILURE = 13, ## The storage backend failed to execute the operation. STALE_DATA = 14, ## Catch-all for a CAF-level problem. CAF_ERROR = 100 }; ## The possible states of a peer endpoint. type PeerStatus: enum { ## The peering process is initiated. INITIALIZING, ## Connection establishment in process. CONNECTING, ## Connection established, peering pending. CONNECTED, ## Successfully peered. PEERED, ## Connection to remote peer lost. DISCONNECTED, ## Reconnecting to peer after a lost connection. RECONNECTING, }; type NetworkInfo: record { ## The IP address or hostname where the endpoint listens. address: string &log; ## The port where the endpoint is bound to. bound_port: port &log; }; type EndpointInfo: record { ## A unique identifier of the node. id: string; ## Network-level information. network: NetworkInfo &optional; }; type PeerInfo: record { peer: EndpointInfo; status: PeerStatus; ## Whether the local node created the peering, as opposed to a ## remote establishing it by connecting to us. is_outbound: bool; }; type PeerInfos: vector of PeerInfo; ## Opaque communication data. type Data: record { data: opaque of Broker::Data &optional; }; ## Opaque communication data sequence. type DataVector: vector of Broker::Data; ## Opaque event communication data. type Event: record { ## The name of the event. Not set if invalid event or arguments. name: string &optional; ## The arguments to the event. args: DataVector; }; ## Opaque communication data used as a convenient way to wrap key-value ## pairs that comprise table entries. type TableItem : record { key: Broker::Data; val: Broker::Data; }; ## Listen for remote connections using the native Broker protocol. ## ## a: an address string on which to accept connections, e.g. ## "127.0.0.1". An empty string refers to INADDR_ANY. ## ## p: the TCP port to listen on. The value 0 means that the OS should choose ## the next available free port. ## ## retry: If non-zero, retries listening in regular intervals if the port cannot be ## acquired immediately. 0 disables retries. If the ## ZEEK_DEFAULT_LISTEN_RETRY environment variable is set (as number ## of seconds), it overrides any value given here. ## ## Returns: the bound port or 0/? on failure. ## ## .. zeek:see:: Broker::status global listen: function(a: string &default = default_listen_address, p: port &default = default_port, retry: interval &default = default_listen_retry): port; ## Initiate a remote connection. ## ## a: an address to connect to, e.g. "localhost" or "127.0.0.1". ## ## p: the TCP port on which the remote side is listening. ## ## retry: an interval at which to retry establishing the ## connection with the remote peer if it cannot be made initially, or ## if it ever becomes disconnected. If the ## ZEEK_DEFAULT_CONNECT_RETRY environment variable is set (as number ## of seconds), it overrides any value given here. ## ## Returns: true if it's possible to try connecting with the peer and ## it's a new peer. The actual connection may not be established ## until a later point in time. ## ## .. zeek:see:: Broker::status global peer: function(a: string, p: port &default=default_port, retry: interval &default=default_connect_retry): bool; ## Remove a remote connection. ## ## Note that this does not terminate the connection to the peer, it ## just means that we won't exchange any further information with it ## unless peering resumes later. ## ## a: the address used in previous successful call to :zeek:see:`Broker::peer`. ## ## p: the port used in previous successful call to :zeek:see:`Broker::peer`. ## ## Returns: true if the arguments match a previously successful call to ## :zeek:see:`Broker::peer`. ## ## TODO: We do not have a function yet to terminate a connection. global unpeer: function(a: string, p: port): bool; ## Whether the local node originally initiated the peering with the ## given endpoint. ## ## a: the address used in previous successful call to :zeek:see:`Broker::peer`. ## ## p: the port used in previous successful call to :zeek:see:`Broker::peer`. ## ## Returns:: True if this node initiated the peering. global is_outbound_peering: function(a: string, p: port): bool; ## Get a list of all peer connections. ## ## Returns: a list of all peer connections. global peers: function(): vector of PeerInfo; ## Get a unique identifier for the local broker endpoint. ## ## Returns: a unique identifier for the local broker endpoint. global node_id: function(): string; ## Obtain each peering's send-buffer statistics. The keys are Broker ## endpoint IDs. ## ## Returns: per-peering statistics. global peering_stats: function(): table[string] of BrokerPeeringStats; ## Sends all pending log messages to remote peers. This normally ## doesn't need to be used except for test cases that are time-sensitive. global flush_logs: function(): count; ## Publishes the value of an identifier to a given topic. The subscribers ## will update their local value for that identifier on receipt. ## ## topic: a topic associated with the message. ## ## id: the identifier to publish. ## ## Returns: true if the message is sent. global publish_id: function(topic: string, id: string): bool; ## Register interest in all peer event messages that use a certain topic ## prefix. Note that subscriptions may not be altered immediately after ## calling (except during :zeek:see:`zeek_init`). ## ## topic_prefix: a prefix to match against remote message topics. ## e.g. an empty prefix matches everything and "a" matches ## "alice" and "amy" but not "bob". ## ## Returns: true if it's a new event subscription and it is now registered. global subscribe: function(topic_prefix: string): bool; ## Unregister interest in all peer event messages that use a topic prefix. ## Note that subscriptions may not be altered immediately after calling ## (except during :zeek:see:`zeek_init`). ## ## topic_prefix: a prefix previously supplied to a successful call to ## :zeek:see:`Broker::subscribe` or :zeek:see:`Broker::forward`. ## ## Returns: true if interest in the topic prefix is no longer advertised. global unsubscribe: function(topic_prefix: string): bool; ## Register a topic prefix subscription for events that should only be ## forwarded to any subscribing peers and not raise any event handlers ## on the receiving/forwarding node. i.e. it's the same as ## :zeek:see:`Broker::subscribe` except matching events are not raised ## on the receiver, just forwarded. Use :zeek:see:`Broker::unsubscribe` ## with the same argument to undo this operation. ## ## topic_prefix: a prefix to match against remote message topics. ## e.g. an empty prefix matches everything and "a" matches ## "alice" and "amy" but not "bob". ## ## Returns: true if a new event forwarding/subscription is now registered. global forward: function(topic_prefix: string): bool; } @load base/bif/comm.bif @load base/bif/messaging.bif module Broker; event Broker::log_flush() &priority=10 { Broker::flush_logs(); schedule Broker::log_batch_interval { Broker::log_flush() }; } event zeek_init() { schedule Broker::log_batch_interval { Broker::log_flush() }; } event retry_listen(a: string, p: port, retry: interval) { listen(a, p, retry); } function listen(a: string, p: port, retry: interval): port { local bound = __listen(a, p, Broker::NATIVE); if ( bound == 0/tcp ) { local e = getenv("ZEEK_DEFAULT_LISTEN_RETRY"); if ( e != "" ) retry = double_to_interval(to_double(e)); if ( retry != 0secs ) schedule retry { retry_listen(a, p, retry) }; } return bound; } function peer(a: string, p: port, retry: interval): bool { return __peer(a, p, retry); } function unpeer(a: string, p: port): bool { return __unpeer(a, p); } function is_outbound_peering(a: string, p: port): bool { return __is_outbound_peering(a, p); } function peers(): vector of PeerInfo { return __peers(); } function node_id(): string { return __node_id(); } function peering_stats(): table[string] of BrokerPeeringStats { return __peering_stats(); } function flush_logs(): count { return __flush_logs(); } function publish_id(topic: string, id: string): bool { return __publish_id(topic, id); } function subscribe(topic_prefix: string): bool { return __subscribe(topic_prefix); } function forward(topic_prefix: string): bool { return __forward(topic_prefix); } function unsubscribe(topic_prefix: string): bool { return __unsubscribe(topic_prefix); }