mirror of
https://github.com/zeek/zeek.git
synced 2025-10-02 14:48:21 +00:00
Merge remote-tracking branch 'origin/topic/robin/websocket'
* origin/topic/robin/websocket: Add WebSocket support for exchanging events with external clients.
This commit is contained in:
commit
24c2090ffb
28 changed files with 267 additions and 27 deletions
12
CHANGES
12
CHANGES
|
@ -1,3 +1,15 @@
|
||||||
|
5.0.0-dev.545 | 2022-06-02 12:00:53 +0200
|
||||||
|
|
||||||
|
* Add WebSocket support for exchanging events with external clients.
|
||||||
|
(Robin Sommer, Corelight)
|
||||||
|
|
||||||
|
This exposes Broker's new WebSocket support in Zeek. To enable it,
|
||||||
|
call `Broker::listen_websocket()`. Zeek will then start listening on
|
||||||
|
port 9997 for incoming WebSocket connections.
|
||||||
|
|
||||||
|
See the Broker documentation for a description of the message format
|
||||||
|
expected over these WebSocket connections.
|
||||||
|
|
||||||
5.0.0-dev.540 | 2022-06-01 11:08:42 -0700
|
5.0.0-dev.540 | 2022-06-01 11:08:42 -0700
|
||||||
|
|
||||||
* GH-2136: Clean up DNS_Mgr before shutting down (Tim Wojtulewicz, Corelight)
|
* GH-2136: Clean up DNS_Mgr before shutting down (Tim Wojtulewicz, Corelight)
|
||||||
|
|
6
NEWS
6
NEWS
|
@ -52,6 +52,12 @@ New Functionality
|
||||||
use this functionality, see the TLS Decryption documentation at
|
use this functionality, see the TLS Decryption documentation at
|
||||||
https://docs.zeek.org/en/master/frameworks/tls-decryption.html
|
https://docs.zeek.org/en/master/frameworks/tls-decryption.html
|
||||||
|
|
||||||
|
- Zeek now provides WebSocket support for exchanging events with
|
||||||
|
external clients. To enable it, call `Broker::listen_websocket()`.
|
||||||
|
Zeek will then start listening on port 9997 for incoming WebSocket
|
||||||
|
connections. See the Broker documentation for a description of the
|
||||||
|
message format expected over these WebSocket connections.
|
||||||
|
|
||||||
- The new --with-gen-zam configure flag and its corresponding GEN_ZAM_EXE_PATH
|
- The new --with-gen-zam configure flag and its corresponding GEN_ZAM_EXE_PATH
|
||||||
cmake variable allow reuse of a previously built Gen-ZAM code generator. This
|
cmake variable allow reuse of a previously built Gen-ZAM code generator. This
|
||||||
aids cross-compilation: the Zeek build process normally compiles Gen-ZAM on
|
aids cross-compilation: the Zeek build process normally compiles Gen-ZAM on
|
||||||
|
|
2
VERSION
2
VERSION
|
@ -1 +1 @@
|
||||||
5.0.0-dev.543
|
5.0.0-dev.545
|
||||||
|
|
|
@ -1 +1 @@
|
||||||
Subproject commit 5515c3b033b191a957b3ac45dbe7d0625936143a
|
Subproject commit 0353de8e4e26d90e609c1a09df3462ec68af2553
|
|
@ -26,4 +26,4 @@ RUN apk add --no-cache \
|
||||||
openssh-client \
|
openssh-client \
|
||||||
py3-pip
|
py3-pip
|
||||||
|
|
||||||
RUN pip3 install junit2html
|
RUN pip3 install websockets junit2html
|
||||||
|
|
|
@ -15,7 +15,7 @@ RUN sed -i 's/enabled=1/enabled=0/' /etc/yum/pluginconf.d/fastestmirror.conf
|
||||||
RUN yum -y install \
|
RUN yum -y install \
|
||||||
https://repo.ius.io/ius-release-el7.rpm \
|
https://repo.ius.io/ius-release-el7.rpm \
|
||||||
https://dl.fedoraproject.org/pub/epel/epel-release-latest-7.noarch.rpm \
|
https://dl.fedoraproject.org/pub/epel/epel-release-latest-7.noarch.rpm \
|
||||||
&& yum -y install git224 ccache \
|
&& yum -y install git236 ccache \
|
||||||
&& yum clean all && rm -rf /var/cache/yum
|
&& yum clean all && rm -rf /var/cache/yum
|
||||||
|
|
||||||
RUN yum -y install \
|
RUN yum -y install \
|
||||||
|
@ -55,7 +55,7 @@ RUN curl -sSL "https://github.com/westes/flex/releases/download/v${FLEX_VERSION}
|
||||||
&& make -j`nproc` install) \
|
&& make -j`nproc` install) \
|
||||||
&& rm -rf /tmp/flex-${FLEX_VERSION}
|
&& rm -rf /tmp/flex-${FLEX_VERSION}
|
||||||
|
|
||||||
RUN pip3 install junit2html
|
RUN pip3 install websockets junit2html
|
||||||
|
|
||||||
RUN echo 'unset BASH_ENV PROMPT_COMMAND ENV' > /usr/bin/zeek-ci-env && \
|
RUN echo 'unset BASH_ENV PROMPT_COMMAND ENV' > /usr/bin/zeek-ci-env && \
|
||||||
echo 'source /opt/rh/devtoolset-8/enable' >> /usr/bin/zeek-ci-env && \
|
echo 'source /opt/rh/devtoolset-8/enable' >> /usr/bin/zeek-ci-env && \
|
||||||
|
|
|
@ -26,4 +26,4 @@ RUN dnf -y install \
|
||||||
zlib-devel \
|
zlib-devel \
|
||||||
&& dnf clean all && rm -rf /var/cache/dnf
|
&& dnf clean all && rm -rf /var/cache/dnf
|
||||||
|
|
||||||
RUN pip3 install junit2html
|
RUN pip3 install websockets junit2html
|
||||||
|
|
|
@ -42,4 +42,4 @@ RUN dnf -y --nobest install \
|
||||||
# Set the crypto policy to allow SHA-1 certificates - which we have in our tests
|
# Set the crypto policy to allow SHA-1 certificates - which we have in our tests
|
||||||
RUN dnf -y --nobest install crypto-policies-scripts && update-crypto-policies --set LEGACY
|
RUN dnf -y --nobest install crypto-policies-scripts && update-crypto-policies --set LEGACY
|
||||||
|
|
||||||
RUN pip3 install junit2html
|
RUN pip3 install websockets junit2html
|
||||||
|
|
|
@ -35,6 +35,6 @@ RUN apt-get update && apt-get -y install \
|
||||||
&& mkdir -p "${CMAKE_DIR}" \
|
&& mkdir -p "${CMAKE_DIR}" \
|
||||||
&& curl -sSL "https://github.com/Kitware/CMake/releases/download/v${CMAKE_VERSION}/cmake-${CMAKE_VERSION}-Linux-x86_64.tar.gz" | tar xzf - -C "${CMAKE_DIR}" --strip-components 1 \
|
&& curl -sSL "https://github.com/Kitware/CMake/releases/download/v${CMAKE_VERSION}/cmake-${CMAKE_VERSION}-Linux-x86_64.tar.gz" | tar xzf - -C "${CMAKE_DIR}" --strip-components 1 \
|
||||||
|
|
||||||
&& pip3 install junit2html
|
&& pip3 install websockets junit2html
|
||||||
|
|
||||||
ENV PATH "${CMAKE_DIR}/bin:${PATH}"
|
ENV PATH "${CMAKE_DIR}/bin:${PATH}"
|
||||||
|
|
|
@ -30,4 +30,4 @@ RUN apt-get update && apt-get -y install \
|
||||||
xz-utils \
|
xz-utils \
|
||||||
&& rm -rf /var/lib/apt/lists/*
|
&& rm -rf /var/lib/apt/lists/*
|
||||||
|
|
||||||
RUN pip3 install junit2html
|
RUN pip3 install websockets junit2html
|
||||||
|
|
|
@ -43,6 +43,6 @@ RUN update-alternatives --install /usr/bin/c++ c++ /usr/bin/clang++-11 100
|
||||||
# junit2html >= 31.0.0 requires jinj2 >= 3.0 which requires python >= 3.7 which is
|
# junit2html >= 31.0.0 requires jinj2 >= 3.0 which requires python >= 3.7 which is
|
||||||
# a higher version of python3 than debian 9 provides. Fix the version of junit2html
|
# a higher version of python3 than debian 9 provides. Fix the version of junit2html
|
||||||
# to the last version before they required the newer jinja2.
|
# to the last version before they required the newer jinja2.
|
||||||
RUN pip3 install junit2html==30.0.6
|
RUN pip3 install websockets junit2html==30.0.6
|
||||||
|
|
||||||
ENV CXXFLAGS=-stdlib=libc++
|
ENV CXXFLAGS=-stdlib=libc++
|
||||||
|
|
|
@ -40,7 +40,7 @@ RUN apt-get update && apt-get -y install \
|
||||||
# junit2html >= 31.0.0 requires jinj2 >= 3.0 which requires python >= 3.7 which is
|
# junit2html >= 31.0.0 requires jinj2 >= 3.0 which requires python >= 3.7 which is
|
||||||
# a higher version of python3 than debian 9 provides. Fix the version of junit2html
|
# a higher version of python3 than debian 9 provides. Fix the version of junit2html
|
||||||
# to the last version before they required the newer jinja2.
|
# to the last version before they required the newer jinja2.
|
||||||
RUN pip3 install junit2html==30.0.6
|
RUN pip3 install websockets junit2html==30.0.6
|
||||||
|
|
||||||
ENV CC=/usr/bin/clang-11
|
ENV CC=/usr/bin/clang-11
|
||||||
ENV CXX=/usr/bin/clang++-11
|
ENV CXX=/usr/bin/clang++-11
|
||||||
|
|
|
@ -25,4 +25,4 @@ RUN dnf -y install \
|
||||||
zlib-devel \
|
zlib-devel \
|
||||||
&& dnf clean all && rm -rf /var/cache/dnf
|
&& dnf clean all && rm -rf /var/cache/dnf
|
||||||
|
|
||||||
RUN pip3 install junit2html
|
RUN pip3 install websockets junit2html
|
||||||
|
|
|
@ -25,4 +25,4 @@ RUN dnf -y install \
|
||||||
zlib-devel \
|
zlib-devel \
|
||||||
&& dnf clean all && rm -rf /var/cache/dnf
|
&& dnf clean all && rm -rf /var/cache/dnf
|
||||||
|
|
||||||
RUN pip3 install junit2html
|
RUN pip3 install websockets junit2html
|
||||||
|
|
|
@ -11,7 +11,7 @@ pkg upgrade -y curl
|
||||||
pyver=$(python3 -c 'import sys; print(f"py{sys.version_info[0]}{sys.version_info[1]}")')
|
pyver=$(python3 -c 'import sys; print(f"py{sys.version_info[0]}{sys.version_info[1]}")')
|
||||||
pkg install -y $pyver-sqlite3 $pyver-pip
|
pkg install -y $pyver-sqlite3 $pyver-pip
|
||||||
|
|
||||||
python -m pip install junit2html
|
python -m pip install websockets junit2html
|
||||||
|
|
||||||
# Spicy detects whether it is run from build directory via `/proc`.
|
# Spicy detects whether it is run from build directory via `/proc`.
|
||||||
echo "proc /proc procfs rw,noauto 0 0" >>/etc/fstab
|
echo "proc /proc procfs rw,noauto 0 0" >>/etc/fstab
|
||||||
|
|
|
@ -8,3 +8,4 @@ set -x
|
||||||
brew update
|
brew update
|
||||||
brew upgrade cmake openssl@1.1
|
brew upgrade cmake openssl@1.1
|
||||||
brew install swig bison flex ccache
|
brew install swig bison flex ccache
|
||||||
|
python3 -m pip install --user websockets
|
||||||
|
|
|
@ -37,5 +37,5 @@ RUN apt-get update && apt-get -y install \
|
||||||
# Note - the symlink is important, otherwise cmake uses the wrong .so files.
|
# Note - the symlink is important, otherwise cmake uses the wrong .so files.
|
||||||
RUN wget https://www.openssl.org/source/openssl-3.0.0.tar.gz && tar xvf ./openssl-3.0.0.tar.gz && cd ./openssl-3.0.0 && ./Configure --prefix=/opt/openssl && make install && cd .. && rm -rf openssl-3.0.0 && ln -sf /opt/openssl/lib64 /opt/openssl/lib
|
RUN wget https://www.openssl.org/source/openssl-3.0.0.tar.gz && tar xvf ./openssl-3.0.0.tar.gz && cd ./openssl-3.0.0 && ./Configure --prefix=/opt/openssl && make install && cd .. && rm -rf openssl-3.0.0 && ln -sf /opt/openssl/lib64 /opt/openssl/lib
|
||||||
|
|
||||||
RUN pip3 install junit2html
|
RUN pip3 install websockets junit2html
|
||||||
RUN gem install coveralls-lcov
|
RUN gem install coveralls-lcov
|
||||||
|
|
|
@ -28,7 +28,7 @@ RUN zypper addrepo https://download.opensuse.org/repositories/openSUSE:Leap:15.2
|
||||||
tar \
|
tar \
|
||||||
&& rm -rf /var/cache/zypp
|
&& rm -rf /var/cache/zypp
|
||||||
|
|
||||||
RUN pip3 install junit2html
|
RUN pip3 install websockets junit2html
|
||||||
|
|
||||||
ENV CXX g++-9
|
ENV CXX g++-9
|
||||||
ENV CC gcc-9
|
ENV CC gcc-9
|
||||||
|
|
|
@ -42,5 +42,5 @@ RUN apt-get update && apt-get -y install \
|
||||||
ENV CC=/usr/bin/clang-10
|
ENV CC=/usr/bin/clang-10
|
||||||
ENV CXX=/usr/bin/clang++-10
|
ENV CXX=/usr/bin/clang++-10
|
||||||
|
|
||||||
RUN pip3 install junit2html
|
RUN pip3 install websockets junit2html
|
||||||
RUN gem install coveralls-lcov
|
RUN gem install coveralls-lcov
|
||||||
|
|
|
@ -34,5 +34,5 @@ RUN apt-get update && apt-get -y install \
|
||||||
lcov \
|
lcov \
|
||||||
&& rm -rf /var/lib/apt/lists/*
|
&& rm -rf /var/lib/apt/lists/*
|
||||||
|
|
||||||
RUN pip3 install junit2html
|
RUN pip3 install websockets junit2html
|
||||||
RUN gem install coveralls-lcov
|
RUN gem install coveralls-lcov
|
||||||
|
|
|
@ -34,5 +34,5 @@ RUN apt-get update && apt-get -y install \
|
||||||
lcov \
|
lcov \
|
||||||
&& rm -rf /var/lib/apt/lists/*
|
&& rm -rf /var/lib/apt/lists/*
|
||||||
|
|
||||||
RUN pip3 install junit2html
|
RUN pip3 install websockets junit2html
|
||||||
RUN gem install coveralls-lcov
|
RUN gem install coveralls-lcov
|
||||||
|
|
|
@ -3,10 +3,18 @@
|
||||||
module Broker;
|
module Broker;
|
||||||
|
|
||||||
export {
|
export {
|
||||||
## Default port for Broker communication. Where not specified
|
## Default port for native Broker communication. Where not specified
|
||||||
## otherwise, this is the port to connect to and listen on.
|
## otherwise, this is the port to connect to and listen on.
|
||||||
const default_port = 9999/tcp &redef;
|
const default_port = 9999/tcp &redef;
|
||||||
|
|
||||||
|
## Default port for Broker WebSocket communication. Where not specified
|
||||||
|
## otherwise, this is the port to connect to and listen on for
|
||||||
|
## WebSocket connections.
|
||||||
|
##
|
||||||
|
## See the Broker documentation for a specification of the message
|
||||||
|
## format over WebSocket connections.
|
||||||
|
const default_port_websocket = 9997/tcp &redef;
|
||||||
|
|
||||||
## Default interval to retry listening on a port if it's currently in
|
## Default interval to retry listening on a port if it's currently in
|
||||||
## use already. Use of the ZEEK_DEFAULT_LISTEN_RETRY environment variable
|
## use already. Use of the ZEEK_DEFAULT_LISTEN_RETRY environment variable
|
||||||
## (set as a number of seconds) will override this option and also
|
## (set as a number of seconds) will override this option and also
|
||||||
|
@ -18,6 +26,11 @@ export {
|
||||||
## .. zeek:see:: Broker::listen
|
## .. zeek:see:: Broker::listen
|
||||||
const default_listen_address = getenv("ZEEK_DEFAULT_LISTEN_ADDRESS") &redef;
|
const default_listen_address = getenv("ZEEK_DEFAULT_LISTEN_ADDRESS") &redef;
|
||||||
|
|
||||||
|
## Default address on which to listen for WebSocket connections.
|
||||||
|
##
|
||||||
|
## .. zeek:see:: Broker::listen_websocket
|
||||||
|
const default_listen_address_websocket = getenv("ZEEK_DEFAULT_LISTEN_ADDRESS") &redef;
|
||||||
|
|
||||||
## Default interval to retry connecting to a peer if it cannot be made to
|
## Default interval to retry connecting to a peer if it cannot be made to
|
||||||
## work initially, or if it ever becomes disconnected. Use of the
|
## work initially, or if it ever becomes disconnected. Use of the
|
||||||
## ZEEK_DEFAULT_CONNECT_RETRY environment variable (set as number of
|
## ZEEK_DEFAULT_CONNECT_RETRY environment variable (set as number of
|
||||||
|
@ -267,7 +280,7 @@ export {
|
||||||
val: Broker::Data;
|
val: Broker::Data;
|
||||||
};
|
};
|
||||||
|
|
||||||
## Listen for remote connections.
|
## Listen for remote connections using the native Broker protocol.
|
||||||
##
|
##
|
||||||
## a: an address string on which to accept connections, e.g.
|
## a: an address string on which to accept connections, e.g.
|
||||||
## "127.0.0.1". An empty string refers to INADDR_ANY.
|
## "127.0.0.1". An empty string refers to INADDR_ANY.
|
||||||
|
@ -287,6 +300,26 @@ export {
|
||||||
p: port &default = default_port,
|
p: port &default = default_port,
|
||||||
retry: interval &default = default_listen_retry): port;
|
retry: interval &default = default_listen_retry): port;
|
||||||
|
|
||||||
|
## Listen for remote connections using WebSocket.
|
||||||
|
##
|
||||||
|
## a: an address string on which to accept connections, e.g.
|
||||||
|
## "127.0.0.1". An empty string refers to INADDR_ANY.
|
||||||
|
##
|
||||||
|
## p: the TCP port to listen on. The value 0 means that the OS should choose
|
||||||
|
## the next available free port.
|
||||||
|
##
|
||||||
|
## retry: If non-zero, retries listening in regular intervals if the port cannot be
|
||||||
|
## acquired immediately. 0 disables retries. If the
|
||||||
|
## ZEEK_DEFAULT_LISTEN_RETRY environment variable is set (as number
|
||||||
|
## of seconds), it overrides any value given here.
|
||||||
|
##
|
||||||
|
## Returns: the bound port or 0/? on failure.
|
||||||
|
##
|
||||||
|
## .. zeek:see:: Broker::status
|
||||||
|
global listen_websocket: function(a: string &default = default_listen_address_websocket,
|
||||||
|
p: port &default = default_port_websocket,
|
||||||
|
retry: interval &default = default_listen_retry): port;
|
||||||
|
|
||||||
## Initiate a remote connection.
|
## Initiate a remote connection.
|
||||||
##
|
##
|
||||||
## a: an address to connect to, e.g. "localhost" or "127.0.0.1".
|
## a: an address to connect to, e.g. "localhost" or "127.0.0.1".
|
||||||
|
@ -473,7 +506,7 @@ event retry_listen(a: string, p: port, retry: interval)
|
||||||
|
|
||||||
function listen(a: string, p: port, retry: interval): port
|
function listen(a: string, p: port, retry: interval): port
|
||||||
{
|
{
|
||||||
local bound = __listen(a, p);
|
local bound = __listen(a, p, Broker::NATIVE);
|
||||||
|
|
||||||
if ( bound == 0/tcp )
|
if ( bound == 0/tcp )
|
||||||
{
|
{
|
||||||
|
@ -489,6 +522,29 @@ function listen(a: string, p: port, retry: interval): port
|
||||||
return bound;
|
return bound;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
event retry_listen_websocket(a: string, p: port, retry: interval)
|
||||||
|
{
|
||||||
|
listen_websocket(a, p, retry);
|
||||||
|
}
|
||||||
|
|
||||||
|
function listen_websocket(a: string, p: port, retry: interval): port
|
||||||
|
{
|
||||||
|
local bound = __listen(a, p, Broker::WEBSOCKET);
|
||||||
|
|
||||||
|
if ( bound == 0/tcp )
|
||||||
|
{
|
||||||
|
local e = getenv("ZEEK_DEFAULT_LISTEN_RETRY");
|
||||||
|
|
||||||
|
if ( e != "" )
|
||||||
|
retry = double_to_interval(to_double(e));
|
||||||
|
|
||||||
|
if ( retry != 0secs )
|
||||||
|
schedule retry { retry_listen_websocket(a, p, retry) };
|
||||||
|
}
|
||||||
|
|
||||||
|
return bound;
|
||||||
|
}
|
||||||
|
|
||||||
function peer(a: string, p: port, retry: interval): bool
|
function peer(a: string, p: port, retry: interval): bool
|
||||||
{
|
{
|
||||||
return __peer(a, p, retry);
|
return __peer(a, p, retry);
|
||||||
|
|
|
@ -527,12 +527,21 @@ void Manager::ClearStores()
|
||||||
handle->store.clear();
|
handle->store.clear();
|
||||||
}
|
}
|
||||||
|
|
||||||
uint16_t Manager::Listen(const string& addr, uint16_t port)
|
uint16_t Manager::Listen(const string& addr, uint16_t port, BrokerProtocol type)
|
||||||
{
|
{
|
||||||
if ( bstate->endpoint.is_shutdown() )
|
if ( bstate->endpoint.is_shutdown() )
|
||||||
return 0;
|
return 0;
|
||||||
|
|
||||||
bound_port = bstate->endpoint.listen(addr, port);
|
switch ( type )
|
||||||
|
{
|
||||||
|
case BrokerProtocol::Native:
|
||||||
|
bound_port = bstate->endpoint.listen(addr, port);
|
||||||
|
break;
|
||||||
|
|
||||||
|
case BrokerProtocol::WebSocket:
|
||||||
|
bound_port = bstate->endpoint.web_socket_listen(addr, port);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
if ( bound_port == 0 )
|
if ( bound_port == 0 )
|
||||||
Error("Failed to listen on %s:%" PRIu16, addr.empty() ? "INADDR_ANY" : addr.c_str(), port);
|
Error("Failed to listen on %s:%" PRIu16, addr.empty() ? "INADDR_ANY" : addr.c_str(), port);
|
||||||
|
|
|
@ -79,6 +79,13 @@ struct Stats
|
||||||
class Manager : public iosource::IOSource
|
class Manager : public iosource::IOSource
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
|
/** Broker protocol to expect on a listening port. */
|
||||||
|
enum class BrokerProtocol
|
||||||
|
{
|
||||||
|
Native, /**< Broker's native binary protocol */
|
||||||
|
WebSocket /** Broker's WebSocket protocol for external clients. */
|
||||||
|
};
|
||||||
|
|
||||||
static const broker::endpoint_info NoPeer;
|
static const broker::endpoint_info NoPeer;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -118,11 +125,13 @@ public:
|
||||||
* @param port the TCP port to listen on.
|
* @param port the TCP port to listen on.
|
||||||
* @param addr an address string on which to accept connections, e.g.
|
* @param addr an address string on which to accept connections, e.g.
|
||||||
* "127.0.0.1". The empty string refers to @p INADDR_ANY.
|
* "127.0.0.1". The empty string refers to @p INADDR_ANY.
|
||||||
|
* @param protocol protocol to speak on accepted connections
|
||||||
* @return 0 on failure or the bound port otherwise. If *port* != 0, then the
|
* @return 0 on failure or the bound port otherwise. If *port* != 0, then the
|
||||||
* return value equals *port* on success. If *port* equals 0, then the
|
* return value equals *port* on success. If *port* equals 0, then the
|
||||||
* return values represents the bound port as chosen by the OS.
|
* return values represents the bound port as chosen by the OS.
|
||||||
*/
|
*/
|
||||||
uint16_t Listen(const std::string& addr, uint16_t port);
|
uint16_t Listen(const std::string& addr, uint16_t port,
|
||||||
|
BrokerProtocol protocol = BrokerProtocol::Native);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Initiate a peering with a remote endpoint.
|
* Initiate a peering with a remote endpoint.
|
||||||
|
|
|
@ -63,7 +63,12 @@ enum PeerStatus %{
|
||||||
RECONNECTING,
|
RECONNECTING,
|
||||||
%}
|
%}
|
||||||
|
|
||||||
function Broker::__listen%(a: string, p: port%): port
|
enum BrokerProtocol %{
|
||||||
|
NATIVE,
|
||||||
|
WEBSOCKET,
|
||||||
|
%}
|
||||||
|
|
||||||
|
function Broker::__listen%(a: string, p: port, proto: BrokerProtocol%): port
|
||||||
%{
|
%{
|
||||||
zeek::Broker::Manager::ScriptScopeGuard ssg;
|
zeek::Broker::Manager::ScriptScopeGuard ssg;
|
||||||
|
|
||||||
|
@ -73,7 +78,15 @@ function Broker::__listen%(a: string, p: port%): port
|
||||||
return zeek::val_mgr->Port(0, TRANSPORT_UNKNOWN);
|
return zeek::val_mgr->Port(0, TRANSPORT_UNKNOWN);
|
||||||
}
|
}
|
||||||
|
|
||||||
auto rval = broker_mgr->Listen(a->Len() ? a->CheckString() : "", p->Port());
|
zeek::Broker::Manager::BrokerProtocol proto_;
|
||||||
|
switch ( proto->AsEnum() )
|
||||||
|
{
|
||||||
|
case BifEnum::Broker::NATIVE: proto_ = zeek::Broker::Manager::BrokerProtocol::Native; break;
|
||||||
|
case BifEnum::Broker::WEBSOCKET: proto_ = zeek::Broker::Manager::BrokerProtocol::WebSocket; break;
|
||||||
|
default: reporter->InternalError("unknown Broker protocol");
|
||||||
|
}
|
||||||
|
|
||||||
|
auto rval = broker_mgr->Listen(a->Len() ? a->CheckString() : "", p->Port(), proto_);
|
||||||
return zeek::val_mgr->Port(rval, TRANSPORT_TCP);
|
return zeek::val_mgr->Port(rval, TRANSPORT_TCP);
|
||||||
%}
|
%}
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,4 @@
|
||||||
|
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
|
||||||
|
ping ['my-message', 1]
|
||||||
|
ping ['my-message', 2]
|
||||||
|
ping ['my-message', 3]
|
|
@ -0,0 +1,6 @@
|
||||||
|
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
|
||||||
|
sender added peer: endpoint=127.0.0.1 msg=handshake successful
|
||||||
|
sender got pong: my-message, 1
|
||||||
|
sender got pong: my-message, 2
|
||||||
|
sender got pong: my-message, 3
|
||||||
|
sender lost peer: endpoint=127.0.0.1 msg=lost connection to client
|
124
testing/btest/broker/web-socket-events.zeek
Normal file
124
testing/btest/broker/web-socket-events.zeek
Normal file
|
@ -0,0 +1,124 @@
|
||||||
|
# @TEST-GROUP: broker
|
||||||
|
#
|
||||||
|
# @TEST-PORT: BROKER_PORT
|
||||||
|
#
|
||||||
|
# @TEST-EXEC: btest-bg-run server "zeek -b %INPUT >output"
|
||||||
|
# @TEST-EXEC: btest-bg-run client "python3 ../client.py >output"
|
||||||
|
#
|
||||||
|
# @TEST-EXEC: btest-bg-wait 45
|
||||||
|
# @TEST-EXEC: btest-diff client/output
|
||||||
|
# @TEST-EXEC: btest-diff server/output
|
||||||
|
|
||||||
|
redef exit_only_after_terminate = T;
|
||||||
|
redef Broker::disable_ssl = T;
|
||||||
|
|
||||||
|
global event_count = 0;
|
||||||
|
|
||||||
|
global ping: event(msg: string, c: count);
|
||||||
|
|
||||||
|
event zeek_init()
|
||||||
|
{
|
||||||
|
Broker::subscribe("/zeek/event/my_topic");
|
||||||
|
Broker::listen_websocket("127.0.0.1", to_port(getenv("BROKER_PORT")));
|
||||||
|
}
|
||||||
|
|
||||||
|
function send_event()
|
||||||
|
{
|
||||||
|
++event_count;
|
||||||
|
local e = Broker::make_event(ping, "my-message", event_count);
|
||||||
|
Broker::publish("/zeek/event/my_topic", e);
|
||||||
|
}
|
||||||
|
|
||||||
|
event Broker::peer_added(endpoint: Broker::EndpointInfo, msg: string)
|
||||||
|
{
|
||||||
|
print fmt("sender added peer: endpoint=%s msg=%s", endpoint$network$address, msg);
|
||||||
|
send_event();
|
||||||
|
}
|
||||||
|
|
||||||
|
event Broker::peer_lost(endpoint: Broker::EndpointInfo, msg: string)
|
||||||
|
{
|
||||||
|
print fmt("sender lost peer: endpoint=%s msg=%s", endpoint$network$address, msg);
|
||||||
|
terminate();
|
||||||
|
}
|
||||||
|
|
||||||
|
event pong(msg: string, n: count)
|
||||||
|
{
|
||||||
|
print fmt("sender got pong: %s, %s", msg, n);
|
||||||
|
send_event();
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@TEST-START-FILE client.py
|
||||||
|
import asyncio, websockets, os, time, json, sys
|
||||||
|
|
||||||
|
ws_port = os.environ['BROKER_PORT'].split('/')[0]
|
||||||
|
ws_url = 'ws://localhost:%s/v1/messages/json' % ws_port
|
||||||
|
topic = '"/zeek/event/my_topic"'
|
||||||
|
|
||||||
|
def broker_value(type, val):
|
||||||
|
return {
|
||||||
|
'@data-type': type,
|
||||||
|
'data': val
|
||||||
|
}
|
||||||
|
|
||||||
|
async def do_run():
|
||||||
|
# Try up to 30 times.
|
||||||
|
connected = False
|
||||||
|
for i in range(30):
|
||||||
|
try:
|
||||||
|
ws = await websockets.connect(ws_url)
|
||||||
|
connected = True
|
||||||
|
|
||||||
|
# send filter and wait for ack
|
||||||
|
await ws.send('[%s]' % topic)
|
||||||
|
ack_json = await ws.recv()
|
||||||
|
ack = json.loads(ack_json)
|
||||||
|
if not 'type' in ack or ack['type'] != 'ack':
|
||||||
|
print('*** unexpected ACK from server:')
|
||||||
|
print(ack_json)
|
||||||
|
sys.exit()
|
||||||
|
except Exception as e:
|
||||||
|
if not connected:
|
||||||
|
print('failed to connect to %s, try again (%s)' % (ws_url, e), file=sys.stderr)
|
||||||
|
time.sleep(1)
|
||||||
|
continue
|
||||||
|
else:
|
||||||
|
print('exception: %s' % e, file=sys.stderr)
|
||||||
|
sys.exit()
|
||||||
|
|
||||||
|
for round in range(3):
|
||||||
|
# wait for ping
|
||||||
|
msg = await ws.recv()
|
||||||
|
msg = json.loads(msg)
|
||||||
|
if not 'type' in msg or msg['type'] != 'data-message':
|
||||||
|
continue
|
||||||
|
|
||||||
|
ping = msg['data'][2]['data']
|
||||||
|
name = ping[0]['data']
|
||||||
|
args = [x['data'] for x in ping[1]['data']]
|
||||||
|
print(name, args)
|
||||||
|
|
||||||
|
# send pong
|
||||||
|
pong = [broker_value('string', 'pong'),
|
||||||
|
broker_value('vector', [
|
||||||
|
broker_value('string', args[0]),
|
||||||
|
broker_value('count', args[1])
|
||||||
|
])]
|
||||||
|
|
||||||
|
ev = [broker_value('count', 1), broker_value('count', 1), broker_value('vector', pong)]
|
||||||
|
msg = {
|
||||||
|
'type': 'data-message',
|
||||||
|
'topic': '/zeek/event/my_topic',
|
||||||
|
'@data-type': 'vector', 'data': ev
|
||||||
|
}
|
||||||
|
|
||||||
|
msg = json.dumps(msg)
|
||||||
|
await ws.send(msg)
|
||||||
|
|
||||||
|
await ws.close()
|
||||||
|
sys.exit()
|
||||||
|
|
||||||
|
loop = asyncio.get_event_loop()
|
||||||
|
loop.run_until_complete(do_run())
|
||||||
|
|
||||||
|
@TEST-END-FILE
|
Loading…
Add table
Add a link
Reference in a new issue