diff --git a/CHANGES b/CHANGES index f53c8b3724..b44b63c57e 100644 --- a/CHANGES +++ b/CHANGES @@ -1,3 +1,28 @@ +7.2.0-dev.632 | 2025-04-25 12:03:44 +0200 + + * btest/cluster/websocket: Move no-subscriptions test (Arne Welzel, Corelight) + + ...and also add one for broker. + + * cluster/websocket: Leverage ReadyToPublishCallback() (Arne Welzel, Corelight) + + Change WebSocket client handling to return only when the ready to + publish callback has been invoked. + + * cluster/zeromq: Implement DoReadyToPublishCallback() (Arne Welzel, Corelight) + + The ZeroMQ heuristic for "ready to publish" is to create an unique and + ephemeral subscription using the XSUB socket and observe it arrive on the + XPUB socket. At this point, visibility into other node's subscriptions + is provided. + + * cluster/Backend: Add ReadyToPublishCallback() API (Arne Welzel, Corelight) + + Provide a mechanism to allow a cluster backend report when it is ready + for publish operations. This is primarily useful for ZeroMQ which has + sender-side filtering and is only really ready for publishing when it + has learned about subscriptions from other nodes. + 7.2.0-dev.627 | 2025-04-25 09:03:01 +0200 * broker/WebSocketShim/tests: Comment out two endpoint tests (Arne Welzel, Corelight) diff --git a/VERSION b/VERSION index 21abd9670c..9bfb0cad32 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -7.2.0-dev.627 +7.2.0-dev.632 diff --git a/scripts/policy/frameworks/cluster/backend/zeromq/main.zeek b/scripts/policy/frameworks/cluster/backend/zeromq/main.zeek index c34608d437..34c127ddf3 100644 --- a/scripts/policy/frameworks/cluster/backend/zeromq/main.zeek +++ b/scripts/policy/frameworks/cluster/backend/zeromq/main.zeek @@ -218,6 +218,18 @@ export { ## subscriptions and hello messages from other ## nodes. These expirations trigger reporter warnings. const hello_expiration: interval = 10sec &redef; + + ## The topic prefix used for internal ZeroMQ specific communication. + ## + ## This is used for the "ready to publish callback" topics. + ## + ## Zeek creates a short-lived subscription for a auto-generated + ## topic name with this prefix and waits for it to be confirmed + ## on its XPUB socket. Once this happens, the XPUB socket should've + ## also received all other active subscriptions of other nodes in a + ## cluster from the central XPUB/XSUB proxy and therefore can be + ## deemed ready for publish operations. + const internal_topic_prefix = "zeek.zeromq.internal." &redef; } redef Cluster::backend = Cluster::CLUSTER_BACKEND_ZEROMQ; diff --git a/src/cluster/Backend.cc b/src/cluster/Backend.cc index 575d0aeaf7..cbadf97950 100644 --- a/src/cluster/Backend.cc +++ b/src/cluster/Backend.cc @@ -111,6 +111,11 @@ std::optional Backend::MakeClusterEvent(FuncValPtr handler, ArgsS return zeek::cluster::detail::Event{eh, std::move(*checked_args), timestamp}; } +void Backend::DoReadyToPublishCallback(Backend::ReadyCallback cb) { + Backend::ReadyCallbackInfo info{Backend::CallbackStatus::Success}; + cb(info); +} + // Default implementation doing the serialization. bool Backend::DoPublishEvent(const std::string& topic, cluster::detail::Event& event) { byte_buffer buf; diff --git a/src/cluster/Backend.h b/src/cluster/Backend.h index 1b09fb4c88..b8a5a11143 100644 --- a/src/cluster/Backend.h +++ b/src/cluster/Backend.h @@ -269,6 +269,31 @@ public: */ bool Unsubscribe(const std::string& topic_prefix) { return DoUnsubscribe(topic_prefix); } + /** + * Information passed to a ready callback. + */ + using ReadyCallbackInfo = SubscriptionCallbackInfo; + + using ReadyCallback = std::function; + + /** + * Register a "ready to publish" callback. + * + * Some cluster backend implementations may not be immediately ready for + * publish operations. For example, ZeroMQ has sender-side subscription + * filtering and discards messages until the XPUB socket learns about + * subscriptions in a cluster. + * + * The callback mechanism allows backends to notify the caller that it + * has now determined readiness for publish operations. + * + * Callers should be prepared that \a cb is invoked immediately as that + * is the default implementation for DoReadyToPublishCallback(). + * + * @param cb The callback to invoke when the backend is ready for publish operations. + */ + void ReadyToPublishCallback(ReadyCallback cb) { DoReadyToPublishCallback(std::move(cb)); } + /** * Publish multiple log records. * @@ -450,6 +475,13 @@ private: */ virtual bool DoUnsubscribe(const std::string& topic_prefix) = 0; + /** + * Register a "ready to publish" callback. + * + * @param cb The callback to invoke when the backend is ready for publish operations. + */ + virtual void DoReadyToPublishCallback(ReadyCallback cb); + /** * Serialize a log batch, then forward it to DoPublishLogWrites() below. diff --git a/src/cluster/backend/zeromq/ZeroMQ.cc b/src/cluster/backend/zeromq/ZeroMQ.cc index a620c8d82c..077100c000 100644 --- a/src/cluster/backend/zeromq/ZeroMQ.cc +++ b/src/cluster/backend/zeromq/ZeroMQ.cc @@ -16,6 +16,7 @@ #include "zeek/DebugLogger.h" #include "zeek/EventHandler.h" #include "zeek/EventRegistry.h" +#include "zeek/ID.h" #include "zeek/IntrusivePtr.h" #include "zeek/Reporter.h" #include "zeek/Val.h" @@ -103,6 +104,8 @@ void ZeroMQBackend::DoInitPostScript() { linger_ms = static_cast(zeek::id::find_val("Cluster::Backend::ZeroMQ::linger_ms")->AsInt()); poll_max_messages = zeek::id::find_val("Cluster::Backend::ZeroMQ::poll_max_messages")->Get(); debug_flags = zeek::id::find_val("Cluster::Backend::ZeroMQ::debug_flags")->Get(); + internal_topic_prefix = + zeek::id::find_const("Cluster::Backend::ZeroMQ::internal_topic_prefix")->ToStdString(); proxy_io_threads = static_cast(zeek::id::find_val("Cluster::Backend::ZeroMQ::proxy_io_threads")->Get()); @@ -723,6 +726,34 @@ bool ZeroMQBackend::DoProcessBackendMessage(int tag, byte_buffer_span payload) { } } +void ZeroMQBackend::DoReadyToPublishCallback(ReadyCallback cb) { + // Setup an ephemeral subscription for a topic produced with the internal + // topic prefix, this backend's node identifier and an incrementing counter. + // When the SubscribeCallback for the subscription is invoked, meaning it + // has become visible on the XPUB socket, call the provided ready callback + // and cancel the subscription by unsubscribing from the topic again. + // + // The heuristic here is that seeing a subscription created by the node itself + // also leads to the XPUB/XSUB proxy having sent all subscriptions from other + // nodes in the cluster. + // + // Without this heuristic, short-lived WebSocket clients may fail to publish + // messages as ZeroMQ implements sender-side subscription filtering and simply + // discards messages to topics for which it hasn't seen any subscriptions yet. + static int ready_topic_counter = 0; + ++ready_topic_counter; + + auto scb = [this, cb = std::move(cb)](const std::string& topic_prefix, const SubscriptionCallbackInfo& sinfo) { + Backend::ReadyCallbackInfo info{sinfo.status, sinfo.message}; + cb(info); + + // Unsubscribe again, we're not actually interested in this topic. + Unsubscribe(topic_prefix); + }; + + std::string topic = util::fmt("%s%s.%d.", internal_topic_prefix.c_str(), NodeId().c_str(), ready_topic_counter); + Subscribe(topic, std::move(scb)); +} } // namespace cluster::zeromq } // namespace zeek diff --git a/src/cluster/backend/zeromq/ZeroMQ.h b/src/cluster/backend/zeromq/ZeroMQ.h index f09bfa09db..dae822160b 100644 --- a/src/cluster/backend/zeromq/ZeroMQ.h +++ b/src/cluster/backend/zeromq/ZeroMQ.h @@ -71,6 +71,8 @@ private: bool DoProcessBackendMessage(int tag, byte_buffer_span payload) override; + void DoReadyToPublishCallback(ReadyCallback cb) override; + // Script level variables. std::string connect_xsub_endpoint; std::string connect_xpub_endpoint; @@ -84,6 +86,8 @@ private: zeek_uint_t poll_max_messages = 0; zeek_uint_t debug_flags = 0; + std::string internal_topic_prefix; + EventHandlerPtr event_subscription; EventHandlerPtr event_unsubscription; diff --git a/src/cluster/websocket/WebSocket.cc b/src/cluster/websocket/WebSocket.cc index 1f2610a99b..8ca19a91de 100644 --- a/src/cluster/websocket/WebSocket.cc +++ b/src/cluster/websocket/WebSocket.cc @@ -372,6 +372,30 @@ void WebSocketEventDispatcher::Process(const WebSocketSubscribeFinished& fin) { return; } + if ( ! entry.ready_to_publish ) { + // Still waiting for the backend to be ready. + return; + } + + HandleSubscriptionsActive(entry); +} + +void WebSocketEventDispatcher::Process(const WebSocketBackendReadyToPublish& ready) { + const auto& it = clients.find(ready.id); + if ( it == clients.end() ) { + reporter->Error("Backend ready from non-existing WebSocket client with id %s!", ready.id.c_str()); + return; + } + + auto& entry = it->second; + + entry.ready_to_publish = true; + + if ( ! entry.wsc->AllSubscriptionsActive() ) { + // More subscriptions to come! + return; + } + HandleSubscriptionsActive(entry); } @@ -397,14 +421,6 @@ void WebSocketEventDispatcher::HandleSubscriptions(WebSocketClientEntry& entry, entry.wsc->SetSubscriptions(subscriptions); - // Short-circuit setting up subscriptions and directly reply with - // an ack if the client didn't request any topic subscriptions. - if ( subscriptions.empty() ) { - assert(entry.wsc->AllSubscriptionsActive()); - HandleSubscriptionsActive(entry); - return; - } - auto cb = [this, id = entry.id, wsc = entry.wsc](const std::string& topic, const Backend::SubscriptionCallbackInfo& info) { if ( info.status == Backend::CallbackStatus::Error ) { @@ -424,6 +440,13 @@ void WebSocketEventDispatcher::HandleSubscriptions(WebSocketClientEntry& entry, QueueReply(WebSocketCloseReply{entry.wsc, 1011, "Could not subscribe. Something bad happened!"}); } } + + // Register a callback to be invoked when the backend is ready for publishing. + entry.backend->ReadyToPublishCallback([this, id = entry.id](const auto& info) { + // Ready callbacks are supposed to run on the main thread, + // so we can just start processing a WebSocketBackendReady. + Process(WebSocketBackendReadyToPublish{id}); + }); } void WebSocketEventDispatcher::HandleSubscriptionsActive(const WebSocketClientEntry& entry) { diff --git a/src/cluster/websocket/WebSocket.h b/src/cluster/websocket/WebSocket.h index 4dd0965372..296c9a731d 100644 --- a/src/cluster/websocket/WebSocket.h +++ b/src/cluster/websocket/WebSocket.h @@ -145,7 +145,13 @@ struct WebSocketSubscribeFinished { std::string topic_prefix; }; -using WebSocketEvent = std::variant; +// Internally created when the backend of a Websocket client is ready. +struct WebSocketBackendReadyToPublish { + std::string id; +}; + +using WebSocketEvent = std::variant; struct WebSocketSendReply { std::shared_ptr wsc; @@ -211,6 +217,7 @@ private: void Process(const WebSocketOpen& open); void Process(const WebSocketSubscribeFinished& fin); + void Process(const WebSocketBackendReadyToPublish& ready); void Process(const WebSocketMessage& msg); void Process(const WebSocketClose& close); @@ -222,6 +229,7 @@ private: std::string id; std::shared_ptr wsc; std::shared_ptr backend; + bool ready_to_publish = false; uint64_t msg_count = 0; std::list queue; }; diff --git a/testing/btest/Baseline/cluster.websocket.no-subscriptions/..client..stderr b/testing/btest/Baseline/cluster.websocket.broker.no-subscriptions/..client..stderr similarity index 100% rename from testing/btest/Baseline/cluster.websocket.no-subscriptions/..client..stderr rename to testing/btest/Baseline/cluster.websocket.broker.no-subscriptions/..client..stderr diff --git a/testing/btest/Baseline/cluster.websocket.no-subscriptions/..client..stdout b/testing/btest/Baseline/cluster.websocket.broker.no-subscriptions/..client..stdout similarity index 100% rename from testing/btest/Baseline/cluster.websocket.no-subscriptions/..client..stdout rename to testing/btest/Baseline/cluster.websocket.broker.no-subscriptions/..client..stdout diff --git a/testing/btest/Baseline/cluster.websocket.no-subscriptions/..manager..stderr b/testing/btest/Baseline/cluster.websocket.broker.no-subscriptions/..manager..stderr similarity index 100% rename from testing/btest/Baseline/cluster.websocket.no-subscriptions/..manager..stderr rename to testing/btest/Baseline/cluster.websocket.broker.no-subscriptions/..manager..stderr diff --git a/testing/btest/Baseline/cluster.websocket.broker.no-subscriptions/..manager..stdout b/testing/btest/Baseline/cluster.websocket.broker.no-subscriptions/..manager..stdout new file mode 100644 index 0000000000..944dde8125 --- /dev/null +++ b/testing/btest/Baseline/cluster.websocket.broker.no-subscriptions/..manager..stdout @@ -0,0 +1,33 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +got ping, 1, 1 +got ping, 2, 2 +got ping, 3, 3 +got ping, 4, 4 +got ping, 5, 5 +got ping, 6, 6 +got ping, 7, 7 +got ping, 8, 8 +got ping, 9, 9 +got ping, 10, 10 +got ping, 11, 11 +got ping, 12, 12 +got ping, 13, 13 +got ping, 14, 14 +got ping, 15, 15 +got ping, 16, 16 +got ping, 17, 17 +got ping, 18, 18 +got ping, 19, 19 +got ping, 20, 20 +got ping, 21, 21 +got ping, 22, 22 +got ping, 23, 23 +got ping, 24, 24 +got ping, 25, 25 +got ping, 26, 26 +got ping, 27, 27 +got ping, 28, 28 +got ping, 29, 29 +got ping, 30, 30 +got ping, 31, 31 +got ping, 32, 32 diff --git a/testing/btest/Baseline/cluster.websocket.no-subscriptions/..manager..stdout b/testing/btest/Baseline/cluster.websocket.zeromq.no-subscriptions/..client..stderr similarity index 70% rename from testing/btest/Baseline/cluster.websocket.no-subscriptions/..manager..stdout rename to testing/btest/Baseline/cluster.websocket.zeromq.no-subscriptions/..client..stderr index bb622d5ce2..49d861c74c 100644 --- a/testing/btest/Baseline/cluster.websocket.no-subscriptions/..manager..stdout +++ b/testing/btest/Baseline/cluster.websocket.zeromq.no-subscriptions/..client..stderr @@ -1,3 +1 @@ ### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. -Cluster::websocket_client_added, [] -got ping, 42 diff --git a/testing/btest/Baseline/cluster.websocket.zeromq.no-subscriptions/..client..stdout b/testing/btest/Baseline/cluster.websocket.zeromq.no-subscriptions/..client..stdout new file mode 100644 index 0000000000..49d861c74c --- /dev/null +++ b/testing/btest/Baseline/cluster.websocket.zeromq.no-subscriptions/..client..stdout @@ -0,0 +1 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. diff --git a/testing/btest/Baseline/cluster.websocket.zeromq.no-subscriptions/..manager..stderr b/testing/btest/Baseline/cluster.websocket.zeromq.no-subscriptions/..manager..stderr new file mode 100644 index 0000000000..e3f6131b1d --- /dev/null +++ b/testing/btest/Baseline/cluster.websocket.zeromq.no-subscriptions/..manager..stderr @@ -0,0 +1,2 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +received termination signal diff --git a/testing/btest/Baseline/cluster.websocket.zeromq.no-subscriptions/..manager..stdout b/testing/btest/Baseline/cluster.websocket.zeromq.no-subscriptions/..manager..stdout new file mode 100644 index 0000000000..944dde8125 --- /dev/null +++ b/testing/btest/Baseline/cluster.websocket.zeromq.no-subscriptions/..manager..stdout @@ -0,0 +1,33 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +got ping, 1, 1 +got ping, 2, 2 +got ping, 3, 3 +got ping, 4, 4 +got ping, 5, 5 +got ping, 6, 6 +got ping, 7, 7 +got ping, 8, 8 +got ping, 9, 9 +got ping, 10, 10 +got ping, 11, 11 +got ping, 12, 12 +got ping, 13, 13 +got ping, 14, 14 +got ping, 15, 15 +got ping, 16, 16 +got ping, 17, 17 +got ping, 18, 18 +got ping, 19, 19 +got ping, 20, 20 +got ping, 21, 21 +got ping, 22, 22 +got ping, 23, 23 +got ping, 24, 24 +got ping, 25, 25 +got ping, 26, 26 +got ping, 27, 27 +got ping, 28, 28 +got ping, 29, 29 +got ping, 30, 30 +got ping, 31, 31 +got ping, 32, 32 diff --git a/testing/btest/cluster/websocket/broker/no-subscriptions.zeek b/testing/btest/cluster/websocket/broker/no-subscriptions.zeek new file mode 100644 index 0000000000..968e5b40bc --- /dev/null +++ b/testing/btest/cluster/websocket/broker/no-subscriptions.zeek @@ -0,0 +1,66 @@ +# @TEST-DOC: Test that publishing events to a WebSocket client's auto topic works. +# +# @TEST-REQUIRES: python3 -c 'import websockets.sync' +# +# @TEST-PORT: BROKER_PORT1 +# @TEST-PORT: WEBSOCKET_PORT +# +# @TEST-EXEC: cp $FILES/ws/wstest.py . +# +# @TEST-EXEC: zeek -b --parse-only manager.zeek +# @TEST-EXEC: python3 -m py_compile client.py +# +# @TEST-EXEC: btest-bg-run manager "ZEEKPATH=$ZEEKPATH:.. && CLUSTER_NODE=manager zeek -b ../manager.zeek" +# @TEST-EXEC: btest-bg-run client "python3 ../client.py" +# +# @TEST-EXEC: btest-bg-wait 30 +# @TEST-EXEC: btest-diff ./manager/.stdout +# @TEST-EXEC: btest-diff ./manager/.stderr +# @TEST-EXEC: btest-diff ./client/.stdout +# @TEST-EXEC: btest-diff ./client/.stderr + +# @TEST-START-FILE cluster-layout.zeek +redef Cluster::nodes = { + ["manager"] = [$node_type=Cluster::MANAGER, $ip=127.0.0.1, $p=to_port(getenv("BROKER_PORT1"))], +}; +# @TEST-END-FILE +# +# @TEST-START-FILE manager.zeek +redef exit_only_after_terminate = T; + +redef Log::enable_local_logging = T; +redef Log::default_rotation_interval = 0sec; +redef Broker::disable_ssl = T; + +event zeek_init() + { + Cluster::subscribe("/test/pings"); + Cluster::listen_websocket([$listen_host="127.0.0.1", $listen_port=to_port(getenv("WEBSOCKET_PORT"))]); + } + +global ping_count = 0; +const ping_count_expected = 32; + +event ping(c: count) &is_used + { + ++ping_count; + print "got ping", c, ping_count; + if ( ping_count == ping_count_expected ) + terminate(); + } +# @TEST-END-FILE + +# @TEST-START-FILE client.py +import wstest + +def run(ws_url): + for i in range(32): + with wstest.connect("ws1", ws_url) as tc: + tc.send_json([]) # Send no subscriptions + ack = tc.recv_json() + assert ack.get("type") == "ack", f"{ack}" + tc.send_json(wstest.build_event_v1("/test/pings/", "ping", [i + 1])) + +if __name__ == "__main__": + wstest.main(run, wstest.WS4_URL_V1) +# @TEST-END-FILE diff --git a/testing/btest/cluster/websocket/no-subscriptions.zeek b/testing/btest/cluster/websocket/zeromq/no-subscriptions.zeek similarity index 76% rename from testing/btest/cluster/websocket/no-subscriptions.zeek rename to testing/btest/cluster/websocket/zeromq/no-subscriptions.zeek index 09701e5fc9..305b0919eb 100644 --- a/testing/btest/cluster/websocket/no-subscriptions.zeek +++ b/testing/btest/cluster/websocket/zeromq/no-subscriptions.zeek @@ -36,28 +36,28 @@ event zeek_init() Cluster::listen_websocket([$listen_host="127.0.0.1", $listen_port=to_port(getenv("WEBSOCKET_PORT"))]); } +global ping_count = 0; +const ping_count_expected = 32; + event ping(c: count) &is_used { - print "got ping", c; - terminate(); - } - -event Cluster::websocket_client_added(info: Cluster::EndpointInfo, subscriptions: string_vec) - { - print "Cluster::websocket_client_added", subscriptions; + ++ping_count; + print "got ping", c, ping_count; + if ( ping_count == ping_count_expected ) + terminate(); } # @TEST-END-FILE - # @TEST-START-FILE client.py import wstest def run(ws_url): - with wstest.connect("ws1", ws_url) as tc: - tc.send_json([]) # Send no subscriptions - ack = tc.recv_json() - assert ack.get("type") == "ack", f"{ack}" - tc.send_json(wstest.build_event_v1("/test/pings/", "ping", [42])) + for i in range(32): + with wstest.connect("ws1", ws_url) as tc: + tc.send_json([]) # Send no subscriptions + ack = tc.recv_json() + assert ack.get("type") == "ack", f"{ack}" + tc.send_json(wstest.build_event_v1("/test/pings/", "ping", [i + 1])) if __name__ == "__main__": wstest.main(run, wstest.WS4_URL_V1)