From 4c4eb4b8e28c7e79bebeaec549b2466095d8d97a Mon Sep 17 00:00:00 2001 From: Dominik Charousset Date: Wed, 25 Sep 2024 16:26:35 +0200 Subject: [PATCH] Add Zeek-level configurability of Broker slow-peer disconnects --- scripts/base/frameworks/broker/main.zeek | 18 ++++++++++++++ src/broker/Manager.cc | 30 ++++++++++++++++++++++++ 2 files changed, 48 insertions(+) diff --git a/scripts/base/frameworks/broker/main.zeek b/scripts/base/frameworks/broker/main.zeek index d41f64ab2e..2990f3f297 100644 --- a/scripts/base/frameworks/broker/main.zeek +++ b/scripts/base/frameworks/broker/main.zeek @@ -86,6 +86,24 @@ export { ## ZEEK_BROKER_MAX_THREADS environment variable overrides this setting. const max_threads = 1 &redef; + ## Max number of items we buffer at most per peer. What action to take when + ## the buffer reaches its maximum size is determined by + ## `peer_overflow_policy`. + const peer_buffer_size = 2048 &redef; + + ## Configures how Broker responds to peers that cannot keep up with the + ## incoming message rate. Available strategies: + ## - disconnect: drop the connection to the unresponsive peer + ## - drop_newest: replace the newest message in the buffer + ## - drop_oldest: removed the olsted message from the buffer, then append + const peer_overflow_policy = "disconnect" &redef; + + ## Same as `peer_buffer_size` but for WebSocket clients. + const web_socket_buffer_size = 512 &redef; + + ## Same as `peer_overflow_policy` but for WebSocket clients. + const web_socket_overflow_policy = "disconnect" &redef; + ## The CAF scheduling policy to use. Available options are "sharing" and ## "stealing". The "sharing" policy uses a single, global work queue along ## with mutex and condition variable used for accessing it, which may be diff --git a/src/broker/Manager.cc b/src/broker/Manager.cc index 6e5dd26cd7..e5e44e567f 100644 --- a/src/broker/Manager.cc +++ b/src/broker/Manager.cc @@ -255,6 +255,36 @@ void Manager::DoInitPostScript() { options.disable_forwarding = ! get_option("Broker::forward_messages")->AsBool(); options.use_real_time = use_real_time; + options.peer_buffer_size = get_option("Broker::peer_buffer_size")->AsCount(); + auto peer_overflow_policy = get_option("Broker::peer_overflow_policy")->AsString()->CheckString(); + if ( util::streq(peer_overflow_policy, "disconnect") ) { + options.peer_overflow_policy = broker::overflow_policy::disconnect; + } + else if ( util::streq(peer_overflow_policy, "drop_oldest") ) { + options.peer_overflow_policy = broker::overflow_policy::drop_oldest; + } + else if ( util::streq(peer_overflow_policy, "drop_newest") ) { + options.peer_overflow_policy = broker::overflow_policy::drop_newest; + } + else { + reporter->FatalError("Invalid Broker::peer_overflow_policy: %s", peer_overflow_policy); + } + + options.web_socket_buffer_size = get_option("Broker::web_socket_buffer_size")->AsCount(); + auto web_socket_overflow_policy = get_option("Broker::web_socket_overflow_policy")->AsString()->CheckString(); + if ( util::streq(web_socket_overflow_policy, "disconnect") ) { + options.web_socket_overflow_policy = broker::overflow_policy::disconnect; + } + else if ( util::streq(web_socket_overflow_policy, "drop_oldest") ) { + options.web_socket_overflow_policy = broker::overflow_policy::drop_oldest; + } + else if ( util::streq(web_socket_overflow_policy, "drop_newest") ) { + options.web_socket_overflow_policy = broker::overflow_policy::drop_newest; + } + else { + reporter->FatalError("Invalid Broker::web_socket_overflow_policy: %s", web_socket_overflow_policy); + } + broker::configuration config{std::move(options)}; config.openssl_cafile(get_option("Broker::ssl_cafile")->AsString()->CheckString());