diff --git a/scripts/base/frameworks/cluster/__load__.zeek b/scripts/base/frameworks/cluster/__load__.zeek index 2072b457a7..2ca60fb67a 100644 --- a/scripts/base/frameworks/cluster/__load__.zeek +++ b/scripts/base/frameworks/cluster/__load__.zeek @@ -1,7 +1,9 @@ # Load the core cluster support. @load ./main @load ./pools +@load ./pubsub @load ./telemetry +@load ./types @if ( Cluster::is_enabled() ) diff --git a/scripts/base/frameworks/cluster/main.zeek b/scripts/base/frameworks/cluster/main.zeek index 7124cf23d8..8b86188996 100644 --- a/scripts/base/frameworks/cluster/main.zeek +++ b/scripts/base/frameworks/cluster/main.zeek @@ -18,6 +18,8 @@ @load base/frameworks/control @load base/frameworks/broker +@load ./types + module Cluster; export { @@ -75,19 +77,6 @@ export { ## :zeek:see:`Cluster::create_store` with the *persistent* argument set true. const default_persistent_backend = Broker::SQLITE &redef; - ## The default maximum queue size for WebSocket event dispatcher instances. - ## - ## If the maximum queue size is reached, events from external WebSocket - ## clients will be stalled and processed once the queue has been drained. - ## - ## An internal metric named ``cluster_onloop_queue_stalls`` and - ## labeled with a ``WebSocketEventDispatcher::`` tag - ## is incremented when the maximum queue size is reached. - const default_websocket_max_event_queue_size = 32 &redef; - - ## The default ping interval for WebSocket clients. - const default_websocket_ping_interval = 5 sec &redef; - ## Setting a default dir will, for persistent backends that have not ## been given an explicit file path via :zeek:see:`Cluster::stores`, ## automatically create a path within this dir that is based on the name of @@ -157,55 +146,6 @@ export { message: string; } &log; - ## Types of nodes that are allowed to participate in the cluster - ## configuration. - type NodeType: enum { - ## A dummy node type indicating the local node is not operating - ## within a cluster. - NONE, - ## A node type which is allowed to view/manipulate the configuration - ## of other nodes in the cluster. - CONTROL, - ## A node type responsible for log management. - LOGGER, - ## A node type responsible for policy management. - MANAGER, - ## A node type for relaying worker node communication and synchronizing - ## worker node state. - PROXY, - ## The node type doing all the actual traffic analysis. - WORKER, - }; - - ## Record type to indicate a node in a cluster. - type Node: record { - ## Identifies the type of cluster node in this node's configuration. - node_type: NodeType; - ## The IP address of the cluster node. - ip: addr; - ## If the *ip* field is a non-global IPv6 address, this field - ## can specify a particular :rfc:`4007` ``zone_id``. - zone_id: string &default=""; - ## The port that this node will listen on for peer connections. - ## A value of ``0/unknown`` means the node is not pre-configured to listen. - p: port &default=0/unknown; - ## Name of the manager node this node uses. For workers and proxies. - manager: string &optional; - ## A unique identifier assigned to the node by the broker framework. - ## This field is only set while a node is connected. - id: string &optional; - ## The port used to expose metrics to Prometheus. Setting this in a cluster - ## configuration will override the setting for Telemetry::metrics_port for - ## the node. - metrics_port: port &optional; - }; - - ## Record to represent a cluster node including its name. - type NamedNode: record { - name: string; - node: Node; - }; - ## This function can be called at any time to determine if the cluster ## framework is being enabled for this run. ## @@ -317,105 +257,14 @@ export { ## Returns: T on success, else F. global init: function(): bool; - ## Subscribe to the given topic. - ## - ## topic: The topic to subscribe to. - ## - ## Returns: T on success, else F. - global subscribe: function(topic: string): bool; - - ## Unsubscribe from the given topic. - ## - ## topic: The topic to unsubscribe from. - ## - ## Returns: T on success, else F. - global unsubscribe: function(topic: string): bool; - - ## An event instance for cluster pub/sub. - ## - ## See :zeek:see:`Cluster::publish` and :zeek:see:`Cluster::make_event`. - type Event: record { - ## The event handler to be invoked on the remote node. - ev: any; - ## The arguments for the event. - args: vector of any; - }; - - ## The TLS options for a WebSocket server. - ## - ## If cert_file and key_file are set, TLS is enabled. If both - ## are unset, TLS is disabled. Any other combination is an error. - type WebSocketTLSOptions: record { - ## The cert file to use. - cert_file: string &optional; - ## The key file to use. - key_file: string &optional; - ## Expect peers to send client certificates. - enable_peer_verification: bool &default=F; - ## The CA certificate or CA bundle used for peer verification. - ## Empty will use the implementations's default when - ## ``enable_peer_verification`` is T. - ca_file: string &default=""; - ## The ciphers to use. Empty will use the implementation's defaults. - ciphers: string &default=""; - }; - - ## WebSocket server options to pass to :zeek:see:`Cluster::listen_websocket`. - type WebSocketServerOptions: record { - ## The address to listen on, cannot be used together with ``listen_host``. - listen_addr: addr &optional; - ## The port the WebSocket server is supposed to listen on. - listen_port: port; - ## The maximum event queue size for this server. - max_event_queue_size: count &default=default_websocket_max_event_queue_size; - ## Ping interval to use. A WebSocket client not responding to - ## the pings will be disconnected. Set to a negative value to - ## disable pings. Subsecond intervals are currently not supported. - ping_interval: interval &default=default_websocket_ping_interval; - ## The TLS options used for this WebSocket server. By default, - ## TLS is disabled. See also :zeek:see:`Cluster::WebSocketTLSOptions`. - tls_options: WebSocketTLSOptions &default=WebSocketTLSOptions(); - }; - ## Start listening on a WebSocket address. ## ## options: The server :zeek:see:`Cluster::WebSocketServerOptions` to use. ## ## Returns: T on success, else F. global listen_websocket: function(options: WebSocketServerOptions): bool; - - ## Network information of an endpoint. - type NetworkInfo: record { - ## The IP address or hostname where the endpoint listens. - address: string; - ## The port where the endpoint is bound to. - bound_port: port; - }; - - ## Information about a WebSocket endpoint. - type EndpointInfo: record { - id: string; - network: NetworkInfo; - ## The value of the X-Application-Name HTTP header, if any. - application_name: string &optional; - }; - - ## A hook invoked for every :zeek:see:`Cluster::subscribe` call. - ## - ## Breaking from this hook has no effect. - ## - ## topic: The topic string as given to :zeek:see:`Cluster::subscribe`. - global on_subscribe: hook(topic: string); - - ## A hook invoked for every :zeek:see:`Cluster::subscribe` call. - ## - ## Breaking from this hook has no effect. - ## - ## topic: The topic string as given to :zeek:see:`Cluster::subscribe`. - global on_unsubscribe: hook(topic: string); } -# Needs declaration of Cluster::Event type. @load base/bif/cluster.bif @load base/bif/plugins/Zeek_Cluster_WebSocket.events.bif.zeek @@ -674,16 +523,6 @@ function init(): bool return Cluster::Backend::__init(Cluster::node_id()); } -function subscribe(topic: string): bool - { - return Cluster::__subscribe(topic); - } - -function unsubscribe(topic: string): bool - { - return Cluster::__unsubscribe(topic); - } - function listen_websocket(options: WebSocketServerOptions): bool { return Cluster::__listen_websocket(options); diff --git a/scripts/base/frameworks/cluster/pubsub.zeek b/scripts/base/frameworks/cluster/pubsub.zeek new file mode 100644 index 0000000000..591a80f986 --- /dev/null +++ b/scripts/base/frameworks/cluster/pubsub.zeek @@ -0,0 +1,49 @@ +# A script that can be loaded by other scripts to access the publish subscribe API. + +@load ./types + +module Cluster; + +export { + ## Subscribe to the given topic. + ## + ## topic: The topic to subscribe to. + ## + ## Returns: T on success, else F. + global subscribe: function(topic: string): bool; + + ## Unsubscribe from the given topic. + ## + ## topic: The topic to unsubscribe from. + ## + ## Returns: T on success, else F. + global unsubscribe: function(topic: string): bool; + + ## A hook invoked for every :zeek:see:`Cluster::subscribe` call. + ## + ## Breaking from this hook has no effect. + ## + ## topic: The topic string as given to :zeek:see:`Cluster::subscribe`. + global on_subscribe: hook(topic: string); + + ## A hook invoked for every :zeek:see:`Cluster::subscribe` call. + ## + ## Breaking from this hook has no effect. + ## + ## topic: The topic string as given to :zeek:see:`Cluster::subscribe`. + global on_unsubscribe: hook(topic: string); +} + +# base/bif/cluster.bif.zeek generated from from src/cluster/cluster.bif contains the +# Cluster::publish(), Cluster::publish_hrw() and Cluster::publish_rr() APIs +@load base/bif/cluster.bif + +function subscribe(topic: string): bool + { + return Cluster::__subscribe(topic); + } + +function unsubscribe(topic: string): bool + { + return Cluster::__unsubscribe(topic); + } diff --git a/scripts/base/frameworks/cluster/types.zeek b/scripts/base/frameworks/cluster/types.zeek new file mode 100644 index 0000000000..f3243a93c6 --- /dev/null +++ b/scripts/base/frameworks/cluster/types.zeek @@ -0,0 +1,128 @@ +# Types used by the Cluster framework. +module Cluster; + +export { + ## Types of nodes that are allowed to participate in the cluster + ## configuration. + type NodeType: enum { + ## A dummy node type indicating the local node is not operating + ## within a cluster. + NONE, + ## A node type which is allowed to view/manipulate the configuration + ## of other nodes in the cluster. + CONTROL, + ## A node type responsible for log management. + LOGGER, + ## A node type responsible for policy management. + MANAGER, + ## A node type for relaying worker node communication and synchronizing + ## worker node state. + PROXY, + ## The node type doing all the actual traffic analysis. + WORKER, + }; + + ## Record type to indicate a node in a cluster. + type Node: record { + ## Identifies the type of cluster node in this node's configuration. + node_type: NodeType; + ## The IP address of the cluster node. + ip: addr; + ## If the *ip* field is a non-global IPv6 address, this field + ## can specify a particular :rfc:`4007` ``zone_id``. + zone_id: string &default=""; + ## The port that this node will listen on for peer connections. + ## A value of ``0/unknown`` means the node is not pre-configured to listen. + p: port &default=0/unknown; + ## Name of the manager node this node uses. For workers and proxies. + manager: string &optional; + ## A unique identifier assigned to the node by the broker framework. + ## This field is only set while a node is connected. + id: string &optional; + ## The port used to expose metrics to Prometheus. Setting this in a cluster + ## configuration will override the setting for Telemetry::metrics_port for + ## the node. + metrics_port: port &optional; + }; + + ## Record to represent a cluster node including its name. + type NamedNode: record { + name: string; + node: Node; + }; + + ## An event instance for cluster pub/sub. + ## + ## See :zeek:see:`Cluster::publish` and :zeek:see:`Cluster::make_event`. + type Event: record { + ## The event handler to be invoked on the remote node. + ev: any; + ## The arguments for the event. + args: vector of any; + }; + + ## The default maximum queue size for WebSocket event dispatcher instances. + ## + ## If the maximum queue size is reached, events from external WebSocket + ## clients will be stalled and processed once the queue has been drained. + ## + ## An internal metric named ``cluster_onloop_queue_stalls`` and + ## labeled with a ``WebSocketEventDispatcher::`` tag + ## is incremented when the maximum queue size is reached. + const default_websocket_max_event_queue_size = 32 &redef; + + ## The default ping interval for WebSocket clients. + const default_websocket_ping_interval = 5 sec &redef; + + ## The TLS options for a WebSocket server. + ## + ## If cert_file and key_file are set, TLS is enabled. If both + ## are unset, TLS is disabled. Any other combination is an error. + type WebSocketTLSOptions: record { + ## The cert file to use. + cert_file: string &optional; + ## The key file to use. + key_file: string &optional; + ## Expect peers to send client certificates. + enable_peer_verification: bool &default=F; + ## The CA certificate or CA bundle used for peer verification. + ## Empty will use the implementations's default when + ## ``enable_peer_verification`` is T. + ca_file: string &default=""; + ## The ciphers to use. Empty will use the implementation's defaults. + ciphers: string &default=""; + }; + + ## WebSocket server options to pass to :zeek:see:`Cluster::listen_websocket`. + type WebSocketServerOptions: record { + ## The address to listen on, cannot be used together with ``listen_host``. + listen_addr: addr &optional; + ## The port the WebSocket server is supposed to listen on. + listen_port: port; + ## The maximum event queue size for this server. + max_event_queue_size: count &default=default_websocket_max_event_queue_size; + ## Ping interval to use. A WebSocket client not responding to + ## the pings will be disconnected. Set to a negative value to + ## disable pings. Subsecond intervals are currently not supported. + ping_interval: interval &default=default_websocket_ping_interval; + ## The TLS options used for this WebSocket server. By default, + ## TLS is disabled. See also :zeek:see:`Cluster::WebSocketTLSOptions`. + tls_options: WebSocketTLSOptions &default=WebSocketTLSOptions(); + }; + + ## Network information of an endpoint. + type NetworkInfo: record { + ## The IP address or hostname where the endpoint listens. + address: string; + ## The port where the endpoint is bound to. + bound_port: port; + }; + + ## Information about a WebSocket endpoint. + type EndpointInfo: record { + id: string; + network: NetworkInfo; + ## The value of the X-Application-Name HTTP header, if any. + application_name: string &optional; + }; +} diff --git a/testing/btest/Baseline/coverage.bare-load-baseline/canonified_loaded_scripts.log b/testing/btest/Baseline/coverage.bare-load-baseline/canonified_loaded_scripts.log index 92ae304ef1..57c9152075 100644 --- a/testing/btest/Baseline/coverage.bare-load-baseline/canonified_loaded_scripts.log +++ b/testing/btest/Baseline/coverage.bare-load-baseline/canonified_loaded_scripts.log @@ -136,10 +136,12 @@ scripts/base/init-frameworks-and-bifs.zeek scripts/base/frameworks/cluster/main.zeek scripts/base/frameworks/control/__load__.zeek scripts/base/frameworks/control/main.zeek - build/scripts/base/bif/cluster.bif.zeek + scripts/base/frameworks/cluster/types.zeek build/scripts/base/bif/plugins/Zeek_Cluster_WebSocket.events.bif.zeek + build/scripts/base/bif/cluster.bif.zeek scripts/base/frameworks/cluster/pools.zeek scripts/base/utils/hash_hrw.zeek + scripts/base/frameworks/cluster/pubsub.zeek scripts/base/frameworks/cluster/telemetry.zeek scripts/base/frameworks/config/__load__.zeek scripts/base/frameworks/config/main.zeek diff --git a/testing/btest/Baseline/coverage.default-load-baseline/canonified_loaded_scripts.log b/testing/btest/Baseline/coverage.default-load-baseline/canonified_loaded_scripts.log index 8d2b58f0e1..2476ec6e1b 100644 --- a/testing/btest/Baseline/coverage.default-load-baseline/canonified_loaded_scripts.log +++ b/testing/btest/Baseline/coverage.default-load-baseline/canonified_loaded_scripts.log @@ -136,10 +136,12 @@ scripts/base/init-frameworks-and-bifs.zeek scripts/base/frameworks/cluster/main.zeek scripts/base/frameworks/control/__load__.zeek scripts/base/frameworks/control/main.zeek - build/scripts/base/bif/cluster.bif.zeek + scripts/base/frameworks/cluster/types.zeek build/scripts/base/bif/plugins/Zeek_Cluster_WebSocket.events.bif.zeek + build/scripts/base/bif/cluster.bif.zeek scripts/base/frameworks/cluster/pools.zeek scripts/base/utils/hash_hrw.zeek + scripts/base/frameworks/cluster/pubsub.zeek scripts/base/frameworks/cluster/telemetry.zeek scripts/base/frameworks/config/__load__.zeek scripts/base/frameworks/config/main.zeek diff --git a/testing/btest/Baseline/plugins.hooks/output b/testing/btest/Baseline/plugins.hooks/output index 8697db08b7..6ad9831781 100644 --- a/testing/btest/Baseline/plugins.hooks/output +++ b/testing/btest/Baseline/plugins.hooks/output @@ -477,6 +477,7 @@ 0.000000 MetaHookPost LoadFile(0, ./polling, <...>/polling.zeek) -> -1 0.000000 MetaHookPost LoadFile(0, ./pools, <...>/pools.zeek) -> -1 0.000000 MetaHookPost LoadFile(0, ./postprocessors, <...>/postprocessors) -> -1 +0.000000 MetaHookPost LoadFile(0, ./pubsub, <...>/pubsub.zeek) -> -1 0.000000 MetaHookPost LoadFile(0, ./removal-hooks, <...>/removal-hooks.zeek) -> -1 0.000000 MetaHookPost LoadFile(0, ./reporter.bif.zeek, <...>/reporter.bif.zeek) -> -1 0.000000 MetaHookPost LoadFile(0, ./scp, <...>/scp.zeek) -> -1 @@ -793,6 +794,7 @@ 0.000000 MetaHookPost LoadFileExtended(0, ./polling, <...>/polling.zeek) -> (-1, ) 0.000000 MetaHookPost LoadFileExtended(0, ./pools, <...>/pools.zeek) -> (-1, ) 0.000000 MetaHookPost LoadFileExtended(0, ./postprocessors, <...>/postprocessors) -> (-1, ) +0.000000 MetaHookPost LoadFileExtended(0, ./pubsub, <...>/pubsub.zeek) -> (-1, ) 0.000000 MetaHookPost LoadFileExtended(0, ./removal-hooks, <...>/removal-hooks.zeek) -> (-1, ) 0.000000 MetaHookPost LoadFileExtended(0, ./reporter.bif.zeek, <...>/reporter.bif.zeek) -> (-1, ) 0.000000 MetaHookPost LoadFileExtended(0, ./scp, <...>/scp.zeek) -> (-1, ) @@ -1419,6 +1421,7 @@ 0.000000 MetaHookPre LoadFile(0, ./polling, <...>/polling.zeek) 0.000000 MetaHookPre LoadFile(0, ./pools, <...>/pools.zeek) 0.000000 MetaHookPre LoadFile(0, ./postprocessors, <...>/postprocessors) +0.000000 MetaHookPre LoadFile(0, ./pubsub, <...>/pubsub.zeek) 0.000000 MetaHookPre LoadFile(0, ./removal-hooks, <...>/removal-hooks.zeek) 0.000000 MetaHookPre LoadFile(0, ./reporter.bif.zeek, <...>/reporter.bif.zeek) 0.000000 MetaHookPre LoadFile(0, ./scp, <...>/scp.zeek) @@ -1735,6 +1738,7 @@ 0.000000 MetaHookPre LoadFileExtended(0, ./polling, <...>/polling.zeek) 0.000000 MetaHookPre LoadFileExtended(0, ./pools, <...>/pools.zeek) 0.000000 MetaHookPre LoadFileExtended(0, ./postprocessors, <...>/postprocessors) +0.000000 MetaHookPre LoadFileExtended(0, ./pubsub, <...>/pubsub.zeek) 0.000000 MetaHookPre LoadFileExtended(0, ./removal-hooks, <...>/removal-hooks.zeek) 0.000000 MetaHookPre LoadFileExtended(0, ./reporter.bif.zeek, <...>/reporter.bif.zeek) 0.000000 MetaHookPre LoadFileExtended(0, ./scp, <...>/scp.zeek) @@ -2371,6 +2375,7 @@ 0.000000 | HookLoadFile ./pools <...>/pools.zeek 0.000000 | HookLoadFile ./postprocessors <...>/postprocessors 0.000000 | HookLoadFile ./programming <...>/programming.sig +0.000000 | HookLoadFile ./pubsub <...>/pubsub.zeek 0.000000 | HookLoadFile ./python <...>/python.sig 0.000000 | HookLoadFile ./removal-hooks <...>/removal-hooks.zeek 0.000000 | HookLoadFile ./reporter.bif.zeek <...>/reporter.bif.zeek @@ -2687,6 +2692,7 @@ 0.000000 | HookLoadFileExtended ./pools <...>/pools.zeek 0.000000 | HookLoadFileExtended ./postprocessors <...>/postprocessors 0.000000 | HookLoadFileExtended ./programming <...>/programming.sig +0.000000 | HookLoadFileExtended ./pubsub <...>/pubsub.zeek 0.000000 | HookLoadFileExtended ./python <...>/python.sig 0.000000 | HookLoadFileExtended ./removal-hooks <...>/removal-hooks.zeek 0.000000 | HookLoadFileExtended ./reporter.bif.zeek <...>/reporter.bif.zeek