Merge remote-tracking branch 'origin/topic/awelzel/pluggable-cluster-backends-part4'

* origin/topic/awelzel/pluggable-cluster-backends-part4:
  Update ZAM BiF-tracking
  configure: Add --disable-cluster-backend-zeromq
  CMakeLists: Cluster backends output
  zeromq: Conditionally enable by default
  btest/generic: Add publish_hrw(), publish_rr() and logging tests
  generate-docs: Run on Ubuntu 24.04, add cppzmq
  docker: Add cppzmq/libzmq dependencies
  tsan_suppressions: Add called_from_lib: libzmq
  ci: Add cppzmq and libzmq to most platforms
  cluster/backend/zeromq: Add ZeroMQ based cluster backend
  cluster/backend/zeromq: Add cppzmq submodule
  cluster: Add Cluster::node_id(), allow redef of node_topic(), nodeid_topic()
  cluster: Move publish_hrw() and publish_rr() to cluster.bif
This commit is contained in:
Arne Welzel 2024-12-11 09:28:04 +01:00
commit 4ee2f9256b
89 changed files with 3035 additions and 109 deletions


@ -17,7 +17,7 @@ jobs:
permissions:
contents: write # for Git to git push
if: github.repository == 'zeek/zeek'
runs-on: ubuntu-22.04
runs-on: ubuntu-24.04
steps:
# We only perform a push if the action was triggered via a schedule
@ -51,6 +51,7 @@ jobs:
bsdmainutils \
ccache \
cmake \
cppzmq-dev \
flex \
g++ \
gcc \
@ -71,7 +72,7 @@ jobs:
# `python2` so this is a simple workaround until we drop Python 2
# support and explicitly use `python3` for all invocations.
sudo ln -sf /usr/bin/python3 /usr/local/bin/python
sudo pip3 install -r doc/requirements.txt
sudo pip3 install --break-system-packages -r doc/requirements.txt
- name: ccache
uses: hendrikmuhs/ccache-action@v1.2

.gitmodules vendored

@ -76,3 +76,6 @@
[submodule "auxil/prometheus-cpp"]
path = auxil/prometheus-cpp
url = https://github.com/zeek/prometheus-cpp
[submodule "src/cluster/backend/zeromq/auxil/cppzmq"]
path = src/cluster/backend/zeromq/auxil/cppzmq
url = https://github.com/zeromq/cppzmq
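
When building from a git checkout on a platform without a recent system-wide
cppzmq, the fallback submodule added above needs to be initialized before
configuring, for example:

    git submodule update --init src/cluster/backend/zeromq/auxil/cppzmq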

CHANGES

@ -1,3 +1,57 @@
7.1.0-dev.760 | 2024-12-11 09:28:04 +0100
* Update ZAM BiF-tracking (Arne Welzel, Corelight)
* configure: Add --disable-cluster-backend-zeromq (Arne Welzel, Corelight)
* CMakeLists: Cluster backends output (Arne Welzel, Corelight)
* zeromq: Conditionally enable by default (Arne Welzel, Corelight)
Instead of having ZeroMQ as a new dependency, enable the ZeroMQ backend only
if ZeroMQ is available on the system as suggested by Tim.
* btest/generic: Add publish_hrw(), publish_rr() and logging tests (Arne Welzel, Corelight)
They currently use ZeroMQ, but technically they should be valid for
any other backend, too, even Broker.
* generate-docs: Run on Ubuntu 24.04, add cppzmq (Arne Welzel, Corelight)
* docker: Add cppzmq/libzmq dependencies (Arne Welzel, Corelight)
* tsan_suppressions: Add called_from_lib: libzmq (Arne Welzel, Corelight)
* ci: Add cppzmq and libzmq to most platforms (Arne Welzel, Corelight)
* cluster/backend/zeromq: Add ZeroMQ based cluster backend (Arne Welzel, Corelight)
This is a cluster backend implementation using a central XPUB/XSUB proxy
that by default runs on the manager node. Logging is implemented leveraging
PUSH/PULL sockets between logger and other nodes, rather than going
through XPUB/XSUB.
The test-all-policy-cluster baseline changed: Previously, Broker::peer()
would be called from setup-connections.zeek, causing the IO loop to be
alive. With the ZeroMQ backend, the IO loop is only alive when
Cluster::init() is called, but that doesn't happen anymore.
* cluster/backend/zeromq: Add cppzmq submodule (Arne Welzel, Corelight)
Not all supported platforms provide a recent enough cppzmq version;
add a fallback as a submodule. cppzmq is a header-only library, so there's
no build step involved.
* cluster: Add Cluster::node_id(), allow redef of node_topic(), nodeid_topic() (Arne Welzel, Corelight)
This provides a way for non-Broker cluster backends to override a
node's identifier and the topics it listens on by default.
* cluster: Move publish_hrw() and publish_rr() to cluster.bif (Arne Welzel, Corelight)
From this point on, Cluster::publish_hrw() and Cluster::publish_rr()
go through cluster/Backend.cc code.
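Cluster::publish_hrw() picks a node from a pool via Rendezvous (Highest Random
Weight) hashing, while Cluster::publish_rr() round-robins. The selection idea
behind HRW can be sketched in a few lines of Python (illustrative only; Zeek's
actual hash function and pool types differ):

```python
import hashlib

def hrw_pick(nodes, key):
    # Score every node against the key; the highest score wins. A given
    # key always maps to the same node as long as that node stays in the
    # pool, even while other nodes come and go.
    def score(node):
        digest = hashlib.sha256(f"{node}|{key}".encode()).digest()
        return int.from_bytes(digest[:8], "big")
    return max(nodes, key=score)

pool = ["proxy-1", "proxy-2", "proxy-3"]
target = hrw_pick(pool, "10.0.0.1")  # stable choice for this key
```

Removing a node only remaps the keys that hashed to it; all other keys keep
their target, which is what makes HRW suitable for distributing per-key work
across a pool.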
7.1.0-dev.745 | 2024-12-10 16:15:57 -0700
* Add btest for unknown_protocols.log (Jan Grashoefer, Corelight)


@ -1490,6 +1490,10 @@ message(
"\n - debugging: ${USE_PERFTOOLS_DEBUG}"
"\njemalloc: ${ENABLE_JEMALLOC}"
"\n"
"\nCluster backends:"
"\n - Broker: ON"
"\n - ZeroMQ: ${ENABLE_CLUSTER_BACKEND_ZEROMQ}"
"\n"
"\nFuzz Targets: ${ZEEK_ENABLE_FUZZERS}"
"\nFuzz Engine: ${ZEEK_FUZZING_ENGINE}"
"${_analyzer_warning}"

NEWS

@ -39,6 +39,12 @@ New Functionality
This entire feature can be disabled by loading the new
``policy/protocols/conn/disable-unknown-ip-proto-support.zeek`` policy script.
- New ``Cluster::publish()``, ``Cluster::subscribe()`` and ``Cluster::unsubscribe()``
functions have been added. In contrast to their ``Broker`` counterparts, these
operate on whichever cluster backend is enabled. Going forward, in-tree
``Broker::publish()`` usages will be replaced with ``Cluster::publish()``, and
script writers should prefer these over the Broker-specific functions.
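A minimal example of switching an existing publish call over (the event name
``my_event`` is illustrative; the call signature mirrors ``Broker::publish()``)::

    global my_event: event(n: count);

    event zeek_init()
        {
        # Formerly Broker::publish(); works with whichever backend is enabled.
        Cluster::publish(Cluster::worker_topic, my_event, 42);
        }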
- Zeek now includes a PostgreSQL protocol analyzer. This analyzer is enabled
by default. The analyzer's events and its ``postgresql.log`` should be
considered preliminary and experimental until the arrival of Zeek's next
@ -148,6 +154,24 @@ New Functionality
This comes with performance caveats: For use-cases with high-data rates
a native protocol analyzer with dedicated events will be far more efficient.
- Experimental support for pluggable cluster backends has been added. New plugin
components have been introduced to support switching Zeek's Broker-based
publish-subscribe and remote logging functionality to alternative implementations.
redef Cluster::backend = Cluster::CLUSTER_BACKEND_ZEROMQ;
Besides the backend, the serialization format used for events and log-writes
has become pluggable as well.
- The Zeek distribution now includes an experimental ZeroMQ based cluster backend.
To experiment with it, load the following script on each cluster node.
@load frameworks/cluster/backend/zeromq/connect
Note that Broker-dependent scripts or integrations will become non-functional
when doing so, as Zeek nodes will no longer listen on Broker ports or
establish peerings with other nodes.
Changed Functionality
---------------------


@ -1 +1 @@
7.1.0-dev.745
7.1.0-dev.760


@ -2,7 +2,7 @@ FROM alpine:latest
# A version field to invalidate Cirrus's build cache when needed, as suggested in
# https://github.com/cirruslabs/cirrus-ci-docs/issues/544#issuecomment-566066822
ENV DOCKERFILE_VERSION 20230823
ENV DOCKERFILE_VERSION 20241024
RUN apk add --no-cache \
bash \
@ -10,6 +10,7 @@ RUN apk add --no-cache \
bsd-compat-headers \
ccache \
cmake \
cppzmq \
curl \
diffutils \
dnsmasq \


@ -2,7 +2,7 @@ FROM quay.io/centos/centos:stream9
# A version field to invalidate Cirrus's build cache when needed, as suggested in
# https://github.com/cirruslabs/cirrus-ci-docs/issues/544#issuecomment-566066822
ENV DOCKERFILE_VERSION 20230801
ENV DOCKERFILE_VERSION 20241024
# dnf config-manager isn't available at first, and
# we need it to install the CRB repo below.
@ -22,6 +22,7 @@ RUN dnf -y --nobest install \
bison \
ccache \
cmake \
cppzmq-devel \
diffutils \
flex \
gcc \


@ -4,7 +4,7 @@ ENV DEBIAN_FRONTEND="noninteractive" TZ="America/Los_Angeles"
# A version field to invalidate Cirrus's build cache when needed, as suggested in
# https://github.com/cirruslabs/cirrus-ci-docs/issues/544#issuecomment-566066822
ENV DOCKERFILE_VERSION 20230801
ENV DOCKERFILE_VERSION 20241024
RUN apt-get update && apt-get -y install \
bison \
@ -22,6 +22,7 @@ RUN apt-get update && apt-get -y install \
libpcap-dev \
libssl-dev \
libuv1-dev \
libzmq3-dev \
make \
python3 \
python3-dev \


@ -4,13 +4,14 @@ ENV DEBIAN_FRONTEND="noninteractive" TZ="America/Los_Angeles"
# A version field to invalidate Cirrus's build cache when needed, as suggested in
# https://github.com/cirruslabs/cirrus-ci-docs/issues/544#issuecomment-566066822
ENV DOCKERFILE_VERSION 20230801
ENV DOCKERFILE_VERSION 20241024
RUN apt-get update && apt-get -y install \
bison \
bsdmainutils \
ccache \
cmake \
cppzmq-dev \
curl \
dnsmasq \
flex \


@ -2,12 +2,13 @@ FROM fedora:40
# A version field to invalidate Cirrus's build cache when needed, as suggested in
# https://github.com/cirruslabs/cirrus-ci-docs/issues/544#issuecomment-566066822
ENV DOCKERFILE_VERSION 20240617
ENV DOCKERFILE_VERSION 20241024
RUN dnf -y install \
bison \
ccache \
cmake \
cppzmq-devel \
diffutils \
dnsmasq \
flex \


@ -2,12 +2,13 @@ FROM fedora:41
# A version field to invalidate Cirrus's build cache when needed, as suggested in
# https://github.com/cirruslabs/cirrus-ci-docs/issues/544#issuecomment-566066822
ENV DOCKERFILE_VERSION 20241112
ENV DOCKERFILE_VERSION 20241115
RUN dnf -y install \
bison \
ccache \
cmake \
cppzmq-devel \
diffutils \
findutils \
flex \


@ -6,7 +6,7 @@ set -e
set -x
env ASSUME_ALWAYS_YES=YES pkg bootstrap
pkg install -y bash git cmake swig bison python3 base64 flex ccache jq dnsmasq
pkg install -y bash cppzmq git cmake swig bison python3 base64 flex ccache jq dnsmasq
pkg upgrade -y curl
pyver=$(python3 -c 'import sys; print(f"py{sys.version_info[0]}{sys.version_info[1]}")')
pkg install -y $pyver-sqlite3


@ -7,7 +7,7 @@ set -x
brew update
brew upgrade cmake
brew install openssl@3 swig bison flex ccache libmaxminddb dnsmasq
brew install cppzmq openssl@3 swig bison flex ccache libmaxminddb dnsmasq
if [ $(sw_vers -productVersion | cut -d '.' -f 1) -lt 14 ]; then
python3 -m pip install --upgrade pip


@ -2,7 +2,7 @@ FROM opensuse/leap:15.5
# A version field to invalidate Cirrus's build cache when needed, as suggested in
# https://github.com/cirruslabs/cirrus-ci-docs/issues/544#issuecomment-566066822
ENV DOCKERFILE_VERSION 20230905
ENV DOCKERFILE_VERSION 20241024
RUN zypper addrepo https://download.opensuse.org/repositories/openSUSE:Leap:15.5:Update/standard/openSUSE:Leap:15.5:Update.repo \
&& zypper refresh \
@ -10,6 +10,7 @@ RUN zypper addrepo https://download.opensuse.org/repositories/openSUSE:Leap:15.5
bison \
ccache \
cmake \
cppzmq-devel \
curl \
flex \
gcc12 \


@ -2,7 +2,7 @@ FROM opensuse/leap:15.6
# A version field to invalidate Cirrus's build cache when needed, as suggested in
# https://github.com/cirruslabs/cirrus-ci-docs/issues/544#issuecomment-566066822
ENV DOCKERFILE_VERSION 20230905
ENV DOCKERFILE_VERSION 20241024
RUN zypper addrepo https://download.opensuse.org/repositories/openSUSE:Leap:15.6:Update/standard/openSUSE:Leap:15.6:Update.repo \
&& zypper refresh \
@ -10,6 +10,7 @@ RUN zypper addrepo https://download.opensuse.org/repositories/openSUSE:Leap:15.6
bison \
ccache \
cmake \
cppzmq-devel \
curl \
dnsmasq \
flex \


@ -2,7 +2,7 @@ FROM opensuse/tumbleweed
# A version field to invalidate Cirrus's build cache when needed, as suggested in
# https://github.com/cirruslabs/cirrus-ci-docs/issues/544#issuecomment-566066822
ENV DOCKERFILE_VERSION 20230801
ENV DOCKERFILE_VERSION 20241024
# Remove the repo-openh264 repository, it caused intermittent issues
# and we should not be needing any packages from it.
@ -14,6 +14,7 @@ RUN zypper refresh \
bison \
ccache \
cmake \
cppzmq-devel \
curl \
diffutils \
dnsmasq \


@ -46,3 +46,16 @@ deadlock:zeek::threading::Queue<zeek::threading::BasicInputMessage*>::LocksForAl
# This only happens at shutdown. It was supposedly fixed in civetweb, but has cropped
# up again. See https://github.com/civetweb/civetweb/issues/861 for details.
race:mg_stop
# Uninstrumented library.
#
# We'd need to build zmq with TSAN enabled; without that, TSAN reports data
# races because it doesn't see the synchronization being done [1]. There are
# also reports that ZeroMQ uses non-standard synchronization that may be
# difficult for TSAN to see [2].
#
# [1] https://groups.google.com/g/thread-sanitizer/c/7UZqM02yMYg/m/KlHOv2ckr9sJ
# [2] https://github.com/zeromq/libzmq/issues/3919
#
called_from_lib:libzmq.so.5
called_from_lib:libzmq.so
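
Suppression files like this one take effect when TSan is pointed at them at
runtime, for example (file path and binary are illustrative):

    TSAN_OPTIONS="suppressions=/path/to/tsan_suppressions.txt" ./build/src/zeek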


@ -4,7 +4,7 @@ ENV DEBIAN_FRONTEND="noninteractive" TZ="America/Los_Angeles"
# A version field to invalidate Cirrus's build cache when needed, as suggested in
# https://github.com/cirruslabs/cirrus-ci-docs/issues/544#issuecomment-566066822
ENV DOCKERFILE_VERSION 20240528
ENV DOCKERFILE_VERSION 20241024
RUN apt-get update && apt-get -y install \
bc \
@ -23,6 +23,7 @@ RUN apt-get update && apt-get -y install \
libmaxminddb-dev \
libpcap-dev \
libssl-dev \
libzmq3-dev \
make \
python3 \
python3-dev \


@ -4,7 +4,7 @@ ENV DEBIAN_FRONTEND="noninteractive" TZ="America/Los_Angeles"
# A version field to invalidate Cirrus's build cache when needed, as suggested in
# https://github.com/cirruslabs/cirrus-ci-docs/issues/544#issuecomment-566066822
ENV DOCKERFILE_VERSION 20230801
ENV DOCKERFILE_VERSION 20241024
RUN apt-get update && apt-get -y install \
bc \
@ -23,6 +23,7 @@ RUN apt-get update && apt-get -y install \
libmaxminddb-dev \
libpcap-dev \
libssl-dev \
libzmq3-dev \
make \
python3 \
python3-dev \


@ -4,7 +4,7 @@ ENV DEBIAN_FRONTEND="noninteractive" TZ="America/Los_Angeles"
# A version field to invalidate Cirrus's build cache when needed, as suggested in
# https://github.com/cirruslabs/cirrus-ci-docs/issues/544#issuecomment-566066822
ENV DOCKERFILE_VERSION 20240807
ENV DOCKERFILE_VERSION 20241024
RUN apt-get update && apt-get -y install \
bc \
@ -14,6 +14,7 @@ RUN apt-get update && apt-get -y install \
clang-18 \
clang++-18 \
cmake \
cppzmq-dev \
curl \
dnsmasq \
flex \


@ -4,7 +4,7 @@ ENV DEBIAN_FRONTEND="noninteractive" TZ="America/Los_Angeles"
# A version field to invalidate Cirrus's build cache when needed, as suggested in
# https://github.com/cirruslabs/cirrus-ci-docs/issues/544#issuecomment-566066822
ENV DOCKERFILE_VERSION 20240807
ENV DOCKERFILE_VERSION 20241115
RUN apt-get update && apt-get -y install \
bc \
@ -14,6 +14,7 @@ RUN apt-get update && apt-get -y install \
clang-18 \
clang++-18 \
cmake \
cppzmq-dev \
curl \
dnsmasq \
flex \

configure vendored

@ -75,6 +75,7 @@ Usage: $0 [OPTION]... [VAR=VALUE]...
--disable-broker-tests don't try to build Broker unit tests
--disable-btest don't install BTest
--disable-btest-pcaps don't install Zeek's BTest input pcaps
--disable-cluster-backend-zeromq don't build Zeek's ZeroMQ cluster backend
--disable-cpp-tests don't build Zeek's C++ unit tests
--disable-javascript don't build Zeek's JavaScript support
--disable-port-prealloc disable pre-allocating the PortVal array in ValManager
@ -333,6 +334,9 @@ while [ $# -ne 0 ]; do
--disable-btest-pcaps)
append_cache_entry INSTALL_BTEST_PCAPS BOOL false
;;
--disable-cluster-backend-zeromq)
append_cache_entry ENABLE_CLUSTER_BACKEND_ZEROMQ BOOL false
;;
--disable-cpp-tests)
append_cache_entry ENABLE_ZEEK_UNIT_TESTS BOOL false
;;
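
With the new flag, a build that skips the ZeroMQ backend entirely (Broker
always remains available) looks like:

    ./configure --disable-cluster-backend-zeromq
    make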


@ -21,6 +21,7 @@ RUN apt-get -q update \
bison \
ccache \
cmake \
cppzmq-dev \
flex \
g++ \
gcc \


@ -21,13 +21,14 @@ RUN apt-get -q update \
jq \
libmaxminddb0 \
libnode108 \
libpython3.11 \
libpcap0.8 \
libpython3.11 \
libssl3 \
libuv1 \
libz1 \
python3-minimal \
libzmq5 \
python3-git \
python3-minimal \
python3-semantic-version \
python3-websocket \
&& apt-get clean \


@ -242,6 +242,13 @@ export {
## of the cluster that is started up.
const node = getenv("CLUSTER_NODE") &redef;
## Function returning this node's identifier.
##
## By default this is :zeek:see:`Broker::node_id`, but it can be
## redefined by other cluster backends. This should be a short-lived
## identifier that resets when a node is restarted.
global node_id: function(): string = Broker::node_id &redef;
## Interval for retrying failed connections between cluster nodes.
## If set, the ZEEK_DEFAULT_CONNECT_RETRY (given in number of seconds)
## environment variable overrides this option.
@ -270,7 +277,7 @@ export {
##
## Returns: a topic string that may be used to send a message exclusively to
## a given cluster node.
global node_topic: function(name: string): string;
global node_topic: function(name: string): string &redef;
## Retrieve the topic associated with a specific node in the cluster.
##
@ -279,7 +286,7 @@ export {
##
## Returns: a topic string that may be used to send a message exclusively to
## a given cluster node.
global nodeid_topic: function(id: string): string;
global nodeid_topic: function(id: string): string &redef;
## Retrieve the cluster-level naming of a node based on its node ID,
## a backend-specific identifier.
@ -446,7 +453,7 @@ event Broker::peer_added(endpoint: Broker::EndpointInfo, msg: string) &priority=
if ( ! Cluster::is_enabled() )
return;
local e = Broker::make_event(Cluster::hello, node, Broker::node_id());
local e = Broker::make_event(Cluster::hello, node, Cluster::node_id());
Broker::publish(nodeid_topic(endpoint$id), e);
}


@ -94,7 +94,7 @@ event zeek_init() &priority=-10
return;
}
Cluster::subscribe(nodeid_topic(Broker::node_id()));
Cluster::subscribe(nodeid_topic(Cluster::node_id()));
Cluster::subscribe(node_topic(node));


@ -0,0 +1 @@
@load ./main.zeek


@ -0,0 +1,14 @@
##! Establish ZeroMQ connectivity with the broker.
@load ./main
module Cluster::Backend::ZeroMQ;
event zeek_init() &priority=10
{
if ( run_proxy_thread )
Cluster::Backend::ZeroMQ::spawn_zmq_proxy_thread();
Cluster::init();
}


@ -0,0 +1,424 @@
##! ZeroMQ cluster backend support.
##!
##! For publish-subscribe functionality, one node in the Zeek cluster spawns a
##! thread running a central broker listening on a XPUB and XSUB socket.
##! These sockets are connected via `zmq_proxy() <https://libzmq.readthedocs.io/en/latest/zmq_proxy.html>`_.
##! All other nodes connect to this central broker with their own XSUB and
##! XPUB sockets, establishing a global many-to-many publish-subscribe system
##! where each node sees subscriptions and messages from all other nodes in a
##! Zeek cluster. ZeroMQ's `publish-subscribe pattern <http://api.zeromq.org/4-2:zmq-socket#toc9>`_
##! documentation may be a good starting point. Elsewhere in ZeroMQ's documentation,
##! the central broker is also called `forwarder <http://api.zeromq.org/4-2:zmq-proxy#toc5>`_.
##!
##! For remote logging functionality, the ZeroMQ `pipeline pattern <http://api.zeromq.org/4-2:zmq-socket#toc14>`_
##! is used. All logger nodes listen on a PULL socket. Other nodes connect
##! via PUSH sockets to all of the loggers. Concretely, remote logging
##! functionality is not publish-subscribe, but instead leverages ZeroMQ's
##! built-in load-balancing functionality provided by PUSH and PULL
##! sockets.
##!
##! The ZeroMQ cluster backend technically allows running a non-Zeek central
##! broker (it only needs to offer XPUB and XSUB sockets). Further, it is
##! possible to run non-Zeek logger nodes. All a logger node needs to do is
##! open a ZeroMQ PULL socket and interpret the format used by Zeek nodes
##! to send their log writes.
module Cluster::Backend::ZeroMQ;
export {
## The central broker's XPUB endpoint to connect to.
##
## A node connects with its XSUB socket to the XPUB socket
## of the central broker.
const connect_xpub_endpoint = "tcp://127.0.0.1:5556" &redef;
## The central broker's XSUB endpoint to connect to.
##
## A node connects with its XPUB socket to the XSUB socket
## of the central broker.
const connect_xsub_endpoint = "tcp://127.0.0.1:5555" &redef;
## Vector of ZeroMQ endpoints to connect to for logging.
##
## A node's PUSH socket used for logging connects to each
## of the ZeroMQ endpoints listed in this vector.
const connect_log_endpoints: vector of string &redef;
## Toggle for running a central ZeroMQ XPUB-XSUB broker on this node.
##
## If set to ``T``, :zeek:see:`Cluster::Backend::ZeroMQ::spawn_zmq_proxy_thread`
## is called during :zeek:see:`zeek_init`. The node will listen
## on :zeek:see:`Cluster::Backend::ZeroMQ::listen_xsub_endpoint` and
## :zeek:see:`Cluster::Backend::ZeroMQ::listen_xpub_endpoint` and
## forward subscriptions and messages between nodes.
##
## By default, this is set to ``T`` on the manager and ``F`` elsewhere.
const run_proxy_thread: bool = F &redef;
## XSUB listen endpoint for the central broker.
##
## This setting is used for the XSUB socket of the central broker started
## when :zeek:see:`Cluster::Backend::ZeroMQ::run_proxy_thread` is ``T``.
const listen_xsub_endpoint = "tcp://127.0.0.1:5556" &redef;
## XPUB listen endpoint for the central broker.
##
## This setting is used for the XPUB socket of the central broker started
## when :zeek:see:`Cluster::Backend::ZeroMQ::run_proxy_thread` is ``T``.
const listen_xpub_endpoint = "tcp://127.0.0.1:5555" &redef;
## PULL socket address to listen on for log messages.
##
## If empty, don't listen for log messages, otherwise
## a ZeroMQ address to bind to. E.g., ``tcp://127.0.0.1:5555``.
const listen_log_endpoint = "" &redef;
## Configure the linger value for ZeroMQ sockets.
##
## The default used by libzmq is 30 seconds (30 000 ms), which is very long
## when loggers vanish before workers during a shutdown, so we reduce
## this to 500 milliseconds by default.
##
## A value of ``-1`` configures blocking forever, while ``0`` would
## immediately discard any pending messages.
##
## See ZeroMQ's `ZMQ_LINGER documentation <http://api.zeromq.org/4-2:zmq-setsockopt#toc24>`_
## for more details.
const linger_ms: int = 500 &redef;
## Configure ZeroMQ's immediate setting on PUSH sockets.
##
## Setting this to ``T`` will queue log writes only to completed
## connections. By default, log writes are queued to all potential
## endpoints listed in :zeek:see:`Cluster::Backend::ZeroMQ::connect_log_endpoints`.
##
## See ZeroMQ's `ZMQ_IMMEDIATE documentation <http://api.zeromq.org/4-2:zmq-setsockopt#toc21>`_
## for more details.
const log_immediate: bool = F &redef;
## Send high water mark value for the log PUSH sockets.
##
## If reached, Zeek nodes will block or drop messages.
##
## See ZeroMQ's `ZMQ_SNDHWM documentation <http://api.zeromq.org/4-2:zmq-setsockopt#toc46>`_
## for more details.
##
## TODO: Make action configurable (block vs drop)
const log_sndhwm: int = 1000 &redef;
## Receive high water mark value for the log PULL sockets.
##
## If reached, Zeek workers will block or drop messages.
##
## See ZeroMQ's `ZMQ_RCVHWM documentation <http://api.zeromq.org/4-2:zmq-setsockopt#toc35>`_
## for more details.
##
## TODO: Make action configurable (block vs drop)
const log_rcvhwm: int = 1000 &redef;
## Kernel transmit buffer size for log sockets.
##
## Using -1 will use the kernel's default.
##
## See ZeroMQ's `ZMQ_SNDBUF documentation <http://api.zeromq.org/4-2:zmq-setsockopt#toc45>`_.
const log_sndbuf: int = -1 &redef;
## Kernel receive buffer size for log sockets.
##
## Using -1 will use the kernel's default.
##
## See ZeroMQ's `ZMQ_RCVBUF documentation <http://api.zeromq.org/4-2:zmq-setsockopt#toc34>`_
## for more details.
const log_rcvbuf: int = -1 &redef;
## Do not silently drop messages if high-water-mark is reached.
##
## Whether to configure ``ZMQ_XPUB_NODROP`` on the XPUB socket
## to detect when sending a message fails due to reaching
## the high-water-mark.
##
## See ZeroMQ's `ZMQ_XPUB_NODROP documentation <http://api.zeromq.org/4-2:zmq-setsockopt#toc61>`_
## for more details.
const xpub_nodrop: bool = T &redef;
## Do not silently drop messages if high-water-mark is reached.
##
## Whether to configure ``ZMQ_XPUB_NODROP`` on the XPUB socket
## to detect when sending a message fails due to reaching
## the high-water-mark.
##
## This setting applies to the XPUB/XSUB broker started when
## :zeek:see:`Cluster::Backend::ZeroMQ::run_proxy_thread` is ``T``.
##
## See ZeroMQ's `ZMQ_XPUB_NODROP documentation <http://api.zeromq.org/4-2:zmq-setsockopt#toc61>`_
## for more details.
const listen_xpub_nodrop: bool = T &redef;
## Messages to receive before yielding.
##
## Yield from the receive loop when this many messages have been
## received from one of the used sockets.
const poll_max_messages = 100 &redef;
## Bitmask to enable low-level stderr based debug printing.
##
## poll debugging: 1 (produce verbose zmq::poll() output)
##
## Bitwise-OR values from the above list together and set debug_flags
## to the result. E.g., use 7 to select 4, 2 and 1. Only use this
## in development if something seems off. The thread used internally
## will produce output on stderr.
const debug_flags: count = 0 &redef;
## The node topic prefix to use.
global node_topic_prefix = "zeek.cluster.node" &redef;
## The node_id topic prefix to use.
global nodeid_topic_prefix = "zeek.cluster.nodeid" &redef;
## Low-level event when a subscription is added.
##
## Every node observes all subscriptions from other nodes
## in a cluster through its XPUB socket. Whenever a new
## subscription topic is added, this event is raised with
## the topic.
##
## topic: The topic.
global subscription: event(topic: string);
## Low-level event when a subscription vanishes.
##
## Every node observes all subscriptions from other nodes
## in a cluster through its XPUB socket. Whenever a subscription
## is removed from the local XPUB socket, this event is raised
## with the topic set to the removed subscription.
##
## topic: The topic.
global unsubscription: event(topic: string);
## Low-level event sent to a node in response to its subscription.
##
## name: The sending node's name in :zeek:see:`Cluster::nodes`.
##
## id: The sending node's identifier, as generated by :zeek:see:`Cluster::node_id`.
global hello: event(name: string, id: string);
## Expiration for hello state.
##
## How long to wait before expiring information about
## subscriptions and hello messages from other
## nodes. These expirations trigger reporter warnings.
const hello_expiration: interval = 10sec &redef;
}
redef Cluster::backend = Cluster::CLUSTER_BACKEND_ZEROMQ;
# By default, let the manager node run the proxy thread.
redef run_proxy_thread = Cluster::local_node_type() == Cluster::MANAGER;
function zeromq_node_topic(name: string): string {
return node_topic_prefix + "." + name;
}
function zeromq_nodeid_topic(id: string): string {
return nodeid_topic_prefix + "." + id;
}
# Unique identifier for this node with some debug information.
const my_node_id = fmt("zeromq_%s_%s_%s_%s", Cluster::node, gethostname(), getpid(), unique_id("N"));
function zeromq_node_id(): string {
return my_node_id;
}
redef Cluster::node_topic = zeromq_node_topic;
redef Cluster::nodeid_topic = zeromq_nodeid_topic;
redef Cluster::node_id = zeromq_node_id;
redef Cluster::logger_topic = "zeek.cluster.logger";
redef Cluster::manager_topic = "zeek.cluster.manager";
redef Cluster::proxy_topic = "zeek.cluster.proxy";
redef Cluster::worker_topic = "zeek.cluster.worker";
redef Cluster::proxy_pool_spec = Cluster::PoolSpec(
$topic = "zeek.cluster.pool.proxy",
$node_type = Cluster::PROXY);
redef Cluster::logger_pool_spec = Cluster::PoolSpec(
$topic = "zeek.cluster.pool.logger",
$node_type = Cluster::LOGGER);
redef Cluster::worker_pool_spec = Cluster::PoolSpec(
$topic = "zeek.cluster.pool.worker",
$node_type = Cluster::WORKER);
# Configure listen_log_endpoint based on port in cluster-layout, if any.
@if ( Cluster::local_node_type() == Cluster::LOGGER || (Cluster::manager_is_logger && Cluster::local_node_type() == Cluster::MANAGER) )
const my_node = Cluster::nodes[Cluster::node];
@if ( my_node?$p )
redef listen_log_endpoint = fmt("tcp://%s:%s", my_node$ip, port_to_count(my_node$p));
@endif
@endif
# Populate connect_log_endpoints based on Cluster::nodes on non-logger nodes.
# If you're experimenting with zero-logger clusters, ignore this code and set
# connect_log_endpoints yourself via redef.
event zeek_init() &priority=100
{
if ( Cluster::local_node_type() == Cluster::LOGGER )
return;
if ( Cluster::manager_is_logger && Cluster::local_node_type() == Cluster::MANAGER )
return;
for ( _, node in Cluster::nodes )
{
local endp: string;
if ( node$node_type == Cluster::LOGGER && node?$p )
{
endp = fmt("tcp://%s:%s", node$ip, port_to_count(node$p));
connect_log_endpoints += endp;
}
if ( Cluster::manager_is_logger && node$node_type == Cluster::MANAGER && node?$p )
{
endp = fmt("tcp://%s:%s", node$ip, port_to_count(node$p));
connect_log_endpoints += endp;
}
}
# If there's no endpoints configured, but more than a single
# node in cluster layout, log an error as that's probably not
# an intended configuration.
if ( |connect_log_endpoints| == 0 && |Cluster::nodes| > 1 )
Reporter::error("No ZeroMQ connect_log_endpoints configured");
}
function nodeid_subscription_expired(nodeids: set[string], nodeid: string): interval
{
Reporter::warning(fmt("Expired subscription from nodeid %s", nodeid));
return 0.0sec;
}
function nodeid_hello_expired(nodeids: set[string], nodeid: string): interval
{
Reporter::warning(fmt("Expired hello from nodeid %s", nodeid));
return 0.0sec;
}
# State about subscriptions and hellos seen from other nodes.
global nodeid_subscriptions: set[string] &create_expire=hello_expiration &expire_func=nodeid_subscription_expired;
global nodeid_hellos: set[string] &create_expire=hello_expiration &expire_func=nodeid_hello_expired;
# The ZeroMQ plugin notifies script land when a new subscription arrived
# on that node's XPUB socket. If the topic of such a subscription starts with
# the nodeid_topic_prefix for another node A, node B seeing the subscription
# sends ZeroMQ::hello() to the topic, announcing its own presence to node A.
# Conversely, when node A sees the subscription for node B's nodeid topic,
# it also sends ZeroMQ::hello(). In other words, every node says hello to all
# other nodes based on subscriptions they observe on their local XPUB sockets.
#
# Once node B has seen both, the nodeid topic subscription and ZeroMQ::hello()
# event from node A, it raises a Cluster::node_up() event for node A.
#
# See also the Cluster::Backend::ZeroMQ::hello() handler below.
#
# 1) node A subscribes to Cluster::nodeid_topic(Cluster::node_id())
# 2) node B observes subscription for node A's nodeid_topic and replies with ZeroMQ::hello()
# 3) node A receives node B's nodeid_topic subscription, replies with ZeroMQ::hello()
# 4) node B receives node A's ZeroMQ::hello() and raises Cluster::node_up()
# as it has already seen node A's nodeid_topic subscription.
event Cluster::Backend::ZeroMQ::subscription(topic: string)
{
local prefix = nodeid_topic_prefix + ".";
if ( ! starts_with(topic, prefix) )
return;
local nodeid = topic[|prefix|:];
# Do not say hello to ourselves - we won't see it anyhow.
if ( nodeid == Cluster::node_id() )
return;
Cluster::publish(topic, Cluster::Backend::ZeroMQ::hello, Cluster::node, Cluster::node_id());
# If we saw a ZeroMQ::hello from the other node already, send
# it a Cluster::hello.
if ( nodeid in nodeid_hellos )
{
Cluster::publish(Cluster::nodeid_topic(nodeid), Cluster::hello, Cluster::node, Cluster::node_id());
delete nodeid_hellos[nodeid];
}
else
{
add nodeid_subscriptions[nodeid];
}
}
# Receiving ZeroMQ::hello() from another node: If we received a subscription
# for the node's nodeid_topic, reply with a Cluster::hello. If the node never
# properly went away, log a warning and raise a Cluster::node_down() now.
event Cluster::Backend::ZeroMQ::hello(name: string, id: string)
{
if ( name in Cluster::nodes )
{
local n = Cluster::nodes[name];
if ( n?$id )
{
if ( n$id == id )
{
# Duplicate ZeroMQ::hello(), very strange, ignore it.
Reporter::warning(fmt("node '%s' sends ZeroMQ::hello twice (id:%s)",
name, id));
return;
}
Reporter::warning(fmt("node '%s' never said goodbye (old id:%s new id:%s)",
name, n$id, id));
# We raise node_down() here for the old instance,
# but it's obviously fake and somewhat lying.
event Cluster::node_down(name, n$id);
}
}
# Publish Cluster::hello() directly if the nodeid_topic subscription
# for the other node was already seen. Otherwise, remember that
# ZeroMQ::hello() has been seen and send Cluster::hello() during
# subscription processing further up.
if ( id in nodeid_subscriptions )
{
Cluster::publish(Cluster::nodeid_topic(id), Cluster::hello, Cluster::node, Cluster::node_id());
delete nodeid_subscriptions[id];
}
else
{
add nodeid_hellos[id];
}
}
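The two handlers above tolerate either arrival order of a peer's nodeid_topic subscription and its ZeroMQ::hello(). A minimal stdlib-only sketch of that ordering logic (the `Handshake` type and method names are illustrative, not part of the backend):

```cpp
#include <set>
#include <string>

// Sketch: a node considers a peer "up" only once it has seen BOTH the
// peer's nodeid_topic subscription and the peer's hello, in either order.
struct Handshake {
    std::set<std::string> seen_subscription; // peers whose subscription arrived first
    std::set<std::string> seen_hello;        // peers whose hello arrived first

    // Returns true when the handshake for `peer` completes on this call.
    bool on_subscription(const std::string& peer) {
        if ( seen_hello.erase(peer) )
            return true; // hello was first; handshake now complete
        seen_subscription.insert(peer);
        return false;
    }

    bool on_hello(const std::string& peer) {
        if ( seen_subscription.erase(peer) )
            return true; // subscription was first; handshake now complete
        seen_hello.insert(peer);
        return false;
    }
};
```

Whichever message arrives second completes the handshake, matching the nodeid_hellos / nodeid_subscriptions bookkeeping in the script.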
# If the unsubscription is for a nodeid prefix, extract the
# nodeid that's gone, find the name of the node from the
# cluster layout and raise Cluster::node_down().
event Cluster::Backend::ZeroMQ::unsubscription(topic: string)
{
local prefix = nodeid_topic_prefix + ".";
if ( ! starts_with(topic, prefix) )
return;
local gone_node_id = topic[|prefix|:];
local name = "";
for ( node_name, n in Cluster::nodes ) {
if ( n?$id && n$id == gone_node_id ) {
name = node_name;
break;
}
}
if ( name != "" )
event Cluster::node_down(name, gone_node_id);
else
Reporter::warning(fmt("unsubscription of unknown node with id '%s'", gone_node_id));
}
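The lookup above can be sketched with a plain map from node name to node id; the topic prefix and layout values are illustrative only:

```cpp
#include <map>
#include <optional>
#include <string>

// Sketch: given a cluster layout (name -> node id), resolve the name of a
// node whose nodeid_topic subscription went away.
std::optional<std::string> find_gone_node(const std::map<std::string, std::string>& nodes,
                                          const std::string& topic, const std::string& prefix) {
    if ( topic.rfind(prefix, 0) != 0 ) // not a nodeid topic
        return std::nullopt;
    std::string gone_id = topic.substr(prefix.size());
    for ( const auto& [name, id] : nodes )
        if ( id == gone_id )
            return name;
    return std::nullopt; // unknown id: the script logs a warning here
}
```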

@@ -11,6 +9 @@
# @load frameworks/control/controllee.zeek
# @load frameworks/control/controller.zeek
@load frameworks/cluster/backend/zeromq/__load__.zeek
# @load frameworks/cluster/backend/zeromq/connect.zeek
@load frameworks/cluster/backend/zeromq/main.zeek
@load frameworks/cluster/experimental.zeek
# Loaded via the above through test-all-policy-cluster.test
# when running as a manager, creates cluster.log entries

@@ -2,6 +2,7 @@
# Scripts which are commented out in test-all-policy.zeek.
@load protocols/ssl/decryption.zeek
@load frameworks/cluster/backend/zeromq/connect.zeek
@load frameworks/cluster/nodes-experimental/manager.zeek
@load frameworks/control/controllee.zeek
@load frameworks/control/controller.zeek
@@ -28,6 +29,7 @@ event zeek_init() &priority=1000
# fail when run under zeekygen. For the purpose of zeekygen, we could
# probably disable all modules, too.
disable_module_events("Control");
disable_module_events("Cluster::Backend::ZeroMQ");
disable_module_events("Management::Agent::Runtime");
disable_module_events("Management::Controller::Runtime");
disable_module_events("Management::Node");

@@ -173,83 +173,3 @@ function Broker::__unsubscribe%(topic_prefix: string%): bool
auto rval = zeek::broker_mgr->Unsubscribe(topic_prefix->CheckString());
return zeek::val_mgr->Bool(rval);
%}
module Cluster;
type Cluster::Pool: record;
## Publishes an event to a node within a pool according to Round-Robin
## distribution strategy.
##
## pool: the pool of nodes that are eligible to receive the event.
##
## key: an arbitrary string to identify the purpose for which you're
## distributing the event. e.g. consider using namespacing of your
## script like "Intel::cluster_rr_key".
##
## args: Either the event arguments as already made by
## :zeek:see:`Broker::make_event` or the argument list to pass along
## to it.
##
## Returns: true if the message is sent.
function Cluster::publish_rr%(pool: Pool, key: string, ...%): bool
%{
static zeek::Func* topic_func = nullptr;
if ( ! topic_func )
topic_func = zeek::detail::global_scope()->Find("Cluster::rr_topic")->GetVal()->AsFunc();
if ( ! is_cluster_pool(pool) )
{
zeek::emit_builtin_error("expected type Cluster::Pool for pool");
return zeek::val_mgr->False();
}
zeek::Args vl{{zeek::NewRef{}, pool}, {zeek::NewRef{}, key}};
auto topic = topic_func->Invoke(&vl);
if ( ! topic->AsString()->Len() )
return zeek::val_mgr->False();
auto rval = publish_event_args(ArgsSpan{*@ARGS@}.subspan(2),
topic->AsString(), frame);
return zeek::val_mgr->Bool(rval);
%}
## Publishes an event to a node within a pool according to Rendezvous
## (Highest Random Weight) hashing strategy.
##
## pool: the pool of nodes that are eligible to receive the event.
##
## key: data used for input to the hashing function that will uniformly
## distribute keys among available nodes.
##
## args: Either the event arguments as already made by
## :zeek:see:`Broker::make_event` or the argument list to pass along
## to it.
##
## Returns: true if the message is sent.
function Cluster::publish_hrw%(pool: Pool, key: any, ...%): bool
%{
static zeek::Func* topic_func = nullptr;
if ( ! topic_func )
topic_func = zeek::detail::global_scope()->Find("Cluster::hrw_topic")->GetVal()->AsFunc();
if ( ! is_cluster_pool(pool) )
{
zeek::emit_builtin_error("expected type Cluster::Pool for pool");
return zeek::val_mgr->False();
}
zeek::Args vl{{zeek::NewRef{}, pool}, {zeek::NewRef{}, key}};
auto topic = topic_func->Invoke(&vl);
if ( ! topic->AsString()->Len() )
return zeek::val_mgr->False();
auto rval = publish_event_args(ArgsSpan{*@ARGS@}.subspan(2),
topic->AsString(), frame);
return zeek::val_mgr->Bool(rval);
%}

@@ -136,4 +136,13 @@ zeek::ValPtr publish_event(const zeek::ValPtr& topic, zeek::ArgsSpan args) {
zeek::obj_desc_short(args[0]->GetType().get()).c_str()));
return zeek::val_mgr->False();
}
bool is_cluster_pool(const zeek::Val* pool) {
static zeek::RecordTypePtr pool_type = nullptr;
if ( ! pool_type )
pool_type = zeek::id::find_type<zeek::RecordType>("Cluster::Pool");
return pool->GetType() == pool_type;
}
} // namespace zeek::cluster::detail::bif

@@ -44,6 +44,8 @@ zeek::RecordValPtr make_event(zeek::ArgsSpan args);
*/
zeek::ValPtr publish_event(const zeek::ValPtr& topic, zeek::ArgsSpan args);
bool is_cluster_pool(const zeek::Val* pool);
} // namespace cluster::detail::bif
} // namespace zeek

@@ -11,4 +11,5 @@ zeek_add_subdir_library(
BIFS
cluster.bif)
add_subdirectory(backend)
add_subdirectory(serializer)

@@ -0,0 +1,17 @@
list(APPEND CMAKE_MODULE_PATH "${CMAKE_CURRENT_SOURCE_DIR}/zeromq/cmake")
find_package(ZeroMQ)
# Default to building ZeroMQ only if ZeroMQ was found.
#
# If a user enabled the cluster backend explicitly (-D ENABLE_CLUSTER_BACKEND_ZEROMQ:bool=ON),
# but ZeroMQ wasn't found, hard bail.
option(ENABLE_CLUSTER_BACKEND_ZEROMQ "Enable the ZeroMQ cluster backend" ${ZeroMQ_FOUND})
if (ENABLE_CLUSTER_BACKEND_ZEROMQ)
if (NOT ZeroMQ_FOUND)
message(FATAL_ERROR "ENABLE_CLUSTER_BACKEND_ZEROMQ set, but ZeroMQ library not available")
endif ()
add_subdirectory(zeromq)
endif ()

@@ -0,0 +1,19 @@
list(APPEND CMAKE_MODULE_PATH "${CMAKE_CURRENT_SOURCE_DIR}/cmake")
find_package(ZeroMQ REQUIRED)
zeek_add_plugin(
Zeek
Cluster_Backend_ZeroMQ
INCLUDE_DIRS
${CMAKE_CURRENT_SOURCE_DIR}
${CMAKE_CURRENT_BINARY_DIR}
${ZeroMQ_INCLUDE_DIRS}
DEPENDENCIES
${ZeroMQ_LIBRARIES}
SOURCES
Plugin.cc
ZeroMQ-Proxy.cc
ZeroMQ.cc
BIFS
cluster_backend_zeromq.bif)

@@ -0,0 +1,22 @@
// See the file "COPYING" in the main distribution directory for copyright.
#include "zeek/cluster/backend/zeromq/Plugin.h"
#include "zeek/cluster/Component.h"
#include "zeek/cluster/backend/zeromq/ZeroMQ.h"
namespace zeek::plugin::Zeek_Cluster_Backend_ZeroMQ {
Plugin plugin;
zeek::plugin::Configuration Plugin::Configure() {
AddComponent(new cluster::BackendComponent("ZeroMQ", zeek::cluster::zeromq::ZeroMQBackend::Instantiate));
zeek::plugin::Configuration config;
config.name = "Zeek::Cluster_Backend_ZeroMQ";
config.description = "Cluster backend using ZeroMQ";
return config;
}
} // namespace zeek::plugin::Zeek_Cluster_Backend_ZeroMQ

@@ -0,0 +1,14 @@
// See the file "COPYING" in the main distribution directory for copyright.
#pragma once
#include "zeek/plugin/Plugin.h"
namespace zeek::plugin::Zeek_Cluster_Backend_ZeroMQ {
class Plugin : public zeek::plugin::Plugin {
public:
zeek::plugin::Configuration Configure() override;
};
} // namespace zeek::plugin::Zeek_Cluster_Backend_ZeroMQ

@@ -0,0 +1,72 @@
// See the file "COPYING" in the main distribution directory for copyright.
#include "zeek/cluster/backend/zeromq/ZeroMQ-Proxy.h"
#include <zmq.hpp>
#include "zeek/Reporter.h"
#include "zeek/util.h"
using namespace zeek::cluster::zeromq;
namespace {
/**
* Function that runs zmq_proxy() that provides a central XPUB/XSUB
* broker for other Zeek nodes to connect and exchange subscription
* information.
*/
void thread_fun(ProxyThread::Args* args) {
zeek::util::detail::set_thread_name("zmq-proxy-thread");
try {
zmq::proxy(args->xsub, args->xpub, zmq::socket_ref{} /*capture*/);
} catch ( zmq::error_t& err ) {
args->xsub.close();
args->xpub.close();
if ( err.num() != ETERM ) {
std::fprintf(stderr, "[zeromq] unexpected zmq_proxy() error: %s (%d)\n", err.what(), err.num());
throw;
}
}
}
} // namespace
bool ProxyThread::Start() {
zmq::socket_t xpub(ctx, zmq::socket_type::xpub);
zmq::socket_t xsub(ctx, zmq::socket_type::xsub);
xpub.set(zmq::sockopt::xpub_nodrop, xpub_nodrop);
try {
xpub.bind(xpub_endpoint);
} catch ( zmq::error_t& err ) {
zeek::reporter->Error("Failed to bind xpub socket %s: %s (%d)", xpub_endpoint.c_str(), err.what(), err.num());
return false;
}
try {
xsub.bind(xsub_endpoint);
} catch ( zmq::error_t& err ) {
zeek::reporter->Error("Failed to bind xsub socket %s: %s (%d)", xsub_endpoint.c_str(), err.what(), err.num());
return false;
}
args = {.xpub = std::move(xpub), .xsub = std::move(xsub)};
thread = std::thread(thread_fun, &args);
return true;
}
void ProxyThread::Shutdown() {
ctx.shutdown();
if ( thread.joinable() )
thread.join();
ctx.close();
}

@@ -0,0 +1,56 @@
// See the file "COPYING" in the main distribution directory for copyright.
#pragma once
#include <string>
#include <thread>
#include <zmq.hpp>
// Central XPUB/XSUB proxy.
//
// Spawns a thread that runs zmq_proxy() for a XPUB/XSUB pair.
namespace zeek::cluster::zeromq {
class ProxyThread {
public:
/**
* Constructor.
*
* @param xpub_endpoint the XPUB socket address to listen on.
* @param xsub_endpoint the XSUB socket address to listen on.
* @param xpub_nodrop the xpub_nodrop option to use on the XPUB socket.
*/
ProxyThread(std::string xpub_endpoint, std::string xsub_endpoint, int xpub_nodrop)
: xpub_endpoint(std::move(xpub_endpoint)), xsub_endpoint(std::move(xsub_endpoint)), xpub_nodrop(xpub_nodrop) {}
~ProxyThread() { Shutdown(); }
/**
* Data kept in object and passed to thread.
*/
struct Args {
zmq::socket_t xpub;
zmq::socket_t xsub;
};
/**
* Bind the sockets and spawn the thread.
*/
bool Start();
/**
* Shutdown the ZeroMQ context and join the thread.
*/
void Shutdown();
private:
zmq::context_t ctx;
std::thread thread;
Args args;
std::string xpub_endpoint;
std::string xsub_endpoint;
int xpub_nodrop = 1;
};
} // namespace zeek::cluster::zeromq

@@ -0,0 +1,569 @@
// See the file "COPYING" in the main distribution directory for copyright.
#include "ZeroMQ.h"
#include <array>
#include <cerrno>
#include <chrono>
#include <cstddef>
#include <cstdio>
#include <functional>
#include <memory>
#include <string>
#include <thread>
#include <zmq.hpp>
#include "zeek/DebugLogger.h"
#include "zeek/Event.h"
#include "zeek/EventRegistry.h"
#include "zeek/IntrusivePtr.h"
#include "zeek/Reporter.h"
#include "zeek/Val.h"
#include "zeek/cluster/Backend.h"
#include "zeek/cluster/Serializer.h"
#include "zeek/cluster/backend/zeromq/Plugin.h"
#include "zeek/cluster/backend/zeromq/ZeroMQ-Proxy.h"
namespace zeek {
namespace plugin::Zeek_Cluster_Backend_ZeroMQ {
extern zeek::plugin::Zeek_Cluster_Backend_ZeroMQ::Plugin plugin;
}
namespace cluster::zeromq {
enum class DebugFlag : zeek_uint_t {
NONE = 0,
POLL = 1,
};
constexpr DebugFlag operator&(zeek_uint_t x, DebugFlag y) {
return static_cast<DebugFlag>(x & static_cast<zeek_uint_t>(y));
}
#define ZEROMQ_DEBUG(...) PLUGIN_DBG_LOG(zeek::plugin::Zeek_Cluster_Backend_ZeroMQ::plugin, __VA_ARGS__)
#define ZEROMQ_THREAD_PRINTF(...) \
do { \
std::fprintf(stderr, "[zeromq] " __VA_ARGS__); \
} while ( 0 )
#define ZEROMQ_DEBUG_THREAD_PRINTF(flag, ...) \
do { \
if ( (debug_flags & flag) == flag ) { \
ZEROMQ_THREAD_PRINTF(__VA_ARGS__); \
} \
} while ( 0 )
namespace {
void self_thread_fun(void* arg) {
auto* self = static_cast<ZeroMQBackend*>(arg);
self->Run();
}
} // namespace
// Constructor.
ZeroMQBackend::ZeroMQBackend(std::unique_ptr<EventSerializer> es, std::unique_ptr<LogSerializer> ls)
: ThreadedBackend(std::move(es), std::move(ls)) {
xsub = zmq::socket_t(ctx, zmq::socket_type::xsub);
xpub = zmq::socket_t(ctx, zmq::socket_type::xpub);
log_push = zmq::socket_t(ctx, zmq::socket_type::push);
log_pull = zmq::socket_t(ctx, zmq::socket_type::pull);
main_inproc = zmq::socket_t(ctx, zmq::socket_type::pair);
child_inproc = zmq::socket_t(ctx, zmq::socket_type::pair);
}
void ZeroMQBackend::DoInitPostScript() {
ThreadedBackend::DoInitPostScript();
my_node_id = zeek::id::find_val<zeek::StringVal>("Cluster::Backend::ZeroMQ::my_node_id")->ToStdString();
listen_xpub_endpoint =
zeek::id::find_val<zeek::StringVal>("Cluster::Backend::ZeroMQ::listen_xpub_endpoint")->ToStdString();
listen_xsub_endpoint =
zeek::id::find_val<zeek::StringVal>("Cluster::Backend::ZeroMQ::listen_xsub_endpoint")->ToStdString();
listen_xpub_nodrop =
zeek::id::find_val<zeek::BoolVal>("Cluster::Backend::ZeroMQ::listen_xpub_nodrop")->AsBool() ? 1 : 0;
connect_xpub_endpoint =
zeek::id::find_val<zeek::StringVal>("Cluster::Backend::ZeroMQ::connect_xpub_endpoint")->ToStdString();
connect_xsub_endpoint =
zeek::id::find_val<zeek::StringVal>("Cluster::Backend::ZeroMQ::connect_xsub_endpoint")->ToStdString();
listen_log_endpoint =
zeek::id::find_val<zeek::StringVal>("Cluster::Backend::ZeroMQ::listen_log_endpoint")->ToStdString();
poll_max_messages = zeek::id::find_val<zeek::CountVal>("Cluster::Backend::ZeroMQ::poll_max_messages")->Get();
debug_flags = zeek::id::find_val<zeek::CountVal>("Cluster::Backend::ZeroMQ::debug_flags")->Get();
event_unsubscription = zeek::event_registry->Register("Cluster::Backend::ZeroMQ::unsubscription");
event_subscription = zeek::event_registry->Register("Cluster::Backend::ZeroMQ::subscription");
main_inproc.bind("inproc://publish-bridge");
child_inproc.connect("inproc://publish-bridge");
}
void ZeroMQBackend::DoTerminate() {
ZEROMQ_DEBUG("Shutting down ctx");
ctx.shutdown();
ZEROMQ_DEBUG("Joining self_thread");
if ( self_thread.joinable() )
self_thread.join();
log_push.close();
log_pull.close();
xsub.close();
xpub.close();
main_inproc.close();
child_inproc.close();
ZEROMQ_DEBUG("Closing ctx");
ctx.close();
// If running the proxy thread, terminate it, too.
if ( proxy_thread ) {
ZEROMQ_DEBUG("Shutting down proxy thread");
proxy_thread->Shutdown();
}
ZEROMQ_DEBUG("Terminated");
}
bool ZeroMQBackend::DoInit() {
auto linger_ms = static_cast<int>(zeek::id::find_val<zeek::IntVal>("Cluster::Backend::ZeroMQ::linger_ms")->AsInt());
int xpub_nodrop = zeek::id::find_val<zeek::BoolVal>("Cluster::Backend::ZeroMQ::xpub_nodrop")->AsBool() ? 1 : 0;
xpub.set(zmq::sockopt::linger, linger_ms);
xpub.set(zmq::sockopt::xpub_nodrop, xpub_nodrop);
try {
xsub.connect(connect_xsub_endpoint);
} catch ( zmq::error_t& err ) {
zeek::reporter->Error("ZeroMQ: Failed to connect to XSUB %s: %s", connect_xsub_endpoint.c_str(), err.what());
return false;
}
try {
xpub.connect(connect_xpub_endpoint);
} catch ( zmq::error_t& err ) {
zeek::reporter->Error("ZeroMQ: Failed to connect to XPUB %s: %s", connect_xpub_endpoint.c_str(), err.what());
return false;
}
auto log_immediate =
static_cast<int>(zeek::id::find_val<zeek::BoolVal>("Cluster::Backend::ZeroMQ::log_immediate")->AsBool());
auto log_sndhwm =
static_cast<int>(zeek::id::find_val<zeek::IntVal>("Cluster::Backend::ZeroMQ::log_sndhwm")->AsInt());
auto log_sndbuf =
static_cast<int>(zeek::id::find_val<zeek::IntVal>("Cluster::Backend::ZeroMQ::log_sndbuf")->AsInt());
auto log_rcvhwm =
static_cast<int>(zeek::id::find_val<zeek::IntVal>("Cluster::Backend::ZeroMQ::log_rcvhwm")->AsInt());
auto log_rcvbuf =
static_cast<int>(zeek::id::find_val<zeek::IntVal>("Cluster::Backend::ZeroMQ::log_rcvbuf")->AsInt());
ZEROMQ_DEBUG("Setting log_sndhwm=%d log_sndbuf=%d log_rcvhwm=%d log_rcvbuf=%d linger_ms=%d", log_sndhwm, log_sndbuf,
log_rcvhwm, log_rcvbuf, linger_ms);
log_push.set(zmq::sockopt::sndhwm, log_sndhwm);
log_push.set(zmq::sockopt::sndbuf, log_sndbuf);
log_push.set(zmq::sockopt::linger, linger_ms);
log_push.set(zmq::sockopt::immediate, log_immediate);
log_pull.set(zmq::sockopt::rcvhwm, log_rcvhwm);
log_pull.set(zmq::sockopt::rcvbuf, log_rcvbuf);
if ( ! listen_log_endpoint.empty() ) {
ZEROMQ_DEBUG("Listening on log pull socket: %s", listen_log_endpoint.c_str());
try {
log_pull.bind(listen_log_endpoint);
} catch ( zmq::error_t& err ) {
zeek::reporter->Error("ZeroMQ: Failed to bind to PULL socket %s: %s", listen_log_endpoint.c_str(),
err.what());
return false;
}
}
const auto& log_endpoints = zeek::id::find_val<zeek::VectorVal>("Cluster::Backend::ZeroMQ::connect_log_endpoints");
for ( unsigned int i = 0; i < log_endpoints->Size(); i++ )
connect_log_endpoints.push_back(log_endpoints->StringValAt(i)->ToStdString());
for ( const auto& endp : connect_log_endpoints ) {
ZEROMQ_DEBUG("Connecting log_push socket with %s", endp.c_str());
try {
log_push.connect(endp);
} catch ( zmq::error_t& err ) {
zeek::reporter->Error("ZeroMQ: Failed to connect to PUSH socket %s: %s", endp.c_str(), err.what());
return false;
}
}
// At this point we've connected xpub/xsub and any logging endpoints.
// However, we cannot tell if we're connected to anything as ZeroMQ does
// not trivially expose this information.
//
// There is the zmq_socket_monitor() API that we could use to get some
// more low-level events in the future for logging and possibly script
// layer eventing: http://api.zeromq.org/4-2:zmq-socket-monitor
// As of now, message processing happens in a separate thread that is
// started below. If we wanted to integrate ZeroMQ as a selectable IO
// source rather than going through ThreadedBackend and its flare, the
// following post might be useful:
//
// https://funcptr.net/2012/09/10/zeromq---edge-triggered-notification/
self_thread = std::thread(self_thread_fun, this);
// After connecting, call ThreadedBackend::DoInit() to register
// the IO source with the loop.
return ThreadedBackend::DoInit();
}
bool ZeroMQBackend::SpawnZmqProxyThread() {
proxy_thread = std::make_unique<ProxyThread>(listen_xpub_endpoint, listen_xsub_endpoint, listen_xpub_nodrop);
return proxy_thread->Start();
}
bool ZeroMQBackend::DoPublishEvent(const std::string& topic, const std::string& format,
const cluster::detail::byte_buffer& buf) {
// Publishing an event happens as a multipart message with 4 parts:
//
// * The topic to publish to - this is required by XPUB/XSUB
// * The node's identifier - see Cluster::node_id().
// * The format used to serialize the event.
// * The serialized event itself.
std::array<zmq::const_buffer, 4> parts = {
zmq::const_buffer(topic.data(), topic.size()),
zmq::const_buffer(my_node_id.data(), my_node_id.size()),
zmq::const_buffer(format.data(), format.size()),
zmq::const_buffer(buf.data(), buf.size()),
};
ZEROMQ_DEBUG("Publishing %zu bytes to %s", buf.size(), topic.c_str());
for ( size_t i = 0; i < parts.size(); i++ ) {
zmq::send_flags flags = zmq::send_flags::none;
if ( i < parts.size() - 1 )
flags = flags | zmq::send_flags::sndmore;
// This should never fail, it will instead block
// when HWM is reached. I guess we need to see if
// and how this can happen :-/
main_inproc.send(parts[i], flags);
}
return true;
}
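The 4-part framing documented above (topic, sender node id, serializer format, serialized event) can be sketched without ZeroMQ as plain message parts; the `EventParts` type, helper names, and the example format string are illustrative only:

```cpp
#include <optional>
#include <string>
#include <vector>

struct EventParts {
    std::string topic, sender, format, payload;
};

// Assemble the multipart message in the order DoPublishEvent() sends it:
// topic, sender node id, serializer format, serialized event.
std::vector<std::string> make_event_parts(const EventParts& e) {
    return {e.topic, e.sender, e.format, e.payload};
}

// Receiver side: a message that is not exactly 4 parts is dropped,
// mirroring the size check done when reading from the XSUB socket.
std::optional<EventParts> parse_event_parts(const std::vector<std::string>& parts) {
    if ( parts.size() != 4 )
        return std::nullopt;
    return EventParts{parts[0], parts[1], parts[2], parts[3]};
}
```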
bool ZeroMQBackend::DoSubscribe(const std::string& topic_prefix) {
ZEROMQ_DEBUG("Subscribing to %s", topic_prefix.c_str());
try {
// Prepend 0x01 byte to indicate subscription to XSUB socket
// This is the XSUB API instead of setsockopt(ZMQ_SUBSCRIBE).
std::string msg = "\x01" + topic_prefix;
xsub.send(zmq::const_buffer(msg.data(), msg.size()));
} catch ( zmq::error_t& err ) {
zeek::reporter->Error("Failed to subscribe to topic %s: %s", topic_prefix.c_str(), err.what());
return false;
}
return true;
}
bool ZeroMQBackend::DoUnsubscribe(const std::string& topic_prefix) {
ZEROMQ_DEBUG("Unsubscribing %s", topic_prefix.c_str());
try {
// Prepend 0x00 byte to indicate unsubscription to XSUB socket.
// This is the XSUB API instead of setsockopt(ZMQ_UNSUBSCRIBE).
// Use an explicit length: "\x00" + topic_prefix would concatenate
// an empty C string and silently drop the leading byte.
std::string msg = std::string(1, '\x00') + topic_prefix;
xsub.send(zmq::const_buffer(msg.data(), msg.size()));
} catch ( zmq::error_t& err ) {
zeek::reporter->Error("Failed to unsubscribe from topic %s: %s", topic_prefix.c_str(), err.what());
return false;
}
return true;
}
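Both functions use the same XSUB control framing: a single 0x01 (subscribe) or 0x00 (unsubscribe) byte followed by the raw topic prefix. A stdlib-only sketch (`make_xsub_msg` is an illustrative helper, not part of the backend):

```cpp
#include <string>

// Build the control message an XSUB socket expects: 0x01 = subscribe,
// 0x00 = unsubscribe, followed by the topic prefix. The leading byte is
// added with an explicit length; "\x00" + prefix would concatenate an
// empty C string and drop the byte entirely.
std::string make_xsub_msg(bool subscribe, const std::string& topic_prefix) {
    std::string msg(1, subscribe ? '\x01' : '\x00');
    msg += topic_prefix;
    return msg;
}
```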
bool ZeroMQBackend::DoPublishLogWrites(const logging::detail::LogWriteHeader& header, const std::string& format,
cluster::detail::byte_buffer& buf) {
ZEROMQ_DEBUG("Publishing %zu bytes of log writes (path %s)", buf.size(), header.path.c_str());
static std::string message_type = "log-write";
// Publishing a log write is done using 4 parts
//
// * A constant "log-write" string
// * The node's identifier - see Cluster::node_id().
// * The format used to serialize the log write.
// * The serialized log write itself.
std::array<zmq::const_buffer, 4> parts = {
zmq::const_buffer{message_type.data(), message_type.size()},
zmq::const_buffer(my_node_id.data(), my_node_id.size()),
zmq::const_buffer{format.data(), format.size()},
zmq::const_buffer{buf.data(), buf.size()},
};
zmq::send_result_t result;
for ( size_t i = 0; i < parts.size(); i++ ) {
zmq::send_flags flags = zmq::send_flags::dontwait;
if ( i < parts.size() - 1 )
flags = flags | zmq::send_flags::sndmore;
result = log_push.send(parts[i], flags);
if ( ! result ) {
// XXX: Not exactly clear what we should do if we reach HWM.
// we could block and hope a logger comes along that empties
// our internal queue, or discard messages and log very loudly
// and have metrics about it. However, this may happen regularly
// at shutdown.
//
// Maybe that should be configurable?
// If no logging endpoints were configured, that almost seems on
// purpose (and there's a warning elsewhere about this), so skip
// logging an error when sending fails.
if ( connect_log_endpoints.empty() )
return true;
reporter->Error("Failed to send log write. HWM reached?");
return false;
}
}
return true;
}
void ZeroMQBackend::Run() {
using MultipartMessage = std::vector<zmq::message_t>;
auto HandleLogMessages = [this](const std::vector<MultipartMessage>& msgs) {
QueueMessages qmsgs;
qmsgs.reserve(msgs.size());
for ( const auto& msg : msgs ) {
// sender, format, type, payload
if ( msg.size() != 4 ) {
ZEROMQ_THREAD_PRINTF("log: error: expected 4 parts, have %zu!\n", msg.size());
continue;
}
detail::byte_buffer payload{msg[3].data<std::byte>(), msg[3].data<std::byte>() + msg[3].size()};
qmsgs.emplace_back(LogMessage{.format = std::string(msg[2].data<const char>(), msg[2].size()),
.payload = std::move(payload)});
}
QueueForProcessing(std::move(qmsgs));
};
auto HandleInprocMessages = [this](std::vector<MultipartMessage>& msgs) {
// Forward messages from the inprocess bridge to xpub.
for ( auto& msg : msgs ) {
assert(msg.size() == 4);
for ( auto& part : msg ) {
zmq::send_flags flags = zmq::send_flags::dontwait;
if ( part.more() )
flags = flags | zmq::send_flags::sndmore;
zmq::send_result_t result;
do {
try {
result = xpub.send(part, flags);
} catch ( zmq::error_t& err ) {
// XXX: Not sure if the return false is so great here.
//
// Also, if we fail to publish, should we block rather
// than discard?
ZEROMQ_THREAD_PRINTF("xpub: Failed to publish: %s (%d)\n", err.what(), err.num());
break;
}
// EAGAIN returns empty result, means try again!
} while ( ! result );
}
}
};
auto HandleXPubMessages = [this](const std::vector<MultipartMessage>& msgs) {
QueueMessages qmsgs;
qmsgs.reserve(msgs.size());
for ( const auto& msg : msgs ) {
if ( msg.size() != 1 ) {
ZEROMQ_THREAD_PRINTF("xpub: error: expected 1 part, have %zu!\n", msg.size());
continue;
}
// Check whether the message starts with \x00 or \x01 to decide if it's
// an unsubscription or subscription message.
auto first = *reinterpret_cast<const uint8_t*>(msg[0].data());
if ( first == 0 || first == 1 ) {
auto* start = msg[0].data<std::byte>() + 1;
auto* end = msg[0].data<std::byte>() + msg[0].size();
detail::byte_buffer topic(start, end);
qmsgs.emplace_back(BackendMessage{first, std::move(topic)});
}
else {
ZEROMQ_THREAD_PRINTF("xpub: error: unexpected first char: have '0x%02x'\n", first);
continue;
}
}
QueueForProcessing(std::move(qmsgs));
};
auto HandleXSubMessages = [this](const std::vector<MultipartMessage>& msgs) {
QueueMessages qmsgs;
qmsgs.reserve(msgs.size());
for ( const auto& msg : msgs ) {
if ( msg.size() != 4 ) {
ZEROMQ_THREAD_PRINTF("xsub: error: expected 4 parts, have %zu!\n", msg.size());
continue;
}
// Filter out messages that are coming from this node.
std::string sender(msg[1].data<const char>(), msg[1].size());
if ( sender == my_node_id )
continue;
detail::byte_buffer payload{msg[3].data<std::byte>(), msg[3].data<std::byte>() + msg[3].size()};
qmsgs.emplace_back(EventMessage{.topic = std::string(msg[0].data<const char>(), msg[0].size()),
.format = std::string(msg[2].data<const char>(), msg[2].size()),
.payload = std::move(payload)});
}
QueueForProcessing(std::move(qmsgs));
};
struct SocketInfo {
zmq::socket_ref socket;
std::string name;
std::function<void(std::vector<MultipartMessage>&)> handler;
};
std::vector<SocketInfo> sockets = {
{.socket = child_inproc, .name = "inproc", .handler = HandleInprocMessages},
{.socket = xpub, .name = "xpub", .handler = HandleXPubMessages},
{.socket = xsub, .name = "xsub", .handler = HandleXSubMessages},
{.socket = log_pull, .name = "log_pull", .handler = HandleLogMessages},
};
std::vector<zmq::pollitem_t> poll_items(sockets.size());
while ( true ) {
for ( size_t i = 0; i < sockets.size(); i++ )
poll_items[i] = {.socket = sockets[i].socket.handle(), .fd = 0, .events = ZMQ_POLLIN | ZMQ_POLLERR};
// Awkward.
std::vector<std::vector<MultipartMessage>> rcv_messages(sockets.size());
try {
int r = zmq::poll(poll_items, std::chrono::seconds(-1));
ZEROMQ_DEBUG_THREAD_PRINTF(DebugFlag::POLL, "poll: r=%d\n", r);
for ( size_t i = 0; i < poll_items.size(); i++ ) {
const auto& item = poll_items[i];
ZEROMQ_DEBUG_THREAD_PRINTF(DebugFlag::POLL, "poll: items[%lu]=%s %s %s\n", i, sockets[i].name.c_str(),
item.revents & ZMQ_POLLIN ? "pollin " : "",
item.revents & ZMQ_POLLERR ? "err" : "");
if ( item.revents & ZMQ_POLLERR ) {
// What should we be doing? Re-open sockets? Terminate?
ZEROMQ_THREAD_PRINTF("poll: error: POLLERR on socket %zu %s %p revents=%x\n", i,
sockets[i].name.c_str(), item.socket, item.revents);
}
// Nothing to do?
if ( (item.revents & ZMQ_POLLIN) == 0 )
continue;
bool consumed_one = false;
// Read messages from the socket.
do {
zmq::message_t msg;
rcv_messages[i].emplace_back(); // make room for a multipart message
auto& into = rcv_messages[i].back();
// Only receive up to poll_max_messages from an individual
// socket. Move on to the next when exceeded. The last pushed
// message (empty) is popped at the end of the loop.
if ( poll_max_messages > 0 && rcv_messages[i].size() > poll_max_messages ) {
ZEROMQ_DEBUG_THREAD_PRINTF(DebugFlag::POLL, "poll: %s rcv_messages[%zu] full!\n",
sockets[i].name.c_str(), i);
break;
}
consumed_one = false;
bool more = false;
// Read a multi-part message.
do {
auto recv_result = sockets[i].socket.recv(msg, zmq::recv_flags::dontwait);
if ( recv_result ) {
consumed_one = true;
more = msg.more();
into.emplace_back(std::move(msg));
}
else {
// EAGAIN and more flag set? Try again!
if ( more )
continue;
}
} while ( more );
} while ( consumed_one );
assert(rcv_messages[i].back().size() == 0);
rcv_messages[i].pop_back();
}
} catch ( zmq::error_t& err ) {
if ( err.num() == ETERM )
return;
throw;
}
// At this point, we've received anything that was readable from the sockets.
// Now interpret and enqueue it into messages.
for ( size_t i = 0; i < sockets.size(); i++ ) {
if ( rcv_messages[i].empty() )
continue;
sockets[i].handler(rcv_messages[i]);
}
}
}
bool ZeroMQBackend::DoProcessBackendMessage(int tag, detail::byte_buffer_span payload) {
if ( tag == 0 || tag == 1 ) {
std::string topic{reinterpret_cast<const char*>(payload.data()), payload.size()};
zeek::EventHandlerPtr eh = tag == 1 ? event_subscription : event_unsubscription;
ZEROMQ_DEBUG("BackendMessage: %s for %s", eh->Name(), topic.c_str());
zeek::event_mgr.Enqueue(eh, zeek::make_intrusive<zeek::StringVal>(topic));
return true;
}
else {
zeek::reporter->Error("Ignoring bad BackendMessage tag=%d", tag);
return false;
}
}
} // namespace cluster::zeromq
} // namespace zeek

@@ -0,0 +1,99 @@
// See the file "COPYING" in the main distribution directory for copyright.
#pragma once
#include <memory>
#include <thread>
#include <zmq.hpp>
#include "zeek/cluster/Backend.h"
#include "zeek/cluster/Serializer.h"
#include "zeek/cluster/backend/zeromq/ZeroMQ-Proxy.h"
namespace zeek::cluster::zeromq {
class ZeroMQBackend : public cluster::ThreadedBackend {
public:
/**
* Constructor.
*/
ZeroMQBackend(std::unique_ptr<EventSerializer> es, std::unique_ptr<LogSerializer> ls);
/**
* Spawns a thread running zmq_proxy() for the configured XPUB/XSUB listen
* sockets. Only one node in a cluster should do this.
*/
bool SpawnZmqProxyThread();
/**
* Run method for background thread.
*/
void Run();
/**
* Component factory.
*/
static std::unique_ptr<Backend> Instantiate(std::unique_ptr<EventSerializer> event_serializer,
std::unique_ptr<LogSerializer> log_serializer) {
return std::make_unique<ZeroMQBackend>(std::move(event_serializer), std::move(log_serializer));
}
private:
void DoInitPostScript() override;
bool DoInit() override;
void DoTerminate() override;
bool DoPublishEvent(const std::string& topic, const std::string& format,
const cluster::detail::byte_buffer& buf) override;
bool DoSubscribe(const std::string& topic_prefix) override;
bool DoUnsubscribe(const std::string& topic_prefix) override;
bool DoPublishLogWrites(const logging::detail::LogWriteHeader& header, const std::string& format,
cluster::detail::byte_buffer& buf) override;
const char* Tag() override { return "ZeroMQ"; }
bool DoProcessBackendMessage(int tag, detail::byte_buffer_span payload) override;
// Script level variables.
std::string my_node_id;
std::string connect_xsub_endpoint;
std::string connect_xpub_endpoint;
std::string listen_xsub_endpoint;
std::string listen_xpub_endpoint;
std::string listen_log_endpoint;
int listen_xpub_nodrop = 1;
zeek_uint_t poll_max_messages = 0;
zeek_uint_t debug_flags = 0;
EventHandlerPtr event_subscription;
EventHandlerPtr event_unsubscription;
zmq::context_t ctx;
zmq::socket_t xsub;
zmq::socket_t xpub;
// inproc sockets used for sending
// publish messages to xpub in a
// thread safe manner.
zmq::socket_t main_inproc;
zmq::socket_t child_inproc;
// Sockets used for logging. The log_push socket connects
// with one or more logger-like nodes. Logger nodes listen
// on the log_pull socket.
std::vector<std::string> connect_log_endpoints;
zmq::socket_t log_push;
zmq::socket_t log_pull;
std::thread self_thread;
std::unique_ptr<ProxyThread> proxy_thread;
};
} // namespace zeek::cluster::zeromq

@@ -0,0 +1 @@
Subproject commit c94c20743ed7d4aa37835a5c46567ab0790d4acc

@@ -0,0 +1,16 @@
%%{
#include "ZeroMQ.h"
%%}
function Cluster::Backend::ZeroMQ::spawn_zmq_proxy_thread%(%): bool
%{
// Spawn the ZeroMQ broker thread.
auto *zeromq_backend = dynamic_cast<zeek::cluster::zeromq::ZeroMQBackend*>(zeek::cluster::backend);
if ( ! zeromq_backend )
{
zeek::emit_builtin_error("Cluster::backend not set to ZeroMQ?");
return zeek::val_mgr->Bool(false);
}
return zeek::val_mgr->Bool(zeromq_backend->SpawnZmqProxyThread());
%}

@@ -0,0 +1,50 @@
include(FindPackageHandleStandardArgs)
set(AUXIL_CPPZMQ_DIR ${CMAKE_CURRENT_LIST_DIR}/../auxil/cppzmq)
find_library(ZeroMQ_LIBRARY NAMES zmq HINTS ${ZeroMQ_ROOT_DIR}/lib)
find_path(ZeroMQ_INCLUDE_DIR NAMES zmq.h HINTS ${ZeroMQ_ROOT_DIR}/include)
find_path(ZeroMQ_CPP_INCLUDE_DIR NAMES zmq.hpp HINTS ${ZeroMQ_ROOT_DIR}/include)
function (set_cppzmq_version)
# Extract the cppzmq version from zmq.hpp
file(STRINGS "${ZeroMQ_CPP_INCLUDE_DIR}/zmq.hpp" CPPZMQ_MAJOR_VERSION_H
REGEX "^#define CPPZMQ_VERSION_MAJOR [0-9]+$")
file(STRINGS "${ZeroMQ_CPP_INCLUDE_DIR}/zmq.hpp" CPPZMQ_MINOR_VERSION_H
REGEX "^#define CPPZMQ_VERSION_MINOR [0-9]+$")
file(STRINGS "${ZeroMQ_CPP_INCLUDE_DIR}/zmq.hpp" CPPZMQ_PATCH_VERSION_H
REGEX "^#define CPPZMQ_VERSION_PATCH [0-9]+$")
string(REGEX REPLACE "^.*MAJOR ([0-9]+)$" "\\1" CPPZMQ_MAJOR_VERSION
"${CPPZMQ_MAJOR_VERSION_H}")
string(REGEX REPLACE "^.*MINOR ([0-9]+)$" "\\1" CPPZMQ_MINOR_VERSION
"${CPPZMQ_MINOR_VERSION_H}")
string(REGEX REPLACE "^.*PATCH ([0-9]+)$" "\\1" CPPZMQ_PATCH_VERSION
"${CPPZMQ_PATCH_VERSION_H}")
set(ZeroMQ_CPP_VERSION "${CPPZMQ_MAJOR_VERSION}.${CPPZMQ_MINOR_VERSION}.${CPPZMQ_PATCH_VERSION}"
PARENT_SCOPE)
endfunction ()
if (ZeroMQ_CPP_INCLUDE_DIR)
set_cppzmq_version()
endif ()
if (NOT ZeroMQ_CPP_VERSION)
    # No usable system zmq.hpp found; fall back to the bundled version in auxil.
set(ZeroMQ_CPP_INCLUDE_DIR ${AUXIL_CPPZMQ_DIR} CACHE FILEPATH "Include path for cppzmq" FORCE)
set_cppzmq_version()
elseif (ZeroMQ_CPP_VERSION VERSION_LESS "4.9.0")
message(STATUS "Found old cppzmq version ${ZeroMQ_CPP_VERSION}, using bundled version")
set(ZeroMQ_CPP_INCLUDE_DIR ${AUXIL_CPPZMQ_DIR} CACHE FILEPATH "Include path for cppzmq" FORCE)
set_cppzmq_version()
endif ()
find_package_handle_standard_args(
ZeroMQ FOUND_VAR ZeroMQ_FOUND REQUIRED_VARS ZeroMQ_LIBRARY ZeroMQ_INCLUDE_DIR
ZeroMQ_CPP_INCLUDE_DIR ZeroMQ_CPP_VERSION)
set(ZeroMQ_LIBRARIES ${ZeroMQ_LIBRARY})
set(ZeroMQ_INCLUDE_DIRS ${ZeroMQ_INCLUDE_DIR} ${ZeroMQ_CPP_INCLUDE_DIR})
set(ZeroMQ_FOUND ${ZeroMQ_FOUND})

@@ -69,3 +69,81 @@ function Cluster::Backend::__init%(%): bool
auto rval = zeek::cluster::backend->Init();
return zeek::val_mgr->Bool(rval);
%}
type Cluster::Pool: record;
## Publishes an event to a node within a pool according to Round-Robin
## distribution strategy.
##
## pool: the pool of nodes that are eligible to receive the event.
##
## key: an arbitrary string identifying the purpose for which you're
## distributing the event. Consider namespacing it after your script,
## e.g. "Intel::cluster_rr_key".
##
## args: Either the event arguments as already made by
## :zeek:see:`Cluster::make_event` or the argument list to pass along
## to it.
##
## Returns: true if the message is sent.
function Cluster::publish_rr%(pool: Pool, key: string, ...%): bool
%{
static zeek::Func* topic_func = nullptr;
if ( ! topic_func )
topic_func = zeek::detail::global_scope()->Find("Cluster::rr_topic")->GetVal()->AsFunc();
if ( ! is_cluster_pool(pool) )
{
zeek::emit_builtin_error("expected type Cluster::Pool for pool");
return zeek::val_mgr->False();
}
zeek::Args vl{{zeek::NewRef{}, pool}, {zeek::NewRef{}, key}};
auto topic = topic_func->Invoke(&vl);
if ( ! topic->AsString()->Len() )
return zeek::val_mgr->False();
auto args = zeek::ArgsSpan{*@ARGS@}.subspan(2);
	ScriptLocationScope scope{frame};

	return publish_event(topic, args);
%}
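The Round-Robin strategy above can be sketched as follows. This is a minimal stand-alone C++ illustration under assumed names and state layout; in Zeek the actual selection is delegated to the script-level `Cluster::rr_topic` function, not implemented like this.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <unordered_map>
#include <vector>

// Minimal round-robin node selection: each key keeps its own counter so
// that successive publishes for the same key cycle through the pool's
// nodes. Illustrative sketch only, not Zeek's implementation.
class RoundRobin {
public:
    const std::string& Pick(const std::string& key, const std::vector<std::string>& nodes) {
        // operator[] default-initializes a new counter to 0.
        std::size_t idx = counters_[key]++ % nodes.size();
        return nodes[idx];
    }

private:
    std::unordered_map<std::string, std::size_t> counters_;
};
```

Distinct keys advance independently, so unrelated users of round-robin distribution do not skew each other's rotation.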
## Publishes an event to a node within a pool according to Rendezvous
## (Highest Random Weight) hashing strategy.
##
## pool: the pool of nodes that are eligible to receive the event.
##
## key: data used for input to the hashing function that will uniformly
## distribute keys among available nodes.
##
## args: Either the event arguments as already made by
## :zeek:see:`Cluster::make_event` or the argument list to pass along
## to it.
##
## Returns: true if the message is sent.
function Cluster::publish_hrw%(pool: Pool, key: any, ...%): bool
%{
static zeek::Func* topic_func = nullptr;
if ( ! topic_func )
topic_func = zeek::detail::global_scope()->Find("Cluster::hrw_topic")->GetVal()->AsFunc();
if ( ! is_cluster_pool(pool) )
{
zeek::emit_builtin_error("expected type Cluster::Pool for pool");
return zeek::val_mgr->False();
}
zeek::Args vl{{zeek::NewRef{}, pool}, {zeek::NewRef{}, key}};
auto topic = topic_func->Invoke(&vl);
if ( ! topic->AsString()->Len() )
return zeek::val_mgr->False();
auto args = zeek::ArgsSpan{*@ARGS@}.subspan(2);
ScriptLocationScope scope{frame};
return publish_event(topic, args);
%}
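The Rendezvous (Highest Random Weight) strategy above can be sketched as follows. `std::hash` and the `key + "/" + node` combination are illustrative assumptions, not Zeek's actual hash function, and in Zeek the selection is delegated to the script-level `Cluster::hrw_topic` function.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <string>
#include <vector>

// Rendezvous (HRW) hashing sketch: hash every (key, node) pair and route
// the key to the node with the highest resulting weight. While pool
// membership is stable, the same key always maps to the same node, and
// removing a node only remaps the keys that were assigned to it.
std::string hrw_pick(const std::vector<std::string>& nodes, const std::string& key) {
    std::string best;
    bool have_best = false;
    std::size_t best_weight = 0;
    for ( const auto& node : nodes ) {
        std::size_t w = std::hash<std::string>{}(key + "/" + node);
        if ( ! have_best || w > best_weight ) {
            best = node;
            best_weight = w;
            have_best = true;
        }
    }
    return best;
}
```

Unlike round-robin, no per-key state is needed: the mapping is a pure function of the key and the current membership.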

@@ -72,6 +72,7 @@ static std::unordered_map<std::string, unsigned int> func_attrs = {
{"Analyzer::__register_for_port", ATTR_NO_SCRIPT_SIDE_EFFECTS},
{"Analyzer::__schedule_analyzer", ATTR_NO_SCRIPT_SIDE_EFFECTS},
{"Analyzer::__tag", ATTR_FOLDABLE},
{"Cluster::Backend::ZeroMQ::spawn_zmq_proxy_thread", ATTR_NO_SCRIPT_SIDE_EFFECTS},
{"Cluster::Backend::__init", ATTR_NO_SCRIPT_SIDE_EFFECTS},
{"Cluster::__subscribe", ATTR_NO_SCRIPT_SIDE_EFFECTS},
{"Cluster::__unsubscribe", ATTR_NO_SCRIPT_SIDE_EFFECTS},

@@ -0,0 +1,4 @@
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
error in <...>/cluster-publish-errors.zeek, line 58: Publish of unknown record type 'Cluster::MyEvent' (Cluster::publish(topic, <internal>::#0))
error in <...>/cluster-publish-errors.zeek, line 65: Publish of unknown record type 'Cluster::MyEvent' (Cluster::publish_hrw(Cluster::proxy_pool, key, <internal>::#0))
error in <...>/cluster-publish-errors.zeek, line 72: Publish of unknown record type 'Cluster::MyEvent' (Cluster::publish_rr(Cluster::proxy_pool, key, <internal>::#0))

@@ -0,0 +1,4 @@
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
error in <...>/cluster-publish-errors.zeek, line 58: Publish of unknown record type 'Cluster::MyEvent' (Cluster::publish(topic, Cluster::MyEvent()))
error in <...>/cluster-publish-errors.zeek, line 65: Publish of unknown record type 'Cluster::MyEvent' (Cluster::publish_hrw(Cluster::proxy_pool, key, Cluster::MyEvent()))
error in <...>/cluster-publish-errors.zeek, line 72: Publish of unknown record type 'Cluster::MyEvent' (Cluster::publish_rr(Cluster::proxy_pool, key, Cluster::MyEvent()))

@@ -0,0 +1,13 @@
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
Broker::make_event with Cluster::publish()
r=, T
Broker::make_event with Cluster::publish_hrw()
r=, T
Broker::make_event with Cluster::publish_rr()
r=, T
Cluster::publish() with wrong event
r=, F
Cluster::publish_hrw() with wrong event
r=, F
Cluster::publish_rr() with wrong event
r=, F

@@ -0,0 +1,7 @@
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
error in <...>/cluster-publish-errors.zeek, line 30: Publish of Broker::Event record instance with type 'Broker::Event' to a non-Broker backend (Cluster::publish(topic, Cluster::be))
error in <...>/cluster-publish-errors.zeek, line 39: Publish of Broker::Event record instance with type 'Broker::Event' to a non-Broker backend (Cluster::publish_hrw(Cluster::proxy_pool, key, Cluster::be))
error in <...>/cluster-publish-errors.zeek, line 47: Publish of Broker::Event record instance with type 'Broker::Event' to a non-Broker backend (Cluster::publish_rr(Cluster::proxy_pool, key, Cluster::be))
error in <...>/cluster-publish-errors.zeek, line 58: Publish of unknown record type 'Cluster::MyEvent' (Cluster::publish(topic, Cluster::MyEvent()))
error in <...>/cluster-publish-errors.zeek, line 65: Publish of unknown record type 'Cluster::MyEvent' (Cluster::publish_hrw(Cluster::proxy_pool, key, Cluster::MyEvent()))
error in <...>/cluster-publish-errors.zeek, line 72: Publish of unknown record type 'Cluster::MyEvent' (Cluster::publish_rr(Cluster::proxy_pool, key, Cluster::MyEvent()))

@@ -0,0 +1,13 @@
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
Broker::make_event with Cluster::publish()
r=, F
Broker::make_event with Cluster::publish_hrw()
r=, F
Broker::make_event with Cluster::publish_rr()
r=, F
Cluster::publish() with wrong event
r=, F
Cluster::publish_hrw() with wrong event
r=, F
Cluster::publish_rr() with wrong event
r=, F

@@ -0,0 +1,201 @@
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
0 worker-1
0 worker-2
1 worker-1
1 worker-2
2 worker-1
2 worker-2
3 worker-1
3 worker-2
4 worker-1
4 worker-2
5 worker-1
5 worker-2
6 worker-1
6 worker-2
7 worker-1
7 worker-2
8 worker-1
8 worker-2
9 worker-1
9 worker-2
10 worker-1
10 worker-2
11 worker-1
11 worker-2
12 worker-1
12 worker-2
13 worker-1
13 worker-2
14 worker-1
14 worker-2
15 worker-1
15 worker-2
16 worker-1
16 worker-2
17 worker-1
17 worker-2
18 worker-1
18 worker-2
19 worker-1
19 worker-2
20 worker-1
20 worker-2
21 worker-1
21 worker-2
22 worker-1
22 worker-2
23 worker-1
23 worker-2
24 worker-1
24 worker-2
25 worker-1
25 worker-2
26 worker-1
26 worker-2
27 worker-1
27 worker-2
28 worker-1
28 worker-2
29 worker-1
29 worker-2
30 worker-1
30 worker-2
31 worker-1
31 worker-2
32 worker-1
32 worker-2
33 worker-1
33 worker-2
34 worker-1
34 worker-2
35 worker-1
35 worker-2
36 worker-1
36 worker-2
37 worker-1
37 worker-2
38 worker-1
38 worker-2
39 worker-1
39 worker-2
40 worker-1
40 worker-2
41 worker-1
41 worker-2
42 worker-1
42 worker-2
43 worker-1
43 worker-2
44 worker-1
44 worker-2
45 worker-1
45 worker-2
46 worker-1
46 worker-2
47 worker-1
47 worker-2
48 worker-1
48 worker-2
49 worker-1
49 worker-2
50 worker-1
50 worker-2
51 worker-1
51 worker-2
52 worker-1
52 worker-2
53 worker-1
53 worker-2
54 worker-1
54 worker-2
55 worker-1
55 worker-2
56 worker-1
56 worker-2
57 worker-1
57 worker-2
58 worker-1
58 worker-2
59 worker-1
59 worker-2
60 worker-1
60 worker-2
61 worker-1
61 worker-2
62 worker-1
62 worker-2
63 worker-1
63 worker-2
64 worker-1
64 worker-2
65 worker-1
65 worker-2
66 worker-1
66 worker-2
67 worker-1
67 worker-2
68 worker-1
68 worker-2
69 worker-1
69 worker-2
70 worker-1
70 worker-2
71 worker-1
71 worker-2
72 worker-1
72 worker-2
73 worker-1
73 worker-2
74 worker-1
74 worker-2
75 worker-1
75 worker-2
76 worker-1
76 worker-2
77 worker-1
77 worker-2
78 worker-1
78 worker-2
79 worker-1
79 worker-2
80 worker-1
80 worker-2
81 worker-1
81 worker-2
82 worker-1
82 worker-2
83 worker-1
83 worker-2
84 worker-1
84 worker-2
85 worker-1
85 worker-2
86 worker-1
86 worker-2
87 worker-1
87 worker-2
88 worker-1
88 worker-2
89 worker-1
89 worker-2
90 worker-1
90 worker-2
91 worker-1
91 worker-2
92 worker-1
92 worker-2
93 worker-1
93 worker-2
94 worker-1
94 worker-2
95 worker-1
95 worker-2
96 worker-1
96 worker-2
97 worker-1
97 worker-2
98 worker-1
98 worker-2
99 worker-1
99 worker-2

@@ -0,0 +1,42 @@
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
got pong, 0, args, worker-2, args
got pong, 0, args, worker-2, make_event
got pong, 0, make_event, worker-2, args
got pong, 0, make_event, worker-2, make_event
got pong, 1, args, worker-1, args
got pong, 1, args, worker-1, make_event
got pong, 1, make_event, worker-1, args
got pong, 1, make_event, worker-1, make_event
got pong, 2, args, worker-2, args
got pong, 2, args, worker-2, make_event
got pong, 2, make_event, worker-2, args
got pong, 2, make_event, worker-2, make_event
got pong, 3, args, worker-1, args
got pong, 3, args, worker-1, make_event
got pong, 3, make_event, worker-1, args
got pong, 3, make_event, worker-1, make_event
got pong, 4, args, worker-1, args
got pong, 4, args, worker-1, make_event
got pong, 4, make_event, worker-1, args
got pong, 4, make_event, worker-1, make_event
got pong, 5, args, worker-2, args
got pong, 5, args, worker-2, make_event
got pong, 5, make_event, worker-2, args
got pong, 5, make_event, worker-2, make_event
got pong, 6, args, worker-1, args
got pong, 6, args, worker-1, make_event
got pong, 6, make_event, worker-1, args
got pong, 6, make_event, worker-1, make_event
got pong, 7, args, worker-2, args
got pong, 7, args, worker-2, make_event
got pong, 7, make_event, worker-2, args
got pong, 7, make_event, worker-2, make_event
got pong, 8, args, worker-1, args
got pong, 8, args, worker-1, make_event
got pong, 8, make_event, worker-1, args
got pong, 8, make_event, worker-1, make_event
got pong, 9, args, worker-1, args
got pong, 9, args, worker-1, make_event
got pong, 9, make_event, worker-1, args
got pong, 9, make_event, worker-1, make_event
have 40, finish!

@@ -0,0 +1,16 @@
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
a node_up, manager
a node_up, worker-2
got ping, 1, args
got ping, 1, make_event
got ping, 3, args
got ping, 3, make_event
got ping, 4, args
got ping, 4, make_event
got ping, 6, args
got ping, 6, make_event
got ping, 8, args
got ping, 8, make_event
got ping, 9, args
got ping, 9, make_event
z got finish!

@@ -0,0 +1,12 @@
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
a node_up, manager
a node_up, worker-1
got ping, 0, args
got ping, 0, make_event
got ping, 2, args
got ping, 2, make_event
got ping, 5, args
got ping, 5, make_event
got ping, 7, args
got ping, 7, make_event
z got finish!

@@ -0,0 +1,42 @@
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
got pong, 0, args, worker-1, args
got pong, 0, args, worker-1, make_event
got pong, 0, make_event, worker-1, args
got pong, 0, make_event, worker-1, make_event
got pong, 1, args, worker-2, args
got pong, 1, args, worker-2, make_event
got pong, 1, make_event, worker-2, args
got pong, 1, make_event, worker-2, make_event
got pong, 2, args, worker-1, args
got pong, 2, args, worker-1, make_event
got pong, 2, make_event, worker-1, args
got pong, 2, make_event, worker-1, make_event
got pong, 3, args, worker-2, args
got pong, 3, args, worker-2, make_event
got pong, 3, make_event, worker-2, args
got pong, 3, make_event, worker-2, make_event
got pong, 4, args, worker-1, args
got pong, 4, args, worker-1, make_event
got pong, 4, make_event, worker-1, args
got pong, 4, make_event, worker-1, make_event
got pong, 5, args, worker-2, args
got pong, 5, args, worker-2, make_event
got pong, 5, make_event, worker-2, args
got pong, 5, make_event, worker-2, make_event
got pong, 6, args, worker-1, args
got pong, 6, args, worker-1, make_event
got pong, 6, make_event, worker-1, args
got pong, 6, make_event, worker-1, make_event
got pong, 7, args, worker-2, args
got pong, 7, args, worker-2, make_event
got pong, 7, make_event, worker-2, args
got pong, 7, make_event, worker-2, make_event
got pong, 8, args, worker-1, args
got pong, 8, args, worker-1, make_event
got pong, 8, make_event, worker-1, args
got pong, 8, make_event, worker-1, make_event
got pong, 9, args, worker-2, args
got pong, 9, args, worker-2, make_event
got pong, 9, make_event, worker-2, args
got pong, 9, make_event, worker-2, make_event
have 40, finish!

@@ -0,0 +1,14 @@
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
a node_up, manager
a node_up, worker-2
got ping, 0, args
got ping, 0, make_event
got ping, 2, args
got ping, 2, make_event
got ping, 4, args
got ping, 4, make_event
got ping, 6, args
got ping, 6, make_event
got ping, 8, args
got ping, 8, make_event
z got finish!

@@ -0,0 +1,14 @@
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
a node_up, manager
a node_up, worker-1
got ping, 1, args
got ping, 1, make_event
got ping, 3, args
got ping, 3, make_event
got ping, 5, args
got ping, 5, make_event
got ping, 7, args
got ping, 7, make_event
got ping, 9, args
got ping, 9, make_event
z got finish!

@@ -0,0 +1,21 @@
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
logger got hello from manager (zeromq_manager_<hostname>_<pid>_NrFj3eGxkRR5)
logger got hello from proxy (zeromq_proxy_<hostname>_<pid>_NrFj3eGxkRR5)
logger got hello from worker-1 (zeromq_worker-1_<hostname>_<pid>_NrFj3eGxkRR5)
logger got hello from worker-2 (zeromq_worker-2_<hostname>_<pid>_NrFj3eGxkRR5)
manager got hello from logger (zeromq_logger_<hostname>_<pid>_NrFj3eGxkRR5)
manager got hello from proxy (zeromq_proxy_<hostname>_<pid>_NrFj3eGxkRR5)
manager got hello from worker-1 (zeromq_worker-1_<hostname>_<pid>_NrFj3eGxkRR5)
manager got hello from worker-2 (zeromq_worker-2_<hostname>_<pid>_NrFj3eGxkRR5)
proxy got hello from logger (zeromq_logger_<hostname>_<pid>_NrFj3eGxkRR5)
proxy got hello from manager (zeromq_manager_<hostname>_<pid>_NrFj3eGxkRR5)
proxy got hello from worker-1 (zeromq_worker-1_<hostname>_<pid>_NrFj3eGxkRR5)
proxy got hello from worker-2 (zeromq_worker-2_<hostname>_<pid>_NrFj3eGxkRR5)
worker-1 got hello from logger (zeromq_logger_<hostname>_<pid>_NrFj3eGxkRR5)
worker-1 got hello from manager (zeromq_manager_<hostname>_<pid>_NrFj3eGxkRR5)
worker-1 got hello from proxy (zeromq_proxy_<hostname>_<pid>_NrFj3eGxkRR5)
worker-1 got hello from worker-2 (zeromq_worker-2_<hostname>_<pid>_NrFj3eGxkRR5)
worker-2 got hello from logger (zeromq_logger_<hostname>_<pid>_NrFj3eGxkRR5)
worker-2 got hello from manager (zeromq_manager_<hostname>_<pid>_NrFj3eGxkRR5)
worker-2 got hello from proxy (zeromq_proxy_<hostname>_<pid>_NrFj3eGxkRR5)
worker-2 got hello from worker-1 (zeromq_worker-1_<hostname>_<pid>_NrFj3eGxkRR5)

@@ -0,0 +1,16 @@
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
A zeek_init, manager
B node_up, logger
B node_up, proxy
B node_up, worker-1
B node_up, worker-2
B nodes_up, 2
B nodes_up, 3
B nodes_up, 4
B nodes_up, 5
C send_finish
D node_down, logger
D node_down, proxy
D node_down, worker-1
D node_down, worker-2
D send_finish to logger

@@ -0,0 +1,21 @@
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
logger manager
logger proxy
logger worker-1
logger worker-2
manager logger
manager proxy
manager worker-1
manager worker-2
proxy logger
proxy manager
proxy worker-1
proxy worker-2
worker-1 logger
worker-1 manager
worker-1 proxy
worker-1 worker-2
worker-2 logger
worker-2 manager
worker-2 proxy
worker-2 worker-1

@@ -0,0 +1,13 @@
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
manager got hello from proxy (zeromq_proxy_<hostname>_<pid>_NrFj3eGxkRR5)
manager got hello from worker-1 (zeromq_worker-1_<hostname>_<pid>_NrFj3eGxkRR5)
manager got hello from worker-2 (zeromq_worker-2_<hostname>_<pid>_NrFj3eGxkRR5)
proxy got hello from manager (zeromq_manager_<hostname>_<pid>_NrFj3eGxkRR5)
proxy got hello from worker-1 (zeromq_worker-1_<hostname>_<pid>_NrFj3eGxkRR5)
proxy got hello from worker-2 (zeromq_worker-2_<hostname>_<pid>_NrFj3eGxkRR5)
worker-1 got hello from manager (zeromq_manager_<hostname>_<pid>_NrFj3eGxkRR5)
worker-1 got hello from proxy (zeromq_proxy_<hostname>_<pid>_NrFj3eGxkRR5)
worker-1 got hello from worker-2 (zeromq_worker-2_<hostname>_<pid>_NrFj3eGxkRR5)
worker-2 got hello from manager (zeromq_manager_<hostname>_<pid>_NrFj3eGxkRR5)
worker-2 got hello from proxy (zeromq_proxy_<hostname>_<pid>_NrFj3eGxkRR5)
worker-2 got hello from worker-1 (zeromq_worker-1_<hostname>_<pid>_NrFj3eGxkRR5)

@@ -0,0 +1,11 @@
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
A node_up, proxy
A node_up, worker-1
A node_up, worker-2
B nodes_up, 2
B nodes_up, 3
B nodes_up, 4
D node_down, proxy
D node_down, worker-1
D node_down, worker-2
zeek_init, manager

@@ -0,0 +1,13 @@
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
manager proxy
manager worker-1
manager worker-2
proxy manager
proxy worker-1
proxy worker-2
worker-1 manager
worker-1 proxy
worker-1 worker-2
worker-2 manager
worker-2 proxy
worker-2 worker-1

@@ -0,0 +1,21 @@
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
logger got hello from manager (zeromq_manager_<hostname>_<pid>_NrFj3eGxkRR5)
logger got hello from proxy (zeromq_proxy_<hostname>_<pid>_NrFj3eGxkRR5)
logger got hello from worker-1 (zeromq_worker-1_<hostname>_<pid>_NrFj3eGxkRR5)
logger got hello from worker-2 (zeromq_worker-2_<hostname>_<pid>_NrFj3eGxkRR5)
manager got hello from logger (zeromq_logger_<hostname>_<pid>_NrFj3eGxkRR5)
manager got hello from proxy (zeromq_proxy_<hostname>_<pid>_NrFj3eGxkRR5)
manager got hello from worker-1 (zeromq_worker-1_<hostname>_<pid>_NrFj3eGxkRR5)
manager got hello from worker-2 (zeromq_worker-2_<hostname>_<pid>_NrFj3eGxkRR5)
proxy got hello from logger (zeromq_logger_<hostname>_<pid>_NrFj3eGxkRR5)
proxy got hello from manager (zeromq_manager_<hostname>_<pid>_NrFj3eGxkRR5)
proxy got hello from worker-1 (zeromq_worker-1_<hostname>_<pid>_NrFj3eGxkRR5)
proxy got hello from worker-2 (zeromq_worker-2_<hostname>_<pid>_NrFj3eGxkRR5)
worker-1 got hello from logger (zeromq_logger_<hostname>_<pid>_NrFj3eGxkRR5)
worker-1 got hello from manager (zeromq_manager_<hostname>_<pid>_NrFj3eGxkRR5)
worker-1 got hello from proxy (zeromq_proxy_<hostname>_<pid>_NrFj3eGxkRR5)
worker-1 got hello from worker-2 (zeromq_worker-2_<hostname>_<pid>_NrFj3eGxkRR5)
worker-2 got hello from logger (zeromq_logger_<hostname>_<pid>_NrFj3eGxkRR5)
worker-2 got hello from manager (zeromq_manager_<hostname>_<pid>_NrFj3eGxkRR5)
worker-2 got hello from proxy (zeromq_proxy_<hostname>_<pid>_NrFj3eGxkRR5)
worker-2 got hello from worker-1 (zeromq_worker-1_<hostname>_<pid>_NrFj3eGxkRR5)

@@ -0,0 +1,3 @@
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
node_up, worker-1
node_down, worker-1

@@ -0,0 +1,2 @@
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
node_up, manager

@@ -259,6 +259,7 @@ scripts/base/init-frameworks-and-bifs.zeek
build/scripts/base/bif/plugins/Zeek_WebSocket.functions.bif.zeek
build/scripts/base/bif/plugins/Zeek_WebSocket.types.bif.zeek
build/scripts/base/bif/plugins/Zeek_XMPP.events.bif.zeek
build/scripts/base/bif/plugins/Zeek_Cluster_Backend_ZeroMQ.cluster_backend_zeromq.bif.zeek
build/scripts/base/bif/plugins/Zeek_ARP.events.bif.zeek
build/scripts/base/bif/plugins/Zeek_UDP.events.bif.zeek
build/scripts/base/bif/plugins/Zeek_ICMP.events.bif.zeek

@@ -259,6 +259,7 @@ scripts/base/init-frameworks-and-bifs.zeek
build/scripts/base/bif/plugins/Zeek_WebSocket.functions.bif.zeek
build/scripts/base/bif/plugins/Zeek_WebSocket.types.bif.zeek
build/scripts/base/bif/plugins/Zeek_XMPP.events.bif.zeek
build/scripts/base/bif/plugins/Zeek_Cluster_Backend_ZeroMQ.cluster_backend_zeromq.bif.zeek
build/scripts/base/bif/plugins/Zeek_ARP.events.bif.zeek
build/scripts/base/bif/plugins/Zeek_UDP.events.bif.zeek
build/scripts/base/bif/plugins/Zeek_ICMP.events.bif.zeek

@@ -1,5 +1 @@
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
received termination signal
received termination signal
received termination signal
received termination signal

@@ -1,2 +1,2 @@
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
541 seen BiFs, 0 unseen BiFs (), 0 new BiFs ()
542 seen BiFs, 0 unseen BiFs (), 0 new BiFs ()

@@ -337,6 +337,7 @@
0.000000 MetaHookPost LoadFile(0, ./Zeek_BenchmarkReader.benchmark.bif.zeek, <...>/Zeek_BenchmarkReader.benchmark.bif.zeek) -> -1
0.000000 MetaHookPost LoadFile(0, ./Zeek_BinaryReader.binary.bif.zeek, <...>/Zeek_BinaryReader.binary.bif.zeek) -> -1
0.000000 MetaHookPost LoadFile(0, ./Zeek_BitTorrent.events.bif.zeek, <...>/Zeek_BitTorrent.events.bif.zeek) -> -1
0.000000 MetaHookPost LoadFile(0, ./Zeek_Cluster_Backend_ZeroMQ.cluster_backend_zeromq.bif.zeek, <...>/Zeek_Cluster_Backend_ZeroMQ.cluster_backend_zeromq.bif.zeek) -> -1
0.000000 MetaHookPost LoadFile(0, ./Zeek_ConfigReader.config.bif.zeek, <...>/Zeek_ConfigReader.config.bif.zeek) -> -1
0.000000 MetaHookPost LoadFile(0, ./Zeek_ConnSize.events.bif.zeek, <...>/Zeek_ConnSize.events.bif.zeek) -> -1
0.000000 MetaHookPost LoadFile(0, ./Zeek_ConnSize.functions.bif.zeek, <...>/Zeek_ConnSize.functions.bif.zeek) -> -1
@@ -643,6 +644,7 @@
0.000000 MetaHookPost LoadFileExtended(0, ./Zeek_BenchmarkReader.benchmark.bif.zeek, <...>/Zeek_BenchmarkReader.benchmark.bif.zeek) -> (-1, <no content>)
0.000000 MetaHookPost LoadFileExtended(0, ./Zeek_BinaryReader.binary.bif.zeek, <...>/Zeek_BinaryReader.binary.bif.zeek) -> (-1, <no content>)
0.000000 MetaHookPost LoadFileExtended(0, ./Zeek_BitTorrent.events.bif.zeek, <...>/Zeek_BitTorrent.events.bif.zeek) -> (-1, <no content>)
0.000000 MetaHookPost LoadFileExtended(0, ./Zeek_Cluster_Backend_ZeroMQ.cluster_backend_zeromq.bif.zeek, <...>/Zeek_Cluster_Backend_ZeroMQ.cluster_backend_zeromq.bif.zeek) -> (-1, <no content>)
0.000000 MetaHookPost LoadFileExtended(0, ./Zeek_ConfigReader.config.bif.zeek, <...>/Zeek_ConfigReader.config.bif.zeek) -> (-1, <no content>)
0.000000 MetaHookPost LoadFileExtended(0, ./Zeek_ConnSize.events.bif.zeek, <...>/Zeek_ConnSize.events.bif.zeek) -> (-1, <no content>)
0.000000 MetaHookPost LoadFileExtended(0, ./Zeek_ConnSize.functions.bif.zeek, <...>/Zeek_ConnSize.functions.bif.zeek) -> (-1, <no content>)
@@ -1281,6 +1283,7 @@
0.000000 MetaHookPre LoadFile(0, ./Zeek_BenchmarkReader.benchmark.bif.zeek, <...>/Zeek_BenchmarkReader.benchmark.bif.zeek)
0.000000 MetaHookPre LoadFile(0, ./Zeek_BinaryReader.binary.bif.zeek, <...>/Zeek_BinaryReader.binary.bif.zeek)
0.000000 MetaHookPre LoadFile(0, ./Zeek_BitTorrent.events.bif.zeek, <...>/Zeek_BitTorrent.events.bif.zeek)
0.000000 MetaHookPre LoadFile(0, ./Zeek_Cluster_Backend_ZeroMQ.cluster_backend_zeromq.bif.zeek, <...>/Zeek_Cluster_Backend_ZeroMQ.cluster_backend_zeromq.bif.zeek)
0.000000 MetaHookPre LoadFile(0, ./Zeek_ConfigReader.config.bif.zeek, <...>/Zeek_ConfigReader.config.bif.zeek)
0.000000 MetaHookPre LoadFile(0, ./Zeek_ConnSize.events.bif.zeek, <...>/Zeek_ConnSize.events.bif.zeek)
0.000000 MetaHookPre LoadFile(0, ./Zeek_ConnSize.functions.bif.zeek, <...>/Zeek_ConnSize.functions.bif.zeek)
@@ -1587,6 +1590,7 @@
0.000000 MetaHookPre LoadFileExtended(0, ./Zeek_BenchmarkReader.benchmark.bif.zeek, <...>/Zeek_BenchmarkReader.benchmark.bif.zeek)
0.000000 MetaHookPre LoadFileExtended(0, ./Zeek_BinaryReader.binary.bif.zeek, <...>/Zeek_BinaryReader.binary.bif.zeek)
0.000000 MetaHookPre LoadFileExtended(0, ./Zeek_BitTorrent.events.bif.zeek, <...>/Zeek_BitTorrent.events.bif.zeek)
0.000000 MetaHookPre LoadFileExtended(0, ./Zeek_Cluster_Backend_ZeroMQ.cluster_backend_zeromq.bif.zeek, <...>/Zeek_Cluster_Backend_ZeroMQ.cluster_backend_zeromq.bif.zeek)
0.000000 MetaHookPre LoadFileExtended(0, ./Zeek_ConfigReader.config.bif.zeek, <...>/Zeek_ConfigReader.config.bif.zeek)
0.000000 MetaHookPre LoadFileExtended(0, ./Zeek_ConnSize.events.bif.zeek, <...>/Zeek_ConnSize.events.bif.zeek)
0.000000 MetaHookPre LoadFileExtended(0, ./Zeek_ConnSize.functions.bif.zeek, <...>/Zeek_ConnSize.functions.bif.zeek)
@@ -2224,6 +2228,7 @@
0.000000 | HookLoadFile ./Zeek_BenchmarkReader.benchmark.bif.zeek <...>/Zeek_BenchmarkReader.benchmark.bif.zeek
0.000000 | HookLoadFile ./Zeek_BinaryReader.binary.bif.zeek <...>/Zeek_BinaryReader.binary.bif.zeek
0.000000 | HookLoadFile ./Zeek_BitTorrent.events.bif.zeek <...>/Zeek_BitTorrent.events.bif.zeek
0.000000 | HookLoadFile ./Zeek_Cluster_Backend_ZeroMQ.cluster_backend_zeromq.bif.zeek <...>/Zeek_Cluster_Backend_ZeroMQ.cluster_backend_zeromq.bif.zeek
0.000000 | HookLoadFile ./Zeek_ConfigReader.config.bif.zeek <...>/Zeek_ConfigReader.config.bif.zeek
0.000000 | HookLoadFile ./Zeek_ConnSize.events.bif.zeek <...>/Zeek_ConnSize.events.bif.zeek
0.000000 | HookLoadFile ./Zeek_ConnSize.functions.bif.zeek <...>/Zeek_ConnSize.functions.bif.zeek
@@ -2530,6 +2535,7 @@
0.000000 | HookLoadFileExtended ./Zeek_BenchmarkReader.benchmark.bif.zeek <...>/Zeek_BenchmarkReader.benchmark.bif.zeek
0.000000 | HookLoadFileExtended ./Zeek_BinaryReader.binary.bif.zeek <...>/Zeek_BinaryReader.binary.bif.zeek
0.000000 | HookLoadFileExtended ./Zeek_BitTorrent.events.bif.zeek <...>/Zeek_BitTorrent.events.bif.zeek
0.000000 | HookLoadFileExtended ./Zeek_Cluster_Backend_ZeroMQ.cluster_backend_zeromq.bif.zeek <...>/Zeek_Cluster_Backend_ZeroMQ.cluster_backend_zeromq.bif.zeek
0.000000 | HookLoadFileExtended ./Zeek_ConfigReader.config.bif.zeek <...>/Zeek_ConfigReader.config.bif.zeek
0.000000 | HookLoadFileExtended ./Zeek_ConnSize.events.bif.zeek <...>/Zeek_ConnSize.events.bif.zeek
0.000000 | HookLoadFileExtended ./Zeek_ConnSize.functions.bif.zeek <...>/Zeek_ConnSize.functions.bif.zeek

@@ -0,0 +1,8 @@
redef Cluster::manager_is_logger = T;
redef Cluster::nodes = {
["manager"] = [$node_type=Cluster::MANAGER, $ip=127.0.0.1, $p=to_port(getenv("LOG_PULL_PORT"))],
["proxy"] = [$node_type=Cluster::PROXY, $ip=127.0.0.1],
["worker-1"] = [$node_type=Cluster::WORKER, $ip=127.0.0.1],
["worker-2"] = [$node_type=Cluster::WORKER, $ip=127.0.0.1],
};

@@ -0,0 +1,9 @@
redef Cluster::manager_is_logger = F;
redef Cluster::nodes = {
["manager"] = [$node_type=Cluster::MANAGER, $ip=127.0.0.1],
["logger"] = [$node_type=Cluster::LOGGER, $ip=127.0.0.1, $p=to_port(getenv("LOG_PULL_PORT"))],
["proxy"] = [$node_type=Cluster::PROXY, $ip=127.0.0.1],
["worker-1"] = [$node_type=Cluster::WORKER, $ip=127.0.0.1],
["worker-2"] = [$node_type=Cluster::WORKER, $ip=127.0.0.1],
};

@@ -0,0 +1,10 @@
redef Cluster::manager_is_logger = F;
redef Cluster::nodes = {
["manager"] = [$node_type=Cluster::MANAGER, $ip=127.0.0.1],
["logger-1"] = [$node_type=Cluster::LOGGER, $ip=127.0.0.1, $p=to_port(getenv("LOG_PULL_PORT_1"))],
["logger-2"] = [$node_type=Cluster::LOGGER, $ip=127.0.0.1, $p=to_port(getenv("LOG_PULL_PORT_2"))],
["proxy"] = [$node_type=Cluster::PROXY, $ip=127.0.0.1],
["worker-1"] = [$node_type=Cluster::WORKER, $ip=127.0.0.1],
["worker-2"] = [$node_type=Cluster::WORKER, $ip=127.0.0.1],
};

@@ -0,0 +1,11 @@
# Helper script for tests: expects XPUB/XSUB ports allocated by
# btest and configures the ZeroMQ globals accordingly.
@load base/utils/numbers
@load frameworks/cluster/backend/zeromq
@load frameworks/cluster/backend/zeromq/connect
redef Cluster::Backend::ZeroMQ::listen_xpub_endpoint = fmt("tcp://127.0.0.1:%s", extract_count(getenv("XPUB_PORT")));
redef Cluster::Backend::ZeroMQ::listen_xsub_endpoint = fmt("tcp://127.0.0.1:%s", extract_count(getenv("XSUB_PORT")));
redef Cluster::Backend::ZeroMQ::connect_xpub_endpoint = fmt("tcp://127.0.0.1:%s", extract_count(getenv("XSUB_PORT")));
redef Cluster::Backend::ZeroMQ::connect_xsub_endpoint = fmt("tcp://127.0.0.1:%s", extract_count(getenv("XPUB_PORT")));

@@ -0,0 +1,74 @@
# @TEST-DOC: Test error handling of the cluster publish BiFs
#
# @TEST-EXEC: zeek --parse-only -b %INPUT
# @TEST-EXEC: zeek -b %INPUT
# @TEST-EXEC: TEST_DIFF_CANONIFIER=$SCRIPTS/diff-remove-abspath btest-diff .stderr
# @TEST-EXEC: TEST_DIFF_CANONIFIER=$SCRIPTS/diff-remove-abspath btest-diff .stdout
module Cluster;
event ping1(c: count, how: string) &is_used
{
}
hook hook1(c: count, how: string) &is_used
{
}
event zeek_init()
{
# Fake the pool!
init_pool_node(Cluster::proxy_pool, "proxy-1");
mark_pool_node_alive(Cluster::proxy_pool, "proxy-1");
}
event zeek_init() &priority=-1
{
print "Broker::make_event with Cluster::publish()";
local be = Broker::make_event(ping1, 1, "make_event()");
local r = Cluster::publish("topic", be);
print "r=", r;
}
event zeek_init() &priority=-2
{
print "Broker::make_event with Cluster::publish_hrw()";
local be = Broker::make_event(ping1, 1, "make_event()");
local r = Cluster::publish_hrw(Cluster::proxy_pool, "key", be);
print "r=", r;
}
event zeek_init() &priority=-3
{
print "Broker::make_event with Cluster::publish_rr()";
local be = Broker::make_event(ping1, 1, "make_event()");
local r = Cluster::publish_rr(Cluster::proxy_pool, "key", be);
print "r=", r;
}
type MyEvent: record {
x: count &default=1;
};
event zeek_init() &priority=-4
{
print "Cluster::publish() with wrong event";
local r = Cluster::publish("topic", MyEvent());
print "r=", r;
}
event zeek_init() &priority=-4
{
print "Cluster::publish_hrw() with wrong event";
local r = Cluster::publish_hrw(Cluster::proxy_pool, "key", MyEvent());
print "r=", r;
}
event zeek_init() &priority=-4
{
print "Cluster::publish_rr() with wrong event";
local r = Cluster::publish_rr(Cluster::proxy_pool, "key", MyEvent());
print "r=", r;
}

View file

@ -0,0 +1,177 @@
# @TEST-DOC: Testing round-robin of Log::write() across two loggers.
#
# @TEST-REQUIRES: have-zeromq
#
# @TEST-PORT: XPUB_PORT
# @TEST-PORT: XSUB_PORT
# @TEST-PORT: LOG_PULL_PORT_1
# @TEST-PORT: LOG_PULL_PORT_2
#
# @TEST-EXEC: chmod +x ./check-log.sh
#
# @TEST-EXEC: cp $FILES/zeromq/cluster-layout-two-loggers.zeek cluster-layout.zeek
# @TEST-EXEC: cp $FILES/zeromq/test-bootstrap.zeek zeromq-test-bootstrap.zeek
#
# @TEST-EXEC: zeek -b --parse-only common.zeek manager.zeek worker.zeek
#
# @TEST-EXEC: btest-bg-run manager "ZEEKPATH=$ZEEKPATH:.. && CLUSTER_NODE=manager zeek -b ../manager.zeek >out"
# @TEST-EXEC: btest-bg-run logger-1 "ZEEKPATH=$ZEEKPATH:.. && CLUSTER_NODE=logger-1 zeek -b ../common.zeek >out"
# @TEST-EXEC: btest-bg-run logger-2 "ZEEKPATH=$ZEEKPATH:.. && CLUSTER_NODE=logger-2 zeek -b ../common.zeek >out"
# @TEST-EXEC: btest-bg-run worker-1 "ZEEKPATH=$ZEEKPATH:.. && CLUSTER_NODE=worker-1 zeek -b ../worker.zeek >out"
# @TEST-EXEC: btest-bg-run worker-2 "ZEEKPATH=$ZEEKPATH:.. && CLUSTER_NODE=worker-2 zeek -b ../worker.zeek >out"
#
# @TEST-EXEC: btest-bg-wait 10
#
# @TEST-EXEC: test $(grep -c worker-1 logger-1/rr2.log) -gt 10
# @TEST-EXEC: test $(grep -c worker-2 logger-1/rr2.log) -gt 10
# @TEST-EXEC: test $(grep -c worker-1 logger-2/rr2.log) -gt 10
# @TEST-EXEC: test $(grep -c worker-2 logger-2/rr2.log) -gt 10
# @TEST-EXEC: zeek-cut < logger-1/rr2.log > rr2.log
# @TEST-EXEC: zeek-cut < logger-2/rr2.log >> rr2.log
# @TEST-EXEC: sort -n rr2.log > rr2.log.sorted
# @TEST-EXEC: btest-diff rr2.log.sorted
# @TEST-START-FILE common.zeek
@load ./zeromq-test-bootstrap.zeek
redef Log::default_rotation_interval = 0sec;
redef Log::flush_interval = 0.03sec;
redef Log::write_buffer_size = 7;
module LogRR;
export {
redef enum Log::ID += { LOG1, LOG2 };
type Info: record {
c: count &log;
from: string &log &default=Cluster::node;
};
global go: event();
global finish: event();
}
event zeek_init()
{
Log::create_stream(LOG1, [$columns=Info, $path="rr1"]);
Log::create_stream(LOG2, [$columns=Info, $path="rr2"]);
}
event finish()
{
terminate();
}
# @TEST-END-FILE
# @TEST-START-FILE manager.zeek
@load ./common.zeek
event check_ready()
{
if ( ! piped_exec("../check-log.sh", "") )
{
Reporter::error("check-log.sh failed");
terminate();
}
if ( file_size("DONE") >= 0 )
{
Cluster::publish(Cluster::worker_topic, LogRR::go);
return;
}
schedule 0.1sec { check_ready() };
}
event zeek_init()
{
event check_ready();
}
global nodes_down: set[string];
event Cluster::node_down(name: string, id: string)
{
print current_time(), "node_down", name;
add nodes_down[name];
if ( |nodes_down| == 2 ) # workers down
Cluster::publish(Cluster::logger_topic, LogRR::finish);
if ( |nodes_down| == 4 ) # both loggers down
terminate();
}
# @TEST-END-FILE
# @TEST-START-FILE worker.zeek
@load ./common.zeek
global do_write2 = F;
event write_log1(c: count)
{
if ( do_write2 )
{
Log::write(LogRR::LOG1, [$c=10000000]);
return;
}
Log::write(LogRR::LOG1, [$c=c]);
Log::flush(LogRR::LOG1);
schedule 0.05sec { write_log1(++c) };
}
event write_log2(c: count)
{
if ( c == 100 )
{
terminate();
return;
}
Log::write(LogRR::LOG2, [$c=c]);
schedule 0.012sec { write_log2(++c) };
}
event LogRR::go()
{
do_write2 = T;
event write_log2(0);
}
event zeek_init()
{
event write_log1(0);
}
# @TEST-END-FILE
# @TEST-START-FILE check-log.sh
#!/usr/bin/env bash
#
# This script regularly checks the loggers' rr1.log files until
# both workers appear in each. Once that happens, it creates a DONE
# file, which results in the workers getting the "go" and sending
# writes to rr2.log.
set -eux
LOGGERS="logger-1 logger-2"
WORKERS="worker-1 worker-2"
for logger in $LOGGERS; do
for worker in $WORKERS; do
date +%s
echo check $logger $worker
if ! grep -q "${worker}" ../${logger}/rr1.log; then
exit 0
fi
done
done
echo "DONE"
echo "DONE" > DONE
exit 0
# @TEST-END-FILE
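The `grep -c` assertions in the btest header above count per-worker log lines; a minimal standalone sketch of that check (using a hypothetical log fragment, not a real rr2.log) looks like:

```shell
# Hypothetical rr2.log fragment; a real log carries full zeek-cut columns.
printf 'worker-1\nworker-2\nworker-1\n' > rr2.sample

# grep -c prints the number of matching lines; the btest header
# requires each worker to appear more than 10 times per logger.
count=$(grep -c worker-1 rr2.sample)
echo "worker-1 lines: $count"
# → worker-1 lines: 2
```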

View file

@ -0,0 +1,101 @@
# @TEST-DOC: Send ping/pong using publish_hrw(), publish() and make_event()
#
# @TEST-REQUIRES: have-zeromq
#
# @TEST-PORT: XPUB_PORT
# @TEST-PORT: XSUB_PORT
# @TEST-PORT: LOG_PULL_PORT
#
# @TEST-EXEC: cp $FILES/zeromq/cluster-layout-no-logger.zeek cluster-layout.zeek
# @TEST-EXEC: cp $FILES/zeromq/test-bootstrap.zeek zeromq-test-bootstrap.zeek
#
# @TEST-EXEC: zeek -b --parse-only common.zeek manager.zeek worker.zeek
#
# @TEST-EXEC: btest-bg-run manager "ZEEKPATH=$ZEEKPATH:.. && CLUSTER_NODE=manager zeek -b ../manager.zeek >out"
# @TEST-EXEC: btest-bg-run worker-1 "ZEEKPATH=$ZEEKPATH:.. && CLUSTER_NODE=worker-1 zeek -b ../worker.zeek >out"
# @TEST-EXEC: btest-bg-run worker-2 "ZEEKPATH=$ZEEKPATH:.. && CLUSTER_NODE=worker-2 zeek -b ../worker.zeek >out"
#
# @TEST-EXEC: btest-bg-wait 10
# @TEST-EXEC: sort < ./manager/out > ./manager.sorted
# @TEST-EXEC: sort < ./worker-1/out > ./worker-1.sorted
# @TEST-EXEC: sort < ./worker-2/out > ./worker-2.sorted
# @TEST-EXEC: btest-diff manager.sorted
# @TEST-EXEC: btest-diff worker-1.sorted
# @TEST-EXEC: btest-diff worker-2.sorted
# @TEST-START-FILE common.zeek
@load ./zeromq-test-bootstrap.zeek
global finish: event();
global ping: event(c: count, how: string);
global pong: event(c: count, how: string, from: string, from_how: string);
# @TEST-END-FILE
# @TEST-START-FILE manager.zeek
@load ./common.zeek
global nodes_up: set[string];
global nodes_down: set[string];
global pongs: set[count, string, string, string];
global i = 0;
# With rendezvous (highest random weight) hashing, a fixed pool
# membership maps each key to the same node, so both publishes
# for a given i reach the same worker.
event send_hrw()
{
if ( i >= 10 )
return;
Cluster::publish_hrw(Cluster::worker_pool, cat(i), ping, i, "args");
local e = Cluster::make_event(ping, i, "make_event");
Cluster::publish_hrw(Cluster::worker_pool, cat(i), e);
++i;
schedule 0.01sec { send_hrw() };
}
event pong(c: count, how: string, from: string, from_how: string)
{
print "got pong", c, how, from, from_how;
add pongs[c, how, from, from_how];
if ( |pongs| == 40 )
{
print "have 40, finish!";
Cluster::publish(Cluster::worker_topic, finish);
}
}
event Cluster::node_up(name: string, id: string) {
add nodes_up[name];
if ( |nodes_up| == 2 ) {
event send_hrw();
}
}
event Cluster::node_down(name: string, id: string) {
add nodes_down[name];
if ( |nodes_down| == 2 )
terminate();
}
# @TEST-END-FILE
# @TEST-START-FILE worker.zeek
@load ./common.zeek
event ping(c: count, how: string) {
print "got ping", c, how;
Cluster::publish(Cluster::manager_topic, pong, c, how, Cluster::node, "args");
local e = Cluster::make_event(pong, c, how, Cluster::node, "make_event");
Cluster::publish(Cluster::manager_topic, e);
}
event Cluster::node_up(name: string, id: string) {
print "a node_up", name;
}
event finish() &is_used {
print "z got finish!";
terminate();
}
# @TEST-END-FILE

View file

@ -0,0 +1,101 @@
# @TEST-DOC: Send ping/pong using publish_rr(), publish() and make_event()
#
# @TEST-REQUIRES: have-zeromq
#
# @TEST-PORT: XPUB_PORT
# @TEST-PORT: XSUB_PORT
# @TEST-PORT: LOG_PULL_PORT
#
# @TEST-EXEC: cp $FILES/zeromq/cluster-layout-no-logger.zeek cluster-layout.zeek
# @TEST-EXEC: cp $FILES/zeromq/test-bootstrap.zeek zeromq-test-bootstrap.zeek
#
# @TEST-EXEC: zeek -b --parse-only common.zeek manager.zeek worker.zeek
#
# @TEST-EXEC: btest-bg-run manager "ZEEKPATH=$ZEEKPATH:.. && CLUSTER_NODE=manager zeek -b ../manager.zeek >out"
# @TEST-EXEC: btest-bg-run worker-1 "ZEEKPATH=$ZEEKPATH:.. && CLUSTER_NODE=worker-1 zeek -b ../worker.zeek >out"
# @TEST-EXEC: btest-bg-run worker-2 "ZEEKPATH=$ZEEKPATH:.. && CLUSTER_NODE=worker-2 zeek -b ../worker.zeek >out"
#
# @TEST-EXEC: btest-bg-wait 10
# @TEST-EXEC: sort < ./manager/out > ./manager.sorted
# @TEST-EXEC: sort < ./worker-1/out > ./worker-1.sorted
# @TEST-EXEC: sort < ./worker-2/out > ./worker-2.sorted
# @TEST-EXEC: btest-diff manager.sorted
# @TEST-EXEC: btest-diff worker-1.sorted
# @TEST-EXEC: btest-diff worker-2.sorted
# @TEST-START-FILE common.zeek
@load ./zeromq-test-bootstrap.zeek
global finish: event();
global ping: event(c: count, how: string);
global pong: event(c: count, how: string, from: string, from_how: string);
# @TEST-END-FILE
# @TEST-START-FILE manager.zeek
@load ./common.zeek
global nodes_up: set[string];
global nodes_down: set[string];
global pongs: set[count, string, string, string];
global i = 0;
# publish_rr() rotates over the pool's alive nodes; the key names
# an independent round-robin position.
event send_rr()
{
if ( i >= 10 )
return;
Cluster::publish_rr(Cluster::worker_pool, "ping-key-args", ping, i, "args");
local e = Cluster::make_event(ping, i, "make_event");
Cluster::publish_rr(Cluster::worker_pool, "ping-key-event", e);
++i;
schedule 0.01sec { send_rr() };
}
event pong(c: count, how: string, from: string, from_how: string)
{
print "got pong", c, how, from, from_how;
add pongs[c, how, from, from_how];
if ( |pongs| == 40 )
{
print "have 40, finish!";
Cluster::publish(Cluster::worker_topic, finish);
}
}
event Cluster::node_up(name: string, id: string) {
add nodes_up[name];
if ( |nodes_up| == 2 ) {
event send_rr();
}
}
event Cluster::node_down(name: string, id: string) {
add nodes_down[name];
if ( |nodes_down| == 2 )
terminate();
}
# @TEST-END-FILE
# @TEST-START-FILE worker.zeek
@load ./common.zeek
event ping(c: count, how: string) {
print "got ping", c, how;
Cluster::publish(Cluster::manager_topic, pong, c, how, Cluster::node, "args");
local e = Cluster::make_event(pong, c, how, Cluster::node, "make_event");
Cluster::publish(Cluster::manager_topic, e);
}
event Cluster::node_up(name: string, id: string) {
print "a node_up", name;
}
event finish() &is_used {
print "z got finish!";
terminate();
}
# @TEST-END-FILE

View file

@ -0,0 +1,139 @@
# @TEST-DOC: Start up a ZeroMQ cluster by hand, testing basic logging and node_up and node_down events.
#
# @TEST-REQUIRES: have-zeromq
#
# @TEST-GROUP: cluster-zeromq
#
# @TEST-PORT: XPUB_PORT
# @TEST-PORT: XSUB_PORT
# @TEST-PORT: LOG_PULL_PORT
#
# @TEST-EXEC: chmod +x ./check-cluster-log.sh
#
# @TEST-EXEC: cp $FILES/zeromq/cluster-layout-simple.zeek cluster-layout.zeek
# @TEST-EXEC: cp $FILES/zeromq/test-bootstrap.zeek zeromq-test-bootstrap.zeek
#
# @TEST-EXEC: btest-bg-run manager "ZEEKPATH=$ZEEKPATH:.. && CLUSTER_NODE=manager zeek -b ../manager.zeek >out"
# @TEST-EXEC: btest-bg-run logger "ZEEKPATH=$ZEEKPATH:.. && CLUSTER_NODE=logger zeek -b ../other.zeek >out"
# @TEST-EXEC: btest-bg-run proxy "ZEEKPATH=$ZEEKPATH:.. && CLUSTER_NODE=proxy zeek -b ../other.zeek >out"
# @TEST-EXEC: btest-bg-run worker-1 "ZEEKPATH=$ZEEKPATH:.. && CLUSTER_NODE=worker-1 zeek -b ../other.zeek >out"
# @TEST-EXEC: btest-bg-run worker-2 "ZEEKPATH=$ZEEKPATH:.. && CLUSTER_NODE=worker-2 zeek -b ../other.zeek >out"
#
# @TEST-EXEC: btest-bg-wait 10
# @TEST-EXEC: btest-diff cluster.log.normalized
# @TEST-EXEC: zeek-cut -F ' ' < ./logger/node_up.log | sort > node_up.sorted
# @TEST-EXEC: btest-diff node_up.sorted
# @TEST-EXEC: sort manager/out > manager.out
# @TEST-EXEC: btest-diff manager.out
# @TEST-START-FILE common.zeek
@load ./zeromq-test-bootstrap
redef Log::default_rotation_interval = 0sec;
redef Log::flush_interval = 0.01sec;
type Info: record {
self: string &log &default=Cluster::node;
node: string &log;
};
redef enum Log::ID += { TEST_LOG };
global finish: event(name: string) &is_used;
event zeek_init() {
print "A zeek_init", Cluster::node;
Log::create_stream(TEST_LOG, [$columns=Info, $path="node_up"]);
}
event Cluster::node_up(name: string, id: string) &priority=-5 {
print "B node_up", name;
Log::write(TEST_LOG, [$node=name]);
# Log::flush(TEST_LOG);
# Log::flush(Cluster::LOG);
}
# @TEST-END-FILE
# @TEST-START-FILE manager.zeek
@load ./common.zeek
global nodes_up: set[string] = {"manager"};
global nodes_down: set[string] = {"manager"};
event send_finish() {
print "C send_finish";
for ( n in nodes_up )
if ( n != "logger" )
Cluster::publish(Cluster::node_topic(n), finish, Cluster::node);
}
event check_cluster_log() {
if ( file_size("DONE") >= 0 ) {
event send_finish();
return;
}
system("../check-cluster-log.sh");
schedule 0.1sec { check_cluster_log() };
}
event zeek_init() {
schedule 0.1sec { check_cluster_log() };
}
event Cluster::node_up(name: string, id: string) &priority=-1 {
add nodes_up[name];
print "B nodes_up", |nodes_up|;
}
event Cluster::node_down(name: string, id: string) {
print "D node_down", name;
add nodes_down[name];
if ( |nodes_down| == |Cluster::nodes| - 1 ) {
print "D send_finish to logger";
Cluster::publish(Cluster::node_topic("logger"), finish, Cluster::node);
}
if ( |nodes_down| == |Cluster::nodes| )
terminate();
}
# @TEST-END-FILE
# @TEST-START-FILE other.zeek
@load ./common.zeek
event finish(name: string) {
print fmt("finish from %s", name);
terminate();
}
# @TEST-END-FILE
# @TEST-START-FILE check-cluster-log.sh
#!/bin/sh
#
# This script checks logger/cluster.log until the expected number
# of log entries has been observed and puts a normalized version
# into the testing directory for baselining.
CLUSTER_LOG=../logger/cluster.log
if [ ! -f $CLUSTER_LOG ]; then
echo "$CLUSTER_LOG not found!" >&2
exit 1;
fi
if [ -f DONE ]; then
exit 0
fi
# Remove hostname and pid from node id in message.
zeek-cut node message < $CLUSTER_LOG | sed -r 's/_[^_]+_[0-9]+_/_<hostname>_<pid>_/g' | sort > cluster.log.tmp
# 5 nodes with 4 cluster.log lines each = 20 lines expected.
if [ $(wc -l < cluster.log.tmp) = 20 ]; then
echo "DONE!" >&2
mv cluster.log.tmp ../cluster.log.normalized
echo "DONE" > DONE
fi
exit 0
# @TEST-END-FILE
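The sed normalization in `check-cluster-log.sh` can be sanity-checked in isolation; this sketch assumes a node id of the form `<name>_<hostname>_<pid>_` (the hostname and PID here are made up):

```shell
# Same regex as in the script: collapse "_<hostname>_<pid>_" so the
# baseline does not depend on the machine or process id.
printf 'node manager_myhost_12345_ is up\n' \
  | sed -r 's/_[^_]+_[0-9]+_/_<hostname>_<pid>_/g'
# → node manager_<hostname>_<pid>_ is up
```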

View file

@ -0,0 +1,129 @@
# @TEST-DOC: Start up a ZeroMQ cluster without a logger, testing logging through the manager.
#
# @TEST-REQUIRES: have-zeromq
#
# @TEST-GROUP: cluster-zeromq
#
# @TEST-PORT: XPUB_PORT
# @TEST-PORT: XSUB_PORT
# @TEST-PORT: LOG_PULL_PORT
#
# @TEST-EXEC: chmod +x ./check-cluster-log.sh
#
# @TEST-EXEC: cp $FILES/zeromq/cluster-layout-no-logger.zeek cluster-layout.zeek
# @TEST-EXEC: cp $FILES/zeromq/test-bootstrap.zeek zeromq-test-bootstrap.zeek
#
# @TEST-EXEC: btest-bg-run manager "ZEEKPATH=$ZEEKPATH:.. && CLUSTER_NODE=manager zeek -b ../manager.zeek >out"
# @TEST-EXEC: btest-bg-run proxy "ZEEKPATH=$ZEEKPATH:.. && CLUSTER_NODE=proxy zeek -b ../other.zeek >out"
# @TEST-EXEC: btest-bg-run worker-1 "ZEEKPATH=$ZEEKPATH:.. && CLUSTER_NODE=worker-1 zeek -b ../other.zeek >out"
# @TEST-EXEC: btest-bg-run worker-2 "ZEEKPATH=$ZEEKPATH:.. && CLUSTER_NODE=worker-2 zeek -b ../other.zeek >out"
#
# @TEST-EXEC: btest-bg-wait 10
# @TEST-EXEC: btest-diff cluster.log.normalized
# @TEST-EXEC: zeek-cut -F ' ' < ./manager/node_up.log | sort > node_up.sorted
# @TEST-EXEC: btest-diff node_up.sorted
# @TEST-EXEC: sort manager/out > manager.out
# @TEST-EXEC: btest-diff manager.out
# @TEST-START-FILE common.zeek
@load ./zeromq-test-bootstrap
redef Log::default_rotation_interval = 0sec;
redef Log::flush_interval = 0.01sec;
type Info: record {
self: string &log &default=Cluster::node;
node: string &log;
};
redef enum Log::ID += { TEST_LOG };
global finish: event(name: string) &is_used;
event zeek_init() {
print "zeek_init", Cluster::node;
Log::create_stream(TEST_LOG, [$columns=Info, $path="node_up"]);
}
event Cluster::node_up(name: string, id: string) {
print "A node_up", name;
Log::write(TEST_LOG, [$node=name]);
}
# @TEST-END-FILE
# @TEST-START-FILE manager.zeek
@load ./common.zeek
global nodes_up: set[string] = {"manager"};
global nodes_down: set[string] = {"manager"};
event send_finish() {
for ( n in nodes_up )
Cluster::publish(Cluster::node_topic(n), finish, Cluster::node);
}
event check_cluster_log() {
if ( file_size("DONE") >= 0 ) {
event send_finish();
return;
}
system("../check-cluster-log.sh");
schedule 0.1sec { check_cluster_log() };
}
event zeek_init() {
schedule 0.1sec { check_cluster_log() };
}
event Cluster::node_up(name: string, id: string) {
add nodes_up[name];
print "B nodes_up", |nodes_up|;
}
event Cluster::node_down(name: string, id: string) {
print "D node_down", name;
add nodes_down[name];
if ( |nodes_down| == |Cluster::nodes| )
terminate();
}
# @TEST-END-FILE
# @TEST-START-FILE other.zeek
@load ./common.zeek
event finish(name: string) {
print fmt("finish from %s", name);
terminate();
}
# @TEST-END-FILE
#
# @TEST-START-FILE check-cluster-log.sh
#!/bin/sh
#
# This script checks cluster.log until the expected number
# of log entries has been observed and puts a normalized version
# into the testing directory for baselining.
CLUSTER_LOG=cluster.log
if [ ! -f $CLUSTER_LOG ]; then
echo "$CLUSTER_LOG not found!" >&2
exit 1;
fi
if [ -f DONE ]; then
exit 0
fi
# Remove hostname and pid from node id in message.
zeek-cut node message < $CLUSTER_LOG | sed -r 's/_[^_]+_[0-9]+_/_<hostname>_<pid>_/g' | sort > cluster.log.tmp
# 4 nodes with 3 cluster.log lines each = 12 lines expected.
if [ $(wc -l < cluster.log.tmp) = 12 ]; then
echo "DONE!" >&2
mv cluster.log.tmp ../cluster.log.normalized
echo "DONE" > DONE
fi
exit 0
# @TEST-END-FILE

View file

@ -0,0 +1,86 @@
# @TEST-DOC: Configure a ZeroMQ cluster with Zeek's supervisor.
#
# @TEST-REQUIRES: have-zeromq
#
# @TEST-GROUP: cluster-zeromq
#
# @TEST-PORT: XPUB_PORT
# @TEST-PORT: XSUB_PORT
# @TEST-PORT: LOG_PULL_PORT
# @TEST-EXEC: chmod +x ./check-cluster-log.sh
#
# @TEST-EXEC: cp $FILES/zeromq/test-bootstrap.zeek zeromq-test-bootstrap.zeek
#
# @TEST-EXEC: btest-bg-run supervisor "ZEEKPATH=$ZEEKPATH:.. && zeek -j ../supervisor.zeek >out"
# @TEST-EXEC: btest-bg-wait 10
# @TEST-EXEC: btest-diff supervisor/cluster.log
redef Log::default_rotation_interval = 0secs;
redef Log::flush_interval = 0.01sec;
@if ( ! Supervisor::is_supervisor() )
@load ./zeromq-test-bootstrap
@else
# The supervisor peeks into logger/cluster.log to initiate a shutdown when
# all nodes have said hello to each other. See the check-cluster-log.sh
# script below.
event check_cluster_log() {
# system_env() exports the table entries to the child process as
# environment variables prefixed with ZEEK_ARG_; the script below
# reads ZEEK_ARG_SUPERVISOR_PID.
system_env("../check-cluster-log.sh", table(["SUPERVISOR_PID"] = cat(getpid())));
schedule 0.1sec { check_cluster_log() };
}
event zeek_init()
{
if ( ! Supervisor::is_supervisor() )
return;
Broker::listen("127.0.0.1", 9999/tcp);
local cluster: table[string] of Supervisor::ClusterEndpoint;
cluster["manager"] = [$role=Supervisor::MANAGER, $host=127.0.0.1, $p=0/unknown];
cluster["logger"] = [$role=Supervisor::LOGGER, $host=127.0.0.1, $p=to_port(getenv("LOG_PULL_PORT"))];
cluster["proxy"] = [$role=Supervisor::PROXY, $host=127.0.0.1, $p=0/unknown];
cluster["worker-1"] = [$role=Supervisor::WORKER, $host=127.0.0.1, $p=0/unknown];
cluster["worker-2"] = [$role=Supervisor::WORKER, $host=127.0.0.1, $p=0/unknown];
for ( n, ep in cluster )
{
local sn = Supervisor::NodeConfig($name=n, $bare_mode=T, $cluster=cluster, $directory=n);
local res = Supervisor::create(sn);
if ( res != "" )
print fmt("supervisor failed to create node '%s': %s", n, res);
}
# Start polling the cluster.log
event check_cluster_log();
}
@endif
# @TEST-START-FILE check-cluster-log.sh
#!/bin/sh
#
# This script checks logger/cluster.log until the expected number
# of log entries has been observed and puts a normalized version
# into the current directory. This runs from the supervisor.
if [ ! -f logger/cluster.log ]; then
exit 1;
fi
if [ -f DONE ]; then
exit 0
fi
# Remove hostname and pid from node id in message.
zeek-cut node message < logger/cluster.log | sed -r 's/_[^_]+_[0-9]+_/_<hostname>_<pid>_/g' | sort > cluster.log
if [ $(wc -l < cluster.log) = 20 ]; then
echo "DONE!" >&2
# Trigger shutdown through supervisor.
kill ${ZEEK_ARG_SUPERVISOR_PID};
echo "DONE" > DONE
fi
# @TEST-END-FILE
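Zeek's `system_env()` prefixes each table key with `ZEEK_ARG_` before exporting it to the child process, which is why the script above reads `ZEEK_ARG_SUPERVISOR_PID`. A plain-shell illustration of what the script sees (the PID 12345 is hypothetical):

```shell
# Simulate the environment the supervisor's system_env() call creates.
ZEEK_ARG_SUPERVISOR_PID=12345 sh -c 'echo "would kill ${ZEEK_ARG_SUPERVISOR_PID}"'
# → would kill 12345
```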

View file

@ -0,0 +1,53 @@
# @TEST-DOC: Start up a manager running the ZeroMQ proxy thread; a worker connects and the manager sends a finish event to terminate the worker.
#
# @TEST-REQUIRES: have-zeromq
#
# @TEST-GROUP: cluster-zeromq
#
# @TEST-PORT: XPUB_PORT
# @TEST-PORT: XSUB_PORT
# @TEST-PORT: LOG_PULL_PORT
#
# @TEST-EXEC: cp $FILES/zeromq/cluster-layout-simple.zeek cluster-layout.zeek
# @TEST-EXEC: cp $FILES/zeromq/test-bootstrap.zeek zeromq-test-bootstrap.zeek
#
# @TEST-EXEC: btest-bg-run manager "ZEEKPATH=$ZEEKPATH:.. && CLUSTER_NODE=manager zeek -b ../manager.zeek >out"
# @TEST-EXEC: btest-bg-run worker "ZEEKPATH=$ZEEKPATH:.. && CLUSTER_NODE=worker-1 zeek -b ../worker.zeek >out"
#
# @TEST-EXEC: btest-bg-wait 30
# @TEST-EXEC: btest-diff ./manager/out
# @TEST-EXEC: btest-diff ./worker/out
# @TEST-START-FILE common.zeek
@load ./zeromq-test-bootstrap
global finish: event(name: string);
# @TEST-END-FILE
# @TEST-START-FILE manager.zeek
@load ./common.zeek
# If a node comes up that isn't us, send it a finish event.
event Cluster::node_up(name: string, id: string) {
print "node_up", name;
Cluster::publish(Cluster::nodeid_topic(id), finish, Cluster::node);
}
# If the worker vanishes, finish the test.
event Cluster::node_down(name: string, id: string) {
print "node_down", name;
terminate();
}
# @TEST-END-FILE
# @TEST-START-FILE worker.zeek
@load ./common.zeek
event Cluster::node_up(name: string, id: string) {
print "node_up", name;
}
event finish(name: string) &is_used {
terminate();
}
# @TEST-END-FILE

View file

@ -1,6 +1,7 @@
# @TEST-DOC: ZAM maintenance script for tracking changes in BiFs.
#
# @TEST-REQUIRES: have-spicy
# @TEST-REQUIRES: have-zeromq
#
# @TEST-EXEC: zeek -b %INPUT >output
# @TEST-EXEC: btest-diff output
@ -198,6 +199,7 @@ global known_BiFs = set(
"Telemetry::__histogram_observe",
"Telemetry::__histogram_sum",
"WebSocket::__configure_analyzer",
"Cluster::Backend::ZeroMQ::spawn_zmq_proxy_thread",
"__init_primary_bifs",
"__init_secondary_bifs",
"active_file",

testing/scripts/have-zeromq Executable file
View file

@ -0,0 +1,4 @@
#!/bin/sh
# "zeek -N" exits non-zero if the named plugin is not available.
zeek -N Zeek::Cluster_Backend_ZeroMQ >/dev/null
exit $?