From c699748157845d1962db4415203fab0083d5ee0f Mon Sep 17 00:00:00 2001 From: Dominik Charousset Date: Wed, 25 Sep 2024 16:26:35 +0200 Subject: [PATCH] Integrate new Broker auto-disconnecting feature --- auxil/broker | 2 +- scripts/base/frameworks/broker/main.zeek | 18 ++++++++++++++ src/broker/Manager.cc | 30 ++++++++++++++++++++++++ 3 files changed, 49 insertions(+), 1 deletion(-) diff --git a/auxil/broker b/auxil/broker index 5b3ed87a93..5ed9997159 160000 --- a/auxil/broker +++ b/auxil/broker @@ -1 +1 @@ -Subproject commit 5b3ed87a93b2ded1f3c95ff1a3b99e2c6ab84ef4 +Subproject commit 5ed99971597a9a684a7ed97cd8a1d6604d5dce63 diff --git a/scripts/base/frameworks/broker/main.zeek b/scripts/base/frameworks/broker/main.zeek index 0dd61f9139..0bdba4f57e 100644 --- a/scripts/base/frameworks/broker/main.zeek +++ b/scripts/base/frameworks/broker/main.zeek @@ -86,6 +86,24 @@ export { ## ZEEK_BROKER_MAX_THREADS environment variable overrides this setting. const max_threads = 1 &redef; + ## Max number of items we buffer at most per peer. What action to take when + ## the buffer reaches its maximum size is determined by + ## `peer_overflow_policy`. + const peer_buffer_size = 2048 &redef; + + ## Configures how Broker responds to peers that cannot keep up with the + ## incoming message rate. Available strategies: + ## - disconnect: drop the connection to the unresponsive peer + ## - drop_newest: replace the newest message in the buffer + ## - drop_oldest: removed the olsted message from the buffer, then append + const peer_overflow_policy = "disconnect" &redef; + + ## Same as `peer_buffer_size` but for WebSocket clients. + const web_socket_buffer_size = 512 &redef; + + ## Same as `peer_overflow_policy` but for WebSocket clients. + const web_socket_overflow_policy = "disconnect" &redef; + ## The CAF scheduling policy to use. Available options are "sharing" and ## "stealing". The "sharing" policy uses a single, global work queue along ## with mutex and condition variable used for accessing it, which may be diff --git a/src/broker/Manager.cc b/src/broker/Manager.cc index 015f53d10e..b5407d413a 100644 --- a/src/broker/Manager.cc +++ b/src/broker/Manager.cc @@ -294,6 +294,36 @@ void Manager::InitPostScript() { options.disable_forwarding = ! get_option("Broker::forward_messages")->AsBool(); options.use_real_time = use_real_time; + options.peer_buffer_size = get_option("Broker::peer_buffer_size")->AsCount(); + auto peer_overflow_policy = get_option("Broker::peer_overflow_policy")->AsString()->CheckString(); + if ( util::streq(peer_overflow_policy, "disconnect") ) { + options.peer_overflow_policy = broker::overflow_policy::disconnect; + } + else if ( util::streq(peer_overflow_policy, "drop_oldest") ) { + options.peer_overflow_policy = broker::overflow_policy::drop_oldest; + } + else if ( util::streq(peer_overflow_policy, "drop_newest") ) { + options.peer_overflow_policy = broker::overflow_policy::drop_newest; + } + else { + reporter->FatalError("Invalid Broker::peer_overflow_policy: %s", peer_overflow_policy); + } + + options.web_socket_buffer_size = get_option("Broker::web_socket_buffer_size")->AsCount(); + auto web_socket_overflow_policy = get_option("Broker::web_socket_overflow_policy")->AsString()->CheckString(); + if ( util::streq(web_socket_overflow_policy, "disconnect") ) { + options.web_socket_overflow_policy = broker::overflow_policy::disconnect; + } + else if ( util::streq(web_socket_overflow_policy, "drop_oldest") ) { + options.web_socket_overflow_policy = broker::overflow_policy::drop_oldest; + } + else if ( util::streq(web_socket_overflow_policy, "drop_newest") ) { + options.web_socket_overflow_policy = broker::overflow_policy::drop_newest; + } + else { + reporter->FatalError("Invalid Broker::web_socket_overflow_policy: %s", web_socket_overflow_policy); + } + broker::configuration config{std::move(options)}; config.openssl_cafile(get_option("Broker::ssl_cafile")->AsString()->CheckString());