cluster/zeromq: Drop events when overloaded

When either the XPUB socket's hwm is reached, or the onloop queue is
full, drop the events. Users can set ths xpub_sndhwm and
onloop_queue_hwm to 0 to avoid these drops at the risk of unbounded
memory growth.
This commit is contained in:
Arne Welzel 2025-06-30 19:07:19 +02:00
parent 5de9296c77
commit 073de9f5fd
3 changed files with 134 additions and 53 deletions

View file

@ -22,10 +22,12 @@
#include "zeek/Reporter.h"
#include "zeek/Val.h"
#include "zeek/cluster/Backend.h"
#include "zeek/cluster/OnLoop.h"
#include "zeek/cluster/Serializer.h"
#include "zeek/cluster/backend/zeromq/Plugin.h"
#include "zeek/cluster/backend/zeromq/ZeroMQ-Proxy.h"
#include "zeek/telemetry/Manager.h"
#include "zeek/util-types.h"
#include "zeek/util.h"
extern int signal_val;
@ -71,10 +73,25 @@ constexpr DebugFlag operator&(uint8_t x, DebugFlag y) { return static_cast<Debug
// NOLINTEND(cppcoreguidelines-macro-usage)
std::unique_ptr<Backend> ZeroMQBackend::Instantiate(std::unique_ptr<EventSerializer> es,
std::unique_ptr<LogSerializer> ls,
std::unique_ptr<detail::EventHandlingStrategy> ehs) {
auto onloop_queue_hwm = zeek::id::find_val<zeek::CountVal>("Cluster::Backend::ZeroMQ::onloop_queue_hwm")->AsCount();
return std::make_unique<ZeroMQBackend>(std::move(es), std::move(ls), std::move(ehs), onloop_queue_hwm);
}
ZeroMQBackend::ZeroMQBackend(std::unique_ptr<EventSerializer> es, std::unique_ptr<LogSerializer> ls,
std::unique_ptr<detail::EventHandlingStrategy> ehs)
: ThreadedBackend("ZeroMQ", std::move(es), std::move(ls), std::move(ehs)),
main_inproc(zmq::socket_t(ctx, zmq::socket_type::pair)) {}
std::unique_ptr<detail::EventHandlingStrategy> ehs, zeek_uint_t onloop_queue_hwm)
: ThreadedBackend("ZeroMQ", std::move(es), std::move(ls), std::move(ehs),
new zeek::detail::OnLoopProcess<ThreadedBackend, QueueMessage>(this, "ZeroMQ", onloop_queue_hwm)),
main_inproc(zmq::socket_t(ctx, zmq::socket_type::pair)),
// Counters for block and drop metrics.
total_xpub_drops(
zeek::telemetry_mgr->CounterInstance("zeek", "cluster_zeromq_xpub_drops", {},
"Number of published events dropped due to XPUB socket HWM.")),
total_onloop_drops(
zeek::telemetry_mgr->CounterInstance("zeek", "cluster_zeromq_onloop_drops", {},
"Number of received events dropped due to OnLoop queue full.")) {}
ZeroMQBackend::~ZeroMQBackend() {
try {
@ -114,11 +131,6 @@ void ZeroMQBackend::DoInitPostScript() {
event_unsubscription = zeek::event_registry->Register("Cluster::Backend::ZeroMQ::unsubscription");
event_subscription = zeek::event_registry->Register("Cluster::Backend::ZeroMQ::subscription");
total_xpub_stalls =
zeek::telemetry_mgr
->CounterInstance("zeek", "cluster_zeromq_xpub_stalls", {},
"Counter for how many times sending on the XPUB socket stalled due to EAGAIN.");
// xpub/xsub hwm configuration
xpub_sndhwm = static_cast<int>(zeek::id::find_val<zeek::IntVal>("Cluster::Backend::ZeroMQ::xpub_sndhwm")->AsInt());
xpub_sndbuf = static_cast<int>(zeek::id::find_val<zeek::IntVal>("Cluster::Backend::ZeroMQ::xpub_sndbuf")->AsInt());
@ -471,43 +483,32 @@ void ZeroMQBackend::HandleInprocMessages(std::vector<MultipartMessage>& msgs) {
flags = flags | zmq::send_flags::sndmore;
zmq::send_result_t result;
int tries = 0;
do {
try {
result = xpub.send(part, flags);
} catch ( zmq::error_t& err ) {
if ( err.num() == ETERM )
return;
try {
result = xpub.send(part, flags);
} catch ( zmq::error_t& err ) {
if ( err.num() == ETERM )
return;
// XXX: What other error can happen here? How should we react?
ZEROMQ_THREAD_PRINTF("xpub: Failed to publish with error %s (%d)\n", err.what(), err.num());
break;
// XXX: What other error can happen here? How should we react?
ZEROMQ_THREAD_PRINTF("xpub: Failed to publish with error %s (%d)\n", err.what(), err.num());
break;
}
// Empty result means xpub.send() returned EAGAIN. The socket reached
// its high-water-mark and we drop this message.
if ( ! result ) {
total_xpub_drops->Inc();
// Warn once about a dropped message.
//
// TODO: warn every n seconds?
if ( xpub_drop_last_warn_at == 0.0 ) {
ZEROMQ_THREAD_PRINTF("xpub: warn: dropped a message due to hwm\n");
xpub_drop_last_warn_at = util::current_time(true);
}
// Empty result means xpub.send() returned EAGAIN. The socket reached
// its high-water-mark and we cannot send right now. We simply attempt
// to re-send the message without the dontwait flag after increasing
// the xpub stall metric. This way, ZeroMQ will block in xpub.send() until
// there's enough room available.
if ( ! result ) {
total_xpub_stalls->Inc();
try {
// We sent non-blocking above so we are able to observe and report stalls
// in a metric. Now that we have done that switch to blocking send.
zmq::send_flags block_flags = zmq::send_flags::none | (flags & zmq::send_flags::sndmore);
result = xpub.send(part, block_flags);
} catch ( zmq::error_t& err ) {
if ( err.num() == ETERM )
return;
// XXX: What other error can happen here? How should we react?
ZEROMQ_THREAD_PRINTF("xpub: Failed blocking publish with error %s (%d)\n", err.what(),
err.num());
break;
}
}
} while ( ! result );
break; // Skip the whole message.
}
}
}
else {
@ -527,7 +528,11 @@ void ZeroMQBackend::HandleLogMessages(const std::vector<MultipartMessage>& msgs)
byte_buffer payload{msg[3].data<std::byte>(), msg[3].data<std::byte>() + msg[3].size()};
LogMessage lm{.format = std::string(msg[2].data<const char>(), msg[2].size()), .payload = std::move(payload)};
QueueForProcessing(std::move(lm));
// Always enqueue log messages for processing, they are important.
//
// Hmm, we could also consider bypassing Zeek's event loop completely and
// just go to the log_mgr directly in the future.
OnLoop()->QueueForProcessing(std::move(lm), zeek::detail::QueueFlag::Force);
}
}
@ -557,7 +562,12 @@ void ZeroMQBackend::HandleXPubMessages(const std::vector<MultipartMessage>& msgs
continue;
}
QueueForProcessing(std::move(qm));
// Always enqueue subscription messages from other nodes as events.
//
// There shouldn't be all that many, unless some script calls Cluster::subscribe() and
// Cluster::unsubscribe() a lot, so assume we can afford the extra memory rather than
// missing these low-frequency events.
OnLoop()->QueueForProcessing(std::move(qm), zeek::detail::QueueFlag::Force);
}
}
}
@ -579,7 +589,20 @@ void ZeroMQBackend::HandleXSubMessages(const std::vector<MultipartMessage>& msgs
.format = std::string(msg[2].data<const char>(), msg[2].size()),
.payload = std::move(payload)};
QueueForProcessing(std::move(em));
// If queueing the event message for Zeek's main loop doesn't work due to reaching the onloop hwm,
// drop the message.
//
// This is sort of a suicidal snail pattern but without exiting the node.
if ( ! OnLoop()->QueueForProcessing(std::move(em), zeek::detail::QueueFlag::DontBlock) ) {
total_onloop_drops->Inc();
// Warn once about a dropped message.
if ( onloop_drop_last_warn_at == 0.0 ) {
ZEROMQ_THREAD_PRINTF("warn: dropped a message due to onloop queue full\n");
onloop_drop_last_warn_at = util::current_time(true);
}
}
}
}