diff --git a/CHANGES b/CHANGES index 6a8d396a87..90adf6df42 100644 --- a/CHANGES +++ b/CHANGES @@ -1,3 +1,13 @@ +7.2.0-rc1.12 | 2025-05-07 17:25:07 +0200 + + * btest/cluster/generic/publish-any: Apply Christian's fix from broker/publish-any (Arne Welzel, Corelight) + + (cherry picked from commit 8089f5bed422e0da38dde631f88d6371c3d66c2f) + + * wstest/terminate-while-queueing: Patch close_socket() (Arne Welzel, Corelight) + + (cherry picked from commit 8089f5bed422e0da38dde631f88d6371c3d66c2f) + 7.2.0-rc1.11 | 2025-05-07 14:06:06 +0200 * cluster/websocket: Stop and wait for reply thread during Terminate() (Arne Welzel, Corelight) diff --git a/VERSION b/VERSION index d9fec82f98..4ca1764b33 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -7.2.0-rc1.11 +7.2.0-rc1.12 diff --git a/testing/btest/Files/ws/wstest.py b/testing/btest/Files/ws/wstest.py index 8286bb2ed2..fa746c882b 100644 --- a/testing/btest/Files/ws/wstest.py +++ b/testing/btest/Files/ws/wstest.py @@ -28,6 +28,7 @@ The wstest.main() helper will retry running test_fun on ConnectionRefusedError. import json import os +import socket import time from typing import Any, Callable, Optional, Union @@ -221,3 +222,58 @@ def main(f: Callable, *args, **kwargs): break except ConnectionRefusedError: time.sleep(MAIN_SLEEP) + + +def monkey_patch_close_socket(): + """ + Monkey patch websockets.sync.ClientConnection.close_socket() + + What's commented out from the original implementation is calling + receive_eof() on self.protocol as well as acknowledge_pending_pings(). + + The reason for doing this is that in the scneario where the websockets + library detects a closed socket during sending, it'll call close_socket() + which in turn invokes protocol.receive_eof(). + + However, when the concurrently running recv_events() thread is racing + and has just successfully received the server's CLOSE frame, the EOF + set on protocol results in an EOFError for the receiving thread instead + of processing the CLOSE frame. It then further reports an + "unexpected internal error". + + Changing the close_socket() implemenation allows the EOF condition + to be set only on the receiving side, avoiding the race. + """ + + def __custom_close_socket(self): + """ + The original implementation is taken from Connection.close_socket() + in sync/connection.py (version 15.0.1). + + """ + # shutdown() is required to interrupt recv() on Linux. + try: + self.socket.shutdown(socket.SHUT_RDWR) + except OSError: + pass # socket is already closed + self.socket.close() + + # Calling protocol.receive_eof() is safe because it's idempotent. + # This guarantees that the protocol state becomes CLOSED. + # + # Commented out: Let the recv_events() threads do EOF handling. + # + # with self.protocol_mutex: + # self.protocol.receive_eof() + # assert self.protocol.state is CLOSED + + # Abort recv() with a ConnectionClosed exception. + self.recv_messages.close() + + # Acknowledge pings sent with the ack_on_close option. + # + # Commented out: Asserts on protcol.state is CLOSED + # + # self.acknowledge_pending_pings() + + ClientConnection.close_socket = __custom_close_socket diff --git a/testing/btest/cluster/generic/publish-any.zeek b/testing/btest/cluster/generic/publish-any.zeek index 2078019f17..3866bd9812 100644 --- a/testing/btest/cluster/generic/publish-any.zeek +++ b/testing/btest/cluster/generic/publish-any.zeek @@ -57,8 +57,6 @@ event send_any() local e = Cluster::make_event(ping, i, type_name(val), val); Cluster::publish_hrw(Cluster::worker_pool, cat(i), e); ++i; - - schedule 0.05sec { send_any() }; } event pong(c: count, what: string, val: any) @@ -66,10 +64,16 @@ event pong(c: count, what: string, val: any) ++pongs; print "got pong", pongs, "with", c, what, type_name(val), val; - # We send 5 pings in 3 different variations and - # get two pongs for each. + # The manager sends 5 types of pings, in 3 different ways. The worker + # answers each ping in two ways, for a total of 30 expected pongs at the + # manager. Every batch of pings involves 6 pongs. if ( pongs == 30 ) Cluster::publish(Cluster::worker_topic, finish); + else if ( pongs > 0 && pongs % 6 == 0 ) + { + # Wait for a batch to complete before sending the next. + event send_any(); + } } event Cluster::node_up(name: string, id: string) diff --git a/testing/btest/cluster/websocket/terminate-while-queuing.zeek b/testing/btest/cluster/websocket/terminate-while-queuing.zeek index 59120eec56..54cc99b428 100644 --- a/testing/btest/cluster/websocket/terminate-while-queuing.zeek +++ b/testing/btest/cluster/websocket/terminate-while-queuing.zeek @@ -64,6 +64,8 @@ import websockets.exceptions import wstest +wstest.monkey_patch_close_socket() + def run(ws_url): with ( wstest.connect("ws1", ws_url) as tc1,