Merge remote-tracking branch 'origin/topic/awelzel/backend-ready-callback-logic'

* origin/topic/awelzel/backend-ready-callback-logic:
  btest/cluster/websocket: Move no-subscriptions test
  cluster/websocket: Leverage ReadyToPublishCallback()
  cluster/zeromq: Implement DoReadyToPublishCallback()
  cluster/Backend: Add ReadyToPublishCallback() API
This commit is contained in:
Arne Welzel 2025-04-25 12:03:44 +02:00
commit a852ecf913
19 changed files with 298 additions and 25 deletions

25
CHANGES
View file

@ -1,3 +1,28 @@
7.2.0-dev.632 | 2025-04-25 12:03:44 +0200
* btest/cluster/websocket: Move no-subscriptions test (Arne Welzel, Corelight)
...and also add one for broker.
* cluster/websocket: Leverage ReadyToPublishCallback() (Arne Welzel, Corelight)
Change WebSocket client handling to return only when the ready to
publish callback has been invoked.
* cluster/zeromq: Implement DoReadyToPublishCallback() (Arne Welzel, Corelight)
The ZeroMQ heuristic for "ready to publish" is to create a unique and
ephemeral subscription using the XSUB socket and observe it arrive on the
XPUB socket. At this point, visibility into other nodes' subscriptions
is provided.
* cluster/Backend: Add ReadyToPublishCallback() API (Arne Welzel, Corelight)
Provide a mechanism to allow a cluster backend to report when it is ready
for publish operations. This is primarily useful for ZeroMQ which has
sender-side filtering and is only really ready for publishing when it
has learned about subscriptions from other nodes.
7.2.0-dev.627 | 2025-04-25 09:03:01 +0200
* broker/WebSocketShim/tests: Comment out two endpoint tests (Arne Welzel, Corelight)

View file

@ -1 +1 @@
7.2.0-dev.627
7.2.0-dev.632

View file

@ -218,6 +218,18 @@ export {
## subscriptions and hello messages from other
## nodes. These expirations trigger reporter warnings.
const hello_expiration: interval = 10sec &redef;
## The topic prefix used for internal ZeroMQ specific communication.
##
## This is used for the "ready to publish callback" topics.
##
## Zeek creates a short-lived subscription for an auto-generated
## topic name with this prefix and waits for it to be confirmed
## on its XPUB socket. Once this happens, the XPUB socket should've
## also received all other active subscriptions of other nodes in a
## cluster from the central XPUB/XSUB proxy and therefore can be
## deemed ready for publish operations.
const internal_topic_prefix = "zeek.zeromq.internal." &redef;
}
redef Cluster::backend = Cluster::CLUSTER_BACKEND_ZEROMQ;

View file

