diff --git a/scripts/policy/frameworks/cluster/backend/zeromq/main.zeek b/scripts/policy/frameworks/cluster/backend/zeromq/main.zeek index c34608d437..34c127ddf3 100644 --- a/scripts/policy/frameworks/cluster/backend/zeromq/main.zeek +++ b/scripts/policy/frameworks/cluster/backend/zeromq/main.zeek @@ -218,6 +218,18 @@ export { ## subscriptions and hello messages from other ## nodes. These expirations trigger reporter warnings. const hello_expiration: interval = 10sec &redef; + + ## The topic prefix used for internal ZeroMQ specific communication. + ## + ## This is used for the "ready to publish callback" topics. + ## + ## Zeek creates a short-lived subscription for a auto-generated + ## topic name with this prefix and waits for it to be confirmed + ## on its XPUB socket. Once this happens, the XPUB socket should've + ## also received all other active subscriptions of other nodes in a + ## cluster from the central XPUB/XSUB proxy and therefore can be + ## deemed ready for publish operations. + const internal_topic_prefix = "zeek.zeromq.internal." &redef; } redef Cluster::backend = Cluster::CLUSTER_BACKEND_ZEROMQ; diff --git a/src/cluster/backend/zeromq/ZeroMQ.cc b/src/cluster/backend/zeromq/ZeroMQ.cc index a620c8d82c..077100c000 100644 --- a/src/cluster/backend/zeromq/ZeroMQ.cc +++ b/src/cluster/backend/zeromq/ZeroMQ.cc @@ -16,6 +16,7 @@ #include "zeek/DebugLogger.h" #include "zeek/EventHandler.h" #include "zeek/EventRegistry.h" +#include "zeek/ID.h" #include "zeek/IntrusivePtr.h" #include "zeek/Reporter.h" #include "zeek/Val.h" @@ -103,6 +104,8 @@ void ZeroMQBackend::DoInitPostScript() { linger_ms = static_cast(zeek::id::find_val("Cluster::Backend::ZeroMQ::linger_ms")->AsInt()); poll_max_messages = zeek::id::find_val("Cluster::Backend::ZeroMQ::poll_max_messages")->Get(); debug_flags = zeek::id::find_val("Cluster::Backend::ZeroMQ::debug_flags")->Get(); + internal_topic_prefix = + zeek::id::find_const("Cluster::Backend::ZeroMQ::internal_topic_prefix")->ToStdString(); proxy_io_threads = static_cast(zeek::id::find_val("Cluster::Backend::ZeroMQ::proxy_io_threads")->Get()); @@ -723,6 +726,34 @@ bool ZeroMQBackend::DoProcessBackendMessage(int tag, byte_buffer_span payload) { } } +void ZeroMQBackend::DoReadyToPublishCallback(ReadyCallback cb) { + // Setup an ephemeral subscription for a topic produced with the internal + // topic prefix, this backend's node identifier and an incrementing counter. + // When the SubscribeCallback for the subscription is invoked, meaning it + // has become visible on the XPUB socket, call the provided ready callback + // and cancel the subscription by unsubscribing from the topic again. + // + // The heuristic here is that seeing a subscription created by the node itself + // also leads to the XPUB/XSUB proxy having sent all subscriptions from other + // nodes in the cluster. + // + // Without this heuristic, short-lived WebSocket clients may fail to publish + // messages as ZeroMQ implements sender-side subscription filtering and simply + // discards messages to topics for which it hasn't seen any subscriptions yet. + static int ready_topic_counter = 0; + ++ready_topic_counter; + + auto scb = [this, cb = std::move(cb)](const std::string& topic_prefix, const SubscriptionCallbackInfo& sinfo) { + Backend::ReadyCallbackInfo info{sinfo.status, sinfo.message}; + cb(info); + + // Unsubscribe again, we're not actually interested in this topic. + Unsubscribe(topic_prefix); + }; + + std::string topic = util::fmt("%s%s.%d.", internal_topic_prefix.c_str(), NodeId().c_str(), ready_topic_counter); + Subscribe(topic, std::move(scb)); +} } // namespace cluster::zeromq } // namespace zeek diff --git a/src/cluster/backend/zeromq/ZeroMQ.h b/src/cluster/backend/zeromq/ZeroMQ.h index f09bfa09db..dae822160b 100644 --- a/src/cluster/backend/zeromq/ZeroMQ.h +++ b/src/cluster/backend/zeromq/ZeroMQ.h @@ -71,6 +71,8 @@ private: bool DoProcessBackendMessage(int tag, byte_buffer_span payload) override; + void DoReadyToPublishCallback(ReadyCallback cb) override; + // Script level variables. std::string connect_xsub_endpoint; std::string connect_xpub_endpoint; @@ -84,6 +86,8 @@ private: zeek_uint_t poll_max_messages = 0; zeek_uint_t debug_flags = 0; + std::string internal_topic_prefix; + EventHandlerPtr event_subscription; EventHandlerPtr event_unsubscription;