diff --git a/CHANGES b/CHANGES index e7842cbd4c..c24c5eb429 100644 --- a/CHANGES +++ b/CHANGES @@ -1,3 +1,26 @@ +7.2.0-dev.624 | 2025-04-24 14:03:52 +0200 + + * cluster/websocket: Raise websocket_client_lost() after terminate (Arne Welzel, Corelight) + + Just in case events are created during backend->Terminate(). These + should come before the Cluster::websocket_client_lost() event. + + * cluster/ThreadedBackend: Invoke onloop->Process() during DoTerminate() (Arne Welzel, Corelight) + + Also, document how to use ThreadedBackend's DoTerminate() + + * cluster/ThreadedBackend: Remove Process() (Arne Welzel, Corelight) + + This must have been left-over from before OnLoopProcess existed. It + wasn't called or used anymore. + + * zeromq: Call super class DoTerminate() after stopping thread (Arne Welzel, Corelight) + + The internal ZeroMQ thread would call QueueForProcessing() thereby + accessing the onloop member. As ThreadedBackend::DoTerminate() unsets it, + this was a) reported as a data race by TSAN and b) potentially caused + missed events that were still to be queued. + 7.2.0-dev.617 | 2025-04-24 08:17:08 +0200 * cluster/websocket: Short-circuit clients without subscriptions (Arne Welzel, Corelight) diff --git a/NEWS b/NEWS index a365ac1173..f5ed50e96d 100644 --- a/NEWS +++ b/NEWS @@ -80,6 +80,12 @@ New Functionality Note that the new ``Cluster::listen_websocket()`` API will only become stable with Zeek 8.0. + Two new events, ``Cluster::websocket_client_added()`` and ``Cluster::websocket_client_lost()``, + have been added for WebSocket clients connecting and disconnecting. Note that + currently, even after ``Cluster::websocket_client_lost()`` ran, events sent from + that client may still be in transit and later executed, even on the node running + the WebSocket server. + Changed Functionality --------------------- diff --git a/VERSION b/VERSION index 09c3c6cd07..3aa0914ec6 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -7.2.0-dev.617 +7.2.0-dev.624 diff --git a/src/cluster/Backend.cc b/src/cluster/Backend.cc index 59e19f7ae0..575d0aeaf7 100644 --- a/src/cluster/Backend.cc +++ b/src/cluster/Backend.cc @@ -203,6 +203,7 @@ bool ThreadedBackend::DoInit() { void ThreadedBackend::DoTerminate() { if ( onloop ) { + onloop->Process(); onloop->Close(); onloop = nullptr; } @@ -213,11 +214,6 @@ void ThreadedBackend::QueueForProcessing(QueueMessage&& qmessages) { onloop->QueueForProcessing(std::move(qmessages)); } -void ThreadedBackend::Process() { - if ( onloop ) - onloop->Process(); -} - void ThreadedBackend::Process(QueueMessage&& msg) { // sonarlint wants to use std::visit. not sure... if ( auto* emsg = std::get_if(&msg) ) { diff --git a/src/cluster/Backend.h b/src/cluster/Backend.h index ba689be8d3..1b09fb4c88 100644 --- a/src/cluster/Backend.h +++ b/src/cluster/Backend.h @@ -564,18 +564,13 @@ protected: /** * To be used by implementations to enqueue messages for processing on the IO loop. * - * It's safe to call this method from any thread. + * It's safe to call this method from any thread before ThreadedBackend's + * DoTerminate() implementation is invoked. * * @param messages Messages to be enqueued. */ void QueueForProcessing(QueueMessage&& messages); - /** - * Delegate to onloop->Process() to trigger processing - * of outstanding queued messages explicitly, if any. - */ - void Process(); - /** * The default DoInit() implementation of ThreadedBackend * registers itself as a counting IO source to keep the IO @@ -588,6 +583,17 @@ protected: */ bool DoInit() override; + /** + * Common DoTerminate() functionality for threaded backends. + * + * The default DoTerminate() implementation of ThreadedBackend + * runs OnLoop's Process() once to drain any pending messages, then + * closes and unsets it. + * + * Classes deriving from ThreadedBackend need to ensure that all threads + * calling QeueuForProcessing() have terminated before invoking the + * ThreadedBackend's DoTerminate() implementation. + */ void DoTerminate() override; private: diff --git a/src/cluster/backend/zeromq/ZeroMQ.cc b/src/cluster/backend/zeromq/ZeroMQ.cc index d69d4ba7e4..a620c8d82c 100644 --- a/src/cluster/backend/zeromq/ZeroMQ.cc +++ b/src/cluster/backend/zeromq/ZeroMQ.cc @@ -116,8 +116,6 @@ void ZeroMQBackend::DoInitPostScript() { } void ZeroMQBackend::DoTerminate() { - ThreadedBackend::DoTerminate(); - // If self_thread is running, notify it to shutdown via the inproc // socket, then wait for it to terminate. if ( self_thread.joinable() && ! self_thread_shutdown_requested ) { @@ -152,6 +150,8 @@ void ZeroMQBackend::DoTerminate() { proxy_thread.reset(); } + // ThreadedBackend::DoTerminate() cleans up the onloop instance. + ThreadedBackend::DoTerminate(); ZEROMQ_DEBUG("Terminated"); } diff --git a/src/cluster/websocket/WebSocket.cc b/src/cluster/websocket/WebSocket.cc index c70fd1c728..1f2610a99b 100644 --- a/src/cluster/websocket/WebSocket.cc +++ b/src/cluster/websocket/WebSocket.cc @@ -342,11 +342,14 @@ void WebSocketEventDispatcher::Process(const WebSocketClose& close) { // If the client doesn't have a backend, it wasn't ever properly instantiated. if ( backend ) { + backend->Terminate(); + + // Raise Cluster::websocket_client_lost() after the backend has terminated. + // In case any messages/events were still pending, Cluster::websocket_client_lost() + // should be the last event related to this WebSocket client. auto rec = zeek::cluster::detail::bif::make_endpoint_info(backend->NodeId(), wsc->getRemoteIp(), wsc->getRemotePort(), TRANSPORT_TCP); zeek::event_mgr.Enqueue(Cluster::websocket_client_lost, std::move(rec)); - - backend->Terminate(); } clients.erase(it); diff --git a/testing/btest/cluster/websocket/bad-subscriptions.zeek b/testing/btest/cluster/websocket/bad-subscriptions.zeek index 81b9ec3743..e70e70f7e3 100644 --- a/testing/btest/cluster/websocket/bad-subscriptions.zeek +++ b/testing/btest/cluster/websocket/bad-subscriptions.zeek @@ -29,8 +29,6 @@ @load ./zeromq-test-bootstrap redef exit_only_after_terminate = T; -global event_count = 0; - global ping: event(msg: string, c: count) &is_used; global pong: event(msg: string, c: count) &is_used; diff --git a/testing/btest/cluster/websocket/no-subscriptions.zeek b/testing/btest/cluster/websocket/no-subscriptions.zeek index 50405ea74b..09701e5fc9 100644 --- a/testing/btest/cluster/websocket/no-subscriptions.zeek +++ b/testing/btest/cluster/websocket/no-subscriptions.zeek @@ -30,9 +30,6 @@ @load ./zeromq-test-bootstrap redef exit_only_after_terminate = T; -global expected_ping_count = 100; -global ping_count = 0; - event zeek_init() { Cluster::subscribe("/test/pings");