Merge remote-tracking branch 'origin/topic/awelzel/cluster-fix-tsan-zeromq-do-terminate'

* origin/topic/awelzel/cluster-fix-tsan-zeromq-do-terminate:
  NEWS: Add entry about WebSocket client events
  btest/cluster: Testing cleanup
  cluster/websocket: Raise websocket_client_lost() after terminate
  cluster/ThreadedBackend: Invoke onloop->Process() during DoTerminate()
  cluster/ThreadedBackend: Remove Process()
  zeromq: Call super class DoTerminate() after stopping thread
This commit is contained in:
Arne Welzel 2025-04-24 14:03:52 +02:00
commit 69a1ad2c3d
9 changed files with 51 additions and 22 deletions

23
CHANGES
View file

@ -1,3 +1,26 @@
7.2.0-dev.624 | 2025-04-24 14:03:52 +0200
* cluster/websocket: Raise websocket_client_lost() after terminate (Arne Welzel, Corelight)
Just in case events are created during backend->Terminate(). These
should come before the Cluster::websocket_client_lost() event.
* cluster/ThreadedBackend: Invoke onloop->Process() during DoTerminate() (Arne Welzel, Corelight)
Also, document how to use ThreadedBackend's DoTerminate()
* cluster/ThreadedBackend: Remove Process() (Arne Welzel, Corelight)
This must have been left over from before OnLoopProcess existed. It
wasn't called or used anymore.
* zeromq: Call super class DoTerminate() after stopping thread (Arne Welzel, Corelight)
The internal ZeroMQ thread would call QueueForProcessing(), thereby
accessing the onloop member. As ThreadedBackend::DoTerminate() unsets it,
this a) was reported as a data race by TSAN and b) potentially caused
missed events that were still to be queued.
7.2.0-dev.617 | 2025-04-24 08:17:08 +0200 7.2.0-dev.617 | 2025-04-24 08:17:08 +0200
* cluster/websocket: Short-circuit clients without subscriptions (Arne Welzel, Corelight) * cluster/websocket: Short-circuit clients without subscriptions (Arne Welzel, Corelight)

6
NEWS
View file

@ -80,6 +80,12 @@ New Functionality
Note that the new ``Cluster::listen_websocket()`` API will only become stable Note that the new ``Cluster::listen_websocket()`` API will only become stable
with Zeek 8.0. with Zeek 8.0.
Two new events, ``Cluster::websocket_client_added()`` and ``Cluster::websocket_client_lost()``,
have been added for WebSocket clients connecting and disconnecting. Note that
currently, even after ``Cluster::websocket_client_lost()`` has run, events sent from
that client may still be in transit and executed later, even on the node running
the WebSocket server.
Changed Functionality Changed Functionality
--------------------- ---------------------

View file

@ -1 +1 @@
7.2.0-dev.617 7.2.0-dev.624

View file

