mirror of
https://github.com/zeek/zeek.git
synced 2025-10-02 14:48:21 +00:00

I believe there's a bug/usage issue in the websockets library where during send(), EOF is detected and stored, but the receiving thread is then discarding the last received frame. Avoid the bug by replacing the close_socket() implementation of the websockets library just for that test and leave detecting the EOF condition to the receiving thread.
279 lines
7.7 KiB
Python
279 lines
7.7 KiB
Python
"""
|
|
Testing library for WebSocket tests.

Usage in btests
===============

Add a TEST-EXEC line to make the library available in the test's directory:

    # @TEST-EXEC: cp ${FILES}/ws/wstest.py .

Then, in a Python file for the WebSocket client, do something like the
following to connect, subscribe to /test/topic and send a ping event.

    import wstest

    def test_fun(url):
        with wstest.connect("client", url) as tc:
            ack = tc.hello_v1(["/test/topic"])
            print(ack)

            tc.send_json(wstest.build_event_v1("/test/topic", "ping", ["hello"]))

    if __name__ == "__main__":
        wstest.main(test_fun, wstest.WS4_URL_V1)

The wstest.main() helper will retry running test_fun on ConnectionRefusedError.
|
|
"""
|
|
|
|
import json
|
|
import os
|
|
import socket
|
|
import time
|
|
from typing import Any, Callable, Optional, Union
|
|
|
|
import websockets.sync.client
|
|
from websockets.sync.client import ClientConnection
|
|
|
|
# Port of the WebSocket server under test, taken from the WEBSOCKET_PORT
# environment variable. Only the text before the first "/" is parsed as an
# integer (so a value shaped like "<port>/<something>" works); the port is 0
# when the variable is not set.
WS_PORT = int(os.environ.get("WEBSOCKET_PORT", "0").split("/")[0])

# IPv4 non-secure WebSocket URL for version 1
WS4_URL_V1 = f"ws://127.0.0.1:{WS_PORT}/v1/messages/json"

# Default timeout handed to the receive helpers in this module.
DEFAULT_RECV_TIMEOUT = 0.1

# Prefix of the per-client topic that TestClient builds from its name.
OWN_TOPIC_PREFIX = "/zeek/wstest"

# Retry policy of main(): number of attempts and the time.sleep() duration
# after a refused connection.
MAIN_TRIES = 200
MAIN_SLEEP = 0.05
|
|
|
|
|
|
class TestClient:
    """
    Helper class wrapping a websockets ClientConnection
    with a bit of convenience.

    Instances are context managers: leaving the ``with`` block forwards to
    the wrapped connection's ``__exit__``.
    """

    def __init__(self, name: str, cc: ClientConnection):
        """
        Store the client name and the connection to wrap.

        The client's own topic is derived from OWN_TOPIC_PREFIX and the name
        and is always subscribed to by hello_v1().
        """
        self.__name = name
        self.__cc = cc
        self.__own_topic = f"{OWN_TOPIC_PREFIX}/{self.name}/"

    def __enter__(self) -> "TestClient":
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        # Delegate cleanup to the wrapped connection; its return value is
        # intentionally dropped so exceptions are never suppressed here.
        self.__cc.__exit__(exc_type, exc_value, traceback)

    def hello_v1(self, topics: Union[str, list[str]]) -> dict:
        """
        Send the initial v1 subscription message and return the server's ack.

        The message is a JSON list consisting of this client's own topic
        followed by the given topics. ``topics`` may be a list (or any other
        iterable) of topic strings, or a single topic string; a bare string
        is treated as a one-element list instead of failing with a TypeError
        on list concatenation.

        Raises AssertionError if the reply is not an "ack" message carrying
        the "endpoint" and "version" keys.
        """
        if isinstance(topics, str):
            topics = [topics]

        self.send_json([self.__own_topic, *topics])
        ack = self.recv_json()
        assert "type" in ack, repr(ack)
        assert ack["type"] == "ack", repr(ack)
        assert "endpoint" in ack, repr(ack)
        assert "version" in ack, repr(ack)
        return ack

    def recv_json(self, timeout=DEFAULT_RECV_TIMEOUT) -> dict:
        """
        Receive one message from the connection and decode it as JSON.

        ``timeout`` is passed through to the connection's recv(); callers in
        this module treat a TimeoutError from this method as "nothing
        arrived in time".
        """
        raw = self.__cc.recv(timeout=timeout)
        return json.loads(raw)

    def send_json(self, data) -> None:
        """
        Serialize data as JSON and send it over the connection.
        """
        return self.__cc.send(json.dumps(data))

    @property
    def name(self) -> str:
        """
        The name given to this client at construction time.
        """
        return self.__name
|
|
|
|
|
|
def connect(name: str, url: Optional[str] = None) -> TestClient:
    """
    Open a WebSocket connection and wrap it in a TestClient called name.

    When url is None, WS4_URL_V1 is used.
    """
    target = WS4_URL_V1 if url is None else url
    return TestClient(name, websockets.sync.client.connect(target))
|
|
|
|
|
|
def recv_until_timeout(
    clients: list[TestClient], *, timeout: float = DEFAULT_RECV_TIMEOUT, desc=None
):
    """
    Poll every client round after round, printing one JSON status line per
    client and round, until a round in which every client timed out.

    The final all-timeout round is not printed. If desc is given, a header
    line containing it is printed first.
    """
    if desc:
        print(f"recv_until_timeout: {desc}")

    while True:
        received_any = False
        round_msgs = []

        for client in clients:
            try:
                msg = client.recv_json(timeout=timeout)
                # The event vector is the third element of the message data.
                event = msg["data"][2]["data"]
                round_msgs.append(
                    {
                        "client": client.name,
                        "topic": msg["topic"],
                        "event_name": event[0]["data"],
                        "event_args": event[1]["data"],
                    }
                )
                received_any = True
            except TimeoutError:
                round_msgs.append({"client": client.name, "timeout": True})

        # Nobody received anything in this round: done, print nothing.
        if not received_any:
            break

        for entry in round_msgs:
            print(json.dumps(entry))
