From 82731992d912aa8de3f77bd953b1052ec3ec1d27 Mon Sep 17 00:00:00 2001 From: Arne Welzel Date: Wed, 7 May 2025 16:12:11 +0200 Subject: [PATCH] wstest/terminate-while-queueing: Patch close_socket() I believe there's a bug/usage issue in the websockets library where during send(), EOF is detected and stored, but the receiving thread is then discarding the last received frame. Avoid the bug by replacing the close_socket() implementation of the websockets library just for that test and leave detecting the EOF condition to the receiving thread. --- testing/btest/Files/ws/wstest.py | 56 +++++++++++++++++++ .../websocket/terminate-while-queuing.zeek | 2 + 2 files changed, 58 insertions(+) diff --git a/testing/btest/Files/ws/wstest.py b/testing/btest/Files/ws/wstest.py index 8286bb2ed2..fa746c882b 100644 --- a/testing/btest/Files/ws/wstest.py +++ b/testing/btest/Files/ws/wstest.py @@ -28,6 +28,7 @@ The wstest.main() helper will retry running test_fun on ConnectionRefusedError. import json import os +import socket import time from typing import Any, Callable, Optional, Union @@ -221,3 +222,58 @@ def main(f: Callable, *args, **kwargs): break except ConnectionRefusedError: time.sleep(MAIN_SLEEP) + + +def monkey_patch_close_socket(): + """ + Monkey patch websockets.sync.ClientConnection.close_socket() + + What's commented out from the original implementation is calling + receive_eof() on self.protocol as well as acknowledge_pending_pings(). + + The reason for doing this is that in the scneario where the websockets + library detects a closed socket during sending, it'll call close_socket() + which in turn invokes protocol.receive_eof(). + + However, when the concurrently running recv_events() thread is racing + and has just successfully received the server's CLOSE frame, the EOF + set on protocol results in an EOFError for the receiving thread instead + of processing the CLOSE frame. It then further reports an + "unexpected internal error". + + Changing the close_socket() implemenation allows the EOF condition + to be set only on the receiving side, avoiding the race. + """ + + def __custom_close_socket(self): + """ + The original implementation is taken from Connection.close_socket() + in sync/connection.py (version 15.0.1). + + """ + # shutdown() is required to interrupt recv() on Linux. + try: + self.socket.shutdown(socket.SHUT_RDWR) + except OSError: + pass # socket is already closed + self.socket.close() + + # Calling protocol.receive_eof() is safe because it's idempotent. + # This guarantees that the protocol state becomes CLOSED. + # + # Commented out: Let the recv_events() threads do EOF handling. + # + # with self.protocol_mutex: + # self.protocol.receive_eof() + # assert self.protocol.state is CLOSED + + # Abort recv() with a ConnectionClosed exception. + self.recv_messages.close() + + # Acknowledge pings sent with the ack_on_close option. + # + # Commented out: Asserts on protcol.state is CLOSED + # + # self.acknowledge_pending_pings() + + ClientConnection.close_socket = __custom_close_socket diff --git a/testing/btest/cluster/websocket/terminate-while-queuing.zeek b/testing/btest/cluster/websocket/terminate-while-queuing.zeek index 59120eec56..54cc99b428 100644 --- a/testing/btest/cluster/websocket/terminate-while-queuing.zeek +++ b/testing/btest/cluster/websocket/terminate-while-queuing.zeek @@ -64,6 +64,8 @@ import websockets.exceptions import wstest +wstest.monkey_patch_close_socket() + def run(ws_url): with ( wstest.connect("ws1", ws_url) as tc1,