From 6ebec6dde7789b52ed11639f675916565c62d141 Mon Sep 17 00:00:00 2001 From: Arne Welzel Date: Tue, 6 May 2025 11:52:57 +0200 Subject: [PATCH 1/2] OnLoop: notify_all() instead of notify_one() There might be more than one thread blocked waiting for room in the queue, ensure they all wake up when shutting down. --- src/cluster/OnLoop.h | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/cluster/OnLoop.h b/src/cluster/OnLoop.h index 3ca295162a..86cd6a136e 100644 --- a/src/cluster/OnLoop.h +++ b/src/cluster/OnLoop.h @@ -97,8 +97,8 @@ public: std::scoped_lock lock(mtx); SetClosed(true); - // Wake a process stuck in queueing. - cond.notify_one(); + // Wake other threads stuck in queueing, if any. + cond.notify_all(); } // Wait for any active queuers to vanish, should be quick. From bb06af601f69402c11faeda60ee9b6b72c7ebadb Mon Sep 17 00:00:00 2001 From: Arne Welzel Date: Tue, 6 May 2025 11:53:15 +0200 Subject: [PATCH 2/2] Websocket: Close onloop during Terminate() Terminate() is called when Zeek shuts down. If WebSocket client threads were blocked in QueueForProcessing() due to reaching queue limits, these previously would not exit QueueForProcessing() and instead block indefinitely, resulting in the ixwebsocket library blocking and its garbage collection thread running at 100%. Not great. Closing the onloop instance will unblock the WebSocket client threads for a timely shutdown. Closes #4420 --- src/cluster/websocket/WebSocket.cc | 2 + .../..client..stderr | 1 + .../..client.out | 4 + .../..manager..stderr | 2 + .../..manager.out.sorted | 5 + .../websocket/terminate-while-queuing.zeek | 101 ++++++++++++++++++ 6 files changed, 115 insertions(+) create mode 100644 testing/btest/Baseline/cluster.websocket.terminate-while-queuing/..client..stderr create mode 100644 testing/btest/Baseline/cluster.websocket.terminate-while-queuing/..client.out create mode 100644 testing/btest/Baseline/cluster.websocket.terminate-while-queuing/..manager..stderr create mode 100644 testing/btest/Baseline/cluster.websocket.terminate-while-queuing/..manager.out.sorted create mode 100644 testing/btest/cluster/websocket/terminate-while-queuing.zeek diff --git a/src/cluster/websocket/WebSocket.cc b/src/cluster/websocket/WebSocket.cc index 8ca19a91de..4611e852d4 100644 --- a/src/cluster/websocket/WebSocket.cc +++ b/src/cluster/websocket/WebSocket.cc @@ -247,6 +247,8 @@ void WebSocketEventDispatcher::Terminate() { } clients.clear(); + + onloop->Close(); } void WebSocketEventDispatcher::QueueForProcessing(WebSocketEvent&& event) { diff --git a/testing/btest/Baseline/cluster.websocket.terminate-while-queuing/..client..stderr b/testing/btest/Baseline/cluster.websocket.terminate-while-queuing/..client..stderr new file mode 100644 index 0000000000..49d861c74c --- /dev/null +++ b/testing/btest/Baseline/cluster.websocket.terminate-while-queuing/..client..stderr @@ -0,0 +1 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. diff --git a/testing/btest/Baseline/cluster.websocket.terminate-while-queuing/..client.out b/testing/btest/Baseline/cluster.websocket.terminate-while-queuing/..client.out new file mode 100644 index 0000000000..94b29f65e6 --- /dev/null +++ b/testing/btest/Baseline/cluster.websocket.terminate-while-queuing/..client.out @@ -0,0 +1,4 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +connection closed ok +connection closed ok +connection closed ok diff --git a/testing/btest/Baseline/cluster.websocket.terminate-while-queuing/..manager..stderr b/testing/btest/Baseline/cluster.websocket.terminate-while-queuing/..manager..stderr new file mode 100644 index 0000000000..e3f6131b1d --- /dev/null +++ b/testing/btest/Baseline/cluster.websocket.terminate-while-queuing/..manager..stderr @@ -0,0 +1,2 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +received termination signal diff --git a/testing/btest/Baseline/cluster.websocket.terminate-while-queuing/..manager.out.sorted b/testing/btest/Baseline/cluster.websocket.terminate-while-queuing/..manager.out.sorted new file mode 100644 index 0000000000..e446720504 --- /dev/null +++ b/testing/btest/Baseline/cluster.websocket.terminate-while-queuing/..manager.out.sorted @@ -0,0 +1,5 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +B Cluster::websocket_client_added, [/zeek/wstest/ws1/] +B Cluster::websocket_client_added, [/zeek/wstest/ws2/] +B Cluster::websocket_client_added, [/zeek/wstest/ws3/] +D got 1000 pings from 3 clients, terminating diff --git a/testing/btest/cluster/websocket/terminate-while-queuing.zeek b/testing/btest/cluster/websocket/terminate-while-queuing.zeek new file mode 100644 index 0000000000..530f5d1ecf --- /dev/null +++ b/testing/btest/cluster/websocket/terminate-while-queuing.zeek @@ -0,0 +1,101 @@ +# @TEST-DOC: Regression test for #4420. Clients publish fast and Zeek terminates after receiving 1000 events. Previously this would result in a hang at Zeek shutdown. +# +# @TEST-REQUIRES: python3 -c 'import websockets.sync' +# +# @TEST-PORT: WEBSOCKET_PORT +# +# @TEST-EXEC: cp $FILES/ws/wstest.py . +# @TEST-EXEC: zeek -b --parse-only manager.zeek +# @TEST-EXEC: python3 -m py_compile client.py +# +# @TEST-EXEC: btest-bg-run manager "ZEEKPATH=$ZEEKPATH:.. && zeek -b ../manager.zeek >out" +# @TEST-EXEC: btest-bg-run client "python3 ../client.py >out" +# +# @TEST-EXEC: btest-bg-wait 30 +# @TEST-EXEC: sort ./manager/out > ./manager/out.sorted +# @TEST-EXEC: btest-diff ./manager/out.sorted +# @TEST-EXEC: btest-diff ./manager/.stderr +# @TEST-EXEC: btest-diff ./client/out +# @TEST-EXEC: btest-diff ./client/.stderr + +# @TEST-START-FILE manager.zeek +redef exit_only_after_terminate = T; + +# Force dispatcher queue being full quickly! +redef Cluster::default_websocket_max_event_queue_size = 1; + +global ping_count = 0; +global ping: event(msg: string, c: count) &is_used; + +global clients: set[string] = set(); + +event ping(client: string, n: count) &is_used + { + ++ping_count; + add clients[client]; + + if ( ping_count == 1000 ) + { + print fmt("D got 1000 pings from %s clients, terminating", |clients|); + terminate(); + } + } + +event Cluster::websocket_client_added(info: Cluster::EndpointInfo, subscriptions: string_vec) + { + print "B Cluster::websocket_client_added", subscriptions; + } + +event Cluster::websocket_client_lost(info: Cluster::EndpointInfo) + { + print "E Cluster::websocket_client_lost"; + } + +event zeek_init() + { + Cluster::listen_websocket([$listen_host="127.0.0.1", $listen_port=to_port(getenv("WEBSOCKET_PORT"))]); + Cluster::subscribe("/test/pings/"); + } +# @TEST-END-FILE + + +# @TEST-START-FILE client.py +import websockets.exceptions + +import wstest + +def run(ws_url): + with ( + wstest.connect("ws1", ws_url) as tc1, + wstest.connect("ws2", ws_url) as tc2, + wstest.connect("ws3", ws_url) as tc3, + ): + clients = [tc1, tc2, tc3] + for tc in clients: + tc.hello_v1([]) + + stop = False; + i = 0 + + saw_closed_ok = set() + + while len(saw_closed_ok) < 3: + for idx, tc in enumerate(clients, 1): + if idx in saw_closed_ok: # Have seen a ConnectionClosedOK for this client? + continue + + try: + i += 1 + tc.send_json(wstest.build_event_v1("/test/pings/", "ping", [f"tc{idx}", i])) + except websockets.exceptions.ConnectionClosedOK as e: + print("connection closed ok") + assert e.code == 1001 # Remote going away + i -= 1 + saw_closed_ok.add(idx) + + assert len(saw_closed_ok) == 3 + assert i >= 1000, f"expected to send at least 1000 events, only sent {i}" + +if __name__ == "__main__": + wstest.main(run, wstest.WS4_URL_V1) +# @TEST-END-FILE