cluster/zeromq: Improve XPUB stall behavior, add a metric

Instead of fprintf, track the number of occurrences via a metric and
change the sleep loop to a blocking send instead.
This commit is contained in:
Arne Welzel 2025-03-25 20:31:06 +01:00
parent 33d7e5a7bf
commit bfffc8dac8
2 changed files with 40 additions and 14 deletions

View file

@ -23,6 +23,7 @@
#include "zeek/cluster/Serializer.h"
#include "zeek/cluster/backend/zeromq/Plugin.h"
#include "zeek/cluster/backend/zeromq/ZeroMQ-Proxy.h"
#include "zeek/telemetry/Manager.h"
#include "zeek/util.h"
extern int signal_val;
@ -102,6 +103,11 @@ void ZeroMQBackend::DoInitPostScript() {
event_unsubscription = zeek::event_registry->Register("Cluster::Backend::ZeroMQ::unsubscription");
event_subscription = zeek::event_registry->Register("Cluster::Backend::ZeroMQ::subscription");
total_xpub_stalls =
zeek::telemetry_mgr
->CounterInstance("zeek", "cluster_zeromq_xpub_stalls", {},
"Counter for how many times sending on the XPUB socket stalled due to EAGAIN.");
}
void ZeroMQBackend::DoTerminate() {
@ -434,20 +440,29 @@ void ZeroMQBackend::Run() {
break;
}
// Empty result means xpub.send() returned EAGAIN. The
// socket reached its high water mark and we should
// relax / backoff a bit. Otherwise we'll be spinning
// unproductively very fast here. Note that this is going
// to build up backpressure and eventually inproc.send()
// will block from the main thread.
// Empty result means xpub.send() returned EAGAIN. The socket reached
// its high-water-mark and we cannot send right now. We simply attempt
// to re-send the message without the dontwait flag after increasing
// the xpub stall metric. This way, ZeroMQ will block in xpub.send() until
// there's enough room available.
if ( ! result ) {
++tries;
auto sleep_for = std::min(tries * 10, 500);
ZEROMQ_THREAD_PRINTF(
"xpub: Failed forward inproc to xpub! Overloaded? (tries=%d sleeping %d ms)\n", tries,
sleep_for);
total_xpub_stalls->Inc();
std::this_thread::sleep_for(std::chrono::milliseconds(sleep_for));
try {
// We sent non-blocking above so we are able to observe and report stalls
// in a metric. Now that we have done that switch to blocking send.
zmq::send_flags block_flags =
zmq::send_flags::none | (flags & zmq::send_flags::sndmore);
result = xpub.send(part, block_flags);
} catch ( zmq::error_t& err ) {
if ( err.num() == ETERM )
return;
// XXX: What other error can happen here? How should we react?
ZEROMQ_THREAD_PRINTF("xpub: Failed blocking publish with error %s (%d)\n", err.what(),
err.num());
break;
}
}
} while ( ! result );
}

View file

@ -10,7 +10,15 @@
#include "zeek/cluster/Serializer.h"
#include "zeek/cluster/backend/zeromq/ZeroMQ-Proxy.h"
namespace zeek::cluster::zeromq {
namespace zeek {
namespace telemetry {
class Counter;
using CounterPtr = std::shared_ptr<Counter>;
} // namespace telemetry
namespace cluster::zeromq {
class ZeroMQBackend : public cluster::ThreadedBackend {
public:
@ -105,6 +113,9 @@ private:
// Tracking the subscriptions on the local XPUB socket.
std::map<std::string, SubscribeCallback> subscription_callbacks;
std::set<std::string> xpub_subscriptions;
zeek::telemetry::CounterPtr total_xpub_stalls;
};
} // namespace zeek::cluster::zeromq
} // namespace cluster::zeromq
} // namespace zeek