diff --git a/src/cluster/Backend.cc b/src/cluster/Backend.cc index 177997238b..e46d989476 100644 --- a/src/cluster/Backend.cc +++ b/src/cluster/Backend.cc @@ -10,8 +10,8 @@ #include "zeek/Func.h" #include "zeek/Reporter.h" #include "zeek/Type.h" +#include "zeek/cluster/OnLoop.h" #include "zeek/cluster/Serializer.h" -#include "zeek/iosource/Manager.h" #include "zeek/logging/Manager.h" #include "zeek/util.h" @@ -158,63 +158,26 @@ bool ThreadedBackend::ProcessBackendMessage(int tag, detail::byte_buffer_span pa return DoProcessBackendMessage(tag, payload); } -namespace { - -bool register_io_source(zeek::iosource::IOSource* src, int fd, bool dont_count) { - constexpr bool manage_lifetime = true; - - zeek::iosource_mgr->Register(src, dont_count, manage_lifetime); - - if ( ! zeek::iosource_mgr->RegisterFd(fd, src) ) { - zeek::reporter->Error("Failed to register messages_flare with IO manager"); - return false; - } - - return true; +ThreadedBackend::ThreadedBackend(std::unique_ptr es, std::unique_ptr ls, + std::unique_ptr ehs) + : Backend(std::move(es), std::move(ls), std::move(ehs)) { + onloop = new zeek::detail::OnLoopProcess(this, "ThreadedBackend"); + onloop->Register(true); // Register as don't count first } -} // namespace bool ThreadedBackend::DoInit() { - // Register as counting during DoInit() to avoid Zeek from shutting down. - return register_io_source(this, messages_flare.FD(), false); -} - -void ThreadedBackend::DoInitPostScript() { - // Register non-counting after parsing scripts. - register_io_source(this, messages_flare.FD(), true); + // Have the backend count so Zeek does not terminate. + onloop->Register(/*dont_count=*/false); + return true; } void ThreadedBackend::QueueForProcessing(QueueMessages&& qmessages) { - bool fire = false; - - // Enqueue under lock. - { - std::scoped_lock lock(messages_mtx); - fire = messages.empty(); - - if ( messages.empty() ) { - messages = std::move(qmessages); - } - else { - messages.reserve(messages.size() + qmessages.size()); - for ( auto& qmsg : qmessages ) - messages.emplace_back(std::move(qmsg)); - } - } - - if ( fire ) - messages_flare.Fire(); + onloop->QueueForProcessing(std::move(qmessages)); } -void ThreadedBackend::Process() { - QueueMessages to_process; - { - std::scoped_lock lock(messages_mtx); - to_process = std::move(messages); - messages_flare.Extinguish(); - messages.clear(); - } +void ThreadedBackend::Process() { onloop->Process(); } +void ThreadedBackend::Process(QueueMessages&& to_process) { for ( const auto& msg : to_process ) { // sonarlint wants to use std::visit. not sure... if ( auto* emsg = std::get_if(&msg) ) { diff --git a/src/cluster/Backend.h b/src/cluster/Backend.h index 5953c7b8b2..2feadb89a6 100644 --- a/src/cluster/Backend.h +++ b/src/cluster/Backend.h @@ -5,17 +5,14 @@ #pragma once #include -#include #include #include #include #include "zeek/EventHandler.h" -#include "zeek/Flare.h" #include "zeek/IntrusivePtr.h" #include "zeek/Span.h" #include "zeek/cluster/Serializer.h" -#include "zeek/iosource/IOSource.h" #include "zeek/logging/Types.h" namespace zeek { @@ -28,6 +25,11 @@ class Val; using ValPtr = IntrusivePtr; using ArgsSpan = Span; +namespace detail { +template +class OnLoopProcess; +} + namespace cluster { namespace detail { @@ -465,11 +467,14 @@ using QueueMessages = std::vector; * Support for backends that use background threads or invoke * callbacks on non-main threads. */ -class ThreadedBackend : public Backend, public zeek::iosource::IOSource { -public: - using Backend::Backend; - +class ThreadedBackend : public Backend { protected: + /** + * Constructor. + */ + ThreadedBackend(std::unique_ptr es, std::unique_ptr ls, + std::unique_ptr ehs); + /** * To be used by implementations to enqueue messages for processing on the IO loop. * @@ -479,20 +484,11 @@ protected: */ void QueueForProcessing(QueueMessages&& messages); - void Process() override; - - double GetNextTimeout() override { return -1; } - /** - * The DoInitPostScript() implementation of ThreadedBackend - * registers itself as a non-counting IO source. - * - * Classes deriving from ThreadedBackend and providing their - * own DoInitPostScript() method should invoke the ThreadedBackend's - * implementation to register themselves as a non-counting - * IO source with the IO loop. + * Delegate to onloop->Process() to trigger processing + * of outstanding queued messages explicitly, if any. */ - void DoInitPostScript() override; + void Process(); /** * The default DoInit() implementation of ThreadedBackend @@ -518,10 +514,16 @@ private: */ virtual bool DoProcessBackendMessage(int tag, detail::byte_buffer_span payload) { return false; }; + /** + * Hook method for OnLooProcess. + */ + void Process(QueueMessages&& messages); + + // Allow access to Process(QueueMessages) + friend class zeek::detail::OnLoopProcess; + // Members used for communication with the main thread. - std::mutex messages_mtx; - std::vector messages; - zeek::detail::Flare messages_flare; + zeek::detail::OnLoopProcess* onloop; };