Integrate new Broker auto-disconnecting feature

This commit is contained in:
Dominik Charousset 2024-09-25 16:26:35 +02:00
parent 270429bfea
commit 304b8f8f0b
3 changed files with 49 additions and 1 deletions

@ -1 +1 @@
Subproject commit a80bf420aa6f55b4eb959ae89c184522a096a119
Subproject commit e941b4fbb52717b11516d45446fe632ed0e3013d

View file

@ -86,6 +86,24 @@ export {
## ZEEK_BROKER_MAX_THREADS environment variable overrides this setting.
const max_threads = 1 &redef;
## Max number of items we buffer at most per peer. What action to take when
## the buffer reaches its maximum size is determined by
## `peer_overflow_policy`.
const peer_buffer_size = 2048 &redef;
## Configures how Broker responds to peers that cannot keep up with the
## incoming message rate. Available strategies:
## - disconnect: drop the connection to the unresponsive peer
## - drop_newest: replace the newest message in the buffer
## - drop_oldest: removed the olsted message from the buffer, then append
const peer_overflow_policy = "disconnect" &redef;
## Same as `peer_buffer_size` but for WebSocket clients.
const web_socket_buffer_size = 512 &redef;
## Same as `peer_overflow_policy` but for WebSocket clients.
const web_socket_overflow_policy = "disconnect" &redef;
## The CAF scheduling policy to use. Available options are "sharing" and
## "stealing". The "sharing" policy uses a single, global work queue along
## with mutex and condition variable used for accessing it, which may be

View file

@ -294,6 +294,36 @@ void Manager::InitPostScript() {
options.disable_forwarding = ! get_option("Broker::forward_messages")->AsBool();
options.use_real_time = use_real_time;
options.peer_buffer_size = get_option("Broker::peer_buffer_size")->AsCount();
auto peer_overflow_policy = get_option("Broker::peer_overflow_policy")->AsString()->CheckString();
if ( util::streq(peer_overflow_policy, "disconnect") ) {
options.peer_overflow_policy = broker::overflow_policy::disconnect;
}
else if ( util::streq(peer_overflow_policy, "drop_oldest") ) {
options.peer_overflow_policy = broker::overflow_policy::drop_oldest;
}
else if ( util::streq(peer_overflow_policy, "drop_newest") ) {
options.peer_overflow_policy = broker::overflow_policy::drop_newest;
}
else {
reporter->FatalError("Invalid Broker::peer_overflow_policy: %s", peer_overflow_policy);
}
options.web_socket_buffer_size = get_option("Broker::web_socket_buffer_size")->AsCount();
auto web_socket_overflow_policy = get_option("Broker::web_socket_overflow_policy")->AsString()->CheckString();
if ( util::streq(web_socket_overflow_policy, "disconnect") ) {
options.web_socket_overflow_policy = broker::overflow_policy::disconnect;
}
else if ( util::streq(web_socket_overflow_policy, "drop_oldest") ) {
options.web_socket_overflow_policy = broker::overflow_policy::drop_oldest;
}
else if ( util::streq(web_socket_overflow_policy, "drop_newest") ) {
options.web_socket_overflow_policy = broker::overflow_policy::drop_newest;
}
else {
reporter->FatalError("Invalid Broker::web_socket_overflow_policy: %s", web_socket_overflow_policy);
}
broker::configuration config{std::move(options)};
config.openssl_cafile(get_option("Broker::ssl_cafile")->AsString()->CheckString());