From 01747191b6422df3a96c4a40b94391016e77f5ba Mon Sep 17 00:00:00 2001 From: Arne Welzel Date: Tue, 6 May 2025 14:20:22 +0200 Subject: [PATCH] Merge remote-tracking branch 'origin/topic/awelzel/4420-simeon-breaking-websockets' * origin/topic/awelzel/4420-simeon-breaking-websockets: Websocket: Close onloop during Terminate() OnLoop: notify_all() instead of notify_one() (cherry picked from commit 4afb0ffeebd000211feb1e30891a3439235977bf) --- CHANGES | 10 ++ VERSION | 2 +- src/cluster/OnLoop.h | 4 +- src/cluster/websocket/WebSocket.cc | 2 + .../..client..stderr | 1 + .../..client.out | 4 + .../..manager..stderr | 2 + .../..manager.out.sorted | 5 + .../websocket/terminate-while-queuing.zeek | 101 ++++++++++++++++++ 9 files changed, 128 insertions(+), 3 deletions(-) create mode 100644 testing/btest/Baseline/cluster.websocket.terminate-while-queuing/..client..stderr create mode 100644 testing/btest/Baseline/cluster.websocket.terminate-while-queuing/..client.out create mode 100644 testing/btest/Baseline/cluster.websocket.terminate-while-queuing/..manager..stderr create mode 100644 testing/btest/Baseline/cluster.websocket.terminate-while-queuing/..manager.out.sorted create mode 100644 testing/btest/cluster/websocket/terminate-while-queuing.zeek diff --git a/CHANGES b/CHANGES index fa4976a166..a107c5977e 100644 --- a/CHANGES +++ b/CHANGES @@ -1,3 +1,13 @@ +7.2.0-rc1.10 | 2025-05-07 11:10:09 +0200 + + * Websocket: Close onloop during Terminate() (Arne Welzel, Corelight) + + (cherry picked from commit 4afb0ffeebd000211feb1e30891a3439235977bf) + + * OnLoop: notify_all() instead of notify_one() (Arne Welzel, Corelight) + + (cherry picked from commit 4afb0ffeebd000211feb1e30891a3439235977bf) + 7.2.0-rc1.9 | 2025-05-06 11:42:23 -0700 * Add comments to the static methods for the Redis implementation (Tim Wojtulewicz, Corelight) diff --git a/VERSION b/VERSION index a82eafc46b..f2ac5127b0 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -7.2.0-rc1.9 +7.2.0-rc1.10 diff --git a/src/cluster/OnLoop.h b/src/cluster/OnLoop.h index 3ca295162a..86cd6a136e 100644 --- a/src/cluster/OnLoop.h +++ b/src/cluster/OnLoop.h @@ -97,8 +97,8 @@ public: std::scoped_lock lock(mtx); SetClosed(true); - // Wake a process stuck in queueing. - cond.notify_one(); + // Wake other threads stuck in queueing, if any. + cond.notify_all(); } // Wait for any active queuers to vanish, should be quick. diff --git a/src/cluster/websocket/WebSocket.cc b/src/cluster/websocket/WebSocket.cc index 8ca19a91de..4611e852d4 100644 --- a/src/cluster/websocket/WebSocket.cc +++ b/src/cluster/websocket/WebSocket.cc @@ -247,6 +247,8 @@ void WebSocketEventDispatcher::Terminate() { } clients.clear(); + + onloop->Close(); } void WebSocketEventDispatcher::QueueForProcessing(WebSocketEvent&& event) { diff --git a/testing/btest/Baseline/cluster.websocket.terminate-while-queuing/..client..stderr b/testing/btest/Baseline/cluster.websocket.terminate-while-queuing/..client..stderr new file mode 100644 index 0000000000..49d861c74c --- /dev/null +++ b/testing/btest/Baseline/cluster.websocket.terminate-while-queuing/..client..stderr @@ -0,0 +1 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. diff --git a/testing/btest/Baseline/cluster.websocket.terminate-while-queuing/..client.out b/testing/btest/Baseline/cluster.websocket.terminate-while-queuing/..client.out new file mode 100644 index 0000000000..94b29f65e6 --- /dev/null +++ b/testing/btest/Baseline/cluster.websocket.terminate-while-queuing/..client.out @@ -0,0 +1,4 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +connection closed ok +connection closed ok +connection closed ok diff --git a/testing/btest/Baseline/cluster.websocket.terminate-while-queuing/..manager..stderr b/testing/btest/Baseline/cluster.websocket.terminate-while-queuing/..manager..stderr new file mode 100644 index 0000000000..e3f6131b1d --- /dev/null +++ b/testing/btest/Baseline/cluster.websocket.terminate-while-queuing/..manager..stderr @@ -0,0 +1,2 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +received termination signal diff --git a/testing/btest/Baseline/cluster.websocket.terminate-while-queuing/..manager.out.sorted b/testing/btest/Baseline/cluster.websocket.terminate-while-queuing/..manager.out.sorted new file mode 100644 index 0000000000..e446720504 --- /dev/null +++ b/testing/btest/Baseline/cluster.websocket.terminate-while-queuing/..manager.out.sorted @@ -0,0 +1,5 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +B Cluster::websocket_client_added, [/zeek/wstest/ws1/] +B Cluster::websocket_client_added, [/zeek/wstest/ws2/] +B Cluster::websocket_client_added, [/zeek/wstest/ws3/] +D got 1000 pings from 3 clients, terminating diff --git a/testing/btest/cluster/websocket/terminate-while-queuing.zeek b/testing/btest/cluster/websocket/terminate-while-queuing.zeek new file mode 100644 index 0000000000..530f5d1ecf --- /dev/null +++ b/testing/btest/cluster/websocket/terminate-while-queuing.zeek @@ -0,0 +1,101 @@ +# @TEST-DOC: Regression test for #4420. Clients publish fast and Zeek terminates after receiving 1000 events. Previously this would result in a hang at Zeek shutdown. +# +# @TEST-REQUIRES: python3 -c 'import websockets.sync' +# +# @TEST-PORT: WEBSOCKET_PORT +# +# @TEST-EXEC: cp $FILES/ws/wstest.py . +# @TEST-EXEC: zeek -b --parse-only manager.zeek +# @TEST-EXEC: python3 -m py_compile client.py +# +# @TEST-EXEC: btest-bg-run manager "ZEEKPATH=$ZEEKPATH:.. && zeek -b ../manager.zeek >out" +# @TEST-EXEC: btest-bg-run client "python3 ../client.py >out" +# +# @TEST-EXEC: btest-bg-wait 30 +# @TEST-EXEC: sort ./manager/out > ./manager/out.sorted +# @TEST-EXEC: btest-diff ./manager/out.sorted +# @TEST-EXEC: btest-diff ./manager/.stderr +# @TEST-EXEC: btest-diff ./client/out +# @TEST-EXEC: btest-diff ./client/.stderr + +# @TEST-START-FILE manager.zeek +redef exit_only_after_terminate = T; + +# Force dispatcher queue being full quickly! +redef Cluster::default_websocket_max_event_queue_size = 1; + +global ping_count = 0; +global ping: event(msg: string, c: count) &is_used; + +global clients: set[string] = set(); + +event ping(client: string, n: count) &is_used + { + ++ping_count; + add clients[client]; + + if ( ping_count == 1000 ) + { + print fmt("D got 1000 pings from %s clients, terminating", |clients|); + terminate(); + } + } + +event Cluster::websocket_client_added(info: Cluster::EndpointInfo, subscriptions: string_vec) + { + print "B Cluster::websocket_client_added", subscriptions; + } + +event Cluster::websocket_client_lost(info: Cluster::EndpointInfo) + { + print "E Cluster::websocket_client_lost"; + } + +event zeek_init() + { + Cluster::listen_websocket([$listen_host="127.0.0.1", $listen_port=to_port(getenv("WEBSOCKET_PORT"))]); + Cluster::subscribe("/test/pings/"); + } +# @TEST-END-FILE + + +# @TEST-START-FILE client.py +import websockets.exceptions + +import wstest + +def run(ws_url): + with ( + wstest.connect("ws1", ws_url) as tc1, + wstest.connect("ws2", ws_url) as tc2, + wstest.connect("ws3", ws_url) as tc3, + ): + clients = [tc1, tc2, tc3] + for tc in clients: + tc.hello_v1([]) + + stop = False; + i = 0 + + saw_closed_ok = set() + + while len(saw_closed_ok) < 3: + for idx, tc in enumerate(clients, 1): + if idx in saw_closed_ok: # Have seen a ConnectionClosedOK for this client? + continue + + try: + i += 1 + tc.send_json(wstest.build_event_v1("/test/pings/", "ping", [f"tc{idx}", i])) + except websockets.exceptions.ConnectionClosedOK as e: + print("connection closed ok") + assert e.code == 1001 # Remote going away + i -= 1 + saw_closed_ok.add(idx) + + assert len(saw_closed_ok) == 3 + assert i >= 1000, f"expected to send at least 1000 events, only sent {i}" + +if __name__ == "__main__": + wstest.main(run, wstest.WS4_URL_V1) +# @TEST-END-FILE