diff --git a/testing/btest/Baseline/broker.remote_event_ts_compat/client.output b/testing/btest/Baseline/broker.remote_event_ts_compat/client.output new file mode 100644 index 0000000000..d61cedbbe1 --- /dev/null +++ b/testing/btest/Baseline/broker.remote_event_ts_compat/client.output @@ -0,0 +1,3 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +send event without timestamp +send event with timestamp diff --git a/testing/btest/Baseline/broker.remote_event_ts_compat/server.output b/testing/btest/Baseline/broker.remote_event_ts_compat/server.output new file mode 100644 index 0000000000..34a7c3a50b --- /dev/null +++ b/testing/btest/Baseline/broker.remote_event_ts_compat/server.output @@ -0,0 +1,5 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +added peer: endpoint=127.0.0.1 msg=handshake successful +got my_event(without ts) stamped to 42.0 at network time 42.0 +got my_event(with ts) stamped to 23.0 at network time 42.0 +lost peer: endpoint=127.0.0.1 msg=lost connection to remote peer diff --git a/testing/btest/Baseline/broker.web-socket-events-metadata/client.output b/testing/btest/Baseline/broker.web-socket-events-metadata/client.output new file mode 100644 index 0000000000..5fa2aec2b5 --- /dev/null +++ b/testing/btest/Baseline/broker.web-socket-events-metadata/client.output @@ -0,0 +1,4 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +ping args ['my-message', 1] metadata [{'@data-type': 'vector', 'data': [{'@data-type': 'count', 'data': 1}, {'@data-type': 'timestamp', 'data': '2023-04-18T12:13:14.000'}]}] +ping args ['my-message', 2] metadata [{'@data-type': 'vector', 'data': [{'@data-type': 'count', 'data': 1}, {'@data-type': 'timestamp', 'data': '2023-04-18T12:13:24.000'}]}] +ping args ['my-message', 3] metadata [{'@data-type': 'vector', 'data': [{'@data-type': 'count', 'data': 1}, {'@data-type': 'timestamp', 'data': '2023-04-18T12:13:34.000'}]}] diff --git a/testing/btest/Baseline/broker.web-socket-events-metadata/server.output b/testing/btest/Baseline/broker.web-socket-events-metadata/server.output new file mode 100644 index 0000000000..92f0a20dbf --- /dev/null +++ b/testing/btest/Baseline/broker.web-socket-events-metadata/server.output @@ -0,0 +1,6 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +sender added peer: endpoint=127.0.0.1 msg=handshake successful +sender got pong: my-message, 1 network_time=1681819994.0 current_event_time=1681819995.0 +sender got pong: my-message, 2 network_time=1681820004.0 current_event_time=1681819996.0 +sender got pong: my-message, 3 network_time=1681820014.0 current_event_time=1681819997.0 +sender lost peer: endpoint=127.0.0.1 msg=lost connection to client diff --git a/testing/btest/broker/remote_event_ts_compat.zeek b/testing/btest/broker/remote_event_ts_compat.zeek new file mode 100644 index 0000000000..3bf5fa6f7d --- /dev/null +++ b/testing/btest/broker/remote_event_ts_compat.zeek @@ -0,0 +1,92 @@ +# @TEST-DOC: Test compatibility with peers sending events without timestamps. +# +# @TEST-GROUP: broker +# @TEST-PORT: BROKER_PORT +# +# @TEST-REQUIRES: python3 -V +# @TEST-REQUIRES: TOPIC=/zeek/my_topic python3 client.py check +# +# @TEST-EXEC: TOPIC=/zeek/my_topic btest-bg-run server "zeek %INPUT >output" +# @TEST-EXEC: TOPIC=/zeek/my_topic btest-bg-run client "python3 ../client.py >output" +# +# @TEST-EXEC: btest-bg-wait 45 +# @TEST-EXEC: btest-diff server/output +# @TEST-EXEC: btest-diff client/output + +redef exit_only_after_terminate = T; +redef allow_network_time_forward = F; + +event zeek_init() + { + Broker::subscribe(getenv("TOPIC")); + Broker::listen("127.0.0.1", to_port(getenv("BROKER_PORT"))); + set_network_time(double_to_time(42.0)); + } + +event Broker::peer_added(endpoint: Broker::EndpointInfo, msg: string) + { + print fmt("added peer: endpoint=%s msg=%s", endpoint$network$address, msg); + } + +event Broker::peer_lost(endpoint: Broker::EndpointInfo, msg: string) + { + print fmt("lost peer: endpoint=%s msg=%s", endpoint$network$address, msg); + terminate(); + } + +event my_event(msg: string) &is_used + { + print fmt("got my_event(%s) stamped to %s at network time %s", + msg, current_event_time(), network_time()); + } + + +@TEST-START-FILE client.py +""" +Python script sending timestamped and non-timestamped event to TOPIC +""" +import datetime +import os +import sys + +# Prep the PYTHONPATH for the build directory. +broker_path = os.path.join(os.environ["BUILD"], "auxil", "broker", "python") +sys.path.insert(0, broker_path) + +import broker + +# 1024/tcp +broker_port = int(os.environ["BROKER_PORT"].split("/")[0]) +broker_topic = os.environ["TOPIC"] + +# We were able to import broker and parse the broker_port, should be good. +if len(sys.argv) > 1 and sys.argv[1] == "check": + sys.exit(0) + +# Setup endpoint and connect to Zeek. +with broker.Endpoint() as ep, \ + ep.make_status_subscriber(True) as ss: + + ep.peer("127.0.0.1", broker_port) + st = ss.get(2) + if not (st[0].code() == broker.SC.EndpointDiscovered and + st[1].code() == broker.SC.PeerAdded): + print("could not connect") + exit(0) + + # Send events and close connection + print("send event without timestamp") + my_event = broker.zeek.Event("my_event", "without ts") + ep.publish(broker_topic, my_event) + + print("send event with timestamp") + ts = datetime.datetime.fromtimestamp(23.0, broker.utc) + metadata = { + broker.zeek.Metadata.NETWORK_TIMESTAMP: ts, + } + my_event = broker.zeek.Event("my_event", "with ts", metadata=metadata) + ep.publish(broker_topic, my_event) + + ep.shutdown() + +@TEST-END-FILE diff --git a/testing/btest/broker/web-socket-events-metadata.zeek b/testing/btest/broker/web-socket-events-metadata.zeek new file mode 100644 index 0000000000..52b1c8ab3b --- /dev/null +++ b/testing/btest/broker/web-socket-events-metadata.zeek @@ -0,0 +1,148 @@ +# @TEST-GROUP: broker +# +# This test requires the websockets module, available via +# "pip install websockets". +# @TEST-REQUIRES: python3 -c 'import websockets' +# +# @TEST-PORT: BROKER_PORT +# +# @TEST-EXEC: btest-bg-run server "zeek -b %INPUT >output" +# @TEST-EXEC: btest-bg-run client "python3 ../client.py >output" +# +# @TEST-EXEC: btest-bg-wait 5 +# @TEST-EXEC: btest-diff client/output +# @TEST-EXEC: TEST_DIFF_CANONIFIER= btest-diff server/output + +redef allow_network_time_forward = F; +redef exit_only_after_terminate = T; +redef Broker::disable_ssl = T; + +global event_count = 0; + +global ping: event(msg: string, c: count); + +event zeek_init() + { + # Tue 18 Apr 2023 12:13:14 PM UTC + set_network_time(double_to_time(1681819994.0)); + Broker::subscribe("/zeek/event/my_topic"); + Broker::listen_websocket("127.0.0.1", to_port(getenv("BROKER_PORT"))); + } + +function send_event() + { + ++event_count; + local e = Broker::make_event(ping, "my-message", event_count); + Broker::publish("/zeek/event/my_topic", e); + } + +event Broker::peer_added(endpoint: Broker::EndpointInfo, msg: string) + { + print fmt("sender added peer: endpoint=%s msg=%s", endpoint$network$address, msg); + send_event(); + } + +event Broker::peer_lost(endpoint: Broker::EndpointInfo, msg: string) + { + print fmt("sender lost peer: endpoint=%s msg=%s", endpoint$network$address, msg); + terminate(); + } + +event pong(msg: string, n: count) &is_used + { + print fmt("sender got pong: %s, %s network_time=%s current_event_time=%s", + msg, n, network_time(), current_event_time()); + set_network_time(network_time() + 10sec); + send_event(); + } + + +@TEST-START-FILE client.py +import asyncio, datetime, websockets, os, time, json, sys + +ws_port = os.environ['BROKER_PORT'].split('/')[0] +ws_url = 'ws://localhost:%s/v1/messages/json' % ws_port +topic = '"/zeek/event/my_topic"' + +def broker_value(type, val): + return { + '@data-type': type, + 'data': val + } + +async def do_run(): + # Try up to 30 times. + connected = False + for i in range(30): + try: + ws = await websockets.connect(ws_url) + connected = True + + # send filter and wait for ack + await ws.send('[%s]' % topic) + ack_json = await ws.recv() + ack = json.loads(ack_json) + if not 'type' in ack or ack['type'] != 'ack': + print('*** unexpected ACK from server:') + print(ack_json) + sys.exit() + except Exception as e: + if not connected: + print('failed to connect to %s, try again (%s)' % (ws_url, e), file=sys.stderr) + await asyncio.sleep(1) + continue + else: + print('exception: %s' % e, file=sys.stderr) + sys.exit() + + for round in range(3): + # wait for ping + msg = await ws.recv() + msg = json.loads(msg) + if not 'type' in msg or msg['type'] != 'data-message': + print("unexpected type", msg) + continue + ping = msg['data'][2]['data'] + if len(ping) < 3: + print("no metadata on event") + continue + + name = ping[0]['data'] + args = [x['data'] for x in ping[1]['data']] + metadata = ping[2]['data'] + print(name, "args", args, "metadata", metadata) + + # send pong + dt = datetime.datetime.utcfromtimestamp(1681819994 + args[1]) + ts_str = dt.isoformat('T', 'milliseconds') + pong = [ + broker_value('string', 'pong'), + broker_value('vector', [ + broker_value('string', args[0]), + broker_value('count', args[1]), + ]), + broker_value('vector', [ + broker_value('vector', [ + broker_value('count', 1), # network_timestamp + broker_value('timestamp', ts_str), + ]), + ]), + ] + + ev = [broker_value('count', 1), broker_value('count', 1), broker_value('vector', pong)] + msg = { + 'type': 'data-message', + 'topic': '/zeek/event/my_topic', + '@data-type': 'vector', 'data': ev + } + + msg = json.dumps(msg) + await ws.send(msg) + + await ws.close() + sys.exit() + +loop = asyncio.get_event_loop() +loop.run_until_complete(do_run()) + +@TEST-END-FILE diff --git a/testing/btest/broker/web-socket-events.zeek b/testing/btest/broker/web-socket-events.zeek index b94ef9c362..9002b803b4 100644 --- a/testing/btest/broker/web-socket-events.zeek +++ b/testing/btest/broker/web-socket-events.zeek @@ -84,7 +84,7 @@ async def do_run(): except Exception as e: if not connected: print('failed to connect to %s, try again (%s)' % (ws_url, e), file=sys.stderr) - time.sleep(1) + await asyncio.sleep(1) continue else: print('exception: %s' % e, file=sys.stderr)