btest/cluster/telemetry: Add smoke testing for telemetry

This commit is contained in:
Arne Welzel 2025-06-20 14:41:14 +02:00
parent b28e5f261e
commit 0e1431eef4
10 changed files with 347 additions and 2 deletions

View file

@ -90,14 +90,18 @@ class TestClient:
return self.__name
def connect(name: str, url: Optional[str] = None) -> TestClient:
def connect(
name: str,
url: Optional[str] = None,
additional_headers: Optional[dict[str, str]] = None,
) -> TestClient:
"""
Connect to a WebSocket server and return a TestClient instance.
"""
if url is None:
url = WS4_URL_V1
cc = websockets.sync.client.connect(url)
cc = websockets.sync.client.connect(url, additional_headers=additional_headers)
return TestClient(name, cc)