diff --git a/src/cluster/OnLoop.h b/src/cluster/OnLoop.h index 88edfa21bc..d0aae42d45 100644 --- a/src/cluster/OnLoop.h +++ b/src/cluster/OnLoop.h @@ -4,6 +4,7 @@ #include #include +#include #include #include #include @@ -14,6 +15,8 @@ #include "zeek/iosource/Manager.h" namespace zeek::detail { + + /** * Template class allowing work items to be queued by threads and processed * in Zeek's main thread. @@ -38,12 +41,15 @@ public: * * @param proc The instance processing. * @param tag The tag to use as the IOSource's tag. + * @param max_queue_size How many messages to queue before blocking the producing thread. + * @param cond_timeout If a producer is blocked for more than this many microseconds, report a warning. + * @param main_thread_id The ID of the main thread for usage checks. */ - OnLoopProcess(Proc* proc, std::string tag, size_t max_queue_size = 10, - std::chrono::microseconds block_duration = std::chrono::microseconds(100), + OnLoopProcess(Proc* proc, std::string tag, size_t max_queue_size = 250, + std::chrono::microseconds cond_timeout = std::chrono::microseconds(100000), std::thread::id main_thread_id = std::this_thread::get_id()) - : max_queue_size(max_queue_size), - block_duration(block_duration), + : cond_timeout(cond_timeout), + max_queue_size(max_queue_size), proc(proc), tag(std::move(tag)), main_thread_id(main_thread_id) {} @@ -76,6 +82,9 @@ public: std::scoped_lock lock(mtx); SetClosed(true); + // Wake a producer thread blocked while queueing. + cond.notify_one(); + // Don't attempt to Process anymore. proc = nullptr; } @@ -92,12 +101,21 @@ public: */ void Process() override { std::list to_process; + bool notify = false; { std::scoped_lock lock(mtx); + if ( queue.size() >= max_queue_size ) + notify = true; + to_process.splice(to_process.end(), queue); flare.Extinguish(); } + // The queue was full before and is now empty, + // wake up any pending thread. 
+ if ( notify ) + cond.notify_one(); + // We've been closed, so proc will most likely // be invalid at this point and we'll discard // whatever was left to do. @@ -139,43 +157,58 @@ public: bool fire = false; size_t qs = 0; - while ( ! to_queue.empty() ) { - { - std::scoped_lock lock(mtx); + int timeouts = 0; - if ( ! IsOpen() ) { - // IO Source is being removed. - fire = false; - break; - } + { + std::unique_lock lock(mtx); - qs = queue.size(); - if ( qs < max_queue_size ) { - queue.splice(queue.end(), to_queue); - fire = fire || qs == 0; - assert(to_queue.empty()); - assert(! queue.empty()); - } + // Wait for room in the queue. + while ( IsOpen() && queue.size() >= max_queue_size ) { + auto status = cond.wait_for(lock, cond_timeout); + if ( status == std::cv_status::timeout && IsOpen() ) + ++timeouts; } - if ( ! to_queue.empty() ) { - std::this_thread::sleep_for(block_duration); - fire = true; + if ( IsOpen() ) { + assert(queue.size() < max_queue_size); + assert(to_queue.size() == 1); + queue.splice(queue.end(), to_queue); + fire = queue.size() == 1; // first element in queue triggers processing. + } + else { + // IO Source is being or was removed. + fire = false; } } + if ( fire ) flare.Fire(); + if ( timeouts > 0 ) { + // XXX: Should this invoke some callback or change the return value + // so users can react to this? + // + // We could also apply the suicidal snail pattern here. If the event + // loop is unable to process, we may as well knock ourselves out. + std::fprintf(stderr, "timeouts %d!\n", timeouts); + } + --queuers; } private: + // Flare to notify Zeek's IO loop. zeek::detail::Flare flare; + + // Mutex, condition variable and timeout protecting access to the queue. std::mutex mtx; + std::condition_variable cond; + std::chrono::microseconds cond_timeout; + std::list queue; size_t max_queue_size; - std::chrono::microseconds block_duration; + Proc* proc; std::string tag; std::atomic queuers = 0;