mirror of
https://github.com/zeek/zeek.git
synced 2025-10-02 06:38:20 +00:00
Add Zeek-level configurability of Broker slow-peer disconnects
This commit is contained in:
parent
b9df1674b7
commit
4c4eb4b8e2
2 changed files with 48 additions and 0 deletions
|
@ -86,6 +86,24 @@ export {
|
||||||
## ZEEK_BROKER_MAX_THREADS environment variable overrides this setting.
|
## ZEEK_BROKER_MAX_THREADS environment variable overrides this setting.
|
||||||
const max_threads = 1 &redef;
|
const max_threads = 1 &redef;
|
||||||
|
|
||||||
|
## Max number of items we buffer at most per peer. What action to take when
|
||||||
|
## the buffer reaches its maximum size is determined by
|
||||||
|
## `peer_overflow_policy`.
|
||||||
|
const peer_buffer_size = 2048 &redef;
|
||||||
|
|
||||||
|
## Configures how Broker responds to peers that cannot keep up with the
|
||||||
|
## incoming message rate. Available strategies:
|
||||||
|
## - disconnect: drop the connection to the unresponsive peer
|
||||||
|
## - drop_newest: replace the newest message in the buffer
|
||||||
|
## - drop_oldest: removed the olsted message from the buffer, then append
|
||||||
|
const peer_overflow_policy = "disconnect" &redef;
|
||||||
|
|
||||||
|
## Same as `peer_buffer_size` but for WebSocket clients.
|
||||||
|
const web_socket_buffer_size = 512 &redef;
|
||||||
|
|
||||||
|
## Same as `peer_overflow_policy` but for WebSocket clients.
|
||||||
|
const web_socket_overflow_policy = "disconnect" &redef;
|
||||||
|
|
||||||
## The CAF scheduling policy to use. Available options are "sharing" and
|
## The CAF scheduling policy to use. Available options are "sharing" and
|
||||||
## "stealing". The "sharing" policy uses a single, global work queue along
|
## "stealing". The "sharing" policy uses a single, global work queue along
|
||||||
## with mutex and condition variable used for accessing it, which may be
|
## with mutex and condition variable used for accessing it, which may be
|
||||||
|
|
|
@ -255,6 +255,36 @@ void Manager::DoInitPostScript() {
|
||||||
options.disable_forwarding = ! get_option("Broker::forward_messages")->AsBool();
|
options.disable_forwarding = ! get_option("Broker::forward_messages")->AsBool();
|
||||||
options.use_real_time = use_real_time;
|
options.use_real_time = use_real_time;
|
||||||
|
|
||||||
|
options.peer_buffer_size = get_option("Broker::peer_buffer_size")->AsCount();
|
||||||
|
auto peer_overflow_policy = get_option("Broker::peer_overflow_policy")->AsString()->CheckString();
|
||||||
|
if ( util::streq(peer_overflow_policy, "disconnect") ) {
|
||||||
|
options.peer_overflow_policy = broker::overflow_policy::disconnect;
|
||||||
|
}
|
||||||
|
else if ( util::streq(peer_overflow_policy, "drop_oldest") ) {
|
||||||
|
options.peer_overflow_policy = broker::overflow_policy::drop_oldest;
|
||||||
|
}
|
||||||
|
else if ( util::streq(peer_overflow_policy, "drop_newest") ) {
|
||||||
|
options.peer_overflow_policy = broker::overflow_policy::drop_newest;
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
reporter->FatalError("Invalid Broker::peer_overflow_policy: %s", peer_overflow_policy);
|
||||||
|
}
|
||||||
|
|
||||||
|
options.web_socket_buffer_size = get_option("Broker::web_socket_buffer_size")->AsCount();
|
||||||
|
auto web_socket_overflow_policy = get_option("Broker::web_socket_overflow_policy")->AsString()->CheckString();
|
||||||
|
if ( util::streq(web_socket_overflow_policy, "disconnect") ) {
|
||||||
|
options.web_socket_overflow_policy = broker::overflow_policy::disconnect;
|
||||||
|
}
|
||||||
|
else if ( util::streq(web_socket_overflow_policy, "drop_oldest") ) {
|
||||||
|
options.web_socket_overflow_policy = broker::overflow_policy::drop_oldest;
|
||||||
|
}
|
||||||
|
else if ( util::streq(web_socket_overflow_policy, "drop_newest") ) {
|
||||||
|
options.web_socket_overflow_policy = broker::overflow_policy::drop_newest;
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
reporter->FatalError("Invalid Broker::web_socket_overflow_policy: %s", web_socket_overflow_policy);
|
||||||
|
}
|
||||||
|
|
||||||
broker::configuration config{std::move(options)};
|
broker::configuration config{std::move(options)};
|
||||||
|
|
||||||
config.openssl_cafile(get_option("Broker::ssl_cafile")->AsString()->CheckString());
|
config.openssl_cafile(get_option("Broker::ssl_cafile")->AsString()->CheckString());
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue