From 643b9266259c4fb78f662988674552c5fd17dd05 Mon Sep 17 00:00:00 2001 From: Arne Welzel Date: Thu, 24 Apr 2025 16:20:07 +0200 Subject: [PATCH] cluster/zeromq: Implement DoReadyToPublishCallback() The ZeroMQ heuristic for "ready to publish" is to create a unique and ephemeral subscription using the XSUB socket and observe it arrive on the XPUB socket. At this point, visibility into other nodes' subscriptions is provided. --- .../cluster/backend/zeromq/main.zeek | 12 +++++++ src/cluster/backend/zeromq/ZeroMQ.cc | 31 +++++++++++++++++++ src/cluster/backend/zeromq/ZeroMQ.h | 4 +++ 3 files changed, 47 insertions(+) diff --git a/scripts/policy/frameworks/cluster/backend/zeromq/main.zeek b/scripts/policy/frameworks/cluster/backend/zeromq/main.zeek index c34608d437..34c127ddf3 100644 --- a/scripts/policy/frameworks/cluster/backend/zeromq/main.zeek +++ b/scripts/policy/frameworks/cluster/backend/zeromq/main.zeek @@ -218,6 +218,18 @@ export { ## subscriptions and hello messages from other ## nodes. These expirations trigger reporter warnings. const hello_expiration: interval = 10sec &redef; + + ## The topic prefix used for internal ZeroMQ-specific communication. + ## + ## This is used for the "ready to publish callback" topics. + ## + ## Zeek creates a short-lived subscription for an auto-generated + ## topic name with this prefix and waits for it to be confirmed + ## on its XPUB socket. Once this happens, the XPUB socket should've + ## also received all other active subscriptions of other nodes in a + ## cluster from the central XPUB/XSUB proxy and therefore can be + ## deemed ready for publish operations. + const internal_topic_prefix = "zeek.zeromq.internal." 
&redef; } redef Cluster::backend = Cluster::CLUSTER_BACKEND_ZEROMQ; diff --git a/src/cluster/backend/zeromq/ZeroMQ.cc b/src/cluster/backend/zeromq/ZeroMQ.cc index a620c8d82c..077100c000 100644 --- a/src/cluster/backend/zeromq/ZeroMQ.cc +++ b/src/cluster/backend/zeromq/ZeroMQ.cc @@ -16,6 +16,7 @@ #include "zeek/DebugLogger.h" #include "zeek/EventHandler.h" #include "zeek/EventRegistry.h" +#include "zeek/ID.h" #include "zeek/IntrusivePtr.h" #include "zeek/Reporter.h" #include "zeek/Val.h" @@ -103,6 +104,8 @@ void ZeroMQBackend::DoInitPostScript() { linger_ms = static_cast(zeek::id::find_val("Cluster::Backend::ZeroMQ::linger_ms")->AsInt()); poll_max_messages = zeek::id::find_val("Cluster::Backend::ZeroMQ::poll_max_messages")->Get(); debug_flags = zeek::id::find_val("Cluster::Backend::ZeroMQ::debug_flags")->Get(); + internal_topic_prefix = + zeek::id::find_const("Cluster::Backend::ZeroMQ::internal_topic_prefix")->ToStdString(); proxy_io_threads = static_cast(zeek::id::find_val("Cluster::Backend::ZeroMQ::proxy_io_threads")->Get()); @@ -723,6 +726,34 @@ bool ZeroMQBackend::DoProcessBackendMessage(int tag, byte_buffer_span payload) { } } +void ZeroMQBackend::DoReadyToPublishCallback(ReadyCallback cb) { + // Set up an ephemeral subscription for a topic produced with the internal + // topic prefix, this backend's node identifier and an incrementing counter. + // When the SubscribeCallback for the subscription is invoked, meaning it + // has become visible on the XPUB socket, call the provided ready callback + // and cancel the subscription by unsubscribing from the topic again. + // + // The heuristic here is that seeing a subscription created by the node itself + // implies the XPUB/XSUB proxy has already sent all subscriptions from other + // nodes in the cluster. 
+ // + // Without this heuristic, short-lived WebSocket clients may fail to publish + // messages as ZeroMQ implements sender-side subscription filtering and simply + // discards messages to topics for which it hasn't seen any subscriptions yet. + static int ready_topic_counter = 0; + ++ready_topic_counter; + + auto scb = [this, cb = std::move(cb)](const std::string& topic_prefix, const SubscriptionCallbackInfo& sinfo) { + Backend::ReadyCallbackInfo info{sinfo.status, sinfo.message}; + cb(info); + + // Unsubscribe again, we're not actually interested in this topic. + Unsubscribe(topic_prefix); + }; + + std::string topic = util::fmt("%s%s.%d.", internal_topic_prefix.c_str(), NodeId().c_str(), ready_topic_counter); + Subscribe(topic, std::move(scb)); +} } // namespace cluster::zeromq } // namespace zeek diff --git a/src/cluster/backend/zeromq/ZeroMQ.h b/src/cluster/backend/zeromq/ZeroMQ.h index f09bfa09db..dae822160b 100644 --- a/src/cluster/backend/zeromq/ZeroMQ.h +++ b/src/cluster/backend/zeromq/ZeroMQ.h @@ -71,6 +71,8 @@ private: bool DoProcessBackendMessage(int tag, byte_buffer_span payload) override; + void DoReadyToPublishCallback(ReadyCallback cb) override; + // Script level variables. std::string connect_xsub_endpoint; std::string connect_xpub_endpoint; @@ -84,6 +86,8 @@ private: zeek_uint_t poll_max_messages = 0; zeek_uint_t debug_flags = 0; + std::string internal_topic_prefix; + EventHandlerPtr event_subscription; EventHandlerPtr event_unsubscription;