Merge remote-tracking branch 'origin/topic/awelzel/more-terminate-while-queueing-hardening'

* origin/topic/awelzel/more-terminate-while-queueing-hardening:
  btest/cluster/generic/publish-any: Apply Christian's fix from broker/publish-any
  wstest/terminate-while-queueing: Patch close_socket()

(cherry picked from commit 8089f5bed4)
This commit is contained in:
Arne Welzel 2025-05-07 17:23:54 +02:00
parent 3ee6a3d6c0
commit e326e31d7e
5 changed files with 77 additions and 5 deletions

10
CHANGES
View file

@ -1,3 +1,13 @@
7.2.0-rc1.12 | 2025-05-07 17:25:07 +0200
* btest/cluster/generic/publish-any: Apply Christian's fix from broker/publish-any (Arne Welzel, Corelight)
(cherry picked from commit 8089f5bed422e0da38dde631f88d6371c3d66c2f)
* wstest/terminate-while-queueing: Patch close_socket() (Arne Welzel, Corelight)
(cherry picked from commit 8089f5bed422e0da38dde631f88d6371c3d66c2f)
7.2.0-rc1.11 | 2025-05-07 14:06:06 +0200 7.2.0-rc1.11 | 2025-05-07 14:06:06 +0200
* cluster/websocket: Stop and wait for reply thread during Terminate() (Arne Welzel, Corelight) * cluster/websocket: Stop and wait for reply thread during Terminate() (Arne Welzel, Corelight)

View file

@ -1 +1 @@
7.2.0-rc1.11 7.2.0-rc1.12

View file

@ -28,6 +28,7 @@ The wstest.main() helper will retry running test_fun on ConnectionRefusedError.
import json import json
import os import os
import socket
import time import time
from typing import Any, Callable, Optional, Union from typing import Any, Callable, Optional, Union
@ -221,3 +222,58 @@ def main(f: Callable, *args, **kwargs):
break break
except ConnectionRefusedError: except ConnectionRefusedError:
time.sleep(MAIN_SLEEP) time.sleep(MAIN_SLEEP)
def monkey_patch_close_socket():
"""
Monkey patch websockets.sync.ClientConnection.close_socket()
What's commented out from the original implementation is calling
receive_eof() on self.protocol as well as acknowledge_pending_pings().
The reason for doing this is that in the scneario where the websockets
library detects a closed socket during sending, it'll call close_socket()
which in turn invokes protocol.receive_eof().
However, when the concurrently running recv_events() thread is racing
and has just successfully received the server's CLOSE frame, the EOF
set on protocol results in an EOFError for the receiving thread instead
of processing the CLOSE frame. It then further reports an
"unexpected internal error".
Changing the close_socket() implemenation allows the EOF condition
to be set only on the receiving side, avoiding the race.
"""
def __custom_close_socket(self):
"""
The original implementation is taken from Connection.close_socket()
in sync/connection.py (version 15.0.1).
"""
# shutdown() is required to interrupt recv() on Linux.
try:
self.socket.shutdown(socket.SHUT_RDWR)
except OSError:
pass # socket is already closed
self.socket.close()
# Calling protocol.receive_eof() is safe because it's idempotent.
# This guarantees that the protocol state becomes CLOSED.
#
# Commented out: Let the recv_events() threads do EOF handling.
#
# with self.protocol_mutex:
# self.protocol.receive_eof()
# assert self.protocol.state is CLOSED
# Abort recv() with a ConnectionClosed exception.
self.recv_messages.close()
# Acknowledge pings sent with the ack_on_close option.
#
# Commented out: Asserts on protcol.state is CLOSED
#
# self.acknowledge_pending_pings()
ClientConnection.close_socket = __custom_close_socket

View file

@ -57,8 +57,6 @@ event send_any()
local e = Cluster::make_event(ping, i, type_name(val), val); local e = Cluster::make_event(ping, i, type_name(val), val);
Cluster::publish_hrw(Cluster::worker_pool, cat(i), e); Cluster::publish_hrw(Cluster::worker_pool, cat(i), e);
++i; ++i;
schedule 0.05sec { send_any() };
} }
event pong(c: count, what: string, val: any) event pong(c: count, what: string, val: any)
@ -66,10 +64,16 @@ event pong(c: count, what: string, val: any)
++pongs; ++pongs;
print "got pong", pongs, "with", c, what, type_name(val), val; print "got pong", pongs, "with", c, what, type_name(val), val;
# We send 5 pings in 3 different variations and # The manager sends 5 types of pings, in 3 different ways. The worker
# get two pongs for each. # answers each ping in two ways, for a total of 30 expected pongs at the
# manager. Every batch of pings involves 6 pongs.
if ( pongs == 30 ) if ( pongs == 30 )
Cluster::publish(Cluster::worker_topic, finish); Cluster::publish(Cluster::worker_topic, finish);
else if ( pongs > 0 && pongs % 6 == 0 )
{
# Wait for a batch to complete before sending the next.
event send_any();
}
} }
event Cluster::node_up(name: string, id: string) event Cluster::node_up(name: string, id: string)

View file

@ -64,6 +64,8 @@ import websockets.exceptions
import wstest import wstest
wstest.monkey_patch_close_socket()
def run(ws_url): def run(ws_url):
with ( with (
wstest.connect("ws1", ws_url) as tc1, wstest.connect("ws1", ws_url) as tc1,