diff --git a/testing/btest/Baseline/cluster.websocket.three/..manager.out b/testing/btest/Baseline/cluster.websocket.three/..manager.out deleted file mode 100644 index 30dca05f31..0000000000 --- a/testing/btest/Baseline/cluster.websocket.three/..manager.out +++ /dev/null @@ -1,23 +0,0 @@ -### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. -Cluster::websocket_client_added, 1, [/test/clients] -Cluster::websocket_client_added, 2, [/test/clients] -Cluster::websocket_client_added, 3, [/test/clients] -got ping: ws1, 0 -got ping: ws2, 1 -got ping: ws3, 2 -got ping: ws1, 3 -got ping: ws2, 4 -got ping: ws3, 5 -got ping: ws1, 6 -got ping: ws2, 7 -got ping: ws3, 8 -got ping: ws1, 9 -got ping: ws2, 10 -got ping: ws3, 11 -got ping: ws1, 12 -got ping: ws2, 13 -got ping: ws3, 14 -got ping: ws1, 15 -Cluster::websocket_client_lost, 1 -Cluster::websocket_client_lost, 2 -Cluster::websocket_client_lost, 3 diff --git a/testing/btest/Baseline/cluster.websocket.three/..manager.out.sorted b/testing/btest/Baseline/cluster.websocket.three/..manager.out.sorted new file mode 100644 index 0000000000..69d725b682 --- /dev/null +++ b/testing/btest/Baseline/cluster.websocket.three/..manager.out.sorted @@ -0,0 +1,28 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +A subscription, /test/client-0 +A subscription, /test/client-1 +A subscription, /test/client-2 +A subscription, /test/clients +A subscription, /test/manager +B Cluster::websocket_client_added, [/test/client-0, /test/clients] +B Cluster::websocket_client_added, [/test/client-1, /test/clients] +B Cluster::websocket_client_added, [/test/client-2, /test/clients] +C got ping: ws1, 0 +C got ping: ws1, 12 +C got ping: ws1, 15 +C got ping: ws1, 3 +C got ping: ws1, 6 +C got ping: ws1, 9 +C got ping: ws2, 1 +C got ping: ws2, 10 +C got ping: ws2, 13 +C got ping: ws2, 4 +C got ping: ws2, 7 +C got ping: ws3, 11 +C got ping: ws3, 14 +C got ping: ws3, 2 +C got ping: ws3, 5 +C got ping: ws3, 8 +D Cluster::websocket_client_lost, 1 +D Cluster::websocket_client_lost, 2 +D Cluster::websocket_client_lost, 3 diff --git a/testing/btest/Baseline/cluster.websocket.two-pipelining/..client.out b/testing/btest/Baseline/cluster.websocket.two-pipelining/..client.out index 616a268fed..79efa77003 100644 --- a/testing/btest/Baseline/cluster.websocket.two-pipelining/..client.out +++ b/testing/btest/Baseline/cluster.websocket.two-pipelining/..client.out @@ -12,43 +12,43 @@ Sending ping 4 - ws1 Sending ping 4 - ws2 Receiving ack - ws1 Receiving ack - ws2 -Receiving pong 0 - ws0 -topic /zeek/event/to_client event name pong args [{'@data-type': 'string', 'data': 'my-message'}, {'@data-type': 'count', 'data': 1}] Receiving pong 0 - ws1 -topic /zeek/event/to_client event name pong args [{'@data-type': 'string', 'data': 'my-message'}, {'@data-type': 'count', 'data': 1}] -Receiving pong 1 - ws0 -topic /zeek/event/to_client event name pong args [{'@data-type': 'string', 'data': 'my-message'}, {'@data-type': 'count', 'data': 2}] +topic /test/clients event name pong args [{'@data-type': 'string', 'data': 'my-message'}, {'@data-type': 'count', 'data': 1}] +Receiving pong 0 - ws2 +topic /test/clients event name pong args [{'@data-type': 'string', 'data': 'my-message'}, {'@data-type': 'count', 'data': 1}] Receiving pong 1 - ws1 -topic /zeek/event/to_client event name pong args [{'@data-type': 'string', 'data': 'my-message'}, {'@data-type': 'count', 'data': 2}] -Receiving pong 2 - ws0 -topic /zeek/event/to_client event name pong args [{'@data-type': 'string', 'data': 'my-message'}, {'@data-type': 'count', 'data': 3}] +topic /test/clients event name pong args [{'@data-type': 'string', 'data': 'my-message'}, {'@data-type': 'count', 'data': 2}] +Receiving pong 1 - ws2 +topic /test/clients event name pong args [{'@data-type': 'string', 'data': 'my-message'}, {'@data-type': 'count', 'data': 2}] Receiving pong 2 - ws1 -topic /zeek/event/to_client event name pong args [{'@data-type': 'string', 'data': 'my-message'}, {'@data-type': 'count', 'data': 3}] -Receiving pong 3 - ws0 -topic /zeek/event/to_client event name pong args [{'@data-type': 'string', 'data': 'my-message'}, {'@data-type': 'count', 'data': 4}] +topic /test/clients event name pong args [{'@data-type': 'string', 'data': 'my-message'}, {'@data-type': 'count', 'data': 3}] +Receiving pong 2 - ws2 +topic /test/clients event name pong args [{'@data-type': 'string', 'data': 'my-message'}, {'@data-type': 'count', 'data': 3}] Receiving pong 3 - ws1 -topic /zeek/event/to_client event name pong args [{'@data-type': 'string', 'data': 'my-message'}, {'@data-type': 'count', 'data': 4}] -Receiving pong 4 - ws0 -topic /zeek/event/to_client event name pong args [{'@data-type': 'string', 'data': 'my-message'}, {'@data-type': 'count', 'data': 5}] +topic /test/clients event name pong args [{'@data-type': 'string', 'data': 'my-message'}, {'@data-type': 'count', 'data': 4}] +Receiving pong 3 - ws2 +topic /test/clients event name pong args [{'@data-type': 'string', 'data': 'my-message'}, {'@data-type': 'count', 'data': 4}] Receiving pong 4 - ws1 -topic /zeek/event/to_client event name pong args [{'@data-type': 'string', 'data': 'my-message'}, {'@data-type': 'count', 'data': 5}] -Receiving pong 5 - ws0 -topic /zeek/event/to_client event name pong args [{'@data-type': 'string', 'data': 'my-message'}, {'@data-type': 'count', 'data': 6}] +topic /test/clients event name pong args [{'@data-type': 'string', 'data': 'my-message'}, {'@data-type': 'count', 'data': 5}] +Receiving pong 4 - ws2 +topic /test/clients event name pong args [{'@data-type': 'string', 'data': 'my-message'}, {'@data-type': 'count', 'data': 5}] Receiving pong 5 - ws1 -topic /zeek/event/to_client event name pong args [{'@data-type': 'string', 'data': 'my-message'}, {'@data-type': 'count', 'data': 6}] -Receiving pong 6 - ws0 -topic /zeek/event/to_client event name pong args [{'@data-type': 'string', 'data': 'my-message'}, {'@data-type': 'count', 'data': 7}] +topic /test/clients event name pong args [{'@data-type': 'string', 'data': 'my-message'}, {'@data-type': 'count', 'data': 6}] +Receiving pong 5 - ws2 +topic /test/clients event name pong args [{'@data-type': 'string', 'data': 'my-message'}, {'@data-type': 'count', 'data': 6}] Receiving pong 6 - ws1 -topic /zeek/event/to_client event name pong args [{'@data-type': 'string', 'data': 'my-message'}, {'@data-type': 'count', 'data': 7}] -Receiving pong 7 - ws0 -topic /zeek/event/to_client event name pong args [{'@data-type': 'string', 'data': 'my-message'}, {'@data-type': 'count', 'data': 8}] +topic /test/clients event name pong args [{'@data-type': 'string', 'data': 'my-message'}, {'@data-type': 'count', 'data': 7}] +Receiving pong 6 - ws2 +topic /test/clients event name pong args [{'@data-type': 'string', 'data': 'my-message'}, {'@data-type': 'count', 'data': 7}] Receiving pong 7 - ws1 -topic /zeek/event/to_client event name pong args [{'@data-type': 'string', 'data': 'my-message'}, {'@data-type': 'count', 'data': 8}] -Receiving pong 8 - ws0 -topic /zeek/event/to_client event name pong args [{'@data-type': 'string', 'data': 'my-message'}, {'@data-type': 'count', 'data': 9}] +topic /test/clients event name pong args [{'@data-type': 'string', 'data': 'my-message'}, {'@data-type': 'count', 'data': 8}] +Receiving pong 7 - ws2 +topic /test/clients event name pong args [{'@data-type': 'string', 'data': 'my-message'}, {'@data-type': 'count', 'data': 8}] Receiving pong 8 - ws1 -topic /zeek/event/to_client event name pong args [{'@data-type': 'string', 'data': 'my-message'}, {'@data-type': 'count', 'data': 9}] -Receiving pong 9 - ws0 -topic /zeek/event/to_client event name pong args [{'@data-type': 'string', 'data': 'my-message'}, {'@data-type': 'count', 'data': 10}] +topic /test/clients event name pong args [{'@data-type': 'string', 'data': 'my-message'}, {'@data-type': 'count', 'data': 9}] +Receiving pong 8 - ws2 +topic /test/clients event name pong args [{'@data-type': 'string', 'data': 'my-message'}, {'@data-type': 'count', 'data': 9}] Receiving pong 9 - ws1 -topic /zeek/event/to_client event name pong args [{'@data-type': 'string', 'data': 'my-message'}, {'@data-type': 'count', 'data': 10}] +topic /test/clients event name pong args [{'@data-type': 'string', 'data': 'my-message'}, {'@data-type': 'count', 'data': 10}] +Receiving pong 9 - ws2 +topic /test/clients event name pong args [{'@data-type': 'string', 'data': 'my-message'}, {'@data-type': 'count', 'data': 10}] diff --git a/testing/btest/Baseline/cluster.websocket.two-pipelining/..manager.out.sorted b/testing/btest/Baseline/cluster.websocket.two-pipelining/..manager.out.sorted index ae8f79c02d..8b8657ed17 100644 --- a/testing/btest/Baseline/cluster.websocket.two-pipelining/..manager.out.sorted +++ b/testing/btest/Baseline/cluster.websocket.two-pipelining/..manager.out.sorted @@ -1,6 +1,10 @@ ### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. -A Cluster::websocket_client_added, 1, [/zeek/event/to_client] -A Cluster::websocket_client_added, 2, [/zeek/event/to_client] +A Cluster::websocket_client_added, [/test/client-1, /test/clients] +A Cluster::websocket_client_added, [/test/client-2, /test/clients] +A subscription, /test/client-1 +A subscription, /test/client-2 +A subscription, /test/clients +A subscription, /test/manager B got ping: ws1, 0 B got ping: ws1, 1 B got ping: ws1, 2 diff --git a/testing/btest/cluster/websocket/three.zeek b/testing/btest/cluster/websocket/three.zeek index 3810c04e3c..ca6f998be9 100644 --- a/testing/btest/cluster/websocket/three.zeek +++ b/testing/btest/cluster/websocket/three.zeek @@ -20,7 +20,8 @@ # @TEST-EXEC: btest-bg-run client "python3 ../client.py >out" # # @TEST-EXEC: btest-bg-wait 30 -# @TEST-EXEC: btest-diff ./manager/out +# @TEST-EXEC: sort ./manager/out > ./manager/out.sorted +# @TEST-EXEC: btest-diff ./manager/out.sorted # @TEST-EXEC: btest-diff ./manager/.stderr # @TEST-EXEC: btest-diff ./client/out # @TEST-EXEC: btest-diff ./client/.stderr @@ -34,35 +35,44 @@ global ping_count = 0; global ping: event(msg: string, c: count) &is_used; global pong: event(msg: string, c: count) &is_used; -event zeek_init() - { - Cluster::subscribe("/test/manager"); - Cluster::listen_websocket([$listen_host="127.0.0.1", $listen_port=to_port(getenv("WEBSOCKET_PORT"))]); - } +global added = 0; +global lost = 0; event ping(msg: string, n: count) &is_used { ++ping_count; - print fmt("got ping: %s, %s", msg, n); + print fmt("C got ping: %s, %s", msg, n); local e = Cluster::make_event(pong, fmt("orig_msg=%s", msg), ping_count); Cluster::publish("/test/clients", e); } -global added = 0; -global lost = 0; - event Cluster::websocket_client_added(info: Cluster::EndpointInfo, subscriptions: string_vec) { ++added; - print "Cluster::websocket_client_added", added, subscriptions; + print "B Cluster::websocket_client_added", subscriptions; } event Cluster::websocket_client_lost(info: Cluster::EndpointInfo) { ++lost; - print "Cluster::websocket_client_lost", lost; + print "D Cluster::websocket_client_lost", lost; if ( lost == 3 ) terminate(); +} + +# Extra testing output. +event Cluster::Backend::ZeroMQ::subscription(topic: string) + { + if ( ! starts_with(topic, "/test") ) + return; + + print "A subscription", topic; + } + +event zeek_init() + { + Cluster::listen_websocket([$listen_host="127.0.0.1", $listen_port=to_port(getenv("WEBSOCKET_PORT"))]); + Cluster::subscribe("/test/manager"); } # @TEST-END-FILE @@ -100,8 +110,10 @@ def run(ws_url): clients = [ws1, ws2, ws3] print("Connected!") ids = set() - for c in clients: - c.send(json.dumps([topic])) + for i, c in enumerate(clients): + client_topic = f"/test/client-{i}" + c.send(json.dumps([topic, client_topic])) + for c in clients: ack = json.loads(c.recv()) assert "type" in ack, repr(ack) diff --git a/testing/btest/cluster/websocket/two-pipelining.zeek b/testing/btest/cluster/websocket/two-pipelining.zeek index cfef86522f..92ec628c7b 100644 --- a/testing/btest/cluster/websocket/two-pipelining.zeek +++ b/testing/btest/cluster/websocket/two-pipelining.zeek @@ -35,12 +35,6 @@ global ping_count = 0; global ping: event(msg: string, c: count) &is_used; global pong: event(msg: string, c: count) &is_used; -event zeek_init() - { - Cluster::subscribe("/zeek/event/to_manager"); - Cluster::listen_websocket([$listen_host="127.0.0.1", $listen_port=to_port(getenv("WEBSOCKET_PORT"))]); - } - global added = 0; global lost = 0; @@ -51,10 +45,26 @@ type Item: record { global queue: vector of Item; +function is_ready(): bool + { + return added == 2; + } + +function drain_if_ready() + { + if ( is_ready() && |queue| > 0 ) + { + for ( _, item in queue ) + event ping(item$msg, item$n); + + delete queue; + } + } + event ping(msg: string, n: count) &is_used { # Queue the pings if we haven't seen both clients yet. - if ( added < 2 ) + if ( ! is_ready() ) { queue += Item($msg=msg, $n=n); return; @@ -63,20 +73,15 @@ event ping(msg: string, n: count) &is_used ++ping_count; print fmt("B got ping: %s, %s", msg, n); local e = Cluster::make_event(pong, "my-message", ping_count); - Cluster::publish("/zeek/event/to_client", e); + Cluster::publish("/test/clients", e); } event Cluster::websocket_client_added(info: Cluster::EndpointInfo, subscriptions: string_vec) { ++added; - print "A Cluster::websocket_client_added", added, subscriptions; + print "A Cluster::websocket_client_added", subscriptions; - if ( added == 2 ) - { - # Anything in the queue? - for ( _, item in queue ) - event ping(item$msg, item$n); - } + drain_if_ready(); } event Cluster::websocket_client_lost(info: Cluster::EndpointInfo) @@ -86,6 +91,21 @@ event Cluster::websocket_client_lost(info: Cluster::EndpointInfo) if ( lost == 2 ) terminate(); } + +# Extra testing output. +event Cluster::Backend::ZeroMQ::subscription(topic: string) + { + if ( ! starts_with(topic, "/test") ) + return; + + print "A subscription", topic; + } + +event zeek_init() + { + Cluster::listen_websocket([$listen_host="127.0.0.1", $listen_port=to_port(getenv("WEBSOCKET_PORT"))]); + Cluster::subscribe("/test/manager"); + } # @TEST-END-FILE @@ -95,12 +115,12 @@ from websockets.sync.client import connect ws_port = os.environ['WEBSOCKET_PORT'].split('/')[0] ws_url = f'ws://127.0.0.1:{ws_port}/v1/messages/json' -topic = '/zeek/event/to_client' +topic = '/test/clients' def make_ping(c, who): return { "type": "data-message", - "topic": "/zeek/event/to_manager", + "topic": "/test/manager", "@data-type": "vector", "data": [ {"@data-type": "count", "data": 1}, # Format @@ -121,8 +141,9 @@ def run(ws_url): clients = [ws1, ws2] print("Connected!") # Send subscriptions - for ws in clients: - ws.send(json.dumps([topic])) + for c, ws in enumerate(clients, 1): + client_topic = f"/test/client-{c}" + ws.send(json.dumps([topic, client_topic])) for i in range(5): for c, ws in enumerate(clients, 1): @@ -138,7 +159,7 @@ def run(ws_url): assert "version" in ack for i in range(10): - for c, ws in enumerate(clients): + for c, ws in enumerate(clients, 1): print(f"Receiving pong {i} - ws{c}") pong = json.loads(ws.recv()) assert pong["@data-type"] == "vector"