diff --git a/scripts/policy/frameworks/cluster/backend/zeromq/main.zeek b/scripts/policy/frameworks/cluster/backend/zeromq/main.zeek
index e450a96e50..947ee17533 100644
--- a/scripts/policy/frameworks/cluster/backend/zeromq/main.zeek
+++ b/scripts/policy/frameworks/cluster/backend/zeromq/main.zeek
@@ -1,5 +1,7 @@
 ##! ZeroMQ cluster backend support.
 ##!
+##! Overview
+##!
 ##! For publish-subscribe functionality, one node in the Zeek cluster spawns a
 ##! thread running a central broker listening on a XPUB and XSUB socket.
 ##! These sockets are connected via `zmq_proxy() `_.
@@ -22,7 +24,43 @@
 ##! possible to run non-Zeek logger nodes. All a logger node needs to do is
 ##! open a ZeroMQ PULL socket and interpret the format used by Zeek nodes
 ##! to send their log writes.
-
+##!
+##! Overload Behavior
+##!
+##! By default, the ZeroMQ cluster backend drops outgoing and incoming events
+##! when the Zeek cluster is overloaded. Dropping of outgoing events is governed
+##! by the :zeek:see:`Cluster::Backend::ZeroMQ::xpub_sndhwm` setting. This
+##! is the High Water Mark (HWM) for the local XPUB socket's queue. Once it is
+##! reached, outgoing events are dropped until there is room in the socket's
+##! queue again. The metric ``zeek_cluster_zeromq_xpub_drops_total`` is
+##! incremented for every dropped event.
+##!
+##! For incoming events, the :zeek:see:`Cluster::Backend::ZeroMQ::onloop_queue_hwm`
+##! setting is used. Remote events received via the local XSUB socket are first
+##! enqueued as raw event messages for processing on Zeek's main event loop.
+##! When this queue is full because more remote events arrive than Zeek can
+##! process in an event loop iteration, incoming events are dropped and the
+##! ``zeek_cluster_zeromq_onloop_drops_total`` metric is incremented.
+##!
+##! Incoming log batches as well as subscription and unsubscription events are
+##! passed through the onloop queue, but the HWM currently does not apply to
+##! them. The assumption is that 1) they are infrequent and 2) they are more
+##! important than arbitrary publish-subscribe events.
+##!
+##! To avoid dropping any events (e.g. for performance testing or offline PCAP
+##! processing), the recommended strategy is to set both
+##! :zeek:see:`Cluster::Backend::ZeroMQ::xpub_sndhwm` and
+##! :zeek:see:`Cluster::Backend::ZeroMQ::onloop_queue_hwm` to ``0``,
+##! disabling the HWM and dropping logic. It is then up to the user to monitor
+##! CPU and memory usage of individual nodes to avoid overloading them and
+##! running into out-of-memory situations.
+##!
+##! As a Zeek operator, you should monitor ``zeek_cluster_zeromq_xpub_drops_total``
+##! and ``zeek_cluster_zeromq_onloop_drops_total``. Any non-zero values for these
+##! metrics indicate an overloaded Zeek cluster. See the cluster telemetry
+##! options :zeek:see:`Cluster::Telemetry::core_metrics` and
+##! :zeek:see:`Cluster::Telemetry::websocket_metrics` for ways to get a better
+##! understanding of the events published and received.
 @load base/utils/addrs
 
 module Cluster::Backend::ZeroMQ;
 
@@ -99,7 +137,8 @@ export {
	## Send high water mark value for the XPUB socket.
	##
-	## If reached, Zeek nodes will block or drop messages.
+	## Events published when the XPUB queue is full will be dropped and the
+	## ``zeek_cluster_zeromq_xpub_drops_total`` metric incremented.
	##
	## See ZeroMQ's `ZMQ_SNDHWM documentation `_
	## for more details.
@@ -130,6 +169,19 @@ export {
	## for more details.
	const xsub_rcvbuf: int = -1 &redef;
 
+	## Maximum number of incoming events queued for Zeek's event loop.
+	##
+	## This constant defines the maximum number of remote events queued
+	## by the ZeroMQ cluster backend for Zeek's event loop to drain in
+	## one go. If you set this value to 0 (unlimited), closely monitor the
+	## CPU and memory usage of cluster nodes, as high remote event rates
+	## may starve packet processing.
+	##
+	## If more events are received than fit into the queue, new events are
+	## dropped and the ``zeek_cluster_zeromq_onloop_drops_total`` metric
+	## incremented.
+	const onloop_queue_hwm = 10000 &redef;
+
	## Configure ZeroMQ's immediate setting on PUSH sockets
	##
	## Setting this to ``T`` will queue log writes only to completed
@@ -188,7 +240,10 @@ export {
	##
	## Whether to configure ``ZMQ_XPUB_NODROP`` on the XPUB socket
	## connecting to the proxy to detect when sending a message fails
-	## due to reaching the high-water-mark.
+	## due to reaching the high-water-mark. If you set this to **F**,
+	## the XPUB drops metric will stop working, as sending on the
+	## XPUB socket will always succeed. Unless you are developing on the
+	## ZeroMQ cluster backend, keep this set to **T**.
	##
	## See ZeroMQ's `ZMQ_XPUB_NODROP documentation `_
	## for more details.
diff --git a/src/cluster/backend/zeromq/ZeroMQ.cc b/src/cluster/backend/zeromq/ZeroMQ.cc
index acbc430018..45763fab54 100644
--- a/src/cluster/backend/zeromq/ZeroMQ.cc
+++ b/src/cluster/backend/zeromq/ZeroMQ.cc
@@ -22,10 +22,12 @@
 #include "zeek/Reporter.h"
 #include "zeek/Val.h"
 #include "zeek/cluster/Backend.h"
+#include "zeek/cluster/OnLoop.h"
 #include "zeek/cluster/Serializer.h"
 #include "zeek/cluster/backend/zeromq/Plugin.h"
 #include "zeek/cluster/backend/zeromq/ZeroMQ-Proxy.h"
 #include "zeek/telemetry/Manager.h"
+#include "zeek/util-types.h"
 #include "zeek/util.h"
 
 extern int signal_val;
@@ -71,10 +73,25 @@ constexpr DebugFlag operator&(uint8_t x, DebugFlag y) { return static_cast
 
+std::unique_ptr ZeroMQBackend::Instantiate(std::unique_ptr es,
+                                           std::unique_ptr ls,
+                                           std::unique_ptr ehs) {
+    auto onloop_queue_hwm = zeek::id::find_val("Cluster::Backend::ZeroMQ::onloop_queue_hwm")->AsCount();
+    return std::make_unique(std::move(es), std::move(ls), std::move(ehs), onloop_queue_hwm);
+}
+
 ZeroMQBackend::ZeroMQBackend(std::unique_ptr es, std::unique_ptr ls,
-                             std::unique_ptr ehs)
-    : ThreadedBackend("ZeroMQ", std::move(es), std::move(ls), std::move(ehs)),
-      main_inproc(zmq::socket_t(ctx, zmq::socket_type::pair)) {}
+                             std::unique_ptr ehs, zeek_uint_t onloop_queue_hwm)
+    : ThreadedBackend("ZeroMQ", std::move(es), std::move(ls), std::move(ehs),
+                      new zeek::detail::OnLoopProcess(this, "ZeroMQ", onloop_queue_hwm)),
+      main_inproc(zmq::socket_t(ctx, zmq::socket_type::pair)),
+      // Counters for drop metrics.
+      total_xpub_drops(
+          zeek::telemetry_mgr->CounterInstance("zeek", "cluster_zeromq_xpub_drops", {},
+                                               "Number of published events dropped due to XPUB socket HWM.")),
+      total_onloop_drops(
+          zeek::telemetry_mgr->CounterInstance("zeek", "cluster_zeromq_onloop_drops", {},
+                                               "Number of received events dropped due to OnLoop queue full.")) {}
 
 ZeroMQBackend::~ZeroMQBackend() {
     try {
@@ -114,11 +131,6 @@ void ZeroMQBackend::DoInitPostScript() {
     event_unsubscription = zeek::event_registry->Register("Cluster::Backend::ZeroMQ::unsubscription");
     event_subscription = zeek::event_registry->Register("Cluster::Backend::ZeroMQ::subscription");
 
-    total_xpub_stalls =
-        zeek::telemetry_mgr
-            ->CounterInstance("zeek", "cluster_zeromq_xpub_stalls", {},
-                              "Counter for how many times sending on the XPUB socket stalled due to EAGAIN.");
-
     // xpub/xsub hwm configuration
     xpub_sndhwm = static_cast(zeek::id::find_val("Cluster::Backend::ZeroMQ::xpub_sndhwm")->AsInt());
     xpub_sndbuf = static_cast(zeek::id::find_val("Cluster::Backend::ZeroMQ::xpub_sndbuf")->AsInt());
@@ -471,43 +483,32 @@ void ZeroMQBackend::HandleInprocMessages(std::vector& msgs) {
                    flags = flags | zmq::send_flags::sndmore;
 
                zmq::send_result_t result;
-                int tries = 0;
-                do {
-                    try {
-                        result = xpub.send(part, flags);
-                    } catch ( zmq::error_t& err ) {
-                        if ( err.num() == ETERM )
-                            return;
+                try {
+                    result = xpub.send(part, flags);
+                } catch ( zmq::error_t& err ) {
+                    if ( err.num() == ETERM )
+                        return;
 
-                        // XXX: What other error can happen here? How should we react?
-                        ZEROMQ_THREAD_PRINTF("xpub: Failed to publish with error %s (%d)\n", err.what(), err.num());
-                        break;
+                    // XXX: What other error can happen here? How should we react?
+                    ZEROMQ_THREAD_PRINTF("xpub: Failed to publish with error %s (%d)\n", err.what(), err.num());
+                    break;
+                }
+
+                // Empty result means xpub.send() returned EAGAIN. The socket reached
+                // its high-water-mark and we drop this message.
+                if ( ! result ) {
+                    total_xpub_drops->Inc();
+
+                    // Warn once about a dropped message.
+                    //
+                    // TODO: warn every n seconds?
+                    if ( xpub_drop_last_warn_at == 0.0 ) {
+                        ZEROMQ_THREAD_PRINTF("xpub: warn: dropped a message due to hwm\n");
+                        xpub_drop_last_warn_at = util::current_time(true);
                    }
-                    // Empty result means xpub.send() returned EAGAIN. The socket reached
-                    // its high-water-mark and we cannot send right now. We simply attempt
-                    // to re-send the message without the dontwait flag after increasing
-                    // the xpub stall metric. This way, ZeroMQ will block in xpub.send() until
-                    // there's enough room available.
-                    if ( ! result ) {
-                        total_xpub_stalls->Inc();
-
-                        try {
-                            // We sent non-blocking above so we are able to observe and report stalls
-                            // in a metric. Now that we have done that switch to blocking send.
-                            zmq::send_flags block_flags = zmq::send_flags::none | (flags & zmq::send_flags::sndmore);
-                            result = xpub.send(part, block_flags);
-                        } catch ( zmq::error_t& err ) {
-                            if ( err.num() == ETERM )
-                                return;
-
-                            // XXX: What other error can happen here? How should we react?
-                            ZEROMQ_THREAD_PRINTF("xpub: Failed blocking publish with error %s (%d)\n", err.what(),
-                                                 err.num());
-                            break;
-                        }
-                    }
-                } while ( ! result );
+                    break; // Skip the whole message.
+                }
            }
        }
        else {
@@ -527,7 +528,11 @@ void ZeroMQBackend::HandleLogMessages(const std::vector& msgs)
        byte_buffer payload{msg[3].data(), msg[3].data() + msg[3].size()};
        LogMessage lm{.format = std::string(msg[2].data(), msg[2].size()), .payload = std::move(payload)};
 
-        QueueForProcessing(std::move(lm));
+        // Always enqueue log messages for processing; they are important.
+        //
+        // We could also consider bypassing Zeek's event loop completely and
+        // going to the log_mgr directly in the future.
+        OnLoop()->QueueForProcessing(std::move(lm), zeek::detail::QueueFlag::Force);
    }
 }
@@ -557,7 +562,12 @@ void ZeroMQBackend::HandleXPubMessages(const std::vector& msgs
                continue;
            }
 
-            QueueForProcessing(std::move(qm));
+            // Always enqueue subscription messages from other nodes as events.
+            //
+            // There should not be many of these, unless some script calls Cluster::subscribe() and
+            // Cluster::unsubscribe() a lot, so we assume we can afford the extra memory rather than
+            // miss these low-frequency events.
+            OnLoop()->QueueForProcessing(std::move(qm), zeek::detail::QueueFlag::Force);
        }
    }
 }
@@ -579,7 +589,20 @@ void ZeroMQBackend::HandleXSubMessages(const std::vector& msgs
                           .format = std::string(msg[2].data(), msg[2].size()),
                           .payload = std::move(payload)};
 
-        QueueForProcessing(std::move(em));
+
+        // If the event message cannot be queued for Zeek's main loop because the
+        // onloop HWM was reached, drop the message.
+        //
+        // This is similar to the suicidal-snail pattern, but without exiting the node.
+        if ( ! OnLoop()->QueueForProcessing(std::move(em), zeek::detail::QueueFlag::DontBlock) ) {
+            total_onloop_drops->Inc();
+
+            // Warn once about a dropped message.
+            if ( onloop_drop_last_warn_at == 0.0 ) {
+                ZEROMQ_THREAD_PRINTF("warn: dropped a message due to onloop queue full\n");
+                onloop_drop_last_warn_at = util::current_time(true);
+            }
+        }
    }
 }
diff --git a/src/cluster/backend/zeromq/ZeroMQ.h b/src/cluster/backend/zeromq/ZeroMQ.h
index cb41a22d6a..11e905d2d7 100644
--- a/src/cluster/backend/zeromq/ZeroMQ.h
+++ b/src/cluster/backend/zeromq/ZeroMQ.h
@@ -26,7 +26,7 @@ public:
     * Constructor.
     */
    ZeroMQBackend(std::unique_ptr es, std::unique_ptr ls,
-                  std::unique_ptr ehs);
+                  std::unique_ptr ehs, zeek_uint_t onloop_max_queue_size);
 
    /**
     * Destructor.
@@ -49,9 +49,7 @@ public:
     */
    static std::unique_ptr Instantiate(std::unique_ptr event_serializer,
                                       std::unique_ptr log_serializer,
-                                       std::unique_ptr ehs) {
-        return std::make_unique(std::move(event_serializer), std::move(log_serializer), std::move(ehs));
-    }
+                                       std::unique_ptr ehs);
 
 private:
    void DoInitPostScript() override;
@@ -140,7 +138,12 @@ private:
    std::map subscription_callbacks;
    std::set xpub_subscriptions;
 
-    zeek::telemetry::CounterPtr total_xpub_stalls;
+    zeek::telemetry::CounterPtr total_xpub_drops;   // events dropped due to XPUB socket hwm reached
+    zeek::telemetry::CounterPtr total_onloop_drops; // events dropped due to onloop queue full
+
+    // Could be reworked to log once every X seconds if needed.
+    double xpub_drop_last_warn_at = 0.0;
+    double onloop_drop_last_warn_at = 0.0;
 };
 
 } // namespace cluster::zeromq
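A minimal usage sketch for the overload settings documented above: an operator who wants back-pressure-free operation, e.g. for offline PCAP processing or performance testing, can disable the dropping logic via redefs. This is an illustrative snippet, not part of the patch; it assumes the ZeroMQ backend is already loaded as the active cluster backend, and the option names come from main.zeek above.

```zeek
# Illustrative local.zeek snippet: disable the HWM-based dropping described in
# the "Overload Behavior" section. With both values set to 0, no events are
# dropped; it is then up to the operator to monitor CPU and memory usage of
# the individual nodes.
redef Cluster::Backend::ZeroMQ::xpub_sndhwm = 0;
redef Cluster::Backend::ZeroMQ::onloop_queue_hwm = 0;
```

With the defaults left in place instead, watch ``zeek_cluster_zeromq_xpub_drops_total`` and ``zeek_cluster_zeromq_onloop_drops_total``; any non-zero value indicates an overloaded cluster.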