Add WebSocket support for exchanging events with external clients.

This exposes Broker's new WebSocket support in Zeek. To enable it,
call `Broker::listen_websocket()`. Zeek will then start listening on
port 9997 for incoming WebSocket connections.

See the Broker documentation for a description of the message format
expected over these WebSocket connections.
This commit is contained in:
Robin Sommer 2022-05-19 07:45:02 +02:00
parent 4b0e1063ed
commit d99f041ac5
No known key found for this signature in database
GPG key ID: 6BEDA4DA6B8B23E3
25 changed files with 248 additions and 26 deletions

@ -1 +1 @@
Subproject commit 5515c3b033b191a957b3ac45dbe7d0625936143a Subproject commit 0353de8e4e26d90e609c1a09df3462ec68af2553

View file

@ -26,4 +26,4 @@ RUN apk add --no-cache \
openssh-client \ openssh-client \
py3-pip py3-pip
RUN pip3 install junit2html RUN pip3 install websockets junit2html

View file

@ -15,7 +15,7 @@ RUN sed -i 's/enabled=1/enabled=0/' /etc/yum/pluginconf.d/fastestmirror.conf
RUN yum -y install \ RUN yum -y install \
https://repo.ius.io/ius-release-el7.rpm \ https://repo.ius.io/ius-release-el7.rpm \
https://dl.fedoraproject.org/pub/epel/epel-release-latest-7.noarch.rpm \ https://dl.fedoraproject.org/pub/epel/epel-release-latest-7.noarch.rpm \
&& yum -y install git224 ccache \ && yum -y install git236 ccache \
&& yum clean all && rm -rf /var/cache/yum && yum clean all && rm -rf /var/cache/yum
RUN yum -y install \ RUN yum -y install \
@ -55,7 +55,7 @@ RUN curl -sSL "https://github.com/westes/flex/releases/download/v${FLEX_VERSION}
&& make -j`nproc` install) \ && make -j`nproc` install) \
&& rm -rf /tmp/flex-${FLEX_VERSION} && rm -rf /tmp/flex-${FLEX_VERSION}
RUN pip3 install junit2html RUN pip3 install websockets junit2html
RUN echo 'unset BASH_ENV PROMPT_COMMAND ENV' > /usr/bin/zeek-ci-env && \ RUN echo 'unset BASH_ENV PROMPT_COMMAND ENV' > /usr/bin/zeek-ci-env && \
echo 'source /opt/rh/devtoolset-8/enable' >> /usr/bin/zeek-ci-env && \ echo 'source /opt/rh/devtoolset-8/enable' >> /usr/bin/zeek-ci-env && \

View file

@ -26,4 +26,4 @@ RUN dnf -y install \
zlib-devel \ zlib-devel \
&& dnf clean all && rm -rf /var/cache/dnf && dnf clean all && rm -rf /var/cache/dnf
RUN pip3 install junit2html RUN pip3 install websockets junit2html

View file

@ -42,4 +42,4 @@ RUN dnf -y --nobest install \
# Set the crypto policy to allow SHA-1 certificates - which we have in our tests # Set the crypto policy to allow SHA-1 certificates - which we have in our tests
RUN dnf -y --nobest install crypto-policies-scripts && update-crypto-policies --set LEGACY RUN dnf -y --nobest install crypto-policies-scripts && update-crypto-policies --set LEGACY
RUN pip3 install junit2html RUN pip3 install websockets junit2html

View file

