cluster/zeromq: Attempt publish during termination

Explicitly notify the internal thread about the shutdown via the
inproc socket pair. This ensures that the internal thread processes
all previous messages on the inproc socket before terminating.

This fixes the scenario where a backend is created, a few messages published
and then immediately terminated as can be done with WebSocket clients.
Previously, some of the messages published might have still been in the
inproc socket's queue and were simply discarded.

Adds the same test for Broker and ZeroMQ backends.
This commit is contained in:
Arne Welzel 2025-04-16 14:08:06 +02:00
parent ab25e5d24b
commit 6bd624d9b2
12 changed files with 410 additions and 11 deletions

View file

@ -44,6 +44,11 @@ enum class DebugFlag : zeek_uint_t {
THREAD = 2,
};
enum class InprocTag : uint8_t {
XsubUpdate,
Terminate,
};
constexpr DebugFlag operator&(zeek_uint_t x, DebugFlag y) {
return static_cast<DebugFlag>(x & static_cast<zeek_uint_t>(y));
}
@ -112,15 +117,28 @@ void ZeroMQBackend::DoInitPostScript() {
void ZeroMQBackend::DoTerminate() {
ThreadedBackend::DoTerminate();
// If self_thread is running, notify it to shutdown via the inproc
// socket, then wait for it to terminate.
if ( self_thread.joinable() && ! self_thread_shutdown_requested ) {
ZEROMQ_DEBUG("Sending terminate request via inproc socket");
auto tag = InprocTag::Terminate;
main_inproc.send(zmq::const_buffer(&tag, 1), zmq::send_flags::sndmore);
main_inproc.send(zmq::const_buffer("", 0));
self_thread_shutdown_requested = true;
ZEROMQ_DEBUG("Joining self_thread");
if ( self_thread.joinable() )
self_thread.join();
ZEROMQ_DEBUG("Joined self_thread");
}
ZEROMQ_DEBUG("Shutting down ctx");
ctx.shutdown();
ZEROMQ_DEBUG("Joining self_thread");
if ( self_thread.joinable() )
self_thread.join();
ZEROMQ_DEBUG("Joined self_thread");
// Close the sockets that are used from the main thread,
// the remaining sockets are closed by self_thread.
// the remaining sockets were closed by self_thread during
// shutdown already.
log_push.close();
main_inproc.close();
@ -303,6 +321,11 @@ bool ZeroMQBackend::DoSubscribe(const std::string& topic_prefix, SubscribeCallba
// Prepend 0x01 byte to indicate subscription to XSUB socket
// This is the XSUB API instead of setsockopt(ZMQ_SUBSCRIBE).
std::string msg = "\x01" + topic_prefix;
// Send two message parts. The first part is a single byte tagging the
// message as a XSUB update. The second part the payload for the XSUB socket.
auto tag = InprocTag::XsubUpdate;
main_inproc.send(zmq::const_buffer(&tag, 1), zmq::send_flags::sndmore);
main_inproc.send(zmq::const_buffer(msg.data(), msg.size()));
} catch ( const zmq::error_t& err ) {
zeek::reporter->Error("Failed to subscribe to topic %s: %s", topic_prefix.c_str(), err.what());
@ -322,9 +345,14 @@ bool ZeroMQBackend::DoSubscribe(const std::string& topic_prefix, SubscribeCallba
bool ZeroMQBackend::DoUnsubscribe(const std::string& topic_prefix) {
ZEROMQ_DEBUG("Unsubscribing %s", topic_prefix.c_str());
try {
// Prepend 0x00 byte to indicate subscription to XSUB socket.
// Prepend 0x00 byte to indicate unsubscription to XSUB socket.
// This is the XSUB API instead of setsockopt(ZMQ_SUBSCRIBE).
std::string msg = '\0' + topic_prefix;
// Send two message parts. The first part is a single byte tagging the
// message as a XSUB update. The second part the payload for the XSUB socket.
auto tag = InprocTag::XsubUpdate;
main_inproc.send(zmq::const_buffer(&tag, 1), zmq::send_flags::sndmore);
main_inproc.send(zmq::const_buffer(msg.data(), msg.size()));
} catch ( const zmq::error_t& err ) {
zeek::reporter->Error("Failed to unsubscribe from topic %s: %s", topic_prefix.c_str(), err.what());
@ -413,11 +441,25 @@ void ZeroMQBackend::Run() {
};
auto HandleInprocMessages = [this](std::vector<MultipartMessage>& msgs) {
// Forward messages from the inprocess bridge to XSUB for subscription
// subscription handling (1 part) or XPUB for publishing (4 parts).
// Forward messages from the inprocess bridge.
//
// Either it's 2 parts (tag and payload) for controlling subscriptions
// or terminating the thread, or it is 4 parts in which case all the parts
// are forwarded to the XPUB socket directly for publishing.
for ( auto& msg : msgs ) {
if ( msg.size() == 1 ) {
xsub.send(msg[0], zmq::send_flags::none);
if ( msg.size() == 2 ) {
InprocTag tag = msg[0].data<InprocTag>()[0];
switch ( tag ) {
case InprocTag::XsubUpdate: {
xsub.send(msg[1], zmq::send_flags::none);
break;
}
case InprocTag::Terminate: {
if ( self_thread_stop )
ZEROMQ_THREAD_PRINTF("inproc: error: duplicate shutdown message");
self_thread_stop = true;
}
}
}
else if ( msg.size() == 4 ) {
for ( auto& part : msg ) {
@ -558,7 +600,7 @@ void ZeroMQBackend::Run() {
std::vector<zmq::pollitem_t> poll_items(sockets.size());
while ( true ) {
while ( ! self_thread_stop ) {
for ( size_t i = 0; i < sockets.size(); i++ )
poll_items[i] = {.socket = sockets[i].socket.handle(), .fd = 0, .events = ZMQ_POLLIN | ZMQ_POLLERR};