cluster/Backend: Queue a single message only

The ZeroMQ backend would accumulate multiple messages and enqueue them
all at once. However, as this could potentially result in huge batches
of events being queued into the event loop at once, switch to a one
message at a time model. If there's too many messages queued already,
OnLoop::QueueForProcessing() will block the ZeroMQ thread until
there's room available again.
This commit is contained in:
Arne Welzel 2025-01-22 12:09:32 +01:00
parent 827eccb732
commit 09ccb2e250
2 changed files with 37 additions and 27 deletions

View file

@ -161,7 +161,7 @@ bool ThreadedBackend::ProcessBackendMessage(int tag, detail::byte_buffer_span pa
ThreadedBackend::ThreadedBackend(std::unique_ptr<EventSerializer> es, std::unique_ptr<LogSerializer> ls,
std::unique_ptr<detail::EventHandlingStrategy> ehs)
: Backend(std::move(es), std::move(ls), std::move(ehs)) {
onloop = new zeek::detail::OnLoopProcess<ThreadedBackend, QueueMessages>(this, "ThreadedBackend");
onloop = new zeek::detail::OnLoopProcess<ThreadedBackend, QueueMessage>(this, "ThreadedBackend");
onloop->Register(true); // Register as don't count first
}
@ -171,26 +171,35 @@ bool ThreadedBackend::DoInit() {
return true;
}
void ThreadedBackend::QueueForProcessing(QueueMessages&& qmessages) {
onloop->QueueForProcessing(std::move(qmessages));
}
void ThreadedBackend::Process() { onloop->Process(); }
void ThreadedBackend::Process(QueueMessages&& to_process) {
for ( const auto& msg : to_process ) {
// sonarlint wants to use std::visit. not sure...
if ( auto* emsg = std::get_if<EventMessage>(&msg) ) {
ProcessEventMessage(emsg->topic, emsg->format, emsg->payload_span());
}
else if ( auto* lmsg = std::get_if<LogMessage>(&msg) ) {
ProcessLogMessage(lmsg->format, lmsg->payload_span());
}
else if ( auto* bmsg = std::get_if<BackendMessage>(&msg) ) {
ProcessBackendMessage(bmsg->tag, bmsg->payload_span());
}
else {
zeek::reporter->FatalError("Unimplemented QueueMessage %zu", msg.index());
}
void ThreadedBackend::DoTerminate() {
if ( onloop ) {
onloop->Close();
onloop = nullptr;
}
}
void ThreadedBackend::QueueForProcessing(QueueMessage&& qmessages) {
if ( onloop )
onloop->QueueForProcessing(std::move(qmessages));
}
void ThreadedBackend::Process() {
if ( onloop )
onloop->Process();
}
void ThreadedBackend::Process(QueueMessage&& msg) {
// sonarlint wants to use std::visit. not sure...
if ( auto* emsg = std::get_if<EventMessage>(&msg) ) {
ProcessEventMessage(emsg->topic, emsg->format, emsg->payload_span());
}
else if ( auto* lmsg = std::get_if<LogMessage>(&msg) ) {
ProcessLogMessage(lmsg->format, lmsg->payload_span());
}
else if ( auto* bmsg = std::get_if<BackendMessage>(&msg) ) {
ProcessBackendMessage(bmsg->tag, bmsg->payload_span());
}
else {
zeek::reporter->FatalError("Unimplemented QueueMessage %zu", msg.index());
}
}

View file

@ -461,7 +461,6 @@ struct BackendMessage {
};
using QueueMessage = std::variant<EventMessage, LogMessage, BackendMessage>;
using QueueMessages = std::vector<QueueMessage>;
/**
* Support for backends that use background threads or invoke
@ -482,7 +481,7 @@ protected:
*
* @param messages Messages to be enqueued.
*/
void QueueForProcessing(QueueMessages&& messages);
void QueueForProcessing(QueueMessage&& messages);
/**
* Delegate to onloop->Process() to trigger processing
@ -502,6 +501,8 @@ protected:
*/
bool DoInit() override;
void DoTerminate() override;
private:
/**
* Process a backend specific message queued as BackendMessage.
@ -517,13 +518,13 @@ private:
/**
* Hook method for OnLooProcess.
*/
void Process(QueueMessages&& messages);
void Process(QueueMessage&& messages);
// Allow access to Process(QueueMessages)
friend class zeek::detail::OnLoopProcess<ThreadedBackend, QueueMessages>;
friend class zeek::detail::OnLoopProcess<ThreadedBackend, QueueMessage>;
// Members used for communication with the main thread.
zeek::detail::OnLoopProcess<ThreadedBackend, QueueMessages>* onloop;
zeek::detail::OnLoopProcess<ThreadedBackend, QueueMessage>* onloop = nullptr;
};