@ -203,6 +203,7 @@ bool ThreadedBackend::DoInit() {
void ThreadedBackend::DoTerminate() { void ThreadedBackend::DoTerminate() {
if ( onloop ) { if ( onloop ) {
onloop->Process();
onloop->Close(); onloop->Close();
onloop = nullptr; onloop = nullptr;
} }
@ -213,11 +214,6 @@ void ThreadedBackend::QueueForProcessing(QueueMessage&& qmessages) {
onloop->QueueForProcessing(std::move(qmessages)); onloop->QueueForProcessing(std::move(qmessages));
} }
void ThreadedBackend::Process() {
if ( onloop )
onloop->Process();
}
void ThreadedBackend::Process(QueueMessage&& msg) { void ThreadedBackend::Process(QueueMessage&& msg) {
// sonarlint wants to use std::visit. not sure... // sonarlint wants to use std::visit. not sure...
if ( auto* emsg = std::get_if<EventMessage>(&msg) ) { if ( auto* emsg = std::get_if<EventMessage>(&msg) ) {

View file

@ -564,18 +564,13 @@ protected:
/** /**
* To be used by implementations to enqueue messages for processing on the IO loop. * To be used by implementations to enqueue messages for processing on the IO loop.
* *
* It's safe to call this method from any thread. * It's safe to call this method from any thread before ThreadedBackend's
* DoTerminate() implementation is invoked.
* *
* @param messages Messages to be enqueued. * @param messages Messages to be enqueued.
*/ */
void QueueForProcessing(QueueMessage&& messages); void QueueForProcessing(QueueMessage&& messages);
/**
* Delegate to onloop->Process() to trigger processing
* of outstanding queued messages explicitly, if any.
*/
void Process();
/** /**
* The default DoInit() implementation of ThreadedBackend * The default DoInit() implementation of ThreadedBackend
* registers itself as a counting IO source to keep the IO * registers itself as a counting IO source to keep the IO
@ -588,6 +583,17 @@ protected:
*/ */
bool DoInit() override; bool DoInit() override;
/**
* Common DoTerminate() functionality for threaded backends.
*
* The default DoTerminate() implementation of ThreadedBackend
* runs OnLoop's Process() once to drain any pending messages, then
* closes and unsets it.
*
* Classes deriving from ThreadedBackend need to ensure that all threads
* calling QueueForProcessing() have terminated before invoking the
* ThreadedBackend's DoTerminate() implementation.
*/
void DoTerminate() override; void DoTerminate() override;
private: private:

View file

@ -116,8 +116,6 @@ void ZeroMQBackend::DoInitPostScript() {
} }
void ZeroMQBackend::DoTerminate() { void ZeroMQBackend::DoTerminate() {
ThreadedBackend::DoTerminate();
// If self_thread is running, notify it to shutdown via the inproc // If self_thread is running, notify it to shutdown via the inproc
// socket, then wait for it to terminate. // socket, then wait for it to terminate.
if ( self_thread.joinable() && ! self_thread_shutdown_requested ) { if ( self_thread.joinable() && ! self_thread_shutdown_requested ) {
@ -152,6 +150,8 @@ void ZeroMQBackend::DoTerminate() {
proxy_thread.reset(); proxy_thread.reset();
} }
// ThreadedBackend::DoTerminate() cleans up the onloop instance.
ThreadedBackend::DoTerminate();
ZEROMQ_DEBUG("Terminated"); ZEROMQ_DEBUG("Terminated");
} }

View file

@ -342,11 +342,14 @@ void WebSocketEventDispatcher::Process(const WebSocketClose& close) {
// If the client doesn't have a backend, it wasn't ever properly instantiated. // If the client doesn't have a backend, it wasn't ever properly instantiated.
if ( backend ) { if ( backend ) {
backend->Terminate();
// Raise Cluster::websocket_client_lost() after the backend has terminated.
// In case any messages/events were still pending, Cluster::websocket_client_lost()
// should be the last event related to this WebSocket client.
auto rec = zeek::cluster::detail::bif::make_endpoint_info(backend->NodeId(), wsc->getRemoteIp(), auto rec = zeek::cluster::detail::bif::make_endpoint_info(backend->NodeId(), wsc->getRemoteIp(),
wsc->getRemotePort(), TRANSPORT_TCP); wsc->getRemotePort(), TRANSPORT_TCP);
zeek::event_mgr.Enqueue(Cluster::websocket_client_lost, std::move(rec)); zeek::event_mgr.Enqueue(Cluster::websocket_client_lost, std::move(rec));
backend->Terminate();
} }
clients.erase(it); clients.erase(it);

View file

@ -29,8 +29,6 @@
@load ./zeromq-test-bootstrap @load ./zeromq-test-bootstrap
redef exit_only_after_terminate = T; redef exit_only_after_terminate = T;
global event_count = 0;
global ping: event(msg: string, c: count) &is_used; global ping: event(msg: string, c: count) &is_used;
global pong: event(msg: string, c: count) &is_used; global pong: event(msg: string, c: count) &is_used;

View file

@ -30,9 +30,6 @@
@load ./zeromq-test-bootstrap @load ./zeromq-test-bootstrap
redef exit_only_after_terminate = T; redef exit_only_after_terminate = T;
global expected_ping_count = 100;
global ping_count = 0;
event zeek_init() event zeek_init()
{ {
Cluster::subscribe("/test/pings"); Cluster::subscribe("/test/pings");