zeek/testing/btest/broker/web-socket-events-metadata.zeek
Arne Welzel 53b0f0ad64 Event: Deprecate default network timestamp metadata
This deprecates the Event constructor and the ``ts`` parameter of Enqueue()
Instead, versions are introduced that take a detail::MetadataVectorPtr which
can hold the network timestamp metadata and is meant to be allocated by the
caller instead of automatically during Enqueue() or within the Event
constructor.

This also introduces a BifConst ``EventMetadata::add_network_timestamp`` to
opt-in adding network timestamps to events globally. It's disabled by
default as there are not a lot of known use cases that need this.
2025-05-23 19:32:23 +02:00

153 lines
4.7 KiB
Text

# @TEST-GROUP: broker
#
# This test requires the websockets module, available via
# "pip install websockets".
# @TEST-REQUIRES: python3 -c 'import websockets'
#
# @TEST-PORT: BROKER_PORT
#
# @TEST-EXEC: btest-bg-run server "zeek -b %INPUT >output"
# @TEST-EXEC: btest-bg-run client "python3 ../client.py >output"
#
# @TEST-EXEC: btest-bg-wait 5
# @TEST-EXEC: btest-diff client/output
# @TEST-EXEC: TEST_DIFF_CANONIFIER= btest-diff server/output
redef allow_network_time_forward = F;
redef exit_only_after_terminate = T;
redef Broker::disable_ssl = T;
redef EventMetadata::add_network_timestamp = T;
global event_count = 0;
global ping: event(msg: string, c: count);
event zeek_init()
{
# Tue 18 Apr 2023 12:13:14 PM UTC
set_network_time(double_to_time(1681819994.0));
Broker::subscribe("/zeek/event/my_topic");
Broker::listen_websocket("127.0.0.1", to_port(getenv("BROKER_PORT")));
}
event send_event()
{
++event_count;
local e = Broker::make_event(ping, "my-message", event_count);
Broker::publish("/zeek/event/my_topic", e);
}
event Broker::peer_added(endpoint: Broker::EndpointInfo, msg: string)
{
print fmt("sender added peer: endpoint=%s msg=%s", endpoint$network$address, msg);
event send_event();
}
event Broker::peer_lost(endpoint: Broker::EndpointInfo, msg: string)
{
print fmt("sender lost peer: endpoint=%s msg=%s", endpoint$network$address, msg);
terminate();
}
event pong(msg: string, n: count) &is_used
{
print fmt("sender got pong: %s, %s network_time=%s current_event_time=%s",
msg, n, network_time(), current_event_time());
set_network_time(network_time() + 10sec);
# pong is a remote event and a Broker::publish() would take
# current_event_time() as the network time for Broker::publish(),
# prevent this by queuing a new send_event().
event send_event();
}
# @TEST-START-FILE client.py
import asyncio, datetime, websockets, os, time, json, sys
ws_port = os.environ['BROKER_PORT'].split('/')[0]
ws_url = 'ws://localhost:%s/v1/messages/json' % ws_port
topic = '"/zeek/event/my_topic"'
def broker_value(type, val):
return {
'@data-type': type,
'data': val
}
async def do_run():
# Try up to 30 times.
connected = False
for i in range(30):
try:
ws = await websockets.connect(ws_url)
connected = True
# send filter and wait for ack
await ws.send('[%s]' % topic)
ack_json = await ws.recv()
ack = json.loads(ack_json)
if not 'type' in ack or ack['type'] != 'ack':
print('*** unexpected ACK from server:')
print(ack_json)
sys.exit()
except Exception as e:
if not connected:
print('failed to connect to %s, try again (%s)' % (ws_url, e), file=sys.stderr)
await asyncio.sleep(1)
continue
else:
print('exception: %s' % e, file=sys.stderr)
sys.exit()
for round in range(3):
# wait for ping
msg = await ws.recv()
msg = json.loads(msg)
if not 'type' in msg or msg['type'] != 'data-message':
print("unexpected type", msg)
continue
ping = msg['data'][2]['data']
if len(ping) < 3:
print("no metadata on event")
continue
name = ping[0]['data']
args = [x['data'] for x in ping[1]['data']]
metadata = ping[2]['data']
print(name, "args", args, "metadata", metadata)
# send pong
dt = datetime.datetime.utcfromtimestamp(1681819994 + args[1])
ts_str = dt.isoformat('T', 'milliseconds')
pong = [
broker_value('string', 'pong'),
broker_value('vector', [
broker_value('string', args[0]),
broker_value('count', args[1]),
]),
broker_value('vector', [
broker_value('vector', [
broker_value('count', 1), # network_timestamp
broker_value('timestamp', ts_str),
]),
]),
]
ev = [broker_value('count', 1), broker_value('count', 1), broker_value('vector', pong)]
msg = {
'type': 'data-message',
'topic': '/zeek/event/my_topic',
'@data-type': 'vector', 'data': ev
}
msg = json.dumps(msg)
await ws.send(msg)
await ws.close()
sys.exit()
loop = asyncio.get_event_loop()
loop.run_until_complete(do_run())
# @TEST-END-FILE