diff --git a/CHANGES b/CHANGES index 76126938ea..0e71613450 100644 --- a/CHANGES +++ b/CHANGES @@ -1,3 +1,23 @@ +8.0.0-dev.44 | 2025-05-06 14:20:22 +0200 + + * GH-4420: Websocket: Close onloop during Terminate() (Arne Welzel, Corelight) + + Terminate() is called when Zeek shuts down. If WebSocket client threads + were blocked in QueueForProcessing() due to reaching queue limits, these + previously would not exit QueueForProcessing() and instead block + indefinitely, resulting in the ixwebsocket library blocking and its + garbage collection thread running at 100%. Not great. + + Closing the onloop instance will unblock the WebSocket client threads + for a timely shutdown. + + Closes #4420 + + * OnLoop: notify_all() instead of notify_one() (Arne Welzel, Corelight) + + There might be more than one thread blocked waiting for room in the + queue, ensure they all wake up when shutting down. + 8.0.0-dev.40 | 2025-05-05 13:40:20 -0700 * Add baseline for find_first test, update comments, and reorder function imports (yexiaochuan) diff --git a/VERSION b/VERSION index 2c094fa9cd..9af9cdb47d 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -8.0.0-dev.40 +8.0.0-dev.44 diff --git a/src/cluster/OnLoop.h b/src/cluster/OnLoop.h index 3ca295162a..86cd6a136e 100644 --- a/src/cluster/OnLoop.h +++ b/src/cluster/OnLoop.h @@ -97,8 +97,8 @@ public: std::scoped_lock lock(mtx); SetClosed(true); - // Wake a process stuck in queueing. - cond.notify_one(); + // Wake other threads stuck in queueing, if any. + cond.notify_all(); } // Wait for any active queuers to vanish, should be quick. 
diff --git a/src/cluster/websocket/WebSocket.cc b/src/cluster/websocket/WebSocket.cc index 8ca19a91de..4611e852d4 100644 --- a/src/cluster/websocket/WebSocket.cc +++ b/src/cluster/websocket/WebSocket.cc @@ -247,6 +247,8 @@ void WebSocketEventDispatcher::Terminate() { } clients.clear(); + + onloop->Close(); } void WebSocketEventDispatcher::QueueForProcessing(WebSocketEvent&& event) { diff --git a/testing/btest/Baseline/cluster.websocket.terminate-while-queuing/..client..stderr b/testing/btest/Baseline/cluster.websocket.terminate-while-queuing/..client..stderr new file mode 100644 index 0000000000..49d861c74c --- /dev/null +++ b/testing/btest/Baseline/cluster.websocket.terminate-while-queuing/..client..stderr @@ -0,0 +1 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. diff --git a/testing/btest/Baseline/cluster.websocket.terminate-while-queuing/..client.out b/testing/btest/Baseline/cluster.websocket.terminate-while-queuing/..client.out new file mode 100644 index 0000000000..94b29f65e6 --- /dev/null +++ b/testing/btest/Baseline/cluster.websocket.terminate-while-queuing/..client.out @@ -0,0 +1,4 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +connection closed ok +connection closed ok +connection closed ok diff --git a/testing/btest/Baseline/cluster.websocket.terminate-while-queuing/..manager..stderr b/testing/btest/Baseline/cluster.websocket.terminate-while-queuing/..manager..stderr new file mode 100644 index 0000000000..e3f6131b1d --- /dev/null +++ b/testing/btest/Baseline/cluster.websocket.terminate-while-queuing/..manager..stderr @@ -0,0 +1,2 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. 
+received termination signal diff --git a/testing/btest/Baseline/cluster.websocket.terminate-while-queuing/..manager.out.sorted b/testing/btest/Baseline/cluster.websocket.terminate-while-queuing/..manager.out.sorted new file mode 100644 index 0000000000..e446720504 --- /dev/null +++ b/testing/btest/Baseline/cluster.websocket.terminate-while-queuing/..manager.out.sorted @@ -0,0 +1,5 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +B Cluster::websocket_client_added, [/zeek/wstest/ws1/] +B Cluster::websocket_client_added, [/zeek/wstest/ws2/] +B Cluster::websocket_client_added, [/zeek/wstest/ws3/] +D got 1000 pings from 3 clients, terminating diff --git a/testing/btest/cluster/websocket/terminate-while-queuing.zeek b/testing/btest/cluster/websocket/terminate-while-queuing.zeek new file mode 100644 index 0000000000..530f5d1ecf --- /dev/null +++ b/testing/btest/cluster/websocket/terminate-while-queuing.zeek @@ -0,0 +1,101 @@ +# @TEST-DOC: Regression test for #4420. Clients publish fast and Zeek terminates after receiving 1000 events. Previously this would result in a hang at Zeek shutdown. +# +# @TEST-REQUIRES: python3 -c 'import websockets.sync' +# +# @TEST-PORT: WEBSOCKET_PORT +# +# @TEST-EXEC: cp $FILES/ws/wstest.py . +# @TEST-EXEC: zeek -b --parse-only manager.zeek +# @TEST-EXEC: python3 -m py_compile client.py +# +# @TEST-EXEC: btest-bg-run manager "ZEEKPATH=$ZEEKPATH:.. && zeek -b ../manager.zeek >out" +# @TEST-EXEC: btest-bg-run client "python3 ../client.py >out" +# +# @TEST-EXEC: btest-bg-wait 30 +# @TEST-EXEC: sort ./manager/out > ./manager/out.sorted +# @TEST-EXEC: btest-diff ./manager/out.sorted +# @TEST-EXEC: btest-diff ./manager/.stderr +# @TEST-EXEC: btest-diff ./client/out +# @TEST-EXEC: btest-diff ./client/.stderr + +# @TEST-START-FILE manager.zeek +redef exit_only_after_terminate = T; + +# Force the dispatcher queue to fill up quickly!
+redef Cluster::default_websocket_max_event_queue_size = 1;
+
+global ping_count = 0;
+global ping: event(msg: string, c: count) &is_used;
+
+global clients: set[string] = set();
+
+event ping(client: string, n: count) &is_used
+	{
+	++ping_count;
+	add clients[client];
+
+	if ( ping_count == 1000 )
+		{
+		print fmt("D got 1000 pings from %s clients, terminating", |clients|);
+		terminate();
+		}
+	}
+
+event Cluster::websocket_client_added(info: Cluster::EndpointInfo, subscriptions: string_vec)
+	{
+	print "B Cluster::websocket_client_added", subscriptions;
+	}
+
+event Cluster::websocket_client_lost(info: Cluster::EndpointInfo)
+	{
+	print "E Cluster::websocket_client_lost";
+	}
+
+event zeek_init()
+	{
+	Cluster::listen_websocket([$listen_host="127.0.0.1", $listen_port=to_port(getenv("WEBSOCKET_PORT"))]);
+	Cluster::subscribe("/test/pings/");
+	}
+# @TEST-END-FILE
+
+
+# @TEST-START-FILE client.py
+import websockets.exceptions
+
+import wstest
+
+def run(ws_url):
+    with (
+        wstest.connect("ws1", ws_url) as tc1,
+        wstest.connect("ws2", ws_url) as tc2,
+        wstest.connect("ws3", ws_url) as tc3,
+    ):
+        clients = [tc1, tc2, tc3]
+        for tc in clients:
+            tc.hello_v1([])
+
+        # Running count of events sent; rolled back when a send hits a closed connection.
+        i = 0
+
+        saw_closed_ok = set()
+
+        while len(saw_closed_ok) < 3:
+            for idx, tc in enumerate(clients, 1):
+                if idx in saw_closed_ok:  # Have seen a ConnectionClosedOK for this client?
+                    continue
+
+                try:
+                    i += 1
+                    tc.send_json(wstest.build_event_v1("/test/pings/", "ping", [f"tc{idx}", i]))
+                except websockets.exceptions.ConnectionClosedOK as e:
+                    print("connection closed ok")
+                    assert e.code == 1001  # Remote going away
+                    i -= 1
+                    saw_closed_ok.add(idx)
+
+        assert len(saw_closed_ok) == 3
+        assert i >= 1000, f"expected to send at least 1000 events, only sent {i}"
+
+if __name__ == "__main__":
+    wstest.main(run, wstest.WS4_URL_V1)
+# @TEST-END-FILE