cluster/zeromq: Fix XSUB threading issues

It is not safe to use the same socket from different threads, but the
current code used the xsub socket both from the main thread (to set up
subscriptions) and from the internal thread (for polling and reading).

Instead, leverage the PAIR socket that is already used to forward publish
operations to the internal thread for subscribe and unsubscribe as well.

The failure mode was a bit annoying: essentially, closing of the
context would hang indefinitely in zmq_ctx_term().
Arne Welzel 2025-01-09 20:50:31 +01:00
parent df78a94c76
commit fa22f91ca4
2 changed files with 86 additions and 38 deletions
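Aside: the fix follows a standard ZeroMQ idiom — each socket stays with one owning thread, and other threads talk to that thread over an inproc PAIR bridge. A condensed, standalone sketch of the pattern (names and the tcp endpoint are illustrative, not the actual Zeek code):

    #include <string>
    #include <thread>
    #include <zmq.hpp>

    int main() {
        zmq::context_t ctx;

        // Main-thread end of the bridge; bound before the child connects.
        zmq::socket_t main_pair(ctx, zmq::socket_type::pair);
        main_pair.bind("inproc://bridge");

        std::thread child([&ctx]() {
            zmq::socket_t child_pair(ctx, zmq::socket_type::pair);
            child_pair.connect("inproc://bridge");

            // The XSUB socket is created, used and closed only on this thread.
            zmq::socket_t xsub(ctx, zmq::socket_type::xsub);
            xsub.connect("tcp://127.0.0.1:5556"); // hypothetical XPUB endpoint

            zmq::message_t msg;
            if ( child_pair.recv(msg) )
                // A one-part message carries an XSUB (un)subscribe command:
                // first byte 0x01 subscribes, 0x00 unsubscribes.
                xsub.send(msg, zmq::send_flags::none);

            xsub.close();
            child_pair.close();
        });

        // The main thread never touches xsub directly.
        std::string cmd = "\x01" + std::string("my.topic.prefix");
        main_pair.send(zmq::const_buffer(cmd.data(), cmd.size()));

        child.join();
        main_pair.close();
        ctx.close();
    }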


@@ -162,7 +162,8 @@ export {
 	## Bitmask to enable low-level stderr based debug printing.
 	##
-	## poll debugging: 1 (produce verbose zmq::poll() output)
+	## poll: 1 (produce verbose zmq::poll() output)
+	## thread: 2 (produce thread related output)
 	##
 	## Or values from the above list together and set debug_flags
 	## to the result. E.g. use 7 to select 4, 2 and 1. Only use this


@@ -22,6 +22,7 @@
 #include "zeek/cluster/Serializer.h"
 #include "zeek/cluster/backend/zeromq/Plugin.h"
 #include "zeek/cluster/backend/zeromq/ZeroMQ-Proxy.h"
+#include "zeek/util.h"
 
 namespace zeek {
@@ -36,6 +37,7 @@ namespace cluster::zeromq {
 enum class DebugFlag : zeek_uint_t {
     NONE = 0,
     POLL = 1,
+    THREAD = 2,
 };
 
 constexpr DebugFlag operator&(zeek_uint_t x, DebugFlag y) {
@@ -68,13 +70,8 @@ void self_thread_fun(void* arg) {
 ZeroMQBackend::ZeroMQBackend(std::unique_ptr<EventSerializer> es, std::unique_ptr<LogSerializer> ls,
                              std::unique_ptr<detail::EventHandlingStrategy> ehs)
     : ThreadedBackend(std::move(es), std::move(ls), std::move(ehs)) {
-    xsub = zmq::socket_t(ctx, zmq::socket_type::xsub);
-    xpub = zmq::socket_t(ctx, zmq::socket_type::xpub);
     log_push = zmq::socket_t(ctx, zmq::socket_type::push);
-    log_pull = zmq::socket_t(ctx, zmq::socket_type::pull);
     main_inproc = zmq::socket_t(ctx, zmq::socket_type::pair);
-    child_inproc = zmq::socket_t(ctx, zmq::socket_type::pair);
 }
 
 void ZeroMQBackend::DoInitPostScript() {
@@ -97,9 +94,6 @@ void ZeroMQBackend::DoInitPostScript() {
     event_unsubscription = zeek::event_registry->Register("Cluster::Backend::ZeroMQ::unsubscription");
     event_subscription = zeek::event_registry->Register("Cluster::Backend::ZeroMQ::subscription");
-
-    main_inproc.bind("inproc://publish-bridge");
-    child_inproc.connect("inproc://publish-bridge");
 }
@@ -109,13 +103,12 @@ void ZeroMQBackend::DoTerminate() {
     ZEROMQ_DEBUG("Joining self_thread");
     if ( self_thread.joinable() )
         self_thread.join();
+    ZEROMQ_DEBUG("Joined self_thread");
 
+    // Close the sockets that are used from the main thread,
+    // the remaining sockets are closed by self_thread.
     log_push.close();
-    log_pull.close();
-    xsub.close();
-    xpub.close();
     main_inproc.close();
-    child_inproc.close();
 
     ZEROMQ_DEBUG("Closing ctx");
     ctx.close();
@@ -130,6 +123,11 @@ void ZeroMQBackend::DoTerminate() {
 }
 
 bool ZeroMQBackend::DoInit() {
+    xsub = zmq::socket_t(ctx, zmq::socket_type::xsub);
+    xpub = zmq::socket_t(ctx, zmq::socket_type::xpub);
+    log_pull = zmq::socket_t(ctx, zmq::socket_type::pull);
+    child_inproc = zmq::socket_t(ctx, zmq::socket_type::pair);
+
     auto linger_ms = static_cast<int>(zeek::id::find_val<zeek::IntVal>("Cluster::Backend::ZeroMQ::linger_ms")->AsInt());
     int xpub_nodrop = zeek::id::find_val<zeek::BoolVal>("Cluster::Backend::ZeroMQ::xpub_nodrop")->AsBool() ? 1 : 0;
@@ -218,6 +216,10 @@ bool ZeroMQBackend::DoInit() {
     // following post might be useful:
     //
     // https://funcptr.net/2012/09/10/zeromq---edge-triggered-notification/
+
+    // Setup connectivity between main and child thread.
+    main_inproc.bind("inproc://inproc-bridge");
+    child_inproc.connect("inproc://inproc-bridge");
     self_thread = std::thread(self_thread_fun, this);
 
     // After connecting, call ThreadedBackend::DoInit() to register
@@ -267,7 +269,7 @@ bool ZeroMQBackend::DoSubscribe(const std::string& topic_prefix) {
         // Prepend 0x01 byte to indicate subscription to XSUB socket
         // This is the XSUB API instead of setsockopt(ZMQ_SUBSCRIBE).
         std::string msg = "\x01" + topic_prefix;
-        xsub.send(zmq::const_buffer(msg.data(), msg.size()));
+        main_inproc.send(zmq::const_buffer(msg.data(), msg.size()));
     } catch ( zmq::error_t& err ) {
         zeek::reporter->Error("Failed to subscribe to topic %s: %s", topic_prefix.c_str(), err.what());
         return false;
@@ -282,7 +284,7 @@ bool ZeroMQBackend::DoUnsubscribe(const std::string& topic_prefix) {
         // Prepend 0x00 byte to indicate subscription to XSUB socket.
         // This is the XSUB API instead of setsockopt(ZMQ_SUBSCRIBE).
         std::string msg = "\x00" + topic_prefix;
-        xsub.send(zmq::const_buffer(msg.data(), msg.size()));
+        main_inproc.send(zmq::const_buffer(msg.data(), msg.size()));
     } catch ( zmq::error_t& err ) {
         zeek::reporter->Error("Failed to unsubscribe from topic %s: %s", topic_prefix.c_str(), err.what());
         return false;
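Aside on the XSUB wire format used in these two hunks: (un)subscriptions are ordinary messages whose first byte is 0x01 (subscribe) or 0x00 (unsubscribe), followed by the topic prefix. One C++ subtlety worth knowing when building such messages — a standalone sketch, not part of the commit:

    #include <cassert>
    #include <string>

    int main() {
        std::string topic = "my.topic.prefix"; // hypothetical

        // Subscribe: "\x01" is a one-byte C string, so plain
        // concatenation keeps the command byte.
        std::string sub = "\x01" + topic;
        assert(sub.size() == topic.size() + 1);

        // Unsubscribe: "\x00" decays to an *empty* C string, so
        // "\x00" + topic silently drops the command byte. Give the
        // length explicitly instead.
        std::string unsub("\x00", 1);
        unsub += topic;
        assert(unsub.size() == topic.size() + 1 && unsub[0] == '\x00');
    }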
@@ -340,6 +342,9 @@ bool ZeroMQBackend::DoPublishLogWrites(const logging::detail::LogWriteHeader& he
 }
 
 void ZeroMQBackend::Run() {
+    util::detail::set_thread_name(zeek::util::fmt("zmq-%p", this));
+    ZEROMQ_DEBUG_THREAD_PRINTF(DebugFlag::THREAD, "Thread starting (%p)\n", this);
+
     using MultipartMessage = std::vector<zmq::message_t>;
 
     auto HandleLogMessages = [this](const std::vector<MultipartMessage>& msgs) {
@@ -362,29 +367,50 @@
     };
 
     auto HandleInprocMessages = [this](std::vector<MultipartMessage>& msgs) {
-        // Forward messages from the inprocess bridge to xpub.
+        // Forward messages from the inprocess bridge to XSUB for
+        // subscription handling (1 part) or XPUB for publishing (4 parts).
         for ( auto& msg : msgs ) {
-            assert(msg.size() == 4);
+            assert(msg.size() == 1 || msg.size() == 4);
 
-            for ( auto& part : msg ) {
-                zmq::send_flags flags = zmq::send_flags::dontwait;
-                if ( part.more() )
-                    flags = flags | zmq::send_flags::sndmore;
+            if ( msg.size() == 1 ) {
+                xsub.send(msg[0], zmq::send_flags::none);
+            }
+            else {
+                for ( auto& part : msg ) {
+                    zmq::send_flags flags = zmq::send_flags::dontwait;
+                    if ( part.more() )
+                        flags = flags | zmq::send_flags::sndmore;
 
-                zmq::send_result_t result;
-                do {
-                    try {
-                        result = xpub.send(part, flags);
-                    } catch ( zmq::error_t& err ) {
-                        // XXX: Not sure if the return false is so great here.
-                        //
-                        // Also, if we fail to publish, should we block rather
-                        // than discard?
-                        ZEROMQ_THREAD_PRINTF("xpub: Failed to publish: %s (%d)", err.what(), err.num());
-                        break;
-                    }
-                    // EAGAIN returns empty result, means try again!
-                } while ( ! result );
+                    zmq::send_result_t result;
+                    int tries = 0;
+                    do {
+                        try {
+                            result = xpub.send(part, flags);
+                        } catch ( zmq::error_t& err ) {
+                            if ( err.num() == ETERM )
+                                return;
+
+                            // XXX: What other error can happen here? How should we react?
+                            ZEROMQ_THREAD_PRINTF("xpub: Failed to publish with error %s (%d)\n", err.what(), err.num());
+                            break;
+                        }
+
+                        // Empty result means xpub.send() returned EAGAIN. The
+                        // socket reached its high water mark and we should
+                        // relax / backoff a bit. Otherwise we'll be spinning
+                        // unproductively very fast here. Note that this is going
+                        // to build up backpressure and eventually inproc.send()
+                        // will block from the main thread.
+                        if ( ! result ) {
+                            ++tries;
+                            auto sleep_for = std::min(tries * 10, 500);
+                            ZEROMQ_THREAD_PRINTF(
+                                "xpub: Failed forward inproc to xpub! Overloaded? (tries=%d sleeping %d ms)\n", tries,
+                                sleep_for);
+                            std::this_thread::sleep_for(std::chrono::milliseconds(sleep_for));
+                        }
+                    } while ( ! result );
+                }
             }
         }
     };
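For context on the retry loop above: with zmq::send_flags::dontwait, cppzmq reports EAGAIN not as an exception but as an empty zmq::send_result_t (a std::optional), which is why the loop keys off ! result. A minimal standalone demo, assuming a free hypothetical port:

    #include <cstdio>
    #include <zmq.hpp>

    int main() {
        zmq::context_t ctx;
        zmq::socket_t push(ctx, zmq::socket_type::push);
        push.bind("tcp://127.0.0.1:5557"); // hypothetical; no peer connects

        // PUSH with no connected peer cannot send; dontwait turns the
        // would-be block into an empty result (EAGAIN).
        zmq::send_result_t result = push.send(zmq::str_buffer("hi"), zmq::send_flags::dontwait);
        if ( ! result )
            std::puts("EAGAIN: no peer or high water mark reached, retry later");

        push.close();
        ctx.close();
    }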
@@ -449,6 +475,16 @@
         QueueForProcessing(std::move(qmsgs));
     };
 
+    // Helper class running at destruction.
+    class Deferred {
+    public:
+        Deferred(std::function<void()> deferred) : closer(std::move(deferred)) {}
+        ~Deferred() { closer(); }
+
+    private:
+        std::function<void()> closer;
+    };
+
     struct SocketInfo {
         zmq::socket_ref socket;
         std::string name;
@@ -462,6 +498,15 @@
         {.socket = log_pull, .name = "log_pull", .handler = HandleLogMessages},
     };
 
+    // Called when Run() terminates.
+    auto deferred_close = Deferred([this]() {
+        child_inproc.close();
+        xpub.close();
+        xsub.close();
+        log_pull.close();
+        ZEROMQ_DEBUG_THREAD_PRINTF(DebugFlag::THREAD, "Thread sockets closed (%p)\n", this);
+    });
+
     std::vector<zmq::pollitem_t> poll_items(sockets.size());
 
     while ( true ) {
@@ -530,10 +575,12 @@
                 rcv_messages[i].pop_back();
             }
         } catch ( zmq::error_t& err ) {
-            if ( err.num() == ETERM )
-                return;
+            if ( err.num() != ETERM )
+                throw;
 
-            throw;
+            // Shutdown.
+            ZEROMQ_DEBUG_THREAD_PRINTF(DebugFlag::THREAD, "Thread terminating (%p)\n", this);
+            break;
         }
 
         // At this point, we've received anything that was readable from the sockets.
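This last hunk ties back to the failure mode in the commit message: zmq_ctx_term() blocks until every socket created in the context has been closed, so a socket stranded on (or used concurrently from) another thread can make ctx.close() hang forever. Once termination starts, blocking calls on other threads fail with ETERM, which is the thread's cue to close its own sockets and exit — here via break into deferred_close rather than return. A standalone demonstration of that lifecycle (endpoint name is hypothetical):

    #include <chrono>
    #include <cstdio>
    #include <thread>
    #include <zmq.hpp>

    int main() {
        zmq::context_t ctx;

        std::thread child([&ctx]() {
            zmq::socket_t pull(ctx, zmq::socket_type::pull);
            pull.bind("inproc://demo");
            try {
                zmq::message_t msg;
                (void)pull.recv(msg); // blocks until the context shuts down
            } catch ( const zmq::error_t& err ) {
                if ( err.num() != ETERM )
                    throw;
                std::puts("ETERM: context terminating");
            }
            // Crucial: close on the owning thread, or ctx.close() hangs.
            pull.close();
        });

        // Let the child block in recv() (demo only).
        std::this_thread::sleep_for(std::chrono::milliseconds(100));

        ctx.shutdown(); // zmq_ctx_shutdown(): interrupts blocking calls with ETERM
        child.join();
        ctx.close();    // zmq_ctx_term(): returns only once all sockets are closed
    }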