cluster/zeromq: Queue one message at a time

Queueing multiple messages can easily overload the IO loop without
creating any backpressure.
This commit is contained in:
Arne Welzel 2025-01-29 15:39:26 +01:00
parent 09ccb2e250
commit 94ec3af2b0

View file

@ -366,9 +366,6 @@ void ZeroMQBackend::Run() {
using MultipartMessage = std::vector<zmq::message_t>; using MultipartMessage = std::vector<zmq::message_t>;
auto HandleLogMessages = [this](const std::vector<MultipartMessage>& msgs) { auto HandleLogMessages = [this](const std::vector<MultipartMessage>& msgs) {
QueueMessages qmsgs;
qmsgs.reserve(msgs.size());
for ( const auto& msg : msgs ) { for ( const auto& msg : msgs ) {
// sender, format, type, payload // sender, format, type, payload
if ( msg.size() != 4 ) { if ( msg.size() != 4 ) {
@ -377,11 +374,11 @@ void ZeroMQBackend::Run() {
} }
detail::byte_buffer payload{msg[3].data<std::byte>(), msg[3].data<std::byte>() + msg[3].size()}; detail::byte_buffer payload{msg[3].data<std::byte>(), msg[3].data<std::byte>() + msg[3].size()};
qmsgs.emplace_back(LogMessage{.format = std::string(msg[2].data<const char>(), msg[2].size()), LogMessage lm{.format = std::string(msg[2].data<const char>(), msg[2].size()),
.payload = std::move(payload)}); .payload = std::move(payload)};
}
QueueForProcessing(std::move(qmsgs)); QueueForProcessing(std::move(lm));
}
}; };
auto HandleInprocMessages = [this](std::vector<MultipartMessage>& msgs) { auto HandleInprocMessages = [this](std::vector<MultipartMessage>& msgs) {
@ -434,9 +431,6 @@ void ZeroMQBackend::Run() {
}; };
auto HandleXPubMessages = [this](const std::vector<MultipartMessage>& msgs) { auto HandleXPubMessages = [this](const std::vector<MultipartMessage>& msgs) {
QueueMessages qmsgs;
qmsgs.reserve(msgs.size());
for ( const auto& msg : msgs ) { for ( const auto& msg : msgs ) {
if ( msg.size() != 1 ) { if ( msg.size() != 1 ) {
ZEROMQ_THREAD_PRINTF("xpub: error: expected 1 part, have %zu!\n", msg.size()); ZEROMQ_THREAD_PRINTF("xpub: error: expected 1 part, have %zu!\n", msg.size());
@ -462,17 +456,12 @@ void ZeroMQBackend::Run() {
continue; continue;
} }
qmsgs.emplace_back(std::move(qm)); QueueForProcessing(std::move(qm));
} }
} }
QueueForProcessing(std::move(qmsgs));
}; };
auto HandleXSubMessages = [this](const std::vector<MultipartMessage>& msgs) { auto HandleXSubMessages = [this](const std::vector<MultipartMessage>& msgs) {
QueueMessages qmsgs;
qmsgs.reserve(msgs.size());
for ( const auto& msg : msgs ) { for ( const auto& msg : msgs ) {
if ( msg.size() != 4 ) { if ( msg.size() != 4 ) {
ZEROMQ_THREAD_PRINTF("xsub: error: expected 4 parts, have %zu!\n", msg.size()); ZEROMQ_THREAD_PRINTF("xsub: error: expected 4 parts, have %zu!\n", msg.size());
@ -485,12 +474,12 @@ void ZeroMQBackend::Run() {
continue; continue;
detail::byte_buffer payload{msg[3].data<std::byte>(), msg[3].data<std::byte>() + msg[3].size()}; detail::byte_buffer payload{msg[3].data<std::byte>(), msg[3].data<std::byte>() + msg[3].size()};
qmsgs.emplace_back(EventMessage{.topic = std::string(msg[0].data<const char>(), msg[0].size()), EventMessage em{.topic = std::string(msg[0].data<const char>(), msg[0].size()),
.format = std::string(msg[2].data<const char>(), msg[2].size()), .format = std::string(msg[2].data<const char>(), msg[2].size()),
.payload = std::move(payload)}); .payload = std::move(payload)};
}
QueueForProcessing(std::move(qmsgs)); QueueForProcessing(std::move(em));
}
}; };
// Helper class running at destruction. // Helper class running at destruction.