@ -111,6 +111,11 @@ std::optional<detail::Event> Backend::MakeClusterEvent(FuncValPtr handler, ArgsS
return zeek::cluster::detail::Event{eh, std::move(*checked_args), timestamp};
}
// Default "ready to publish" handling: report success by invoking the
// callback synchronously, before returning to the caller. Backends that
// need to defer readiness override this method.
void Backend::DoReadyToPublishCallback(Backend::ReadyCallback cb) {
    cb(Backend::ReadyCallbackInfo{Backend::CallbackStatus::Success});
}
// Default implementation doing the serialization.
bool Backend::DoPublishEvent(const std::string& topic, cluster::detail::Event& event) {
byte_buffer buf;

View file

@ -269,6 +269,31 @@ public:
*/
bool Unsubscribe(const std::string& topic_prefix) { return DoUnsubscribe(topic_prefix); }
/**
* Information passed to a ready callback.
*/
using ReadyCallbackInfo = SubscriptionCallbackInfo;
using ReadyCallback = std::function<void(const ReadyCallbackInfo& info)>;
/**
 * Register a "ready to publish" callback.
 *
 * Some cluster backend implementations may not be immediately ready for
 * publish operations. For example, ZeroMQ has sender-side subscription
 * filtering and discards messages until the XPUB socket learns about
 * subscriptions in a cluster.
 *
 * The callback mechanism allows a backend to notify the caller once it
 * has determined readiness for publish operations.
 *
 * Callers must be prepared for \a cb to be invoked immediately, i.e.
 * before this method returns, as that is what the default implementation
 * of DoReadyToPublishCallback() does.
 *
 * This method only forwards to DoReadyToPublishCallback(); \a cb is taken
 * by value and moved into that call.
 *
 * @param cb The callback to invoke when the backend is ready for publish operations.
 */
void ReadyToPublishCallback(ReadyCallback cb) { DoReadyToPublishCallback(std::move(cb)); }
/**
* Publish multiple log records.
*
@ -450,6 +475,13 @@ private:
*/
virtual bool DoUnsubscribe(const std::string& topic_prefix) = 0;
/**
 * Register a "ready to publish" callback.
 *
 * Backend-specific hook behind ReadyToPublishCallback(). The default
 * implementation invokes \a cb right away with a success status;
 * backends that only become ready for publishing later can override
 * this method and invoke \a cb once they are.
 *
 * @param cb The callback to invoke when the backend is ready for publish operations.
 */
virtual void DoReadyToPublishCallback(ReadyCallback cb);
/**
* Serialize a log batch, then forward it to DoPublishLogWrites() below.

View file

@ -16,6 +16,7 @@
#include "zeek/DebugLogger.h"
#include "zeek/EventHandler.h"
#include "zeek/EventRegistry.h"
#include "zeek/ID.h"
#include "zeek/IntrusivePtr.h"
#include "zeek/Reporter.h"
#include "zeek/Val.h"
@ -103,6 +104,8 @@ void ZeroMQBackend::DoInitPostScript() {
linger_ms = static_cast<int>(zeek::id::find_val<zeek::IntVal>("Cluster::Backend::ZeroMQ::linger_ms")->AsInt());
poll_max_messages = zeek::id::find_val<zeek::CountVal>("Cluster::Backend::ZeroMQ::poll_max_messages")->Get();
debug_flags = zeek::id::find_val<zeek::CountVal>("Cluster::Backend::ZeroMQ::debug_flags")->Get();
internal_topic_prefix =
zeek::id::find_const<zeek::StringVal>("Cluster::Backend::ZeroMQ::internal_topic_prefix")->ToStdString();
proxy_io_threads =
static_cast<int>(zeek::id::find_val<zeek::CountVal>("Cluster::Backend::ZeroMQ::proxy_io_threads")->Get());
@ -723,6 +726,34 @@ bool ZeroMQBackend::DoProcessBackendMessage(int tag, byte_buffer_span payload) {
}
}
void ZeroMQBackend::DoReadyToPublishCallback(ReadyCallback cb) {
    // Readiness is detected via a throw-away subscription: subscribe to a
    // topic built from the internal topic prefix, this backend's node
    // identifier and an incrementing counter. When the subscription's
    // callback is invoked, the subscription has become visible on the XPUB
    // socket; at that point the ready callback is invoked and the topic is
    // unsubscribed from again.
    //
    // The heuristic: once the node sees its own subscription, the XPUB/XSUB
    // proxy has also sent the subscriptions of all other nodes in the
    // cluster.
    //
    // This matters because ZeroMQ filters on the sender side and simply
    // discards messages for topics it hasn't seen a subscription for yet.
    // Without the heuristic, short-lived WebSocket clients may fail to
    // publish their messages.
    static int ready_topic_counter = 0;
    const int topic_seq = ++ready_topic_counter;

    const std::string ready_topic =
        util::fmt("%s%s.%d.", internal_topic_prefix.c_str(), NodeId().c_str(), topic_seq);

    auto on_subscribed = [this, ready_cb = std::move(cb)](const std::string& topic_prefix,
                                                           const SubscriptionCallbackInfo& sinfo) {
        // Hand the subscription's status and message through to the ready callback.
        ready_cb(Backend::ReadyCallbackInfo{sinfo.status, sinfo.message});

        // The topic itself is of no interest, drop the subscription again.
        Unsubscribe(topic_prefix);
    };

    Subscribe(ready_topic, std::move(on_subscribed));
}
} // namespace cluster::zeromq
} // namespace zeek

View file

@ -71,6 +71,8 @@ private:
bool DoProcessBackendMessage(int tag, byte_buffer_span payload) override;
void DoReadyToPublishCallback(ReadyCallback cb) override;
// Script level variables.
std::string connect_xsub_endpoint;
std::string connect_xpub_endpoint;
@ -84,6 +86,8 @@ private:
zeek_uint_t poll_max_messages = 0;
zeek_uint_t debug_flags = 0;
std::string internal_topic_prefix;
EventHandlerPtr event_subscription;
EventHandlerPtr event_unsubscription;

View file

@ -372,6 +372,30 @@ void WebSocketEventDispatcher::Process(const WebSocketSubscribeFinished& fin) {
return;
}
if ( ! entry.ready_to_publish ) {
// Still waiting for the backend to be ready.
return;
}
HandleSubscriptionsActive(entry);
}
void WebSocketEventDispatcher::Process(const WebSocketBackendReadyToPublish& ready) {
const auto& it = clients.find(ready.id);
if ( it == clients.end() ) {
reporter->Error("Backend ready from non-existing WebSocket client with id %s!", ready.id.c_str());
return;
}
auto& entry = it->second;
entry.ready_to_publish = true;
if ( ! entry.wsc->AllSubscriptionsActive() ) {
// More subscriptions to come!
return;
}
HandleSubscriptionsActive(entry);
}
@ -397,14 +421,6 @@ void WebSocketEventDispatcher::HandleSubscriptions(WebSocketClientEntry& entry,
entry.wsc->SetSubscriptions(subscriptions);
// Short-circuit setting up subscriptions and directly reply with
// an ack if the client didn't request any topic subscriptions.
if ( subscriptions.empty() ) {
assert(entry.wsc->AllSubscriptionsActive());
HandleSubscriptionsActive(entry);
return;
}
auto cb = [this, id = entry.id, wsc = entry.wsc](const std::string& topic,
const Backend::SubscriptionCallbackInfo& info) {
if ( info.status == Backend::CallbackStatus::Error ) {
@ -424,6 +440,13 @@ void WebSocketEventDispatcher::HandleSubscriptions(WebSocketClientEntry& entry,
QueueReply(WebSocketCloseReply{entry.wsc, 1011, "Could not subscribe. Something bad happened!"});
}
}
// Register a callback to be invoked when the backend is ready for publishing.
entry.backend->ReadyToPublishCallback([this, id = entry.id](const auto& info) {
// Ready callbacks are supposed to run on the main thread,
// so we can just start processing a WebSocketBackendReadyToPublish.
Process(WebSocketBackendReadyToPublish{id});
});
}
void WebSocketEventDispatcher::HandleSubscriptionsActive(const WebSocketClientEntry& entry) {

View file

@ -145,7 +145,13 @@ struct WebSocketSubscribeFinished {
std::string topic_prefix;
};
using WebSocketEvent = std::variant<WebSocketOpen, WebSocketSubscribeFinished, WebSocketClose, WebSocketMessage>;
// Internally created when the backend of a WebSocket client is ready
// for publish operations.
struct WebSocketBackendReadyToPublish {
// Identifier of the WebSocket client whose backend became ready; the
// dispatcher uses it to look up the client's entry.
std::string id;
};
using WebSocketEvent = std::variant<WebSocketOpen, WebSocketSubscribeFinished, WebSocketClose, WebSocketMessage,
WebSocketBackendReadyToPublish>;
struct WebSocketSendReply {
std::shared_ptr<WebSocketClient> wsc;
@ -211,6 +217,7 @@ private:
void Process(const WebSocketOpen& open);
void Process(const WebSocketSubscribeFinished& fin);
void Process(const WebSocketBackendReadyToPublish& ready);
void Process(const WebSocketMessage& msg);
void Process(const WebSocketClose& close);
@ -222,6 +229,7 @@ private:
std::string id;
std::shared_ptr<WebSocketClient> wsc;
std::shared_ptr<zeek::cluster::Backend> backend;
bool ready_to_publish = false;
uint64_t msg_count = 0;
std::list<WebSocketMessage> queue;
};

View file

@ -0,0 +1,33 @@
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
got ping, 1, 1
got ping, 2, 2
got ping, 3, 3
got ping, 4, 4
got ping, 5, 5
got ping, 6, 6
got ping, 7, 7
got ping, 8, 8
got ping, 9, 9
got ping, 10, 10
got ping, 11, 11
got ping, 12, 12
got ping, 13, 13
got ping, 14, 14
got ping, 15, 15
got ping, 16, 16
got ping, 17, 17
got ping, 18, 18
got ping, 19, 19
got ping, 20, 20
got ping, 21, 21
got ping, 22, 22
got ping, 23, 23
got ping, 24, 24
got ping, 25, 25
got ping, 26, 26
got ping, 27, 27
got ping, 28, 28
got ping, 29, 29
got ping, 30, 30
got ping, 31, 31
got ping, 32, 32

View file

@ -1,3 +1 @@
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
Cluster::websocket_client_added, []
got ping, 42

View file

@ -0,0 +1 @@
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.

View file

@ -0,0 +1,2 @@
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
received termination signal

View file

@ -0,0 +1,33 @@
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
got ping, 1, 1
got ping, 2, 2
got ping, 3, 3
got ping, 4, 4
got ping, 5, 5
got ping, 6, 6
got ping, 7, 7
got ping, 8, 8
got ping, 9, 9
got ping, 10, 10
got ping, 11, 11
got ping, 12, 12
got ping, 13, 13
got ping, 14, 14
got ping, 15, 15
got ping, 16, 16
got ping, 17, 17
got ping, 18, 18
got ping, 19, 19
got ping, 20, 20
got ping, 21, 21
got ping, 22, 22
got ping, 23, 23
got ping, 24, 24
got ping, 25, 25
got ping, 26, 26
got ping, 27, 27
got ping, 28, 28
got ping, 29, 29
got ping, 30, 30
got ping, 31, 31
got ping, 32, 32

View file

@ -0,0 +1,66 @@
# @TEST-DOC: Test that WebSocket clients sending no subscriptions can publish events right after receiving the ack.
#
# @TEST-REQUIRES: python3 -c 'import websockets.sync'
#
# @TEST-PORT: BROKER_PORT1
# @TEST-PORT: WEBSOCKET_PORT
#
# @TEST-EXEC: cp $FILES/ws/wstest.py .
#
# @TEST-EXEC: zeek -b --parse-only manager.zeek
# @TEST-EXEC: python3 -m py_compile client.py
#
# @TEST-EXEC: btest-bg-run manager "ZEEKPATH=$ZEEKPATH:.. && CLUSTER_NODE=manager zeek -b ../manager.zeek"
# @TEST-EXEC: btest-bg-run client "python3 ../client.py"
#
# @TEST-EXEC: btest-bg-wait 30
# @TEST-EXEC: btest-diff ./manager/.stdout
# @TEST-EXEC: btest-diff ./manager/.stderr
# @TEST-EXEC: btest-diff ./client/.stdout
# @TEST-EXEC: btest-diff ./client/.stderr
# @TEST-START-FILE cluster-layout.zeek
redef Cluster::nodes = {
["manager"] = [$node_type=Cluster::MANAGER, $ip=127.0.0.1, $p=to_port(getenv("BROKER_PORT1"))],
};
# @TEST-END-FILE
#
# @TEST-START-FILE manager.zeek
redef exit_only_after_terminate = T;
redef Log::enable_local_logging = T;
redef Log::default_rotation_interval = 0sec;
redef Broker::disable_ssl = T;
event zeek_init()
{
Cluster::subscribe("/test/pings");
Cluster::listen_websocket([$listen_host="127.0.0.1", $listen_port=to_port(getenv("WEBSOCKET_PORT"))]);
}
global ping_count = 0;
const ping_count_expected = 32;
event ping(c: count) &is_used
{
++ping_count;
print "got ping", c, ping_count;
if ( ping_count == ping_count_expected )
terminate();
}
# @TEST-END-FILE
# @TEST-START-FILE client.py
import wstest
def run(ws_url):
for i in range(32):
with wstest.connect("ws1", ws_url) as tc:
tc.send_json([]) # Send no subscriptions
ack = tc.recv_json()
assert ack.get("type") == "ack", f"{ack}"
tc.send_json(wstest.build_event_v1("/test/pings/", "ping", [i + 1]))
if __name__ == "__main__":
wstest.main(run, wstest.WS4_URL_V1)
# @TEST-END-FILE

View file

@ -36,28 +36,28 @@ event zeek_init()
Cluster::listen_websocket([$listen_host="127.0.0.1", $listen_port=to_port(getenv("WEBSOCKET_PORT"))]);
}
global ping_count = 0;
const ping_count_expected = 32;
event ping(c: count) &is_used
{
print "got ping", c;
++ping_count;
print "got ping", c, ping_count;
if ( ping_count == ping_count_expected )
terminate();
}
event Cluster::websocket_client_added(info: Cluster::EndpointInfo, subscriptions: string_vec)
{
print "Cluster::websocket_client_added", subscriptions;
}
# @TEST-END-FILE
# @TEST-START-FILE client.py
import wstest
def run(ws_url):
for i in range(32):
with wstest.connect("ws1", ws_url) as tc:
tc.send_json([]) # Send no subscriptions
ack = tc.recv_json()
assert ack.get("type") == "ack", f"{ack}"
tc.send_json(wstest.build_event_v1("/test/pings/", "ping", [42]))
tc.send_json(wstest.build_event_v1("/test/pings/", "ping", [i + 1]))
if __name__ == "__main__":
wstest.main(run, wstest.WS4_URL_V1)