cluster/zeromq: Rework lambdas to member functions

This commit is contained in:
Arne Welzel 2025-07-21 14:45:18 +02:00
parent 5dc4586b70
commit 85d5dda028
2 changed files with 134 additions and 131 deletions

View file

@ -444,155 +444,151 @@ bool ZeroMQBackend::DoPublishLogWrites(const logging::detail::LogWriteHeader& he
return true;
}
void ZeroMQBackend::Run() {
char name[4 + 2 + 16 + 1]{}; // zmq-0x<8byte pointer in hex><nul>
snprintf(name, sizeof(name), "zmq-%p", this);
util::detail::set_thread_name(name);
ZEROMQ_DEBUG_THREAD_PRINTF(DebugFlag::THREAD, "Thread starting (%p)\n", this);
using MultipartMessage = std::vector<zmq::message_t>;
auto HandleLogMessages = [this](const std::vector<MultipartMessage>& msgs) {
for ( const auto& msg : msgs ) {
// sender, format, type, payload
if ( msg.size() != 4 ) {
ZEROMQ_THREAD_PRINTF("log: error: expected 4 parts, have %zu!\n", msg.size());
continue;
}
byte_buffer payload{msg[3].data<std::byte>(), msg[3].data<std::byte>() + msg[3].size()};
LogMessage lm{.format = std::string(msg[2].data<const char>(), msg[2].size()),
.payload = std::move(payload)};
QueueForProcessing(std::move(lm));
}
};
auto HandleInprocMessages = [this](std::vector<MultipartMessage>& msgs) {
// Forward messages from the inprocess bridge.
//
// Either it's 2 parts (tag and payload) for controlling subscriptions
// or terminating the thread, or it is 4 parts in which case all the parts
// are forwarded to the XPUB socket directly for publishing.
for ( auto& msg : msgs ) {
if ( msg.size() == 2 ) {
InprocTag tag = msg[0].data<InprocTag>()[0];
switch ( tag ) {
case InprocTag::XsubUpdate: {
xsub.send(msg[1], zmq::send_flags::none);
break;
}
case InprocTag::Terminate: {
if ( self_thread_stop )
ZEROMQ_THREAD_PRINTF("inproc: error: duplicate shutdown message");
self_thread_stop = true;
}
// Forward messages from the inprocess bridge.
//
// Either it's 2 parts (tag and payload) for controlling subscriptions
// or terminating the thread, or it is 4 parts in which case all the parts
// are forwarded to the XPUB socket directly for publishing.
void ZeroMQBackend::HandleInprocMessages(std::vector<MultipartMessage>& msgs) {
for ( auto& msg : msgs ) {
if ( msg.size() == 2 ) {
InprocTag tag = msg[0].data<InprocTag>()[0];
switch ( tag ) {
case InprocTag::XsubUpdate: {
xsub.send(msg[1], zmq::send_flags::none);
break;
}
case InprocTag::Terminate: {
if ( self_thread_stop )
ZEROMQ_THREAD_PRINTF("inproc: error: duplicate shutdown message");
self_thread_stop = true;
}
}
else if ( msg.size() == 4 ) {
for ( auto& part : msg ) {
zmq::send_flags flags = zmq::send_flags::dontwait;
if ( part.more() )
flags = flags | zmq::send_flags::sndmore;
}
else if ( msg.size() == 4 ) {
for ( auto& part : msg ) {
zmq::send_flags flags = zmq::send_flags::dontwait;
if ( part.more() )
flags = flags | zmq::send_flags::sndmore;
zmq::send_result_t result;
int tries = 0;
do {
try {
result = xpub.send(part, flags);
} catch ( zmq::error_t& err ) {
if ( err.num() == ETERM )
return;
// XXX: What other error can happen here? How should we react?
ZEROMQ_THREAD_PRINTF("xpub: Failed to publish with error %s (%d)\n", err.what(), err.num());
break;
}
// Empty result means xpub.send() returned EAGAIN. The socket reached
// its high-water-mark and we cannot send right now. We simply attempt
// to re-send the message without the dontwait flag after increasing
// the xpub stall metric. This way, ZeroMQ will block in xpub.send() until
// there's enough room available.
if ( ! result ) {
total_xpub_stalls->Inc();
zmq::send_result_t result;
int tries = 0;
do {
try {
result = xpub.send(part, flags);
// We sent non-blocking above so we are able to observe and report stalls
// in a metric. Now that we have done that switch to blocking send.
zmq::send_flags block_flags = zmq::send_flags::none | (flags & zmq::send_flags::sndmore);
result = xpub.send(part, block_flags);
} catch ( zmq::error_t& err ) {
if ( err.num() == ETERM )
return;
// XXX: What other error can happen here? How should we react?
ZEROMQ_THREAD_PRINTF("xpub: Failed to publish with error %s (%d)\n", err.what(), err.num());
ZEROMQ_THREAD_PRINTF("xpub: Failed blocking publish with error %s (%d)\n", err.what(),
err.num());
break;
}
}
} while ( ! result );
}
}
else {
ZEROMQ_THREAD_PRINTF("inproc: error: expected 1 or 4 parts, have %zu!\n", msg.size());
}
}
}
// Empty result means xpub.send() returned EAGAIN. The socket reached
// its high-water-mark and we cannot send right now. We simply attempt
// to re-send the message without the dontwait flag after increasing
// the xpub stall metric. This way, ZeroMQ will block in xpub.send() until
// there's enough room available.
if ( ! result ) {
total_xpub_stalls->Inc();
void ZeroMQBackend::HandleLogMessages(const std::vector<MultipartMessage>& msgs) {
for ( const auto& msg : msgs ) {
// sender, format, type, payload
if ( msg.size() != 4 ) {
ZEROMQ_THREAD_PRINTF("log: error: expected 4 parts, have %zu!\n", msg.size());
continue;
}
try {
// We sent non-blocking above so we are able to observe and report stalls
// in a metric. Now that we have done that switch to blocking send.
zmq::send_flags block_flags =
zmq::send_flags::none | (flags & zmq::send_flags::sndmore);
result = xpub.send(part, block_flags);
} catch ( zmq::error_t& err ) {
if ( err.num() == ETERM )
return;
byte_buffer payload{msg[3].data<std::byte>(), msg[3].data<std::byte>() + msg[3].size()};
LogMessage lm{.format = std::string(msg[2].data<const char>(), msg[2].size()), .payload = std::move(payload)};
// XXX: What other error can happen here? How should we react?
ZEROMQ_THREAD_PRINTF("xpub: Failed blocking publish with error %s (%d)\n", err.what(),
err.num());
break;
}
}
} while ( ! result );
}
QueueForProcessing(std::move(lm));
}
}
void ZeroMQBackend::HandleXPubMessages(const std::vector<MultipartMessage>& msgs) {
for ( const auto& msg : msgs ) {
if ( msg.size() != 1 ) {
ZEROMQ_THREAD_PRINTF("xpub: error: expected 1 part, have %zu!\n", msg.size());
continue;
}
// Check if the messages starts with \x00 or \x01 to understand if it's
// a subscription or unsubscription message.
auto first = *reinterpret_cast<const uint8_t*>(msg[0].data());
if ( first == 0 || first == 1 ) {
QueueMessage qm;
auto* start = msg[0].data<std::byte>() + 1;
auto* end = msg[0].data<std::byte>() + msg[0].size();
byte_buffer topic(start, end);
if ( first == 1 ) {
qm = BackendMessage{1, std::move(topic)};
}
else if ( first == 0 ) {
qm = BackendMessage{0, std::move(topic)};
}
else {
ZEROMQ_THREAD_PRINTF("inproc: error: expected 1 or 4 parts, have %zu!\n", msg.size());
}
}
};
auto HandleXPubMessages = [this](const std::vector<MultipartMessage>& msgs) {
for ( const auto& msg : msgs ) {
if ( msg.size() != 1 ) {
ZEROMQ_THREAD_PRINTF("xpub: error: expected 1 part, have %zu!\n", msg.size());
ZEROMQ_THREAD_PRINTF("xpub: error: unexpected first char: have '0x%02x'", first);
continue;
}
// Check if the messages starts with \x00 or \x01 to understand if it's
// a subscription or unsubscription message.
auto first = *reinterpret_cast<const uint8_t*>(msg[0].data());
if ( first == 0 || first == 1 ) {
QueueMessage qm;
auto* start = msg[0].data<std::byte>() + 1;
auto* end = msg[0].data<std::byte>() + msg[0].size();
byte_buffer topic(start, end);
if ( first == 1 ) {
qm = BackendMessage{1, std::move(topic)};
}
else if ( first == 0 ) {
qm = BackendMessage{0, std::move(topic)};
}
else {
ZEROMQ_THREAD_PRINTF("xpub: error: unexpected first char: have '0x%02x'", first);
continue;
}
QueueForProcessing(std::move(qm));
}
QueueForProcessing(std::move(qm));
}
};
}
}
auto HandleXSubMessages = [this](const std::vector<MultipartMessage>& msgs) {
for ( const auto& msg : msgs ) {
if ( msg.size() != 4 ) {
ZEROMQ_THREAD_PRINTF("xsub: error: expected 4 parts, have %zu!\n", msg.size());
continue;
}
// Filter out messages that are coming from this node.
std::string sender(msg[1].data<const char>(), msg[1].size());
if ( sender == NodeId() )
continue;
byte_buffer payload{msg[3].data<std::byte>(), msg[3].data<std::byte>() + msg[3].size()};
EventMessage em{.topic = std::string(msg[0].data<const char>(), msg[0].size()),
.format = std::string(msg[2].data<const char>(), msg[2].size()),
.payload = std::move(payload)};
QueueForProcessing(std::move(em));
void ZeroMQBackend::HandleXSubMessages(const std::vector<MultipartMessage>& msgs) {
for ( const auto& msg : msgs ) {
if ( msg.size() != 4 ) {
ZEROMQ_THREAD_PRINTF("xsub: error: expected 4 parts, have %zu!\n", msg.size());
continue;
}
};
// Filter out messages that are coming from this node.
std::string sender(msg[1].data<const char>(), msg[1].size());
if ( sender == NodeId() )
continue;
byte_buffer payload{msg[3].data<std::byte>(), msg[3].data<std::byte>() + msg[3].size()};
EventMessage em{.topic = std::string(msg[0].data<const char>(), msg[0].size()),
.format = std::string(msg[2].data<const char>(), msg[2].size()),
.payload = std::move(payload)};
QueueForProcessing(std::move(em));
}
}
void ZeroMQBackend::Run() {
char name[4 + 2 + 16 + 1]{}; // zmq-0x<8byte pointer in hex><nul>
snprintf(name, sizeof(name), "zmq-%p", this);
util::detail::set_thread_name(name);
ZEROMQ_DEBUG_THREAD_PRINTF(DebugFlag::THREAD, "Thread starting (%p)\n", this);
struct SocketInfo {
zmq::socket_ref socket;
@ -601,10 +597,10 @@ void ZeroMQBackend::Run() {
};
std::vector<SocketInfo> sockets = {
{.socket = child_inproc, .name = "inproc", .handler = HandleInprocMessages},
{.socket = xpub, .name = "xpub", .handler = HandleXPubMessages},
{.socket = xsub, .name = "xsub", .handler = HandleXSubMessages},
{.socket = log_pull, .name = "log_pull", .handler = HandleLogMessages},
{.socket = child_inproc, .name = "inproc", .handler = [this](auto& msgs) { HandleInprocMessages(msgs); }},
{.socket = xpub, .name = "xpub", .handler = [this](const auto& msgs) { HandleXPubMessages(msgs); }},
{.socket = xsub, .name = "xsub", .handler = [this](const auto& msgs) { HandleXSubMessages(msgs); }},
{.socket = log_pull, .name = "log_pull", .handler = [this](const auto& msgs) { HandleLogMessages(msgs); }},
};
// Called when Run() terminates.