mirror of
https://github.com/zeek/zeek.git
synced 2025-10-02 14:48:21 +00:00
Merge remote-tracking branch 'origin/topic/awelzel/zeromq-no-fprintf-at-overload'
* origin/topic/awelzel/zeromq-no-fprintf-at-overload: cluster/zeromq: Improve XPUB stall behavior, add a metric
This commit is contained in:
commit
acab4236e2
4 changed files with 48 additions and 15 deletions
7
CHANGES
7
CHANGES
|
@ -1,3 +1,10 @@
|
|||
7.2.0-dev.431 | 2025-03-26 14:23:51 +0100
|
||||
|
||||
* cluster/zeromq: Improve XPUB stall behavior, add a metric (Arne Welzel, Corelight)
|
||||
|
||||
Instead of fprintf, track the number of occurrences via a metric and
|
||||
replace the sleep loop with a blocking send.
|
||||
|
||||
7.2.0-dev.429 | 2025-03-26 13:59:23 +0100
|
||||
|
||||
* GH-4309: telemetry: Run callbacks at collect time (Arne Welzel, Corelight)
|
||||
|
|
2
VERSION
2
VERSION
|
@ -1 +1 @@
|
|||
7.2.0-dev.429
|
||||
7.2.0-dev.431
|
||||
|
|
|
@ -23,6 +23,7 @@
|
|||
#include "zeek/cluster/Serializer.h"
|
||||
#include "zeek/cluster/backend/zeromq/Plugin.h"
|
||||
#include "zeek/cluster/backend/zeromq/ZeroMQ-Proxy.h"
|
||||
#include "zeek/telemetry/Manager.h"
|
||||
#include "zeek/util.h"
|
||||
|
||||
extern int signal_val;
|
||||
|
@ -102,6 +103,11 @@ void ZeroMQBackend::DoInitPostScript() {
|
|||
|
||||
event_unsubscription = zeek::event_registry->Register("Cluster::Backend::ZeroMQ::unsubscription");
|
||||
event_subscription = zeek::event_registry->Register("Cluster::Backend::ZeroMQ::subscription");
|
||||
|
||||
total_xpub_stalls =
|
||||
zeek::telemetry_mgr
|
||||
->CounterInstance("zeek", "cluster_zeromq_xpub_stalls", {},
|
||||
"Counter for how many times sending on the XPUB socket stalled due to EAGAIN.");
|
||||
}
|
||||
|
||||
void ZeroMQBackend::DoTerminate() {
|
||||
|
@ -434,20 +440,29 @@ void ZeroMQBackend::Run() {
|
|||
break;
|
||||
}
|
||||
|
||||
// Empty result means xpub.send() returned EAGAIN. The
|
||||
// socket reached its high water mark and we should
|
||||
// relax / backoff a bit. Otherwise we'll be spinning
|
||||
// unproductively very fast here. Note that this is going
|
||||
// to build up backpressure and eventually inproc.send()
|
||||
// will block from the main thread.
|
||||
// Empty result means xpub.send() returned EAGAIN. The socket reached
|
||||
// its high-water-mark and we cannot send right now. We simply attempt
|
||||
// to re-send the message without the dontwait flag after increasing
|
||||
// the xpub stall metric. This way, ZeroMQ will block in xpub.send() until
|
||||
// there's enough room available.
|
||||
if ( ! result ) {
|
||||
++tries;
|
||||
auto sleep_for = std::min(tries * 10, 500);
|
||||
ZEROMQ_THREAD_PRINTF(
|
||||
"xpub: Failed forward inproc to xpub! Overloaded? (tries=%d sleeping %d ms)\n", tries,
|
||||
sleep_for);
|
||||
total_xpub_stalls->Inc();
|
||||
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(sleep_for));
|
||||
try {
|
||||
// We sent non-blocking above so we are able to observe and report stalls
|
||||
// in a metric. Now that we have done that, switch to a blocking send.
|
||||
zmq::send_flags block_flags =
|
||||
zmq::send_flags::none | (flags & zmq::send_flags::sndmore);
|
||||
result = xpub.send(part, block_flags);
|
||||
} catch ( zmq::error_t& err ) {
|
||||
if ( err.num() == ETERM )
|
||||
return;
|
||||
|
||||
// XXX: What other error can happen here? How should we react?
|
||||
ZEROMQ_THREAD_PRINTF("xpub: Failed blocking publish with error %s (%d)\n", err.what(),
|
||||
err.num());
|
||||
break;
|
||||
}
|
||||
}
|
||||
} while ( ! result );
|
||||
}
|
||||
|
|
|
@ -10,7 +10,15 @@
|
|||
#include "zeek/cluster/Serializer.h"
|
||||
#include "zeek/cluster/backend/zeromq/ZeroMQ-Proxy.h"
|
||||
|
||||
namespace zeek::cluster::zeromq {
|
||||
|
||||
namespace zeek {
|
||||
|
||||
namespace telemetry {
|
||||
class Counter;
|
||||
using CounterPtr = std::shared_ptr<Counter>;
|
||||
} // namespace telemetry
|
||||
|
||||
namespace cluster::zeromq {
|
||||
|
||||
class ZeroMQBackend : public cluster::ThreadedBackend {
|
||||
public:
|
||||
|
@ -105,6 +113,9 @@ private:
|
|||
// Tracking the subscriptions on the local XPUB socket.
|
||||
std::map<std::string, SubscribeCallback> subscription_callbacks;
|
||||
std::set<std::string> xpub_subscriptions;
|
||||
|
||||
zeek::telemetry::CounterPtr total_xpub_stalls;
|
||||
};
|
||||
|
||||
} // namespace zeek::cluster::zeromq
|
||||
} // namespace cluster::zeromq
|
||||
} // namespace zeek
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue