Add compatibility tests for timestamped events.

This adds compatibility tests for receiving non-timestamped events as
well as providing timestamps via broker websockets.

Co-authored-by: Arne Welzel <arne.welzel@corelight.com>
This commit is contained in:
Jan Grashoefer 2023-05-02 11:43:21 +02:00
parent 1e807a9f0a
commit 1780d4cc2f
7 changed files with 259 additions and 1 deletions

View file

@ -0,0 +1,3 @@
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
send event without timestamp
send event with timestamp

View file

@ -0,0 +1,5 @@
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
added peer: endpoint=127.0.0.1 msg=handshake successful
got my_event(without ts) stamped to 42.0 at network time 42.0
got my_event(with ts) stamped to 23.0 at network time 42.0
lost peer: endpoint=127.0.0.1 msg=lost connection to remote peer

View file

@ -0,0 +1,4 @@
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
ping args ['my-message', 1] metadata [{'@data-type': 'vector', 'data': [{'@data-type': 'count', 'data': 1}, {'@data-type': 'timestamp', 'data': '2023-04-18T12:13:14.000'}]}]
ping args ['my-message', 2] metadata [{'@data-type': 'vector', 'data': [{'@data-type': 'count', 'data': 1}, {'@data-type': 'timestamp', 'data': '2023-04-18T12:13:24.000'}]}]
ping args ['my-message', 3] metadata [{'@data-type': 'vector', 'data': [{'@data-type': 'count', 'data': 1}, {'@data-type': 'timestamp', 'data': '2023-04-18T12:13:34.000'}]}]

View file

@ -0,0 +1,6 @@
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
sender added peer: endpoint=127.0.0.1 msg=handshake successful
sender got pong: my-message, 1 network_time=1681819994.0 current_event_time=1681819995.0
sender got pong: my-message, 2 network_time=1681820004.0 current_event_time=1681819996.0
sender got pong: my-message, 3 network_time=1681820014.0 current_event_time=1681819997.0
sender lost peer: endpoint=127.0.0.1 msg=lost connection to client

View file

@ -0,0 +1,92 @@
# @TEST-DOC: Test compatibility with peers sending events without timestamps.
#
# @TEST-GROUP: broker
# @TEST-PORT: BROKER_PORT
#
# @TEST-REQUIRES: python3 -V
# @TEST-REQUIRES: TOPIC=/zeek/my_topic python3 client.py check
#
# @TEST-EXEC: TOPIC=/zeek/my_topic btest-bg-run server "zeek %INPUT >output"
# @TEST-EXEC: TOPIC=/zeek/my_topic btest-bg-run client "python3 ../client.py >output"
#
# @TEST-EXEC: btest-bg-wait 45
# @TEST-EXEC: btest-diff server/output
# @TEST-EXEC: btest-diff client/output
redef exit_only_after_terminate = T;
redef allow_network_time_forward = F;
event zeek_init()
{
Broker::subscribe(getenv("TOPIC"));
Broker::listen("127.0.0.1", to_port(getenv("BROKER_PORT")));
set_network_time(double_to_time(42.0));
}
event Broker::peer_added(endpoint: Broker::EndpointInfo, msg: string)
{
print fmt("added peer: endpoint=%s msg=%s", endpoint$network$address, msg);
}
event Broker::peer_lost(endpoint: Broker::EndpointInfo, msg: string)
{
print fmt("lost peer: endpoint=%s msg=%s", endpoint$network$address, msg);
terminate();
}
event my_event(msg: string) &is_used
{
print fmt("got my_event(%s) stamped to %s at network time %s",
msg, current_event_time(), network_time());
}
@TEST-START-FILE client.py
"""
Python script sending timestamped and non-timestamped event to TOPIC
"""
import datetime
import os
import sys
# Prep the PYTHONPATH for the build directory.
broker_path = os.path.join(os.environ["BUILD"], "auxil", "broker", "python")
sys.path.insert(0, broker_path)
import broker
# 1024/tcp
broker_port = int(os.environ["BROKER_PORT"].split("/")[0])
broker_topic = os.environ["TOPIC"]
# We were able to import broker and parse the broker_port, should be good.
if len(sys.argv) > 1 and sys.argv[1] == "check":
sys.exit(0)
# Setup endpoint and connect to Zeek.
with broker.Endpoint() as ep, \
ep.make_status_subscriber(True) as ss:
ep.peer("127.0.0.1", broker_port)
st = ss.get(2)
if not (st[0].code() == broker.SC.EndpointDiscovered and
st[1].code() == broker.SC.PeerAdded):
print("could not connect")
exit(0)
# Send events and close connection
print("send event without timestamp")
my_event = broker.zeek.Event("my_event", "without ts")
ep.publish(broker_topic, my_event)
print("send event with timestamp")
ts = datetime.datetime.fromtimestamp(23.0, broker.utc)
metadata = {
broker.zeek.Metadata.NETWORK_TIMESTAMP: ts,
}
my_event = broker.zeek.Event("my_event", "with ts", metadata=metadata)
ep.publish(broker_topic, my_event)
ep.shutdown()
@TEST-END-FILE

View file

@ -0,0 +1,148 @@
# @TEST-GROUP: broker
#
# This test requires the websockets module, available via
# "pip install websockets".
# @TEST-REQUIRES: python3 -c 'import websockets'
#
# @TEST-PORT: BROKER_PORT
#
# @TEST-EXEC: btest-bg-run server "zeek -b %INPUT >output"
# @TEST-EXEC: btest-bg-run client "python3 ../client.py >output"
#
# @TEST-EXEC: btest-bg-wait 5
# @TEST-EXEC: btest-diff client/output
# @TEST-EXEC: TEST_DIFF_CANONIFIER= btest-diff server/output
redef allow_network_time_forward = F;
redef exit_only_after_terminate = T;
redef Broker::disable_ssl = T;
global event_count = 0;
global ping: event(msg: string, c: count);
event zeek_init()
{
# Tue 18 Apr 2023 12:13:14 PM UTC
set_network_time(double_to_time(1681819994.0));
Broker::subscribe("/zeek/event/my_topic");
Broker::listen_websocket("127.0.0.1", to_port(getenv("BROKER_PORT")));
}
function send_event()
{
++event_count;
local e = Broker::make_event(ping, "my-message", event_count);
Broker::publish("/zeek/event/my_topic", e);
}
event Broker::peer_added(endpoint: Broker::EndpointInfo, msg: string)
{
print fmt("sender added peer: endpoint=%s msg=%s", endpoint$network$address, msg);
send_event();
}
event Broker::peer_lost(endpoint: Broker::EndpointInfo, msg: string)
{
print fmt("sender lost peer: endpoint=%s msg=%s", endpoint$network$address, msg);
terminate();
}
event pong(msg: string, n: count) &is_used
{
print fmt("sender got pong: %s, %s network_time=%s current_event_time=%s",
msg, n, network_time(), current_event_time());
set_network_time(network_time() + 10sec);
send_event();
}
@TEST-START-FILE client.py
import asyncio, datetime, websockets, os, time, json, sys
ws_port = os.environ['BROKER_PORT'].split('/')[0]
ws_url = 'ws://localhost:%s/v1/messages/json' % ws_port
topic = '"/zeek/event/my_topic"'
def broker_value(type, val):
return {
'@data-type': type,
'data': val
}
async def do_run():
# Try up to 30 times.
connected = False
for i in range(30):
try:
ws = await websockets.connect(ws_url)
connected = True
# send filter and wait for ack
await ws.send('[%s]' % topic)
ack_json = await ws.recv()
ack = json.loads(ack_json)
if not 'type' in ack or ack['type'] != 'ack':
print('*** unexpected ACK from server:')
print(ack_json)
sys.exit()
except Exception as e:
if not connected:
print('failed to connect to %s, try again (%s)' % (ws_url, e), file=sys.stderr)
await asyncio.sleep(1)
continue
else:
print('exception: %s' % e, file=sys.stderr)
sys.exit()
for round in range(3):
# wait for ping
msg = await ws.recv()
msg = json.loads(msg)
if not 'type' in msg or msg['type'] != 'data-message':
print("unexpected type", msg)
continue
ping = msg['data'][2]['data']
if len(ping) < 3:
print("no metadata on event")
continue
name = ping[0]['data']
args = [x['data'] for x in ping[1]['data']]
metadata = ping[2]['data']
print(name, "args", args, "metadata", metadata)
# send pong
dt = datetime.datetime.utcfromtimestamp(1681819994 + args[1])
ts_str = dt.isoformat('T', 'milliseconds')
pong = [
broker_value('string', 'pong'),
broker_value('vector', [
broker_value('string', args[0]),
broker_value('count', args[1]),
]),
broker_value('vector', [
broker_value('vector', [
broker_value('count', 1), # network_timestamp
broker_value('timestamp', ts_str),
]),
]),
]
ev = [broker_value('count', 1), broker_value('count', 1), broker_value('vector', pong)]
msg = {
'type': 'data-message',
'topic': '/zeek/event/my_topic',
'@data-type': 'vector', 'data': ev
}
msg = json.dumps(msg)
await ws.send(msg)
await ws.close()
sys.exit()
loop = asyncio.get_event_loop()
loop.run_until_complete(do_run())
@TEST-END-FILE

View file

@ -84,7 +84,7 @@ async def do_run():
except Exception as e: except Exception as e:
if not connected: if not connected:
print('failed to connect to %s, try again (%s)' % (ws_url, e), file=sys.stderr) print('failed to connect to %s, try again (%s)' % (ws_url, e), file=sys.stderr)
time.sleep(1) await asyncio.sleep(1)
continue continue
else: else:
print('exception: %s' % e, file=sys.stderr) print('exception: %s' % e, file=sys.stderr)