cluster/OnLoop: Switch to condition variable

The busy polling wasn't clever and usually resulted in delays. For now,
switch to a mutex/condition variable and log an error if the timeouts
become immense.
This commit is contained in:
Arne Welzel 2025-03-20 11:56:02 +01:00
parent 2963c49f27
commit 387237e9c2

View file

@@ -4,6 +4,7 @@
#include <atomic> #include <atomic>
#include <chrono> #include <chrono>
#include <condition_variable>
#include <list> #include <list>
#include <mutex> #include <mutex>
#include <thread> #include <thread>
@@ -14,6 +15,8 @@
#include "zeek/iosource/Manager.h" #include "zeek/iosource/Manager.h"
namespace zeek::detail { namespace zeek::detail {
/** /**
* Template class allowing work items to be queued by threads and processed * Template class allowing work items to be queued by threads and processed
* in Zeek's main thread. * in Zeek's main thread.
@@ -38,12 +41,15 @@ public:
* *
* @param proc The instance processing. * @param proc The instance processing.
* @param tag The tag to use as the IOSource's tag. * @param tag The tag to use as the IOSource's tag.
* @param max_queue_size How many messages to queue before blocking the producing thread.
* @param cond_timeout If a producer is blocked for more than that many microseconds, report a warning.
* @param main_thread_id The ID of the main thread for usage checks.
*/ */
OnLoopProcess(Proc* proc, std::string tag, size_t max_queue_size = 10, OnLoopProcess(Proc* proc, std::string tag, size_t max_queue_size = 250,
std::chrono::microseconds block_duration = std::chrono::microseconds(100), std::chrono::microseconds cond_timeout = std::chrono::microseconds(100000),
std::thread::id main_thread_id = std::this_thread::get_id()) std::thread::id main_thread_id = std::this_thread::get_id())
: max_queue_size(max_queue_size), : cond_timeout(cond_timeout),
block_duration(block_duration), max_queue_size(max_queue_size),
proc(proc), proc(proc),
tag(std::move(tag)), tag(std::move(tag)),
main_thread_id(main_thread_id) {} main_thread_id(main_thread_id) {}
@@ -76,6 +82,9 @@ public:
std::scoped_lock lock(mtx); std::scoped_lock lock(mtx);
SetClosed(true); SetClosed(true);
// Wake a process stuck in queueing.
cond.notify_one();
// Don't attempt to Process anymore. // Don't attempt to Process anymore.
proc = nullptr; proc = nullptr;
} }
@@ -92,12 +101,21 @@ public:
*/ */
void Process() override { void Process() override {
std::list<Work> to_process; std::list<Work> to_process;
bool notify = false;
{ {
std::scoped_lock lock(mtx); std::scoped_lock lock(mtx);
if ( queue.size() >= max_queue_size )
notify = true;
to_process.splice(to_process.end(), queue); to_process.splice(to_process.end(), queue);
flare.Extinguish(); flare.Extinguish();
} }
// The queue was full before and is now empty,
// wake up any pending thread.
if ( notify )
cond.notify_one();
// We've been closed, so proc will most likely // We've been closed, so proc will most likely
// be invalid at this point and we'll discard // be invalid at this point and we'll discard
// whatever was left to do. // whatever was left to do.
@@ -139,43 +157,58 @@ public:
bool fire = false; bool fire = false;
size_t qs = 0; size_t qs = 0;
while ( ! to_queue.empty() ) { int timeouts = 0;
{ {
std::scoped_lock lock(mtx); std::unique_lock lock(mtx);
if ( ! IsOpen() ) { // Wait for room in the queue.
// IO Source is being removed. while ( IsOpen() && queue.size() >= max_queue_size ) {
fire = false; auto status = cond.wait_for(lock, cond_timeout);
break; if ( status == std::cv_status::timeout && IsOpen() )
++timeouts;
} }
qs = queue.size(); if ( IsOpen() ) {
if ( qs < max_queue_size ) { assert(queue.size() < max_queue_size);
assert(to_queue.size() == 1);
queue.splice(queue.end(), to_queue); queue.splice(queue.end(), to_queue);
fire = fire || qs == 0; fire = queue.size() == 1; // first element in queue triggers processing.
assert(to_queue.empty()); }
assert(! queue.empty()); else {
// IO Source is being or was removed.
fire = false;
} }
} }
if ( ! to_queue.empty() ) {
std::this_thread::sleep_for(block_duration);
fire = true;
}
}
if ( fire ) if ( fire )
flare.Fire(); flare.Fire();
if ( timeouts > 0 ) {
// XXX: Should this invoke some callback or change the return value
// so users can react on this?
//
// We could also do suicidal snail pattern here. If the event
// loop is unable to process, we may as well knock ourselves out.
std::fprintf(stderr, "timeouts %d!\n", timeouts);
}
--queuers; --queuers;
} }
private: private:
// Flare to notify Zeek's IO loop.
zeek::detail::Flare flare; zeek::detail::Flare flare;
// Mutex, condition and timeout protecting access to queue.
std::mutex mtx; std::mutex mtx;
std::condition_variable cond;
std::chrono::microseconds cond_timeout;
std::list<Work> queue; std::list<Work> queue;
size_t max_queue_size; size_t max_queue_size;
std::chrono::microseconds block_duration;
Proc* proc; Proc* proc;
std::string tag; std::string tag;
std::atomic<int> queuers = 0; std::atomic<int> queuers = 0;