Switch default CAF scheduler policy to work sharing

It may generally be better for our default use-case, as workers may
save a few percent cpu utilization as this policy does not have to
use any polling like the stealing policy does.

This also helps avoid a potential issue with the implementation of
spinlocks used in the work-stealing policy in current CAF versions,
where there's some conditions where lock contention causes a thread
to spin for long periods without relinquishing the cpu to others.
This commit is contained in:
Jon Siwek 2019-06-28 16:34:33 -07:00
parent 9795782ecb
commit 5b64c35185
2 changed files with 27 additions and 7 deletions

View file

@ -73,32 +73,43 @@ export {
## ZEEK_BROKER_MAX_THREADS environment variable overrides this setting. ## ZEEK_BROKER_MAX_THREADS environment variable overrides this setting.
const max_threads = 1 &redef; const max_threads = 1 &redef;
## The CAF scheduling policy to use. Available options are "sharing" and
## "stealing". The "sharing" policy uses a single, global work queue along
## with mutex and condition variable used for accessing it, which may be
## better for cases that don't require much concurrency or need lower power
## consumption. The "stealing" policy uses multiple work queues protected
## by spinlocks, which may be better for use-cases that have more
## concurrency needs. E.g. may be worth testing the "stealing" policy
## along with dedicating more threads if a lot of data store processing is
## required.
const scheduler_policy = "sharing" &redef;
## Interval of time for under-utilized Broker/CAF threads to sleep ## Interval of time for under-utilized Broker/CAF threads to sleep
## when in "moderate" mode. ## when in "moderate" mode. Only used for the "stealing" scheduler policy.
const moderate_sleep = 16 msec &redef; const moderate_sleep = 16 msec &redef;
## Interval of time for under-utilized Broker/CAF threads to sleep ## Interval of time for under-utilized Broker/CAF threads to sleep
## when in "relaxed" mode. ## when in "relaxed" mode. Only used for the "stealing" scheduler policy.
const relaxed_sleep = 64 msec &redef; const relaxed_sleep = 64 msec &redef;
## Number of work-stealing polling attempts for Broker/CAF threads ## Number of work-stealing polling attempts for Broker/CAF threads
## in "aggressive" mode. ## in "aggressive" mode. Only used for the "stealing" scheduler policy.
const aggressive_polls = 5 &redef; const aggressive_polls = 5 &redef;
## Number of work-stealing polling attempts for Broker/CAF threads ## Number of work-stealing polling attempts for Broker/CAF threads
## in "moderate" mode. ## in "moderate" mode. Only used for the "stealing" scheduler policy.
const moderate_polls = 5 &redef; const moderate_polls = 5 &redef;
## Frequency of work-stealing polling attempts for Broker/CAF threads ## Frequency of work-stealing polling attempts for Broker/CAF threads
## in "aggressive" mode. ## in "aggressive" mode. Only used for the "stealing" scheduler policy.
const aggressive_interval = 4 &redef; const aggressive_interval = 4 &redef;
## Frequency of work-stealing polling attempts for Broker/CAF threads ## Frequency of work-stealing polling attempts for Broker/CAF threads
## in "moderate" mode. ## in "moderate" mode. Only used for the "stealing" scheduler policy.
const moderate_interval = 2 &redef; const moderate_interval = 2 &redef;
## Frequency of work-stealing polling attempts for Broker/CAF threads ## Frequency of work-stealing polling attempts for Broker/CAF threads
## in "relaxed" mode. ## in "relaxed" mode. Only used for the "stealing" scheduler policy.
const relaxed_interval = 1 &redef; const relaxed_interval = 1 &redef;
## Forward all received messages to subscribing peers. ## Forward all received messages to subscribing peers.

View file

@ -178,6 +178,15 @@ void Manager::InitPostScript()
BrokerConfig config{std::move(options)}; BrokerConfig config{std::move(options)};
auto scheduler_policy = get_option("Broker::scheduler_policy")->AsString()->CheckString();
if ( streq(scheduler_policy, "sharing") )
config.set("scheduler.policy", caf::atom("sharing"));
else if ( streq(scheduler_policy, "stealing") )
config.set("scheduler.policy", caf::atom("stealing"));
else
reporter->FatalError("Invalid Broker::scheduler_policy: %s", scheduler_policy);
auto max_threads_env = zeekenv("ZEEK_BROKER_MAX_THREADS"); auto max_threads_env = zeekenv("ZEEK_BROKER_MAX_THREADS");
if ( max_threads_env ) if ( max_threads_env )