cluster/websocket: Propagate code and reason to websocket_client_lost()

This provides visibility into why ixwebsocket or the client decided
to disconnect.

Closed #4440
This commit is contained in:
Arne Welzel 2025-05-09 16:23:17 +02:00
parent aaddeb19ad
commit a61aff010f
11 changed files with 131 additions and 6 deletions

View file

@ -680,10 +680,11 @@ event websocket_client_added(endpoint: EndpointInfo, subscriptions: string_vec)
Cluster::log(msg); Cluster::log(msg);
} }
event websocket_client_lost(endpoint: EndpointInfo) event websocket_client_lost(endpoint: EndpointInfo, code: count, reason: string)
{ {
local msg = fmt("WebSocket client '%s' (%s:%d) gone", local msg = fmt("WebSocket client '%s' (%s:%d) gone with code %d%s",
endpoint$id, endpoint$network$address, endpoint$network$bound_port); endpoint$id, endpoint$network$address, endpoint$network$bound_port, code,
|reason| > 0 ? fmt(" and reason '%s'", reason) : "");
Cluster::log(msg); Cluster::log(msg);
} }

View file

@ -135,7 +135,8 @@ std::unique_ptr<WebSocketServer> StartServer(std::unique_ptr<WebSocketEventDispa
dispatcher->QueueForProcessing(WebSocketMessage{id, msg->str}); dispatcher->QueueForProcessing(WebSocketMessage{id, msg->str});
} }
else if ( msg->type == ix::WebSocketMessageType::Close ) { else if ( msg->type == ix::WebSocketMessageType::Close ) {
dispatcher->QueueForProcessing(WebSocketClose{id}); auto& ci = msg->closeInfo;
dispatcher->QueueForProcessing(WebSocketClose{id, ci.code, std::move(ci.reason)});
} }
else if ( msg->type == ix::WebSocketMessageType::Error ) { else if ( msg->type == ix::WebSocketMessageType::Error ) {
dispatcher->QueueForProcessing(WebSocketClose{id}); dispatcher->QueueForProcessing(WebSocketClose{id});

View file

@ -356,7 +356,8 @@ void WebSocketEventDispatcher::Process(const WebSocketClose& close) {
// should be the last event related to this WebSocket client. // should be the last event related to this WebSocket client.
auto rec = zeek::cluster::detail::bif::make_endpoint_info(backend->NodeId(), wsc->getRemoteIp(), auto rec = zeek::cluster::detail::bif::make_endpoint_info(backend->NodeId(), wsc->getRemoteIp(),
wsc->getRemotePort(), TRANSPORT_TCP); wsc->getRemotePort(), TRANSPORT_TCP);
zeek::event_mgr.Enqueue(Cluster::websocket_client_lost, std::move(rec)); zeek::event_mgr.Enqueue(Cluster::websocket_client_lost, std::move(rec), zeek::val_mgr->Count(close.code),
zeek::make_intrusive<zeek::StringVal>(close.reason));
} }
clients.erase(it); clients.erase(it);

View file

@ -131,6 +131,8 @@ struct WebSocketOpen {
// A WebSocket client disconnected. // A WebSocket client disconnected.
struct WebSocketClose { struct WebSocketClose {
std::string id; std::string id;
uint16_t code;
std::string reason;
}; };
// A WebSocket client sent a message. // A WebSocket client sent a message.

View file

@ -10,4 +10,8 @@ event websocket_client_added%(endpoint: EndpointInfo, subscriptions: string_vec%
## Generated when a WebSocket client was lost. ## Generated when a WebSocket client was lost.
## ##
## endpoint: Various information about the WebSocket client. ## endpoint: Various information about the WebSocket client.
event websocket_client_lost%(endpoint: EndpointInfo%); ## code: The code sent by the client in its CLOSE frame, or a code generated
## internally if the server disconnected the client.
## reason: The reason sent by the client in its CLOSE frame, or a reason generated
## internally if the server disconnected the client.
event websocket_client_lost%(endpoint: EndpointInfo, code: count, reason: string%);

View file

@ -0,0 +1 @@
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.

View file

@ -0,0 +1,3 @@
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
received 1011 (internal error) Ping timeout; then sent 1011 (internal error) Ping timeout
received 1011 (internal error) Ping timeout; then sent 1011 (internal error) Ping timeout

View file

@ -0,0 +1,2 @@
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
received termination signal

View file

@ -0,0 +1,5 @@
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
WebSocket client <nodeid> (127.0.0.1:<port>) subscribed to []
WebSocket client <nodeid> (127.0.0.1:<port>) subscribed to []
WebSocket client <nodeid> (127.0.0.1:<port>) gone with code 1011 and reason 'Ping timeout'
WebSocket client <nodeid> (127.0.0.1:<port>) gone with code 1011 and reason 'Ping timeout'

View file

@ -0,0 +1,5 @@
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
Cluster::websocket_client_added, []
Cluster::websocket_client_added, []
Cluster::websocket_client_lost, 1011, Ping timeout
Cluster::websocket_client_lost, 1011, Ping timeout

View file

@ -0,0 +1,100 @@
# @TEST-DOC: Ensure the websocket_client_lost() event contains code and reason. This starts two WebSocket clients that don't reply to PING frames.
#
# @TEST-REQUIRES: python3 -c 'import websockets.sync'
#
# @TEST-PORT: WEBSOCKET_PORT
#
# @TEST-EXEC: cp $FILES/ws/wstest.py .
# @TEST-EXEC: zeek -b --parse-only manager.zeek
# @TEST-EXEC: python3 -m py_compile client.py
#
# @TEST-EXEC: btest-bg-run manager "ZEEKPATH=$ZEEKPATH:.. && zeek -b ../manager.zeek >out"
# @TEST-EXEC: btest-bg-run client "python3 ../client.py >out"
#
# @TEST-EXEC: btest-bg-wait 30
# @TEST-EXEC: btest-diff ./manager/out
# @TEST-EXEC: btest-diff ./manager/.stderr
# @TEST-EXEC: zeek-cut message < ./manager/cluster.log | sed -r "s/client '.+' /client <nodeid> /g" | sed -r "s/:[0-9]+/:<port>/g" > ./manager/cluster.log.cannonified
# @TEST-EXEC: btest-diff ./manager/cluster.log.cannonified
# @TEST-EXEC: btest-diff ./client/out
# @TEST-EXEC: btest-diff ./client/.stderr
# @TEST-START-FILE manager.zeek
redef exit_only_after_terminate = T;
global lost = 0;
# Record every newly connected WebSocket client together with the
# subscriptions it announced, so the baseline shows both clients arriving.
event Cluster::websocket_client_added(info: Cluster::EndpointInfo, subscriptions: string_vec)
	{
	print "Cluster::websocket_client_added", subscriptions;
	}
# Print the close code and reason reported for each lost WebSocket client
# and shut Zeek down once the second client is gone.
event Cluster::websocket_client_lost(info: Cluster::EndpointInfo, code: count, reason: string)
	{
	lost += 1;
	print "Cluster::websocket_client_lost", code, reason;

	# client.py opens exactly two connections; after both are lost
	# there is nothing left to wait for.
	if ( lost == 2 )
		terminate();
	}
# Start a WebSocket server on the port handed in by btest and subscribe
# to the test topic. The ping interval is set to one second.
event zeek_init()
	{
	local ws_port = to_port(getenv("WEBSOCKET_PORT"));

	Cluster::listen_websocket([$listen_host="127.0.0.1",
	                           $listen_port=ws_port,
	                           $ping_interval=1sec]);
	Cluster::subscribe("/test/pings/");
	}
# @TEST-END-FILE
# @TEST-START-FILE client.py
import json
import functools
import wstest
from websockets.sync.client import connect
from websockets.sync.client import ClientConnection
from websockets.frames import OP_PONG
class MyClientConnection(ClientConnection):
    """
    ClientConnection variant that never puts a PONG frame on the wire.

    The connection's ``protocol.send_frame`` is replaced with a filter
    that silently drops frames whose opcode is OP_PONG and forwards
    everything else to the original implementation. According to the
    original author's note, the websockets library answers PINGs
    automatically from a thread and offers no easy switch to disable
    that; dropping the PONGs lets the test observe how Zeek treats a
    client that does not answer PINGs in time.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

        protocol = self.protocol
        # Keep a handle on the real sender so non-PONG frames still go out.
        self.__orig_send_frame = protocol.send_frame

        def __drop_pong_frames(_protocol, frame):
            if frame.opcode == OP_PONG:
                return
            self.__orig_send_frame(frame)

        # The protocol object is bound as the (unused) first argument.
        protocol.send_frame = functools.partial(__drop_pong_frames, protocol)
def run(ws_url):
    """
    Open two PONG-suppressing WebSocket connections to ``ws_url``.

    Each client sends an empty JSON list and must receive a message whose
    ``type`` is ``"ack"``. Afterwards each client blocks in ``recv()``;
    any exception raised there is printed to stdout, which the test
    compares against its baseline.
    """
    with connect(ws_url, create_connection=MyClientConnection) as c1:
        with connect(ws_url, create_connection=MyClientConnection) as c2:
            clients = (c1, c2)

            # Handshake: first client fully, then the second one.
            for client in clients:
                client.send(json.dumps([]))
                ack = json.loads(client.recv())
                assert ack["type"] == "ack", repr(ack)

            # Wait on each connection in turn and report how it ended.
            for client in clients:
                try:
                    client.recv()
                except Exception as e:
                    print(e)


if __name__ == "__main__":
    wstest.main(run, wstest.WS4_URL_V1)
# @TEST-END-FILE