|
|
|
|
|
|
class TypedArg:
    """
    Helper class for V1 event arguments.

    Pairs a value with an explicit `@data-type` name for cases where the
    type cannot be derived from the Python type of the value.
    """

    def __init__(self, typ: str, value: Any):
        self.typ = typ
        self.value = value


EventArg = Union[str, int, TypedArg]


def build_event_arg_v1(arg: EventArg):
    """
    Convert arg into a Broker WebSocket v1 dict with keys `@data-type` and `data`.

    Supported arguments:

    * `TypedArg`: uses the given type name and value as-is
    * `bool`: encoded as `boolean`
    * `int`: encoded as `count`
    * `str`: encoded as `string`

    Raises TypeError for anything else.
    """
    data_type: Union[str, None] = None
    data: Any = None

    if isinstance(arg, TypedArg):
        data_type, data = arg.typ, arg.value
    elif isinstance(arg, bool):
        # bool is a subclass of int, so it has to be checked before int.
        # Otherwise True/False would be sent as a "count" carrying a JSON
        # boolean.
        data_type, data = "boolean", arg
    elif isinstance(arg, int):
        data_type, data = "count", arg
    elif isinstance(arg, str):
        data_type, data = "string", arg
    else:
        raise TypeError(f"Unsupported arg {arg!r} of type {type(arg)}")

    return {
        "@data-type": data_type,
        "data": data,
    }
|
|
|
|
|
|
def build_event_v1(
    topic: str, event_name: str, args: Optional[list[EventArg]] = None
) -> dict:
    """
    Assemble a v1 "data-message" dict carrying one event for testing.

    Every entry of args is converted with build_event_arg_v1(), so whatever
    that function supports can be passed. Omitting args (or passing None)
    produces an event without arguments.
    """
    encoded_args = [build_event_arg_v1(a) for a in (() if args is None else args)]

    # Event vector: the event name followed by the vector of arguments.
    name_field = {"@data-type": "string", "data": event_name}
    args_field = {"@data-type": "vector", "data": encoded_args}
    event_vector = {"@data-type": "vector", "data": [name_field, args_field]}

    return {
        "type": "data-message",
        "topic": topic,
        "@data-type": "vector",
        "data": [
            {"@data-type": "count", "data": 1},  # Format
            {"@data-type": "count", "data": 1},  # Type
            event_vector,
        ],
    }
|
|
|
|
|
|
def main(f: Callable, *args, **kwargs):
    """
    Wrapper to start a test retrying on ConnectionRefusedError.

    Calls f(*args, **kwargs) up to MAIN_TRIES times. A ConnectionRefusedError
    is taken to mean that the WebSocket server wasn't yet available: the
    wrapper sleeps for MAIN_SLEEP and tries again. Any other exception from f
    propagates immediately.

    Raises ConnectionRefusedError if f never completed within MAIN_TRIES
    attempts, so a test that never reached the server fails loudly instead
    of exiting as if it had run.
    """
    last_error = None

    for _ in range(MAIN_TRIES):
        try:
            f(*args, **kwargs)
            return
        except ConnectionRefusedError as err:
            last_error = err
            time.sleep(MAIN_SLEEP)

    raise ConnectionRefusedError(
        f"connection still refused after {MAIN_TRIES} tries"
    ) from last_error
|
|
|
|
|
|
def monkey_patch_close_socket():
    """
    Monkey patch websockets.sync.ClientConnection.close_socket()

    Compared to the original implementation, the replacement leaves out the
    call to receive_eof() on self.protocol as well as the call to
    acknowledge_pending_pings().

    The reason for doing this is that in the scenario where the websockets
    library detects a closed socket during sending, it'll call close_socket()
    which in turn invokes protocol.receive_eof().

    However, when the concurrently running recv_events() thread is racing
    and has just successfully received the server's CLOSE frame, the EOF
    set on protocol results in an EOFError for the receiving thread instead
    of processing the CLOSE frame. It then further reports an
    "unexpected internal error".

    Changing the close_socket() implementation allows the EOF condition
    to be set only on the receiving side, avoiding the race.
    """

    def __custom_close_socket(self):
        """
        Replacement for Connection.close_socket(), based on the original
        implementation in sync/connection.py (version 15.0.1).
        """
        sock = self.socket

        # shutdown() is required to interrupt recv() on Linux.
        try:
            sock.shutdown(socket.SHUT_RDWR)
        except OSError:
            pass  # socket is already closed
        sock.close()

        # The original marks the protocol as having seen EOF at this point
        # (safe there because receive_eof() is idempotent, guaranteeing the
        # protocol state becomes CLOSED).
        #
        # Left out on purpose: Let the recv_events() threads do EOF handling.
        #
        # with self.protocol_mutex:
        #     self.protocol.receive_eof()
        # assert self.protocol.state is CLOSED

        # Abort recv() with a ConnectionClosed exception.
        self.recv_messages.close()

        # The original then acknowledges pings sent with the ack_on_close
        # option.
        #
        # Left out on purpose: Asserts on protocol.state is CLOSED
        #
        # self.acknowledge_pending_pings()

    setattr(ClientConnection, "close_socket", __custom_close_socket)
|