Merge remote-tracking branch 'origin/topic/awelzel/4431-zeromq-drop-policy-v2'

* origin/topic/awelzel/4431-zeromq-drop-policy-v2:
  cluster.bif: Improve Cluster::publish() docstring
  btest/cluster/zeromq: Add tests for overload behavior
  cluster/zeromq: Metric for msg errors
  cluster/zeromq: Drop events when overloaded
  cluster/zeromq: Comments and move lookups to InitPostScript()
  cluster/zeromq: Rework lambdas to member functions
  cluster/zeromq: Support local XPUB/XSUB hwm and buf configurability
  cluster/OnLoop: Support DontBlock and Force flags for queueing
  cluster/ThreadedBackend: Injectable OnLoopProcess instance
Arne Welzel 2025-07-29 11:29:11 +02:00
commit cd7836dda2
30 changed files with 1259 additions and 208 deletions


@ -264,14 +264,20 @@ bool ThreadedBackend::ProcessBackendMessage(int tag, byte_buffer_span payload) {
}
ThreadedBackend::ThreadedBackend(std::string_view name, std::unique_ptr<EventSerializer> es,
std::unique_ptr<LogSerializer> ls, std::unique_ptr<detail::EventHandlingStrategy> ehs)
: Backend(name, std::move(es), std::move(ls), std::move(ehs)) {
onloop = new zeek::detail::OnLoopProcess<ThreadedBackend, QueueMessage>(this, Name());
std::unique_ptr<LogSerializer> ls, std::unique_ptr<detail::EventHandlingStrategy> ehs,
zeek::detail::OnLoopProcess<ThreadedBackend, QueueMessage>* onloop)
: Backend(name, std::move(es), std::move(ls), std::move(ehs)), onloop(onloop) {
onloop->Register(true); // Register with dont_count=true first; DoInit() re-registers it as counting.
}
ThreadedBackend::ThreadedBackend(std::string_view name, std::unique_ptr<EventSerializer> es,
std::unique_ptr<LogSerializer> ls, std::unique_ptr<detail::EventHandlingStrategy> ehs)
: ThreadedBackend(name, std::move(es), std::move(ls), std::move(ehs),
new zeek::detail::OnLoopProcess<ThreadedBackend, QueueMessage>(this, name)) {}
bool ThreadedBackend::DoInit() {
// Have the backend count so Zeek does not terminate.
// Have the onloop instance count so Zeek does not terminate.
onloop->Register(/*dont_count=*/false);
return true;
}


@ -16,6 +16,7 @@
#include "zeek/Val.h"
#include "zeek/ZeekArgs.h"
#include "zeek/cluster/BifSupport.h"
#include "zeek/cluster/OnLoop.h"
#include "zeek/cluster/Serializer.h"
#include "zeek/cluster/Telemetry.h"
#include "zeek/logging/Types.h"
@ -572,7 +573,7 @@ private:
/**
* A cluster backend may receive event and log messages asynchronously
* through threads. The following structs can be used with QueueForProcessing()
* through threads. The following structs can be used with OnLoop()
* to enqueue these messages onto the main IO loop for processing.
*
* EventMessage and LogMessage are processed in a generic fashion in
@ -623,6 +624,13 @@ using QueueMessage = std::variant<EventMessage, LogMessage, BackendMessage>;
*/
class ThreadedBackend : public Backend {
protected:
/**
* Constructor with custom onloop parameter.
*/
ThreadedBackend(std::string_view name, std::unique_ptr<EventSerializer> es, std::unique_ptr<LogSerializer> ls,
std::unique_ptr<detail::EventHandlingStrategy> ehs,
zeek::detail::OnLoopProcess<ThreadedBackend, QueueMessage>* onloop);
/**
* Constructor.
*/
@ -637,6 +645,7 @@ protected:
*
* @param messages Messages to be enqueued.
*/
[[deprecated("Remove in v8.1: Use OnLoop() and QueueForProcessing() directly.")]]
void QueueForProcessing(QueueMessage&& messages);
/**
@ -664,6 +673,11 @@ protected:
*/
void DoTerminate() override;
/**
* @return this backend's OnLoopProcess instance, or nullptr after DoTerminate().
*/
zeek::detail::OnLoopProcess<ThreadedBackend, QueueMessage>* OnLoop() { return onloop; }
private:
/**
* Process a backend specific message queued as BackendMessage.


@ -14,9 +14,22 @@
#include "zeek/iosource/IOSource.h"
#include "zeek/iosource/Manager.h"
#include "zeek/telemetry/Manager.h"
#include "zeek/util-types.h"
namespace zeek::detail {
/**
* Flags for OnLoopProcess::QueueForProcessing().
*/
enum class QueueFlag : uint8_t {
Block = 0x0, // block until there's room
DontBlock = 0x1, // fail queueing if there's no room
Force = 0x2, // ignore any queue size limitations
};
constexpr QueueFlag operator&(QueueFlag x, QueueFlag y) {
return static_cast<QueueFlag>(static_cast<uint8_t>(x) & static_cast<uint8_t>(y));
};
/**
* Template class allowing work items to be queued by threads and processed
@ -46,19 +59,19 @@ public:
* @param cond_timeout If a producer is blocked for more than that many microseconds, report a warning.
* @param main_thread_id The ID of the main thread for usage checks.
*/
OnLoopProcess(Proc* proc, std::string tag, size_t max_queue_size = 250,
OnLoopProcess(Proc* proc, std::string_view tag, size_t max_queue_size = 250,
std::chrono::microseconds cond_timeout = std::chrono::microseconds(100000),
std::thread::id main_thread_id = std::this_thread::get_id())
: cond_timeout(cond_timeout),
max_queue_size(max_queue_size),
proc(proc),
tag(std::move(tag)),
tag(tag),
main_thread_id(main_thread_id),
total_queue_stalls_metric(
total_queue_blocks_metric(
zeek::telemetry_mgr
->CounterFamily(
"zeek", "cluster_onloop_queue_stalls", {"tag"},
"Increased whenever a cluster backend thread is stalled due to the OnLoop queue being full.")
"zeek", "cluster_onloop_queue_blocks", {"tag"},
"Increased whenever a cluster backend thread is blocked due to the OnLoop queue being full.")
->GetOrAdd({{"tag", this->tag}})) {}
/**
@ -116,7 +129,7 @@ public:
bool notify = false;
{
std::scoped_lock lock(mtx);
if ( queue.size() >= max_queue_size )
if ( max_queue_size > 0 && queue.size() >= max_queue_size )
notify = true;
to_process.splice(to_process.end(), queue);
@ -151,37 +164,52 @@ public:
/**
* Queue the given Work item to be processed on Zeek's main thread.
*
* If there's too many items in the queue, this method blocks until
* there's more room available. The zeek_cluster_onloop_queue_stalls_total
If there are too many items in the queue and flags contains neither Force nor DontBlock,
the method blocks until room becomes available. This may result in deadlocks if
Zeek's event loop is blocked as well. The zeek_cluster_onloop_queue_blocks_total
metric is increased once for every cond_timeout spent blocked.
*
If the Force flag is used, the queue's max size is ignored and queueing
always succeeds, possibly growing the queue beyond max_queue_size.
* If DontBlock is used and the queue is full, queueing fails and false is
* returned.
*
* Calling this method from the main thread will result in an abort().
*
* @param work The work to enqueue.
* @param flags Modifies the behavior.
*
* @return True if the message was queued, else false.
*/
void QueueForProcessing(Work&& work) {
++queuers;
std::list<Work> to_queue{std::move(work)};
bool QueueForProcessing(Work&& work, QueueFlag flags = QueueFlag::Block) {
if ( std::this_thread::get_id() == main_thread_id ) {
fprintf(stderr, "OnLoopProcess::QueueForProcessing() called by main thread!");
abort();
}
++queuers;
auto defer = util::Deferred([this] { --queuers; });
bool fire = false;
size_t qs = 0;
{
std::unique_lock lock(mtx);
// Wait for room in the queue.
while ( IsOpen() && queue.size() >= max_queue_size ) {
total_queue_stalls_metric->Inc();
while ( IsOpen() && max_queue_size > 0 && queue.size() >= max_queue_size ) {
if ( (flags & QueueFlag::Force) == QueueFlag::Force ) // enqueue no matter the limit.
break;
if ( (flags & QueueFlag::DontBlock) == QueueFlag::DontBlock )
return false;
total_queue_blocks_metric->Inc();
cond.wait_for(lock, cond_timeout);
}
if ( IsOpen() ) {
assert(queue.size() < max_queue_size);
assert(to_queue.size() == 1);
std::list<Work> to_queue{std::move(work)};
queue.splice(queue.end(), to_queue);
assert(to_queue.empty());
fire = queue.size() == 1; // first element in queue triggers processing.
}
else {
@ -190,11 +218,10 @@ public:
}
}
if ( fire )
flare.Fire();
--queuers;
return true;
}
private:
@ -215,7 +242,7 @@ private:
std::thread::id main_thread_id;
// Track queue stalling.
telemetry::CounterPtr total_queue_stalls_metric;
telemetry::CounterPtr total_queue_blocks_metric;
};
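For orientation, a minimal caller-side sketch of the new flag semantics, mirroring how the ZeroMQ backend uses them further below. This is illustrative only: onloop stands for the OnLoopProcess pointer returned by ThreadedBackend::OnLoop(), and event_msg, log_msg and total_drops are hypothetical stand-ins for an EventMessage, a LogMessage and a drop counter.

using zeek::detail::QueueFlag;

// Received remote event: don't block the backend thread. If the queue is at
// max_queue_size, QueueForProcessing() returns false and the caller counts the drop.
if ( ! onloop->QueueForProcessing(std::move(event_msg), QueueFlag::DontBlock) )
    total_drops->Inc();

// Log write: must not be lost, so enqueue regardless of max_queue_size.
onloop->QueueForProcessing(std::move(log_msg), QueueFlag::Force);

// Omitting the flag keeps the pre-existing behavior: block until there is room.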


@ -22,10 +22,12 @@
#include "zeek/Reporter.h"
#include "zeek/Val.h"
#include "zeek/cluster/Backend.h"
#include "zeek/cluster/OnLoop.h"
#include "zeek/cluster/Serializer.h"
#include "zeek/cluster/backend/zeromq/Plugin.h"
#include "zeek/cluster/backend/zeromq/ZeroMQ-Proxy.h"
#include "zeek/telemetry/Manager.h"
#include "zeek/util-types.h"
#include "zeek/util.h"
extern int signal_val;
@ -71,10 +73,28 @@ constexpr DebugFlag operator&(uint8_t x, DebugFlag y) { return static_cast<Debug
// NOLINTEND(cppcoreguidelines-macro-usage)
std::unique_ptr<Backend> ZeroMQBackend::Instantiate(std::unique_ptr<EventSerializer> es,
std::unique_ptr<LogSerializer> ls,
std::unique_ptr<detail::EventHandlingStrategy> ehs) {
auto onloop_queue_hwm = zeek::id::find_val<zeek::CountVal>("Cluster::Backend::ZeroMQ::onloop_queue_hwm")->AsCount();
return std::make_unique<ZeroMQBackend>(std::move(es), std::move(ls), std::move(ehs), onloop_queue_hwm);
}
ZeroMQBackend::ZeroMQBackend(std::unique_ptr<EventSerializer> es, std::unique_ptr<LogSerializer> ls,
std::unique_ptr<detail::EventHandlingStrategy> ehs)
: ThreadedBackend("ZeroMQ", std::move(es), std::move(ls), std::move(ehs)),
main_inproc(zmq::socket_t(ctx, zmq::socket_type::pair)) {}
std::unique_ptr<detail::EventHandlingStrategy> ehs, zeek_uint_t onloop_queue_hwm)
: ThreadedBackend("ZeroMQ", std::move(es), std::move(ls), std::move(ehs),
new zeek::detail::OnLoopProcess<ThreadedBackend, QueueMessage>(this, "ZeroMQ", onloop_queue_hwm)),
main_inproc(zmq::socket_t(ctx, zmq::socket_type::pair)),
// Counters for block and drop metrics.
total_xpub_drops(
zeek::telemetry_mgr->CounterInstance("zeek", "cluster_zeromq_xpub_drops", {},
"Number of published events dropped due to XPUB socket HWM.")),
total_onloop_drops(
zeek::telemetry_mgr->CounterInstance("zeek", "cluster_zeromq_onloop_drops", {},
"Number of received events dropped due to OnLoop queue full.")),
total_msg_errors(
zeek::telemetry_mgr->CounterInstance("zeek", "cluster_zeromq_msg_errors", {},
"Number of events with the wrong number of message parts.")) {}
ZeroMQBackend::~ZeroMQBackend() {
try {
@ -114,10 +134,19 @@ void ZeroMQBackend::DoInitPostScript() {
event_unsubscription = zeek::event_registry->Register("Cluster::Backend::ZeroMQ::unsubscription");
event_subscription = zeek::event_registry->Register("Cluster::Backend::ZeroMQ::subscription");
total_xpub_stalls =
zeek::telemetry_mgr
->CounterInstance("zeek", "cluster_zeromq_xpub_stalls", {},
"Counter for how many times sending on the XPUB socket stalled due to EAGAIN.");
// xpub/xsub hwm configuration
xpub_sndhwm = static_cast<int>(zeek::id::find_val<zeek::IntVal>("Cluster::Backend::ZeroMQ::xpub_sndhwm")->AsInt());
xpub_sndbuf = static_cast<int>(zeek::id::find_val<zeek::IntVal>("Cluster::Backend::ZeroMQ::xpub_sndbuf")->AsInt());
xsub_rcvhwm = static_cast<int>(zeek::id::find_val<zeek::IntVal>("Cluster::Backend::ZeroMQ::xsub_rcvhwm")->AsInt());
xsub_rcvbuf = static_cast<int>(zeek::id::find_val<zeek::IntVal>("Cluster::Backend::ZeroMQ::xsub_rcvbuf")->AsInt());
// log push/pull socket configuration
log_immediate =
static_cast<int>(zeek::id::find_val<zeek::BoolVal>("Cluster::Backend::ZeroMQ::log_immediate")->AsBool());
log_sndhwm = static_cast<int>(zeek::id::find_val<zeek::IntVal>("Cluster::Backend::ZeroMQ::log_sndhwm")->AsInt());
log_sndbuf = static_cast<int>(zeek::id::find_val<zeek::IntVal>("Cluster::Backend::ZeroMQ::log_sndbuf")->AsInt());
log_rcvhwm = static_cast<int>(zeek::id::find_val<zeek::IntVal>("Cluster::Backend::ZeroMQ::log_rcvhwm")->AsInt());
log_rcvbuf = static_cast<int>(zeek::id::find_val<zeek::IntVal>("Cluster::Backend::ZeroMQ::log_rcvbuf")->AsInt());
}
void ZeroMQBackend::DoTerminate() {
@ -179,6 +208,11 @@ bool ZeroMQBackend::DoInit() {
xpub.set(zmq::sockopt::xpub_nodrop, connect_xpub_nodrop);
xpub.set(zmq::sockopt::xpub_verbose, 1);
xpub.set(zmq::sockopt::sndhwm, xpub_sndhwm);
xpub.set(zmq::sockopt::sndbuf, xpub_sndbuf);
xsub.set(zmq::sockopt::rcvhwm, xsub_rcvhwm);
xsub.set(zmq::sockopt::rcvbuf, xsub_rcvbuf);
try {
xsub.connect(connect_xsub_endpoint);
} catch ( zmq::error_t& err ) {
@ -195,22 +229,6 @@ bool ZeroMQBackend::DoInit() {
return false;
}
auto log_immediate =
static_cast<int>(zeek::id::find_val<zeek::BoolVal>("Cluster::Backend::ZeroMQ::log_immediate")->AsBool());
auto log_sndhwm =
static_cast<int>(zeek::id::find_val<zeek::IntVal>("Cluster::Backend::ZeroMQ::log_sndhwm")->AsInt());
auto log_sndbuf =
static_cast<int>(zeek::id::find_val<zeek::IntVal>("Cluster::Backend::ZeroMQ::log_sndbuf")->AsInt());
auto log_rcvhwm =
static_cast<int>(zeek::id::find_val<zeek::IntVal>("Cluster::Backend::ZeroMQ::log_rcvhwm")->AsInt());
auto log_rcvbuf =
static_cast<int>(zeek::id::find_val<zeek::IntVal>("Cluster::Backend::ZeroMQ::log_rcvbuf")->AsInt());
ZEROMQ_DEBUG("Setting log_sndhwm=%d log_sndbuf=%d log_rcvhwm=%d log_rcvbuf=%d linger_ms=%d", log_sndhwm, log_sndbuf,
log_rcvhwm, log_rcvbuf, linger_ms);
@ -233,6 +251,11 @@ bool ZeroMQBackend::DoInit() {
}
}
// The connect_log_endpoints variable may be modified by zeek_init(), so
// we need to look it up here rather than during DoInitPostScript().
//
// We should've probably introduced a configuration record similar to the
// storage framework, too. Hmm. Maybe in the future.
const auto& log_endpoints = zeek::id::find_val<zeek::VectorVal>("Cluster::Backend::ZeroMQ::connect_log_endpoints");
for ( unsigned int i = 0; i < log_endpoints->Size(); i++ )
connect_log_endpoints.push_back(log_endpoints->StringValAt(i)->ToStdString());
@ -302,9 +325,11 @@ bool ZeroMQBackend::DoPublishEvent(const std::string& topic, const std::string&
if ( i < parts.size() - 1 )
flags = flags | zmq::send_flags::sndmore;
// This should never fail, it will instead block
// when HWM is reached. I guess we need to see if
// and how this can happen :-/
// This never returns EAGAIN. A pair socket blocks whenever the hwm
// is reached, regardless of passing any dontwait flag.
//
// This can result in blocking on Cluster::publish() if the inner
// thread does not consume from child_inproc.
try {
main_inproc.send(parts[i], flags);
} catch ( const zmq::error_t& err ) {
@ -433,156 +458,167 @@ bool ZeroMQBackend::DoPublishLogWrites(const logging::detail::LogWriteHeader& he
return true;
}
// Forward messages from the inprocess bridge.
//
// Either it's 2 parts (tag and payload) for controlling subscriptions
// or terminating the thread, or it is 4 parts in which case all the parts
// are forwarded to the XPUB socket directly for publishing.
void ZeroMQBackend::HandleInprocMessages(std::vector<MultipartMessage>& msgs) {
for ( auto& msg : msgs ) {
if ( msg.size() == 2 ) {
InprocTag tag = msg[0].data<InprocTag>()[0];
switch ( tag ) {
case InprocTag::XsubUpdate: {
xsub.send(msg[1], zmq::send_flags::none);
break;
}
case InprocTag::Terminate: {
if ( self_thread_stop )
ZEROMQ_THREAD_PRINTF("inproc: error: duplicate shutdown message");
self_thread_stop = true;
}
}
}
else if ( msg.size() == 4 ) {
for ( auto& part : msg ) {
zmq::send_flags flags = zmq::send_flags::dontwait;
if ( part.more() )
flags = flags | zmq::send_flags::sndmore;
zmq::send_result_t result;
try {
result = xpub.send(part, flags);
} catch ( zmq::error_t& err ) {
if ( err.num() == ETERM )
return;
// XXX: What other error can happen here? How should we react?
ZEROMQ_THREAD_PRINTF("xpub: Failed to publish with error %s (%d)\n", err.what(), err.num());
break;
}
// Empty result means xpub.send() returned EAGAIN. The socket reached
// its high-water-mark and we drop this message.
if ( ! result ) {
total_xpub_drops->Inc();
// Warn once about a dropped message.
//
// TODO: warn every n seconds?
if ( xpub_drop_last_warn_at == 0.0 ) {
ZEROMQ_THREAD_PRINTF("xpub: warn: dropped a message due to hwm\n");
xpub_drop_last_warn_at = util::current_time(true);
}
break; // Skip the whole message.
}
}
}
else {
ZEROMQ_THREAD_PRINTF("inproc: error: expected 2 or 4 parts, have %zu!\n", msg.size());
total_msg_errors->Inc();
}
}
}
void ZeroMQBackend::HandleLogMessages(const std::vector<MultipartMessage>& msgs) {
for ( const auto& msg : msgs ) {
// sender, format, type, payload
if ( msg.size() != 4 ) {
ZEROMQ_THREAD_PRINTF("log: error: expected 4 parts, have %zu!\n", msg.size());
total_msg_errors->Inc();
continue;
}
byte_buffer payload{msg[3].data<std::byte>(), msg[3].data<std::byte>() + msg[3].size()};
LogMessage lm{.format = std::string(msg[2].data<const char>(), msg[2].size()), .payload = std::move(payload)};
// Always enqueue log messages for processing; they are important.
//
// Hmm, we could also consider bypassing Zeek's event loop completely and
// just go to the log_mgr directly in the future.
OnLoop()->QueueForProcessing(std::move(lm), zeek::detail::QueueFlag::Force);
}
}
void ZeroMQBackend::HandleXPubMessages(const std::vector<MultipartMessage>& msgs) {
for ( const auto& msg : msgs ) {
if ( msg.size() != 1 ) {
ZEROMQ_THREAD_PRINTF("xpub: error: expected 1 part, have %zu!\n", msg.size());
total_msg_errors->Inc();
continue;
}
// Check if the message starts with \x00 or \x01 to determine whether it's
// a subscription or unsubscription message.
auto first = *reinterpret_cast<const uint8_t*>(msg[0].data());
if ( first == 0 || first == 1 ) {
QueueMessage qm;
auto* start = msg[0].data<std::byte>() + 1;
auto* end = msg[0].data<std::byte>() + msg[0].size();
byte_buffer topic(start, end);
if ( first == 1 ) {
qm = BackendMessage{1, std::move(topic)};
}
else if ( first == 0 ) {
qm = BackendMessage{0, std::move(topic)};
}
else {
ZEROMQ_THREAD_PRINTF("xpub: error: unexpected first char: have '0x%02x'", first);
continue;
}
// Always enqueue subscription messages from other nodes as events.
//
// There shouldn't be all that many, unless some script calls Cluster::subscribe() and
// Cluster::unsubscribe() a lot, so assume we can afford the extra memory rather than
// missing these low-frequency events.
OnLoop()->QueueForProcessing(std::move(qm), zeek::detail::QueueFlag::Force);
}
}
}
void ZeroMQBackend::HandleXSubMessages(const std::vector<MultipartMessage>& msgs) {
for ( const auto& msg : msgs ) {
if ( msg.size() != 4 ) {
ZEROMQ_THREAD_PRINTF("xsub: error: expected 4 parts, have %zu!\n", msg.size());
total_msg_errors->Inc();
continue;
}
// Filter out messages that are coming from this node.
std::string sender(msg[1].data<const char>(), msg[1].size());
if ( sender == NodeId() )
continue;
byte_buffer payload{msg[3].data<std::byte>(), msg[3].data<std::byte>() + msg[3].size()};
EventMessage em{.topic = std::string(msg[0].data<const char>(), msg[0].size()),
.format = std::string(msg[2].data<const char>(), msg[2].size()),
.payload = std::move(payload)};
// If queueing the event message for Zeek's main loop doesn't work due to reaching the onloop hwm,
// drop the message.
//
// This is sort of a suicidal snail pattern but without exiting the node.
if ( ! OnLoop()->QueueForProcessing(std::move(em), zeek::detail::QueueFlag::DontBlock) ) {
total_onloop_drops->Inc();
// Warn once about a dropped message.
if ( onloop_drop_last_warn_at == 0.0 ) {
ZEROMQ_THREAD_PRINTF("warn: dropped a message due to onloop queue full\n");
onloop_drop_last_warn_at = util::current_time(true);
}
}
}
}
void ZeroMQBackend::Run() {
char name[4 + 2 + 16 + 1]{}; // zmq-0x<8byte pointer in hex><nul>
snprintf(name, sizeof(name), "zmq-%p", this);
util::detail::set_thread_name(name);
ZEROMQ_DEBUG_THREAD_PRINTF(DebugFlag::THREAD, "Thread starting (%p)\n", this);
using MultipartMessage = std::vector<zmq::message_t>;
auto HandleLogMessages = [this](const std::vector<MultipartMessage>& msgs) {
for ( const auto& msg : msgs ) {
// sender, format, type, payload
if ( msg.size() != 4 ) {
ZEROMQ_THREAD_PRINTF("log: error: expected 4 parts, have %zu!\n", msg.size());
continue;
}
byte_buffer payload{msg[3].data<std::byte>(), msg[3].data<std::byte>() + msg[3].size()};
LogMessage lm{.format = std::string(msg[2].data<const char>(), msg[2].size()),
.payload = std::move(payload)};
QueueForProcessing(std::move(lm));
}
};
auto HandleInprocMessages = [this](std::vector<MultipartMessage>& msgs) {
// Forward messages from the inprocess bridge.
//
// Either it's 2 parts (tag and payload) for controlling subscriptions
// or terminating the thread, or it is 4 parts in which case all the parts
// are forwarded to the XPUB socket directly for publishing.
for ( auto& msg : msgs ) {
if ( msg.size() == 2 ) {
InprocTag tag = msg[0].data<InprocTag>()[0];
switch ( tag ) {
case InprocTag::XsubUpdate: {
xsub.send(msg[1], zmq::send_flags::none);
break;
}
case InprocTag::Terminate: {
if ( self_thread_stop )
ZEROMQ_THREAD_PRINTF("inproc: error: duplicate shutdown message");
self_thread_stop = true;
}
}
}
else if ( msg.size() == 4 ) {
for ( auto& part : msg ) {
zmq::send_flags flags = zmq::send_flags::dontwait;
if ( part.more() )
flags = flags | zmq::send_flags::sndmore;
zmq::send_result_t result;
int tries = 0;
do {
try {
result = xpub.send(part, flags);
} catch ( zmq::error_t& err ) {
if ( err.num() == ETERM )
return;
// XXX: What other error can happen here? How should we react?
ZEROMQ_THREAD_PRINTF("xpub: Failed to publish with error %s (%d)\n", err.what(), err.num());
break;
}
// Empty result means xpub.send() returned EAGAIN. The socket reached
// its high-water-mark and we cannot send right now. We simply attempt
// to re-send the message without the dontwait flag after increasing
// the xpub stall metric. This way, ZeroMQ will block in xpub.send() until
// there's enough room available.
if ( ! result ) {
total_xpub_stalls->Inc();
try {
// We sent non-blocking above so we are able to observe and report stalls
// in a metric. Now that we have done that switch to blocking send.
zmq::send_flags block_flags =
zmq::send_flags::none | (flags & zmq::send_flags::sndmore);
result = xpub.send(part, block_flags);
} catch ( zmq::error_t& err ) {
if ( err.num() == ETERM )
return;
// XXX: What other error can happen here? How should we react?
ZEROMQ_THREAD_PRINTF("xpub: Failed blocking publish with error %s (%d)\n", err.what(),
err.num());
break;
}
}
} while ( ! result );
}
}
else {
ZEROMQ_THREAD_PRINTF("inproc: error: expected 1 or 4 parts, have %zu!\n", msg.size());
}
}
};
auto HandleXPubMessages = [this](const std::vector<MultipartMessage>& msgs) {
for ( const auto& msg : msgs ) {
if ( msg.size() != 1 ) {
ZEROMQ_THREAD_PRINTF("xpub: error: expected 1 part, have %zu!\n", msg.size());
continue;
}
// Check if the messages starts with \x00 or \x01 to understand if it's
// a subscription or unsubscription message.
auto first = *reinterpret_cast<const uint8_t*>(msg[0].data());
if ( first == 0 || first == 1 ) {
QueueMessage qm;
auto* start = msg[0].data<std::byte>() + 1;
auto* end = msg[0].data<std::byte>() + msg[0].size();
byte_buffer topic(start, end);
if ( first == 1 ) {
qm = BackendMessage{1, std::move(topic)};
}
else if ( first == 0 ) {
qm = BackendMessage{0, std::move(topic)};
}
else {
ZEROMQ_THREAD_PRINTF("xpub: error: unexpected first char: have '0x%02x'", first);
continue;
}
QueueForProcessing(std::move(qm));
}
}
};
auto HandleXSubMessages = [this](const std::vector<MultipartMessage>& msgs) {
for ( const auto& msg : msgs ) {
if ( msg.size() != 4 ) {
ZEROMQ_THREAD_PRINTF("xsub: error: expected 4 parts, have %zu!\n", msg.size());
continue;
}
// Filter out messages that are coming from this node.
std::string sender(msg[1].data<const char>(), msg[1].size());
if ( sender == NodeId() )
continue;
byte_buffer payload{msg[3].data<std::byte>(), msg[3].data<std::byte>() + msg[3].size()};
EventMessage em{.topic = std::string(msg[0].data<const char>(), msg[0].size()),
.format = std::string(msg[2].data<const char>(), msg[2].size()),
.payload = std::move(payload)};
QueueForProcessing(std::move(em));
}
};
struct SocketInfo {
zmq::socket_ref socket;
std::string name;
@ -590,10 +626,10 @@ void ZeroMQBackend::Run() {
};
std::vector<SocketInfo> sockets = {
{.socket = child_inproc, .name = "inproc", .handler = HandleInprocMessages},
{.socket = xpub, .name = "xpub", .handler = HandleXPubMessages},
{.socket = xsub, .name = "xsub", .handler = HandleXSubMessages},
{.socket = log_pull, .name = "log_pull", .handler = HandleLogMessages},
{.socket = child_inproc, .name = "inproc", .handler = [this](auto& msgs) { HandleInprocMessages(msgs); }},
{.socket = xpub, .name = "xpub", .handler = [this](const auto& msgs) { HandleXPubMessages(msgs); }},
{.socket = xsub, .name = "xsub", .handler = [this](const auto& msgs) { HandleXSubMessages(msgs); }},
{.socket = log_pull, .name = "log_pull", .handler = [this](const auto& msgs) { HandleLogMessages(msgs); }},
};
// Called when Run() terminates.


@ -26,7 +26,7 @@ public:
* Constructor.
*/
ZeroMQBackend(std::unique_ptr<EventSerializer> es, std::unique_ptr<LogSerializer> ls,
std::unique_ptr<detail::EventHandlingStrategy> ehs);
std::unique_ptr<detail::EventHandlingStrategy> ehs, zeek_uint_t onloop_max_queue_size);
/**
* Destructor.
@ -49,9 +49,7 @@ public:
*/
static std::unique_ptr<Backend> Instantiate(std::unique_ptr<EventSerializer> event_serializer,
std::unique_ptr<LogSerializer> log_serializer,
std::unique_ptr<detail::EventHandlingStrategy> ehs) {
return std::make_unique<ZeroMQBackend>(std::move(event_serializer), std::move(log_serializer), std::move(ehs));
}
std::unique_ptr<detail::EventHandlingStrategy> ehs);
private:
void DoInitPostScript() override;
@ -73,6 +71,13 @@ private:
void DoReadyToPublishCallback(ReadyCallback cb) override;
// Inner thread helper methods.
using MultipartMessage = std::vector<zmq::message_t>;
void HandleInprocMessages(std::vector<MultipartMessage>& msgs);
void HandleLogMessages(const std::vector<MultipartMessage>& msgs);
void HandleXPubMessages(const std::vector<MultipartMessage>& msgs);
void HandleXSubMessages(const std::vector<MultipartMessage>& msgs);
// Script level variables.
std::string connect_xsub_endpoint;
std::string connect_xpub_endpoint;
@ -92,6 +97,19 @@ private:
EventHandlerPtr event_subscription;
EventHandlerPtr event_unsubscription;
// xpub/xsub configuration
int xpub_sndhwm = 1000; // libzmq default
int xpub_sndbuf = -1; // OS defaults
int xsub_rcvhwm = 1000; // libzmq default
int xsub_rcvbuf = -1; // OS defaults
// log socket configuration
int log_immediate = false; // libzmq default
int log_sndhwm = 1000; // libzmq default
int log_sndbuf = -1; // OS defaults
int log_rcvhwm = 1000; // libzmq defaults
int log_rcvbuf = -1; // OS defaults
zmq::context_t ctx;
zmq::socket_t xsub;
zmq::socket_t xpub;
@ -120,7 +138,13 @@ private:
std::map<std::string, SubscribeCallback> subscription_callbacks;
std::set<std::string> xpub_subscriptions;
zeek::telemetry::CounterPtr total_xpub_stalls;
zeek::telemetry::CounterPtr total_xpub_drops; // events dropped due to XPUB socket hwm reached
zeek::telemetry::CounterPtr total_onloop_drops; // events dropped due to onloop queue full
zeek::telemetry::CounterPtr total_msg_errors; // messages with the wrong number of parts
// Could rework to log-once-every X seconds if needed.
double xpub_drop_last_warn_at = 0.0;
double onloop_drop_last_warn_at = 0.0;
};
} // namespace cluster::zeromq


@ -25,7 +25,10 @@ type Cluster::WebSocketTLSOptions: record;
## :zeek:see:`Cluster::make_event` or the argument list to pass along
## to it.
##
## Returns: true if the message is sent.
## Returns: T if the event was accepted for sending. Depending on
## the selected cluster backend, an event may be dropped
## when a Zeek cluster is overloaded. This can happen on
## the sending or receiving node.
function Cluster::publish%(topic: string, ...%): bool
%{
ScriptLocationScope scope{frame};
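At the script layer, the updated return-value semantics read roughly as follows. This is an illustrative Zeek-script sketch only: the event, the topic choice, and the reaction to a rejected publish are made up.

global my_event: event(n: count);

event zeek_init()
	{
	# T only means the event was accepted for sending; with the drop policy
	# above, an overloaded backend may still discard it on either side.
	if ( ! Cluster::publish(Cluster::worker_topic, my_event, 42) )
		Reporter::warning("Cluster::publish() did not accept the event");
	}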