Merge remote-tracking branch 'origin/topic/awelzel/4420-simeon-breaking-websockets'

* origin/topic/awelzel/4420-simeon-breaking-websockets:
  Websocket: Close onloop during Terminate()
  OnLoop: notify_all() instead of notify_one()
This commit is contained in:
Arne Welzel 2025-05-06 14:20:22 +02:00
commit 4afb0ffeeb
9 changed files with 138 additions and 3 deletions

20
CHANGES
View file

@ -1,3 +1,23 @@
8.0.0-dev.44 | 2025-05-06 14:20:22 +0200
* GH-4420: Websocket: Close onloop during Terminate() (Arne Welzel, Corelight)
Terminate() is called when Zeek shuts down. If WebSocket client threads
were blocked in QueueForProcessing() due to reaching queue limits, they
previously would not exit QueueForProcessing() and would instead block
indefinitely, resulting in the ixwebsocket library blocking and its
garbage collection thread running at 100% CPU. Not great.
Closing the onloop instance will unblock the WebSocket client threads
for a timely shutdown.
Closes #4420
* OnLoop: notify_all() instead of notify_one() (Arne Welzel, Corelight)
There might be more than one thread blocked waiting for room in the
queue; ensure they all wake up when shutting down.
8.0.0-dev.40 | 2025-05-05 13:40:20 -0700
* Add baseline for find_first test, update comments, and reorder function imports (yexiaochuan)

View file

@ -1 +1 @@
8.0.0-dev.40
8.0.0-dev.44

View file

@ -97,8 +97,8 @@ public:
std::scoped_lock lock(mtx);
SetClosed(true);
// Wake a process stuck in queueing.
cond.notify_one();
// Wake other threads stuck in queueing, if any.
cond.notify_all();
}
// Wait for any active queuers to vanish, should be quick.

View file