@ -35,6 +35,6 @@ RUN apt-get update && apt-get -y install \
&& mkdir -p "${CMAKE_DIR}" \ && mkdir -p "${CMAKE_DIR}" \
&& curl -sSL "https://github.com/Kitware/CMake/releases/download/v${CMAKE_VERSION}/cmake-${CMAKE_VERSION}-Linux-x86_64.tar.gz" | tar xzf - -C "${CMAKE_DIR}" --strip-components 1 \ && curl -sSL "https://github.com/Kitware/CMake/releases/download/v${CMAKE_VERSION}/cmake-${CMAKE_VERSION}-Linux-x86_64.tar.gz" | tar xzf - -C "${CMAKE_DIR}" --strip-components 1 \
&& pip3 install junit2html && pip3 install websockets junit2html
ENV PATH "${CMAKE_DIR}/bin:${PATH}" ENV PATH "${CMAKE_DIR}/bin:${PATH}"

View file

@ -30,4 +30,4 @@ RUN apt-get update && apt-get -y install \
xz-utils \ xz-utils \
&& rm -rf /var/lib/apt/lists/* && rm -rf /var/lib/apt/lists/*
RUN pip3 install junit2html RUN pip3 install websockets junit2html

View file

@ -43,6 +43,6 @@ RUN update-alternatives --install /usr/bin/c++ c++ /usr/bin/clang++-11 100
# junit2html >= 31.0.0 requires jinj2 >= 3.0 which requires python >= 3.7 which is # junit2html >= 31.0.0 requires jinj2 >= 3.0 which requires python >= 3.7 which is
# a higher version of python3 than debian 9 provides. Fix the version of junit2html # a higher version of python3 than debian 9 provides. Fix the version of junit2html
# to the last version before they required the newer jinja2. # to the last version before they required the newer jinja2.
RUN pip3 install junit2html==30.0.6 RUN pip3 install websockets junit2html==30.0.6
ENV CXXFLAGS=-stdlib=libc++ ENV CXXFLAGS=-stdlib=libc++

View file

@ -40,7 +40,7 @@ RUN apt-get update && apt-get -y install \
# junit2html >= 31.0.0 requires jinj2 >= 3.0 which requires python >= 3.7 which is # junit2html >= 31.0.0 requires jinj2 >= 3.0 which requires python >= 3.7 which is
# a higher version of python3 than debian 9 provides. Fix the version of junit2html # a higher version of python3 than debian 9 provides. Fix the version of junit2html
# to the last version before they required the newer jinja2. # to the last version before they required the newer jinja2.
RUN pip3 install junit2html==30.0.6 RUN pip3 install websockets junit2html==30.0.6
ENV CC=/usr/bin/clang-11 ENV CC=/usr/bin/clang-11
ENV CXX=/usr/bin/clang++-11 ENV CXX=/usr/bin/clang++-11

View file

@ -25,4 +25,4 @@ RUN dnf -y install \
zlib-devel \ zlib-devel \
&& dnf clean all && rm -rf /var/cache/dnf && dnf clean all && rm -rf /var/cache/dnf
RUN pip3 install junit2html RUN pip3 install websockets junit2html

View file

@ -25,4 +25,4 @@ RUN dnf -y install \
zlib-devel \ zlib-devel \
&& dnf clean all && rm -rf /var/cache/dnf && dnf clean all && rm -rf /var/cache/dnf
RUN pip3 install junit2html RUN pip3 install websockets junit2html

View file

@ -11,7 +11,7 @@ pkg upgrade -y curl
pyver=$(python3 -c 'import sys; print(f"py{sys.version_info[0]}{sys.version_info[1]}")') pyver=$(python3 -c 'import sys; print(f"py{sys.version_info[0]}{sys.version_info[1]}")')
pkg install -y $pyver-sqlite3 $pyver-pip pkg install -y $pyver-sqlite3 $pyver-pip
python -m pip install junit2html python -m pip install websockets junit2html
# Spicy detects whether it is run from build directory via `/proc`. # Spicy detects whether it is run from build directory via `/proc`.
echo "proc /proc procfs rw,noauto 0 0" >>/etc/fstab echo "proc /proc procfs rw,noauto 0 0" >>/etc/fstab

View file

@ -8,3 +8,4 @@ set -x
brew update brew update
brew upgrade cmake openssl@1.1 brew upgrade cmake openssl@1.1
brew install swig bison flex ccache brew install swig bison flex ccache
python3 -m pip install --user websockets

View file

@ -37,5 +37,5 @@ RUN apt-get update && apt-get -y install \
# Note - the symlink is important, otherwise cmake uses the wrong .so files. # Note - the symlink is important, otherwise cmake uses the wrong .so files.
RUN wget https://www.openssl.org/source/openssl-3.0.0.tar.gz && tar xvf ./openssl-3.0.0.tar.gz && cd ./openssl-3.0.0 && ./Configure --prefix=/opt/openssl && make install && cd .. && rm -rf openssl-3.0.0 && ln -sf /opt/openssl/lib64 /opt/openssl/lib RUN wget https://www.openssl.org/source/openssl-3.0.0.tar.gz && tar xvf ./openssl-3.0.0.tar.gz && cd ./openssl-3.0.0 && ./Configure --prefix=/opt/openssl && make install && cd .. && rm -rf openssl-3.0.0 && ln -sf /opt/openssl/lib64 /opt/openssl/lib
RUN pip3 install junit2html RUN pip3 install websockets junit2html
RUN gem install coveralls-lcov RUN gem install coveralls-lcov

View file

@ -28,7 +28,7 @@ RUN zypper addrepo https://download.opensuse.org/repositories/openSUSE:Leap:15.2
tar \ tar \
&& rm -rf /var/cache/zypp && rm -rf /var/cache/zypp
RUN pip3 install junit2html RUN pip3 install websockets junit2html
ENV CXX g++-9 ENV CXX g++-9
ENV CC gcc-9 ENV CC gcc-9

View file

@ -42,5 +42,5 @@ RUN apt-get update && apt-get -y install \
ENV CC=/usr/bin/clang-10 ENV CC=/usr/bin/clang-10
ENV CXX=/usr/bin/clang++-10 ENV CXX=/usr/bin/clang++-10
RUN pip3 install junit2html RUN pip3 install websockets junit2html
RUN gem install coveralls-lcov RUN gem install coveralls-lcov

View file

@ -34,5 +34,5 @@ RUN apt-get update && apt-get -y install \
lcov \ lcov \
&& rm -rf /var/lib/apt/lists/* && rm -rf /var/lib/apt/lists/*
RUN pip3 install junit2html RUN pip3 install websockets junit2html
RUN gem install coveralls-lcov RUN gem install coveralls-lcov

View file

@ -34,5 +34,5 @@ RUN apt-get update && apt-get -y install \
lcov \ lcov \
&& rm -rf /var/lib/apt/lists/* && rm -rf /var/lib/apt/lists/*
RUN pip3 install junit2html RUN pip3 install websockets junit2html
RUN gem install coveralls-lcov RUN gem install coveralls-lcov

View file

@ -3,10 +3,18 @@
module Broker; module Broker;
export { export {
## Default port for Broker communication. Where not specified ## Default port for native Broker communication. Where not specified
## otherwise, this is the port to connect to and listen on. ## otherwise, this is the port to connect to and listen on.
const default_port = 9999/tcp &redef; const default_port = 9999/tcp &redef;
## Default port for Broker WebSocket communication. Where not specified
## otherwise, this is the port to connect to and listen on for
## WebSocket connections.
##
## See the Broker documentation for a specification of the message
## format over WebSocket connections.
const default_port_websocket = 9997/tcp &redef;
## Default interval to retry listening on a port if it's currently in ## Default interval to retry listening on a port if it's currently in
## use already. Use of the ZEEK_DEFAULT_LISTEN_RETRY environment variable ## use already. Use of the ZEEK_DEFAULT_LISTEN_RETRY environment variable
## (set as a number of seconds) will override this option and also ## (set as a number of seconds) will override this option and also
@ -18,6 +26,11 @@ export {
## .. zeek:see:: Broker::listen ## .. zeek:see:: Broker::listen
const default_listen_address = getenv("ZEEK_DEFAULT_LISTEN_ADDRESS") &redef; const default_listen_address = getenv("ZEEK_DEFAULT_LISTEN_ADDRESS") &redef;
## Default address on which to listen for WebSocket connections.
##
## .. zeek:see:: Broker::listen_websocket
const default_listen_address_websocket = getenv("ZEEK_DEFAULT_LISTEN_ADDRESS") &redef;
## Default interval to retry connecting to a peer if it cannot be made to ## Default interval to retry connecting to a peer if it cannot be made to
## work initially, or if it ever becomes disconnected. Use of the ## work initially, or if it ever becomes disconnected. Use of the
## ZEEK_DEFAULT_CONNECT_RETRY environment variable (set as number of ## ZEEK_DEFAULT_CONNECT_RETRY environment variable (set as number of
@ -267,7 +280,7 @@ export {
val: Broker::Data; val: Broker::Data;
}; };
## Listen for remote connections. ## Listen for remote connections using the native Broker protocol.
## ##
## a: an address string on which to accept connections, e.g. ## a: an address string on which to accept connections, e.g.
## "127.0.0.1". An empty string refers to INADDR_ANY. ## "127.0.0.1". An empty string refers to INADDR_ANY.
@ -287,6 +300,26 @@ export {
p: port &default = default_port, p: port &default = default_port,
retry: interval &default = default_listen_retry): port; retry: interval &default = default_listen_retry): port;
## Listen for remote connections using WebSocket.
##
## a: an address string on which to accept connections, e.g.
## "127.0.0.1". An empty string refers to INADDR_ANY.
##
## p: the TCP port to listen on. The value 0 means that the OS should choose
## the next available free port.
##
## retry: If non-zero, retries listening in regular intervals if the port cannot be
## acquired immediately. 0 disables retries. If the
## ZEEK_DEFAULT_LISTEN_RETRY environment variable is set (as number
## of seconds), it overrides any value given here.
##
## Returns: the bound port or 0/? on failure.
##
## .. zeek:see:: Broker::status
global listen_websocket: function(a: string &default = default_listen_address_websocket,
p: port &default = default_port_websocket,
retry: interval &default = default_listen_retry): port;
## Initiate a remote connection. ## Initiate a remote connection.
## ##
## a: an address to connect to, e.g. "localhost" or "127.0.0.1". ## a: an address to connect to, e.g. "localhost" or "127.0.0.1".
@ -473,7 +506,7 @@ event retry_listen(a: string, p: port, retry: interval)
function listen(a: string, p: port, retry: interval): port function listen(a: string, p: port, retry: interval): port
{ {
local bound = __listen(a, p); local bound = __listen(a, p, Broker::NATIVE);
if ( bound == 0/tcp ) if ( bound == 0/tcp )
{ {
@ -489,6 +522,29 @@ function listen(a: string, p: port, retry: interval): port
return bound; return bound;
} }
event retry_listen_websocket(a: string, p: port, retry: interval)
{
listen_websocket(a, p, retry);
}
function listen_websocket(a: string, p: port, retry: interval): port
{
local bound = __listen(a, p, Broker::WEBSOCKET);
if ( bound == 0/tcp )
{
local e = getenv("ZEEK_DEFAULT_LISTEN_RETRY");
if ( e != "" )
retry = double_to_interval(to_double(e));
if ( retry != 0secs )
schedule retry { retry_listen_websocket(a, p, retry) };
}
return bound;
}
function peer(a: string, p: port, retry: interval): bool function peer(a: string, p: port, retry: interval): bool
{ {
return __peer(a, p, retry); return __peer(a, p, retry);

View file

@ -527,12 +527,21 @@ void Manager::ClearStores()
handle->store.clear(); handle->store.clear();
} }
uint16_t Manager::Listen(const string& addr, uint16_t port) uint16_t Manager::Listen(const string& addr, uint16_t port, BrokerProtocol type)
{ {
if ( bstate->endpoint.is_shutdown() ) if ( bstate->endpoint.is_shutdown() )
return 0; return 0;
switch ( type )
{
case BrokerProtocol::Native:
bound_port = bstate->endpoint.listen(addr, port); bound_port = bstate->endpoint.listen(addr, port);
break;
case BrokerProtocol::WebSocket:
bound_port = bstate->endpoint.web_socket_listen(addr, port);
break;
}
if ( bound_port == 0 ) if ( bound_port == 0 )
Error("Failed to listen on %s:%" PRIu16, addr.empty() ? "INADDR_ANY" : addr.c_str(), port); Error("Failed to listen on %s:%" PRIu16, addr.empty() ? "INADDR_ANY" : addr.c_str(), port);

View file

@ -79,6 +79,13 @@ struct Stats
class Manager : public iosource::IOSource class Manager : public iosource::IOSource
{ {
public: public:
/** Broker protocol to expect on a listening port. */
enum class BrokerProtocol
{
Native, /**< Broker's native binary protocol */
WebSocket /** Broker's WebSocket protocol for external clients. */
};
static const broker::endpoint_info NoPeer; static const broker::endpoint_info NoPeer;
/** /**
@ -118,11 +125,13 @@ public:
* @param port the TCP port to listen on. * @param port the TCP port to listen on.
* @param addr an address string on which to accept connections, e.g. * @param addr an address string on which to accept connections, e.g.
* "127.0.0.1". The empty string refers to @p INADDR_ANY. * "127.0.0.1". The empty string refers to @p INADDR_ANY.
* @param protocol protocol to speak on accepted connections
* @return 0 on failure or the bound port otherwise. If *port* != 0, then the * @return 0 on failure or the bound port otherwise. If *port* != 0, then the
* return value equals *port* on success. If *port* equals 0, then the * return value equals *port* on success. If *port* equals 0, then the
* return values represents the bound port as chosen by the OS. * return values represents the bound port as chosen by the OS.
*/ */
uint16_t Listen(const std::string& addr, uint16_t port); uint16_t Listen(const std::string& addr, uint16_t port,
BrokerProtocol protocol = BrokerProtocol::Native);
/** /**
* Initiate a peering with a remote endpoint. * Initiate a peering with a remote endpoint.

View file

@ -63,7 +63,12 @@ enum PeerStatus %{
RECONNECTING, RECONNECTING,
%} %}
function Broker::__listen%(a: string, p: port%): port enum BrokerProtocol %{
NATIVE,
WEBSOCKET,
%}
function Broker::__listen%(a: string, p: port, proto: BrokerProtocol%): port
%{ %{
zeek::Broker::Manager::ScriptScopeGuard ssg; zeek::Broker::Manager::ScriptScopeGuard ssg;
@ -73,7 +78,15 @@ function Broker::__listen%(a: string, p: port%): port
return zeek::val_mgr->Port(0, TRANSPORT_UNKNOWN); return zeek::val_mgr->Port(0, TRANSPORT_UNKNOWN);
} }
auto rval = broker_mgr->Listen(a->Len() ? a->CheckString() : "", p->Port()); zeek::Broker::Manager::BrokerProtocol proto_;
switch ( proto->AsEnum() )
{
case BifEnum::Broker::NATIVE: proto_ = zeek::Broker::Manager::BrokerProtocol::Native; break;
case BifEnum::Broker::WEBSOCKET: proto_ = zeek::Broker::Manager::BrokerProtocol::WebSocket; break;
default: reporter->InternalError("unknown Broker protocol");
}
auto rval = broker_mgr->Listen(a->Len() ? a->CheckString() : "", p->Port(), proto_);
return zeek::val_mgr->Port(rval, TRANSPORT_TCP); return zeek::val_mgr->Port(rval, TRANSPORT_TCP);
%} %}

View file

@ -0,0 +1,4 @@
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
ping ['my-message', 1]
ping ['my-message', 2]
ping ['my-message', 3]

View file

@ -0,0 +1,6 @@
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
sender added peer: endpoint=127.0.0.1 msg=handshake successful
sender got pong: my-message, 1
sender got pong: my-message, 2
sender got pong: my-message, 3
sender lost peer: endpoint=127.0.0.1 msg=lost connection to client

View file

@ -0,0 +1,124 @@
# @TEST-GROUP: broker
#
# @TEST-PORT: BROKER_PORT
#
# @TEST-EXEC: btest-bg-run server "zeek -b %INPUT >output"
# @TEST-EXEC: btest-bg-run client "python3 ../client.py >output"
#
# @TEST-EXEC: btest-bg-wait 45
# @TEST-EXEC: btest-diff client/output
# @TEST-EXEC: btest-diff server/output
redef exit_only_after_terminate = T;
redef Broker::disable_ssl = T;
global event_count = 0;
global ping: event(msg: string, c: count);
event zeek_init()
{
Broker::subscribe("/zeek/event/my_topic");
Broker::listen_websocket("127.0.0.1", to_port(getenv("BROKER_PORT")));
}
function send_event()
{
++event_count;
local e = Broker::make_event(ping, "my-message", event_count);
Broker::publish("/zeek/event/my_topic", e);
}
event Broker::peer_added(endpoint: Broker::EndpointInfo, msg: string)
{
print fmt("sender added peer: endpoint=%s msg=%s", endpoint$network$address, msg);
send_event();
}
event Broker::peer_lost(endpoint: Broker::EndpointInfo, msg: string)
{
print fmt("sender lost peer: endpoint=%s msg=%s", endpoint$network$address, msg);
terminate();
}
event pong(msg: string, n: count)
{
print fmt("sender got pong: %s, %s", msg, n);
send_event();
}
@TEST-START-FILE client.py
import asyncio, websockets, os, time, json, sys
ws_port = os.environ['BROKER_PORT'].split('/')[0]
ws_url = 'ws://localhost:%s/v1/messages/json' % ws_port
topic = '"/zeek/event/my_topic"'
def broker_value(type, val):
return {
'@data-type': type,
'data': val
}
async def do_run():
# Try up to 30 times.
connected = False
for i in range(30):
try:
ws = await websockets.connect(ws_url)
connected = True
# send filter and wait for ack
await ws.send('[%s]' % topic)
ack_json = await ws.recv()
ack = json.loads(ack_json)
if not 'type' in ack or ack['type'] != 'ack':
print('*** unexpected ACK from server:')
print(ack_json)
sys.exit()
except Exception as e:
if not connected:
print('failed to connect to %s, try again (%s)' % (ws_url, e), file=sys.stderr)
time.sleep(1)
continue
else:
print('exception: %s' % e, file=sys.stderr)
sys.exit()
for round in range(3):
# wait for ping
msg = await ws.recv()
msg = json.loads(msg)
if not 'type' in msg or msg['type'] != 'data-message':
continue
ping = msg['data'][2]['data']
name = ping[0]['data']
args = [x['data'] for x in ping[1]['data']]
print(name, args)
# send pong
pong = [broker_value('string', 'pong'),
broker_value('vector', [
broker_value('string', args[0]),
broker_value('count', args[1])
])]
ev = [broker_value('count', 1), broker_value('count', 1), broker_value('vector', pong)]
msg = {
'type': 'data-message',
'topic': '/zeek/event/my_topic',
'@data-type': 'vector', 'data': ev
}
msg = json.dumps(msg)
await ws.send(msg)
await ws.close()
sys.exit()
loop = asyncio.get_event_loop()
loop.run_until_complete(do_run())
@TEST-END-FILE