diff --git a/testing/btest/Baseline/cluster.websocket.zeromq/..client..stderr b/testing/btest/Baseline/cluster.websocket.zeromq.manager-worker-ping-pong/..client..stderr similarity index 100% rename from testing/btest/Baseline/cluster.websocket.zeromq/..client..stderr rename to testing/btest/Baseline/cluster.websocket.zeromq.manager-worker-ping-pong/..client..stderr diff --git a/testing/btest/Baseline/cluster.websocket.zeromq.manager-worker-ping-pong/..client.out b/testing/btest/Baseline/cluster.websocket.zeromq.manager-worker-ping-pong/..client.out new file mode 100644 index 0000000000..b5baf120ef --- /dev/null +++ b/testing/btest/Baseline/cluster.websocket.zeromq.manager-worker-ping-pong/..client.out @@ -0,0 +1,66 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +Connected! +unique ids 3 +recv_until_timeout: tc1 - ping 1 +{"client": "ws1", "topic": "/test/pings/", "event_name": "pong", "event_args": [{"@data-type": "string", "data": "worker-1 reply for ping(to-worker, 101)"}, {"@data-type": "count", "data": 1}]} +{"client": "ws2", "topic": "/test/pings/", "event_name": "ping", "event_args": [{"@data-type": "string", "data": "to-worker"}, {"@data-type": "count", "data": 101}]} +{"client": "ws3", "topic": "/test/pings/", "event_name": "ping", "event_args": [{"@data-type": "string", "data": "to-worker"}, {"@data-type": "count", "data": 101}]} +{"client": "ws1", "timeout": true} +{"client": "ws2", "topic": "/test/pings/", "event_name": "pong", "event_args": [{"@data-type": "string", "data": "worker-1 reply for ping(to-worker, 101)"}, {"@data-type": "count", "data": 1}]} +{"client": "ws3", "topic": "/test/pings/", "event_name": "pong", "event_args": [{"@data-type": "string", "data": "worker-1 reply for ping(to-worker, 101)"}, {"@data-type": "count", "data": 1}]} +recv_until_timeout: tc2 - ping 1 +{"client": "ws1", "topic": "/test/pings/", "event_name": "ping", "event_args": [{"@data-type": "string", "data": "to-worker"}, {"@data-type": "count", "data": 201}]} +{"client": "ws2", "topic": "/test/pings/", "event_name": "pong", "event_args": [{"@data-type": "string", "data": "worker-1 reply for ping(to-worker, 201)"}, {"@data-type": "count", "data": 2}]} +{"client": "ws3", "topic": "/test/pings/", "event_name": "ping", "event_args": [{"@data-type": "string", "data": "to-worker"}, {"@data-type": "count", "data": 201}]} +{"client": "ws1", "topic": "/test/pings/", "event_name": "pong", "event_args": [{"@data-type": "string", "data": "worker-1 reply for ping(to-worker, 201)"}, {"@data-type": "count", "data": 2}]} +{"client": "ws2", "timeout": true} +{"client": "ws3", "topic": "/test/pings/", "event_name": "pong", "event_args": [{"@data-type": "string", "data": "worker-1 reply for ping(to-worker, 201)"}, {"@data-type": "count", "data": 2}]} +recv_until_timeout: tc3 - ping 1 +{"client": "ws1", "topic": "/test/pings/", "event_name": "ping", "event_args": [{"@data-type": "string", "data": "to-worker"}, {"@data-type": "count", "data": 301}]} +{"client": "ws2", "topic": "/test/pings/", "event_name": "ping", "event_args": [{"@data-type": "string", "data": "to-worker"}, {"@data-type": "count", "data": 301}]} +{"client": "ws3", "topic": "/test/pings/", "event_name": "pong", "event_args": [{"@data-type": "string", "data": "worker-1 reply for ping(to-worker, 301)"}, {"@data-type": "count", "data": 3}]} +{"client": "ws1", "topic": "/test/pings/", "event_name": "pong", "event_args": [{"@data-type": "string", "data": "worker-1 reply for ping(to-worker, 301)"}, {"@data-type": "count", "data": 3}]} +{"client": "ws2", "topic": "/test/pings/", "event_name": "pong", "event_args": [{"@data-type": "string", "data": "worker-1 reply for ping(to-worker, 301)"}, {"@data-type": "count", "data": 3}]} +{"client": "ws3", "timeout": true} +recv_until_timeout: tc1 - ping 2 +{"client": "ws1", "topic": "/test/pings/", "event_name": "pong", "event_args": [{"@data-type": "string", "data": "manager reply for ping(to-manager, 102)"}, {"@data-type": "count", "data": 4}]} +{"client": "ws2", "topic": "/test/pings/", "event_name": "ping", "event_args": [{"@data-type": "string", "data": "to-manager"}, {"@data-type": "count", "data": 102}]} +{"client": "ws3", "topic": "/test/pings/", "event_name": "ping", "event_args": [{"@data-type": "string", "data": "to-manager"}, {"@data-type": "count", "data": 102}]} +{"client": "ws1", "timeout": true} +{"client": "ws2", "topic": "/test/pings/", "event_name": "pong", "event_args": [{"@data-type": "string", "data": "manager reply for ping(to-manager, 102)"}, {"@data-type": "count", "data": 4}]} +{"client": "ws3", "topic": "/test/pings/", "event_name": "pong", "event_args": [{"@data-type": "string", "data": "manager reply for ping(to-manager, 102)"}, {"@data-type": "count", "data": 4}]} +recv_until_timeout: tc2 - ping 2 +{"client": "ws1", "topic": "/test/pings/", "event_name": "ping", "event_args": [{"@data-type": "string", "data": "to-manager"}, {"@data-type": "count", "data": 202}]} +{"client": "ws2", "topic": "/test/pings/", "event_name": "pong", "event_args": [{"@data-type": "string", "data": "manager reply for ping(to-manager, 202)"}, {"@data-type": "count", "data": 5}]} +{"client": "ws3", "topic": "/test/pings/", "event_name": "ping", "event_args": [{"@data-type": "string", "data": "to-manager"}, {"@data-type": "count", "data": 202}]} +{"client": "ws1", "topic": "/test/pings/", "event_name": "pong", "event_args": [{"@data-type": "string", "data": "manager reply for ping(to-manager, 202)"}, {"@data-type": "count", "data": 5}]} +{"client": "ws2", "timeout": true} +{"client": "ws3", "topic": "/test/pings/", "event_name": "pong", "event_args": [{"@data-type": "string", "data": "manager reply for ping(to-manager, 202)"}, {"@data-type": "count", "data": 5}]} +recv_until_timeout: tc3 - ping 2 +{"client": "ws1", "topic": "/test/pings/", "event_name": "ping", "event_args": [{"@data-type": "string", "data": "to-manager"}, {"@data-type": "count", "data": 302}]} +{"client": "ws2", "topic": "/test/pings/", "event_name": "ping", "event_args": [{"@data-type": "string", "data": "to-manager"}, {"@data-type": "count", "data": 302}]} +{"client": "ws3", "topic": "/test/pings/", "event_name": "pong", "event_args": [{"@data-type": "string", "data": "manager reply for ping(to-manager, 302)"}, {"@data-type": "count", "data": 6}]} +{"client": "ws1", "topic": "/test/pings/", "event_name": "pong", "event_args": [{"@data-type": "string", "data": "manager reply for ping(to-manager, 302)"}, {"@data-type": "count", "data": 6}]} +{"client": "ws2", "topic": "/test/pings/", "event_name": "pong", "event_args": [{"@data-type": "string", "data": "manager reply for ping(to-manager, 302)"}, {"@data-type": "count", "data": 6}]} +{"client": "ws3", "timeout": true} +recv_until_timeout: tc1 - ping 3 +{"client": "ws1", "topic": "/test/pings/", "event_name": "pong", "event_args": [{"@data-type": "string", "data": "worker-1 reply for ping(to-worker, 103)"}, {"@data-type": "count", "data": 7}]} +{"client": "ws2", "topic": "/test/pings/", "event_name": "ping", "event_args": [{"@data-type": "string", "data": "to-worker"}, {"@data-type": "count", "data": 103}]} +{"client": "ws3", "topic": "/test/pings/", "event_name": "ping", "event_args": [{"@data-type": "string", "data": "to-worker"}, {"@data-type": "count", "data": 103}]} +{"client": "ws1", "timeout": true} +{"client": "ws2", "topic": "/test/pings/", "event_name": "pong", "event_args": [{"@data-type": "string", "data": "worker-1 reply for ping(to-worker, 103)"}, {"@data-type": "count", "data": 7}]} +{"client": "ws3", "topic": "/test/pings/", "event_name": "pong", "event_args": [{"@data-type": "string", "data": "worker-1 reply for ping(to-worker, 103)"}, {"@data-type": "count", "data": 7}]} +recv_until_timeout: tc2 - ping 3 +{"client": "ws1", "topic": "/test/pings/", "event_name": "ping", "event_args": [{"@data-type": "string", "data": "to-worker"}, {"@data-type": "count", "data": 203}]} +{"client": "ws2", "topic": "/test/pings/", "event_name": "pong", "event_args": [{"@data-type": "string", "data": "worker-1 reply for ping(to-worker, 203)"}, {"@data-type": "count", "data": 8}]} +{"client": "ws3", "topic": "/test/pings/", "event_name": "ping", "event_args": [{"@data-type": "string", "data": "to-worker"}, {"@data-type": "count", "data": 203}]} +{"client": "ws1", "topic": "/test/pings/", "event_name": "pong", "event_args": [{"@data-type": "string", "data": "worker-1 reply for ping(to-worker, 203)"}, {"@data-type": "count", "data": 8}]} +{"client": "ws2", "timeout": true} +{"client": "ws3", "topic": "/test/pings/", "event_name": "pong", "event_args": [{"@data-type": "string", "data": "worker-1 reply for ping(to-worker, 203)"}, {"@data-type": "count", "data": 8}]} +recv_until_timeout: tc3 - ping 3 +{"client": "ws1", "topic": "/test/pings/", "event_name": "ping", "event_args": [{"@data-type": "string", "data": "to-worker"}, {"@data-type": "count", "data": 303}]} +{"client": "ws2", "topic": "/test/pings/", "event_name": "ping", "event_args": [{"@data-type": "string", "data": "to-worker"}, {"@data-type": "count", "data": 303}]} +{"client": "ws3", "topic": "/test/pings/", "event_name": "pong", "event_args": [{"@data-type": "string", "data": "worker-1 reply for ping(to-worker, 303)"}, {"@data-type": "count", "data": 9}]} +{"client": "ws1", "topic": "/test/pings/", "event_name": "pong", "event_args": [{"@data-type": "string", "data": "worker-1 reply for ping(to-worker, 303)"}, {"@data-type": "count", "data": 9}]} +{"client": "ws2", "topic": "/test/pings/", "event_name": "pong", "event_args": [{"@data-type": "string", "data": "worker-1 reply for ping(to-worker, 303)"}, {"@data-type": "count", "data": 9}]} +{"client": "ws3", "timeout": true} diff --git a/testing/btest/Baseline/cluster.websocket.zeromq/..manager..stderr b/testing/btest/Baseline/cluster.websocket.zeromq.manager-worker-ping-pong/..manager..stderr similarity index 100% rename from testing/btest/Baseline/cluster.websocket.zeromq/..manager..stderr rename to testing/btest/Baseline/cluster.websocket.zeromq.manager-worker-ping-pong/..manager..stderr diff --git a/testing/btest/Baseline/cluster.websocket.zeromq.manager-worker-ping-pong/..manager.out b/testing/btest/Baseline/cluster.websocket.zeromq.manager-worker-ping-pong/..manager.out new file mode 100644 index 0000000000..fb3a8125af --- /dev/null +++ b/testing/btest/Baseline/cluster.websocket.zeromq.manager-worker-ping-pong/..manager.out @@ -0,0 +1,24 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +XXXXXXXXXX.XXXXXX: Cluster::node_up worker-1 +XXXXXXXXXX.XXXXXX: Cluster::websocket_client_added 1 [/test/pings/, /zeek/wstest/ws1/] +XXXXXXXXXX.XXXXXX: Cluster::websocket_client_added 2 [/test/pings/, /zeek/wstest/ws2/] +XXXXXXXXXX.XXXXXX: Cluster::websocket_client_added 3 [/test/pings/, /zeek/wstest/ws3/] +XXXXXXXXXX.XXXXXX: got ping: to-worker, 101 +XXXXXXXXXX.XXXXXX: got pong: worker-1 reply for ping(to-worker, 101), 1 +XXXXXXXXXX.XXXXXX: got ping: to-worker, 201 +XXXXXXXXXX.XXXXXX: got pong: worker-1 reply for ping(to-worker, 201), 2 +XXXXXXXXXX.XXXXXX: got ping: to-worker, 301 +XXXXXXXXXX.XXXXXX: got pong: worker-1 reply for ping(to-worker, 301), 3 +XXXXXXXXXX.XXXXXX: got ping: to-manager, 102 +XXXXXXXXXX.XXXXXX: got ping: to-manager, 202 +XXXXXXXXXX.XXXXXX: got ping: to-manager, 302 +XXXXXXXXXX.XXXXXX: got ping: to-worker, 103 +XXXXXXXXXX.XXXXXX: got pong: worker-1 reply for ping(to-worker, 103), 7 +XXXXXXXXXX.XXXXXX: got ping: to-worker, 203 +XXXXXXXXXX.XXXXXX: got pong: worker-1 reply for ping(to-worker, 203), 8 +XXXXXXXXXX.XXXXXX: got ping: to-worker, 303 +XXXXXXXXXX.XXXXXX: got pong: worker-1 reply for ping(to-worker, 303), 9 +XXXXXXXXXX.XXXXXX: Cluster::websocket_client_lost 1 +XXXXXXXXXX.XXXXXX: Cluster::websocket_client_lost 2 +XXXXXXXXXX.XXXXXX: Cluster::websocket_client_lost 3 +XXXXXXXXXX.XXXXXX: terminate() diff --git a/testing/btest/Baseline/cluster.websocket.zeromq/..worker-1..stderr b/testing/btest/Baseline/cluster.websocket.zeromq.manager-worker-ping-pong/..worker-1..stderr similarity index 100% rename from testing/btest/Baseline/cluster.websocket.zeromq/..worker-1..stderr rename to testing/btest/Baseline/cluster.websocket.zeromq.manager-worker-ping-pong/..worker-1..stderr diff --git a/testing/btest/Baseline/cluster.websocket.zeromq.manager-worker-ping-pong/..worker-1.out b/testing/btest/Baseline/cluster.websocket.zeromq.manager-worker-ping-pong/..worker-1.out new file mode 100644 index 0000000000..5d456ba8b8 --- /dev/null +++ b/testing/btest/Baseline/cluster.websocket.zeromq.manager-worker-ping-pong/..worker-1.out @@ -0,0 +1,15 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +XXXXXXXXXX.XXXXXX: Cluster::node_up manager +XXXXXXXXXX.XXXXXX: got ping: to-worker, 101 +XXXXXXXXXX.XXXXXX: got ping: to-worker, 201 +XXXXXXXXXX.XXXXXX: got ping: to-worker, 301 +XXXXXXXXXX.XXXXXX: got ping: to-manager, 102 +XXXXXXXXXX.XXXXXX: got pong: manager reply for ping(to-manager, 102), 4 +XXXXXXXXXX.XXXXXX: got ping: to-manager, 202 +XXXXXXXXXX.XXXXXX: got pong: manager reply for ping(to-manager, 202), 5 +XXXXXXXXXX.XXXXXX: got ping: to-manager, 302 +XXXXXXXXXX.XXXXXX: got pong: manager reply for ping(to-manager, 302), 6 +XXXXXXXXXX.XXXXXX: got ping: to-worker, 103 +XXXXXXXXXX.XXXXXX: got ping: to-worker, 203 +XXXXXXXXXX.XXXXXX: got ping: to-worker, 303 +XXXXXXXXXX.XXXXXX: Cluster::node_down manager diff --git a/testing/btest/Baseline/cluster.websocket.zeromq/..client.out b/testing/btest/Baseline/cluster.websocket.zeromq/..client.out deleted file mode 100644 index d2723da11c..0000000000 --- a/testing/btest/Baseline/cluster.websocket.zeromq/..client.out +++ /dev/null @@ -1,6 +0,0 @@ -### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. -Connected! -unique ids 3 -ws1 timeout -ws2 ev: topic /test/pings/42 event name ping args [{'@data-type': 'string', 'data': 'python-websocket-client'}, {'@data-type': 'count', 'data': 42}] -ws3 ev: topic /test/pings/42 event name ping args [{'@data-type': 'string', 'data': 'python-websocket-client'}, {'@data-type': 'count', 'data': 42}] diff --git a/testing/btest/Baseline/cluster.websocket.zeromq/..manager.out b/testing/btest/Baseline/cluster.websocket.zeromq/..manager.out deleted file mode 100644 index 6cd1db5faa..0000000000 --- a/testing/btest/Baseline/cluster.websocket.zeromq/..manager.out +++ /dev/null @@ -1,9 +0,0 @@ -### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. -Cluster::node_up, worker-1 -Cluster::websocket_client_added, 1, [/test/pings/, /test/ws/1] -Cluster::websocket_client_added, 2, [/test/pings/, /test/ws/2] -Cluster::websocket_client_added, 3, [/test/pings/, /test/ws/3] -got ping: python-websocket-client, 42 -Cluster::websocket_client_lost, 1 -Cluster::websocket_client_lost, 2 -Cluster::websocket_client_lost, 3 diff --git a/testing/btest/Baseline/cluster.websocket.zeromq/..worker-1.out b/testing/btest/Baseline/cluster.websocket.zeromq/..worker-1.out deleted file mode 100644 index 7dcd0fc9ff..0000000000 --- a/testing/btest/Baseline/cluster.websocket.zeromq/..worker-1.out +++ /dev/null @@ -1,4 +0,0 @@ -### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. -Cluster::node_up, manager -got ping: python-websocket-client, 42 -Cluster::node_up, manager diff --git a/testing/btest/cluster/websocket/zeromq.zeek b/testing/btest/cluster/websocket/zeromq.zeek deleted file mode 100644 index 23f32ab351..0000000000 --- a/testing/btest/cluster/websocket/zeromq.zeek +++ /dev/null @@ -1,178 +0,0 @@ -# @TEST-DOC: Test WebSockets clients when the ZeroMQ cluster backend is enabled. -# -# @TEST-REQUIRES: python3 -c 'import websockets.sync' -# -# @TEST-PORT: XPUB_PORT -# @TEST-PORT: XSUB_PORT -# @TEST-PORT: LOG_PULL_PORT -# @TEST-PORT: WEBSOCKET_PORT -# -# @TEST-EXEC: cp $FILES/zeromq/cluster-layout-simple.zeek cluster-layout.zeek -# @TEST-EXEC: cp $FILES/zeromq/test-bootstrap.zeek zeromq-test-bootstrap.zeek -# -# @TEST-EXEC: zeek -b --parse-only manager.zeek worker.zeek -# @TEST-EXEC: python3 -m py_compile client.py -# -# @TEST-EXEC: btest-bg-run manager "ZEEKPATH=$ZEEKPATH:.. && CLUSTER_NODE=manager zeek -b ../manager.zeek >out" -# @TEST-EXEC: btest-bg-run worker-1 "ZEEKPATH=$ZEEKPATH:.. && CLUSTER_NODE=worker-1 zeek -b ../worker.zeek >out" -# @TEST-EXEC: btest-bg-run client "python3 ../client.py >out" -# -# @TEST-EXEC: btest-bg-wait 30 -# @TEST-EXEC: btest-diff ./manager/out -# @TEST-EXEC: btest-diff ./manager/.stderr -# @TEST-EXEC: btest-diff ./worker-1/out -# @TEST-EXEC: btest-diff ./worker-1/.stderr -# @TEST-EXEC: btest-diff ./client/out -# @TEST-EXEC: btest-diff ./client/.stderr - -# @TEST-START-FILE common.zeek -@load ./zeromq-test-bootstrap - -global ping: event(msg: string, c: count) &is_used; -global pong: event(msg: string, c: count) &is_used; - -# @TEST-END-FILE - -# # @TEST-START-FILE manager.zeek -@load ./common.zeek - -redef Log::enable_local_logging = T; -redef Log::default_rotation_interval = 0sec; - -redef Broker::disable_ssl = T; - -global ping_count = 0; - -event zeek_init() - { - Cluster::subscribe("/test/pings/"); - } - -event Cluster::node_up(name: string, id: string) - { - print "Cluster::node_up", name; - - # Delay listening on WebSocket clients until worker-1 is around. - if ( name == "worker-1" ) - Cluster::listen_websocket([ - $listen_host="127.0.0.1", - $listen_port=to_port(getenv("WEBSOCKET_PORT")) - ]); - } - -event ping(msg: string, n: count) &is_used - { - ++ping_count; - print fmt("got ping: %s, %s", msg, n); - local e = Cluster::make_event(pong, fmt("orig_msg=%s", msg), ping_count); - Cluster::publish("/test/clients", e); - } - -global added = 0; -global lost = 0; - -event Cluster::websocket_client_added(info: Cluster::EndpointInfo, subscriptions: string_vec) - { - ++added; - print "Cluster::websocket_client_added", added, subscriptions; - } - -event Cluster::websocket_client_lost(info: Cluster::EndpointInfo) - { - ++lost; - print "Cluster::websocket_client_lost", lost; - if ( lost == 3 ) - terminate(); - } -# # @TEST-END-FILE - -# # @TEST-START-FILE worker.zeek -@load ./common.zeek - -event zeek_init() - { - Cluster::subscribe("/test/pings/"); - } - -event Cluster::node_up(name: string, id: string) - { - print "Cluster::node_up", name; - } - -event Cluster::node_down(name: string, id: string) - { - print "Cluster::node_up", name; - terminate(); - } - -event ping(msg: string, n: count) - { - print fmt("got ping: %s, %s", msg, n); - } -# # @TEST-END-FILE - - -# @TEST-START-FILE client.py -import json, os, time -from websockets.sync.client import connect - -ws_port = os.environ['WEBSOCKET_PORT'].split('/')[0] -ws_url = f'ws://127.0.0.1:{ws_port}/v1/messages/json' - -def make_ping(topic, c): - return { - "type": "data-message", - "topic": topic + str(c), - "@data-type": "vector", - "data": [ - {"@data-type": "count", "data": 1}, # Format - {"@data-type": "count", "data": 1}, # Type - {"@data-type": "vector", "data": [ - { "@data-type": "string", "data": "ping"}, # Event name - { "@data-type": "vector", "data": [ # event args - {"@data-type": "string", "data": f"python-websocket-client"}, - {"@data-type": "count", "data": c}, - ], }, - ], }, - ], - } - -def run(ws_url): - with connect(ws_url) as ws1: - with connect(ws_url) as ws2: - with connect(ws_url) as ws3: - clients = [ws1, ws2, ws3] - print("Connected!") - ids = set() - for i, c in enumerate(clients, 1): - c.send(json.dumps([f"/test/ws/{i}", "/test/pings/"])) - ack = json.loads(c.recv()) - assert "type" in ack, repr(ack) - assert ack["type"] == "ack" - assert "endpoint" in ack, repr(ack) - assert "version" in ack - ids.add(ack["endpoint"]) - - print("unique ids", len(ids)) - ws1.send(json.dumps(make_ping("/test/pings/", 42))) - - # Client 2 and client 3 receive the ping from client 1, client 1 gets a timeout. - for name, ws in [("ws1", ws1), ("ws2", ws2), ("ws3", ws3)]: - try: - data = json.loads(ws.recv(timeout=0.1)) - ev = data["data"][2]["data"] - print(name, "ev: topic", data["topic"], "event name", ev[0]["data"], "args", ev[1]["data"]) - except TimeoutError: - print(name, "timeout") - -def main(): - for _ in range(100): - try: - run(ws_url) - break - except ConnectionRefusedError: - time.sleep(0.1) - -if __name__ == "__main__": - main() -# @TEST-END-FILE diff --git a/testing/btest/cluster/websocket/zeromq/manager-worker-ping-pong.zeek b/testing/btest/cluster/websocket/zeromq/manager-worker-ping-pong.zeek new file mode 100644 index 0000000000..0212f601a8 --- /dev/null +++ b/testing/btest/cluster/websocket/zeromq/manager-worker-ping-pong.zeek @@ -0,0 +1,157 @@ +# @TEST-DOC: Test WebSockets clients when the ZeroMQ cluster backend is enabled. +# +# * 3 web socket clients connect to a manager node +# * one worker node running +# * All "parties" subscribe to /test/pings/ +# * WebSocket clients each send ping events with alternating messages +# instructing either the manager or worker to reply +# * Baselining confirms that manager, worker and the non-sending WebSocket +# clients see all ping. Additionally, the reply (pong) messages are visible +# by all WebSocket clients and the non-replying worker or manager node. +# +# @TEST-REQUIRES: python3 -c 'import websockets.sync' +# +# @TEST-PORT: XPUB_PORT +# @TEST-PORT: XSUB_PORT +# @TEST-PORT: LOG_PULL_PORT +# @TEST-PORT: WEBSOCKET_PORT +# +# @TEST-EXEC: cp $FILES/zeromq/cluster-layout-simple.zeek cluster-layout.zeek +# @TEST-EXEC: cp $FILES/zeromq/test-bootstrap.zeek zeromq-test-bootstrap.zeek +# @TEST-EXEC: cp $FILES/ws/wstest.py . +# +# @TEST-EXEC: zeek -b --parse-only manager.zeek +# @TEST-EXEC: zeek -b --parse-only worker.zeek +# @TEST-EXEC: python3 -m py_compile client.py +# +# @TEST-EXEC: btest-bg-run manager "ZEEKPATH=$ZEEKPATH:.. && CLUSTER_NODE=manager zeek -b ../manager.zeek >out" +# @TEST-EXEC: btest-bg-run worker-1 "ZEEKPATH=$ZEEKPATH:.. && CLUSTER_NODE=worker-1 zeek -b ../worker.zeek >out" +# @TEST-EXEC: btest-bg-run client "python3 ../client.py >out" +# +# @TEST-EXEC: btest-bg-wait 30 +# @TEST-EXEC: btest-diff ./manager/out +# @TEST-EXEC: btest-diff ./manager/.stderr +# @TEST-EXEC: btest-diff ./worker-1/out +# @TEST-EXEC: btest-diff ./worker-1/.stderr +# @TEST-EXEC: btest-diff ./client/out +# @TEST-EXEC: btest-diff ./client/.stderr + +# @TEST-START-FILE common.zeek +redef Log::enable_local_logging = T; +redef Log::default_rotation_interval = 0sec; + +@load ./zeromq-test-bootstrap + +global ping: event(msg: string, c: count) &is_used; +global pong: event(msg: string, c: count) &is_used; + +global ping_count = 0; + +event zeek_init() + { + Cluster::subscribe("/test/pings/"); + } + +event pong(msg: string, n: count) &is_used + { + print fmt("%s: got pong: %s, %s", current_time(), msg, n); + } + +event ping(msg: string, n: count) &is_used + { + ++ping_count; + print fmt("%s: got ping: %s, %s", current_time(), msg, n); + + local reply_msg = fmt("%s reply for ping(%s, %s)", Cluster::node, msg, n); + if ( (msg == "to-manager" && Cluster::local_node_type() == Cluster::MANAGER) || + (msg == "to-worker" && Cluster::local_node_type() == Cluster::WORKER) ) + Cluster::publish("/test/pings/", pong, reply_msg, ping_count); + } +# @TEST-END-FILE + +# @TEST-START-FILE manager.zeek +@load ./common.zeek + +event Cluster::node_up(name: string, id: string) + { + print fmt("%s: Cluster::node_up %s", current_time(), name); + + # Delay listening on WebSocket clients until worker-1 is around. + if ( name == "worker-1" ) + Cluster::listen_websocket([ + $listen_host="127.0.0.1", + $listen_port=to_port(getenv("WEBSOCKET_PORT")) + ]); + } + +global added = 0; +global lost = 0; + +event Cluster::websocket_client_added(info: Cluster::EndpointInfo, subscriptions: string_vec) + { + ++added; + print fmt("%s: Cluster::websocket_client_added %s %s", current_time(), added, subscriptions); + } + +event Cluster::websocket_client_lost(info: Cluster::EndpointInfo) + { + ++lost; + print fmt("%s: Cluster::websocket_client_lost %s", current_time(), lost); + + if ( lost == 3 ) + { + print fmt("%s: terminate()", current_time()); + terminate(); + } + } +# @TEST-END-FILE + +# @TEST-START-FILE worker.zeek +@load ./common.zeek + +event Cluster::node_up(name: string, id: string) + { + print fmt("%s: Cluster::node_up %s", current_time(), name); + } + +event Cluster::node_down(name: string, id: string) + { + print fmt("%s: Cluster::node_down %s", current_time(), name); + terminate(); + } +# @TEST-END-FILE + + +# @TEST-START-FILE client.py +import wstest + +def run(ws_url): + with ( + wstest.connect("ws1", ws_url) as tc1, + wstest.connect("ws2", ws_url) as tc2, + wstest.connect("ws3", ws_url) as tc3, + ): + clients = [tc1, tc2, tc3] + print("Connected!") + ids = set() + for tc in clients: + ack = tc.hello_v1(["/test/pings/"]) + ids.add(ack["endpoint"]) + + print("unique ids", len(ids)) + + for i in range(1, 4): + msg = "to-manager" if i % 2 == 0 else "to-worker" + tc1.send_json(wstest.build_event_v1("/test/pings/", "ping", [msg, 100 + i])) + wstest.recv_until_timeout(clients, desc=f"tc1 - ping {i}") + + tc2.send_json(wstest.build_event_v1("/test/pings/", "ping", [msg, 200 + i])) + wstest.recv_until_timeout(clients, desc=f"tc2 - ping {i}") + + tc3.send_json(wstest.build_event_v1("/test/pings/", "ping", [msg, 300 + i])) + wstest.recv_until_timeout(clients, desc=f"tc3 - ping {i}") + + +if __name__ == "__main__": + wstest.main(run, wstest.WS4_URL_V1) +# @TEST-END-FILE