##! ZeroMQ cluster backend support.
##!
##! For publish-subscribe functionality, one node in the Zeek cluster spawns a
##! thread running a central broker listening on an XPUB and an XSUB socket.
##! These sockets are connected via ``zmq_proxy()``.
##! All other nodes connect to this central broker with their own XSUB and
##! XPUB sockets, establishing a global many-to-many publish-subscribe system
##! where each node sees subscriptions and messages from all other nodes in a
##! Zeek cluster. ZeroMQ's publish-subscribe pattern documentation may be a
##! good starting point. Elsewhere in ZeroMQ's documentation, the central
##! broker is also called a forwarder.
##!
##! For remote logging functionality, the ZeroMQ pipeline pattern is used.
##! All logger nodes listen on a PULL socket. Other nodes connect via PUSH
##! sockets to all of the loggers. Concretely, remote logging functionality
##! is not publish-subscribe, but instead leverages ZeroMQ's built-in
##! load-balancing functionality provided by PUSH and PULL sockets.
##!
##! The ZeroMQ cluster backend technically allows running a non-Zeek central
##! broker (it only needs to offer XPUB and XSUB sockets). Further, it is
##! possible to run non-Zeek logger nodes. All a logger node needs to do is
##! open a ZeroMQ PULL socket and interpret the format used by Zeek nodes
##! to send their log writes.
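##!
##! As an illustration of the non-Zeek broker case: the following sketch,
##! e.g. in local.zeek, points all nodes at an external XPUB/XSUB broker.
##! The broker hostname is made up, and disabling the proxy thread assumes
##! that no Zeek node should run the built-in broker.
##!
##! .. code-block:: zeek
##!
##!     redef Cluster::backend = Cluster::CLUSTER_BACKEND_ZEROMQ;
##!
##!     # XPUB/XSUB sockets offered by the external broker.
##!     redef Cluster::Backend::ZeroMQ::connect_xpub_endpoint = "tcp://broker.example.net:5556";
##!     redef Cluster::Backend::ZeroMQ::connect_xsub_endpoint = "tcp://broker.example.net:5555";
##!
##!     # Do not start the built-in broker thread on any node.
##!     redef Cluster::Backend::ZeroMQ::run_proxy_thread = F;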
module Cluster::Backend::ZeroMQ;

export {
    ## The central broker's XPUB endpoint to connect to.
    ##
    ## A node connects with its XSUB socket to the XPUB socket
    ## of the central broker.
    const connect_xpub_endpoint = "tcp://127.0.0.1:5556" &redef;

    ## The central broker's XSUB endpoint to connect to.
    ##
    ## A node connects with its XPUB socket to the XSUB socket
    ## of the central broker.
    const connect_xsub_endpoint = "tcp://127.0.0.1:5555" &redef;

    ## Vector of ZeroMQ endpoints to connect to for logging.
    ##
    ## A node's PUSH socket used for logging connects to each
    ## of the ZeroMQ endpoints listed in this vector.
    const connect_log_endpoints: vector of string &redef;

    ## Toggle for running a central ZeroMQ XPUB-XSUB broker on this node.
    ##
    ## If set to ``T``, :zeek:see:`Cluster::Backend::ZeroMQ::spawn_zmq_proxy_thread`
    ## is called during :zeek:see:`zeek_init`. The node will listen
    ## on :zeek:see:`Cluster::Backend::ZeroMQ::listen_xsub_endpoint` and
    ## :zeek:see:`Cluster::Backend::ZeroMQ::listen_xpub_endpoint` and
    ## forward subscriptions and messages between nodes.
    ##
    ## By default, this is set to ``T`` on the manager and ``F`` elsewhere.
    const run_proxy_thread: bool = F &redef;

    ## XSUB listen endpoint for the central broker.
    ##
    ## This setting is used for the XSUB socket of the central broker started
    ## when :zeek:see:`Cluster::Backend::ZeroMQ::run_proxy_thread` is ``T``.
    const listen_xsub_endpoint = "tcp://127.0.0.1:5555" &redef;

    ## XPUB listen endpoint for the central broker.
    ##
    ## This setting is used for the XPUB socket of the central broker started
    ## when :zeek:see:`Cluster::Backend::ZeroMQ::run_proxy_thread` is ``T``.
    const listen_xpub_endpoint = "tcp://127.0.0.1:5556" &redef;

    ## PULL socket address to listen on for log messages.
    ##
    ## If empty, don't listen for log messages, otherwise
    ## a ZeroMQ address to bind to, e.g. ``tcp://127.0.0.1:5555``.
    const listen_log_endpoint = "" &redef;

    ## Configure ZeroMQ's socket linger value.
    ##
    ## The default used by libzmq is 30 seconds (30000 milliseconds), which
    ## is very long when loggers vanish before workers during a shutdown,
    ## so we reduce this to 500 milliseconds by default.
    ##
    ## A value of ``-1`` configures blocking forever, while ``0`` would
    ## immediately discard any pending messages.
    ##
    ## See ZeroMQ's ``ZMQ_LINGER`` documentation for more details.
    const linger_ms: int = 500 &redef;

    ## Configure ZeroMQ's immediate setting on PUSH sockets.
    ##
    ## Setting this to ``T`` will queue log writes only to completed
    ## connections. By default, log writes are queued to all potential
    ## endpoints listed in :zeek:see:`Cluster::Backend::ZeroMQ::connect_log_endpoints`.
    ##
    ## See ZeroMQ's ``ZMQ_IMMEDIATE`` documentation for more details.
    const log_immediate: bool = F &redef;

    ## Send high water mark value for the log PUSH sockets.
    ##
    ## If reached, Zeek nodes will block or drop messages.
    ##
    ## See ZeroMQ's ``ZMQ_SNDHWM`` documentation for more details.
    ##
    ## TODO: Make action configurable (block vs drop).
    const log_sndhwm: int = 1000 &redef;

    ## Receive high water mark value for the log PULL sockets.
    ##
    ## If reached, Zeek workers will block or drop messages.
    ##
    ## See ZeroMQ's ``ZMQ_RCVHWM`` documentation for more details.
    ##
    ## TODO: Make action configurable (block vs drop).
    const log_rcvhwm: int = 1000 &redef;

    ## Kernel transmit buffer size for log sockets.
    ##
    ## Using ``-1`` will use the kernel's default.
    ##
    ## See ZeroMQ's ``ZMQ_SNDBUF`` documentation for more details.
    const log_sndbuf: int = -1 &redef;

    ## Kernel receive buffer size for log sockets.
    ##
    ## Using ``-1`` will use the kernel's default.
    ##
    ## See ZeroMQ's ``ZMQ_RCVBUF`` documentation for more details.
    const log_rcvbuf: int = -1 &redef;

    ## Do not silently drop messages if the high water mark is reached.
    ##
    ## Whether to configure ``ZMQ_XPUB_NODROP`` on the XPUB socket
    ## to detect when sending a message fails due to reaching
    ## the high water mark.
    ##
    ## See ZeroMQ's ``ZMQ_XPUB_NODROP`` documentation for more details.
    const xpub_nodrop: bool = T &redef;

    ## Do not silently drop messages if the high water mark is reached.
    ##
    ## Whether to configure ``ZMQ_XPUB_NODROP`` on the XPUB socket
    ## to detect when sending a message fails due to reaching
    ## the high water mark.
    ##
    ## This setting applies to the XPUB/XSUB broker started when
    ## :zeek:see:`Cluster::Backend::ZeroMQ::run_proxy_thread` is ``T``.
    ##
    ## See ZeroMQ's ``ZMQ_XPUB_NODROP`` documentation for more details.
    const listen_xpub_nodrop: bool = T &redef;

    ## Messages to receive before yielding.
    ##
    ## Yield from the receive loop when this many messages have been
    ## received from one of the used sockets.
    const poll_max_messages = 100 &redef;

    ## Bitmask to enable low-level stderr-based debug printing.
    ##
    ##     poll debugging: 1 (produce verbose zmq::poll() output)
    ##
    ## OR values from the above list together and set debug_flags to the
    ## result. E.g., use 7 to select 4, 2 and 1. Only use this in
    ## development if something seems off. The thread used internally
    ## will produce output on stderr.
    const debug_flags: count = 0 &redef;

    ## The node topic prefix to use.
    global node_topic_prefix = "zeek.cluster.node" &redef;

    ## The node_id topic prefix to use.
    global nodeid_topic_prefix = "zeek.cluster.nodeid" &redef;

    ## Low-level event raised when a subscription is added.
    ##
    ## Every node observes all subscriptions from other nodes
    ## in a cluster through its XPUB socket. Whenever a new
    ## subscription topic is added, this event is raised with
    ## the topic.
    ##
    ## topic: The topic.
    global subscription: event(topic: string);

    ## Low-level event raised when a subscription vanishes.
    ##
    ## Every node observes all subscriptions from other nodes
    ## in a cluster through its XPUB socket. Whenever a subscription
    ## is removed from the local XPUB socket, this event is raised
    ## with the topic set to the removed subscription.
    ##
    ## topic: The topic.
    global unsubscription: event(topic: string);

    ## Low-level event sent to a node in response to its subscription.
    ##
    ## name: The sending node's name in :zeek:see:`Cluster::nodes`.
    ##
    ## id: The sending node's identifier, as generated by :zeek:see:`Cluster::node_id`.
    global hello: event(name: string, id: string);

    ## Expiration for hello state.
    ##
    ## How long to wait before expiring information about
    ## subscriptions and hello messages from other
    ## nodes. These expirations trigger reporter warnings.
    const hello_expiration: interval = 10sec &redef;
}
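# The low-level subscription(), unsubscription() and hello() events declared
# above are visible to user scripts as well. As a sketch (the print statement
# is purely illustrative), a separate script could trace topic subscriptions
# like this:
#
#     event Cluster::Backend::ZeroMQ::subscription(topic: string)
#         {
#         print fmt("observed subscription: %s", topic);
#         }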
redef Cluster::backend = Cluster::CLUSTER_BACKEND_ZEROMQ;

# By default, let the manager node run the proxy thread.
redef run_proxy_thread = Cluster::local_node_type() == Cluster::MANAGER;

function zeromq_node_topic(name: string): string
    {
    return node_topic_prefix + "." + name;
    }

function zeromq_nodeid_topic(id: string): string
    {
    return nodeid_topic_prefix + "." + id;
    }

# Unique identifier for this node with some debug information.
const my_node_id = fmt("zeromq_%s_%s_%s_%s", Cluster::node, gethostname(),
                       getpid(), unique_id("N"));

function zeromq_node_id(): string
    {
    return my_node_id;
    }

redef Cluster::node_topic = zeromq_node_topic;
redef Cluster::nodeid_topic = zeromq_nodeid_topic;
redef Cluster::node_id = zeromq_node_id;

redef Cluster::logger_topic = "zeek.cluster.logger";
redef Cluster::manager_topic = "zeek.cluster.manager";
redef Cluster::proxy_topic = "zeek.cluster.proxy";
redef Cluster::worker_topic = "zeek.cluster.worker";

redef Cluster::proxy_pool_spec = Cluster::PoolSpec(
    $topic = "zeek.cluster.pool.proxy",
    $node_type = Cluster::PROXY);

redef Cluster::logger_pool_spec = Cluster::PoolSpec(
    $topic = "zeek.cluster.pool.logger",
    $node_type = Cluster::LOGGER);

redef Cluster::worker_pool_spec = Cluster::PoolSpec(
    $topic = "zeek.cluster.pool.worker",
    $node_type = Cluster::WORKER);

# Configure listen_log_endpoint based on the port in the cluster layout, if any.
@if ( Cluster::local_node_type() == Cluster::LOGGER || (Cluster::manager_is_logger && Cluster::local_node_type() == Cluster::MANAGER) )
const my_node = Cluster::nodes[Cluster::node];
@if ( my_node?$p )
redef listen_log_endpoint = fmt("tcp://%s:%s", my_node$ip, port_to_count(my_node$p));
@endif
@endif

# Populate connect_log_endpoints based on Cluster::nodes on non-logger nodes.
# If you're experimenting with zero-logger clusters, ignore this code and set
# connect_log_endpoints yourself via redef (a sketch follows below).
event zeek_init() &priority=100
    {
    if ( Cluster::local_node_type() == Cluster::LOGGER )
        return;

    if ( Cluster::manager_is_logger && Cluster::local_node_type() == Cluster::MANAGER )
        return;

    for ( _, node in Cluster::nodes )
        {
        local endp: string;
        if ( node$node_type == Cluster::LOGGER && node?$p )
            {
            endp = fmt("tcp://%s:%s", node$ip, port_to_count(node$p));
            connect_log_endpoints += endp;
            }

        if ( Cluster::manager_is_logger && node$node_type == Cluster::MANAGER && node?$p )
            {
            endp = fmt("tcp://%s:%s", node$ip, port_to_count(node$p));
            connect_log_endpoints += endp;
            }
        }

    # If there are no endpoints configured, but more than a single node in
    # the cluster layout, log an error as that's probably not an intended
    # configuration.
    if ( |connect_log_endpoints| == 0 && |Cluster::nodes| > 1 )
        Reporter::error("No ZeroMQ connect_log_endpoints configured");
    }
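# Sketch of the redef mentioned above for zero-logger or non-Zeek logger
# setups, with a made-up address of an external process exposing a PULL
# socket:
#
#     redef Cluster::Backend::ZeroMQ::connect_log_endpoints = vector("tcp://10.0.0.5:5564");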
function nodeid_subscription_expired(nodeids: set[string], nodeid: string): interval
    {
    Reporter::warning(fmt("Expired subscription from nodeid %s", nodeid));
    return 0.0sec;
    }

function nodeid_hello_expired(nodeids: set[string], nodeid: string): interval
    {
    Reporter::warning(fmt("Expired hello from nodeid %s", nodeid));
    return 0.0sec;
    }

# State about subscriptions and hellos seen from other nodes.
global nodeid_subscriptions: set[string] &create_expire=hello_expiration &expire_func=nodeid_subscription_expired;
global nodeid_hellos: set[string] &create_expire=hello_expiration &expire_func=nodeid_hello_expired;

# The ZeroMQ plugin notifies script land when a new subscription arrived
# on that node's XPUB socket. If the topic of such a subscription starts with
# the nodeid_topic_prefix for another node A, node B seeing the subscription
# sends ZeroMQ::hello() to the topic, announcing its own presence to node A.
# Conversely, when node A sees the subscription for node B's nodeid topic,
# it also sends ZeroMQ::hello(). In other words, every node says hello to all
# other nodes based on subscriptions they observe on their local XPUB sockets.
#
# Once node B has seen both the nodeid topic subscription and the
# ZeroMQ::hello() event from node A, it raises a Cluster::node_up() event
# for node A.
#
# See also the Cluster::Backend::ZeroMQ::hello() handler below.
#
# 1) node A subscribes to Cluster::nodeid_topic(Cluster::node_id())
# 2) node B observes the subscription for node A's nodeid_topic and replies
#    with ZeroMQ::hello()
# 3) node A receives node B's nodeid_topic subscription and replies with
#    ZeroMQ::hello()
# 4) node B receives node A's ZeroMQ::hello() and raises Cluster::node_up(),
#    as it has already seen node A's nodeid_topic subscription
event Cluster::Backend::ZeroMQ::subscription(topic: string)
    {
    local prefix = nodeid_topic_prefix + ".";
    if ( ! starts_with(topic, prefix) )
        return;

    local nodeid = topic[|prefix|:];

    # Do not say hello to ourselves - we won't see it anyhow.
    if ( nodeid == Cluster::node_id() )
        return;

    Cluster::publish(topic, Cluster::Backend::ZeroMQ::hello,
                     Cluster::node, Cluster::node_id());

    # If we saw a ZeroMQ::hello from the other node already, send
    # it a Cluster::hello.
    if ( nodeid in nodeid_hellos )
        {
        Cluster::publish(Cluster::nodeid_topic(nodeid), Cluster::hello,
                         Cluster::node, Cluster::node_id());
        delete nodeid_hellos[nodeid];
        }
    else
        {
        add nodeid_subscriptions[nodeid];
        }
    }
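# The handshake described above ultimately surfaces as the generic cluster
# lifecycle events. A sketch of observing them from a user script (the print
# statements are illustrative only):
#
#     event Cluster::node_up(name: string, id: string)
#         {
#         print fmt("node %s is up (id %s)", name, id);
#         }
#
#     event Cluster::node_down(name: string, id: string)
#         {
#         print fmt("node %s is down (id %s)", name, id);
#         }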
# Receiving ZeroMQ::hello() from another node: If we received a subscription
# for the node's nodeid_topic, reply with a Cluster::hello. If the node never
# properly went away, log a warning and raise a Cluster::node_down() now.
event Cluster::Backend::ZeroMQ::hello(name: string, id: string)
    {
    if ( name in Cluster::nodes )
        {
        local n = Cluster::nodes[name];
        if ( n?$id )
            {
            if ( n$id == id )
                {
                # Duplicate ZeroMQ::hello(), very strange, ignore it.
                Reporter::warning(fmt("node '%s' sends ZeroMQ::hello twice (id:%s)",
                                      name, id));
                return;
                }

            Reporter::warning(fmt("node '%s' never said goodbye (old id:%s new id:%s)",
                                  name, n$id, id));

            # We raise node_down() here for the old instance,
            # but it's obviously fake and somewhat lying.
            event Cluster::node_down(name, n$id);
            }
        }

    # It is possible to publish Cluster::hello() directly if the nodeid_topic
    # subscription for the other node was already seen. Otherwise, remember
    # that ZeroMQ::hello() has been seen and send Cluster::hello() in the
    # subscription processing further up.
    if ( id in nodeid_subscriptions )
        {
        Cluster::publish(Cluster::nodeid_topic(id), Cluster::hello,
                         Cluster::node, Cluster::node_id());
        delete nodeid_subscriptions[id];
        }
    else
        {
        add nodeid_hellos[id];
        }
    }

# If the unsubscription is for a nodeid prefix, extract the nodeid that's
# gone, find the name of the node in the cluster layout and raise
# Cluster::node_down().
event Cluster::Backend::ZeroMQ::unsubscription(topic: string)
    {
    local prefix = nodeid_topic_prefix + ".";
    if ( ! starts_with(topic, prefix) )
        return;

    local gone_node_id = topic[|prefix|:];
    local name = "";
    for ( node_name, n in Cluster::nodes )
        {
        if ( n?$id && n$id == gone_node_id )
            {
            name = node_name;
            break;
            }
        }

    if ( name != "" )
        event Cluster::node_down(name, gone_node_id);
    else
        Reporter::warning(fmt("unsubscription of unknown node with id '%s'", gone_node_id));
    }
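# Usage sketch tying the above together: with this module's topic scheme,
# user code can address a single node by name or by identifier. My::ping
# and peer_id are hypothetical:
#
#     Cluster::publish(Cluster::node_topic("worker-1"), My::ping);
#     Cluster::publish(Cluster::nodeid_topic(peer_id), My::ping);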