From 09ccb2e2502caaa5c73f456f9bacdaf9c7c6b0cb Mon Sep 17 00:00:00 2001 From: Arne Welzel Date: Wed, 22 Jan 2025 12:09:32 +0100 Subject: [PATCH] cluster/Backend: Queue a single message only The ZeroMQ backend would accumulate multiple messages and enqueue them all at once. However, as this could potentially result in huge batches of events being queued into the event loop at once, switch to a one message at a time model. If there's too many messages queued already, OnLoop::QueueForProcessing() will block the ZeroMQ thread until there's room available again. --- src/cluster/Backend.cc | 53 ++++++++++++++++++++++++------------------ src/cluster/Backend.h | 11 +++++---- 2 files changed, 37 insertions(+), 27 deletions(-) diff --git a/src/cluster/Backend.cc b/src/cluster/Backend.cc index e46d989476..92d9dd4624 100644 --- a/src/cluster/Backend.cc +++ b/src/cluster/Backend.cc @@ -161,7 +161,7 @@ bool ThreadedBackend::ProcessBackendMessage(int tag, detail::byte_buffer_span pa ThreadedBackend::ThreadedBackend(std::unique_ptr es, std::unique_ptr ls, std::unique_ptr ehs) : Backend(std::move(es), std::move(ls), std::move(ehs)) { - onloop = new zeek::detail::OnLoopProcess(this, "ThreadedBackend"); + onloop = new zeek::detail::OnLoopProcess(this, "ThreadedBackend"); onloop->Register(true); // Register as don't count first } @@ -171,26 +171,35 @@ bool ThreadedBackend::DoInit() { return true; } -void ThreadedBackend::QueueForProcessing(QueueMessages&& qmessages) { - onloop->QueueForProcessing(std::move(qmessages)); -} - -void ThreadedBackend::Process() { onloop->Process(); } - -void ThreadedBackend::Process(QueueMessages&& to_process) { - for ( const auto& msg : to_process ) { - // sonarlint wants to use std::visit. not sure... - if ( auto* emsg = std::get_if(&msg) ) { - ProcessEventMessage(emsg->topic, emsg->format, emsg->payload_span()); - } - else if ( auto* lmsg = std::get_if(&msg) ) { - ProcessLogMessage(lmsg->format, lmsg->payload_span()); - } - else if ( auto* bmsg = std::get_if(&msg) ) { - ProcessBackendMessage(bmsg->tag, bmsg->payload_span()); - } - else { - zeek::reporter->FatalError("Unimplemented QueueMessage %zu", msg.index()); - } +void ThreadedBackend::DoTerminate() { + if ( onloop ) { + onloop->Close(); + onloop = nullptr; + } +} + +void ThreadedBackend::QueueForProcessing(QueueMessage&& qmessages) { + if ( onloop ) + onloop->QueueForProcessing(std::move(qmessages)); +} + +void ThreadedBackend::Process() { + if ( onloop ) + onloop->Process(); +} + +void ThreadedBackend::Process(QueueMessage&& msg) { + // sonarlint wants to use std::visit. not sure... + if ( auto* emsg = std::get_if(&msg) ) { + ProcessEventMessage(emsg->topic, emsg->format, emsg->payload_span()); + } + else if ( auto* lmsg = std::get_if(&msg) ) { + ProcessLogMessage(lmsg->format, lmsg->payload_span()); + } + else if ( auto* bmsg = std::get_if(&msg) ) { + ProcessBackendMessage(bmsg->tag, bmsg->payload_span()); + } + else { + zeek::reporter->FatalError("Unimplemented QueueMessage %zu", msg.index()); } } diff --git a/src/cluster/Backend.h b/src/cluster/Backend.h index 2feadb89a6..794b270eff 100644 --- a/src/cluster/Backend.h +++ b/src/cluster/Backend.h @@ -461,7 +461,6 @@ struct BackendMessage { }; using QueueMessage = std::variant; -using QueueMessages = std::vector; /** * Support for backends that use background threads or invoke @@ -482,7 +481,7 @@ protected: * * @param messages Messages to be enqueued. */ - void QueueForProcessing(QueueMessages&& messages); + void QueueForProcessing(QueueMessage&& messages); /** * Delegate to onloop->Process() to trigger processing @@ -502,6 +501,8 @@ protected: */ bool DoInit() override; + void DoTerminate() override; + private: /** * Process a backend specific message queued as BackendMessage. @@ -517,13 +518,13 @@ private: /** * Hook method for OnLooProcess. */ - void Process(QueueMessages&& messages); + void Process(QueueMessage&& messages); // Allow access to Process(QueueMessages) - friend class zeek::detail::OnLoopProcess; + friend class zeek::detail::OnLoopProcess; // Members used for communication with the main thread. - zeek::detail::OnLoopProcess* onloop; + zeek::detail::OnLoopProcess* onloop = nullptr; };