diff --git a/scripts/base/frameworks/cluster/main.zeek b/scripts/base/frameworks/cluster/main.zeek index 0520a262c6..30511f9b82 100644 --- a/scripts/base/frameworks/cluster/main.zeek +++ b/scripts/base/frameworks/cluster/main.zeek @@ -75,6 +75,16 @@ export { ## :zeek:see:`Cluster::create_store` with the *persistent* argument set true. const default_persistent_backend = Broker::SQLITE &redef; + ## The default maximum queue size for WebSocket event dispatcher instances. + ## + ## If the maximum queue size is reached, events from external WebSocket + ## clients will be stalled and processed once the queue has been drained. + ## + ## An internal metric named ``cluster_onloop_queue_stalls`` and + ## labeled with a ``WebSocketEventDispatcher::`` tag + ## is incremented when the maximum queue size is reached. + const default_websocket_max_event_queue_size = 32 &redef; + ## Setting a default dir will, for persistent backends that have not ## been given an explicit file path via :zeek:see:`Cluster::stores`, ## automatically create a path within this dir that is based on the name of @@ -353,6 +363,8 @@ export { listen_host: string; ## The port the WebSocket server is supposed to listen on. listen_port: port; + ## The maximum event queue size for this server. + max_event_queue_size: count &default=default_websocket_max_event_queue_size; ## The TLS options used for this WebSocket server. By default, ## TLS is disabled. See also :zeek:see:`Cluster::WebSocketTLSOptions`. tls_options: WebSocketTLSOptions &default=WebSocketTLSOptions(); diff --git a/src/cluster/Manager.cc b/src/cluster/Manager.cc index 28b1055f59..29bade62a6 100644 --- a/src/cluster/Manager.cc +++ b/src/cluster/Manager.cc @@ -55,8 +55,11 @@ bool Manager::ListenWebSocket(const websocket::detail::ServerOptions& options) { return false; } - auto server = - websocket::detail::StartServer(std::make_unique(), options); + std::string ident = util::fmt("%s:%d", options.host.c_str(), options.port); + + auto dispatcher = + std::make_unique(std::move(ident), options.max_event_queue_size); + auto server = websocket::detail::StartServer(std::move(dispatcher), options); if ( ! server ) return false; diff --git a/src/cluster/cluster.bif b/src/cluster/cluster.bif index 08557ff765..84e86798c9 100644 --- a/src/cluster/cluster.bif +++ b/src/cluster/cluster.bif @@ -196,6 +196,8 @@ function Cluster::__listen_websocket%(options: WebSocketServerOptions%): bool options_rec->GetField("listen_host")->ToStdString(), static_cast(options_rec->GetField("listen_port")->Port()), }; + + server_options.max_event_queue_size = options_rec->GetField("max_event_queue_size")->AsCount(); server_options.tls_options = std::move(tls_options); auto result = zeek::cluster::manager->ListenWebSocket(server_options); diff --git a/src/cluster/websocket/WebSocket.cc b/src/cluster/websocket/WebSocket.cc index e6b867e32f..2ceff5b9d6 100644 --- a/src/cluster/websocket/WebSocket.cc +++ b/src/cluster/websocket/WebSocket.cc @@ -214,9 +214,11 @@ public: bool OnFinish(double network_time) override { return true; } }; -WebSocketEventDispatcher::WebSocketEventDispatcher() { +WebSocketEventDispatcher::WebSocketEventDispatcher(std::string ident, size_t queue_size) { onloop = - new zeek::detail::OnLoopProcess(this, "WebSocketEventDispatcher"); + new zeek::detail::OnLoopProcess(this, + "WebSocketEventDispatcher:" + ident, + queue_size); // Register the onloop instance the IO loop. Lifetime will be managed by the loop. onloop->Register(false); diff --git a/src/cluster/websocket/WebSocket.h b/src/cluster/websocket/WebSocket.h index 8d8e51ceee..a635c7dbd8 100644 --- a/src/cluster/websocket/WebSocket.h +++ b/src/cluster/websocket/WebSocket.h @@ -171,7 +171,13 @@ class ReplyMsgThread; */ class WebSocketEventDispatcher { public: - WebSocketEventDispatcher(); + /** + * Constructor. + * + * @param ident A string identifying this dispatcher instance. Used in metrics. + * @param queue_size Maximum queue size before events are stalled. + */ + WebSocketEventDispatcher(std::string ident, size_t queue_size); ~WebSocketEventDispatcher(); @@ -295,12 +301,13 @@ struct ServerOptions { int ping_interval_seconds = 5; int max_connections = 100; bool per_message_deflate = false; + size_t max_event_queue_size = 32; struct TLSOptions tls_options; bool operator==(const ServerOptions& o) const { return host == o.host && port == o.port && ping_interval_seconds == o.ping_interval_seconds && max_connections == o.max_connections && per_message_deflate == o.per_message_deflate && - tls_options == o.tls_options; + max_event_queue_size == o.max_event_queue_size && tls_options == o.tls_options; } }; diff --git a/testing/btest/Baseline.zam/cluster.websocket.listen-idempotent/.stderr b/testing/btest/Baseline.zam/cluster.websocket.listen-idempotent/.stderr index 889b895763..38b947db5d 100644 --- a/testing/btest/Baseline.zam/cluster.websocket.listen-idempotent/.stderr +++ b/testing/btest/Baseline.zam/cluster.websocket.listen-idempotent/.stderr @@ -1,4 +1,5 @@ ### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. -error in <...>/main.zeek, line 654: Already listening on 127.0.0.1: (Cluster::__listen_websocket(ws_opts_x)) -error in <...>/main.zeek, line 654: Already listening on 127.0.0.1: (Cluster::__listen_websocket(ws_opts_wss_port)) +error in <...>/main.zeek, line 666: Already listening on 127.0.0.1: (Cluster::__listen_websocket(ws_opts_x)) +error in <...>/main.zeek, line 666: Already listening on 127.0.0.1: (Cluster::__listen_websocket(ws_opts_wss_port)) +error in <...>/main.zeek, line 666: Already listening on 127.0.0.1: (Cluster::__listen_websocket(ws_opts_qs)) received termination signal diff --git a/testing/btest/Baseline.zam/cluster.websocket.tls-usage-error/.stderr b/testing/btest/Baseline.zam/cluster.websocket.tls-usage-error/.stderr index 0e09ca4de9..0a8d499f53 100644 --- a/testing/btest/Baseline.zam/cluster.websocket.tls-usage-error/.stderr +++ b/testing/btest/Baseline.zam/cluster.websocket.tls-usage-error/.stderr @@ -1,3 +1,3 @@ ### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. -error in <...>/main.zeek, line 654: Invalid tls_options: No key_file field (Cluster::__listen_websocket(Cluster::options.0)) -error in <...>/main.zeek, line 654: Invalid tls_options: No cert_file field (Cluster::__listen_websocket(Cluster::options.3)) +error in <...>/main.zeek, line 666: Invalid tls_options: No key_file field (Cluster::__listen_websocket(Cluster::options.0)) +error in <...>/main.zeek, line 666: Invalid tls_options: No cert_file field (Cluster::__listen_websocket(Cluster::options.3)) diff --git a/testing/btest/Baseline/cluster.websocket.listen-idempotent/.stderr b/testing/btest/Baseline/cluster.websocket.listen-idempotent/.stderr index e2bea8f00f..b922d6c7e4 100644 --- a/testing/btest/Baseline/cluster.websocket.listen-idempotent/.stderr +++ b/testing/btest/Baseline/cluster.websocket.listen-idempotent/.stderr @@ -1,4 +1,5 @@ ### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. error in <...>/listen-idempotent.zeek, line 43: Already listening on 127.0.0.1: (Cluster::listen_websocket(ws_opts_x)) error in <...>/listen-idempotent.zeek, line 47: Already listening on 127.0.0.1: (Cluster::listen_websocket(ws_opts_wss_port)) +error in <...>/listen-idempotent.zeek, line 52: Already listening on 127.0.0.1: (Cluster::listen_websocket(ws_opts_qs)) received termination signal diff --git a/testing/btest/cluster/websocket/listen-idempotent.zeek b/testing/btest/cluster/websocket/listen-idempotent.zeek index 448ab5e1ac..e187386ca3 100644 --- a/testing/btest/cluster/websocket/listen-idempotent.zeek +++ b/testing/btest/cluster/websocket/listen-idempotent.zeek @@ -46,5 +46,12 @@ event zeek_init() assert Cluster::listen_websocket(ws_tls_opts_copy); assert ! Cluster::listen_websocket(ws_opts_wss_port); + # Using a different max_event_queue_size fails, but using the default should work. + local ws_opts_qs = copy(ws_opts); + ws_opts_qs$max_event_queue_size = 42; + assert ! Cluster::listen_websocket(ws_opts_qs); + ws_opts_qs$max_event_queue_size = Cluster::default_websocket_max_event_queue_size; + assert Cluster::listen_websocket(ws_opts_qs); + terminate(); }