diff --git a/testing/btest/Files/ws/wstest.py b/testing/btest/Files/ws/wstest.py index 8286bb2ed2..fa746c882b 100644 --- a/testing/btest/Files/ws/wstest.py +++ b/testing/btest/Files/ws/wstest.py @@ -28,6 +28,7 @@ The wstest.main() helper will retry running test_fun on ConnectionRefusedError. import json import os +import socket import time from typing import Any, Callable, Optional, Union @@ -221,3 +222,58 @@ def main(f: Callable, *args, **kwargs): break except ConnectionRefusedError: time.sleep(MAIN_SLEEP) + + +def monkey_patch_close_socket(): + """ + Monkey patch websockets.sync.ClientConnection.close_socket() + + What's commented out from the original implementation is calling + receive_eof() on self.protocol as well as acknowledge_pending_pings(). + + The reason for doing this is that in the scneario where the websockets + library detects a closed socket during sending, it'll call close_socket() + which in turn invokes protocol.receive_eof(). + + However, when the concurrently running recv_events() thread is racing + and has just successfully received the server's CLOSE frame, the EOF + set on protocol results in an EOFError for the receiving thread instead + of processing the CLOSE frame. It then further reports an + "unexpected internal error". + + Changing the close_socket() implemenation allows the EOF condition + to be set only on the receiving side, avoiding the race. + """ + + def __custom_close_socket(self): + """ + The original implementation is taken from Connection.close_socket() + in sync/connection.py (version 15.0.1). + + """ + # shutdown() is required to interrupt recv() on Linux. + try: + self.socket.shutdown(socket.SHUT_RDWR) + except OSError: + pass # socket is already closed + self.socket.close() + + # Calling protocol.receive_eof() is safe because it's idempotent. + # This guarantees that the protocol state becomes CLOSED. + # + # Commented out: Let the recv_events() threads do EOF handling. + # + # with self.protocol_mutex: + # self.protocol.receive_eof() + # assert self.protocol.state is CLOSED + + # Abort recv() with a ConnectionClosed exception. + self.recv_messages.close() + + # Acknowledge pings sent with the ack_on_close option. + # + # Commented out: Asserts on protcol.state is CLOSED + # + # self.acknowledge_pending_pings() + + ClientConnection.close_socket = __custom_close_socket diff --git a/testing/btest/cluster/websocket/terminate-while-queuing.zeek b/testing/btest/cluster/websocket/terminate-while-queuing.zeek index 59120eec56..54cc99b428 100644 --- a/testing/btest/cluster/websocket/terminate-while-queuing.zeek +++ b/testing/btest/cluster/websocket/terminate-while-queuing.zeek @@ -64,6 +64,8 @@ import websockets.exceptions import wstest +wstest.monkey_patch_close_socket() + def run(ws_url): with ( wstest.connect("ws1", ws_url) as tc1,