diff --git a/src/cluster/Backend.cc b/src/cluster/Backend.cc index e46d989476..92d9dd4624 100644 --- a/src/cluster/Backend.cc +++ b/src/cluster/Backend.cc @@ -161,7 +161,7 @@ bool ThreadedBackend::ProcessBackendMessage(int tag, detail::byte_buffer_span pa ThreadedBackend::ThreadedBackend(std::unique_ptr es, std::unique_ptr ls, std::unique_ptr ehs) : Backend(std::move(es), std::move(ls), std::move(ehs)) { - onloop = new zeek::detail::OnLoopProcess(this, "ThreadedBackend"); + onloop = new zeek::detail::OnLoopProcess(this, "ThreadedBackend"); onloop->Register(true); // Register as don't count first } @@ -171,26 +171,35 @@ bool ThreadedBackend::DoInit() { return true; } -void ThreadedBackend::QueueForProcessing(QueueMessages&& qmessages) { - onloop->QueueForProcessing(std::move(qmessages)); -} - -void ThreadedBackend::Process() { onloop->Process(); } - -void ThreadedBackend::Process(QueueMessages&& to_process) { - for ( const auto& msg : to_process ) { - // sonarlint wants to use std::visit. not sure... - if ( auto* emsg = std::get_if(&msg) ) { - ProcessEventMessage(emsg->topic, emsg->format, emsg->payload_span()); - } - else if ( auto* lmsg = std::get_if(&msg) ) { - ProcessLogMessage(lmsg->format, lmsg->payload_span()); - } - else if ( auto* bmsg = std::get_if(&msg) ) { - ProcessBackendMessage(bmsg->tag, bmsg->payload_span()); - } - else { - zeek::reporter->FatalError("Unimplemented QueueMessage %zu", msg.index()); - } +void ThreadedBackend::DoTerminate() { + if ( onloop ) { + onloop->Close(); + onloop = nullptr; + } +} + +void ThreadedBackend::QueueForProcessing(QueueMessage&& qmessages) { + if ( onloop ) + onloop->QueueForProcessing(std::move(qmessages)); +} + +void ThreadedBackend::Process() { + if ( onloop ) + onloop->Process(); +} + +void ThreadedBackend::Process(QueueMessage&& msg) { + // sonarlint wants to use std::visit. not sure... + if ( auto* emsg = std::get_if(&msg) ) { + ProcessEventMessage(emsg->topic, emsg->format, emsg->payload_span()); + } + else if ( auto* lmsg = std::get_if(&msg) ) { + ProcessLogMessage(lmsg->format, lmsg->payload_span()); + } + else if ( auto* bmsg = std::get_if(&msg) ) { + ProcessBackendMessage(bmsg->tag, bmsg->payload_span()); + } + else { + zeek::reporter->FatalError("Unimplemented QueueMessage %zu", msg.index()); } } diff --git a/src/cluster/Backend.h b/src/cluster/Backend.h index 2feadb89a6..794b270eff 100644 --- a/src/cluster/Backend.h +++ b/src/cluster/Backend.h @@ -461,7 +461,6 @@ struct BackendMessage { }; using QueueMessage = std::variant; -using QueueMessages = std::vector; /** * Support for backends that use background threads or invoke @@ -482,7 +481,7 @@ protected: * * @param messages Messages to be enqueued. */ - void QueueForProcessing(QueueMessages&& messages); + void QueueForProcessing(QueueMessage&& messages); /** * Delegate to onloop->Process() to trigger processing @@ -502,6 +501,8 @@ protected: */ bool DoInit() override; + void DoTerminate() override; + private: /** * Process a backend specific message queued as BackendMessage. @@ -517,13 +518,13 @@ private: /** * Hook method for OnLooProcess. */ - void Process(QueueMessages&& messages); + void Process(QueueMessage&& messages); // Allow access to Process(QueueMessages) - friend class zeek::detail::OnLoopProcess; + friend class zeek::detail::OnLoopProcess; // Members used for communication with the main thread. - zeek::detail::OnLoopProcess* onloop; + zeek::detail::OnLoopProcess* onloop = nullptr; };