cluster/websocket: Make websocket dispatcher queue size configurable

Limit the number WebSocket events queued from external clients to
dispatcher instances to produce back pressure to the clients if
Zeek's IO loop is overloaded.
This commit is contained in:
Arne Welzel 2025-04-16 16:59:05 +02:00
parent 6bd624d9b2
commit 011029addc
9 changed files with 45 additions and 10 deletions

View file

@ -75,6 +75,16 @@ export {
## :zeek:see:`Cluster::create_store` with the *persistent* argument set true.
const default_persistent_backend = Broker::SQLITE &redef;
## The default maximum queue size for WebSocket event dispatcher instances.
##
## If the maximum queue size is reached, events from external WebSocket
## clients will be stalled and processed once the queue has been drained.
##
## An internal metric named ``cluster_onloop_queue_stalls`` and
## labeled with a ``WebSocketEventDispatcher:<host>:<port>`` tag
## is incremented when the maximum queue size is reached.
const default_websocket_max_event_queue_size = 32 &redef;
## Setting a default dir will, for persistent backends that have not
## been given an explicit file path via :zeek:see:`Cluster::stores`,
## automatically create a path within this dir that is based on the name of
@ -353,6 +363,8 @@ export {
listen_host: string;
## The port the WebSocket server is supposed to listen on.
listen_port: port;
## The maximum event queue size for this server.
max_event_queue_size: count &default=default_websocket_max_event_queue_size;
## The TLS options used for this WebSocket server. By default,
## TLS is disabled. See also :zeek:see:`Cluster::WebSocketTLSOptions`.
tls_options: WebSocketTLSOptions &default=WebSocketTLSOptions();

View file

@ -55,8 +55,11 @@ bool Manager::ListenWebSocket(const websocket::detail::ServerOptions& options) {
return false;
}
auto server =
websocket::detail::StartServer(std::make_unique<websocket::detail::WebSocketEventDispatcher>(), options);
std::string ident = util::fmt("%s:%d", options.host.c_str(), options.port);
auto dispatcher =
std::make_unique<websocket::detail::WebSocketEventDispatcher>(std::move(ident), options.max_event_queue_size);
auto server = websocket::detail::StartServer(std::move(dispatcher), options);
if ( ! server )
return false;

View file

@ -196,6 +196,8 @@ function Cluster::__listen_websocket%(options: WebSocketServerOptions%): bool
options_rec->GetField<zeek::StringVal>("listen_host")->ToStdString(),
static_cast<uint16_t>(options_rec->GetField<zeek::PortVal>("listen_port")->Port()),
};
server_options.max_event_queue_size = options_rec->GetField<zeek::PortVal>("max_event_queue_size")->AsCount();
server_options.tls_options = std::move(tls_options);
auto result = zeek::cluster::manager->ListenWebSocket(server_options);

View file

@ -214,9 +214,11 @@ public:
bool OnFinish(double network_time) override { return true; }
};
WebSocketEventDispatcher::WebSocketEventDispatcher() {
WebSocketEventDispatcher::WebSocketEventDispatcher(std::string ident, size_t queue_size) {
onloop =
new zeek::detail::OnLoopProcess<WebSocketEventDispatcher, WebSocketEvent>(this, "WebSocketEventDispatcher");
new zeek::detail::OnLoopProcess<WebSocketEventDispatcher, WebSocketEvent>(this,
"WebSocketEventDispatcher:" + ident,
queue_size);
// Register the onloop instance the IO loop. Lifetime will be managed by the loop.
onloop->Register(false);

View file

@ -171,7 +171,13 @@ class ReplyMsgThread;
*/
class WebSocketEventDispatcher {
public:
WebSocketEventDispatcher();
/**
* Constructor.
*
* @param ident A string identifying this dispatcher instance. Used in metrics.
* @param queue_size Maximum queue size before events are stalled.
*/
WebSocketEventDispatcher(std::string ident, size_t queue_size);
~WebSocketEventDispatcher();
@ -295,12 +301,13 @@ struct ServerOptions {
int ping_interval_seconds = 5;
int max_connections = 100;
bool per_message_deflate = false;
size_t max_event_queue_size = 32;
struct TLSOptions tls_options;
bool operator==(const ServerOptions& o) const {
return host == o.host && port == o.port && ping_interval_seconds == o.ping_interval_seconds &&
max_connections == o.max_connections && per_message_deflate == o.per_message_deflate &&
tls_options == o.tls_options;
max_event_queue_size == o.max_event_queue_size && tls_options == o.tls_options;
}
};

View file

@ -1,4 +1,5 @@
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
error in <...>/main.zeek, line 654: Already listening on 127.0.0.1:<port> (Cluster::__listen_websocket(ws_opts_x))
error in <...>/main.zeek, line 654: Already listening on 127.0.0.1:<port> (Cluster::__listen_websocket(ws_opts_wss_port))
error in <...>/main.zeek, line 666: Already listening on 127.0.0.1:<port> (Cluster::__listen_websocket(ws_opts_x))
error in <...>/main.zeek, line 666: Already listening on 127.0.0.1:<port> (Cluster::__listen_websocket(ws_opts_wss_port))
error in <...>/main.zeek, line 666: Already listening on 127.0.0.1:<port> (Cluster::__listen_websocket(ws_opts_qs))
received termination signal

View file

@ -1,3 +1,3 @@
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
error in <...>/main.zeek, line 654: Invalid tls_options: No key_file field (Cluster::__listen_websocket(Cluster::options.0))
error in <...>/main.zeek, line 654: Invalid tls_options: No cert_file field (Cluster::__listen_websocket(Cluster::options.3))
error in <...>/main.zeek, line 666: Invalid tls_options: No key_file field (Cluster::__listen_websocket(Cluster::options.0))
error in <...>/main.zeek, line 666: Invalid tls_options: No cert_file field (Cluster::__listen_websocket(Cluster::options.3))

View file

@ -1,4 +1,5 @@
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
error in <...>/listen-idempotent.zeek, line 43: Already listening on 127.0.0.1:<port> (Cluster::listen_websocket(ws_opts_x))
error in <...>/listen-idempotent.zeek, line 47: Already listening on 127.0.0.1:<port> (Cluster::listen_websocket(ws_opts_wss_port))
error in <...>/listen-idempotent.zeek, line 52: Already listening on 127.0.0.1:<port> (Cluster::listen_websocket(ws_opts_qs))
received termination signal

View file

@ -46,5 +46,12 @@ event zeek_init()
assert Cluster::listen_websocket(ws_tls_opts_copy);
assert ! Cluster::listen_websocket(ws_opts_wss_port);
# Using a different max_event_queue_size fails, but using the default should work.
local ws_opts_qs = copy(ws_opts);
ws_opts_qs$max_event_queue_size = 42;
assert ! Cluster::listen_websocket(ws_opts_qs);
ws_opts_qs$max_event_queue_size = Cluster::default_websocket_max_event_queue_size;
assert Cluster::listen_websocket(ws_opts_qs);
terminate();
}