@ -247,6 +247,8 @@ void WebSocketEventDispatcher::Terminate() {
}
clients.clear();
onloop->Close();
}
void WebSocketEventDispatcher::QueueForProcessing(WebSocketEvent&& event) {

View file

@ -0,0 +1 @@
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.

View file

@ -0,0 +1,4 @@
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
connection closed ok
connection closed ok
connection closed ok

View file

@ -0,0 +1,2 @@
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
received termination signal

View file

@ -0,0 +1,5 @@
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
B Cluster::websocket_client_added, [/zeek/wstest/ws1/]
B Cluster::websocket_client_added, [/zeek/wstest/ws2/]
B Cluster::websocket_client_added, [/zeek/wstest/ws3/]
D got 1000 pings from 3 clients, terminating

View file

@ -0,0 +1,101 @@
# @TEST-DOC: Regression test for #4420. Clients publish fast and Zeek terminates after receiving 1000 events. Previously this would result in a hang at Zeek shutdown.
#
# @TEST-REQUIRES: python3 -c 'import websockets.sync'
#
# @TEST-PORT: WEBSOCKET_PORT
#
# @TEST-EXEC: cp $FILES/ws/wstest.py .
# @TEST-EXEC: zeek -b --parse-only manager.zeek
# @TEST-EXEC: python3 -m py_compile client.py
#
# @TEST-EXEC: btest-bg-run manager "ZEEKPATH=$ZEEKPATH:.. && zeek -b ../manager.zeek >out"
# @TEST-EXEC: btest-bg-run client "python3 ../client.py >out"
#
# @TEST-EXEC: btest-bg-wait 30
# @TEST-EXEC: sort ./manager/out > ./manager/out.sorted
# @TEST-EXEC: btest-diff ./manager/out.sorted
# @TEST-EXEC: btest-diff ./manager/.stderr
# @TEST-EXEC: btest-diff ./client/out
# @TEST-EXEC: btest-diff ./client/.stderr
# @TEST-START-FILE manager.zeek
# Keep Zeek running until the ping handler below calls terminate().
redef exit_only_after_terminate = T;

# Limit the WebSocket dispatcher queue to a single event so that it fills
# up almost immediately.
redef Cluster::default_websocket_max_event_queue_size = 1;

# Event published by the WebSocket clients on the /test/pings/ topic.
global ping: event(msg: string, c: count) &is_used;

# Total number of ping events handled so far.
global ping_count: count = 0;

# Names of the clients from which at least one ping has been handled.
global clients: set[string] = set();
# Count every ping and remember which client sent it. The 1000th ping
# prints a summary line and starts Zeek's shutdown.
event ping(client: string, n: count) &is_used
	{
	add clients[client];
	ping_count += 1;

	# Only the 1000th ping triggers termination.
	if ( ping_count != 1000 )
		return;

	print fmt("D got 1000 pings from %s clients, terminating", |clients|);
	terminate();
	}
# Print one line per added WebSocket client together with its subscriptions.
# The "B" prefix is part of the printed text; the test sorts the manager's
# output before diffing it against the baseline.
event Cluster::websocket_client_added(info: Cluster::EndpointInfo, subscriptions: string_vec)
{
print "B Cluster::websocket_client_added", subscriptions;
}
# Print an "E"-prefixed marker line whenever a WebSocket client is reported
# as lost. The info argument is not used.
event Cluster::websocket_client_lost(info: Cluster::EndpointInfo)
{
print "E Cluster::websocket_client_lost";
}
# Start the WebSocket listener on localhost, using the port that the test
# harness passes in via the WEBSOCKET_PORT environment variable, and
# subscribe to the topic the clients publish their pings to.
event zeek_init()
	{
	local ws_port = to_port(getenv("WEBSOCKET_PORT"));

	Cluster::listen_websocket([$listen_host="127.0.0.1", $listen_port=ws_port]);
	Cluster::subscribe("/test/pings/");
	}
# @TEST-END-FILE
# @TEST-START-FILE client.py
import websockets.exceptions
import wstest
def run(ws_url):
    """Publish ping events from three WebSocket clients until all are closed.

    Opens three connections ("ws1", "ws2", "ws3") to ``ws_url``, calls
    ``hello_v1([])`` on each, and then sends "ping" events to the
    "/test/pings/" topic round-robin. A client is retired once sending on it
    raises ``ConnectionClosedOK``; the loop ends when all three clients have
    been retired.

    Prints "connection closed ok" once per retired client.

    Raises:
        AssertionError: if a close code other than 1001 is observed, or if
            fewer than 1000 events were sent before all clients were closed.
    """
    with (
        wstest.connect("ws1", ws_url) as tc1,
        wstest.connect("ws2", ws_url) as tc2,
        wstest.connect("ws3", ws_url) as tc3,
    ):
        clients = [tc1, tc2, tc3]
        for tc in clients:
            tc.hello_v1([])

        # Number of events sent without hitting a closed connection.
        i = 0
        # 1-based indices of clients that have seen ConnectionClosedOK.
        saw_closed_ok = set()

        while len(saw_closed_ok) < 3:
            for idx, tc in enumerate(clients, 1):
                if idx in saw_closed_ok:  # Have seen a ConnectionClosedOK for this client?
                    continue

                try:
                    i += 1
                    tc.send_json(wstest.build_event_v1("/test/pings/", "ping", [f"tc{idx}", i]))
                except websockets.exceptions.ConnectionClosedOK as e:
                    print("connection closed ok")
                    assert e.code == 1001  # Remote going away
                    # The send did not go through, so undo the optimistic increment.
                    i -= 1
                    saw_closed_ok.add(idx)

        assert len(saw_closed_ok) == 3
        assert i >= 1000, f"expected to send at least 1000 events, only sent {i}"
if __name__ == "__main__":
    # Script entry point: hand run() and wstest.WS4_URL_V1 to the shared
    # wstest helper, which the test copies next to this script.
    wstest.main(run, wstest.WS4_URL_V1)
# @TEST-END-FILE