diff --git a/auxil/broker b/auxil/broker index a80bf420aa..e941b4fbb5 160000 --- a/auxil/broker +++ b/auxil/broker @@ -1 +1 @@ -Subproject commit a80bf420aa6f55b4eb959ae89c184522a096a119 +Subproject commit e941b4fbb52717b11516d45446fe632ed0e3013d diff --git a/scripts/base/frameworks/broker/main.zeek b/scripts/base/frameworks/broker/main.zeek index 0dd61f9139..0bdba4f57e 100644 --- a/scripts/base/frameworks/broker/main.zeek +++ b/scripts/base/frameworks/broker/main.zeek @@ -86,6 +86,24 @@ export { ## ZEEK_BROKER_MAX_THREADS environment variable overrides this setting. const max_threads = 1 &redef; + ## Max number of items we buffer at most per peer. What action to take when + ## the buffer reaches its maximum size is determined by + ## `peer_overflow_policy`. + const peer_buffer_size = 2048 &redef; + + ## Configures how Broker responds to peers that cannot keep up with the + ## incoming message rate. Available strategies: + ## - disconnect: drop the connection to the unresponsive peer + ## - drop_newest: replace the newest message in the buffer + ## - drop_oldest: removed the olsted message from the buffer, then append + const peer_overflow_policy = "disconnect" &redef; + + ## Same as `peer_buffer_size` but for WebSocket clients. + const web_socket_buffer_size = 512 &redef; + + ## Same as `peer_overflow_policy` but for WebSocket clients. + const web_socket_overflow_policy = "disconnect" &redef; + ## The CAF scheduling policy to use. Available options are "sharing" and ## "stealing". The "sharing" policy uses a single, global work queue along ## with mutex and condition variable used for accessing it, which may be diff --git a/src/broker/Manager.cc b/src/broker/Manager.cc index 015f53d10e..b5407d413a 100644 --- a/src/broker/Manager.cc +++ b/src/broker/Manager.cc @@ -294,6 +294,36 @@ void Manager::InitPostScript() { options.disable_forwarding = ! get_option("Broker::forward_messages")->AsBool(); options.use_real_time = use_real_time; + options.peer_buffer_size = get_option("Broker::peer_buffer_size")->AsCount(); + auto peer_overflow_policy = get_option("Broker::peer_overflow_policy")->AsString()->CheckString(); + if ( util::streq(peer_overflow_policy, "disconnect") ) { + options.peer_overflow_policy = broker::overflow_policy::disconnect; + } + else if ( util::streq(peer_overflow_policy, "drop_oldest") ) { + options.peer_overflow_policy = broker::overflow_policy::drop_oldest; + } + else if ( util::streq(peer_overflow_policy, "drop_newest") ) { + options.peer_overflow_policy = broker::overflow_policy::drop_newest; + } + else { + reporter->FatalError("Invalid Broker::peer_overflow_policy: %s", peer_overflow_policy); + } + + options.web_socket_buffer_size = get_option("Broker::web_socket_buffer_size")->AsCount(); + auto web_socket_overflow_policy = get_option("Broker::web_socket_overflow_policy")->AsString()->CheckString(); + if ( util::streq(web_socket_overflow_policy, "disconnect") ) { + options.web_socket_overflow_policy = broker::overflow_policy::disconnect; + } + else if ( util::streq(web_socket_overflow_policy, "drop_oldest") ) { + options.web_socket_overflow_policy = broker::overflow_policy::drop_oldest; + } + else if ( util::streq(web_socket_overflow_policy, "drop_newest") ) { + options.web_socket_overflow_policy = broker::overflow_policy::drop_newest; + } + else { + reporter->FatalError("Invalid Broker::web_socket_overflow_policy: %s", web_socket_overflow_policy); + } + broker::configuration config{std::move(options)}; config.openssl_cafile(get_option("Broker::ssl_cafile")->AsString()->CheckString());