From 85d5dda028bba10332b96eec3f1d5fce63ce9801 Mon Sep 17 00:00:00 2001 From: Arne Welzel Date: Mon, 21 Jul 2025 14:45:18 +0200 Subject: [PATCH] cluster/zeromq: Rework lambdas to member functions --- src/cluster/backend/zeromq/ZeroMQ.cc | 258 +++++++++++++-------------- src/cluster/backend/zeromq/ZeroMQ.h | 7 + 2 files changed, 134 insertions(+), 131 deletions(-) diff --git a/src/cluster/backend/zeromq/ZeroMQ.cc b/src/cluster/backend/zeromq/ZeroMQ.cc index 7f42027477..7af2fb41ac 100644 --- a/src/cluster/backend/zeromq/ZeroMQ.cc +++ b/src/cluster/backend/zeromq/ZeroMQ.cc @@ -444,155 +444,151 @@ bool ZeroMQBackend::DoPublishLogWrites(const logging::detail::LogWriteHeader& he return true; } -void ZeroMQBackend::Run() { - char name[4 + 2 + 16 + 1]{}; // zmq-0x<8byte pointer in hex> - snprintf(name, sizeof(name), "zmq-%p", this); - util::detail::set_thread_name(name); - ZEROMQ_DEBUG_THREAD_PRINTF(DebugFlag::THREAD, "Thread starting (%p)\n", this); - - using MultipartMessage = std::vector; - - auto HandleLogMessages = [this](const std::vector& msgs) { - for ( const auto& msg : msgs ) { - // sender, format, type, payload - if ( msg.size() != 4 ) { - ZEROMQ_THREAD_PRINTF("log: error: expected 4 parts, have %zu!\n", msg.size()); - continue; - } - - byte_buffer payload{msg[3].data(), msg[3].data() + msg[3].size()}; - LogMessage lm{.format = std::string(msg[2].data(), msg[2].size()), - .payload = std::move(payload)}; - - QueueForProcessing(std::move(lm)); - } - }; - - auto HandleInprocMessages = [this](std::vector& msgs) { - // Forward messages from the inprocess bridge. - // - // Either it's 2 parts (tag and payload) for controlling subscriptions - // or terminating the thread, or it is 4 parts in which case all the parts - // are forwarded to the XPUB socket directly for publishing. - for ( auto& msg : msgs ) { - if ( msg.size() == 2 ) { - InprocTag tag = msg[0].data()[0]; - switch ( tag ) { - case InprocTag::XsubUpdate: { - xsub.send(msg[1], zmq::send_flags::none); - break; - } - case InprocTag::Terminate: { - if ( self_thread_stop ) - ZEROMQ_THREAD_PRINTF("inproc: error: duplicate shutdown message"); - self_thread_stop = true; - } +// Forward messages from the inprocess bridge. +// +// Either it's 2 parts (tag and payload) for controlling subscriptions +// or terminating the thread, or it is 4 parts in which case all the parts +// are forwarded to the XPUB socket directly for publishing. +void ZeroMQBackend::HandleInprocMessages(std::vector& msgs) { + for ( auto& msg : msgs ) { + if ( msg.size() == 2 ) { + InprocTag tag = msg[0].data()[0]; + switch ( tag ) { + case InprocTag::XsubUpdate: { + xsub.send(msg[1], zmq::send_flags::none); + break; + } + case InprocTag::Terminate: { + if ( self_thread_stop ) + ZEROMQ_THREAD_PRINTF("inproc: error: duplicate shutdown message"); + self_thread_stop = true; } } - else if ( msg.size() == 4 ) { - for ( auto& part : msg ) { - zmq::send_flags flags = zmq::send_flags::dontwait; - if ( part.more() ) - flags = flags | zmq::send_flags::sndmore; + } + else if ( msg.size() == 4 ) { + for ( auto& part : msg ) { + zmq::send_flags flags = zmq::send_flags::dontwait; + if ( part.more() ) + flags = flags | zmq::send_flags::sndmore; + + zmq::send_result_t result; + int tries = 0; + do { + try { + result = xpub.send(part, flags); + } catch ( zmq::error_t& err ) { + if ( err.num() == ETERM ) + return; + + // XXX: What other error can happen here? How should we react? + ZEROMQ_THREAD_PRINTF("xpub: Failed to publish with error %s (%d)\n", err.what(), err.num()); + break; + } + + // Empty result means xpub.send() returned EAGAIN. The socket reached + // its high-water-mark and we cannot send right now. We simply attempt + // to re-send the message without the dontwait flag after increasing + // the xpub stall metric. This way, ZeroMQ will block in xpub.send() until + // there's enough room available. + if ( ! result ) { + total_xpub_stalls->Inc(); - zmq::send_result_t result; - int tries = 0; - do { try { - result = xpub.send(part, flags); + // We sent non-blocking above so we are able to observe and report stalls + // in a metric. Now that we have done that switch to blocking send. + zmq::send_flags block_flags = zmq::send_flags::none | (flags & zmq::send_flags::sndmore); + result = xpub.send(part, block_flags); } catch ( zmq::error_t& err ) { if ( err.num() == ETERM ) return; // XXX: What other error can happen here? How should we react? - ZEROMQ_THREAD_PRINTF("xpub: Failed to publish with error %s (%d)\n", err.what(), err.num()); + ZEROMQ_THREAD_PRINTF("xpub: Failed blocking publish with error %s (%d)\n", err.what(), + err.num()); break; } + } + } while ( ! result ); + } + } + else { + ZEROMQ_THREAD_PRINTF("inproc: error: expected 1 or 4 parts, have %zu!\n", msg.size()); + } + } +} - // Empty result means xpub.send() returned EAGAIN. The socket reached - // its high-water-mark and we cannot send right now. We simply attempt - // to re-send the message without the dontwait flag after increasing - // the xpub stall metric. This way, ZeroMQ will block in xpub.send() until - // there's enough room available. - if ( ! result ) { - total_xpub_stalls->Inc(); +void ZeroMQBackend::HandleLogMessages(const std::vector& msgs) { + for ( const auto& msg : msgs ) { + // sender, format, type, payload + if ( msg.size() != 4 ) { + ZEROMQ_THREAD_PRINTF("log: error: expected 4 parts, have %zu!\n", msg.size()); + continue; + } - try { - // We sent non-blocking above so we are able to observe and report stalls - // in a metric. Now that we have done that switch to blocking send. - zmq::send_flags block_flags = - zmq::send_flags::none | (flags & zmq::send_flags::sndmore); - result = xpub.send(part, block_flags); - } catch ( zmq::error_t& err ) { - if ( err.num() == ETERM ) - return; + byte_buffer payload{msg[3].data(), msg[3].data() + msg[3].size()}; + LogMessage lm{.format = std::string(msg[2].data(), msg[2].size()), .payload = std::move(payload)}; - // XXX: What other error can happen here? How should we react? - ZEROMQ_THREAD_PRINTF("xpub: Failed blocking publish with error %s (%d)\n", err.what(), - err.num()); - break; - } - } - } while ( ! result ); - } + QueueForProcessing(std::move(lm)); + } +} + +void ZeroMQBackend::HandleXPubMessages(const std::vector& msgs) { + for ( const auto& msg : msgs ) { + if ( msg.size() != 1 ) { + ZEROMQ_THREAD_PRINTF("xpub: error: expected 1 part, have %zu!\n", msg.size()); + continue; + } + + // Check if the messages starts with \x00 or \x01 to understand if it's + // a subscription or unsubscription message. + auto first = *reinterpret_cast(msg[0].data()); + if ( first == 0 || first == 1 ) { + QueueMessage qm; + auto* start = msg[0].data() + 1; + auto* end = msg[0].data() + msg[0].size(); + byte_buffer topic(start, end); + if ( first == 1 ) { + qm = BackendMessage{1, std::move(topic)}; + } + else if ( first == 0 ) { + qm = BackendMessage{0, std::move(topic)}; } else { - ZEROMQ_THREAD_PRINTF("inproc: error: expected 1 or 4 parts, have %zu!\n", msg.size()); - } - } - }; - - auto HandleXPubMessages = [this](const std::vector& msgs) { - for ( const auto& msg : msgs ) { - if ( msg.size() != 1 ) { - ZEROMQ_THREAD_PRINTF("xpub: error: expected 1 part, have %zu!\n", msg.size()); + ZEROMQ_THREAD_PRINTF("xpub: error: unexpected first char: have '0x%02x'", first); continue; } - // Check if the messages starts with \x00 or \x01 to understand if it's - // a subscription or unsubscription message. - auto first = *reinterpret_cast(msg[0].data()); - if ( first == 0 || first == 1 ) { - QueueMessage qm; - auto* start = msg[0].data() + 1; - auto* end = msg[0].data() + msg[0].size(); - byte_buffer topic(start, end); - if ( first == 1 ) { - qm = BackendMessage{1, std::move(topic)}; - } - else if ( first == 0 ) { - qm = BackendMessage{0, std::move(topic)}; - } - else { - ZEROMQ_THREAD_PRINTF("xpub: error: unexpected first char: have '0x%02x'", first); - continue; - } - - QueueForProcessing(std::move(qm)); - } + QueueForProcessing(std::move(qm)); } - }; + } +} - auto HandleXSubMessages = [this](const std::vector& msgs) { - for ( const auto& msg : msgs ) { - if ( msg.size() != 4 ) { - ZEROMQ_THREAD_PRINTF("xsub: error: expected 4 parts, have %zu!\n", msg.size()); - continue; - } - - // Filter out messages that are coming from this node. - std::string sender(msg[1].data(), msg[1].size()); - if ( sender == NodeId() ) - continue; - - byte_buffer payload{msg[3].data(), msg[3].data() + msg[3].size()}; - EventMessage em{.topic = std::string(msg[0].data(), msg[0].size()), - .format = std::string(msg[2].data(), msg[2].size()), - .payload = std::move(payload)}; - - QueueForProcessing(std::move(em)); +void ZeroMQBackend::HandleXSubMessages(const std::vector& msgs) { + for ( const auto& msg : msgs ) { + if ( msg.size() != 4 ) { + ZEROMQ_THREAD_PRINTF("xsub: error: expected 4 parts, have %zu!\n", msg.size()); + continue; } - }; + + // Filter out messages that are coming from this node. + std::string sender(msg[1].data(), msg[1].size()); + if ( sender == NodeId() ) + continue; + + byte_buffer payload{msg[3].data(), msg[3].data() + msg[3].size()}; + EventMessage em{.topic = std::string(msg[0].data(), msg[0].size()), + .format = std::string(msg[2].data(), msg[2].size()), + .payload = std::move(payload)}; + + QueueForProcessing(std::move(em)); + } +} + +void ZeroMQBackend::Run() { + char name[4 + 2 + 16 + 1]{}; // zmq-0x<8byte pointer in hex> + snprintf(name, sizeof(name), "zmq-%p", this); + util::detail::set_thread_name(name); + ZEROMQ_DEBUG_THREAD_PRINTF(DebugFlag::THREAD, "Thread starting (%p)\n", this); struct SocketInfo { zmq::socket_ref socket; @@ -601,10 +597,10 @@ void ZeroMQBackend::Run() { }; std::vector sockets = { - {.socket = child_inproc, .name = "inproc", .handler = HandleInprocMessages}, - {.socket = xpub, .name = "xpub", .handler = HandleXPubMessages}, - {.socket = xsub, .name = "xsub", .handler = HandleXSubMessages}, - {.socket = log_pull, .name = "log_pull", .handler = HandleLogMessages}, + {.socket = child_inproc, .name = "inproc", .handler = [this](auto& msgs) { HandleInprocMessages(msgs); }}, + {.socket = xpub, .name = "xpub", .handler = [this](const auto& msgs) { HandleXPubMessages(msgs); }}, + {.socket = xsub, .name = "xsub", .handler = [this](const auto& msgs) { HandleXSubMessages(msgs); }}, + {.socket = log_pull, .name = "log_pull", .handler = [this](const auto& msgs) { HandleLogMessages(msgs); }}, }; // Called when Run() terminates. diff --git a/src/cluster/backend/zeromq/ZeroMQ.h b/src/cluster/backend/zeromq/ZeroMQ.h index 4d94b51142..2932f0161a 100644 --- a/src/cluster/backend/zeromq/ZeroMQ.h +++ b/src/cluster/backend/zeromq/ZeroMQ.h @@ -73,6 +73,13 @@ private: void DoReadyToPublishCallback(ReadyCallback cb) override; + // Inner thread helper methods. + using MultipartMessage = std::vector; + void HandleInprocMessages(std::vector& msgs); + void HandleLogMessages(const std::vector& msgs); + void HandleXPubMessages(const std::vector& msgs); + void HandleXSubMessages(const std::vector& msgs); + // Script level variables. std::string connect_xsub_endpoint; std::string connect_xpub_endpoint;