cluster/ThreadedBackend: Switch to OnLoopProcess

This commit is contained in:
Arne Welzel 2025-01-21 18:46:58 +01:00
parent 5dee77e6f2
commit 23405194a0
2 changed files with 36 additions and 71 deletions

View file

@ -10,8 +10,8 @@
#include "zeek/Func.h" #include "zeek/Func.h"
#include "zeek/Reporter.h" #include "zeek/Reporter.h"
#include "zeek/Type.h" #include "zeek/Type.h"
#include "zeek/cluster/OnLoop.h"
#include "zeek/cluster/Serializer.h" #include "zeek/cluster/Serializer.h"
#include "zeek/iosource/Manager.h"
#include "zeek/logging/Manager.h" #include "zeek/logging/Manager.h"
#include "zeek/util.h" #include "zeek/util.h"
@ -158,63 +158,26 @@ bool ThreadedBackend::ProcessBackendMessage(int tag, detail::byte_buffer_span pa
return DoProcessBackendMessage(tag, payload); return DoProcessBackendMessage(tag, payload);
} }
namespace { ThreadedBackend::ThreadedBackend(std::unique_ptr<EventSerializer> es, std::unique_ptr<LogSerializer> ls,
std::unique_ptr<detail::EventHandlingStrategy> ehs)
bool register_io_source(zeek::iosource::IOSource* src, int fd, bool dont_count) { : Backend(std::move(es), std::move(ls), std::move(ehs)) {
constexpr bool manage_lifetime = true; onloop = new zeek::detail::OnLoopProcess<ThreadedBackend, QueueMessages>(this, "ThreadedBackend");
onloop->Register(true); // Register as don't count first
zeek::iosource_mgr->Register(src, dont_count, manage_lifetime);
if ( ! zeek::iosource_mgr->RegisterFd(fd, src) ) {
zeek::reporter->Error("Failed to register messages_flare with IO manager");
return false;
} }
return true;
}
} // namespace
bool ThreadedBackend::DoInit() { bool ThreadedBackend::DoInit() {
// Register as counting during DoInit() to avoid Zeek from shutting down. // Have the backend count so Zeek does not terminate.
return register_io_source(this, messages_flare.FD(), false); onloop->Register(/*dont_count=*/false);
} return true;
void ThreadedBackend::DoInitPostScript() {
// Register non-counting after parsing scripts.
register_io_source(this, messages_flare.FD(), true);
} }
void ThreadedBackend::QueueForProcessing(QueueMessages&& qmessages) { void ThreadedBackend::QueueForProcessing(QueueMessages&& qmessages) {
bool fire = false; onloop->QueueForProcessing(std::move(qmessages));
// Enqueue under lock.
{
std::scoped_lock lock(messages_mtx);
fire = messages.empty();
if ( messages.empty() ) {
messages = std::move(qmessages);
}
else {
messages.reserve(messages.size() + qmessages.size());
for ( auto& qmsg : qmessages )
messages.emplace_back(std::move(qmsg));
}
} }
if ( fire ) void ThreadedBackend::Process() { onloop->Process(); }
messages_flare.Fire();
}
void ThreadedBackend::Process() {
QueueMessages to_process;
{
std::scoped_lock lock(messages_mtx);
to_process = std::move(messages);
messages_flare.Extinguish();
messages.clear();
}
void ThreadedBackend::Process(QueueMessages&& to_process) {
for ( const auto& msg : to_process ) { for ( const auto& msg : to_process ) {
// sonarlint wants to use std::visit. not sure... // sonarlint wants to use std::visit. not sure...
if ( auto* emsg = std::get_if<EventMessage>(&msg) ) { if ( auto* emsg = std::get_if<EventMessage>(&msg) ) {

View file

@ -5,17 +5,14 @@
#pragma once #pragma once
#include <memory> #include <memory>
#include <mutex>
#include <optional> #include <optional>
#include <string_view> #include <string_view>
#include <variant> #include <variant>
#include "zeek/EventHandler.h" #include "zeek/EventHandler.h"
#include "zeek/Flare.h"
#include "zeek/IntrusivePtr.h" #include "zeek/IntrusivePtr.h"
#include "zeek/Span.h" #include "zeek/Span.h"
#include "zeek/cluster/Serializer.h" #include "zeek/cluster/Serializer.h"
#include "zeek/iosource/IOSource.h"
#include "zeek/logging/Types.h" #include "zeek/logging/Types.h"
namespace zeek { namespace zeek {
@ -28,6 +25,11 @@ class Val;
using ValPtr = IntrusivePtr<Val>; using ValPtr = IntrusivePtr<Val>;
using ArgsSpan = Span<const ValPtr>; using ArgsSpan = Span<const ValPtr>;
namespace detail {
template<class Proc, class Work>
class OnLoopProcess;
}
namespace cluster { namespace cluster {
namespace detail { namespace detail {
@ -465,11 +467,14 @@ using QueueMessages = std::vector<QueueMessage>;
* Support for backends that use background threads or invoke * Support for backends that use background threads or invoke
* callbacks on non-main threads. * callbacks on non-main threads.
*/ */
class ThreadedBackend : public Backend, public zeek::iosource::IOSource { class ThreadedBackend : public Backend {
public:
using Backend::Backend;
protected: protected:
/**
* Constructor.
*/
ThreadedBackend(std::unique_ptr<EventSerializer> es, std::unique_ptr<LogSerializer> ls,
std::unique_ptr<detail::EventHandlingStrategy> ehs);
/** /**
* To be used by implementations to enqueue messages for processing on the IO loop. * To be used by implementations to enqueue messages for processing on the IO loop.
* *
@ -479,20 +484,11 @@ protected:
*/ */
void QueueForProcessing(QueueMessages&& messages); void QueueForProcessing(QueueMessages&& messages);
void Process() override;
double GetNextTimeout() override { return -1; }
/** /**
* The DoInitPostScript() implementation of ThreadedBackend * Delegate to onloop->Process() to trigger processing
* registers itself as a non-counting IO source. * of outstanding queued messages explicitly, if any.
*
* Classes deriving from ThreadedBackend and providing their
* own DoInitPostScript() method should invoke the ThreadedBackend's
* implementation to register themselves as a non-counting
* IO source with the IO loop.
*/ */
void DoInitPostScript() override; void Process();
/** /**
* The default DoInit() implementation of ThreadedBackend * The default DoInit() implementation of ThreadedBackend
@ -518,10 +514,16 @@ private:
*/ */
virtual bool DoProcessBackendMessage(int tag, detail::byte_buffer_span payload) { return false; }; virtual bool DoProcessBackendMessage(int tag, detail::byte_buffer_span payload) { return false; };
/**
* Hook method for OnLooProcess.
*/
void Process(QueueMessages&& messages);
// Allow access to Process(QueueMessages)
friend class zeek::detail::OnLoopProcess<ThreadedBackend, QueueMessages>;
// Members used for communication with the main thread. // Members used for communication with the main thread.
std::mutex messages_mtx; zeek::detail::OnLoopProcess<ThreadedBackend, QueueMessages>* onloop;
std::vector<QueueMessage> messages;
zeek::detail::Flare messages_flare;
}; };