From fa22f91ca48ff7637807a2b6e442388b5cbb674a Mon Sep 17 00:00:00 2001 From: Arne Welzel Date: Thu, 9 Jan 2025 20:50:31 +0100 Subject: [PATCH] cluster/zeromq: Fix XSUB threading issues It is not safe to use the same socket from different threads, but the current code used the xsub socket directly from the main thread (to setup subscriptions) and from the internal thread for polling and reading. Leverage the PAIR socket already in use for forwarding publish operations to the internal thread also for subscribe and unsubscribe. The failure mode is/was a bit annoying. Essentially, closing of the context would hang indefinitely in zmq_ctx_term(). --- .../cluster/backend/zeromq/main.zeek | 3 +- src/cluster/backend/zeromq/ZeroMQ.cc | 121 ++++++++++++------ 2 files changed, 86 insertions(+), 38 deletions(-) diff --git a/scripts/policy/frameworks/cluster/backend/zeromq/main.zeek b/scripts/policy/frameworks/cluster/backend/zeromq/main.zeek index 52e8bff74b..466d778840 100644 --- a/scripts/policy/frameworks/cluster/backend/zeromq/main.zeek +++ b/scripts/policy/frameworks/cluster/backend/zeromq/main.zeek @@ -162,7 +162,8 @@ export { ## Bitmask to enable low-level stderr based debug printing. ## - ## poll debugging: 1 (produce verbose zmq::poll() output) + ## poll: 1 (produce verbose zmq::poll() output) + ## thread: 2 (produce thread related output) ## ## Or values from the above list together and set debug_flags ## to the result. E.g. use 7 to select 4, 2 and 1. 
diff --git a/src/cluster/backend/zeromq/ZeroMQ.cc b/src/cluster/backend/zeromq/ZeroMQ.cc index f07785e2a2..d79eb87db1 100644 --- a/src/cluster/backend/zeromq/ZeroMQ.cc +++ b/src/cluster/backend/zeromq/ZeroMQ.cc @@ -22,6 +22,7 @@ #include "zeek/cluster/Serializer.h" #include "zeek/cluster/backend/zeromq/Plugin.h" #include "zeek/cluster/backend/zeromq/ZeroMQ-Proxy.h" +#include "zeek/util.h" namespace zeek { @@ -36,6 +37,7 @@ namespace cluster::zeromq { enum class DebugFlag : zeek_uint_t { NONE = 0, POLL = 1, + THREAD = 2, }; constexpr DebugFlag operator&(zeek_uint_t x, DebugFlag y) { @@ -68,13 +70,8 @@ void self_thread_fun(void* arg) { ZeroMQBackend::ZeroMQBackend(std::unique_ptr es, std::unique_ptr ls, std::unique_ptr ehs) : ThreadedBackend(std::move(es), std::move(ls), std::move(ehs)) { - xsub = zmq::socket_t(ctx, zmq::socket_type::xsub); - xpub = zmq::socket_t(ctx, zmq::socket_type::xpub); log_push = zmq::socket_t(ctx, zmq::socket_type::push); - log_pull = zmq::socket_t(ctx, zmq::socket_type::pull); - main_inproc = zmq::socket_t(ctx, zmq::socket_type::pair); - child_inproc = zmq::socket_t(ctx, zmq::socket_type::pair); } void ZeroMQBackend::DoInitPostScript() { @@ -97,9 +94,6 @@ void ZeroMQBackend::DoInitPostScript() { event_unsubscription = zeek::event_registry->Register("Cluster::Backend::ZeroMQ::unsubscription"); event_subscription = zeek::event_registry->Register("Cluster::Backend::ZeroMQ::subscription"); - - main_inproc.bind("inproc://publish-bridge"); - child_inproc.connect("inproc://publish-bridge"); } @@ -109,13 +103,12 @@ void ZeroMQBackend::DoTerminate() { ZEROMQ_DEBUG("Joining self_thread"); if ( self_thread.joinable() ) self_thread.join(); + ZEROMQ_DEBUG("Joined self_thread"); + // Close the sockets that are used from the main thread, + // the remaining sockets are closed by self_thread. 
log_push.close(); - log_pull.close(); - xsub.close(); - xpub.close(); main_inproc.close(); - child_inproc.close(); ZEROMQ_DEBUG("Closing ctx"); ctx.close(); @@ -130,6 +123,11 @@ void ZeroMQBackend::DoTerminate() { } bool ZeroMQBackend::DoInit() { + xsub = zmq::socket_t(ctx, zmq::socket_type::xsub); + xpub = zmq::socket_t(ctx, zmq::socket_type::xpub); + log_pull = zmq::socket_t(ctx, zmq::socket_type::pull); + child_inproc = zmq::socket_t(ctx, zmq::socket_type::pair); + auto linger_ms = static_cast(zeek::id::find_val("Cluster::Backend::ZeroMQ::linger_ms")->AsInt()); int xpub_nodrop = zeek::id::find_val("Cluster::Backend::ZeroMQ::xpub_nodrop")->AsBool() ? 1 : 0; @@ -218,6 +216,10 @@ bool ZeroMQBackend::DoInit() { // following post might be useful: // // https://funcptr.net/2012/09/10/zeromq---edge-triggered-notification/ + + // Setup connectivity between main and child thread. + main_inproc.bind("inproc://inproc-bridge"); + child_inproc.connect("inproc://inproc-bridge"); self_thread = std::thread(self_thread_fun, this); // After connecting, call ThreadedBackend::DoInit() to register @@ -267,7 +269,7 @@ bool ZeroMQBackend::DoSubscribe(const std::string& topic_prefix) { // Prepend 0x01 byte to indicate subscription to XSUB socket // This is the XSUB API instead of setsockopt(ZMQ_SUBSCRIBE). std::string msg = "\x01" + topic_prefix; - xsub.send(zmq::const_buffer(msg.data(), msg.size())); + main_inproc.send(zmq::const_buffer(msg.data(), msg.size())); } catch ( zmq::error_t& err ) { zeek::reporter->Error("Failed to subscribe to topic %s: %s", topic_prefix.c_str(), err.what()); return false; @@ -282,7 +284,7 @@ bool ZeroMQBackend::DoUnsubscribe(const std::string& topic_prefix) { // Prepend 0x00 byte to indicate subscription to XSUB socket. // This is the XSUB API instead of setsockopt(ZMQ_SUBSCRIBE). 
std::string msg = "\x00" + topic_prefix; - xsub.send(zmq::const_buffer(msg.data(), msg.size())); + main_inproc.send(zmq::const_buffer(msg.data(), msg.size())); } catch ( zmq::error_t& err ) { zeek::reporter->Error("Failed to unsubscribe from topic %s: %s", topic_prefix.c_str(), err.what()); return false; @@ -340,6 +342,9 @@ bool ZeroMQBackend::DoPublishLogWrites(const logging::detail::LogWriteHeader& he } void ZeroMQBackend::Run() { + util::detail::set_thread_name(zeek::util::fmt("zmq-%p", this)); + ZEROMQ_DEBUG_THREAD_PRINTF(DebugFlag::THREAD, "Thread starting (%p)\n", this); + using MultipartMessage = std::vector; auto HandleLogMessages = [this](const std::vector& msgs) { @@ -362,29 +367,50 @@ void ZeroMQBackend::Run() { }; auto HandleInprocMessages = [this](std::vector& msgs) { - // Forward messages from the inprocess bridge to xpub. + // Forward messages from the inprocess bridge to XSUB for subscription + // handling (1 part) or XPUB for publishing (4 parts). for ( auto& msg : msgs ) { - assert(msg.size() == 4); + assert(msg.size() == 1 || msg.size() == 4); + if ( msg.size() == 1 ) { + xsub.send(msg[0], zmq::send_flags::none); + } + else { + for ( auto& part : msg ) { + zmq::send_flags flags = zmq::send_flags::dontwait; + if ( part.more() ) + flags = flags | zmq::send_flags::sndmore; - for ( auto& part : msg ) { - zmq::send_flags flags = zmq::send_flags::dontwait; - if ( part.more() ) - flags = flags | zmq::send_flags::sndmore; + zmq::send_result_t result; + int tries = 0; + do { + try { + result = xpub.send(part, flags); + } catch ( zmq::error_t& err ) { + if ( err.num() == ETERM ) + return; - zmq::send_result_t result; - do { - try { - result = xpub.send(part, flags); - } catch ( zmq::error_t& err ) { - // XXX: Not sure if the return false is so great here. - // - // Also, if we fail to publish, should we block rather - // than discard? 
- ZEROMQ_THREAD_PRINTF("xpub: Failed to publish: %s (%d)", err.what(), err.num()); - break; - } - // EAGAIN returns empty result, means try again! - } while ( ! result ); + // XXX: What other error can happen here? How should we react? + ZEROMQ_THREAD_PRINTF("xpub: Failed to publish with error %s (%d)\n", err.what(), err.num()); + break; + } + + // Empty result means xpub.send() returned EAGAIN. The + // socket reached its high water mark and we should + // relax / backoff a bit. Otherwise we'll be spinning + // unproductively very fast here. Note that this is going + // to build up backpressure and eventually inproc.send() + // will block from the main thread. + if ( ! result ) { + ++tries; + auto sleep_for = std::min(tries * 10, 500); + ZEROMQ_THREAD_PRINTF( + "xpub: Failed forward inproc to xpub! Overloaded? (tries=%d sleeping %d ms)\n", tries, + sleep_for); + + std::this_thread::sleep_for(std::chrono::milliseconds(sleep_for)); + } + } while ( ! result ); + } } } }; @@ -449,6 +475,16 @@ void ZeroMQBackend::Run() { QueueForProcessing(std::move(qmsgs)); }; + // Helper class running at destruction. + class Deferred { + public: + Deferred(std::function deferred) : closer(std::move(deferred)) {} + ~Deferred() { closer(); } + + private: + std::function closer; + }; + struct SocketInfo { zmq::socket_ref socket; std::string name; @@ -462,6 +498,15 @@ void ZeroMQBackend::Run() { {.socket = log_pull, .name = "log_pull", .handler = HandleLogMessages}, }; + // Called when Run() terminates. + auto deferred_close = Deferred([this]() { + child_inproc.close(); + xpub.close(); + xsub.close(); + log_pull.close(); + ZEROMQ_DEBUG_THREAD_PRINTF(DebugFlag::THREAD, "Thread sockets closed (%p)\n", this); + }); + std::vector poll_items(sockets.size()); while ( true ) { @@ -530,10 +575,12 @@ void ZeroMQBackend::Run() { rcv_messages[i].pop_back(); } } catch ( zmq::error_t& err ) { - if ( err.num() == ETERM ) - return; + if ( err.num() != ETERM ) + throw; - throw; + // Shutdown. 
+ ZEROMQ_DEBUG_THREAD_PRINTF(DebugFlag::THREAD, "Thread terminating (%p)\n", this); + break; } // At this point, we've received anything that was readable from the sockets.