Mirror of https://github.com/zeek/zeek.git, synced 2025-10-02 06:38:20 +00:00
cluster/backend/zeromq: Add ZeroMQ based cluster backend
This is a cluster backend implementation using a central XPUB/XSUB proxy that by default runs on the manager node. Logging is implemented with PUSH/PULL sockets between loggers and the other nodes, rather than going through XPUB/XSUB. The test-all-policy-cluster baseline changed: previously, Broker::peer() was called from setup-connections.zeek, keeping the IO loop alive. With the ZeroMQ backend, the IO loop is only kept alive once Cluster::init() is called, which no longer happens in that setup.
parent 889c7d888a
commit 35c79ab2e3
38 changed files with 1948 additions and 4 deletions
@ -0,0 +1 @@
@load ./main.zeek
@ -0,0 +1,14 @@
##! Establish ZeroMQ connectivity with the broker.

@load ./main

module Cluster::Backend::ZeroMQ;

event zeek_init() &priority=10
	{
	if ( run_proxy_thread )
		Cluster::Backend::ZeroMQ::spawn_zmq_proxy_thread();

	Cluster::init();
	}
|
424
scripts/policy/frameworks/cluster/backend/zeromq/main.zeek
Normal file
|
@ -0,0 +1,424 @@
##! ZeroMQ cluster backend support.
##!
##! For publish-subscribe functionality, one node in the Zeek cluster spawns a
##! thread running a central broker listening on an XPUB and an XSUB socket.
##! These sockets are connected via `zmq_proxy() <https://libzmq.readthedocs.io/en/latest/zmq_proxy.html>`_.
##! All other nodes connect to this central broker with their own XSUB and
##! XPUB sockets, establishing a global many-to-many publish-subscribe system
##! where each node sees subscriptions and messages from all other nodes in a
##! Zeek cluster. ZeroMQ's `publish-subscribe pattern <http://api.zeromq.org/4-2:zmq-socket#toc9>`_
##! documentation may be a good starting point. Elsewhere in ZeroMQ's documentation,
##! the central broker is also called `forwarder <http://api.zeromq.org/4-2:zmq-proxy#toc5>`_.
##!
##! For remote logging functionality, the ZeroMQ `pipeline pattern <http://api.zeromq.org/4-2:zmq-socket#toc14>`_
##! is used. All logger nodes listen on a PULL socket. Other nodes connect
##! via PUSH sockets to all of the loggers. Concretely, remote logging
##! functionality is not publish-subscribe, but instead leverages ZeroMQ's
##! built-in load-balancing functionality provided by PUSH and PULL
##! sockets.
##!
##! The ZeroMQ cluster backend technically allows running a non-Zeek central
##! broker (it only needs to offer XPUB and XSUB sockets). Further, it is
##! possible to run non-Zeek logger nodes. All a logger node needs to do is
##! open a ZeroMQ PULL socket and interpret the format used by Zeek nodes
##! to send their log writes.
|
||||
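As a rough illustration of that last point, here is a minimal sketch (not part of this commit) of a standalone log receiver using cppzmq. It assumes the receiver binds an endpoint that the Zeek nodes list in Cluster::Backend::ZeroMQ::connect_log_endpoints, and that each log write arrives as the 4-part message produced by DoPublishLogWrites() later in this commit; decoding the final payload part would additionally require a deserializer for the advertised format.

#include <cstdio>
#include <string>
#include <vector>
#include <zmq.hpp>

int main() {
    zmq::context_t ctx;
    zmq::socket_t pull(ctx, zmq::socket_type::pull);
    // Assumed endpoint; must match an entry in connect_log_endpoints on the Zeek nodes.
    pull.bind("tcp://127.0.0.1:5561");

    while ( true ) {
        // A log write is a multipart message: "log-write", sender node id,
        // serialization format, serialized log batch.
        std::vector<zmq::message_t> parts;
        do {
            zmq::message_t part;
            (void)pull.recv(part, zmq::recv_flags::none);
            parts.push_back(std::move(part));
        } while ( parts.back().more() );

        if ( parts.size() != 4 )
            continue;

        std::string sender(static_cast<const char*>(parts[1].data()), parts[1].size());
        std::string format(static_cast<const char*>(parts[2].data()), parts[2].size());
        std::printf("log-write from %s: %zu bytes (format %s)\n", sender.c_str(),
                    parts[3].size(), format.c_str());
    }
}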
module Cluster::Backend::ZeroMQ;
|
||||
|
||||
export {
|
||||
## The central broker's XPUB endpoint to connect to.
|
||||
##
|
||||
## A node connects with its XSUB socket to the XPUB socket
|
||||
## of the central broker.
|
||||
const connect_xpub_endpoint = "tcp://127.0.0.1:5556" &redef;
|
||||
|
||||
|
||||
## The central broker's XSUB endpoint to connect to.
|
||||
##
|
||||
## A node connects with its XPUB socket to the XSUB socket
|
||||
## of the central broker.
|
||||
const connect_xsub_endpoint = "tcp://127.0.0.1:5555" &redef;
|
||||
|
||||
## Vector of ZeroMQ endpoints to connect to for logging.
|
||||
##
|
||||
## A node's PUSH socket used for logging connects to each
|
||||
## of the ZeroMQ endpoints listed in this vector.
|
||||
const connect_log_endpoints: vector of string &redef;
|
||||
|
||||
## Toggle for running a central ZeroMQ XPUB-XSUB broker on this node.
|
||||
##
|
||||
## If set to ``T``, :zeek:see:`Cluster::Backend::ZeroMQ::spawn_zmq_proxy_thread`
|
||||
## is called during :zeek:see:`zeek_init`. The node will listen
|
||||
## on :zeek:see:`Cluster::Backend::ZeroMQ::listen_xsub_endpoint` and
|
||||
## :zeek:see:`Cluster::Backend::ZeroMQ::listen_xpub_endpoint` and
|
||||
## forward subscriptions and messages between nodes.
|
||||
##
|
||||
## By default, this is set to ``T`` on the manager and ``F`` elsewhere.
|
||||
const run_proxy_thread: bool = F &redef;
|
||||
|
||||
## XSUB listen endpoint for the central broker.
|
||||
##
|
||||
## This setting is used for the XSUB socket of the central broker started
|
||||
## when :zeek:see:`Cluster::Backend::ZeroMQ::run_proxy_thread` is ``T``.
|
||||
const listen_xsub_endpoint = "tcp://127.0.0.1:5556" &redef;
|
||||
|
||||
## XPUB listen endpoint for the central broker.
|
||||
##
|
||||
## This setting is used for the XPUB socket of the central broker started
|
||||
## when :zeek:see:`Cluster::Backend::ZeroMQ::run_proxy_thread` is ``T``.
|
||||
const listen_xpub_endpoint = "tcp://127.0.0.1:5555" &redef;
|
||||
|
||||
## PULL socket address to listen on for log messages.
|
||||
##
|
||||
## If empty, don't listen for log messages, otherwise
|
||||
## a ZeroMQ address to bind to. E.g., ``tcp://127.0.0.1:5555``.
|
||||
const listen_log_endpoint = "" &redef;
|
||||
|
||||
## Configure the linger value on ZeroMQ sockets.
|
||||
##
|
||||
## The default used by libzmq is 30 seconds (30 000) which is very long
|
||||
## when loggers vanish before workers during a shutdown, so we reduce
|
||||
## this to 500 milliseconds by default.
|
||||
##
|
||||
## A value of ``-1`` configures blocking forever, while ``0`` would
|
||||
## immediately discard any pending messages.
|
||||
##
|
||||
## See ZeroMQ's `ZMQ_LINGER documentation <http://api.zeromq.org/4-2:zmq-setsockopt#toc24>`_
|
||||
## for more details.
|
||||
const linger_ms: int = 500 &redef;
|
||||
|
||||
## Configure ZeroMQ's immediate setting on PUSH sockets.
|
||||
##
|
||||
## Setting this to ``T`` will queue log writes only to completed
|
||||
## connections. By default, log writes are queued to all potential
|
||||
## endpoints listed in :zeek:see:`Cluster::Backend::ZeroMQ::connect_log_endpoints`.
|
||||
##
|
||||
## See ZeroMQ's `ZMQ_IMMEDIATE documentation <http://api.zeromq.org/4-2:zmq-setsockopt#toc21>`_
|
||||
## for more details.
|
||||
const log_immediate: bool = F &redef;
|
||||
|
||||
## Send high water mark value for the log PUSH sockets.
|
||||
##
|
||||
## If reached, Zeek nodes will block or drop messages.
|
||||
##
|
||||
## See ZeroMQ's `ZMQ_SNDHWM documentation <http://api.zeromq.org/4-2:zmq-setsockopt#toc46>`_
|
||||
## for more details.
|
||||
##
|
||||
## TODO: Make action configurable (block vs drop)
|
||||
const log_sndhwm: int = 1000 &redef;
|
||||
|
||||
## Receive high water mark value for the log PULL sockets.
|
||||
##
|
||||
## If reached, Zeek workers will block or drop messages.
|
||||
##
|
||||
## See ZeroMQ's `ZMQ_RCVHWM documentation <http://api.zeromq.org/4-2:zmq-setsockopt#toc35>`_
|
||||
## for more details.
|
||||
##
|
||||
## TODO: Make action configurable (block vs drop)
|
||||
const log_rcvhwm: int = 1000 &redef;
|
||||
|
||||
## Kernel transmit buffer size for log sockets.
|
||||
##
|
||||
## Using -1 will use the kernel's default.
|
||||
##
|
||||
## See ZeroMQ's `ZMQ_SNDBUF documentation <http://api.zeromq.org/4-2:zmq-setsockopt#toc45>`_.
|
||||
const log_sndbuf: int = -1 &redef;
|
||||
|
||||
## Kernel receive buffer size for log sockets.
|
||||
##
|
||||
## Using -1 will use the kernel's default.
|
||||
##
|
||||
## See ZeroMQ's `ZMQ_RCVBUF documentation <http://api.zeromq.org/4-2:zmq-setsockopt#toc34>`_
|
||||
## for more details.
|
||||
const log_rcvbuf: int = -1 &redef;
|
||||
|
||||
## Do not silently drop messages if high-water-mark is reached.
|
||||
##
|
||||
## Whether to configure ``ZMQ_XPUB_NODROP`` on the XPUB socket
|
||||
## to detect when sending a message fails due to reaching
|
||||
## the high-water-mark.
|
||||
##
|
||||
## See ZeroMQ's `ZMQ_XPUB_NODROP documentation <http://api.zeromq.org/4-2:zmq-setsockopt#toc61>`_
|
||||
## for more details.
|
||||
const xpub_nodrop: bool = T &redef;
|
||||
|
||||
## Do not silently drop messages if high-water-mark is reached.
|
||||
##
|
||||
## Whether to configure ``ZMQ_XPUB_NODROP`` on the XPUB socket
|
||||
## to detect when sending a message fails due to reaching
|
||||
## the high-water-mark.
|
||||
##
|
||||
## This setting applies to the XPUB/XSUB broker started when
|
||||
## :zeek:see:`Cluster::Backend::ZeroMQ::run_proxy_thread` is ``T``.
|
||||
##
|
||||
## See ZeroMQ's `ZMQ_XPUB_NODROP documentation <http://api.zeromq.org/4-2:zmq-setsockopt#toc61>`_
|
||||
## for more details.
|
||||
const listen_xpub_nodrop: bool = T &redef;
|
||||
|
||||
## Messages to receive before yielding.
|
||||
##
|
||||
## Yield from the receive loop when this many messages have been
|
||||
## received from one of the used sockets.
|
||||
const poll_max_messages = 100 &redef;
|
||||
|
||||
## Bitmask to enable low-level stderr based debug printing.
|
||||
##
|
||||
## poll debugging: 1 (produce verbose zmq::poll() output)
|
||||
##
|
||||
## Or values from the above list together and set debug_flags
|
||||
## to the result. E.g. use 7 to select 4, 2 and 1. Only use this
|
||||
## in development if something seems off. The thread used internally
|
||||
## will produce output on stderr.
|
||||
const debug_flags: count = 0 &redef;
|
||||
|
||||
## The node topic prefix to use.
|
||||
global node_topic_prefix = "zeek.cluster.node" &redef;
|
||||
|
||||
## The node_id topic prefix to use.
|
||||
global nodeid_topic_prefix = "zeek.cluster.nodeid" &redef;
|
||||
|
||||
## Low-level event when a subscription is added.
|
||||
##
|
||||
## Every node observes all subscriptions from other nodes
|
||||
## in a cluster through its XPUB socket. Whenever a new
|
||||
## subscription topic is added, this event is raised with
|
||||
## the topic.
|
||||
##
|
||||
## topic: The topic.
|
||||
global subscription: event(topic: string);
|
||||
|
||||
## Low-level event when a subscription vanishes.
|
||||
##
|
||||
## Every node observes all subscriptions from other nodes
|
||||
## in a cluster through its XPUB socket. Whenever a subscription
|
||||
## is removed from the local XPUB socket, this event is raised
|
||||
## with the topic set to the removed subscription.
|
||||
##
|
||||
## topic: The topic.
|
||||
global unsubscription: event(topic: string);
|
||||
|
||||
## Low-level event sent to a node in response to its subscription.
|
||||
##
|
||||
## name: The sending node's name in :zeek:see:`Cluster::nodes`.
|
||||
##
|
||||
## id: The sending node's identifier, as generated by :zeek:see:`Cluster::node_id`.
|
||||
global hello: event(name: string, id: string);
|
||||
|
||||
## Expiration for hello state.
|
||||
##
|
||||
## How long to wait before expiring information about
|
||||
## subscriptions and hello messages from other
|
||||
## nodes. These expirations trigger reporter warnings.
|
||||
const hello_expiration: interval = 10sec &redef;
|
||||
}
|
||||
|
||||
redef Cluster::backend = Cluster::CLUSTER_BACKEND_ZEROMQ;
|
||||
|
||||
# By default, let the manager node run the proxy thread.
|
||||
redef run_proxy_thread = Cluster::local_node_type() == Cluster::MANAGER;
|
||||
|
||||
function zeromq_node_topic(name: string): string {
|
||||
return node_topic_prefix + "." + name;
|
||||
}
|
||||
|
||||
function zeromq_nodeid_topic(id: string): string {
|
||||
return nodeid_topic_prefix + "." + id;
|
||||
}
|
||||
|
||||
# Unique identifier for this node with some debug information.
|
||||
const my_node_id = fmt("zeromq_%s_%s_%s_%s", Cluster::node, gethostname(), getpid(), unique_id("N"));
|
||||
|
||||
function zeromq_node_id(): string {
|
||||
return my_node_id;
|
||||
}
|
||||
|
||||
redef Cluster::node_topic = zeromq_node_topic;
|
||||
redef Cluster::nodeid_topic = zeromq_nodeid_topic;
|
||||
redef Cluster::node_id = zeromq_node_id;
|
||||
|
||||
redef Cluster::logger_topic = "zeek.cluster.logger";
|
||||
redef Cluster::manager_topic = "zeek.cluster.manager";
|
||||
redef Cluster::proxy_topic = "zeek.cluster.proxy";
|
||||
redef Cluster::worker_topic = "zeek.cluster.worker";
|
||||
|
||||
redef Cluster::proxy_pool_spec = Cluster::PoolSpec(
|
||||
$topic = "zeek.cluster.pool.proxy",
|
||||
$node_type = Cluster::PROXY);
|
||||
|
||||
redef Cluster::logger_pool_spec = Cluster::PoolSpec(
|
||||
$topic = "zeek.cluster.pool.logger",
|
||||
$node_type = Cluster::LOGGER);
|
||||
|
||||
redef Cluster::worker_pool_spec = Cluster::PoolSpec(
|
||||
$topic = "zeek.cluster.pool.worker",
|
||||
$node_type = Cluster::WORKER);
|
||||
|
||||
|
||||
# Configure listen_log_endpoint based on port in cluster-layout, if any.
|
||||
@if ( Cluster::local_node_type() == Cluster::LOGGER || (Cluster::manager_is_logger && Cluster::local_node_type() == Cluster::MANAGER) )
|
||||
const my_node = Cluster::nodes[Cluster::node];
|
||||
@if ( my_node?$p )
|
||||
redef listen_log_endpoint = fmt("tcp://%s:%s", my_node$ip, port_to_count(my_node$p));
|
||||
@endif
|
||||
@endif
|
||||
|
||||
# Populate connect_log_endpoints based on Cluster::nodes on non-logger nodes.
|
||||
# If you're experimenting with zero-logger clusters, ignore this code and set
|
||||
# connect_log_endpoints yourself via redef.
|
||||
event zeek_init() &priority=100
|
||||
{
|
||||
if ( Cluster::local_node_type() == Cluster::LOGGER )
|
||||
return;
|
||||
|
||||
if ( Cluster::manager_is_logger && Cluster::local_node_type() == Cluster::MANAGER )
|
||||
return;
|
||||
|
||||
for ( _, node in Cluster::nodes )
|
||||
{
|
||||
local endp: string;
|
||||
if ( node$node_type == Cluster::LOGGER && node?$p )
|
||||
{
|
||||
endp = fmt("tcp://%s:%s", node$ip, port_to_count(node$p));
|
||||
connect_log_endpoints += endp;
|
||||
}
|
||||
|
||||
if ( Cluster::manager_is_logger && node$node_type == Cluster::MANAGER && node?$p )
|
||||
{
|
||||
endp = fmt("tcp://%s:%s", node$ip, port_to_count(node$p));
|
||||
connect_log_endpoints += endp;
|
||||
}
|
||||
}
|
||||
|
||||
# If there are no endpoints configured, but more than a single
|
||||
# node in cluster layout, log an error as that's probably not
|
||||
# an intended configuration.
|
||||
if ( |connect_log_endpoints| == 0 && |Cluster::nodes| > 1 )
|
||||
Reporter::error("No ZeroMQ connect_log_endpoints configured");
|
||||
}
|
||||
|
||||
function nodeid_subscription_expired(nodeids: set[string], nodeid: string): interval
|
||||
{
|
||||
Reporter::warning(fmt("Expired subscription from nodeid %s", nodeid));
|
||||
return 0.0sec;
|
||||
}
|
||||
|
||||
function nodeid_hello_expired(nodeids: set[string], nodeid: string): interval
|
||||
{
|
||||
Reporter::warning(fmt("Expired hello from nodeid %s", nodeid));
|
||||
return 0.0sec;
|
||||
}
|
||||
|
||||
# State about subscriptions and hellos seen from other nodes.
|
||||
global nodeid_subscriptions: set[string] &create_expire=hello_expiration &expire_func=nodeid_subscription_expired;
|
||||
global nodeid_hellos: set[string] &create_expire=hello_expiration &expire_func=nodeid_hello_expired;
|
||||
|
||||
# The ZeroMQ plugin notifies script land when a new subscription arrived
|
||||
# on that node's XPUB socket. If the topic of such a subscription starts with
|
||||
# the nodeid_topic_prefix for another node A, node B seeing the subscription
|
||||
# sends ZeroMQ::hello() to the topic, announcing its own presence to node A.
|
||||
# Conversely, when node A sees the subscription for node B's nodeid topic,
|
||||
# it also sends ZeroMQ::hello(). In other words, every node says hello to all
|
||||
# other nodes based on subscriptions they observe on their local XPUB sockets.
|
||||
#
|
||||
# Once node B has seen both, the nodeid topic subscription and ZeroMQ::hello()
|
||||
# event from node A, it raises a Cluster::node_up() event for node A.
|
||||
#
|
||||
# See also the Cluster::Backend::ZeroMQ::hello() handler below.
|
||||
#
|
||||
# 1) node A subscribes to Cluster::nodeid_topic(Cluster::node_id())
|
||||
# 2) node B observes subscription for node A's nodeid_topic and replies with ZeroMQ::hello()
|
||||
# 3) node A receives node B's nodeid_topic subscription, replies with ZeroMQ::hello()
|
||||
# 4) node B receives node A's ZeroMQ::hello() and raises Cluster::node_up()
|
||||
# as it has already seen node A's nodeid_topic subscription.
|
||||
event Cluster::Backend::ZeroMQ::subscription(topic: string)
|
||||
{
|
||||
local prefix = nodeid_topic_prefix + ".";
|
||||
|
||||
if ( ! starts_with(topic, prefix) )
|
||||
return;
|
||||
|
||||
local nodeid = topic[|prefix|:];
|
||||
|
||||
# Do not say hello to ourselves - we won't see it anyhow.
|
||||
if ( nodeid == Cluster::node_id() )
|
||||
return;
|
||||
|
||||
Cluster::publish(topic, Cluster::Backend::ZeroMQ::hello, Cluster::node, Cluster::node_id());
|
||||
|
||||
# If we saw a ZeroMQ::hello from the other node already, send
|
||||
# it a Cluster::hello.
|
||||
if ( nodeid in nodeid_hellos )
|
||||
{
|
||||
Cluster::publish(Cluster::nodeid_topic(nodeid), Cluster::hello, Cluster::node, Cluster::node_id());
|
||||
delete nodeid_hellos[nodeid];
|
||||
}
|
||||
else
|
||||
{
|
||||
add nodeid_subscriptions[nodeid];
|
||||
}
|
||||
}
|
||||
|
||||
# Receiving ZeroMQ::hello() from another node: If we received a subscription
|
||||
# for the node's nodeid_topic, reply with a Cluster::hello. If the node never
|
||||
# properly went away, log a warning and raise a Cluster::node_down() now.
|
||||
event Cluster::Backend::ZeroMQ::hello(name: string, id: string)
|
||||
{
|
||||
if ( name in Cluster::nodes )
|
||||
{
|
||||
local n = Cluster::nodes[name];
|
||||
if ( n?$id )
|
||||
{
|
||||
if ( n$id == id )
|
||||
{
|
||||
# Duplicate ZeroMQ::hello(), very strange, ignore it.
|
||||
Reporter::warning(fmt("node '%s' sends ZeroMQ::hello twice (id:%s)",
|
||||
name, id));
|
||||
return;
|
||||
}
|
||||
|
||||
Reporter::warning(fmt("node '%s' never said goodbye (old id:%s new id:%s",
|
||||
name, n$id, id));
|
||||
|
||||
# We raise node_down() here for the old instance,
|
||||
# but it's obviously fake and somewhat lying.
|
||||
event Cluster::node_down(name, n$id);
|
||||
}
|
||||
}
|
||||
|
||||
# It is possible to publish Cluster::hello() directly if the nodeid_topic
|
||||
# subscription for the other node was already seen. Otherwise, remember
|
||||
# that Cluster::hello() has been seen and send Cluster::hello() in
|
||||
# subscription processing further up.
|
||||
if ( id in nodeid_subscriptions )
|
||||
{
|
||||
Cluster::publish(Cluster::nodeid_topic(id), Cluster::hello, Cluster::node, Cluster::node_id());
|
||||
delete nodeid_subscriptions[id];
|
||||
}
|
||||
else
|
||||
{
|
||||
add nodeid_hellos[id];
|
||||
}
|
||||
}
|
||||
|
||||
# If the unsubscription is for a nodeid prefix, extract the
|
||||
# nodeid that's gone, find the name of the node from the
|
||||
# cluster layout and raise Cluster::node_down().
|
||||
event Cluster::Backend::ZeroMQ::unsubscription(topic: string)
|
||||
{
|
||||
local prefix = nodeid_topic_prefix + ".";
|
||||
if ( ! starts_with(topic, prefix) )
|
||||
return;
|
||||
|
||||
local gone_node_id = topic[|prefix|:];
|
||||
local name = "";
|
||||
for ( node_name, n in Cluster::nodes ) {
|
||||
if ( n?$id && n$id == gone_node_id ) {
|
||||
name = node_name;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if ( name != "" )
|
||||
event Cluster::node_down(name, gone_node_id);
|
||||
else
|
||||
Reporter::warning(fmt("unsubscription of unknown node with id '%s'", gone_node_id));
|
||||
}
|
|
@ -11,6 +11,9 @@
|
|||
|
||||
# @load frameworks/control/controllee.zeek
|
||||
# @load frameworks/control/controller.zeek
|
||||
@load frameworks/cluster/backend/zeromq/__load__.zeek
|
||||
# @load frameworks/cluster/backend/zeromq/connect.zeek
|
||||
@load frameworks/cluster/backend/zeromq/main.zeek
|
||||
@load frameworks/cluster/experimental.zeek
|
||||
# Loaded via the above through test-all-policy-cluster.test
|
||||
# when running as a manager, creates cluster.log entries
|
||||
|
|
|
@ -2,6 +2,7 @@
|
|||
|
||||
# Scripts which are commented out in test-all-policy.zeek.
|
||||
@load protocols/ssl/decryption.zeek
|
||||
@load frameworks/cluster/backend/zeromq/connect.zeek
|
||||
@load frameworks/cluster/nodes-experimental/manager.zeek
|
||||
@load frameworks/control/controllee.zeek
|
||||
@load frameworks/control/controller.zeek
|
||||
|
@ -28,6 +29,7 @@ event zeek_init() &priority=1000
|
|||
# fail when run under zeekygen. For the purpose of zeekygen, we could
|
||||
# probably disable all modules, too.
|
||||
disable_module_events("Control");
|
||||
disable_module_events("Cluster::Backend::ZeroMQ");
|
||||
disable_module_events("Management::Agent::Runtime");
|
||||
disable_module_events("Management::Controller::Runtime");
|
||||
disable_module_events("Management::Node");
|
||||
|
|
|
@ -11,4 +11,5 @@ zeek_add_subdir_library(
|
|||
BIFS
|
||||
cluster.bif)
|
||||
|
||||
add_subdirectory(backend)
|
||||
add_subdirectory(serializer)
|
||||
|
|
4
src/cluster/backend/CMakeLists.txt
Normal file
|
@ -0,0 +1,4 @@
|
|||
option(ENABLE_CLUSTER_BACKEND_ZEROMQ "Enable the ZeroMQ cluster backend" ON)
|
||||
if (ENABLE_CLUSTER_BACKEND_ZEROMQ)
|
||||
add_subdirectory(zeromq)
|
||||
endif ()
|
21
src/cluster/backend/zeromq/CMakeLists.txt
Normal file
|
@ -0,0 +1,21 @@
|
|||
list(APPEND CMAKE_MODULE_PATH "${CMAKE_CURRENT_SOURCE_DIR}/cmake")
|
||||
|
||||
find_package(ZeroMQ REQUIRED)
|
||||
|
||||
message(STATUS "zeromq: ${ZeroMQ_LIBRARIES} ${ZeroMQ_INCLUDE_DIRS}")
|
||||
|
||||
zeek_add_plugin(
|
||||
Zeek
|
||||
Cluster_Backend_ZeroMQ
|
||||
INCLUDE_DIRS
|
||||
${CMAKE_CURRENT_SOURCE_DIR}
|
||||
${CMAKE_CURRENT_BINARY_DIR}
|
||||
${ZeroMQ_INCLUDE_DIRS}
|
||||
DEPENDENCIES
|
||||
${ZeroMQ_LIBRARIES}
|
||||
SOURCES
|
||||
Plugin.cc
|
||||
ZeroMQ-Proxy.cc
|
||||
ZeroMQ.cc
|
||||
BIFS
|
||||
cluster_backend_zeromq.bif)
|
22
src/cluster/backend/zeromq/Plugin.cc
Normal file
|
@ -0,0 +1,22 @@
|
|||
// See the file "COPYING" in the main distribution directory for copyright.
|
||||
|
||||
#include "zeek/cluster/backend/zeromq/Plugin.h"
|
||||
|
||||
#include "zeek/cluster/Component.h"
|
||||
#include "zeek/cluster/backend/zeromq/ZeroMQ.h"
|
||||
|
||||
|
||||
namespace zeek::plugin::Zeek_Cluster_Backend_ZeroMQ {
|
||||
|
||||
Plugin plugin;
|
||||
|
||||
zeek::plugin::Configuration Plugin::Configure() {
|
||||
AddComponent(new cluster::BackendComponent("ZeroMQ", zeek::cluster::zeromq::ZeroMQBackend::Instantiate));
|
||||
|
||||
zeek::plugin::Configuration config;
|
||||
config.name = "Zeek::Cluster_Backend_ZeroMQ";
|
||||
config.description = "Cluster backend using ZeroMQ";
|
||||
return config;
|
||||
}
|
||||
|
||||
} // namespace zeek::plugin::Zeek_Cluster_Backend_ZeroMQ
|
14
src/cluster/backend/zeromq/Plugin.h
Normal file
|
@ -0,0 +1,14 @@
|
|||
// See the file "COPYING" in the main distribution directory for copyright.
|
||||
|
||||
#pragma once
|
||||
|
||||
#include "zeek/plugin/Plugin.h"
|
||||
|
||||
namespace zeek::plugin::Zeek_Cluster_Backend_ZeroMQ {
|
||||
|
||||
class Plugin : public zeek::plugin::Plugin {
|
||||
public:
|
||||
zeek::plugin::Configuration Configure() override;
|
||||
};
|
||||
|
||||
} // namespace zeek::plugin::Zeek_Cluster_Backend_ZeroMQ
|
72
src/cluster/backend/zeromq/ZeroMQ-Proxy.cc
Normal file
|
@ -0,0 +1,72 @@
|
|||
// See the file "COPYING" in the main distribution directory for copyright.
|
||||
|
||||
#include "zeek/cluster/backend/zeromq/ZeroMQ-Proxy.h"
|
||||
|
||||
#include <zmq.hpp>
|
||||
|
||||
#include "zeek/Reporter.h"
|
||||
#include "zeek/util.h"
|
||||
|
||||
|
||||
using namespace zeek::cluster::zeromq;
|
||||
|
||||
namespace {
|
||||
|
||||
/**
|
||||
 * Thread function running zmq_proxy() to provide the central XPUB/XSUB
 * broker that other Zeek nodes connect to for exchanging subscriptions
 * and messages.
|
||||
*/
|
||||
void thread_fun(ProxyThread::Args* args) {
|
||||
zeek::util::detail::set_thread_name("zmq-proxy-thread");
|
||||
|
||||
try {
|
||||
zmq::proxy(args->xsub, args->xpub, zmq::socket_ref{} /*capture*/);
|
||||
} catch ( zmq::error_t& err ) {
|
||||
args->xsub.close();
|
||||
args->xpub.close();
|
||||
|
||||
if ( err.num() != ETERM ) {
|
||||
std::fprintf(stderr, "[zeromq] unexpected zmq_proxy() error: %s (%d)", err.what(), err.num());
|
||||
throw;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
} // namespace
|
||||
|
||||
bool ProxyThread::Start() {
|
||||
zmq::socket_t xpub(ctx, zmq::socket_type::xpub);
|
||||
zmq::socket_t xsub(ctx, zmq::socket_type::xsub);
|
||||
|
||||
xpub.set(zmq::sockopt::xpub_nodrop, xpub_nodrop);
|
||||
|
||||
try {
|
||||
xpub.bind(xpub_endpoint);
|
||||
} catch ( zmq::error_t& err ) {
|
||||
zeek::reporter->Error("Failed to bind xpub socket %s: %s (%d)", xpub_endpoint.c_str(), err.what(), err.num());
|
||||
return false;
|
||||
}
|
||||
|
||||
try {
|
||||
xsub.bind(xsub_endpoint);
|
||||
} catch ( zmq::error_t& err ) {
|
||||
zeek::reporter->Error("Failed to bind xsub socket %s: %s (%d)", xpub_endpoint.c_str(), err.what(), err.num());
|
||||
return false;
|
||||
}
|
||||
|
||||
args = {.xpub = std::move(xpub), .xsub = std::move(xsub)};
|
||||
|
||||
thread = std::thread(thread_fun, &args);
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
void ProxyThread::Shutdown() {
|
||||
ctx.shutdown();
|
||||
|
||||
if ( thread.joinable() )
|
||||
thread.join();
|
||||
|
||||
ctx.close();
|
||||
}
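Since the broker side is nothing more than a bound XPUB/XSUB pair fed into zmq_proxy(), it can also be run outside of Zeek. A minimal standalone sketch (not part of this commit), assuming the default listen endpoints used in this commit:

#include <zmq.hpp>

int main() {
    zmq::context_t ctx;
    zmq::socket_t xpub(ctx, zmq::socket_type::xpub);
    zmq::socket_t xsub(ctx, zmq::socket_type::xsub);

    // Defaults of listen_xpub_endpoint / listen_xsub_endpoint in this commit.
    xpub.bind("tcp://127.0.0.1:5555");
    xsub.bind("tcp://127.0.0.1:5556");

    // Forward subscriptions and published messages between the two sockets
    // until the context is terminated.
    zmq::proxy(xsub, xpub, zmq::socket_ref{});
}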
|
56
src/cluster/backend/zeromq/ZeroMQ-Proxy.h
Normal file
|
@ -0,0 +1,56 @@
|
|||
// See the file "COPYING" in the main distribution directory for copyright.
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <string>
|
||||
#include <thread>
|
||||
#include <zmq.hpp>
|
||||
|
||||
|
||||
// Central XPUB/XSUB proxy.
|
||||
//
|
||||
// Spawns a thread that runs zmq_proxy() for a XPUB/XSUB pair.
|
||||
namespace zeek::cluster::zeromq {
|
||||
|
||||
class ProxyThread {
|
||||
public:
|
||||
/**
|
||||
* Constructor.
|
||||
*
|
||||
* @param xpub_endpoint the XPUB socket address to listen on.
|
||||
* @param xsub_endpoint the XSUB socket address to listen on.
|
||||
* @param xpub_nodrop the xpub_nodrop option to use on the XPUB socket.
|
||||
*/
|
||||
ProxyThread(std::string xpub_endpoint, std::string xsub_endpoint, int xpub_nodrop)
|
||||
: xpub_endpoint(std::move(xpub_endpoint)), xsub_endpoint(std::move(xsub_endpoint)), xpub_nodrop(xpub_nodrop) {}
|
||||
|
||||
|
||||
~ProxyThread() { Shutdown(); }
|
||||
|
||||
/**
|
||||
* Data kept in object and passed to thread.
|
||||
*/
|
||||
struct Args {
|
||||
zmq::socket_t xpub;
|
||||
zmq::socket_t xsub;
|
||||
};
|
||||
|
||||
/**
|
||||
* Bind the sockets and spawn the thread.
|
||||
*/
|
||||
bool Start();
|
||||
|
||||
/**
|
||||
* Shutdown the ZeroMQ context and join the thread.
|
||||
*/
|
||||
void Shutdown();
|
||||
|
||||
private:
|
||||
zmq::context_t ctx;
|
||||
std::thread thread;
|
||||
Args args;
|
||||
std::string xpub_endpoint;
|
||||
std::string xsub_endpoint;
|
||||
int xpub_nodrop = 1;
|
||||
};
|
||||
} // namespace zeek::cluster::zeromq
|
569
src/cluster/backend/zeromq/ZeroMQ.cc
Normal file
|
@ -0,0 +1,569 @@
|
|||
// See the file "COPYING" in the main distribution directory for copyright.
|
||||
|
||||
#include "ZeroMQ.h"
|
||||
|
||||
#include <array>
|
||||
#include <cerrno>
|
||||
#include <chrono>
|
||||
#include <cstddef>
|
||||
#include <cstdio>
|
||||
#include <functional>
|
||||
#include <memory>
|
||||
#include <string>
|
||||
#include <thread>
|
||||
#include <zmq.hpp>
|
||||
|
||||
#include "zeek/DebugLogger.h"
|
||||
#include "zeek/Event.h"
|
||||
#include "zeek/EventRegistry.h"
|
||||
#include "zeek/IntrusivePtr.h"
|
||||
#include "zeek/Reporter.h"
|
||||
#include "zeek/Val.h"
|
||||
#include "zeek/cluster/Backend.h"
|
||||
#include "zeek/cluster/Serializer.h"
|
||||
#include "zeek/cluster/backend/zeromq/Plugin.h"
|
||||
#include "zeek/cluster/backend/zeromq/ZeroMQ-Proxy.h"
|
||||
|
||||
namespace zeek {
|
||||
|
||||
namespace plugin::Zeek_Cluster_Backend_ZeroMQ {
|
||||
|
||||
extern zeek::plugin::Zeek_Cluster_Backend_ZeroMQ::Plugin plugin;
|
||||
|
||||
}
|
||||
|
||||
namespace cluster::zeromq {
|
||||
|
||||
enum class DebugFlag : zeek_uint_t {
|
||||
NONE = 0,
|
||||
POLL = 1,
|
||||
};
|
||||
|
||||
constexpr DebugFlag operator&(zeek_uint_t x, DebugFlag y) {
|
||||
return static_cast<DebugFlag>(x & static_cast<zeek_uint_t>(y));
|
||||
}
|
||||
|
||||
#define ZEROMQ_DEBUG(...) PLUGIN_DBG_LOG(zeek::plugin::Zeek_Cluster_Backend_ZeroMQ::plugin, __VA_ARGS__)
|
||||
|
||||
#define ZEROMQ_THREAD_PRINTF(...) \
|
||||
do { \
|
||||
std::fprintf(stderr, "[zeromq] " __VA_ARGS__); \
|
||||
} while ( 0 )
|
||||
|
||||
#define ZEROMQ_DEBUG_THREAD_PRINTF(flag, ...) \
|
||||
do { \
|
||||
if ( (debug_flags & flag) == flag ) { \
|
||||
ZEROMQ_THREAD_PRINTF(__VA_ARGS__); \
|
||||
} \
|
||||
} while ( 0 )
|
||||
|
||||
namespace {
|
||||
void self_thread_fun(void* arg) {
|
||||
auto* self = static_cast<ZeroMQBackend*>(arg);
|
||||
self->Run();
|
||||
}
|
||||
|
||||
} // namespace
|
||||
|
||||
|
||||
// Constructor.
|
||||
ZeroMQBackend::ZeroMQBackend(std::unique_ptr<EventSerializer> es, std::unique_ptr<LogSerializer> ls)
|
||||
: ThreadedBackend(std::move(es), std::move(ls)) {
|
||||
xsub = zmq::socket_t(ctx, zmq::socket_type::xsub);
|
||||
xpub = zmq::socket_t(ctx, zmq::socket_type::xpub);
|
||||
log_push = zmq::socket_t(ctx, zmq::socket_type::push);
|
||||
log_pull = zmq::socket_t(ctx, zmq::socket_type::pull);
|
||||
|
||||
main_inproc = zmq::socket_t(ctx, zmq::socket_type::pair);
|
||||
child_inproc = zmq::socket_t(ctx, zmq::socket_type::pair);
|
||||
}
|
||||
|
||||
void ZeroMQBackend::DoInitPostScript() {
|
||||
ThreadedBackend::DoInitPostScript();
|
||||
|
||||
my_node_id = zeek::id::find_val<zeek::StringVal>("Cluster::Backend::ZeroMQ::my_node_id")->ToStdString();
|
||||
listen_xpub_endpoint =
|
||||
zeek::id::find_val<zeek::StringVal>("Cluster::Backend::ZeroMQ::listen_xpub_endpoint")->ToStdString();
|
||||
listen_xsub_endpoint =
|
||||
zeek::id::find_val<zeek::StringVal>("Cluster::Backend::ZeroMQ::listen_xsub_endpoint")->ToStdString();
|
||||
listen_xpub_nodrop =
|
||||
zeek::id::find_val<zeek::BoolVal>("Cluster::Backend::ZeroMQ::listen_xpub_nodrop")->AsBool() ? 1 : 0;
|
||||
connect_xpub_endpoint =
|
||||
zeek::id::find_val<zeek::StringVal>("Cluster::Backend::ZeroMQ::connect_xpub_endpoint")->ToStdString();
|
||||
connect_xsub_endpoint =
|
||||
zeek::id::find_val<zeek::StringVal>("Cluster::Backend::ZeroMQ::connect_xsub_endpoint")->ToStdString();
|
||||
listen_log_endpoint =
|
||||
zeek::id::find_val<zeek::StringVal>("Cluster::Backend::ZeroMQ::listen_log_endpoint")->ToStdString();
|
||||
poll_max_messages = zeek::id::find_val<zeek::CountVal>("Cluster::Backend::ZeroMQ::poll_max_messages")->Get();
|
||||
debug_flags = zeek::id::find_val<zeek::CountVal>("Cluster::Backend::ZeroMQ::debug_flags")->Get();
|
||||
|
||||
event_unsubscription = zeek::event_registry->Register("Cluster::Backend::ZeroMQ::unsubscription");
|
||||
event_subscription = zeek::event_registry->Register("Cluster::Backend::ZeroMQ::subscription");
|
||||
|
||||
main_inproc.bind("inproc://publish-bridge");
|
||||
child_inproc.connect("inproc://publish-bridge");
|
||||
}
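The publish-bridge wired up above exists because ZeroMQ sockets are not meant to be shared between threads: script-level publishes happen on Zeek's main thread, while the XPUB socket is driven by the backend's background thread. A minimal sketch (not part of this commit) of the same PAIR-over-inproc hand-off:

#include <cstdio>
#include <string>
#include <thread>
#include <zmq.hpp>

int main() {
    zmq::context_t ctx;
    zmq::socket_t main_side(ctx, zmq::socket_type::pair);
    zmq::socket_t child_side(ctx, zmq::socket_type::pair);

    // For inproc, bind before connect.
    main_side.bind("inproc://publish-bridge");
    child_side.connect("inproc://publish-bridge");

    std::thread child([&child_side]() {
        zmq::message_t msg;
        (void)child_side.recv(msg, zmq::recv_flags::none);
        // A real backend would forward this message to the XPUB socket here.
        std::printf("child received %zu bytes\n", msg.size());
    });

    std::string payload = "hello";
    (void)main_side.send(zmq::const_buffer(payload.data(), payload.size()), zmq::send_flags::none);
    child.join();
}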
|
||||
|
||||
|
||||
void ZeroMQBackend::DoTerminate() {
|
||||
ZEROMQ_DEBUG("Shutting down ctx");
|
||||
ctx.shutdown();
|
||||
ZEROMQ_DEBUG("Joining self_thread");
|
||||
if ( self_thread.joinable() )
|
||||
self_thread.join();
|
||||
|
||||
log_push.close();
|
||||
log_pull.close();
|
||||
xsub.close();
|
||||
xpub.close();
|
||||
main_inproc.close();
|
||||
child_inproc.close();
|
||||
|
||||
ZEROMQ_DEBUG("Closing ctx");
|
||||
ctx.close();
|
||||
|
||||
// If running the proxy thread, terminate it, too.
|
||||
if ( proxy_thread ) {
|
||||
ZEROMQ_DEBUG("Shutting down proxy thread");
|
||||
proxy_thread->Shutdown();
|
||||
}
|
||||
|
||||
ZEROMQ_DEBUG("Terminated");
|
||||
}
|
||||
|
||||
bool ZeroMQBackend::DoInit() {
|
||||
auto linger_ms = static_cast<int>(zeek::id::find_val<zeek::IntVal>("Cluster::Backend::ZeroMQ::linger_ms")->AsInt());
|
||||
int xpub_nodrop = zeek::id::find_val<zeek::BoolVal>("Cluster::Backend::ZeroMQ::xpub_nodrop")->AsBool() ? 1 : 0;
|
||||
|
||||
xpub.set(zmq::sockopt::linger, linger_ms);
|
||||
xpub.set(zmq::sockopt::xpub_nodrop, xpub_nodrop);
|
||||
|
||||
try {
|
||||
xsub.connect(connect_xsub_endpoint);
|
||||
} catch ( zmq::error_t& err ) {
|
||||
zeek::reporter->Error("ZeroMQ: Failed to connect to XSUB %s: %s", connect_xsub_endpoint.c_str(), err.what());
|
||||
return false;
|
||||
}
|
||||
|
||||
try {
|
||||
xpub.connect(connect_xpub_endpoint);
|
||||
} catch ( zmq::error_t& err ) {
|
||||
zeek::reporter->Error("ZeroMQ: Failed to connect to XPUB %s: %s", connect_xpub_endpoint.c_str(), err.what());
|
||||
return false;
|
||||
}
|
||||
|
||||
|
||||
auto log_immediate =
|
||||
static_cast<int>(zeek::id::find_val<zeek::BoolVal>("Cluster::Backend::ZeroMQ::log_immediate")->AsBool());
|
||||
|
||||
auto log_sndhwm =
|
||||
static_cast<int>(zeek::id::find_val<zeek::IntVal>("Cluster::Backend::ZeroMQ::log_sndhwm")->AsInt());
|
||||
|
||||
auto log_sndbuf =
|
||||
static_cast<int>(zeek::id::find_val<zeek::IntVal>("Cluster::Backend::ZeroMQ::log_sndbuf")->AsInt());
|
||||
|
||||
auto log_rcvhwm =
|
||||
static_cast<int>(zeek::id::find_val<zeek::IntVal>("Cluster::Backend::ZeroMQ::log_rcvhwm")->AsInt());
|
||||
|
||||
auto log_rcvbuf =
|
||||
static_cast<int>(zeek::id::find_val<zeek::IntVal>("Cluster::Backend::ZeroMQ::log_rcvbuf")->AsInt());
|
||||
|
||||
ZEROMQ_DEBUG("Setting log_sndhwm=%d log_sndbuf=%d log_rcvhwm=%d log_rcvbuf=%d linger_ms=%d", log_sndhwm, log_sndbuf,
|
||||
log_rcvhwm, log_rcvbuf, linger_ms);
|
||||
|
||||
log_push.set(zmq::sockopt::sndhwm, log_sndhwm);
|
||||
log_push.set(zmq::sockopt::sndbuf, log_sndbuf);
|
||||
log_push.set(zmq::sockopt::linger, linger_ms);
|
||||
log_push.set(zmq::sockopt::immediate, log_immediate);
|
||||
|
||||
log_pull.set(zmq::sockopt::rcvhwm, log_rcvhwm);
|
||||
log_pull.set(zmq::sockopt::rcvbuf, log_rcvbuf);
|
||||
|
||||
|
||||
if ( ! listen_log_endpoint.empty() ) {
|
||||
ZEROMQ_DEBUG("Listening on log pull socket: %s", listen_log_endpoint.c_str());
|
||||
try {
|
||||
log_pull.bind(listen_log_endpoint);
|
||||
} catch ( zmq::error_t& err ) {
|
||||
zeek::reporter->Error("ZeroMQ: Failed to bind to PULL socket %s: %s", listen_log_endpoint.c_str(),
|
||||
err.what());
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
const auto& log_endpoints = zeek::id::find_val<zeek::VectorVal>("Cluster::Backend::ZeroMQ::connect_log_endpoints");
|
||||
for ( unsigned int i = 0; i < log_endpoints->Size(); i++ )
|
||||
connect_log_endpoints.push_back(log_endpoints->StringValAt(i)->ToStdString());
|
||||
|
||||
for ( const auto& endp : connect_log_endpoints ) {
|
||||
ZEROMQ_DEBUG("Connecting log_push socket with %s", endp.c_str());
|
||||
try {
|
||||
log_push.connect(endp);
|
||||
} catch ( zmq::error_t& err ) {
|
||||
zeek::reporter->Error("ZeroMQ: Failed to connect to PUSH socket %s: %s", endp.c_str(), err.what());
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
// At this point we've connected xpub/xsub and any logging endpoints.
|
||||
// However, we cannot tell if we're connected to anything as ZeroMQ does
|
||||
// not trivially expose this information.
|
||||
//
|
||||
// There is the zmq_socket_monitor() API that we could use to get some
|
||||
// more low-level events in the future for logging and possibly script
|
||||
// layer eventing: http://api.zeromq.org/4-2:zmq-socket-monitor
|
||||
|
||||
|
||||
// As of now, message processing happens in a separate thread that is
|
||||
// started below. If we wanted to integrate ZeroMQ as a selectable IO
|
||||
// source rather than going through ThreadedBackend and its flare, the
|
||||
// following post might be useful:
|
||||
//
|
||||
// https://funcptr.net/2012/09/10/zeromq---edge-triggered-notification/
|
||||
self_thread = std::thread(self_thread_fun, this);
|
||||
|
||||
// After connecting, call ThreadedBackend::DoInit() to register
|
||||
// the IO source with the loop.
|
||||
return ThreadedBackend::DoInit();
|
||||
}
|
||||
|
||||
bool ZeroMQBackend::SpawnZmqProxyThread() {
|
||||
proxy_thread = std::make_unique<ProxyThread>(listen_xpub_endpoint, listen_xsub_endpoint, listen_xpub_nodrop);
|
||||
return proxy_thread->Start();
|
||||
}
|
||||
|
||||
bool ZeroMQBackend::DoPublishEvent(const std::string& topic, const std::string& format,
|
||||
const cluster::detail::byte_buffer& buf) {
|
||||
// Publishing an event happens as a multipart message with 4 parts:
|
||||
//
|
||||
// * The topic to publish to - this is required by XPUB/XSUB
|
||||
// * The node's identifier - see Cluster::node_id().
|
||||
// * The format used to serialize the event.
|
||||
// * The serialized event itself.
|
||||
std::array<zmq::const_buffer, 4> parts = {
|
||||
zmq::const_buffer(topic.data(), topic.size()),
|
||||
zmq::const_buffer(my_node_id.data(), my_node_id.size()),
|
||||
zmq::const_buffer(format.data(), format.size()),
|
||||
zmq::const_buffer(buf.data(), buf.size()),
|
||||
};
|
||||
|
||||
ZEROMQ_DEBUG("Publishing %zu bytes to %s", buf.size(), topic.c_str());
|
||||
|
||||
for ( size_t i = 0; i < parts.size(); i++ ) {
|
||||
zmq::send_flags flags = zmq::send_flags::none;
|
||||
if ( i < parts.size() - 1 )
|
||||
flags = flags | zmq::send_flags::sndmore;
|
||||
|
||||
// This should never fail, it will instead block
|
||||
// when HWM is reached. I guess we need to see if
|
||||
// and how this can happen :-/
|
||||
main_inproc.send(parts[i], flags);
|
||||
}
|
||||
|
||||
return true;
|
||||
}
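For reference, a message published this way can be observed from outside Zeek with a plain SUB socket connected to the broker's XPUB side. A minimal diagnostic sketch (not part of this commit), assuming the default broker endpoints; the payload part stays opaque without a deserializer for the advertised format:

#include <cstdio>
#include <string>
#include <vector>
#include <zmq.hpp>

int main() {
    zmq::context_t ctx;
    zmq::socket_t sub(ctx, zmq::socket_type::sub);
    sub.set(zmq::sockopt::subscribe, ""); // subscribe to all topics
    sub.connect("tcp://127.0.0.1:5555");  // assumed: the broker's XPUB side

    while ( true ) {
        // topic, sender node id, serialization format, serialized event
        std::vector<zmq::message_t> parts;
        do {
            zmq::message_t part;
            (void)sub.recv(part, zmq::recv_flags::none);
            parts.push_back(std::move(part));
        } while ( parts.back().more() );

        if ( parts.size() != 4 )
            continue;

        std::printf("%.*s <- %.*s (%zu payload bytes)\n",
                    static_cast<int>(parts[0].size()), static_cast<const char*>(parts[0].data()),
                    static_cast<int>(parts[1].size()), static_cast<const char*>(parts[1].data()),
                    parts[3].size());
    }
}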
|
||||
|
||||
bool ZeroMQBackend::DoSubscribe(const std::string& topic_prefix) {
|
||||
ZEROMQ_DEBUG("Subscribing to %s", topic_prefix.c_str());
|
||||
try {
|
||||
// Prepend 0x01 byte to indicate subscription to XSUB socket
|
||||
// This is the XSUB API instead of setsockopt(ZMQ_SUBSCRIBE).
|
||||
std::string msg = "\x01" + topic_prefix;
|
||||
xsub.send(zmq::const_buffer(msg.data(), msg.size()));
|
||||
} catch ( zmq::error_t& err ) {
|
||||
zeek::reporter->Error("Failed to subscribe to topic %s: %s", topic_prefix.c_str(), err.what());
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
bool ZeroMQBackend::DoUnsubscribe(const std::string& topic_prefix) {
|
||||
ZEROMQ_DEBUG("Unsubscribing %s", topic_prefix.c_str());
|
||||
try {
|
||||
// Prepend 0x00 byte to indicate unsubscription to the XSUB socket.
// This is the XSUB API instead of setsockopt(ZMQ_UNSUBSCRIBE). Note that
// the 0x00 byte cannot come from a "\x00" literal: operator+ would treat
// it as an empty C string and drop the prefix.
std::string msg = std::string(1, '\x00') + topic_prefix;
|
||||
xsub.send(zmq::const_buffer(msg.data(), msg.size()));
|
||||
} catch ( zmq::error_t& err ) {
|
||||
zeek::reporter->Error("Failed to unsubscribe from topic %s: %s", topic_prefix.c_str(), err.what());
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
bool ZeroMQBackend::DoPublishLogWrites(const logging::detail::LogWriteHeader& header, const std::string& format,
|
||||
cluster::detail::byte_buffer& buf) {
|
||||
ZEROMQ_DEBUG("Publishing %zu bytes of log writes (path %s)", buf.size(), header.path.c_str());
|
||||
static std::string message_type = "log-write";
|
||||
|
||||
// Publishing a log write is done using 4 parts
|
||||
//
|
||||
// * A constant "log-write" string
|
||||
// * The node's identifier - see Cluster::node_id().
|
||||
// * The format used to serialize the log write.
|
||||
// * The serialized log write itself.
|
||||
std::array<zmq::const_buffer, 4> parts = {
|
||||
zmq::const_buffer{message_type.data(), message_type.size()},
|
||||
zmq::const_buffer(my_node_id.data(), my_node_id.size()),
|
||||
zmq::const_buffer{format.data(), format.size()},
|
||||
zmq::const_buffer{buf.data(), buf.size()},
|
||||
};
|
||||
|
||||
zmq::send_result_t result;
|
||||
for ( size_t i = 0; i < parts.size(); i++ ) {
|
||||
zmq::send_flags flags = zmq::send_flags::dontwait;
|
||||
if ( i < parts.size() - 1 )
|
||||
flags = flags | zmq::send_flags::sndmore;
|
||||
|
||||
result = log_push.send(parts[i], flags);
|
||||
if ( ! result ) {
|
||||
// XXX: Not exactly clear what we should do if we reach HWM.
|
||||
// we could block and hope a logger comes along that empties
|
||||
// our internal queue, or discard messages and log very loudly
|
||||
// and have metrics about it. However, this may happen regularly
|
||||
// at shutdown.
|
||||
//
|
||||
// Maybe that should be configurable?
|
||||
|
||||
// If no logging endpoints were configured, that almost seems on
|
||||
// purpose (and there's a warning elsewhere about this), so skip
|
||||
// logging an error when sending fails.
|
||||
if ( connect_log_endpoints.empty() )
|
||||
return true;
|
||||
|
||||
reporter->Error("Failed to send log write. HWM reached?");
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
void ZeroMQBackend::Run() {
|
||||
using MultipartMessage = std::vector<zmq::message_t>;
|
||||
|
||||
auto HandleLogMessages = [this](const std::vector<MultipartMessage>& msgs) {
|
||||
QueueMessages qmsgs;
|
||||
qmsgs.reserve(msgs.size());
|
||||
|
||||
for ( const auto& msg : msgs ) {
|
||||
// Expected parts: message type ("log-write"), sender node id, format, payload.
|
||||
if ( msg.size() != 4 ) {
|
||||
ZEROMQ_THREAD_PRINTF("log: error: expected 4 parts, have %zu!\n", msg.size());
|
||||
continue;
|
||||
}
|
||||
|
||||
detail::byte_buffer payload{msg[3].data<std::byte>(), msg[3].data<std::byte>() + msg[3].size()};
|
||||
qmsgs.emplace_back(LogMessage{.format = std::string(msg[2].data<const char>(), msg[2].size()),
|
||||
.payload = std::move(payload)});
|
||||
}
|
||||
|
||||
QueueForProcessing(std::move(qmsgs));
|
||||
};
|
||||
|
||||
auto HandleInprocMessages = [this](std::vector<MultipartMessage>& msgs) {
|
||||
// Forward messages from the inprocess bridge to xpub.
|
||||
for ( auto& msg : msgs ) {
|
||||
assert(msg.size() == 4);
|
||||
|
||||
for ( auto& part : msg ) {
|
||||
zmq::send_flags flags = zmq::send_flags::dontwait;
|
||||
if ( part.more() )
|
||||
flags = flags | zmq::send_flags::sndmore;
|
||||
|
||||
zmq::send_result_t result;
|
||||
do {
|
||||
try {
|
||||
result = xpub.send(part, flags);
|
||||
} catch ( zmq::error_t& err ) {
|
||||
// XXX: Not sure if the return false is so great here.
|
||||
//
|
||||
// Also, if we fail to publish, should we block rather
|
||||
// than discard?
|
||||
ZEROMQ_THREAD_PRINTF("xpub: Failed to publish: %s (%d)", err.what(), err.num());
|
||||
break;
|
||||
}
|
||||
// EAGAIN returns empty result, means try again!
|
||||
} while ( ! result );
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
auto HandleXPubMessages = [this](const std::vector<MultipartMessage>& msgs) {
|
||||
QueueMessages qmsgs;
|
||||
qmsgs.reserve(msgs.size());
|
||||
|
||||
for ( const auto& msg : msgs ) {
|
||||
if ( msg.size() != 1 ) {
|
||||
ZEROMQ_THREAD_PRINTF("xpub: error: expected 1 part, have %zu!\n", msg.size());
|
||||
continue;
|
||||
}
|
||||
|
||||
// Check if the message starts with \x00 or \x01 to determine whether it's
|
||||
// a subscription or unsubscription message.
|
||||
auto first = *reinterpret_cast<const uint8_t*>(msg[0].data());
|
||||
if ( first == 0 || first == 1 ) {
|
||||
QueueMessage qm;
|
||||
auto* start = msg[0].data<std::byte>() + 1;
|
||||
auto* end = msg[0].data<std::byte>() + msg[0].size();
|
||||
detail::byte_buffer topic(start, end);
|
||||
if ( first == 1 ) {
|
||||
qm = BackendMessage{1, std::move(topic)};
|
||||
}
|
||||
else if ( first == 0 ) {
|
||||
qm = BackendMessage{0, std::move(topic)};
|
||||
}
|
||||
else {
|
||||
ZEROMQ_THREAD_PRINTF("xpub: error: unexpected first char: have '0x%02x'", first);
|
||||
continue;
|
||||
}
|
||||
|
||||
qmsgs.emplace_back(std::move(qm));
|
||||
}
|
||||
}
|
||||
|
||||
QueueForProcessing(std::move(qmsgs));
|
||||
};
|
||||
|
||||
auto HandleXSubMessages = [this](const std::vector<MultipartMessage>& msgs) {
|
||||
QueueMessages qmsgs;
|
||||
qmsgs.reserve(msgs.size());
|
||||
|
||||
for ( const auto& msg : msgs ) {
|
||||
if ( msg.size() != 4 ) {
|
||||
ZEROMQ_THREAD_PRINTF("xsub: error: expected 4 parts, have %zu!\n", msg.size());
|
||||
continue;
|
||||
}
|
||||
|
||||
// Filter out messages that are coming from this node.
|
||||
std::string sender(msg[1].data<const char>(), msg[1].size());
|
||||
if ( sender == my_node_id )
|
||||
continue;
|
||||
|
||||
detail::byte_buffer payload{msg[3].data<std::byte>(), msg[3].data<std::byte>() + msg[3].size()};
|
||||
qmsgs.emplace_back(EventMessage{.topic = std::string(msg[0].data<const char>(), msg[0].size()),
|
||||
.format = std::string(msg[2].data<const char>(), msg[2].size()),
|
||||
.payload = std::move(payload)});
|
||||
}
|
||||
|
||||
QueueForProcessing(std::move(qmsgs));
|
||||
};
|
||||
|
||||
struct SocketInfo {
|
||||
zmq::socket_ref socket;
|
||||
std::string name;
|
||||
std::function<void(std::vector<MultipartMessage>&)> handler;
|
||||
};
|
||||
|
||||
std::vector<SocketInfo> sockets = {
|
||||
{.socket = child_inproc, .name = "inproc", .handler = HandleInprocMessages},
|
||||
{.socket = xpub, .name = "xpub", .handler = HandleXPubMessages},
|
||||
{.socket = xsub, .name = "xsub", .handler = HandleXSubMessages},
|
||||
{.socket = log_pull, .name = "log_pull", .handler = HandleLogMessages},
|
||||
};
|
||||
|
||||
std::vector<zmq::pollitem_t> poll_items(sockets.size());
|
||||
|
||||
while ( true ) {
|
||||
for ( size_t i = 0; i < sockets.size(); i++ )
|
||||
poll_items[i] = {.socket = sockets[i].socket.handle(), .fd = 0, .events = ZMQ_POLLIN | ZMQ_POLLERR};
|
||||
|
||||
// Awkward.
|
||||
std::vector<std::vector<MultipartMessage>> rcv_messages(sockets.size());
|
||||
try {
|
||||
int r = zmq::poll(poll_items, std::chrono::seconds(-1));
|
||||
ZEROMQ_DEBUG_THREAD_PRINTF(DebugFlag::POLL, "poll: r=%d", r);
|
||||
|
||||
for ( size_t i = 0; i < poll_items.size(); i++ ) {
|
||||
const auto& item = poll_items[i];
|
||||
ZEROMQ_DEBUG_THREAD_PRINTF(DebugFlag::POLL, "poll: items[%lu]=%s %s %s\n", i, sockets[i].name.c_str(),
|
||||
item.revents & ZMQ_POLLIN ? "pollin " : "",
|
||||
item.revents & ZMQ_POLLERR ? "err" : "");
|
||||
|
||||
if ( item.revents & ZMQ_POLLERR ) {
|
||||
// What should we be doing? Re-open sockets? Terminate?
|
||||
ZEROMQ_THREAD_PRINTF("poll: error: POLLERR on socket %zu %s %p revents=%x\n", i,
|
||||
sockets[i].name.c_str(), item.socket, item.revents);
|
||||
}
|
||||
|
||||
// Nothing to do?
|
||||
if ( (item.revents & ZMQ_POLLIN) == 0 )
|
||||
continue;
|
||||
|
||||
bool consumed_one = false;
|
||||
|
||||
// Read messages from the socket.
|
||||
do {
|
||||
zmq::message_t msg;
|
||||
rcv_messages[i].emplace_back(); // make room for a multipart message
|
||||
auto& into = rcv_messages[i].back();
|
||||
|
||||
// Only receive up to poll_max_messages from an individual
|
||||
// socket. Move on to the next when exceeded. The last pushed
|
||||
// message (empty) is popped at the end of the loop.
|
||||
if ( poll_max_messages > 0 && rcv_messages[i].size() > poll_max_messages ) {
|
||||
ZEROMQ_DEBUG_THREAD_PRINTF(DebugFlag::POLL, "poll: %s rcv_messages[%zu] full!\n",
|
||||
sockets[i].name.c_str(), i);
|
||||
break;
|
||||
}
|
||||
|
||||
consumed_one = false;
|
||||
bool more = false;
|
||||
|
||||
// Read a multi-part message.
|
||||
do {
|
||||
auto recv_result = sockets[i].socket.recv(msg, zmq::recv_flags::dontwait);
|
||||
if ( recv_result ) {
|
||||
consumed_one = true;
|
||||
more = msg.more();
|
||||
into.emplace_back(std::move(msg));
|
||||
}
|
||||
else {
|
||||
// EAGAIN and more flag set? Try again!
|
||||
if ( more )
|
||||
continue;
|
||||
}
|
||||
} while ( more );
|
||||
} while ( consumed_one );
|
||||
|
||||
assert(rcv_messages[i].back().size() == 0);
|
||||
rcv_messages[i].pop_back();
|
||||
}
|
||||
} catch ( zmq::error_t& err ) {
|
||||
if ( err.num() == ETERM )
|
||||
return;
|
||||
|
||||
throw;
|
||||
}
|
||||
|
||||
// At this point, we've received anything that was readable from the sockets.
|
||||
// Now interpret and enqueue it into messages.
|
||||
for ( size_t i = 0; i < sockets.size(); i++ ) {
|
||||
if ( rcv_messages[i].empty() )
|
||||
continue;
|
||||
|
||||
sockets[i].handler(rcv_messages[i]);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
bool ZeroMQBackend::DoProcessBackendMessage(int tag, detail::byte_buffer_span payload) {
|
||||
if ( tag == 0 || tag == 1 ) {
|
||||
std::string topic{reinterpret_cast<const char*>(payload.data()), payload.size()};
|
||||
zeek::EventHandlerPtr eh = tag == 1 ? event_subscription : event_unsubscription;
|
||||
|
||||
ZEROMQ_DEBUG("BackendMessage: %s for %s", eh->Name(), topic.c_str());
|
||||
zeek::event_mgr.Enqueue(eh, zeek::make_intrusive<zeek::StringVal>(topic));
|
||||
return true;
|
||||
}
|
||||
else {
|
||||
zeek::reporter->Error("Ignoring bad BackendMessage tag=%d", tag);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
} // namespace cluster::zeromq
|
||||
} // namespace zeek
|
99
src/cluster/backend/zeromq/ZeroMQ.h
Normal file
|
@ -0,0 +1,99 @@
|
|||
// See the file "COPYING" in the main distribution directory for copyright.
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <memory>
|
||||
#include <thread>
|
||||
#include <zmq.hpp>
|
||||
|
||||
#include "zeek/cluster/Backend.h"
|
||||
#include "zeek/cluster/Serializer.h"
|
||||
#include "zeek/cluster/backend/zeromq/ZeroMQ-Proxy.h"
|
||||
|
||||
namespace zeek::cluster::zeromq {
|
||||
|
||||
class ZeroMQBackend : public cluster::ThreadedBackend {
|
||||
public:
|
||||
/**
|
||||
* Constructor.
|
||||
*/
|
||||
ZeroMQBackend(std::unique_ptr<EventSerializer> es, std::unique_ptr<LogSerializer> ls);
|
||||
|
||||
/**
|
||||
* Spawns a thread running zmq_proxy() for the configured XPUB/XSUB listen
|
||||
* sockets. Only one node in a cluster should do this.
|
||||
*/
|
||||
bool SpawnZmqProxyThread();
|
||||
|
||||
/**
|
||||
* Run method for background thread.
|
||||
*/
|
||||
void Run();
|
||||
|
||||
/**
|
||||
* Component factory.
|
||||
*/
|
||||
static std::unique_ptr<Backend> Instantiate(std::unique_ptr<EventSerializer> event_serializer,
|
||||
std::unique_ptr<LogSerializer> log_serializer) {
|
||||
return std::make_unique<ZeroMQBackend>(std::move(event_serializer), std::move(log_serializer));
|
||||
}
|
||||
|
||||
private:
|
||||
void DoInitPostScript() override;
|
||||
|
||||
bool DoInit() override;
|
||||
|
||||
void DoTerminate() override;
|
||||
|
||||
bool DoPublishEvent(const std::string& topic, const std::string& format,
|
||||
const cluster::detail::byte_buffer& buf) override;
|
||||
|
||||
bool DoSubscribe(const std::string& topic_prefix) override;
|
||||
|
||||
bool DoUnsubscribe(const std::string& topic_prefix) override;
|
||||
|
||||
bool DoPublishLogWrites(const logging::detail::LogWriteHeader& header, const std::string& format,
|
||||
cluster::detail::byte_buffer& buf) override;
|
||||
|
||||
const char* Tag() override { return "ZeroMQ"; }
|
||||
|
||||
bool DoProcessBackendMessage(int tag, detail::byte_buffer_span payload) override;
|
||||
|
||||
// Script level variables.
|
||||
std::string my_node_id;
|
||||
std::string connect_xsub_endpoint;
|
||||
std::string connect_xpub_endpoint;
|
||||
std::string listen_xsub_endpoint;
|
||||
std::string listen_xpub_endpoint;
|
||||
std::string listen_log_endpoint;
|
||||
int listen_xpub_nodrop = 1;
|
||||
|
||||
zeek_uint_t poll_max_messages = 0;
|
||||
zeek_uint_t debug_flags = 0;
|
||||
|
||||
EventHandlerPtr event_subscription;
|
||||
EventHandlerPtr event_unsubscription;
|
||||
|
||||
zmq::context_t ctx;
|
||||
zmq::socket_t xsub;
|
||||
zmq::socket_t xpub;
|
||||
|
||||
// inproc sockets used for sending
|
||||
// publish messages to xpub in a
|
||||
// thread safe manner.
|
||||
zmq::socket_t main_inproc;
|
||||
zmq::socket_t child_inproc;
|
||||
|
||||
// Sockets used for logging. The log_push socket connects
|
||||
// with one or more logger-like nodes. Logger nodes listen
|
||||
// on the log_pull socket.
|
||||
std::vector<std::string> connect_log_endpoints;
|
||||
zmq::socket_t log_push;
|
||||
zmq::socket_t log_pull;
|
||||
|
||||
std::thread self_thread;
|
||||
|
||||
std::unique_ptr<ProxyThread> proxy_thread;
|
||||
};
|
||||
|
||||
} // namespace zeek::cluster::zeromq
|
16
src/cluster/backend/zeromq/cluster_backend_zeromq.bif
Normal file
|
@ -0,0 +1,16 @@
|
|||
%%{
|
||||
#include "ZeroMQ.h"
|
||||
%%}
|
||||
|
||||
function Cluster::Backend::ZeroMQ::spawn_zmq_proxy_thread%(%): bool
|
||||
%{
|
||||
// Spawn the ZeroMQ broker thread.
|
||||
auto *zeromq_backend = dynamic_cast<zeek::cluster::zeromq::ZeroMQBackend*>(zeek::cluster::backend);
|
||||
if ( ! zeromq_backend )
|
||||
{
|
||||
zeek::emit_builtin_error("Cluster::backend not set to ZeroMQ?");
|
||||
return zeek::val_mgr->Bool(false);
|
||||
}
|
||||
|
||||
return zeek::val_mgr->Bool(zeromq_backend->SpawnZmqProxyThread());
|
||||
%}
|
52
src/cluster/backend/zeromq/cmake/FindZeroMQ.cmake
Normal file
|
@ -0,0 +1,52 @@
|
|||
include(FindPackageHandleStandardArgs)
|
||||
|
||||
find_library(ZeroMQ_LIBRARY NAMES zmq HINTS ${ZeroMQ_ROOT_DIR}/lib)
|
||||
|
||||
find_path(ZeroMQ_INCLUDE_DIR NAMES zmq.h HINTS ${ZeroMQ_ROOT_DIR}/include)
|
||||
|
||||
find_path(ZeroMQ_CPP_INCLUDE_DIR NAMES zmq.hpp HINTS ${ZeroMQ_ROOT_DIR}/include)
|
||||
|
||||
function (set_cppzmq_version)
|
||||
# Extract the cppzmq version from zmq.hpp.
|
||||
file(STRINGS "${ZeroMQ_CPP_INCLUDE_DIR}/zmq.hpp" CPPZMQ_MAJOR_VERSION_H
|
||||
REGEX "^#define CPPZMQ_VERSION_MAJOR [0-9]+$")
|
||||
file(STRINGS "${ZeroMQ_CPP_INCLUDE_DIR}/zmq.hpp" CPPZMQ_MINOR_VERSION_H
|
||||
REGEX "^#define CPPZMQ_VERSION_MINOR [0-9]+$")
|
||||
file(STRINGS "${ZeroMQ_CPP_INCLUDE_DIR}/zmq.hpp" CPPZMQ_PATCH_VERSION_H
|
||||
REGEX "^#define CPPZMQ_VERSION_PATCH [0-9]+$")
|
||||
string(REGEX REPLACE "^.*MAJOR ([0-9]+)$" "\\1" CPPZMQ_MAJOR_VERSION
|
||||
"${CPPZMQ_MAJOR_VERSION_H}")
|
||||
string(REGEX REPLACE "^.*MINOR ([0-9]+)$" "\\1" CPPZMQ_MINOR_VERSION
|
||||
"${CPPZMQ_MINOR_VERSION_H}")
|
||||
string(REGEX REPLACE "^.*PATCH ([0-9]+)$" "\\1" CPPZMQ_PATCH_VERSION
|
||||
"${CPPZMQ_PATCH_VERSION_H}")
|
||||
|
||||
set(ZeroMQ_CPP_VERSION "${CPPZMQ_MAJOR_VERSION}.${CPPZMQ_MINOR_VERSION}.${CPPZMQ_PATCH_VERSION}"
|
||||
PARENT_SCOPE)
|
||||
endfunction ()
|
||||
|
||||
if (ZeroMQ_CPP_INCLUDE_DIR)
|
||||
set_cppzmq_version()
|
||||
endif ()
|
||||
|
||||
if (NOT ZeroMQ_CPP_VERSION)
|
||||
# Probably no zmq.hpp file, use the version from auxil
|
||||
set(ZeroMQ_CPP_INCLUDE_DIR "${CMAKE_CURRENT_SOURCE_DIR}/auxil/cppzmq"
|
||||
CACHE FILEPATH "Include path for cppzmq" FORCE)
|
||||
set_cppzmq_version()
|
||||
elseif (ZeroMQ_CPP_VERSION VERSION_LESS "4.9.0")
|
||||
message(STATUS "Found old cppzmq version ${ZeroMQ_CPP_VERSION}, using bundled version")
|
||||
set(ZeroMQ_CPP_INCLUDE_DIR "${CMAKE_CURRENT_SOURCE_DIR}/auxil/cppzmq"
|
||||
CACHE FILEPATH "Include path for cppzmq" FORCE)
|
||||
set_cppzmq_version()
|
||||
endif ()
|
||||
|
||||
message(STATUS "Using cppzmq ${ZeroMQ_CPP_VERSION} from ${ZeroMQ_CPP_INCLUDE_DIR}")
|
||||
|
||||
find_package_handle_standard_args(
|
||||
ZeroMQ FOUND_VAR ZeroMQ_FOUND REQUIRED_VARS ZeroMQ_LIBRARY ZeroMQ_INCLUDE_DIR
|
||||
ZeroMQ_CPP_INCLUDE_DIR ZeroMQ_CPP_VERSION)
|
||||
|
||||
set(ZeroMQ_LIBRARIES ${ZeroMQ_LIBRARY})
|
||||
set(ZeroMQ_INCLUDE_DIRS ${ZeroMQ_INCLUDE_DIR} ${ZeroMQ_CPP_INCLUDE_DIR})
|
||||
set(ZeroMQ_FOUND ${ZeroMQ_FOUND})
|
|
@@ -0,0 +1,21 @@
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
logger got hello from manager (zeromq_manager_<hostname>_<pid>_NrFj3eGxkRR5)
logger got hello from proxy (zeromq_proxy_<hostname>_<pid>_NrFj3eGxkRR5)
logger got hello from worker-1 (zeromq_worker-1_<hostname>_<pid>_NrFj3eGxkRR5)
logger got hello from worker-2 (zeromq_worker-2_<hostname>_<pid>_NrFj3eGxkRR5)
manager got hello from logger (zeromq_logger_<hostname>_<pid>_NrFj3eGxkRR5)
manager got hello from proxy (zeromq_proxy_<hostname>_<pid>_NrFj3eGxkRR5)
manager got hello from worker-1 (zeromq_worker-1_<hostname>_<pid>_NrFj3eGxkRR5)
manager got hello from worker-2 (zeromq_worker-2_<hostname>_<pid>_NrFj3eGxkRR5)
proxy got hello from logger (zeromq_logger_<hostname>_<pid>_NrFj3eGxkRR5)
proxy got hello from manager (zeromq_manager_<hostname>_<pid>_NrFj3eGxkRR5)
proxy got hello from worker-1 (zeromq_worker-1_<hostname>_<pid>_NrFj3eGxkRR5)
proxy got hello from worker-2 (zeromq_worker-2_<hostname>_<pid>_NrFj3eGxkRR5)
worker-1 got hello from logger (zeromq_logger_<hostname>_<pid>_NrFj3eGxkRR5)
worker-1 got hello from manager (zeromq_manager_<hostname>_<pid>_NrFj3eGxkRR5)
worker-1 got hello from proxy (zeromq_proxy_<hostname>_<pid>_NrFj3eGxkRR5)
worker-1 got hello from worker-2 (zeromq_worker-2_<hostname>_<pid>_NrFj3eGxkRR5)
worker-2 got hello from logger (zeromq_logger_<hostname>_<pid>_NrFj3eGxkRR5)
worker-2 got hello from manager (zeromq_manager_<hostname>_<pid>_NrFj3eGxkRR5)
worker-2 got hello from proxy (zeromq_proxy_<hostname>_<pid>_NrFj3eGxkRR5)
worker-2 got hello from worker-1 (zeromq_worker-1_<hostname>_<pid>_NrFj3eGxkRR5)
16
testing/btest/Baseline/cluster.zeromq.logging/manager.out
Normal file
@@ -0,0 +1,16 @@
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
A zeek_init, manager
B node_up, logger
B node_up, proxy
B node_up, worker-1
B node_up, worker-2
B nodes_up, 2
B nodes_up, 3
B nodes_up, 4
B nodes_up, 5
C send_finish
D node_down, logger
D node_down, proxy
D node_down, worker-1
D node_down, worker-2
D send_finish to logger
21
testing/btest/Baseline/cluster.zeromq.logging/node_up.sorted
Normal file
@@ -0,0 +1,21 @@
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
logger manager
logger proxy
logger worker-1
logger worker-2
manager logger
manager proxy
manager worker-1
manager worker-2
proxy logger
proxy manager
proxy worker-1
proxy worker-2
worker-1 logger
worker-1 manager
worker-1 proxy
worker-1 worker-2
worker-2 logger
worker-2 manager
worker-2 proxy
worker-2 worker-1
@@ -0,0 +1,13 @@
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
manager got hello from proxy (zeromq_proxy_<hostname>_<pid>_NrFj3eGxkRR5)
manager got hello from worker-1 (zeromq_worker-1_<hostname>_<pid>_NrFj3eGxkRR5)
manager got hello from worker-2 (zeromq_worker-2_<hostname>_<pid>_NrFj3eGxkRR5)
proxy got hello from manager (zeromq_manager_<hostname>_<pid>_NrFj3eGxkRR5)
proxy got hello from worker-1 (zeromq_worker-1_<hostname>_<pid>_NrFj3eGxkRR5)
proxy got hello from worker-2 (zeromq_worker-2_<hostname>_<pid>_NrFj3eGxkRR5)
worker-1 got hello from manager (zeromq_manager_<hostname>_<pid>_NrFj3eGxkRR5)
worker-1 got hello from proxy (zeromq_proxy_<hostname>_<pid>_NrFj3eGxkRR5)
worker-1 got hello from worker-2 (zeromq_worker-2_<hostname>_<pid>_NrFj3eGxkRR5)
worker-2 got hello from manager (zeromq_manager_<hostname>_<pid>_NrFj3eGxkRR5)
worker-2 got hello from proxy (zeromq_proxy_<hostname>_<pid>_NrFj3eGxkRR5)
worker-2 got hello from worker-1 (zeromq_worker-1_<hostname>_<pid>_NrFj3eGxkRR5)

@@ -0,0 +1,11 @@
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
A node_up, proxy
A node_up, worker-1
A node_up, worker-2
B nodes_up, 2
B nodes_up, 3
B nodes_up, 4
D node_down, proxy
D node_down, worker-1
D node_down, worker-2
zeek_init, manager

@@ -0,0 +1,13 @@
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
manager proxy
manager worker-1
manager worker-2
proxy manager
proxy worker-1
proxy worker-2
worker-1 manager
worker-1 proxy
worker-1 worker-2
worker-2 manager
worker-2 proxy
worker-2 worker-1

@@ -0,0 +1,21 @@
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
logger got hello from manager (zeromq_manager_<hostname>_<pid>_NrFj3eGxkRR5)
logger got hello from proxy (zeromq_proxy_<hostname>_<pid>_NrFj3eGxkRR5)
logger got hello from worker-1 (zeromq_worker-1_<hostname>_<pid>_NrFj3eGxkRR5)
logger got hello from worker-2 (zeromq_worker-2_<hostname>_<pid>_NrFj3eGxkRR5)
manager got hello from logger (zeromq_logger_<hostname>_<pid>_NrFj3eGxkRR5)
manager got hello from proxy (zeromq_proxy_<hostname>_<pid>_NrFj3eGxkRR5)
manager got hello from worker-1 (zeromq_worker-1_<hostname>_<pid>_NrFj3eGxkRR5)
manager got hello from worker-2 (zeromq_worker-2_<hostname>_<pid>_NrFj3eGxkRR5)
proxy got hello from logger (zeromq_logger_<hostname>_<pid>_NrFj3eGxkRR5)
proxy got hello from manager (zeromq_manager_<hostname>_<pid>_NrFj3eGxkRR5)
proxy got hello from worker-1 (zeromq_worker-1_<hostname>_<pid>_NrFj3eGxkRR5)
proxy got hello from worker-2 (zeromq_worker-2_<hostname>_<pid>_NrFj3eGxkRR5)
worker-1 got hello from logger (zeromq_logger_<hostname>_<pid>_NrFj3eGxkRR5)
worker-1 got hello from manager (zeromq_manager_<hostname>_<pid>_NrFj3eGxkRR5)
worker-1 got hello from proxy (zeromq_proxy_<hostname>_<pid>_NrFj3eGxkRR5)
worker-1 got hello from worker-2 (zeromq_worker-2_<hostname>_<pid>_NrFj3eGxkRR5)
worker-2 got hello from logger (zeromq_logger_<hostname>_<pid>_NrFj3eGxkRR5)
worker-2 got hello from manager (zeromq_manager_<hostname>_<pid>_NrFj3eGxkRR5)
worker-2 got hello from proxy (zeromq_proxy_<hostname>_<pid>_NrFj3eGxkRR5)
worker-2 got hello from worker-1 (zeromq_worker-1_<hostname>_<pid>_NrFj3eGxkRR5)

@@ -0,0 +1,3 @@
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
node_up, worker-1
node_down, worker-1

@@ -0,0 +1,2 @@
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
node_up, manager
@@ -259,6 +259,7 @@ scripts/base/init-frameworks-and-bifs.zeek
build/scripts/base/bif/plugins/Zeek_WebSocket.functions.bif.zeek
build/scripts/base/bif/plugins/Zeek_WebSocket.types.bif.zeek
build/scripts/base/bif/plugins/Zeek_XMPP.events.bif.zeek
build/scripts/base/bif/plugins/Zeek_Cluster_Backend_ZeroMQ.cluster_backend_zeromq.bif.zeek
build/scripts/base/bif/plugins/Zeek_ARP.events.bif.zeek
build/scripts/base/bif/plugins/Zeek_UDP.events.bif.zeek
build/scripts/base/bif/plugins/Zeek_ICMP.events.bif.zeek

@@ -259,6 +259,7 @@ scripts/base/init-frameworks-and-bifs.zeek
build/scripts/base/bif/plugins/Zeek_WebSocket.functions.bif.zeek
build/scripts/base/bif/plugins/Zeek_WebSocket.types.bif.zeek
build/scripts/base/bif/plugins/Zeek_XMPP.events.bif.zeek
build/scripts/base/bif/plugins/Zeek_Cluster_Backend_ZeroMQ.cluster_backend_zeromq.bif.zeek
build/scripts/base/bif/plugins/Zeek_ARP.events.bif.zeek
build/scripts/base/bif/plugins/Zeek_UDP.events.bif.zeek
build/scripts/base/bif/plugins/Zeek_ICMP.events.bif.zeek

@@ -1,5 +1 @@
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
received termination signal
received termination signal
received termination signal
received termination signal
@@ -337,6 +337,7 @@
0.000000 MetaHookPost LoadFile(0, ./Zeek_BenchmarkReader.benchmark.bif.zeek, <...>/Zeek_BenchmarkReader.benchmark.bif.zeek) -> -1
0.000000 MetaHookPost LoadFile(0, ./Zeek_BinaryReader.binary.bif.zeek, <...>/Zeek_BinaryReader.binary.bif.zeek) -> -1
0.000000 MetaHookPost LoadFile(0, ./Zeek_BitTorrent.events.bif.zeek, <...>/Zeek_BitTorrent.events.bif.zeek) -> -1
0.000000 MetaHookPost LoadFile(0, ./Zeek_Cluster_Backend_ZeroMQ.cluster_backend_zeromq.bif.zeek, <...>/Zeek_Cluster_Backend_ZeroMQ.cluster_backend_zeromq.bif.zeek) -> -1
0.000000 MetaHookPost LoadFile(0, ./Zeek_ConfigReader.config.bif.zeek, <...>/Zeek_ConfigReader.config.bif.zeek) -> -1
0.000000 MetaHookPost LoadFile(0, ./Zeek_ConnSize.events.bif.zeek, <...>/Zeek_ConnSize.events.bif.zeek) -> -1
0.000000 MetaHookPost LoadFile(0, ./Zeek_ConnSize.functions.bif.zeek, <...>/Zeek_ConnSize.functions.bif.zeek) -> -1

@@ -643,6 +644,7 @@
0.000000 MetaHookPost LoadFileExtended(0, ./Zeek_BenchmarkReader.benchmark.bif.zeek, <...>/Zeek_BenchmarkReader.benchmark.bif.zeek) -> (-1, <no content>)
0.000000 MetaHookPost LoadFileExtended(0, ./Zeek_BinaryReader.binary.bif.zeek, <...>/Zeek_BinaryReader.binary.bif.zeek) -> (-1, <no content>)
0.000000 MetaHookPost LoadFileExtended(0, ./Zeek_BitTorrent.events.bif.zeek, <...>/Zeek_BitTorrent.events.bif.zeek) -> (-1, <no content>)
0.000000 MetaHookPost LoadFileExtended(0, ./Zeek_Cluster_Backend_ZeroMQ.cluster_backend_zeromq.bif.zeek, <...>/Zeek_Cluster_Backend_ZeroMQ.cluster_backend_zeromq.bif.zeek) -> (-1, <no content>)
0.000000 MetaHookPost LoadFileExtended(0, ./Zeek_ConfigReader.config.bif.zeek, <...>/Zeek_ConfigReader.config.bif.zeek) -> (-1, <no content>)
0.000000 MetaHookPost LoadFileExtended(0, ./Zeek_ConnSize.events.bif.zeek, <...>/Zeek_ConnSize.events.bif.zeek) -> (-1, <no content>)
0.000000 MetaHookPost LoadFileExtended(0, ./Zeek_ConnSize.functions.bif.zeek, <...>/Zeek_ConnSize.functions.bif.zeek) -> (-1, <no content>)

@@ -1281,6 +1283,7 @@
0.000000 MetaHookPre LoadFile(0, ./Zeek_BenchmarkReader.benchmark.bif.zeek, <...>/Zeek_BenchmarkReader.benchmark.bif.zeek)
0.000000 MetaHookPre LoadFile(0, ./Zeek_BinaryReader.binary.bif.zeek, <...>/Zeek_BinaryReader.binary.bif.zeek)
0.000000 MetaHookPre LoadFile(0, ./Zeek_BitTorrent.events.bif.zeek, <...>/Zeek_BitTorrent.events.bif.zeek)
0.000000 MetaHookPre LoadFile(0, ./Zeek_Cluster_Backend_ZeroMQ.cluster_backend_zeromq.bif.zeek, <...>/Zeek_Cluster_Backend_ZeroMQ.cluster_backend_zeromq.bif.zeek)
0.000000 MetaHookPre LoadFile(0, ./Zeek_ConfigReader.config.bif.zeek, <...>/Zeek_ConfigReader.config.bif.zeek)
0.000000 MetaHookPre LoadFile(0, ./Zeek_ConnSize.events.bif.zeek, <...>/Zeek_ConnSize.events.bif.zeek)
0.000000 MetaHookPre LoadFile(0, ./Zeek_ConnSize.functions.bif.zeek, <...>/Zeek_ConnSize.functions.bif.zeek)

@@ -1587,6 +1590,7 @@
0.000000 MetaHookPre LoadFileExtended(0, ./Zeek_BenchmarkReader.benchmark.bif.zeek, <...>/Zeek_BenchmarkReader.benchmark.bif.zeek)
0.000000 MetaHookPre LoadFileExtended(0, ./Zeek_BinaryReader.binary.bif.zeek, <...>/Zeek_BinaryReader.binary.bif.zeek)
0.000000 MetaHookPre LoadFileExtended(0, ./Zeek_BitTorrent.events.bif.zeek, <...>/Zeek_BitTorrent.events.bif.zeek)
0.000000 MetaHookPre LoadFileExtended(0, ./Zeek_Cluster_Backend_ZeroMQ.cluster_backend_zeromq.bif.zeek, <...>/Zeek_Cluster_Backend_ZeroMQ.cluster_backend_zeromq.bif.zeek)
0.000000 MetaHookPre LoadFileExtended(0, ./Zeek_ConfigReader.config.bif.zeek, <...>/Zeek_ConfigReader.config.bif.zeek)
0.000000 MetaHookPre LoadFileExtended(0, ./Zeek_ConnSize.events.bif.zeek, <...>/Zeek_ConnSize.events.bif.zeek)
0.000000 MetaHookPre LoadFileExtended(0, ./Zeek_ConnSize.functions.bif.zeek, <...>/Zeek_ConnSize.functions.bif.zeek)

@@ -2224,6 +2228,7 @@
0.000000 | HookLoadFile ./Zeek_BenchmarkReader.benchmark.bif.zeek <...>/Zeek_BenchmarkReader.benchmark.bif.zeek
0.000000 | HookLoadFile ./Zeek_BinaryReader.binary.bif.zeek <...>/Zeek_BinaryReader.binary.bif.zeek
0.000000 | HookLoadFile ./Zeek_BitTorrent.events.bif.zeek <...>/Zeek_BitTorrent.events.bif.zeek
0.000000 | HookLoadFile ./Zeek_Cluster_Backend_ZeroMQ.cluster_backend_zeromq.bif.zeek <...>/Zeek_Cluster_Backend_ZeroMQ.cluster_backend_zeromq.bif.zeek
0.000000 | HookLoadFile ./Zeek_ConfigReader.config.bif.zeek <...>/Zeek_ConfigReader.config.bif.zeek
0.000000 | HookLoadFile ./Zeek_ConnSize.events.bif.zeek <...>/Zeek_ConnSize.events.bif.zeek
0.000000 | HookLoadFile ./Zeek_ConnSize.functions.bif.zeek <...>/Zeek_ConnSize.functions.bif.zeek

@@ -2530,6 +2535,7 @@
0.000000 | HookLoadFileExtended ./Zeek_BenchmarkReader.benchmark.bif.zeek <...>/Zeek_BenchmarkReader.benchmark.bif.zeek
0.000000 | HookLoadFileExtended ./Zeek_BinaryReader.binary.bif.zeek <...>/Zeek_BinaryReader.binary.bif.zeek
0.000000 | HookLoadFileExtended ./Zeek_BitTorrent.events.bif.zeek <...>/Zeek_BitTorrent.events.bif.zeek
0.000000 | HookLoadFileExtended ./Zeek_Cluster_Backend_ZeroMQ.cluster_backend_zeromq.bif.zeek <...>/Zeek_Cluster_Backend_ZeroMQ.cluster_backend_zeromq.bif.zeek
0.000000 | HookLoadFileExtended ./Zeek_ConfigReader.config.bif.zeek <...>/Zeek_ConfigReader.config.bif.zeek
0.000000 | HookLoadFileExtended ./Zeek_ConnSize.events.bif.zeek <...>/Zeek_ConnSize.events.bif.zeek
0.000000 | HookLoadFileExtended ./Zeek_ConnSize.functions.bif.zeek <...>/Zeek_ConnSize.functions.bif.zeek
8
testing/btest/Files/zeromq/cluster-layout-no-logger.zeek
Normal file
@@ -0,0 +1,8 @@
redef Cluster::manager_is_logger = T;

redef Cluster::nodes = {
    ["manager"] = [$node_type=Cluster::MANAGER, $ip=127.0.0.1, $p=to_port(getenv("LOG_PULL_PORT"))],
    ["proxy"] = [$node_type=Cluster::PROXY, $ip=127.0.0.1],
    ["worker-1"] = [$node_type=Cluster::WORKER, $ip=127.0.0.1],
    ["worker-2"] = [$node_type=Cluster::WORKER, $ip=127.0.0.1],
};
9
testing/btest/Files/zeromq/cluster-layout-simple.zeek
Normal file
@@ -0,0 +1,9 @@
redef Cluster::manager_is_logger = F;

redef Cluster::nodes = {
    ["manager"] = [$node_type=Cluster::MANAGER, $ip=127.0.0.1],
    ["logger"] = [$node_type=Cluster::LOGGER, $ip=127.0.0.1, $p=to_port(getenv("LOG_PULL_PORT"))],
    ["proxy"] = [$node_type=Cluster::PROXY, $ip=127.0.0.1],
    ["worker-1"] = [$node_type=Cluster::WORKER, $ip=127.0.0.1],
    ["worker-2"] = [$node_type=Cluster::WORKER, $ip=127.0.0.1],
};
10
testing/btest/Files/zeromq/cluster-layout-two-loggers.zeek
Normal file
@@ -0,0 +1,10 @@
redef Cluster::manager_is_logger = F;

redef Cluster::nodes = {
    ["manager"] = [$node_type=Cluster::MANAGER, $ip=127.0.0.1],
    ["logger-1"] = [$node_type=Cluster::LOGGER, $ip=127.0.0.1, $p=to_port(getenv("LOG_PULL_PORT_1"))],
    ["logger-2"] = [$node_type=Cluster::LOGGER, $ip=127.0.0.1, $p=to_port(getenv("LOG_PULL_PORT_2"))],
    ["proxy"] = [$node_type=Cluster::PROXY, $ip=127.0.0.1],
    ["worker-1"] = [$node_type=Cluster::WORKER, $ip=127.0.0.1],
    ["worker-2"] = [$node_type=Cluster::WORKER, $ip=127.0.0.1],
};
11
testing/btest/Files/zeromq/test-bootstrap.zeek
Normal file
@@ -0,0 +1,11 @@
# Helper script for tests expecting XPUB/XSUB ports allocated by
# btest and configuring the ZeroMQ globals accordingly.
@load base/utils/numbers

@load frameworks/cluster/backend/zeromq
@load frameworks/cluster/backend/zeromq/connect

redef Cluster::Backend::ZeroMQ::listen_xpub_endpoint = fmt("tcp://127.0.0.1:%s", extract_count(getenv("XPUB_PORT")));
redef Cluster::Backend::ZeroMQ::listen_xsub_endpoint = fmt("tcp://127.0.0.1:%s", extract_count(getenv("XSUB_PORT")));
redef Cluster::Backend::ZeroMQ::connect_xpub_endpoint = fmt("tcp://127.0.0.1:%s", extract_count(getenv("XSUB_PORT")));
redef Cluster::Backend::ZeroMQ::connect_xsub_endpoint = fmt("tcp://127.0.0.1:%s", extract_count(getenv("XPUB_PORT")));
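Outside of btest, the same four globals can be pointed directly at a central XPUB/XSUB proxy. The lines below are only a minimal sketch with assumed values: the host 192.0.2.1 is a placeholder, and the fixed ports merely stand in for the btest-allocated XPUB_PORT (here 5556) and XSUB_PORT (here 5555), keeping the same port pairing as the bootstrap script above.

    # Sketch only, not part of the patch; host and port numbers are assumed values.
    redef Cluster::Backend::ZeroMQ::listen_xpub_endpoint = "tcp://192.0.2.1:5556";
    redef Cluster::Backend::ZeroMQ::listen_xsub_endpoint = "tcp://192.0.2.1:5555";
    redef Cluster::Backend::ZeroMQ::connect_xpub_endpoint = "tcp://192.0.2.1:5555";
    redef Cluster::Backend::ZeroMQ::connect_xsub_endpoint = "tcp://192.0.2.1:5556";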
139
testing/btest/cluster/zeromq/logging.zeek
Normal file
@@ -0,0 +1,139 @@
# @TEST-DOC: Start up a ZeroMQ cluster by hand, testing basic logging and the node_up and node_down events.
#
# @TEST-REQUIRES: have-zeromq
#
# @TEST-GROUP: cluster-zeromq
#
# @TEST-PORT: XPUB_PORT
# @TEST-PORT: XSUB_PORT
# @TEST-PORT: LOG_PULL_PORT
#
# @TEST-EXEC: chmod +x ./check-cluster-log.sh
#
# @TEST-EXEC: cp $FILES/zeromq/cluster-layout-simple.zeek cluster-layout.zeek
# @TEST-EXEC: cp $FILES/zeromq/test-bootstrap.zeek zeromq-test-bootstrap.zeek
#
# @TEST-EXEC: btest-bg-run manager "ZEEKPATH=$ZEEKPATH:.. && CLUSTER_NODE=manager zeek -b ../manager.zeek >out"
# @TEST-EXEC: btest-bg-run logger "ZEEKPATH=$ZEEKPATH:.. && CLUSTER_NODE=logger zeek -b ../other.zeek >out"
# @TEST-EXEC: btest-bg-run proxy "ZEEKPATH=$ZEEKPATH:.. && CLUSTER_NODE=proxy zeek -b ../other.zeek >out"
# @TEST-EXEC: btest-bg-run worker-1 "ZEEKPATH=$ZEEKPATH:.. && CLUSTER_NODE=worker-1 zeek -b ../other.zeek >out"
# @TEST-EXEC: btest-bg-run worker-2 "ZEEKPATH=$ZEEKPATH:.. && CLUSTER_NODE=worker-2 zeek -b ../other.zeek >out"
#
# @TEST-EXEC: btest-bg-wait 10
# @TEST-EXEC: btest-diff cluster.log.normalized
# @TEST-EXEC: zeek-cut -F ' ' < ./logger/node_up.log | sort > node_up.sorted
# @TEST-EXEC: btest-diff node_up.sorted
# @TEST-EXEC: sort manager/out > manager.out
# @TEST-EXEC: btest-diff manager.out

# @TEST-START-FILE common.zeek
@load ./zeromq-test-bootstrap

redef Log::default_rotation_interval = 0sec;
redef Log::flush_interval = 0.01sec;

type Info: record {
    self: string &log &default=Cluster::node;
    node: string &log;
};

redef enum Log::ID += { TEST_LOG };

global finish: event(name: string) &is_used;

event zeek_init() {
    print "A zeek_init", Cluster::node;
    Log::create_stream(TEST_LOG, [$columns=Info, $path="node_up"]);
}

event Cluster::node_up(name: string, id: string) &priority=-5 {
    print "B node_up", name;
    Log::write(TEST_LOG, [$node=name]);
    # Log::flush(TEST_LOG);
    # Log::flush(Cluster::LOG);
}
# @TEST-END-FILE

# @TEST-START-FILE manager.zeek
@load ./common.zeek

global nodes_up: set[string] = {"manager"};
global nodes_down: set[string] = {"manager"};

event send_finish() {
    print "C send_finish";
    for ( n in nodes_up )
        if ( n != "logger" )
            Cluster::publish(Cluster::node_topic(n), finish, Cluster::node);
}

event check_cluster_log() {
    if ( file_size("DONE") >= 0 ) {
        event send_finish();
        return;
    }

    system("../check-cluster-log.sh");
    schedule 0.1sec { check_cluster_log() };
}

event zeek_init() {
    schedule 0.1sec { check_cluster_log() };
}

event Cluster::node_up(name: string, id: string) &priority=-1 {
    add nodes_up[name];
    print "B nodes_up", |nodes_up|;
}

event Cluster::node_down(name: string, id: string) {
    print "D node_down", name;
    add nodes_down[name];

    if ( |nodes_down| == |Cluster::nodes| - 1 ) {
        print "D send_finish to logger";
        Cluster::publish(Cluster::node_topic("logger"), finish, Cluster::node);
    }
    if ( |nodes_down| == |Cluster::nodes| )
        terminate();
}
# @TEST-END-FILE

# @TEST-START-FILE other.zeek
@load ./common.zeek

event finish(name: string) {
    print fmt("finish from %s", name);
    terminate();
}
# @TEST-END-FILE

# @TEST-START-FILE check-cluster-log.sh
#!/bin/sh
#
# This script checks logger/cluster.log until the expected number
# of log entries has been observed and puts a normalized version
# into the testing directory for baselining.
CLUSTER_LOG=../logger/cluster.log

if [ ! -f $CLUSTER_LOG ]; then
    echo "$CLUSTER_LOG not found!" >&2
    exit 1;
fi

if [ -f DONE ]; then
    exit 0
fi

# Remove hostname and pid from node id in message.
zeek-cut node message < $CLUSTER_LOG | sed -r 's/_[^_]+_[0-9]+_/_<hostname>_<pid>_/g' | sort > cluster.log.tmp

# 5 nodes each see a hello from the 4 others: 20 lines in total.
if [ $(wc -l < cluster.log.tmp) = 20 ]; then
    echo "DONE!" >&2
    mv cluster.log.tmp ../cluster.log.normalized
    echo "DONE" > DONE
fi

exit 0
# @TEST-END-FILE
129
testing/btest/cluster/zeromq/manager-is-logger.zeek
Normal file
@@ -0,0 +1,129 @@
# @TEST-DOC: Start up a ZeroMQ cluster without a logger, testing logging through the manager.
#
# @TEST-REQUIRES: have-zeromq
#
# @TEST-GROUP: cluster-zeromq
#
# @TEST-PORT: XPUB_PORT
# @TEST-PORT: XSUB_PORT
# @TEST-PORT: LOG_PULL_PORT
#
# @TEST-EXEC: chmod +x ./check-cluster-log.sh
#
# @TEST-EXEC: cp $FILES/zeromq/cluster-layout-no-logger.zeek cluster-layout.zeek
# @TEST-EXEC: cp $FILES/zeromq/test-bootstrap.zeek zeromq-test-bootstrap.zeek
#
# @TEST-EXEC: btest-bg-run manager "ZEEKPATH=$ZEEKPATH:.. && CLUSTER_NODE=manager zeek -b ../manager.zeek >out"
# @TEST-EXEC: btest-bg-run proxy "ZEEKPATH=$ZEEKPATH:.. && CLUSTER_NODE=proxy zeek -b ../other.zeek >out"
# @TEST-EXEC: btest-bg-run worker-1 "ZEEKPATH=$ZEEKPATH:.. && CLUSTER_NODE=worker-1 zeek -b ../other.zeek >out"
# @TEST-EXEC: btest-bg-run worker-2 "ZEEKPATH=$ZEEKPATH:.. && CLUSTER_NODE=worker-2 zeek -b ../other.zeek >out"
#
# @TEST-EXEC: btest-bg-wait 10
# @TEST-EXEC: btest-diff cluster.log.normalized
# @TEST-EXEC: zeek-cut -F ' ' < ./manager/node_up.log | sort > node_up.sorted
# @TEST-EXEC: btest-diff node_up.sorted
# @TEST-EXEC: sort manager/out > manager.out
# @TEST-EXEC: btest-diff manager.out

# @TEST-START-FILE common.zeek
@load ./zeromq-test-bootstrap

redef Log::default_rotation_interval = 0sec;
redef Log::flush_interval = 0.01sec;

type Info: record {
    self: string &log &default=Cluster::node;
    node: string &log;
};

redef enum Log::ID += { TEST_LOG };

global finish: event(name: string) &is_used;

event zeek_init() {
    print "zeek_init", Cluster::node;
    Log::create_stream(TEST_LOG, [$columns=Info, $path="node_up"]);
}

event Cluster::node_up(name: string, id: string) {
    print "A node_up", name;
    Log::write(TEST_LOG, [$node=name]);
}
# @TEST-END-FILE

# @TEST-START-FILE manager.zeek
@load ./common.zeek

global nodes_up: set[string] = {"manager"};
global nodes_down: set[string] = {"manager"};

event send_finish() {
    for ( n in nodes_up )
        Cluster::publish(Cluster::node_topic(n), finish, Cluster::node);
}

event check_cluster_log() {
    if ( file_size("DONE") >= 0 ) {
        event send_finish();
        return;
    }

    system("../check-cluster-log.sh");
    schedule 0.1sec { check_cluster_log() };
}

event zeek_init() {
    schedule 0.1sec { check_cluster_log() };
}

event Cluster::node_up(name: string, id: string) {
    add nodes_up[name];
    print "B nodes_up", |nodes_up|;
}

event Cluster::node_down(name: string, id: string) {
    print "D node_down", name;
    add nodes_down[name];
    if ( |nodes_down| == |Cluster::nodes| )
        terminate();
}
# @TEST-END-FILE

# @TEST-START-FILE other.zeek
@load ./common.zeek

event finish(name: string) {
    print fmt("finish from %s", name);
    terminate();
}
# @TEST-END-FILE
#
# @TEST-START-FILE check-cluster-log.sh
#!/bin/sh
#
# This script checks cluster.log until the expected number
# of log entries has been observed and puts a normalized version
# into the testing directory for baselining.
CLUSTER_LOG=cluster.log

if [ ! -f $CLUSTER_LOG ]; then
    echo "$CLUSTER_LOG not found!" >&2
    exit 1;
fi

if [ -f DONE ]; then
    exit 0
fi

# Remove hostname and pid from node id in message.
zeek-cut node message < $CLUSTER_LOG | sed -r 's/_[^_]+_[0-9]+_/_<hostname>_<pid>_/g' | sort > cluster.log.tmp

# 4 nodes each see a hello from the 3 others: 12 lines in total.
if [ $(wc -l < cluster.log.tmp) = 12 ]; then
    echo "DONE!" >&2
    mv cluster.log.tmp ../cluster.log.normalized
    echo "DONE" > DONE
fi

exit 0
# @TEST-END-FILE
86
testing/btest/cluster/zeromq/supervisor.zeek
Normal file
@@ -0,0 +1,86 @@
# @TEST-DOC: Configure a ZeroMQ cluster with Zeek's supervisor.
#
# @TEST-REQUIRES: have-zeromq
#
# @TEST-GROUP: cluster-zeromq
#
# @TEST-PORT: XPUB_PORT
# @TEST-PORT: XSUB_PORT
# @TEST-PORT: LOG_PULL_PORT

# @TEST-EXEC: chmod +x ./check-cluster-log.sh
#
# @TEST-EXEC: cp $FILES/zeromq/test-bootstrap.zeek zeromq-test-bootstrap.zeek
#
# @TEST-EXEC: btest-bg-run supervisor "ZEEKPATH=$ZEEKPATH:.. && zeek -j ../supervisor.zeek >out"
# @TEST-EXEC: btest-bg-wait 10
# @TEST-EXEC: btest-diff supervisor/cluster.log

redef Log::default_rotation_interval = 0secs;
redef Log::flush_interval = 0.01sec;

@if ( ! Supervisor::is_supervisor() )
@load ./zeromq-test-bootstrap
@else

# The supervisor peeks into logger/cluster.log to initiate a shutdown when
# all nodes have said hello to each other. See the check-cluster-log.sh
# script below.
event check_cluster_log() {
    system_env("../check-cluster-log.sh", table(["SUPERVISOR_PID"] = cat(getpid())));

    schedule 0.1sec { check_cluster_log() };
}

event zeek_init()
    {
    if ( ! Supervisor::is_supervisor() )
        return;

    Broker::listen("127.0.0.1", 9999/tcp);

    local cluster: table[string] of Supervisor::ClusterEndpoint;
    cluster["manager"] = [$role=Supervisor::MANAGER, $host=127.0.0.1, $p=0/unknown];
    cluster["logger"] = [$role=Supervisor::LOGGER, $host=127.0.0.1, $p=to_port(getenv("LOG_PULL_PORT"))];
    cluster["proxy"] = [$role=Supervisor::PROXY, $host=127.0.0.1, $p=0/unknown];
    cluster["worker-1"] = [$role=Supervisor::WORKER, $host=127.0.0.1, $p=0/unknown];
    cluster["worker-2"] = [$role=Supervisor::WORKER, $host=127.0.0.1, $p=0/unknown];

    for ( n, ep in cluster )
        {
        local sn = Supervisor::NodeConfig($name=n, $bare_mode=T, $cluster=cluster, $directory=n);
        local res = Supervisor::create(sn);

        if ( res != "" )
            print fmt("supervisor failed to create node '%s': %s", n, res);
        }

    # Start polling the cluster.log
    event check_cluster_log();
    }
@endif

# @TEST-START-FILE check-cluster-log.sh
#!/bin/sh
#
# This script checks logger/cluster.log until the expected number
# of log entries has been observed and puts a normalized version
# into the current directory. This runs from the supervisor.
if [ ! -f logger/cluster.log ]; then
    exit 1;
fi

if [ -f DONE ]; then
    exit 0
fi

# Remove hostname and pid from node id in message.
zeek-cut node message < logger/cluster.log | sed -r 's/_[^_]+_[0-9]+_/_<hostname>_<pid>_/g' | sort > cluster.log

if [ $(wc -l < cluster.log) = 20 ]; then
    echo "DONE!" >&2
    # Trigger shutdown through supervisor.
    kill ${ZEEK_ARG_SUPERVISOR_PID};
    echo "DONE" > DONE
fi
# @TEST-END-FILE
53
testing/btest/cluster/zeromq/two-nodes.zeek
Normal file
@@ -0,0 +1,53 @@
# @TEST-DOC: Start up a manager running the ZeroMQ proxy thread; a worker connects and the manager sends a finish event to terminate the worker.
#
# @TEST-REQUIRES: have-zeromq
#
# @TEST-GROUP: cluster-zeromq
#
# @TEST-PORT: XPUB_PORT
# @TEST-PORT: XSUB_PORT
# @TEST-PORT: LOG_PULL_PORT
#
# @TEST-EXEC: cp $FILES/zeromq/cluster-layout-simple.zeek cluster-layout.zeek
# @TEST-EXEC: cp $FILES/zeromq/test-bootstrap.zeek zeromq-test-bootstrap.zeek
#
# @TEST-EXEC: btest-bg-run manager "ZEEKPATH=$ZEEKPATH:.. && CLUSTER_NODE=manager zeek -b ../manager.zeek >out"
# @TEST-EXEC: btest-bg-run worker "ZEEKPATH=$ZEEKPATH:.. && CLUSTER_NODE=worker-1 zeek -b ../worker.zeek >out"
#
# @TEST-EXEC: btest-bg-wait 30
# @TEST-EXEC: btest-diff ./manager/out
# @TEST-EXEC: btest-diff ./worker/out

# @TEST-START-FILE common.zeek
@load ./zeromq-test-bootstrap

global finish: event(name: string);
# @TEST-END-FILE

# @TEST-START-FILE manager.zeek
@load ./common.zeek
# If a node comes up that isn't us, send it a finish event.
event Cluster::node_up(name: string, id: string) {
    print "node_up", name;
    Cluster::publish(Cluster::nodeid_topic(id), finish, Cluster::node);
}

# If the worker vanishes, finish the test.
event Cluster::node_down(name: string, id: string) {
    print "node_down", name;
    terminate();
}
# @TEST-END-FILE

# @TEST-START-FILE worker.zeek
@load ./common.zeek

event Cluster::node_up(name: string, id: string) {
    print "node_up", name;
}

event finish(name: string) &is_used {
    terminate();
}
# @TEST-END-FILE
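Note the addressing difference between the tests: logging.zeek publishes to nodes by name via Cluster::node_topic(), while this test targets the worker by its unique node id via Cluster::nodeid_topic(), using the id delivered with Cluster::node_up. A minimal sketch of both styles follows; the ping event is purely hypothetical and only declared here for illustration.

    # Sketch only, not part of the patch; `ping` is a made-up event.
    global ping: event(sender: string);

    event Cluster::node_up(name: string, id: string) {
        # By node name, as the logging test's manager does ...
        Cluster::publish(Cluster::node_topic(name), ping, Cluster::node);
        # ... or by unique node id, as this test's manager does.
        Cluster::publish(Cluster::nodeid_topic(id), ping, Cluster::node);
    }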
4
testing/scripts/have-zeromq
Executable file
@@ -0,0 +1,4 @@
#!/bin/sh

zeek -N Zeek::Cluster_Backend_ZeroMQ >/dev/null
exit $?