From 011029addc03e87e98e03c37a18d70d479ae8ded Mon Sep 17 00:00:00 2001 From: Arne Welzel Date: Wed, 16 Apr 2025 16:59:05 +0200 Subject: [PATCH] cluster/websocket: Make websocket dispatcher queue size configurable Limit the number WebSocket events queued from external clients to dispatcher instances to produce back pressure to the clients if Zeek's IO loop is overloaded. --- scripts/base/frameworks/cluster/main.zeek | 12 ++++++++++++ src/cluster/Manager.cc | 7 +++++-- src/cluster/cluster.bif | 2 ++ src/cluster/websocket/WebSocket.cc | 6 ++++-- src/cluster/websocket/WebSocket.h | 11 +++++++++-- .../cluster.websocket.listen-idempotent/.stderr | 5 +++-- .../cluster.websocket.tls-usage-error/.stderr | 4 ++-- .../cluster.websocket.listen-idempotent/.stderr | 1 + .../btest/cluster/websocket/listen-idempotent.zeek | 7 +++++++ 9 files changed, 45 insertions(+), 10 deletions(-) diff --git a/scripts/base/frameworks/cluster/main.zeek b/scripts/base/frameworks/cluster/main.zeek index 0520a262c6..30511f9b82 100644 --- a/scripts/base/frameworks/cluster/main.zeek +++ b/scripts/base/frameworks/cluster/main.zeek @@ -75,6 +75,16 @@ export { ## :zeek:see:`Cluster::create_store` with the *persistent* argument set true. const default_persistent_backend = Broker::SQLITE &redef; + ## The default maximum queue size for WebSocket event dispatcher instances. + ## + ## If the maximum queue size is reached, events from external WebSocket + ## clients will be stalled and processed once the queue has been drained. + ## + ## An internal metric named ``cluster_onloop_queue_stalls`` and + ## labeled with a ``WebSocketEventDispatcher::`` tag + ## is incremented when the maximum queue size is reached. + const default_websocket_max_event_queue_size = 32 &redef; + ## Setting a default dir will, for persistent backends that have not ## been given an explicit file path via :zeek:see:`Cluster::stores`, ## automatically create a path within this dir that is based on the name of @@ -353,6 +363,8 @@ export { listen_host: string; ## The port the WebSocket server is supposed to listen on. listen_port: port; + ## The maximum event queue size for this server. + max_event_queue_size: count &default=default_websocket_max_event_queue_size; ## The TLS options used for this WebSocket server. By default, ## TLS is disabled. See also :zeek:see:`Cluster::WebSocketTLSOptions`. tls_options: WebSocketTLSOptions &default=WebSocketTLSOptions(); diff --git a/src/cluster/Manager.cc b/src/cluster/Manager.cc index 28b1055f59..29bade62a6 100644 --- a/src/cluster/Manager.cc +++ b/src/cluster/Manager.cc @@ -55,8 +55,11 @@ bool Manager::ListenWebSocket(const websocket::detail::ServerOptions& options) { return false; } - auto server = - websocket::detail::StartServer(std::make_unique(), options); + std::string ident = util::fmt("%s:%d", options.host.c_str(), options.port); + + auto dispatcher = + std::make_unique(std::move(ident), options.max_event_queue_size); + auto server = websocket::detail::StartServer(std::move(dispatcher), options); if ( ! server ) return false; diff --git a/src/cluster/cluster.bif b/src/cluster/cluster.bif index 08557ff765..84e86798c9 100644 --- a/src/cluster/cluster.bif +++ b/src/cluster/cluster.bif @@ -196,6 +196,8 @@ function Cluster::__listen_websocket%(options: WebSocketServerOptions%): bool options_rec->GetField("listen_host")->ToStdString(), static_cast(options_rec->GetField("listen_port")->Port()), }; + + server_options.max_event_queue_size = options_rec->GetField("max_event_queue_size")->AsCount(); server_options.tls_options = std::move(tls_options); auto result = zeek::cluster::manager->ListenWebSocket(server_options); diff --git a/src/cluster/websocket/WebSocket.cc b/src/cluster/websocket/WebSocket.cc index e6b867e32f..2ceff5b9d6 100644 --- a/src/cluster/websocket/WebSocket.cc +++ b/src/cluster/websocket/WebSocket.cc @@ -214,9 +214,11 @@ public: bool OnFinish(double network_time) override { return true; } }; -WebSocketEventDispatcher::WebSocketEventDispatcher() { +WebSocketEventDispatcher::WebSocketEventDispatcher(std::string ident, size_t queue_size) { onloop = - new zeek::detail::OnLoopProcess(this, "WebSocketEventDispatcher"); + new zeek::detail::OnLoopProcess(this, + "WebSocketEventDispatcher:" + ident, + queue_size); // Register the onloop instance the IO loop. Lifetime will be managed by the loop. onloop->Register(false); diff --git a/src/cluster/websocket/WebSocket.h b/src/cluster/websocket/WebSocket.h index 8d8e51ceee..a635c7dbd8 100644 --- a/src/cluster/websocket/WebSocket.h +++ b/src/cluster/websocket/WebSocket.h @@ -171,7 +171,13 @@ class ReplyMsgThread; */ class WebSocketEventDispatcher { public: - WebSocketEventDispatcher(); + /** + * Constructor. + * + * @param ident A string identifying this dispatcher instance. Used in metrics. + * @param queue_size Maximum queue size before events are stalled. + */ + WebSocketEventDispatcher(std::string ident, size_t queue_size); ~WebSocketEventDispatcher(); @@ -295,12 +301,13 @@ struct ServerOptions { int ping_interval_seconds = 5; int max_connections = 100; bool per_message_deflate = false; + size_t max_event_queue_size = 32; struct TLSOptions tls_options; bool operator==(const ServerOptions& o) const { return host == o.host && port == o.port && ping_interval_seconds == o.ping_interval_seconds && max_connections == o.max_connections && per_message_deflate == o.per_message_deflate && - tls_options == o.tls_options; + max_event_queue_size == o.max_event_queue_size && tls_options == o.tls_options; } }; diff --git a/testing/btest/Baseline.zam/cluster.websocket.listen-idempotent/.stderr b/testing/btest/Baseline.zam/cluster.websocket.listen-idempotent/.stderr index 889b895763..38b947db5d 100644 --- a/testing/btest/Baseline.zam/cluster.websocket.listen-idempotent/.stderr +++ b/testing/btest/Baseline.zam/cluster.websocket.listen-idempotent/.stderr @@ -1,4 +1,5 @@ ### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. -error in <...>/main.zeek, line 654: Already listening on 127.0.0.1: (Cluster::__listen_websocket(ws_opts_x)) -error in <...>/main.zeek, line 654: Already listening on 127.0.0.1: (Cluster::__listen_websocket(ws_opts_wss_port)) +error in <...>/main.zeek, line 666: Already listening on 127.0.0.1: (Cluster::__listen_websocket(ws_opts_x)) +error in <...>/main.zeek, line 666: Already listening on 127.0.0.1: (Cluster::__listen_websocket(ws_opts_wss_port)) +error in <...>/main.zeek, line 666: Already listening on 127.0.0.1: (Cluster::__listen_websocket(ws_opts_qs)) received termination signal diff --git a/testing/btest/Baseline.zam/cluster.websocket.tls-usage-error/.stderr b/testing/btest/Baseline.zam/cluster.websocket.tls-usage-error/.stderr index 0e09ca4de9..0a8d499f53 100644 --- a/testing/btest/Baseline.zam/cluster.websocket.tls-usage-error/.stderr +++ b/testing/btest/Baseline.zam/cluster.websocket.tls-usage-error/.stderr @@ -1,3 +1,3 @@ ### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. -error in <...>/main.zeek, line 654: Invalid tls_options: No key_file field (Cluster::__listen_websocket(Cluster::options.0)) -error in <...>/main.zeek, line 654: Invalid tls_options: No cert_file field (Cluster::__listen_websocket(Cluster::options.3)) +error in <...>/main.zeek, line 666: Invalid tls_options: No key_file field (Cluster::__listen_websocket(Cluster::options.0)) +error in <...>/main.zeek, line 666: Invalid tls_options: No cert_file field (Cluster::__listen_websocket(Cluster::options.3)) diff --git a/testing/btest/Baseline/cluster.websocket.listen-idempotent/.stderr b/testing/btest/Baseline/cluster.websocket.listen-idempotent/.stderr index e2bea8f00f..b922d6c7e4 100644 --- a/testing/btest/Baseline/cluster.websocket.listen-idempotent/.stderr +++ b/testing/btest/Baseline/cluster.websocket.listen-idempotent/.stderr @@ -1,4 +1,5 @@ ### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. error in <...>/listen-idempotent.zeek, line 43: Already listening on 127.0.0.1: (Cluster::listen_websocket(ws_opts_x)) error in <...>/listen-idempotent.zeek, line 47: Already listening on 127.0.0.1: (Cluster::listen_websocket(ws_opts_wss_port)) +error in <...>/listen-idempotent.zeek, line 52: Already listening on 127.0.0.1: (Cluster::listen_websocket(ws_opts_qs)) received termination signal diff --git a/testing/btest/cluster/websocket/listen-idempotent.zeek b/testing/btest/cluster/websocket/listen-idempotent.zeek index 448ab5e1ac..e187386ca3 100644 --- a/testing/btest/cluster/websocket/listen-idempotent.zeek +++ b/testing/btest/cluster/websocket/listen-idempotent.zeek @@ -46,5 +46,12 @@ event zeek_init() assert Cluster::listen_websocket(ws_tls_opts_copy); assert ! Cluster::listen_websocket(ws_opts_wss_port); + # Using a different max_event_queue_size fails, but using the default should work. + local ws_opts_qs = copy(ws_opts); + ws_opts_qs$max_event_queue_size = 42; + assert ! Cluster::listen_websocket(ws_opts_qs); + ws_opts_qs$max_event_queue_size = Cluster::default_websocket_max_event_queue_size; + assert Cluster::listen_websocket(ws_opts_qs); + terminate(); }