cluster/zeromq: Support configuring IO threads for proxy thread

This commit is contained in:
Arne Welzel 2025-01-22 17:32:28 +01:00
parent ba7b605a97
commit aad512c616
5 changed files with 21 additions and 3 deletions

View file

@ -55,6 +55,14 @@ export {
## By default, this is set to ``T`` on the manager and ``F`` elsewhere.
const run_proxy_thread: bool = F &redef;
## How many IO threads to configure for the ZeroMQ context that
## acts as a central broker.
## See ZeroMQ's `ZMQ_IO_THREADS documentation <http://api.zeromq.org/4-2:zmq-ctx-set#toc4>`_
## and the `I/O threads <https://zguide.zeromq.org/docs/chapter2/#I-O-Threads>`
## section in the ZeroMQ guide for details.
const proxy_io_threads = 2 &redef;
## XSUB listen endpoint for the central broker.
##
## This setting is used for the XSUB socket of the central broker started

View file

@ -36,6 +36,8 @@ void thread_fun(ProxyThread::Args* args) {
} // namespace
bool ProxyThread::Start() {
ctx.set(zmq::ctxopt::io_threads, io_threads);
zmq::socket_t xpub(ctx, zmq::socket_type::xpub);
zmq::socket_t xsub(ctx, zmq::socket_type::xsub);

View file

@ -21,8 +21,11 @@ public:
* @param xsub_endpoint the XSUB socket address to listen on.
* @param xpub_nodrop the xpub_nodrop option to use on the XPUB socket.
*/
ProxyThread(std::string xpub_endpoint, std::string xsub_endpoint, int xpub_nodrop)
: xpub_endpoint(std::move(xpub_endpoint)), xsub_endpoint(std::move(xsub_endpoint)), xpub_nodrop(xpub_nodrop) {}
ProxyThread(std::string xpub_endpoint, std::string xsub_endpoint, int xpub_nodrop, int io_threads)
: xpub_endpoint(std::move(xpub_endpoint)),
xsub_endpoint(std::move(xsub_endpoint)),
xpub_nodrop(xpub_nodrop),
io_threads(io_threads) {}
~ProxyThread() { Shutdown(); }
@ -52,5 +55,6 @@ private:
std::string xpub_endpoint;
std::string xsub_endpoint;
int xpub_nodrop = 1;
int io_threads = 2;
};
} // namespace zeek::cluster::zeromq

View file

@ -97,6 +97,8 @@ void ZeroMQBackend::DoInitPostScript() {
linger_ms = static_cast<int>(zeek::id::find_val<zeek::IntVal>("Cluster::Backend::ZeroMQ::linger_ms")->AsInt());
poll_max_messages = zeek::id::find_val<zeek::CountVal>("Cluster::Backend::ZeroMQ::poll_max_messages")->Get();
debug_flags = zeek::id::find_val<zeek::CountVal>("Cluster::Backend::ZeroMQ::debug_flags")->Get();
proxy_io_threads =
static_cast<int>(zeek::id::find_val<zeek::CountVal>("Cluster::Backend::ZeroMQ::proxy_io_threads")->Get());
event_unsubscription = zeek::event_registry->Register("Cluster::Backend::ZeroMQ::unsubscription");
event_subscription = zeek::event_registry->Register("Cluster::Backend::ZeroMQ::subscription");
@ -240,7 +242,8 @@ bool ZeroMQBackend::DoInit() {
}
bool ZeroMQBackend::SpawnZmqProxyThread() {
proxy_thread = std::make_unique<ProxyThread>(listen_xpub_endpoint, listen_xsub_endpoint, listen_xpub_nodrop);
proxy_thread =
std::make_unique<ProxyThread>(listen_xpub_endpoint, listen_xsub_endpoint, listen_xpub_nodrop, proxy_io_threads);
return proxy_thread->Start();
}

View file

@ -99,6 +99,7 @@ private:
std::thread self_thread;
int proxy_io_threads = 2;
std::unique_ptr<ProxyThread> proxy_thread;
// Tracking the subscriptions on the local XPUB socket.