diff --git a/scripts/policy/frameworks/cluster/backend/zeromq/__load__.zeek b/scripts/policy/frameworks/cluster/backend/zeromq/__load__.zeek new file mode 100644 index 0000000000..5f39cb27df --- /dev/null +++ b/scripts/policy/frameworks/cluster/backend/zeromq/__load__.zeek @@ -0,0 +1 @@ +@load ./main.zeek diff --git a/scripts/policy/frameworks/cluster/backend/zeromq/connect.zeek b/scripts/policy/frameworks/cluster/backend/zeromq/connect.zeek new file mode 100644 index 0000000000..94aee459ae --- /dev/null +++ b/scripts/policy/frameworks/cluster/backend/zeromq/connect.zeek @@ -0,0 +1,14 @@ +##! Establish ZeroMQ connectivity with the broker. + +@load ./main + +module Cluster::Backend::ZeroMQ; + + +event zeek_init() &priority=10 + { + if ( run_proxy_thread ) + Cluster::Backend::ZeroMQ::spawn_zmq_proxy_thread(); + + Cluster::init(); + } diff --git a/scripts/policy/frameworks/cluster/backend/zeromq/main.zeek b/scripts/policy/frameworks/cluster/backend/zeromq/main.zeek new file mode 100644 index 0000000000..52e8bff74b --- /dev/null +++ b/scripts/policy/frameworks/cluster/backend/zeromq/main.zeek @@ -0,0 +1,424 @@ +##! ZeroMQ cluster backend support. +##! +##! For publish-subscribe functionality, one node in the Zeek cluster spawns a +##! thread running a central broker listening on a XPUB and XSUB socket. +##! These sockets are connected via `zmq_proxy() `_. +##! All other nodes connect to this central broker with their own XSUB and +##! XPUB sockets, establishing a global many-to-many publish-subscribe system +##! where each node sees subscriptions and messages from all other nodes in a +##! Zeek cluster. ZeroMQ's `publish-subscribe pattern `_ +##! documentation may be a good starting point. Elsewhere in ZeroMQ's documentation, +##! the central broker is also called `forwarder `_. +##! +##! For remote logging functionality, the ZeroMQ `pipeline pattern `_ +##! is used. All logger nodes listen on a PULL socket. Other nodes connect +##! via PUSH sockets to all of the loggers. Concretely, remote logging +##! functionality is not publish-subscribe, but instead leverages ZeroMQ's +##! built-in load-balancing functionality provided by PUSH and PULL +##! sockets. +##! +##! The ZeroMQ cluster backend technically allows to run a non-Zeek central +##! broker (it only needs to offer XPUB and XSUB sockets). Further, it is +##! possible to run non-Zeek logger nodes. All a logger node needs to do is +##! open a ZeroMQ PULL socket and interpret the format used by Zeek nodes +##! to send their log writes. +module Cluster::Backend::ZeroMQ; + +export { + ## The central broker's XPUB endpoint to connect to. + ## + ## A node connects with its XSUB socket to the XPUB socket + ## of the central broker. + const connect_xpub_endpoint = "tcp://127.0.0.1:5556" &redef; + + + ## The central broker's XSUB endpoint to connect to. + ## + ## A node connects with its XPUB socket to the XSUB socket + ## of the central broker. + const connect_xsub_endpoint = "tcp://127.0.0.1:5555" &redef; + + ## Vector of ZeroMQ endpoints to connect to for logging. + ## + ## A node's PUSH socket used for logging connects to each + ## of the ZeroMQ endpoints listed in this vector. + const connect_log_endpoints: vector of string &redef; + + ## Toggle for running a central ZeroMQ XPUB-XSUB broker on this node. + ## + ## If set to ``T``, :zeek:see:`Cluster::Backend::ZeroMQ::spawn_zmq_proxy_thread` + ## is called during :zeek:see:`zeek_init`. 
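As a minimal sketch of the connection model described above (the addresses are hypothetical; the option names are the ones declared in this module), a node could be pointed at a central broker and at two logger nodes like this:

```zeek
# Minimal sketch; the endpoints shown are hypothetical (RFC 5737 addresses).
@load frameworks/cluster/backend/zeromq/connect

# Central XPUB/XSUB broker, by default run by the manager node.
redef Cluster::Backend::ZeroMQ::connect_xpub_endpoint = "tcp://192.0.2.1:5556";
redef Cluster::Backend::ZeroMQ::connect_xsub_endpoint = "tcp://192.0.2.1:5555";

# Explicit logger endpoints instead of deriving them from the cluster layout.
redef Cluster::Backend::ZeroMQ::connect_log_endpoints = vector("tcp://192.0.2.2:5557", "tcp://192.0.2.3:5557");
```

In a standard cluster layout the log endpoints are derived automatically from :zeek:see:`Cluster::nodes` (see the ``zeek_init`` handler further below), so explicit redefs like these are mainly useful for custom or non-Zeek logger setups.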
The node will listen + ## on :zeek:see:`Cluster::Backend::ZeroMQ::listen_xsub_endpoint` and + ## :zeek:see:`Cluster::Backend::ZeroMQ::listen_xpub_endpoint` and + ## forward subscriptions and messages between nodes. + ## + ## By default, this is set to ``T`` on the manager and ``F`` elsewhere. + const run_proxy_thread: bool = F &redef; + + ## XSUB listen endpoint for the central broker. + ## + ## This setting is used for the XSUB socket of the central broker started + ## when :zeek:see:`Cluster::Backend::ZeroMQ::run_proxy_thread` is ``T``. + const listen_xsub_endpoint = "tcp://127.0.0.1:5556" &redef; + + ## XPUB listen endpoint for the central broker. + ## + ## This setting is used for the XPUB socket of the central broker started + ## when :zeek:see:`Cluster::Backend::ZeroMQ::run_proxy_thread` is ``T``. + const listen_xpub_endpoint = "tcp://127.0.0.1:5555" &redef; + + ## PULL socket address to listen on for log messages. + ## + ## If empty, don't listen for log messages, otherwise + ## a ZeroMQ address to bind to. E.g., ``tcp://127.0.0.1:5555``. + const listen_log_endpoint = "" &redef; + + ## Configure the ZeroMQ's sockets linger value. + ## + ## The default used by libzmq is 30 seconds (30 000) which is very long + ## when loggers vanish before workers during a shutdown, so we reduce + ## this to 500 milliseconds by default. + ## + ## A value of ``-1`` configures blocking forever, while ``0`` would + ## immediately discard any pending messages. + ## + ## See ZeroMQ's `ZMQ_LINGER documentation `_ + ## for more details. + const linger_ms: int = 500 &redef; + + ## Configure ZeroMQ's immedidate setting on PUSH sockets + ## + ## Setting this to ``T`` will queue log writes only to completed + ## connections. By default, log writes are queued to all potential + ## endpoints listed in :zeek:see:`Cluster::Backend::ZeroMQ::connect_log_endpoints`. + ## + ## See ZeroMQ's `ZMQ_IMMEDIATE documentation `_ + ## for more details. + const log_immediate: bool = F &redef; + + ## Send high water mark value for the log PUSH sockets. + ## + ## If reached, Zeek nodes will block or drop messages. + ## + ## See ZeroMQ's `ZMQ_SNDHWM documentation `_ + ## for more details. + ## + ## TODO: Make action configurable (block vs drop) + const log_sndhwm: int = 1000 &redef; + + ## Receive high water mark value for the log PULL sockets. + ## + ## If reached, Zeek workers will block or drop messages. + ## + ## See ZeroMQ's `ZMQ_RCVHWM documentation `_ + ## for more details. + ## + ## TODO: Make action configurable (block vs drop) + const log_rcvhwm: int = 1000 &redef; + + ## Kernel transmit buffer size for log sockets. + ## + ## Using -1 will use the kernel's default. + ## + ## See ZeroMQ's `ZMQ_SNDBUF documentation `_. + const log_sndbuf: int = -1 &redef; + + ## Kernel receive buffer size for log sockets. + ## + ## Using -1 will use the kernel's default. + ## + ## See ZeroMQ's `ZMQ_RCVBUF documentation `_ + ## for more details. + const log_rcvbuf: int = -1 &redef; + + ## Do not silently drop messages if high-water-mark is reached. + ## + ## Whether to configure ``ZMQ_XPUB_NODROP`` on the XPUB socket + ## to detect when sending a message fails due to reaching + ## the high-water-mark. + ## + ## See ZeroMQ's `ZMQ_XPUB_NODROP documentation `_ + ## for more details. + const xpub_nodrop: bool = T &redef; + + ## Do not silently drop messages if high-water-mark is reached. + ## + ## Whether to configure ``ZMQ_XPUB_NODROP`` on the XPUB socket + ## to detect when sending a message fails due to reaching + ## the high-water-mark. 
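All of the socket options above are ``&redef`` constants, so their queueing behavior can be tuned per deployment; a small sketch with hypothetical values:

```zeek
# Hypothetical tuning: a larger send queue for bursty log writes, no lingering
# on shutdown, and only queue writes to loggers that are actually connected.
redef Cluster::Backend::ZeroMQ::log_sndhwm = 10000;
redef Cluster::Backend::ZeroMQ::linger_ms = 0;
redef Cluster::Backend::ZeroMQ::log_immediate = T;
```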
+ ## + ## This setting applies to the XPUB/XSUB broker started when + ## :zeek:see:`Cluster::Backend::ZeroMQ::run_proxy_thread` is ``T``. + ## + ## See ZeroMQ's `ZMQ_XPUB_NODROP documentation `_ + ## for more details. + const listen_xpub_nodrop: bool = T &redef; + + ## Messages to receive before yielding. + ## + ## Yield from the receive loop when this many messages have been + ## received from one of the used sockets. + const poll_max_messages = 100 &redef; + + ## Bitmask to enable low-level stderr based debug printing. + ## + ## poll debugging: 1 (produce verbose zmq::poll() output) + ## + ## Or values from the above list together and set debug_flags + ## to the result. E.g. use 7 to select 4, 2 and 1. Only use this + ## in development if something seems off. The thread used internally + ## will produce output on stderr. + const debug_flags: count = 0 &redef; + + ## The node topic prefix to use. + global node_topic_prefix = "zeek.cluster.node" &redef; + + ## The node_id topic prefix to use. + global nodeid_topic_prefix = "zeek.cluster.nodeid" &redef; + + ## Low-level event when a subscription is added. + ## + ## Every node observes all subscriptions from other nodes + ## in a cluster through its XPUB socket. Whenever a new + ## subscription topic is added, this event is raised with + ## the topic. + ## + ## topic: The topic. + global subscription: event(topic: string); + + ## Low-level event when a subscription vanishes. + ## + ## Every node observes all subscriptions from other nodes + ## in a cluster through its XPUB socket. Whenever a subscription + ## is removed from the local XPUB socket, this event is raised + ## with the topic set to the removed subscription. + ## + ## topic: The topic. + global unsubscription: event(topic: string); + + ## Low-level event send to a node in response to their subscription. + ## + ## name: The sending node's name in :zeek:see:`Cluster::nodes`. + ## + ## id: The sending node's identifier, as generated by :zeek:see:`Cluster::node_id`. + global hello: event(name: string, id: string); + + ## Expiration for hello state. + ## + ## How long to wait before expiring information about + ## subscriptions and hello messages from other + ## nodes. These expirations trigger reporter warnings. + const hello_expiration: interval = 10sec &redef; +} + +redef Cluster::backend = Cluster::CLUSTER_BACKEND_ZEROMQ; + +# By default, let the manager node run the proxy thread. +redef run_proxy_thread = Cluster::local_node_type() == Cluster::MANAGER; + +function zeromq_node_topic(name: string): string { + return node_topic_prefix + "." + name; +} + +function zeromq_nodeid_topic(id: string): string { + return nodeid_topic_prefix + "." + id; +} + +# Unique identifier for this node with some debug information. 
+const my_node_id = fmt("zeromq_%s_%s_%s_%s", Cluster::node, gethostname(), getpid(), unique_id("N")); + +function zeromq_node_id(): string { + return my_node_id; +} + +redef Cluster::node_topic = zeromq_node_topic; +redef Cluster::nodeid_topic = zeromq_nodeid_topic; +redef Cluster::node_id = zeromq_node_id; + +redef Cluster::logger_topic = "zeek.cluster.logger"; +redef Cluster::manager_topic = "zeek.cluster.manager"; +redef Cluster::proxy_topic = "zeek.cluster.proxy"; +redef Cluster::worker_topic = "zeek.cluster.worker"; + +redef Cluster::proxy_pool_spec = Cluster::PoolSpec( + $topic = "zeek.cluster.pool.proxy", + $node_type = Cluster::PROXY); + +redef Cluster::logger_pool_spec = Cluster::PoolSpec( + $topic = "zeek.cluster.pool.logger", + $node_type = Cluster::LOGGER); + +redef Cluster::worker_pool_spec = Cluster::PoolSpec( + $topic = "zeek.cluster.pool.worker", + $node_type = Cluster::WORKER); + + +# Configure listen_log_endpoint based on port in cluster-layout, if any. +@if ( Cluster::local_node_type() == Cluster::LOGGER || (Cluster::manager_is_logger && Cluster::local_node_type() == Cluster::MANAGER) ) +const my_node = Cluster::nodes[Cluster::node]; +@if ( my_node?$p ) +redef listen_log_endpoint = fmt("tcp://%s:%s", my_node$ip, port_to_count(my_node$p)); +@endif +@endif + +# Populate connect_log_endpoints based on Cluster::nodes on non-logger nodes. +# If you're experimenting with zero-logger clusters, ignore this code and set +# connect_log_endpoints yourself via redef. +event zeek_init() &priority=100 + { + if ( Cluster::local_node_type() == Cluster::LOGGER ) + return; + + if ( Cluster::manager_is_logger && Cluster::local_node_type() == Cluster::MANAGER ) + return; + + for ( _, node in Cluster::nodes ) + { + local endp: string; + if ( node$node_type == Cluster::LOGGER && node?$p ) + { + endp = fmt("tcp://%s:%s", node$ip, port_to_count(node$p)); + connect_log_endpoints += endp; + } + + if ( Cluster::manager_is_logger && node$node_type == Cluster::MANAGER && node?$p ) + { + endp = fmt("tcp://%s:%s", node$ip, port_to_count(node$p)); + connect_log_endpoints += endp; + } + } + + # If there's no endpoints configured, but more than a single + # node in cluster layout, log an error as that's probably not + # an intended configuration. + if ( |connect_log_endpoints| == 0 && |Cluster::nodes| > 1 ) + Reporter::error("No ZeroMQ connect_log_endpoints configured"); + } + +function nodeid_subscription_expired(nodeids: set[string], nodeid: string): interval + { + Reporter::warning(fmt("Expired subscription from nodeid %s", nodeid)); + return 0.0sec; + } + +function nodeid_hello_expired(nodeids: set[string], nodeid: string): interval + { + Reporter::warning(fmt("Expired hello from nodeid %s", nodeid)); + return 0.0sec; + } + +# State about subscriptions and hellos seen from other nodes. +global nodeid_subscriptions: set[string] &create_expire=hello_expiration &expire_func=nodeid_subscription_expired; +global nodeid_hellos: set[string] &create_expire=hello_expiration &expire_func=nodeid_hello_expired; + +# The ZeroMQ plugin notifies script land when a new subscription arrived +# on that node's XPUB socket. If the topic of such a subscription starts with +# the nodeid_topic_prefix for another node A, node B seeing the subscription +# sends ZeroMQ::hello() to the topic, announcing its own presence to node A. +# Conversely, when node A sees the subscription for node B's nodeid topic, +# it also sens ZeroMQ::hello(). 
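A minimal sketch of observing this handshake from script land, using the low-level events declared in the export block above (print-only, for debugging):

```zeek
# Debugging sketch: print the low-level handshake events as they arrive.
event Cluster::Backend::ZeroMQ::subscription(topic: string)
	{
	print fmt("subscription seen: %s", topic);
	}

event Cluster::Backend::ZeroMQ::unsubscription(topic: string)
	{
	print fmt("unsubscription seen: %s", topic);
	}

event Cluster::Backend::ZeroMQ::hello(name: string, id: string)
	{
	print fmt("hello from %s (id %s)", name, id);
	}
```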
In other words, every node says hello to all +# other nodes based on subscriptions they observe on their local XPUB sockets. +# +# Once node B has seen both, the nodeid topic subscription and ZeroMQ::hello() +# event from node A, it raises a Cluster::node_up() event for node A. +# +# See also the Cluster::Backend::ZeroMQ::hello() handler below. +# +# 1) node A subscribes to Cluster::nodeid_topic(Cluster::node_id()) +# 2) node B observes subscription for node A's nodeid_topic and replies with ZeroMQ::hello() +# 3) node A receives node B's nodeid_topic subscription, replies with ZeroMQ::hello() +# 4) node B receives node A's ZeroMQ::hello() and raises Cluster::node_up() +# as it has already seen node A's nodeid_topic subscription. +event Cluster::Backend::ZeroMQ::subscription(topic: string) + { + local prefix = nodeid_topic_prefix + "."; + + if ( ! starts_with(topic, prefix) ) + return; + + local nodeid = topic[|prefix|:]; + + # Do not say hello to ourselves - we won't see it anyhow. + if ( nodeid == Cluster::node_id() ) + return; + + Cluster::publish(topic, Cluster::Backend::ZeroMQ::hello, Cluster::node, Cluster::node_id()); + + # If we saw a ZeroMQ::hello from the other node already, send + # it a Cluster::hello. + if ( nodeid in nodeid_hellos ) + { + Cluster::publish(Cluster::nodeid_topic(nodeid), Cluster::hello, Cluster::node, Cluster::node_id()); + delete nodeid_hellos[nodeid]; + } + else + { + add nodeid_subscriptions[nodeid]; + } + } + +# Receiving ZeroMQ::hello() from another node: If we received a subscription +# for the node's nodeid_topic, reply with a Cluster::hello. If the node never +# properly went away, log a warning and raise a Cluster::node_down() now. +event Cluster::Backend::ZeroMQ::hello(name: string, id: string) + { + if ( name in Cluster::nodes ) + { + local n = Cluster::nodes[name]; + if ( n?$id ) + { + if ( n$id == id ) + { + # Duplicate ZeroMQ::hello(), very strange, ignore it. + Reporter::warning(fmt("node '%s' sends ZeroMQ::hello twice (id:%s)", + name, id)); + return; + } + + Reporter::warning(fmt("node '%s' never said goodbye (old id:%s new id:%s", + name, n$id, id)); + + # We raise node_down() here for the old instance, + # but it's obviously fake and somewhat lying. + event Cluster::node_down(name, n$id); + } + } + + # It is possible to publish Cluster::hello() directly if the nodeid_topic + # subscription for the other node was already seen. Otherwise, remember + # that Cluster::hello() has been seen and send Cluster::hello() in + # subscription processing further up. + if ( id in nodeid_subscriptions ) + { + Cluster::publish(Cluster::nodeid_topic(id), Cluster::hello, Cluster::node, Cluster::node_id()); + delete nodeid_subscriptions[id]; + } + else + { + add nodeid_hellos[id]; + } + } + +# If the unsubscription is for a nodeid prefix, extract the +# nodeid that's gone, find the name of the node from the +# cluster layout and raise Cluster::node_down(). +event Cluster::Backend::ZeroMQ::unsubscription(topic: string) + { + local prefix = nodeid_topic_prefix + "."; + if ( ! 
starts_with(topic, prefix) ) + return; + + local gone_node_id = topic[|prefix|:]; + local name = ""; + for ( node_name, n in Cluster::nodes ) { + if ( n?$id && n$id == gone_node_id ) { + name = node_name; + break; + } + } + + if ( name != "" ) + event Cluster::node_down(name, gone_node_id); + else + Reporter::warning(fmt("unsubscription of unknown node with id '%s'", gone_node_id)); + } diff --git a/scripts/test-all-policy.zeek b/scripts/test-all-policy.zeek index b515dd234b..58930dc194 100644 --- a/scripts/test-all-policy.zeek +++ b/scripts/test-all-policy.zeek @@ -11,6 +11,9 @@ # @load frameworks/control/controllee.zeek # @load frameworks/control/controller.zeek +@load frameworks/cluster/backend/zeromq/__load__.zeek +# @load frameworks/cluster/backend/zeromq/connect.zeek +@load frameworks/cluster/backend/zeromq/main.zeek @load frameworks/cluster/experimental.zeek # Loaded via the above through test-all-policy-cluster.test # when running as a manager, creates cluster.log entries diff --git a/scripts/zeekygen/__load__.zeek b/scripts/zeekygen/__load__.zeek index 5cda9d6263..d22dba2a97 100644 --- a/scripts/zeekygen/__load__.zeek +++ b/scripts/zeekygen/__load__.zeek @@ -2,6 +2,7 @@ # Scripts which are commented out in test-all-policy.zeek. @load protocols/ssl/decryption.zeek +@load frameworks/cluster/backend/zeromq/connect.zeek @load frameworks/cluster/nodes-experimental/manager.zeek @load frameworks/control/controllee.zeek @load frameworks/control/controller.zeek @@ -28,6 +29,7 @@ event zeek_init() &priority=1000 # fail when run under zeekygen. For the purpose of zeekygen, we could # probably disable all modules, too. disable_module_events("Control"); + disable_module_events("Cluster::Backend::ZeroMQ"); disable_module_events("Management::Agent::Runtime"); disable_module_events("Management::Controller::Runtime"); disable_module_events("Management::Node"); diff --git a/src/cluster/CMakeLists.txt b/src/cluster/CMakeLists.txt index 14a063943d..bb421c6c01 100644 --- a/src/cluster/CMakeLists.txt +++ b/src/cluster/CMakeLists.txt @@ -11,4 +11,5 @@ zeek_add_subdir_library( BIFS cluster.bif) +add_subdirectory(backend) add_subdirectory(serializer) diff --git a/src/cluster/backend/CMakeLists.txt b/src/cluster/backend/CMakeLists.txt new file mode 100644 index 0000000000..0e5d704186 --- /dev/null +++ b/src/cluster/backend/CMakeLists.txt @@ -0,0 +1,4 @@ +option(ENABLE_CLUSTER_BACKEND_ZEROMQ "Enable the ZeroMQ cluster backend" ON) +if (ENABLE_CLUSTER_BACKEND_ZEROMQ) + add_subdirectory(zeromq) +endif () diff --git a/src/cluster/backend/zeromq/CMakeLists.txt b/src/cluster/backend/zeromq/CMakeLists.txt new file mode 100644 index 0000000000..a15923445a --- /dev/null +++ b/src/cluster/backend/zeromq/CMakeLists.txt @@ -0,0 +1,21 @@ +list(APPEND CMAKE_MODULE_PATH "${CMAKE_CURRENT_SOURCE_DIR}/cmake") + +find_package(ZeroMQ REQUIRED) + +message(STATUS "zeromq: ${ZeroMQ_LIBRARIES} ${ZeroMQ_INCLUDE_DIRS}") + +zeek_add_plugin( + Zeek + Cluster_Backend_ZeroMQ + INCLUDE_DIRS + ${CMAKE_CURRENT_SOURCE_DIR} + ${CMAKE_CURRENT_BINARY_DIR} + ${ZeroMQ_INCLUDE_DIRS} + DEPENDENCIES + ${ZeroMQ_LIBRARIES} + SOURCES + Plugin.cc + ZeroMQ-Proxy.cc + ZeroMQ.cc + BIFS + cluster_backend_zeromq.bif) diff --git a/src/cluster/backend/zeromq/Plugin.cc b/src/cluster/backend/zeromq/Plugin.cc new file mode 100644 index 0000000000..ca823f0e54 --- /dev/null +++ b/src/cluster/backend/zeromq/Plugin.cc @@ -0,0 +1,22 @@ +// See the file "COPYING" in the main distribution directory for copyright. 
+ +#include "zeek/cluster/backend/zeromq/Plugin.h" + +#include "zeek/cluster/Component.h" +#include "zeek/cluster/backend/zeromq/ZeroMQ.h" + + +namespace zeek::plugin::Zeek_Cluster_Backend_ZeroMQ { + +Plugin plugin; + +zeek::plugin::Configuration Plugin::Configure() { + AddComponent(new cluster::BackendComponent("ZeroMQ", zeek::cluster::zeromq::ZeroMQBackend::Instantiate)); + + zeek::plugin::Configuration config; + config.name = "Zeek::Cluster_Backend_ZeroMQ"; + config.description = "Cluster backend using ZeroMQ"; + return config; +} + +} // namespace zeek::plugin::Zeek_Cluster_Backend_ZeroMQ diff --git a/src/cluster/backend/zeromq/Plugin.h b/src/cluster/backend/zeromq/Plugin.h new file mode 100644 index 0000000000..882a7dc9ec --- /dev/null +++ b/src/cluster/backend/zeromq/Plugin.h @@ -0,0 +1,14 @@ +// See the file "COPYING" in the main distribution directory for copyright. + +#pragma once + +#include "zeek/plugin/Plugin.h" + +namespace zeek::plugin::Zeek_Cluster_Backend_ZeroMQ { + +class Plugin : public zeek::plugin::Plugin { +public: + zeek::plugin::Configuration Configure() override; +}; + +} // namespace zeek::plugin::Zeek_Cluster_Backend_ZeroMQ diff --git a/src/cluster/backend/zeromq/ZeroMQ-Proxy.cc b/src/cluster/backend/zeromq/ZeroMQ-Proxy.cc new file mode 100644 index 0000000000..3dae7639cd --- /dev/null +++ b/src/cluster/backend/zeromq/ZeroMQ-Proxy.cc @@ -0,0 +1,72 @@ +// See the file "COPYING" in the main distribution directory for copyright. + +#include "zeek/cluster/backend/zeromq/ZeroMQ-Proxy.h" + +#include + +#include "zeek/Reporter.h" +#include "zeek/util.h" + + +using namespace zeek::cluster::zeromq; + +namespace { + +/** + * Function that runs zmq_proxy() that provides a central XPUB/XSUB + * broker for other Zeek nodes to connect and exchange subscription + * information. + */ +void thread_fun(ProxyThread::Args* args) { + zeek::util::detail::set_thread_name("zmq-proxy-thread"); + + try { + zmq::proxy(args->xsub, args->xpub, zmq::socket_ref{} /*capture*/); + } catch ( zmq::error_t& err ) { + args->xsub.close(); + args->xpub.close(); + + if ( err.num() != ETERM ) { + std::fprintf(stderr, "[zeromq] unexpected zmq_proxy() error: %s (%d)", err.what(), err.num()); + throw; + } + } +} + +} // namespace + +bool ProxyThread::Start() { + zmq::socket_t xpub(ctx, zmq::socket_type::xpub); + zmq::socket_t xsub(ctx, zmq::socket_type::xsub); + + xpub.set(zmq::sockopt::xpub_nodrop, xpub_nodrop); + + try { + xpub.bind(xpub_endpoint); + } catch ( zmq::error_t& err ) { + zeek::reporter->Error("Failed to bind xpub socket %s: %s (%d)", xpub_endpoint.c_str(), err.what(), err.num()); + return false; + } + + try { + xsub.bind(xsub_endpoint); + } catch ( zmq::error_t& err ) { + zeek::reporter->Error("Failed to bind xsub socket %s: %s (%d)", xpub_endpoint.c_str(), err.what(), err.num()); + return false; + } + + args = {.xpub = std::move(xpub), .xsub = std::move(xsub)}; + + thread = std::thread(thread_fun, &args); + + return true; +} + +void ProxyThread::Shutdown() { + ctx.shutdown(); + + if ( thread.joinable() ) + thread.join(); + + ctx.close(); +} diff --git a/src/cluster/backend/zeromq/ZeroMQ-Proxy.h b/src/cluster/backend/zeromq/ZeroMQ-Proxy.h new file mode 100644 index 0000000000..de33d3da1c --- /dev/null +++ b/src/cluster/backend/zeromq/ZeroMQ-Proxy.h @@ -0,0 +1,56 @@ +// See the file "COPYING" in the main distribution directory for copyright. + +#pragma once + +#include +#include +#include + + +// Central XPUB/XSUB proxy. +// +// Spawns a thread that runs zmq_proxy() for a XPUB/XSUB pair. 
+namespace zeek::cluster::zeromq { + +class ProxyThread { +public: + /** + * Constructor. + * + * @param xpub_endpoint the XPUB socket address to listen on. + * @param xsub_endpoint the XSUB socket address to listen on. + * @param xpub_nodrop the xpub_nodrop option to use on the XPUB socket. + */ + ProxyThread(std::string xpub_endpoint, std::string xsub_endpoint, int xpub_nodrop) + : xpub_endpoint(std::move(xpub_endpoint)), xsub_endpoint(std::move(xsub_endpoint)), xpub_nodrop(xpub_nodrop) {} + + + ~ProxyThread() { Shutdown(); } + + /** + * Data kept in object and passed to thread. + */ + struct Args { + zmq::socket_t xpub; + zmq::socket_t xsub; + }; + + /** + * Bind the sockets and spawn the thread. + */ + bool Start(); + + /** + * Shutdown the ZeroMQ context and join the thread. + */ + void Shutdown(); + +private: + zmq::context_t ctx; + std::thread thread; + Args args; + std::string xpub_endpoint; + std::string xsub_endpoint; + int xpub_nodrop = 1; +}; +} // namespace zeek::cluster::zeromq diff --git a/src/cluster/backend/zeromq/ZeroMQ.cc b/src/cluster/backend/zeromq/ZeroMQ.cc new file mode 100644 index 0000000000..02fd795bbf --- /dev/null +++ b/src/cluster/backend/zeromq/ZeroMQ.cc @@ -0,0 +1,569 @@ +// See the file "COPYING" in the main distribution directory for copyright. + +#include "ZeroMQ.h" + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "zeek/DebugLogger.h" +#include "zeek/Event.h" +#include "zeek/EventRegistry.h" +#include "zeek/IntrusivePtr.h" +#include "zeek/Reporter.h" +#include "zeek/Val.h" +#include "zeek/cluster/Backend.h" +#include "zeek/cluster/Serializer.h" +#include "zeek/cluster/backend/zeromq/Plugin.h" +#include "zeek/cluster/backend/zeromq/ZeroMQ-Proxy.h" + +namespace zeek { + +namespace plugin::Zeek_Cluster_Backend_ZeroMQ { + +extern zeek::plugin::Zeek_Cluster_Backend_ZeroMQ::Plugin plugin; + +} + +namespace cluster::zeromq { + +enum class DebugFlag : zeek_uint_t { + NONE = 0, + POLL = 1, +}; + +constexpr DebugFlag operator&(zeek_uint_t x, DebugFlag y) { + return static_cast(x & static_cast(y)); +} + +#define ZEROMQ_DEBUG(...) PLUGIN_DBG_LOG(zeek::plugin::Zeek_Cluster_Backend_ZeroMQ::plugin, __VA_ARGS__) + +#define ZEROMQ_THREAD_PRINTF(...) \ + do { \ + std::fprintf(stderr, "[zeromq] " __VA_ARGS__); \ + } while ( 0 ) + +#define ZEROMQ_DEBUG_THREAD_PRINTF(flag, ...) \ + do { \ + if ( (debug_flags & flag) == flag ) { \ + ZEROMQ_THREAD_PRINTF(__VA_ARGS__); \ + } \ + } while ( 0 ) + +namespace { +void self_thread_fun(void* arg) { + auto* self = static_cast(arg); + self->Run(); +} + +} // namespace + + +// Constructor. 
+ZeroMQBackend::ZeroMQBackend(std::unique_ptr es, std::unique_ptr ls) + : ThreadedBackend(std::move(es), std::move(ls)) { + xsub = zmq::socket_t(ctx, zmq::socket_type::xsub); + xpub = zmq::socket_t(ctx, zmq::socket_type::xpub); + log_push = zmq::socket_t(ctx, zmq::socket_type::push); + log_pull = zmq::socket_t(ctx, zmq::socket_type::pull); + + main_inproc = zmq::socket_t(ctx, zmq::socket_type::pair); + child_inproc = zmq::socket_t(ctx, zmq::socket_type::pair); +} + +void ZeroMQBackend::DoInitPostScript() { + ThreadedBackend::DoInitPostScript(); + + my_node_id = zeek::id::find_val("Cluster::Backend::ZeroMQ::my_node_id")->ToStdString(); + listen_xpub_endpoint = + zeek::id::find_val("Cluster::Backend::ZeroMQ::listen_xpub_endpoint")->ToStdString(); + listen_xsub_endpoint = + zeek::id::find_val("Cluster::Backend::ZeroMQ::listen_xsub_endpoint")->ToStdString(); + listen_xpub_nodrop = + zeek::id::find_val("Cluster::Backend::ZeroMQ::listen_xpub_nodrop")->AsBool() ? 1 : 0; + connect_xpub_endpoint = + zeek::id::find_val("Cluster::Backend::ZeroMQ::connect_xpub_endpoint")->ToStdString(); + connect_xsub_endpoint = + zeek::id::find_val("Cluster::Backend::ZeroMQ::connect_xsub_endpoint")->ToStdString(); + listen_log_endpoint = + zeek::id::find_val("Cluster::Backend::ZeroMQ::listen_log_endpoint")->ToStdString(); + poll_max_messages = zeek::id::find_val("Cluster::Backend::ZeroMQ::poll_max_messages")->Get(); + debug_flags = zeek::id::find_val("Cluster::Backend::ZeroMQ::debug_flags")->Get(); + + event_unsubscription = zeek::event_registry->Register("Cluster::Backend::ZeroMQ::unsubscription"); + event_subscription = zeek::event_registry->Register("Cluster::Backend::ZeroMQ::subscription"); + + main_inproc.bind("inproc://publish-bridge"); + child_inproc.connect("inproc://publish-bridge"); +} + + +void ZeroMQBackend::DoTerminate() { + ZEROMQ_DEBUG("Shutting down ctx"); + ctx.shutdown(); + ZEROMQ_DEBUG("Joining self_thread"); + if ( self_thread.joinable() ) + self_thread.join(); + + log_push.close(); + log_pull.close(); + xsub.close(); + xpub.close(); + main_inproc.close(); + child_inproc.close(); + + ZEROMQ_DEBUG("Closing ctx"); + ctx.close(); + + // If running the proxy thread, terminate it, too. + if ( proxy_thread ) { + ZEROMQ_DEBUG("Shutting down proxy thread"); + proxy_thread->Shutdown(); + } + + ZEROMQ_DEBUG("Terminated"); +} + +bool ZeroMQBackend::DoInit() { + auto linger_ms = static_cast(zeek::id::find_val("Cluster::Backend::ZeroMQ::linger_ms")->AsInt()); + int xpub_nodrop = zeek::id::find_val("Cluster::Backend::ZeroMQ::xpub_nodrop")->AsBool() ? 
1 : 0; + + xpub.set(zmq::sockopt::linger, linger_ms); + xpub.set(zmq::sockopt::xpub_nodrop, xpub_nodrop); + + try { + xsub.connect(connect_xsub_endpoint); + } catch ( zmq::error_t& err ) { + zeek::reporter->Error("ZeroMQ: Failed to connect to XSUB %s: %s", connect_xsub_endpoint.c_str(), err.what()); + return false; + } + + try { + xpub.connect(connect_xpub_endpoint); + } catch ( zmq::error_t& err ) { + zeek::reporter->Error("ZeroMQ: Failed to connect to XPUB %s: %s", connect_xpub_endpoint.c_str(), err.what()); + return false; + } + + + auto log_immediate = + static_cast(zeek::id::find_val("Cluster::Backend::ZeroMQ::log_immediate")->AsBool()); + + auto log_sndhwm = + static_cast(zeek::id::find_val("Cluster::Backend::ZeroMQ::log_sndhwm")->AsInt()); + + auto log_sndbuf = + static_cast(zeek::id::find_val("Cluster::Backend::ZeroMQ::log_sndbuf")->AsInt()); + + auto log_rcvhwm = + static_cast(zeek::id::find_val("Cluster::Backend::ZeroMQ::log_rcvhwm")->AsInt()); + + auto log_rcvbuf = + static_cast(zeek::id::find_val("Cluster::Backend::ZeroMQ::log_rcvbuf")->AsInt()); + + ZEROMQ_DEBUG("Setting log_sndhwm=%d log_sndbuf=%d log_rcvhwm=%d log_rcvbuf=%d linger_ms=%d", log_sndhwm, log_sndbuf, + log_rcvhwm, log_rcvbuf, linger_ms); + + log_push.set(zmq::sockopt::sndhwm, log_sndhwm); + log_push.set(zmq::sockopt::sndbuf, log_sndbuf); + log_push.set(zmq::sockopt::linger, linger_ms); + log_push.set(zmq::sockopt::immediate, log_immediate); + + log_pull.set(zmq::sockopt::rcvhwm, log_rcvhwm); + log_pull.set(zmq::sockopt::rcvbuf, log_rcvbuf); + + + if ( ! listen_log_endpoint.empty() ) { + ZEROMQ_DEBUG("Listening on log pull socket: %s", listen_log_endpoint.c_str()); + try { + log_pull.bind(listen_log_endpoint); + } catch ( zmq::error_t& err ) { + zeek::reporter->Error("ZeroMQ: Failed to bind to PULL socket %s: %s", listen_log_endpoint.c_str(), + err.what()); + return false; + } + } + + const auto& log_endpoints = zeek::id::find_val("Cluster::Backend::ZeroMQ::connect_log_endpoints"); + for ( unsigned int i = 0; i < log_endpoints->Size(); i++ ) + connect_log_endpoints.push_back(log_endpoints->StringValAt(i)->ToStdString()); + + for ( const auto& endp : connect_log_endpoints ) { + ZEROMQ_DEBUG("Connecting log_push socket with %s", endp.c_str()); + try { + log_push.connect(endp); + } catch ( zmq::error_t& err ) { + zeek::reporter->Error("ZeroMQ: Failed to connect to PUSH socket %s: %s", endp.c_str(), err.what()); + return false; + } + } + + // At this point we've connected xpub/xsub and any logging endpoints. + // However, we cannot tell if we're connected to anything as ZeroMQ does + // not trivially expose this information. + // + // There is the zmq_socket_monitor() API that we could use to get some + // more low-level events in the future for logging and possibly script + // layer eventing: http://api.zeromq.org/4-2:zmq-socket-monitor + + + // As of now, message processing happens in a separate thread that is + // started below. If we wanted to integrate ZeroMQ as a selectable IO + // source rather than going through ThreadedBackend and its flare, the + // following post might be useful: + // + // https://funcptr.net/2012/09/10/zeromq---edge-triggered-notification/ + self_thread = std::thread(self_thread_fun, this); + + // After connecting, call ThreadedBackend::DoInit() to register + // the IO source with the loop. 
+ return ThreadedBackend::DoInit(); +} + +bool ZeroMQBackend::SpawnZmqProxyThread() { + proxy_thread = std::make_unique(listen_xpub_endpoint, listen_xsub_endpoint, listen_xpub_nodrop); + return proxy_thread->Start(); +} + +bool ZeroMQBackend::DoPublishEvent(const std::string& topic, const std::string& format, + const cluster::detail::byte_buffer& buf) { + // Publishing an event happens as a multipart message with 4 parts: + // + // * The topic to publish to - this is required by XPUB/XSUB + // * The node's identifier - see Cluster::node_id(). + // * The format used to serialize the event. + // * The serialized event itself. + std::array parts = { + zmq::const_buffer(topic.data(), topic.size()), + zmq::const_buffer(my_node_id.data(), my_node_id.size()), + zmq::const_buffer(format.data(), format.size()), + zmq::const_buffer(buf.data(), buf.size()), + }; + + ZEROMQ_DEBUG("Publishing %zu bytes to %s", buf.size(), topic.c_str()); + + for ( size_t i = 0; i < parts.size(); i++ ) { + zmq::send_flags flags = zmq::send_flags::none; + if ( i < parts.size() - 1 ) + flags = flags | zmq::send_flags::sndmore; + + // This should never fail, it will instead block + // when HWM is reached. I guess we need to see if + // and how this can happen :-/ + main_inproc.send(parts[i], flags); + } + + return true; +} + +bool ZeroMQBackend::DoSubscribe(const std::string& topic_prefix) { + ZEROMQ_DEBUG("Subscribing to %s", topic_prefix.c_str()); + try { + // Prepend 0x01 byte to indicate subscription to XSUB socket + // This is the XSUB API instead of setsockopt(ZMQ_SUBSCRIBE). + std::string msg = "\x01" + topic_prefix; + xsub.send(zmq::const_buffer(msg.data(), msg.size())); + } catch ( zmq::error_t& err ) { + zeek::reporter->Error("Failed to subscribe to topic %s: %s", topic_prefix.c_str(), err.what()); + return false; + } + + return true; +} + +bool ZeroMQBackend::DoUnsubscribe(const std::string& topic_prefix) { + ZEROMQ_DEBUG("Unsubscribing %s", topic_prefix.c_str()); + try { + // Prepend 0x00 byte to indicate subscription to XSUB socket. + // This is the XSUB API instead of setsockopt(ZMQ_SUBSCRIBE). + std::string msg = "\x00" + topic_prefix; + xsub.send(zmq::const_buffer(msg.data(), msg.size())); + } catch ( zmq::error_t& err ) { + zeek::reporter->Error("Failed to unsubscribe from topic %s: %s", topic_prefix.c_str(), err.what()); + return false; + } + + return true; +} + +bool ZeroMQBackend::DoPublishLogWrites(const logging::detail::LogWriteHeader& header, const std::string& format, + cluster::detail::byte_buffer& buf) { + ZEROMQ_DEBUG("Publishing %zu bytes of log writes (path %s)", buf.size(), header.path.c_str()); + static std::string message_type = "log-write"; + + // Publishing a log write is done using 4 parts + // + // * A constant "log-write" string + // * The node's identifier - see Cluster::node_id(). + // * The format used to serialize the log write. + // * The serialized log write itself. + std::array parts = { + zmq::const_buffer{message_type.data(), message_type.size()}, + zmq::const_buffer(my_node_id.data(), my_node_id.size()), + zmq::const_buffer{format.data(), format.size()}, + zmq::const_buffer{buf.data(), buf.size()}, + }; + + zmq::send_result_t result; + for ( size_t i = 0; i < parts.size(); i++ ) { + zmq::send_flags flags = zmq::send_flags::dontwait; + if ( i < parts.size() - 1 ) + flags = flags | zmq::send_flags::sndmore; + + result = log_push.send(parts[i], flags); + if ( ! result ) { + // XXX: Not exactly clear what we should do if we reach HWM. 
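As the module documentation notes, a log receiver does not have to be a Zeek node. The sketch below shows the receiving side of this 4-part framing, assuming cppzmq and a hypothetical endpoint; actually decoding the payload would additionally require implementing the configured log serialization format.

```cpp
// Hypothetical standalone log receiver: bind a PULL socket where the Zeek
// nodes' log_push sockets connect, then read 4-part messages of the form
// ("log-write", sender node id, serialization format, payload).
#include <cstdio>
#include <string>
#include <vector>
#include <zmq.hpp>

int main() {
    zmq::context_t ctx;
    zmq::socket_t pull(ctx, zmq::socket_type::pull);
    pull.bind("tcp://127.0.0.1:5557"); // hypothetical listen_log_endpoint

    while ( true ) {
        // Receive one multipart message.
        std::vector<zmq::message_t> parts;
        bool more = true;
        while ( more ) {
            zmq::message_t msg;
            if ( ! pull.recv(msg, zmq::recv_flags::none) ) // blocking receive
                continue;
            more = msg.more();
            parts.emplace_back(std::move(msg));
        }

        if ( parts.size() != 4 ) // expect "log-write", sender, format, payload
            continue;

        std::string sender(static_cast<const char*>(parts[1].data()), parts[1].size());
        std::string format(static_cast<const char*>(parts[2].data()), parts[2].size());
        std::printf("log write from %s (%s): %zu bytes\n", sender.c_str(), format.c_str(), parts[3].size());
        // Interpreting parts[3] requires the matching log deserializer.
    }
}
```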
+ // we could block and hope a logger comes along that empties + // our internal queue, or discard messages and log very loudly + // and have metrics about it. However, this may happen regularly + // at shutdown. + // + // Maybe that should be configurable? + + // If no logging endpoints were configured, that almost seems on + // purpose (and there's a warning elsewhere about this), so skip + // logging an error when sending fails. + if ( connect_log_endpoints.empty() ) + return true; + + reporter->Error("Failed to send log write. HWM reached?"); + return false; + } + } + + return true; +} + +void ZeroMQBackend::Run() { + using MultipartMessage = std::vector; + + auto HandleLogMessages = [this](const std::vector& msgs) { + QueueMessages qmsgs; + qmsgs.reserve(msgs.size()); + + for ( const auto& msg : msgs ) { + // sender, format, type, payload + if ( msg.size() != 4 ) { + ZEROMQ_THREAD_PRINTF("log: error: expected 4 parts, have %zu!\n", msg.size()); + continue; + } + + detail::byte_buffer payload{msg[3].data(), msg[3].data() + msg[3].size()}; + qmsgs.emplace_back(LogMessage{.format = std::string(msg[2].data(), msg[2].size()), + .payload = std::move(payload)}); + } + + QueueForProcessing(std::move(qmsgs)); + }; + + auto HandleInprocMessages = [this](std::vector& msgs) { + // Forward messages from the inprocess bridge to xpub. + for ( auto& msg : msgs ) { + assert(msg.size() == 4); + + for ( auto& part : msg ) { + zmq::send_flags flags = zmq::send_flags::dontwait; + if ( part.more() ) + flags = flags | zmq::send_flags::sndmore; + + zmq::send_result_t result; + do { + try { + result = xpub.send(part, flags); + } catch ( zmq::error_t& err ) { + // XXX: Not sure if the return false is so great here. + // + // Also, if we fail to publish, should we block rather + // than discard? + ZEROMQ_THREAD_PRINTF("xpub: Failed to publish: %s (%d)", err.what(), err.num()); + break; + } + // EAGAIN returns empty result, means try again! + } while ( ! result ); + } + } + }; + + auto HandleXPubMessages = [this](const std::vector& msgs) { + QueueMessages qmsgs; + qmsgs.reserve(msgs.size()); + + for ( const auto& msg : msgs ) { + if ( msg.size() != 1 ) { + ZEROMQ_THREAD_PRINTF("xpub: error: expected 1 part, have %zu!\n", msg.size()); + continue; + } + + // Check if the messages starts with \x00 or \x01 to understand if it's + // a subscription or unsubscription message. + auto first = *reinterpret_cast(msg[0].data()); + if ( first == 0 || first == 1 ) { + QueueMessage qm; + auto* start = msg[0].data() + 1; + auto* end = msg[0].data() + msg[0].size(); + detail::byte_buffer topic(start, end); + if ( first == 1 ) { + qm = BackendMessage{1, std::move(topic)}; + } + else if ( first == 0 ) { + qm = BackendMessage{0, std::move(topic)}; + } + else { + ZEROMQ_THREAD_PRINTF("xpub: error: unexpected first char: have '0x%02x'", first); + continue; + } + + qmsgs.emplace_back(std::move(qm)); + } + } + + QueueForProcessing(std::move(qmsgs)); + }; + + auto HandleXSubMessages = [this](const std::vector& msgs) { + QueueMessages qmsgs; + qmsgs.reserve(msgs.size()); + + for ( const auto& msg : msgs ) { + if ( msg.size() != 4 ) { + ZEROMQ_THREAD_PRINTF("xsub: error: expected 4 parts, have %zu!\n", msg.size()); + continue; + } + + // Filter out messages that are coming from this node. 
+ std::string sender(msg[1].data(), msg[1].size()); + if ( sender == my_node_id ) + continue; + + detail::byte_buffer payload{msg[3].data(), msg[3].data() + msg[3].size()}; + qmsgs.emplace_back(EventMessage{.topic = std::string(msg[0].data(), msg[0].size()), + .format = std::string(msg[2].data(), msg[2].size()), + .payload = std::move(payload)}); + } + + QueueForProcessing(std::move(qmsgs)); + }; + + struct SocketInfo { + zmq::socket_ref socket; + std::string name; + std::function&)> handler; + }; + + std::vector sockets = { + {.socket = child_inproc, .name = "inproc", .handler = HandleInprocMessages}, + {.socket = xpub, .name = "xpub", .handler = HandleXPubMessages}, + {.socket = xsub, .name = "xsub", .handler = HandleXSubMessages}, + {.socket = log_pull, .name = "log_pull", .handler = HandleLogMessages}, + }; + + std::vector poll_items(sockets.size()); + + while ( true ) { + for ( size_t i = 0; i < sockets.size(); i++ ) + poll_items[i] = {.socket = sockets[i].socket.handle(), .fd = 0, .events = ZMQ_POLLIN | ZMQ_POLLERR}; + + // Awkward. + std::vector> rcv_messages(sockets.size()); + try { + int r = zmq::poll(poll_items, std::chrono::seconds(-1)); + ZEROMQ_DEBUG_THREAD_PRINTF(DebugFlag::POLL, "poll: r=%d", r); + + for ( size_t i = 0; i < poll_items.size(); i++ ) { + const auto& item = poll_items[i]; + ZEROMQ_DEBUG_THREAD_PRINTF(DebugFlag::POLL, "poll: items[%lu]=%s %s %s\n", i, sockets[i].name.c_str(), + item.revents & ZMQ_POLLIN ? "pollin " : "", + item.revents & ZMQ_POLLERR ? "err" : ""); + + if ( item.revents & ZMQ_POLLERR ) { + // What should we be doing? Re-open sockets? Terminate? + ZEROMQ_THREAD_PRINTF("poll: error: POLLERR on socket %zu %s %p revents=%x\n", i, + sockets[i].name.c_str(), item.socket, item.revents); + } + + // Nothing to do? + if ( (item.revents & ZMQ_POLLIN) == 0 ) + continue; + + bool consumed_one = false; + + // Read messages from the socket. + do { + zmq::message_t msg; + rcv_messages[i].emplace_back(); // make room for a multipart message + auto& into = rcv_messages[i].back(); + + // Only receive up to poll_max_messages from an individual + // socket. Move on to the next when exceeded. The last pushed + // message (empty) is popped at the end of the loop. + if ( poll_max_messages > 0 && rcv_messages[i].size() > poll_max_messages ) { + ZEROMQ_DEBUG_THREAD_PRINTF(DebugFlag::POLL, "poll: %s rcv_messages[%zu] full!\n", + sockets[i].name.c_str(), i); + break; + } + + consumed_one = false; + bool more = false; + + // Read a multi-part message. + do { + auto recv_result = sockets[i].socket.recv(msg, zmq::recv_flags::dontwait); + if ( recv_result ) { + consumed_one = true; + more = msg.more(); + into.emplace_back(std::move(msg)); + } + else { + // EAGAIN and more flag set? Try again! + if ( more ) + continue; + } + } while ( more ); + } while ( consumed_one ); + + assert(rcv_messages[i].back().size() == 0); + rcv_messages[i].pop_back(); + } + } catch ( zmq::error_t& err ) { + if ( err.num() == ETERM ) + return; + + throw; + } + + // At this point, we've received anything that was readable from the sockets. + // Now interpret and enqueue it into messages. + for ( size_t i = 0; i < sockets.size(); i++ ) { + if ( rcv_messages[i].empty() ) + continue; + + sockets[i].handler(rcv_messages[i]); + } + } +} + +bool ZeroMQBackend::DoProcessBackendMessage(int tag, detail::byte_buffer_span payload) { + if ( tag == 0 || tag == 1 ) { + std::string topic{reinterpret_cast(payload.data()), payload.size()}; + zeek::EventHandlerPtr eh = tag == 1 ? 
event_subscription : event_unsubscription; + + ZEROMQ_DEBUG("BackendMessage: %s for %s", eh->Name(), topic.c_str()); + zeek::event_mgr.Enqueue(eh, zeek::make_intrusive(topic)); + return true; + } + else { + zeek::reporter->Error("Ignoring bad BackendMessage tag=%d", tag); + return false; + } +} + + +} // namespace cluster::zeromq +} // namespace zeek diff --git a/src/cluster/backend/zeromq/ZeroMQ.h b/src/cluster/backend/zeromq/ZeroMQ.h new file mode 100644 index 0000000000..8a715b8c28 --- /dev/null +++ b/src/cluster/backend/zeromq/ZeroMQ.h @@ -0,0 +1,99 @@ +// See the file "COPYING" in the main distribution directory for copyright. + +#pragma once + +#include +#include +#include + +#include "zeek/cluster/Backend.h" +#include "zeek/cluster/Serializer.h" +#include "zeek/cluster/backend/zeromq/ZeroMQ-Proxy.h" + +namespace zeek::cluster::zeromq { + +class ZeroMQBackend : public cluster::ThreadedBackend { +public: + /** + * Constructor. + */ + ZeroMQBackend(std::unique_ptr es, std::unique_ptr ls); + + /** + * Spawns a thread running zmq_proxy() for the configured XPUB/XSUB listen + * sockets. Only one node in a cluster should do this. + */ + bool SpawnZmqProxyThread(); + + /** + * Run method for background thread. + */ + void Run(); + + /** + * Component factory. + */ + static std::unique_ptr Instantiate(std::unique_ptr event_serializer, + std::unique_ptr log_serializer) { + return std::make_unique(std::move(event_serializer), std::move(log_serializer)); + } + +private: + void DoInitPostScript() override; + + bool DoInit() override; + + void DoTerminate() override; + + bool DoPublishEvent(const std::string& topic, const std::string& format, + const cluster::detail::byte_buffer& buf) override; + + bool DoSubscribe(const std::string& topic_prefix) override; + + bool DoUnsubscribe(const std::string& topic_prefix) override; + + bool DoPublishLogWrites(const logging::detail::LogWriteHeader& header, const std::string& format, + cluster::detail::byte_buffer& buf) override; + + const char* Tag() override { return "ZeroMQ"; } + + bool DoProcessBackendMessage(int tag, detail::byte_buffer_span payload) override; + + // Script level variables. + std::string my_node_id; + std::string connect_xsub_endpoint; + std::string connect_xpub_endpoint; + std::string listen_xsub_endpoint; + std::string listen_xpub_endpoint; + std::string listen_log_endpoint; + int listen_xpub_nodrop = 1; + + zeek_uint_t poll_max_messages = 0; + zeek_uint_t debug_flags = 0; + + EventHandlerPtr event_subscription; + EventHandlerPtr event_unsubscription; + + zmq::context_t ctx; + zmq::socket_t xsub; + zmq::socket_t xpub; + + // inproc sockets used for sending + // publish messages to xpub in a + // thread safe manner. + zmq::socket_t main_inproc; + zmq::socket_t child_inproc; + + // Sockets used for logging. The log_push socket connects + // with one or more logger-like nodes. Logger nodes listen + // on the log_pull socket. + std::vector connect_log_endpoints; + zmq::socket_t log_push; + zmq::socket_t log_pull; + + std::thread self_thread; + + std::unique_ptr proxy_thread; +}; + +} // namespace zeek::cluster::zeromq diff --git a/src/cluster/backend/zeromq/cluster_backend_zeromq.bif b/src/cluster/backend/zeromq/cluster_backend_zeromq.bif new file mode 100644 index 0000000000..4721a19c6b --- /dev/null +++ b/src/cluster/backend/zeromq/cluster_backend_zeromq.bif @@ -0,0 +1,16 @@ +%%{ +#include "ZeroMQ.h" +%%} + +function Cluster::Backend::ZeroMQ::spawn_zmq_proxy_thread%(%): bool + %{ + // Spawn the ZeroMQ broker thread. 
+ auto *zeromq_backend = dynamic_cast(zeek::cluster::backend); + if ( ! zeromq_backend ) + { + zeek::emit_builtin_error("Cluster::backend not set to ZeroMQ?"); + return zeek::val_mgr->Bool(false); + } + + return zeek::val_mgr->Bool(zeromq_backend->SpawnZmqProxyThread()); + %} diff --git a/src/cluster/backend/zeromq/cmake/FindZeroMQ.cmake b/src/cluster/backend/zeromq/cmake/FindZeroMQ.cmake new file mode 100644 index 0000000000..964f6a457d --- /dev/null +++ b/src/cluster/backend/zeromq/cmake/FindZeroMQ.cmake @@ -0,0 +1,52 @@ +include(FindPackageHandleStandardArgs) + +find_library(ZeroMQ_LIBRARY NAMES zmq HINTS ${ZeroMQ_ROOT_DIR}/lib) + +find_path(ZeroMQ_INCLUDE_DIR NAMES zmq.h HINTS ${ZeroMQ_ROOT_DIR}/include) + +find_path(ZeroMQ_CPP_INCLUDE_DIR NAMES zmq.hpp HINTS ${ZeroMQ_ROOT_DIR}/include) + +function (set_cppzmq_version) + # Extract the version from + file(STRINGS "${ZeroMQ_CPP_INCLUDE_DIR}/zmq.hpp" CPPZMQ_MAJOR_VERSION_H + REGEX "^#define CPPZMQ_VERSION_MAJOR [0-9]+$") + file(STRINGS "${ZeroMQ_CPP_INCLUDE_DIR}/zmq.hpp" CPPZMQ_MINOR_VERSION_H + REGEX "^#define CPPZMQ_VERSION_MINOR [0-9]+$") + file(STRINGS "${ZeroMQ_CPP_INCLUDE_DIR}/zmq.hpp" CPPZMQ_PATCH_VERSION_H + REGEX "^#define CPPZMQ_VERSION_PATCH [0-9]+$") + string(REGEX REPLACE "^.*MAJOR ([0-9]+)$" "\\1" CPPZMQ_MAJOR_VERSION + "${CPPZMQ_MAJOR_VERSION_H}") + string(REGEX REPLACE "^.*MINOR ([0-9]+)$" "\\1" CPPZMQ_MINOR_VERSION + "${CPPZMQ_MINOR_VERSION_H}") + string(REGEX REPLACE "^.*PATCH ([0-9]+)$" "\\1" CPPZMQ_PATCH_VERSION + "${CPPZMQ_PATCH_VERSION_H}") + + set(ZeroMQ_CPP_VERSION "${CPPZMQ_MAJOR_VERSION}.${CPPZMQ_MINOR_VERSION}.${CPPZMQ_PATCH_VERSION}" + PARENT_SCOPE) +endfunction () + +if (ZeroMQ_CPP_INCLUDE_DIR) + set_cppzmq_version() +endif () + +if (NOT ZeroMQ_CPP_VERSION) + # Probably no zmq.hpp file, use the version from auxil + set(ZeroMQ_CPP_INCLUDE_DIR "${CMAKE_CURRENT_SOURCE_DIR}/auxil/cppzmq" + CACHE FILEPATH "Include path for cppzmq" FORCE) + set_cppzmq_version() +elseif (ZeroMQ_CPP_VERSION VERSION_LESS "4.9.0") + message(STATUS "Found old cppzmq version ${ZeroMQ_CPP_VERSION}, using bundled version") + set(ZeroMQ_CPP_INCLUDE_DIR "${CMAKE_CURRENT_SOURCE_DIR}/auxil/cppzmq" + CACHE FILEPATH "Include path for cppzmq" FORCE) + set_cppzmq_version() +endif () + +message(STATUS "Using cppzmq ${ZeroMQ_CPP_VERSION} from ${ZeroMQ_CPP_INCLUDE_DIR}") + +find_package_handle_standard_args( + ZeroMQ FOUND_VAR ZeroMQ_FOUND REQUIRED_VARS ZeroMQ_LIBRARY ZeroMQ_INCLUDE_DIR + ZeroMQ_CPP_INCLUDE_DIR ZeroMQ_CPP_VERSION) + +set(ZeroMQ_LIBRARIES ${ZeroMQ_LIBRARY}) +set(ZeroMQ_INCLUDE_DIRS ${ZeroMQ_INCLUDE_DIR} ${ZeroMQ_CPP_INCLUDE_DIR}) +set(ZeroMQ_FOUND ${ZeroMQ_FOUND}) diff --git a/testing/btest/Baseline/cluster.zeromq.logging/cluster.log.normalized b/testing/btest/Baseline/cluster.zeromq.logging/cluster.log.normalized new file mode 100644 index 0000000000..fdebaf2784 --- /dev/null +++ b/testing/btest/Baseline/cluster.zeromq.logging/cluster.log.normalized @@ -0,0 +1,21 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. 
+logger got hello from manager (zeromq_manager___NrFj3eGxkRR5) +logger got hello from proxy (zeromq_proxy___NrFj3eGxkRR5) +logger got hello from worker-1 (zeromq_worker-1___NrFj3eGxkRR5) +logger got hello from worker-2 (zeromq_worker-2___NrFj3eGxkRR5) +manager got hello from logger (zeromq_logger___NrFj3eGxkRR5) +manager got hello from proxy (zeromq_proxy___NrFj3eGxkRR5) +manager got hello from worker-1 (zeromq_worker-1___NrFj3eGxkRR5) +manager got hello from worker-2 (zeromq_worker-2___NrFj3eGxkRR5) +proxy got hello from logger (zeromq_logger___NrFj3eGxkRR5) +proxy got hello from manager (zeromq_manager___NrFj3eGxkRR5) +proxy got hello from worker-1 (zeromq_worker-1___NrFj3eGxkRR5) +proxy got hello from worker-2 (zeromq_worker-2___NrFj3eGxkRR5) +worker-1 got hello from logger (zeromq_logger___NrFj3eGxkRR5) +worker-1 got hello from manager (zeromq_manager___NrFj3eGxkRR5) +worker-1 got hello from proxy (zeromq_proxy___NrFj3eGxkRR5) +worker-1 got hello from worker-2 (zeromq_worker-2___NrFj3eGxkRR5) +worker-2 got hello from logger (zeromq_logger___NrFj3eGxkRR5) +worker-2 got hello from manager (zeromq_manager___NrFj3eGxkRR5) +worker-2 got hello from proxy (zeromq_proxy___NrFj3eGxkRR5) +worker-2 got hello from worker-1 (zeromq_worker-1___NrFj3eGxkRR5) diff --git a/testing/btest/Baseline/cluster.zeromq.logging/manager.out b/testing/btest/Baseline/cluster.zeromq.logging/manager.out new file mode 100644 index 0000000000..9c5b6173f4 --- /dev/null +++ b/testing/btest/Baseline/cluster.zeromq.logging/manager.out @@ -0,0 +1,16 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +A zeek_init, manager +B node_up, logger +B node_up, proxy +B node_up, worker-1 +B node_up, worker-2 +B nodes_up, 2 +B nodes_up, 3 +B nodes_up, 4 +B nodes_up, 5 +C send_finish +D node_down, logger +D node_down, proxy +D node_down, worker-1 +D node_down, worker-2 +D send_finish to logger diff --git a/testing/btest/Baseline/cluster.zeromq.logging/node_up.sorted b/testing/btest/Baseline/cluster.zeromq.logging/node_up.sorted new file mode 100644 index 0000000000..1351c3ed4a --- /dev/null +++ b/testing/btest/Baseline/cluster.zeromq.logging/node_up.sorted @@ -0,0 +1,21 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +logger manager +logger proxy +logger worker-1 +logger worker-2 +manager logger +manager proxy +manager worker-1 +manager worker-2 +proxy logger +proxy manager +proxy worker-1 +proxy worker-2 +worker-1 logger +worker-1 manager +worker-1 proxy +worker-1 worker-2 +worker-2 logger +worker-2 manager +worker-2 proxy +worker-2 worker-1 diff --git a/testing/btest/Baseline/cluster.zeromq.manager-is-logger/cluster.log.normalized b/testing/btest/Baseline/cluster.zeromq.manager-is-logger/cluster.log.normalized new file mode 100644 index 0000000000..4cb1e8ebc0 --- /dev/null +++ b/testing/btest/Baseline/cluster.zeromq.manager-is-logger/cluster.log.normalized @@ -0,0 +1,13 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. 
+manager got hello from proxy (zeromq_proxy___NrFj3eGxkRR5) +manager got hello from worker-1 (zeromq_worker-1___NrFj3eGxkRR5) +manager got hello from worker-2 (zeromq_worker-2___NrFj3eGxkRR5) +proxy got hello from manager (zeromq_manager___NrFj3eGxkRR5) +proxy got hello from worker-1 (zeromq_worker-1___NrFj3eGxkRR5) +proxy got hello from worker-2 (zeromq_worker-2___NrFj3eGxkRR5) +worker-1 got hello from manager (zeromq_manager___NrFj3eGxkRR5) +worker-1 got hello from proxy (zeromq_proxy___NrFj3eGxkRR5) +worker-1 got hello from worker-2 (zeromq_worker-2___NrFj3eGxkRR5) +worker-2 got hello from manager (zeromq_manager___NrFj3eGxkRR5) +worker-2 got hello from proxy (zeromq_proxy___NrFj3eGxkRR5) +worker-2 got hello from worker-1 (zeromq_worker-1___NrFj3eGxkRR5) diff --git a/testing/btest/Baseline/cluster.zeromq.manager-is-logger/manager.out b/testing/btest/Baseline/cluster.zeromq.manager-is-logger/manager.out new file mode 100644 index 0000000000..df8b56a0eb --- /dev/null +++ b/testing/btest/Baseline/cluster.zeromq.manager-is-logger/manager.out @@ -0,0 +1,11 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +A node_up, proxy +A node_up, worker-1 +A node_up, worker-2 +B nodes_up, 2 +B nodes_up, 3 +B nodes_up, 4 +D node_down, proxy +D node_down, worker-1 +D node_down, worker-2 +zeek_init, manager diff --git a/testing/btest/Baseline/cluster.zeromq.manager-is-logger/node_up.sorted b/testing/btest/Baseline/cluster.zeromq.manager-is-logger/node_up.sorted new file mode 100644 index 0000000000..57fce89d58 --- /dev/null +++ b/testing/btest/Baseline/cluster.zeromq.manager-is-logger/node_up.sorted @@ -0,0 +1,13 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +manager proxy +manager worker-1 +manager worker-2 +proxy manager +proxy worker-1 +proxy worker-2 +worker-1 manager +worker-1 proxy +worker-1 worker-2 +worker-2 manager +worker-2 proxy +worker-2 worker-1 diff --git a/testing/btest/Baseline/cluster.zeromq.supervisor/supervisor.cluster.log b/testing/btest/Baseline/cluster.zeromq.supervisor/supervisor.cluster.log new file mode 100644 index 0000000000..fdebaf2784 --- /dev/null +++ b/testing/btest/Baseline/cluster.zeromq.supervisor/supervisor.cluster.log @@ -0,0 +1,21 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. 
+logger got hello from manager (zeromq_manager___NrFj3eGxkRR5) +logger got hello from proxy (zeromq_proxy___NrFj3eGxkRR5) +logger got hello from worker-1 (zeromq_worker-1___NrFj3eGxkRR5) +logger got hello from worker-2 (zeromq_worker-2___NrFj3eGxkRR5) +manager got hello from logger (zeromq_logger___NrFj3eGxkRR5) +manager got hello from proxy (zeromq_proxy___NrFj3eGxkRR5) +manager got hello from worker-1 (zeromq_worker-1___NrFj3eGxkRR5) +manager got hello from worker-2 (zeromq_worker-2___NrFj3eGxkRR5) +proxy got hello from logger (zeromq_logger___NrFj3eGxkRR5) +proxy got hello from manager (zeromq_manager___NrFj3eGxkRR5) +proxy got hello from worker-1 (zeromq_worker-1___NrFj3eGxkRR5) +proxy got hello from worker-2 (zeromq_worker-2___NrFj3eGxkRR5) +worker-1 got hello from logger (zeromq_logger___NrFj3eGxkRR5) +worker-1 got hello from manager (zeromq_manager___NrFj3eGxkRR5) +worker-1 got hello from proxy (zeromq_proxy___NrFj3eGxkRR5) +worker-1 got hello from worker-2 (zeromq_worker-2___NrFj3eGxkRR5) +worker-2 got hello from logger (zeromq_logger___NrFj3eGxkRR5) +worker-2 got hello from manager (zeromq_manager___NrFj3eGxkRR5) +worker-2 got hello from proxy (zeromq_proxy___NrFj3eGxkRR5) +worker-2 got hello from worker-1 (zeromq_worker-1___NrFj3eGxkRR5) diff --git a/testing/btest/Baseline/cluster.zeromq.two-nodes/..manager.out b/testing/btest/Baseline/cluster.zeromq.two-nodes/..manager.out new file mode 100644 index 0000000000..7e67339b4d --- /dev/null +++ b/testing/btest/Baseline/cluster.zeromq.two-nodes/..manager.out @@ -0,0 +1,3 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +node_up, worker-1 +node_down, worker-1 diff --git a/testing/btest/Baseline/cluster.zeromq.two-nodes/..worker.out b/testing/btest/Baseline/cluster.zeromq.two-nodes/..worker.out new file mode 100644 index 0000000000..386f8ae30f --- /dev/null +++ b/testing/btest/Baseline/cluster.zeromq.two-nodes/..worker.out @@ -0,0 +1,2 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. 
+node_up, manager diff --git a/testing/btest/Baseline/coverage.bare-load-baseline/canonified_loaded_scripts.log b/testing/btest/Baseline/coverage.bare-load-baseline/canonified_loaded_scripts.log index 443d35b00e..ad6adb5cb9 100644 --- a/testing/btest/Baseline/coverage.bare-load-baseline/canonified_loaded_scripts.log +++ b/testing/btest/Baseline/coverage.bare-load-baseline/canonified_loaded_scripts.log @@ -259,6 +259,7 @@ scripts/base/init-frameworks-and-bifs.zeek build/scripts/base/bif/plugins/Zeek_WebSocket.functions.bif.zeek build/scripts/base/bif/plugins/Zeek_WebSocket.types.bif.zeek build/scripts/base/bif/plugins/Zeek_XMPP.events.bif.zeek + build/scripts/base/bif/plugins/Zeek_Cluster_Backend_ZeroMQ.cluster_backend_zeromq.bif.zeek build/scripts/base/bif/plugins/Zeek_ARP.events.bif.zeek build/scripts/base/bif/plugins/Zeek_UDP.events.bif.zeek build/scripts/base/bif/plugins/Zeek_ICMP.events.bif.zeek diff --git a/testing/btest/Baseline/coverage.default-load-baseline/canonified_loaded_scripts.log b/testing/btest/Baseline/coverage.default-load-baseline/canonified_loaded_scripts.log index 8a23826c17..353e066690 100644 --- a/testing/btest/Baseline/coverage.default-load-baseline/canonified_loaded_scripts.log +++ b/testing/btest/Baseline/coverage.default-load-baseline/canonified_loaded_scripts.log @@ -259,6 +259,7 @@ scripts/base/init-frameworks-and-bifs.zeek build/scripts/base/bif/plugins/Zeek_WebSocket.functions.bif.zeek build/scripts/base/bif/plugins/Zeek_WebSocket.types.bif.zeek build/scripts/base/bif/plugins/Zeek_XMPP.events.bif.zeek + build/scripts/base/bif/plugins/Zeek_Cluster_Backend_ZeroMQ.cluster_backend_zeromq.bif.zeek build/scripts/base/bif/plugins/Zeek_ARP.events.bif.zeek build/scripts/base/bif/plugins/Zeek_UDP.events.bif.zeek build/scripts/base/bif/plugins/Zeek_ICMP.events.bif.zeek diff --git a/testing/btest/Baseline/coverage.test-all-policy-cluster/.stderr b/testing/btest/Baseline/coverage.test-all-policy-cluster/.stderr index bff9a64e41..49d861c74c 100644 --- a/testing/btest/Baseline/coverage.test-all-policy-cluster/.stderr +++ b/testing/btest/Baseline/coverage.test-all-policy-cluster/.stderr @@ -1,5 +1 @@ ### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. 
-received termination signal -received termination signal -received termination signal -received termination signal diff --git a/testing/btest/Baseline/plugins.hooks/output b/testing/btest/Baseline/plugins.hooks/output index 9eadabd9ad..2722d1e90b 100644 --- a/testing/btest/Baseline/plugins.hooks/output +++ b/testing/btest/Baseline/plugins.hooks/output @@ -337,6 +337,7 @@ 0.000000 MetaHookPost LoadFile(0, ./Zeek_BenchmarkReader.benchmark.bif.zeek, <...>/Zeek_BenchmarkReader.benchmark.bif.zeek) -> -1 0.000000 MetaHookPost LoadFile(0, ./Zeek_BinaryReader.binary.bif.zeek, <...>/Zeek_BinaryReader.binary.bif.zeek) -> -1 0.000000 MetaHookPost LoadFile(0, ./Zeek_BitTorrent.events.bif.zeek, <...>/Zeek_BitTorrent.events.bif.zeek) -> -1 +0.000000 MetaHookPost LoadFile(0, ./Zeek_Cluster_Backend_ZeroMQ.cluster_backend_zeromq.bif.zeek, <...>/Zeek_Cluster_Backend_ZeroMQ.cluster_backend_zeromq.bif.zeek) -> -1 0.000000 MetaHookPost LoadFile(0, ./Zeek_ConfigReader.config.bif.zeek, <...>/Zeek_ConfigReader.config.bif.zeek) -> -1 0.000000 MetaHookPost LoadFile(0, ./Zeek_ConnSize.events.bif.zeek, <...>/Zeek_ConnSize.events.bif.zeek) -> -1 0.000000 MetaHookPost LoadFile(0, ./Zeek_ConnSize.functions.bif.zeek, <...>/Zeek_ConnSize.functions.bif.zeek) -> -1 @@ -643,6 +644,7 @@ 0.000000 MetaHookPost LoadFileExtended(0, ./Zeek_BenchmarkReader.benchmark.bif.zeek, <...>/Zeek_BenchmarkReader.benchmark.bif.zeek) -> (-1, ) 0.000000 MetaHookPost LoadFileExtended(0, ./Zeek_BinaryReader.binary.bif.zeek, <...>/Zeek_BinaryReader.binary.bif.zeek) -> (-1, ) 0.000000 MetaHookPost LoadFileExtended(0, ./Zeek_BitTorrent.events.bif.zeek, <...>/Zeek_BitTorrent.events.bif.zeek) -> (-1, ) +0.000000 MetaHookPost LoadFileExtended(0, ./Zeek_Cluster_Backend_ZeroMQ.cluster_backend_zeromq.bif.zeek, <...>/Zeek_Cluster_Backend_ZeroMQ.cluster_backend_zeromq.bif.zeek) -> (-1, ) 0.000000 MetaHookPost LoadFileExtended(0, ./Zeek_ConfigReader.config.bif.zeek, <...>/Zeek_ConfigReader.config.bif.zeek) -> (-1, ) 0.000000 MetaHookPost LoadFileExtended(0, ./Zeek_ConnSize.events.bif.zeek, <...>/Zeek_ConnSize.events.bif.zeek) -> (-1, ) 0.000000 MetaHookPost LoadFileExtended(0, ./Zeek_ConnSize.functions.bif.zeek, <...>/Zeek_ConnSize.functions.bif.zeek) -> (-1, ) @@ -1281,6 +1283,7 @@ 0.000000 MetaHookPre LoadFile(0, ./Zeek_BenchmarkReader.benchmark.bif.zeek, <...>/Zeek_BenchmarkReader.benchmark.bif.zeek) 0.000000 MetaHookPre LoadFile(0, ./Zeek_BinaryReader.binary.bif.zeek, <...>/Zeek_BinaryReader.binary.bif.zeek) 0.000000 MetaHookPre LoadFile(0, ./Zeek_BitTorrent.events.bif.zeek, <...>/Zeek_BitTorrent.events.bif.zeek) +0.000000 MetaHookPre LoadFile(0, ./Zeek_Cluster_Backend_ZeroMQ.cluster_backend_zeromq.bif.zeek, <...>/Zeek_Cluster_Backend_ZeroMQ.cluster_backend_zeromq.bif.zeek) 0.000000 MetaHookPre LoadFile(0, ./Zeek_ConfigReader.config.bif.zeek, <...>/Zeek_ConfigReader.config.bif.zeek) 0.000000 MetaHookPre LoadFile(0, ./Zeek_ConnSize.events.bif.zeek, <...>/Zeek_ConnSize.events.bif.zeek) 0.000000 MetaHookPre LoadFile(0, ./Zeek_ConnSize.functions.bif.zeek, <...>/Zeek_ConnSize.functions.bif.zeek) @@ -1587,6 +1590,7 @@ 0.000000 MetaHookPre LoadFileExtended(0, ./Zeek_BenchmarkReader.benchmark.bif.zeek, <...>/Zeek_BenchmarkReader.benchmark.bif.zeek) 0.000000 MetaHookPre LoadFileExtended(0, ./Zeek_BinaryReader.binary.bif.zeek, <...>/Zeek_BinaryReader.binary.bif.zeek) 0.000000 MetaHookPre LoadFileExtended(0, ./Zeek_BitTorrent.events.bif.zeek, <...>/Zeek_BitTorrent.events.bif.zeek) +0.000000 MetaHookPre LoadFileExtended(0, 
./Zeek_Cluster_Backend_ZeroMQ.cluster_backend_zeromq.bif.zeek, <...>/Zeek_Cluster_Backend_ZeroMQ.cluster_backend_zeromq.bif.zeek) 0.000000 MetaHookPre LoadFileExtended(0, ./Zeek_ConfigReader.config.bif.zeek, <...>/Zeek_ConfigReader.config.bif.zeek) 0.000000 MetaHookPre LoadFileExtended(0, ./Zeek_ConnSize.events.bif.zeek, <...>/Zeek_ConnSize.events.bif.zeek) 0.000000 MetaHookPre LoadFileExtended(0, ./Zeek_ConnSize.functions.bif.zeek, <...>/Zeek_ConnSize.functions.bif.zeek) @@ -2224,6 +2228,7 @@ 0.000000 | HookLoadFile ./Zeek_BenchmarkReader.benchmark.bif.zeek <...>/Zeek_BenchmarkReader.benchmark.bif.zeek 0.000000 | HookLoadFile ./Zeek_BinaryReader.binary.bif.zeek <...>/Zeek_BinaryReader.binary.bif.zeek 0.000000 | HookLoadFile ./Zeek_BitTorrent.events.bif.zeek <...>/Zeek_BitTorrent.events.bif.zeek +0.000000 | HookLoadFile ./Zeek_Cluster_Backend_ZeroMQ.cluster_backend_zeromq.bif.zeek <...>/Zeek_Cluster_Backend_ZeroMQ.cluster_backend_zeromq.bif.zeek 0.000000 | HookLoadFile ./Zeek_ConfigReader.config.bif.zeek <...>/Zeek_ConfigReader.config.bif.zeek 0.000000 | HookLoadFile ./Zeek_ConnSize.events.bif.zeek <...>/Zeek_ConnSize.events.bif.zeek 0.000000 | HookLoadFile ./Zeek_ConnSize.functions.bif.zeek <...>/Zeek_ConnSize.functions.bif.zeek @@ -2530,6 +2535,7 @@ 0.000000 | HookLoadFileExtended ./Zeek_BenchmarkReader.benchmark.bif.zeek <...>/Zeek_BenchmarkReader.benchmark.bif.zeek 0.000000 | HookLoadFileExtended ./Zeek_BinaryReader.binary.bif.zeek <...>/Zeek_BinaryReader.binary.bif.zeek 0.000000 | HookLoadFileExtended ./Zeek_BitTorrent.events.bif.zeek <...>/Zeek_BitTorrent.events.bif.zeek +0.000000 | HookLoadFileExtended ./Zeek_Cluster_Backend_ZeroMQ.cluster_backend_zeromq.bif.zeek <...>/Zeek_Cluster_Backend_ZeroMQ.cluster_backend_zeromq.bif.zeek 0.000000 | HookLoadFileExtended ./Zeek_ConfigReader.config.bif.zeek <...>/Zeek_ConfigReader.config.bif.zeek 0.000000 | HookLoadFileExtended ./Zeek_ConnSize.events.bif.zeek <...>/Zeek_ConnSize.events.bif.zeek 0.000000 | HookLoadFileExtended ./Zeek_ConnSize.functions.bif.zeek <...>/Zeek_ConnSize.functions.bif.zeek diff --git a/testing/btest/Files/zeromq/cluster-layout-no-logger.zeek b/testing/btest/Files/zeromq/cluster-layout-no-logger.zeek new file mode 100644 index 0000000000..23baf76a2a --- /dev/null +++ b/testing/btest/Files/zeromq/cluster-layout-no-logger.zeek @@ -0,0 +1,8 @@ +redef Cluster::manager_is_logger = T; + +redef Cluster::nodes = { + ["manager"] = [$node_type=Cluster::MANAGER, $ip=127.0.0.1, $p=to_port(getenv("LOG_PULL_PORT"))], + ["proxy"] = [$node_type=Cluster::PROXY, $ip=127.0.0.1], + ["worker-1"] = [$node_type=Cluster::WORKER, $ip=127.0.0.1], + ["worker-2"] = [$node_type=Cluster::WORKER, $ip=127.0.0.1], +}; diff --git a/testing/btest/Files/zeromq/cluster-layout-simple.zeek b/testing/btest/Files/zeromq/cluster-layout-simple.zeek new file mode 100644 index 0000000000..be99599819 --- /dev/null +++ b/testing/btest/Files/zeromq/cluster-layout-simple.zeek @@ -0,0 +1,9 @@ +redef Cluster::manager_is_logger = F; + +redef Cluster::nodes = { + ["manager"] = [$node_type=Cluster::MANAGER, $ip=127.0.0.1], + ["logger"] = [$node_type=Cluster::LOGGER, $ip=127.0.0.1, $p=to_port(getenv("LOG_PULL_PORT"))], + ["proxy"] = [$node_type=Cluster::PROXY, $ip=127.0.0.1], + ["worker-1"] = [$node_type=Cluster::WORKER, $ip=127.0.0.1], + ["worker-2"] = [$node_type=Cluster::WORKER, $ip=127.0.0.1], +}; diff --git a/testing/btest/Files/zeromq/cluster-layout-two-loggers.zeek b/testing/btest/Files/zeromq/cluster-layout-two-loggers.zeek new file mode 100644 index 
0000000000..19e6942774 --- /dev/null +++ b/testing/btest/Files/zeromq/cluster-layout-two-loggers.zeek @@ -0,0 +1,10 @@ +redef Cluster::manager_is_logger = F; + +redef Cluster::nodes = { + ["manager"] = [$node_type=Cluster::MANAGER, $ip=127.0.0.1], + ["logger-1"] = [$node_type=Cluster::LOGGER, $ip=127.0.0.1, $p=to_port(getenv("LOG_PULL_PORT_1"))], + ["logger-2"] = [$node_type=Cluster::LOGGER, $ip=127.0.0.1, $p=to_port(getenv("LOG_PULL_PORT_2"))], + ["proxy"] = [$node_type=Cluster::PROXY, $ip=127.0.0.1], + ["worker-1"] = [$node_type=Cluster::WORKER, $ip=127.0.0.1], + ["worker-2"] = [$node_type=Cluster::WORKER, $ip=127.0.0.1], +}; diff --git a/testing/btest/Files/zeromq/test-bootstrap.zeek b/testing/btest/Files/zeromq/test-bootstrap.zeek new file mode 100644 index 0000000000..116a60ea0e --- /dev/null +++ b/testing/btest/Files/zeromq/test-bootstrap.zeek @@ -0,0 +1,11 @@ +# Helper script for tests that expect XPUB/XSUB ports allocated by +# btest; it configures the ZeroMQ globals accordingly. +@load base/utils/numbers + +@load frameworks/cluster/backend/zeromq +@load frameworks/cluster/backend/zeromq/connect + +redef Cluster::Backend::ZeroMQ::listen_xpub_endpoint = fmt("tcp://127.0.0.1:%s", extract_count(getenv("XPUB_PORT"))); +redef Cluster::Backend::ZeroMQ::listen_xsub_endpoint = fmt("tcp://127.0.0.1:%s", extract_count(getenv("XSUB_PORT"))); +redef Cluster::Backend::ZeroMQ::connect_xpub_endpoint = fmt("tcp://127.0.0.1:%s", extract_count(getenv("XSUB_PORT"))); +redef Cluster::Backend::ZeroMQ::connect_xsub_endpoint = fmt("tcp://127.0.0.1:%s", extract_count(getenv("XPUB_PORT"))); diff --git a/testing/btest/cluster/zeromq/logging.zeek b/testing/btest/cluster/zeromq/logging.zeek new file mode 100644 index 0000000000..377cf5da90 --- /dev/null +++ b/testing/btest/cluster/zeromq/logging.zeek @@ -0,0 +1,139 @@ +# @TEST-DOC: Start up a ZeroMQ cluster by hand, testing basic logging and node_up and node_down events. +# +# @TEST-REQUIRES: have-zeromq +# +# @TEST-GROUP: cluster-zeromq +# +# @TEST-PORT: XPUB_PORT +# @TEST-PORT: XSUB_PORT +# @TEST-PORT: LOG_PULL_PORT +# +# @TEST-EXEC: chmod +x ./check-cluster-log.sh +# +# @TEST-EXEC: cp $FILES/zeromq/cluster-layout-simple.zeek cluster-layout.zeek +# @TEST-EXEC: cp $FILES/zeromq/test-bootstrap.zeek zeromq-test-bootstrap.zeek +# +# @TEST-EXEC: btest-bg-run manager "ZEEKPATH=$ZEEKPATH:.. && CLUSTER_NODE=manager zeek -b ../manager.zeek >out" +# @TEST-EXEC: btest-bg-run logger "ZEEKPATH=$ZEEKPATH:.. && CLUSTER_NODE=logger zeek -b ../other.zeek >out" +# @TEST-EXEC: btest-bg-run proxy "ZEEKPATH=$ZEEKPATH:.. && CLUSTER_NODE=proxy zeek -b ../other.zeek >out" +# @TEST-EXEC: btest-bg-run worker-1 "ZEEKPATH=$ZEEKPATH:.. && CLUSTER_NODE=worker-1 zeek -b ../other.zeek >out" +# @TEST-EXEC: btest-bg-run worker-2 "ZEEKPATH=$ZEEKPATH:..
&& CLUSTER_NODE=worker-2 zeek -b ../other.zeek >out" + +# @TEST-EXEC: btest-bg-wait 10 +# @TEST-EXEC: btest-diff cluster.log.normalized +# @TEST-EXEC: zeek-cut -F ' ' < ./logger/node_up.log | sort > node_up.sorted +# @TEST-EXEC: btest-diff node_up.sorted +# @TEST-EXEC: sort manager/out > manager.out +# @TEST-EXEC: btest-diff manager.out + +# @TEST-START-FILE common.zeek +@load ./zeromq-test-bootstrap + +redef Log::default_rotation_interval = 0sec; +redef Log::flush_interval = 0.01sec; + +type Info: record { + self: string &log &default=Cluster::node; + node: string &log; +}; + +redef enum Log::ID += { TEST_LOG }; + +global finish: event(name: string) &is_used; + +event zeek_init() { + print "A zeek_init", Cluster::node; + Log::create_stream(TEST_LOG, [$columns=Info, $path="node_up"]); +} + +event Cluster::node_up(name: string, id: string) &priority=-5 { + print "B node_up", name; + Log::write(TEST_LOG, [$node=name]); + # Log::flush(TEST_LOG); + # Log::flush(Cluster::LOG); +} +# @TEST-END-FILE + +# @TEST-START-FILE manager.zeek +@load ./common.zeek + +global nodes_up: set[string] = {"manager"}; +global nodes_down: set[string] = {"manager"}; + +event send_finish() { + print "C send_finish"; + for ( n in nodes_up ) + if ( n != "logger" ) + Cluster::publish(Cluster::node_topic(n), finish, Cluster::node); +} + +event check_cluster_log() { + if ( file_size("DONE") >= 0 ) { + event send_finish(); + return; + } + + system("../check-cluster-log.sh"); + schedule 0.1sec { check_cluster_log() }; +} + +event zeek_init() { + schedule 0.1sec { check_cluster_log() }; +} + +event Cluster::node_up(name: string, id: string) &priority=-1 { + add nodes_up[name]; + print "B nodes_up", |nodes_up|; +} + +event Cluster::node_down(name: string, id: string) { + print "D node_down", name; + add nodes_down[name]; + + if ( |nodes_down| == |Cluster::nodes| - 1 ) { + print "D send_finish to logger"; + Cluster::publish(Cluster::node_topic("logger"), finish, Cluster::node); + } + if ( |nodes_down| == |Cluster::nodes| ) + terminate(); +} +# @TEST-END-FILE + +# @TEST-START-FILE other.zeek +@load ./common.zeek + +event finish(name: string) { + print fmt("finish from %s", name); + terminate(); +} +# @TEST-END-FILE + +# @TEST-START-FILE check-cluster-log.sh +#!/bin/sh +# +# This script checks logger/cluster.log until the expected number +# of log entries has been observed and puts a normalized version +# into the testing directory for baselining. CLUSTER_LOG=../logger/cluster.log + +if [ ! -f $CLUSTER_LOG ]; then + echo "$CLUSTER_LOG not found!" >&2 + exit 1; +fi + +if [ -f DONE ]; then + exit 0 +fi + +# Remove hostname and pid from node id in message. +zeek-cut node message < $CLUSTER_LOG | sed -r 's/_[^_]+_[0-9]+_/___/g' | sort > cluster.log.tmp + +# 5 nodes each logging 4 hellos: expect 20 entries. +if [ $(wc -l < cluster.log.tmp) = 20 ]; then + echo "DONE!" >&2 + mv cluster.log.tmp ../cluster.log.normalized + echo "DONE" > DONE +fi + +exit 0 +# @TEST-END-FILE diff --git a/testing/btest/cluster/zeromq/manager-is-logger.zeek b/testing/btest/cluster/zeromq/manager-is-logger.zeek new file mode 100644 index 0000000000..c01a1e8f73 --- /dev/null +++ b/testing/btest/cluster/zeromq/manager-is-logger.zeek @@ -0,0 +1,129 @@ +# @TEST-DOC: Start up a ZeroMQ cluster without a logger, testing logging through the manager.
+# +# @TEST-REQUIRES: have-zeromq +# +# @TEST-GROUP: cluster-zeromq +# +# @TEST-PORT: XPUB_PORT +# @TEST-PORT: XSUB_PORT +# @TEST-PORT: LOG_PULL_PORT +# +# @TEST-EXEC: chmod +x ./check-cluster-log.sh +# +# @TEST-EXEC: cp $FILES/zeromq/cluster-layout-no-logger.zeek cluster-layout.zeek +# @TEST-EXEC: cp $FILES/zeromq/test-bootstrap.zeek zeromq-test-bootstrap.zeek +# +# @TEST-EXEC: btest-bg-run manager "ZEEKPATH=$ZEEKPATH:.. && CLUSTER_NODE=manager zeek -b ../manager.zeek >out" +# @TEST-EXEC: btest-bg-run proxy "ZEEKPATH=$ZEEKPATH:.. && CLUSTER_NODE=proxy zeek -b ../other.zeek >out" +# @TEST-EXEC: btest-bg-run worker-1 "ZEEKPATH=$ZEEKPATH:.. && CLUSTER_NODE=worker-1 zeek -b ../other.zeek >out" +# @TEST-EXEC: btest-bg-run worker-2 "ZEEKPATH=$ZEEKPATH:.. && CLUSTER_NODE=worker-2 zeek -b ../other.zeek >out" +# +# @TEST-EXEC: btest-bg-wait 10 +# @TEST-EXEC: btest-diff cluster.log.normalized +# @TEST-EXEC: zeek-cut -F ' ' < ./manager/node_up.log | sort > node_up.sorted +# @TEST-EXEC: btest-diff node_up.sorted +# @TEST-EXEC: sort manager/out > manager.out +# @TEST-EXEC: btest-diff manager.out + +# @TEST-START-FILE common.zeek +@load ./zeromq-test-bootstrap + +redef Log::default_rotation_interval = 0sec; +redef Log::flush_interval = 0.01sec; + +type Info: record { + self: string &log &default=Cluster::node; + node: string &log; +}; + +redef enum Log::ID += { TEST_LOG }; + +global finish: event(name: string) &is_used; + +event zeek_init() { + print "zeek_init", Cluster::node; + Log::create_stream(TEST_LOG, [$columns=Info, $path="node_up"]); +} + +event Cluster::node_up(name: string, id: string) { + print "A node_up", name; + Log::write(TEST_LOG, [$node=name]); +} +# @TEST-END-FILE + +# @TEST-START-FILE manager.zeek +@load ./common.zeek + +global nodes_up: set[string] = {"manager"}; +global nodes_down: set[string] = {"manager"}; + +event send_finish() { + for ( n in nodes_up ) + Cluster::publish(Cluster::node_topic(n), finish, Cluster::node); +} + +event check_cluster_log() { + if ( file_size("DONE") >= 0 ) { + event send_finish(); + return; + } + + system("../check-cluster-log.sh"); + schedule 0.1sec { check_cluster_log() }; +} + +event zeek_init() { + schedule 0.1sec { check_cluster_log() }; +} + +event Cluster::node_up(name: string, id: string) { + add nodes_up[name]; + print "B nodes_up", |nodes_up|; +} + +event Cluster::node_down(name: string, id: string) { + print "D node_down", name; + add nodes_down[name]; + if ( |nodes_down| == |Cluster::nodes| ) + terminate(); +} +# @TEST-END-FILE + +# @TEST-START-FILE other.zeek +@load ./common.zeek + +event finish(name: string) { + print fmt("finish from %s", name); + terminate(); +} +# @TEST-END-FILE +# +# @TEST-START-FILE check-cluster-log.sh +#!/bin/sh +# +# This script checks cluster.log until the expected number +# of log entries has been observed and puts a normalized version +# into the testing directory for baselining. CLUSTER_LOG=cluster.log + +if [ ! -f $CLUSTER_LOG ]; then + echo "$CLUSTER_LOG not found!" >&2 + exit 1; +fi + +if [ -f DONE ]; then + exit 0 +fi + +# Remove hostname and pid from node id in message. +zeek-cut node message < $CLUSTER_LOG | sed -r 's/_[^_]+_[0-9]+_/___/g' | sort > cluster.log.tmp + +# 4 nodes each logging 3 hellos: expect 12 entries. +if [ $(wc -l < cluster.log.tmp) = 12 ]; then + echo "DONE!"
>&2 + mv cluster.log.tmp ../cluster.log.normalized + echo "DONE" > DONE +fi + +exit 0 +# @TEST-END-FILE diff --git a/testing/btest/cluster/zeromq/supervisor.zeek b/testing/btest/cluster/zeromq/supervisor.zeek new file mode 100644 index 0000000000..62574e9867 --- /dev/null +++ b/testing/btest/cluster/zeromq/supervisor.zeek @@ -0,0 +1,86 @@ +# @TEST-DOC: Configure a ZeroMQ cluster with Zeek's supervisor. +# +# @TEST-REQUIRES: have-zeromq +# +# @TEST-GROUP: cluster-zeromq +# +# @TEST-PORT: XPUB_PORT +# @TEST-PORT: XSUB_PORT +# @TEST-PORT: LOG_PULL_PORT # @TEST-EXEC: chmod +x ./check-cluster-log.sh +# +# @TEST-EXEC: cp $FILES/zeromq/test-bootstrap.zeek zeromq-test-bootstrap.zeek +# +# @TEST-EXEC: btest-bg-run supervisor "ZEEKPATH=$ZEEKPATH:.. && zeek -j ../supervisor.zeek >out" +# @TEST-EXEC: btest-bg-wait 10 +# @TEST-EXEC: btest-diff supervisor/cluster.log + +redef Log::default_rotation_interval = 0secs; +redef Log::flush_interval = 0.01sec; + +@if ( ! Supervisor::is_supervisor() ) +@load ./zeromq-test-bootstrap +@else + +# The supervisor peeks into logger/cluster.log to initiate a shutdown when +# all nodes have said hello to each other. See the check-cluster-log.sh +# script below. +event check_cluster_log() { + system_env("../check-cluster-log.sh", table(["SUPERVISOR_PID"] = cat(getpid()))); + + schedule 0.1sec { check_cluster_log() }; +} + +event zeek_init() + { + if ( ! Supervisor::is_supervisor() ) + return; + + Broker::listen("127.0.0.1", 9999/tcp); + + local cluster: table[string] of Supervisor::ClusterEndpoint; + cluster["manager"] = [$role=Supervisor::MANAGER, $host=127.0.0.1, $p=0/unknown]; + cluster["logger"] = [$role=Supervisor::LOGGER, $host=127.0.0.1, $p=to_port(getenv("LOG_PULL_PORT"))]; + cluster["proxy"] = [$role=Supervisor::PROXY, $host=127.0.0.1, $p=0/unknown]; + cluster["worker-1"] = [$role=Supervisor::WORKER, $host=127.0.0.1, $p=0/unknown]; + cluster["worker-2"] = [$role=Supervisor::WORKER, $host=127.0.0.1, $p=0/unknown]; + + for ( n, ep in cluster ) + { + local sn = Supervisor::NodeConfig($name=n, $bare_mode=T, $cluster=cluster, $directory=n); + local res = Supervisor::create(sn); + + if ( res != "" ) + print fmt("supervisor failed to create node '%s': %s", n, res); + } + + # Start polling the cluster.log + event check_cluster_log(); + } +@endif + +# @TEST-START-FILE check-cluster-log.sh +#!/bin/sh +# +# This script checks logger/cluster.log until the expected number +# of log entries has been observed and puts a normalized version +# into the current directory. This runs from the supervisor. if [ ! -f logger/cluster.log ]; then + exit 1; +fi + +if [ -f DONE ]; then + exit 0 +fi + +# Remove hostname and pid from node id in message. +zeek-cut node message < logger/cluster.log | sed -r 's/_[^_]+_[0-9]+_/___/g' | sort > cluster.log + +if [ $(wc -l < cluster.log) = 20 ]; then + echo "DONE!" >&2 + # Trigger shutdown through supervisor. + kill ${ZEEK_ARG_SUPERVISOR_PID}; + echo "DONE" > DONE +fi +# @TEST-END-FILE diff --git a/testing/btest/cluster/zeromq/two-nodes.zeek b/testing/btest/cluster/zeromq/two-nodes.zeek new file mode 100644 index 0000000000..2fd01d7257 --- /dev/null +++ b/testing/btest/cluster/zeromq/two-nodes.zeek @@ -0,0 +1,53 @@ +# @TEST-DOC: Start up a manager running the ZeroMQ proxy thread; a worker connects and the manager sends a finish event to terminate the worker.
+# +# @TEST-REQUIRES: have-zeromq +# +# @TEST-GROUP: cluster-zeromq +# +# @TEST-PORT: XPUB_PORT +# @TEST-PORT: XSUB_PORT +# @TEST-PORT: LOG_PULL_PORT +# +# @TEST-EXEC: cp $FILES/zeromq/cluster-layout-simple.zeek cluster-layout.zeek +# @TEST-EXEC: cp $FILES/zeromq/test-bootstrap.zeek zeromq-test-bootstrap.zeek +# +# @TEST-EXEC: btest-bg-run manager "ZEEKPATH=$ZEEKPATH:.. && CLUSTER_NODE=manager zeek -b ../manager.zeek >out" +# @TEST-EXEC: btest-bg-run worker "ZEEKPATH=$ZEEKPATH:.. && CLUSTER_NODE=worker-1 zeek -b ../worker.zeek >out" +# +# @TEST-EXEC: btest-bg-wait 30 +# @TEST-EXEC: btest-diff ./manager/out +# @TEST-EXEC: btest-diff ./worker/out + + +# @TEST-START-FILE common.zeek +@load ./zeromq-test-bootstrap + +global finish: event(name: string); +# @TEST-END-FILE + +# @TEST-START-FILE manager.zeek +@load ./common.zeek +# If a node comes up that isn't us, send it a finish event. +event Cluster::node_up(name: string, id: string) { + print "node_up", name; + Cluster::publish(Cluster::nodeid_topic(id), finish, Cluster::node); +} + +# If the worker vanishes, finish the test. +event Cluster::node_down(name: string, id: string) { + print "node_down", name; + terminate(); +} +# @TEST-END-FILE + +# @TEST-START-FILE worker.zeek +@load ./common.zeek + +event Cluster::node_up(name: string, id: string) { + print "node_up", name; +} + +event finish(name: string) &is_used { + terminate(); +} +# @TEST-END-FILE diff --git a/testing/scripts/have-zeromq b/testing/scripts/have-zeromq new file mode 100755 index 0000000000..4f52c32113 --- /dev/null +++ b/testing/scripts/have-zeromq @@ -0,0 +1,4 @@ +#!/bin/sh + +zeek -N Zeek::Cluster_Backend_ZeroMQ >/dev/null +exit $?
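For reference, a minimal sketch of how the new tests might be run locally with btest; this is an assumption about the usual workflow, not part of the patch itself. It presumes a Zeek build that includes the ZeroMQ cluster backend (so have-zeromq succeeds), the standard testing/btest working directory, and relies on the cluster-zeromq group name from the @TEST-GROUP tags above and on the "btest -U/-u" baseline-update convention mentioned in the baseline headers.

  cd testing/btest
  # Run only the ZeroMQ cluster tests added in this change; -d prints diagnostics for failing tests.
  btest -d -g cluster-zeromq
  # After an intentional behavior change, regenerate the affected baselines.
  btest -U cluster/zeromq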