mirror of
https://github.com/zeek/zeek.git
synced 2025-10-02 06:38:20 +00:00
Websocket: Close onloop during Terminate()
Terminate() is called when Zeek shuts down. If WebSocket client threads were blocked in QueueForProcessing() due to reaching queue limits, these previously would not exit QueueForProcessing() and instead block indefinitely, resulting in the ixwebsocket library blocking and its garbage collection thread running at 100%. Not great. Closing the onloop instance will unblock the WebSocket client threads for a timely shutdown. Closes #4420
This commit is contained in:
parent
6ebec6dde7
commit
bb06af601f
6 changed files with 115 additions and 0 deletions
101
testing/btest/cluster/websocket/terminate-while-queuing.zeek
Normal file
101
testing/btest/cluster/websocket/terminate-while-queuing.zeek
Normal file
|
@ -0,0 +1,101 @@
|
|||
# @TEST-DOC: Regression test for #4420. Clients publish fast and Zeek terminates after receiving 1000 events. Previously this would result in a hang at Zeek shutdown.
|
||||
#
|
||||
# @TEST-REQUIRES: python3 -c 'import websockets.sync'
|
||||
#
|
||||
# @TEST-PORT: WEBSOCKET_PORT
|
||||
#
|
||||
# @TEST-EXEC: cp $FILES/ws/wstest.py .
|
||||
# @TEST-EXEC: zeek -b --parse-only manager.zeek
|
||||
# @TEST-EXEC: python3 -m py_compile client.py
|
||||
#
|
||||
# @TEST-EXEC: btest-bg-run manager "ZEEKPATH=$ZEEKPATH:.. && zeek -b ../manager.zeek >out"
|
||||
# @TEST-EXEC: btest-bg-run client "python3 ../client.py >out"
|
||||
#
|
||||
# @TEST-EXEC: btest-bg-wait 30
|
||||
# @TEST-EXEC: sort ./manager/out > ./manager/out.sorted
|
||||
# @TEST-EXEC: btest-diff ./manager/out.sorted
|
||||
# @TEST-EXEC: btest-diff ./manager/.stderr
|
||||
# @TEST-EXEC: btest-diff ./client/out
|
||||
# @TEST-EXEC: btest-diff ./client/.stderr
|
||||
|
||||
# @TEST-START-FILE manager.zeek
|
||||
redef exit_only_after_terminate = T;
|
||||
|
||||
# Force dispatcher queue being full quickly!
|
||||
redef Cluster::default_websocket_max_event_queue_size = 1;
|
||||
|
||||
global ping_count = 0;
|
||||
global ping: event(msg: string, c: count) &is_used;
|
||||
|
||||
global clients: set[string] = set();
|
||||
|
||||
event ping(client: string, n: count) &is_used
|
||||
{
|
||||
++ping_count;
|
||||
add clients[client];
|
||||
|
||||
if ( ping_count == 1000 )
|
||||
{
|
||||
print fmt("D got 1000 pings from %s clients, terminating", |clients|);
|
||||
terminate();
|
||||
}
|
||||
}
|
||||
|
||||
event Cluster::websocket_client_added(info: Cluster::EndpointInfo, subscriptions: string_vec)
|
||||
{
|
||||
print "B Cluster::websocket_client_added", subscriptions;
|
||||
}
|
||||
|
||||
event Cluster::websocket_client_lost(info: Cluster::EndpointInfo)
|
||||
{
|
||||
print "E Cluster::websocket_client_lost";
|
||||
}
|
||||
|
||||
event zeek_init()
|
||||
{
|
||||
Cluster::listen_websocket([$listen_host="127.0.0.1", $listen_port=to_port(getenv("WEBSOCKET_PORT"))]);
|
||||
Cluster::subscribe("/test/pings/");
|
||||
}
|
||||
# @TEST-END-FILE
|
||||
|
||||
|
||||
# @TEST-START-FILE client.py
|
||||
import websockets.exceptions
|
||||
|
||||
import wstest
|
||||
|
||||
def run(ws_url):
|
||||
with (
|
||||
wstest.connect("ws1", ws_url) as tc1,
|
||||
wstest.connect("ws2", ws_url) as tc2,
|
||||
wstest.connect("ws3", ws_url) as tc3,
|
||||
):
|
||||
clients = [tc1, tc2, tc3]
|
||||
for tc in clients:
|
||||
tc.hello_v1([])
|
||||
|
||||
stop = False;
|
||||
i = 0
|
||||
|
||||
saw_closed_ok = set()
|
||||
|
||||
while len(saw_closed_ok) < 3:
|
||||
for idx, tc in enumerate(clients, 1):
|
||||
if idx in saw_closed_ok: # Have seen a ConnectionClosedOK for this client?
|
||||
continue
|
||||
|
||||
try:
|
||||
i += 1
|
||||
tc.send_json(wstest.build_event_v1("/test/pings/", "ping", [f"tc{idx}", i]))
|
||||
except websockets.exceptions.ConnectionClosedOK as e:
|
||||
print("connection closed ok")
|
||||
assert e.code == 1001 # Remote going away
|
||||
i -= 1
|
||||
saw_closed_ok.add(idx)
|
||||
|
||||
assert len(saw_closed_ok) == 3
|
||||
assert i >= 1000, f"expected to send at least 1000 events, only sent {i}"
|
||||
|
||||
if __name__ == "__main__":
|
||||
wstest.main(run, wstest.WS4_URL_V1)
|
||||
# @TEST-END-FILE
|
Loading…
Add table
Add a link
Reference in a new issue