cluster/zeromq: Implement DoReadyToPublishCallback()

The ZeroMQ heuristic for "ready to publish" is to create an unique and
ephemeral subscription using the XSUB socket and observe it arrive on the
XPUB socket. At this point, visibility into other node's subscriptions
is provided.
This commit is contained in:
Arne Welzel 2025-04-24 16:20:07 +02:00
parent e7a876da35
commit 643b926625
3 changed files with 47 additions and 0 deletions

View file

@ -218,6 +218,18 @@ export {
## subscriptions and hello messages from other
## nodes. These expirations trigger reporter warnings.
const hello_expiration: interval = 10sec &redef;
## The topic prefix used for internal ZeroMQ specific communication.
##
## This is used for the "ready to publish callback" topics.
##
## Zeek creates a short-lived subscription for a auto-generated
## topic name with this prefix and waits for it to be confirmed
## on its XPUB socket. Once this happens, the XPUB socket should've
## also received all other active subscriptions of other nodes in a
## cluster from the central XPUB/XSUB proxy and therefore can be
## deemed ready for publish operations.
const internal_topic_prefix = "zeek.zeromq.internal." &redef;
}
redef Cluster::backend = Cluster::CLUSTER_BACKEND_ZEROMQ;

View file

@ -16,6 +16,7 @@
#include "zeek/DebugLogger.h"
#include "zeek/EventHandler.h"
#include "zeek/EventRegistry.h"
#include "zeek/ID.h"
#include "zeek/IntrusivePtr.h"
#include "zeek/Reporter.h"
#include "zeek/Val.h"
@ -103,6 +104,8 @@ void ZeroMQBackend::DoInitPostScript() {
linger_ms = static_cast<int>(zeek::id::find_val<zeek::IntVal>("Cluster::Backend::ZeroMQ::linger_ms")->AsInt());
poll_max_messages = zeek::id::find_val<zeek::CountVal>("Cluster::Backend::ZeroMQ::poll_max_messages")->Get();
debug_flags = zeek::id::find_val<zeek::CountVal>("Cluster::Backend::ZeroMQ::debug_flags")->Get();
internal_topic_prefix =
zeek::id::find_const<zeek::StringVal>("Cluster::Backend::ZeroMQ::internal_topic_prefix")->ToStdString();
proxy_io_threads =
static_cast<int>(zeek::id::find_val<zeek::CountVal>("Cluster::Backend::ZeroMQ::proxy_io_threads")->Get());
@ -723,6 +726,34 @@ bool ZeroMQBackend::DoProcessBackendMessage(int tag, byte_buffer_span payload) {
}
}
void ZeroMQBackend::DoReadyToPublishCallback(ReadyCallback cb) {
// Setup an ephemeral subscription for a topic produced with the internal
// topic prefix, this backend's node identifier and an incrementing counter.
// When the SubscribeCallback for the subscription is invoked, meaning it
// has become visible on the XPUB socket, call the provided ready callback
// and cancel the subscription by unsubscribing from the topic again.
//
// The heuristic here is that seeing a subscription created by the node itself
// also leads to the XPUB/XSUB proxy having sent all subscriptions from other
// nodes in the cluster.
//
// Without this heuristic, short-lived WebSocket clients may fail to publish
// messages as ZeroMQ implements sender-side subscription filtering and simply
// discards messages to topics for which it hasn't seen any subscriptions yet.
static int ready_topic_counter = 0;
++ready_topic_counter;
auto scb = [this, cb = std::move(cb)](const std::string& topic_prefix, const SubscriptionCallbackInfo& sinfo) {
Backend::ReadyCallbackInfo info{sinfo.status, sinfo.message};
cb(info);
// Unsubscribe again, we're not actually interested in this topic.
Unsubscribe(topic_prefix);
};
std::string topic = util::fmt("%s%s.%d.", internal_topic_prefix.c_str(), NodeId().c_str(), ready_topic_counter);
Subscribe(topic, std::move(scb));
}
} // namespace cluster::zeromq
} // namespace zeek

View file

@ -71,6 +71,8 @@ private:
bool DoProcessBackendMessage(int tag, byte_buffer_span payload) override;
void DoReadyToPublishCallback(ReadyCallback cb) override;
// Script level variables.
std::string connect_xsub_endpoint;
std::string connect_xpub_endpoint;
@ -84,6 +86,8 @@ private:
zeek_uint_t poll_max_messages = 0;
zeek_uint_t debug_flags = 0;
std::string internal_topic_prefix;
EventHandlerPtr event_subscription;
EventHandlerPtr event_unsubscription;