From b2746226d563ef6b73c32925c029e76f14ff4392 Mon Sep 17 00:00:00 2001 From: Tim Wojtulewicz Date: Fri, 7 Feb 2025 16:49:54 -0700 Subject: [PATCH] src/3rdparty: Add jthread and stop_token headers --- src/3rdparty/jthread.hpp | 153 +++++++++++ src/3rdparty/stop_token.hpp | 510 ++++++++++++++++++++++++++++++++++++ 2 files changed, 663 insertions(+) create mode 100644 src/3rdparty/jthread.hpp create mode 100644 src/3rdparty/stop_token.hpp diff --git a/src/3rdparty/jthread.hpp b/src/3rdparty/jthread.hpp new file mode 100644 index 0000000000..b9ad2bca2c --- /dev/null +++ b/src/3rdparty/jthread.hpp @@ -0,0 +1,153 @@ +// Copied from https://github.com/josuttis/jthread, +// used under CC-BY-4.0. + +// ----------------------------------------------------- +// cooperative interruptable and joining thread: +// ----------------------------------------------------- +#ifndef JTHREAD_HPP +#define JTHREAD_HPP + +#include // for invoke() +#include +#include // for debugging output +#include +#include + +#include "stop_token.hpp" + +namespace std { + +//***************************************** +//* class jthread +//* - joining std::thread with signaling stop/end support +//***************************************** +class jthread { +public: + //***************************************** + //* standardized API: + //***************************************** + // - cover full API of std::thread + // to be able to switch from std::thread to std::jthread + + // types are those from std::thread: + using id = ::std::thread::id; + using native_handle_type = ::std::thread::native_handle_type; + + // construct/copy/destroy: + jthread() noexcept; + // template explicit jthread(F&& f, Args&&... args); + // THE constructor that starts the thread: + // - NOTE: does SFINAE out copy constructor semantics + template, jthread>>> + explicit jthread(Callable&& cb, Args&&... args); + ~jthread(); + + jthread(const jthread&) = delete; + jthread(jthread&&) noexcept = default; + jthread& operator=(const jthread&) = delete; + jthread& operator=(jthread&&) noexcept; + + // members: + void swap(jthread&) noexcept; + bool joinable() const noexcept; + void join(); + void detach(); + + id get_id() const noexcept; + native_handle_type native_handle(); + + // static members: + static unsigned hardware_concurrency() noexcept { return ::std::thread::hardware_concurrency(); }; + + //***************************************** + // - supplementary API: + // - for the calling thread: + [[nodiscard]] stop_source get_stop_source() noexcept; + [[nodiscard]] stop_token get_stop_token() const noexcept; + bool request_stop() noexcept { return get_stop_source().request_stop(); } + + + //***************************************** + //* implementation: + //***************************************** + +private: + //*** API for the starting thread: + stop_source _stopSource; // stop_source for started thread + ::std::thread _thread{}; // started thread (if any) +}; + + +//********************************************************************** + +//***************************************** +//* implementation of class jthread +//***************************************** + +// default constructor: +inline jthread::jthread() noexcept : _stopSource{nostopstate} {} + +// THE constructor that starts the thread: +// - NOTE: declaration does SFINAE out copy constructor semantics +template +inline jthread::jthread(Callable&& cb, Args&&... args) + : _stopSource{}, // initialize stop_source + _thread{ + [](stop_token st, auto&& cb, auto&&... args) { // called lambda in the thread + // perform tasks of the thread: + if constexpr ( std::is_invocable_v ) { + // pass the stop_token as first argument to the started thread: + ::std::invoke(::std::forward(cb), std::move(st), + ::std::forward(args)...); + } + else { + // started thread does not expect a stop token: + ::std::invoke(::std::forward(cb), ::std::forward(args)...); + } + }, + _stopSource.get_token(), // not captured due to possible races if immediately set + ::std::forward(cb), // pass callable + ::std::forward(args)... // pass arguments for callable + } {} + +// move assignment operator: +inline jthread& jthread::operator=(jthread&& t) noexcept { + if ( joinable() ) { // if not joined/detached, signal stop and wait for end: + request_stop(); + join(); + } + + _thread = std::move(t._thread); + _stopSource = std::move(t._stopSource); + return *this; +} + +// destructor: +inline jthread::~jthread() { + if ( joinable() ) { // if not joined/detached, signal stop and wait for end: + request_stop(); + join(); + } +} + + +// others: +inline bool jthread::joinable() const noexcept { return _thread.joinable(); } +inline void jthread::join() { _thread.join(); } +inline void jthread::detach() { _thread.detach(); } +inline typename jthread::id jthread::get_id() const noexcept { return _thread.get_id(); } +inline typename jthread::native_handle_type jthread::native_handle() { return _thread.native_handle(); } + +inline stop_source jthread::get_stop_source() noexcept { return _stopSource; } +inline stop_token jthread::get_stop_token() const noexcept { return _stopSource.get_token(); } + +inline void jthread::swap(jthread& t) noexcept { + std::swap(_stopSource, t._stopSource); + std::swap(_thread, t._thread); +} + + +} // namespace std + +#endif // JTHREAD_HPP diff --git a/src/3rdparty/stop_token.hpp b/src/3rdparty/stop_token.hpp new file mode 100644 index 0000000000..4a7bcbbb88 --- /dev/null +++ b/src/3rdparty/stop_token.hpp @@ -0,0 +1,510 @@ +// Copied from https://github.com/josuttis/jthread, +// used under CC-BY-4.0. + +#pragma once +// header + +#include +#include +#include +#include +#ifdef SAFE +#include +#endif + +#if defined(__x86_64__) || defined(_M_X64) +#include +#endif + +namespace std { +inline void __spin_yield() noexcept { + // TODO: Platform-specific code here +#if defined(__x86_64__) || defined(_M_X64) + _mm_pause(); +#endif +} + + +//----------------------------------------------- +// internal types for shared stop state +//----------------------------------------------- + +struct __stop_callback_base { + void (*__callback_)(__stop_callback_base*) = nullptr; + + __stop_callback_base* __next_ = nullptr; + __stop_callback_base** __prev_ = nullptr; + bool* __isRemoved_ = nullptr; + std::atomic __callbackFinishedExecuting_{false}; + + void __execute() noexcept { __callback_(this); } + +protected: + // it shall only by us who deletes this + // (workaround for virtual __execute() and destructor) + ~__stop_callback_base() = default; +}; + +struct __stop_state { +public: + void __add_token_reference() noexcept { __state_.fetch_add(__token_ref_increment, std::memory_order_relaxed); } + + void __remove_token_reference() noexcept { + auto __oldState = __state_.fetch_sub(__token_ref_increment, std::memory_order_acq_rel); + if ( __oldState < (__token_ref_increment + __source_ref_increment) ) { + delete this; + } + } + + void __add_source_reference() noexcept { __state_.fetch_add(__source_ref_increment, std::memory_order_relaxed); } + + void __remove_source_reference() noexcept { + auto __oldState = __state_.fetch_sub(__source_ref_increment, std::memory_order_acq_rel); + if ( __oldState < (__token_ref_increment + __source_ref_increment) ) { + delete this; + } + } + + bool __request_stop() noexcept { + if ( ! __try_lock_and_signal_until_signalled() ) { + // Stop has already been requested. + return false; + } + + // Set the 'stop_requested' signal and acquired the lock. + + __signallingThread_ = std::this_thread::get_id(); + + while ( __head_ != nullptr ) { + // Dequeue the head of the queue + auto* __cb = __head_; + __head_ = __cb->__next_; + const bool anyMore = __head_ != nullptr; + if ( anyMore ) { + __head_->__prev_ = &__head_; + } + // Mark this item as removed from the list. + __cb->__prev_ = nullptr; + + // Don't hold lock while executing callback + // so we don't block other threads from deregistering callbacks. + __unlock(); + + // TRICKY: Need to store a flag on the stack here that the callback + // can use to signal that the destructor was executed inline + // during the call. If the destructor was executed inline then + // it's not safe to dereference __cb after __execute() returns. + // If the destructor runs on some other thread then the other + // thread will block waiting for this thread to signal that the + // callback has finished executing. + bool __isRemoved = false; + __cb->__isRemoved_ = &__isRemoved; + + __cb->__execute(); + + if ( ! __isRemoved ) { + __cb->__isRemoved_ = nullptr; + __cb->__callbackFinishedExecuting_.store(true, std::memory_order_release); + } + + if ( ! anyMore ) { + // This was the last item in the queue when we dequeued it. + // No more items should be added to the queue after we have + // marked the state as interrupted, only removed from the queue. + // Avoid acquring/releasing the lock in this case. + return true; + } + + __lock(); + } + + __unlock(); + + return true; + } + + bool __is_stop_requested() noexcept { return __is_stop_requested(__state_.load(std::memory_order_acquire)); } + + bool __is_stop_requestable() noexcept { return __is_stop_requestable(__state_.load(std::memory_order_acquire)); } + + bool __try_add_callback(__stop_callback_base* __cb, bool __incrementRefCountIfSuccessful) noexcept { + std::uint64_t __oldState; + goto __load_state; + do { + goto __check_state; + do { + __spin_yield(); + __load_state: + __oldState = __state_.load(std::memory_order_acquire); + __check_state: + if ( __is_stop_requested(__oldState) ) { + __cb->__execute(); + return false; + } + else if ( ! __is_stop_requestable(__oldState) ) { + return false; + } + } while ( __is_locked(__oldState) ); + } while ( ! __state_.compare_exchange_weak(__oldState, __oldState | __locked_flag, std::memory_order_acquire) ); + + // Push callback onto callback list. + __cb->__next_ = __head_; + if ( __cb->__next_ != nullptr ) { + __cb->__next_->__prev_ = &__cb->__next_; + } + __cb->__prev_ = &__head_; + __head_ = __cb; + + if ( __incrementRefCountIfSuccessful ) { + __unlock_and_increment_token_ref_count(); + } + else { + __unlock(); + } + + // Successfully added the callback. + return true; + } + + void __remove_callback(__stop_callback_base* __cb) noexcept { + __lock(); + + if ( __cb->__prev_ != nullptr ) { + // Still registered, not yet executed + // Just remove from the list. + *__cb->__prev_ = __cb->__next_; + if ( __cb->__next_ != nullptr ) { + __cb->__next_->__prev_ = __cb->__prev_; + } + + __unlock_and_decrement_token_ref_count(); + + return; + } + + __unlock(); + + // Callback has either already executed or is executing + // concurrently on another thread. + + if ( __signallingThread_ == std::this_thread::get_id() ) { + // Callback executed on this thread or is still currently executing + // and is deregistering itself from within the callback. + if ( __cb->__isRemoved_ != nullptr ) { + // Currently inside the callback, let the __request_stop() method + // know the object is about to be destructed and that it should + // not try to access the object when the callback returns. + *__cb->__isRemoved_ = true; + } + } + else { + // Callback is currently executing on another thread, + // block until it finishes executing. + while ( ! __cb->__callbackFinishedExecuting_.load(std::memory_order_acquire) ) { + __spin_yield(); + } + } + + __remove_token_reference(); + } + +private: + static bool __is_locked(std::uint64_t __state) noexcept { return (__state & __locked_flag) != 0; } + + static bool __is_stop_requested(std::uint64_t __state) noexcept { return (__state & __stop_requested_flag) != 0; } + + static bool __is_stop_requestable(std::uint64_t __state) noexcept { + // Interruptible if it has already been interrupted or if there are + // still interrupt_source instances in existence. + return __is_stop_requested(__state) || (__state >= __source_ref_increment); + } + + bool __try_lock_and_signal_until_signalled() noexcept { + std::uint64_t __oldState = __state_.load(std::memory_order_acquire); + do { + if ( __is_stop_requested(__oldState) ) + return false; + while ( __is_locked(__oldState) ) { + __spin_yield(); + __oldState = __state_.load(std::memory_order_acquire); + if ( __is_stop_requested(__oldState) ) + return false; + } + } while ( ! __state_.compare_exchange_weak(__oldState, __oldState | __stop_requested_flag | __locked_flag, + std::memory_order_acq_rel, std::memory_order_acquire) ); + return true; + } + + void __lock() noexcept { + auto __oldState = __state_.load(std::memory_order_relaxed); + do { + while ( __is_locked(__oldState) ) { + __spin_yield(); + __oldState = __state_.load(std::memory_order_relaxed); + } + } while ( ! __state_.compare_exchange_weak(__oldState, __oldState | __locked_flag, std::memory_order_acquire, + std::memory_order_relaxed) ); + } + + void __unlock() noexcept { __state_.fetch_sub(__locked_flag, std::memory_order_release); } + + void __unlock_and_increment_token_ref_count() noexcept { + __state_.fetch_sub(__locked_flag - __token_ref_increment, std::memory_order_release); + } + + void __unlock_and_decrement_token_ref_count() noexcept { + auto __oldState = __state_.fetch_sub(__locked_flag + __token_ref_increment, std::memory_order_acq_rel); + // Check if new state is less than __token_ref_increment which would + // indicate that this was the last reference. + if ( __oldState < (__locked_flag + __token_ref_increment + __token_ref_increment) ) { + delete this; + } + } + + static constexpr std::uint64_t __stop_requested_flag = 1u; + static constexpr std::uint64_t __locked_flag = 2u; + static constexpr std::uint64_t __token_ref_increment = 4u; + static constexpr std::uint64_t __source_ref_increment = static_cast(1u) << 33u; + + // bit 0 - stop-requested + // bit 1 - locked + // bits 2-32 - token ref count (31 bits) + // bits 33-63 - source ref count (31 bits) + std::atomic __state_{__source_ref_increment}; + __stop_callback_base* __head_ = nullptr; + std::thread::id __signallingThread_{}; +}; + + +//----------------------------------------------- +// forward declarations +//----------------------------------------------- + +class stop_source; +template +class stop_callback; + +// std::nostopstate +// - to initialize a stop_source without shared stop state +struct nostopstate_t { + explicit nostopstate_t() = default; +}; +inline constexpr nostopstate_t nostopstate{}; + + +//----------------------------------------------- +// stop_token +//----------------------------------------------- + +class stop_token { +public: + // construct: + // - TODO: explicit? + stop_token() noexcept : __state_(nullptr) {} + + // copy/move/assign/destroy: + stop_token(const stop_token& __it) noexcept : __state_(__it.__state_) { + if ( __state_ != nullptr ) { + __state_->__add_token_reference(); + } + } + + stop_token(stop_token&& __it) noexcept : __state_(std::exchange(__it.__state_, nullptr)) {} + + ~stop_token() { + if ( __state_ != nullptr ) { + __state_->__remove_token_reference(); + } + } + + stop_token& operator=(const stop_token& __it) noexcept { + if ( __state_ != __it.__state_ ) { + stop_token __tmp{__it}; + swap(__tmp); + } + return *this; + } + + stop_token& operator=(stop_token&& __it) noexcept { + stop_token __tmp{std::move(__it)}; + swap(__tmp); + return *this; + } + + void swap(stop_token& __it) noexcept { std::swap(__state_, __it.__state_); } + + // stop handling: + [[nodiscard]] bool stop_requested() const noexcept { + return __state_ != nullptr && __state_->__is_stop_requested(); + } + + [[nodiscard]] bool stop_possible() const noexcept { + return __state_ != nullptr && __state_->__is_stop_requestable(); + } + + [[nodiscard]] friend bool operator==(const stop_token& __a, const stop_token& __b) noexcept { + return __a.__state_ == __b.__state_; + } + [[nodiscard]] friend bool operator!=(const stop_token& __a, const stop_token& __b) noexcept { + return __a.__state_ != __b.__state_; + } + +private: + friend class stop_source; + template + friend class stop_callback; + + explicit stop_token(__stop_state* __state) noexcept : __state_(__state) { + if ( __state_ != nullptr ) { + __state_->__add_token_reference(); + } + } + + __stop_state* __state_; +}; + + +//----------------------------------------------- +// stop_source +//----------------------------------------------- + +class stop_source { +public: + stop_source() : __state_(new __stop_state()) {} + + explicit stop_source(nostopstate_t) noexcept : __state_(nullptr) {} + + ~stop_source() { + if ( __state_ != nullptr ) { + __state_->__remove_source_reference(); + } + } + + stop_source(const stop_source& __other) noexcept : __state_(__other.__state_) { + if ( __state_ != nullptr ) { + __state_->__add_source_reference(); + } + } + + stop_source(stop_source&& __other) noexcept : __state_(std::exchange(__other.__state_, nullptr)) {} + + stop_source& operator=(stop_source&& __other) noexcept { + stop_source __tmp{std::move(__other)}; + swap(__tmp); + return *this; + } + + stop_source& operator=(const stop_source& __other) noexcept { + if ( __state_ != __other.__state_ ) { + stop_source __tmp{__other}; + swap(__tmp); + } + return *this; + } + + [[nodiscard]] bool stop_requested() const noexcept { + return __state_ != nullptr && __state_->__is_stop_requested(); + } + + [[nodiscard]] bool stop_possible() const noexcept { return __state_ != nullptr; } + + bool request_stop() noexcept { + if ( __state_ != nullptr ) { + return __state_->__request_stop(); + } + return false; + } + + [[nodiscard]] stop_token get_token() const noexcept { return stop_token{__state_}; } + + void swap(stop_source& __other) noexcept { std::swap(__state_, __other.__state_); } + + [[nodiscard]] friend bool operator==(const stop_source& __a, const stop_source& __b) noexcept { + return __a.__state_ == __b.__state_; + } + [[nodiscard]] friend bool operator!=(const stop_source& __a, const stop_source& __b) noexcept { + return __a.__state_ != __b.__state_; + } + +private: + __stop_state* __state_; +}; + + +//----------------------------------------------- +// stop_callback +//----------------------------------------------- + +template +// requires Destructible<_Callback> && Invocable<_Callback> +class [[nodiscard]] stop_callback : private __stop_callback_base { +public: + using callback_type = _Callback; + + template, int> = 0> + // requires Constructible + explicit stop_callback(const stop_token& __token, + _CB&& __cb) noexcept(std::is_nothrow_constructible_v<_Callback, _CB>) + : __stop_callback_base{[](__stop_callback_base* __that) noexcept { + static_cast(__that)->__execute(); + }}, + __state_(nullptr), + __cb_(static_cast<_CB&&>(__cb)) { + if ( __token.__state_ != nullptr && __token.__state_->__try_add_callback(this, true) ) { + __state_ = __token.__state_; + } + } + + template, int> = 0> + // requires Constructible + explicit stop_callback(stop_token&& __token, _CB&& __cb) noexcept(std::is_nothrow_constructible_v<_Callback, _CB>) + : __stop_callback_base{[](__stop_callback_base* __that) noexcept { + static_cast(__that)->__execute(); + }}, + __state_(nullptr), + __cb_(static_cast<_CB&&>(__cb)) { + if ( __token.__state_ != nullptr && __token.__state_->__try_add_callback(this, false) ) { + __state_ = std::exchange(__token.__state_, nullptr); + } + } + + ~stop_callback() { +#ifdef SAFE + if ( __inExecute_.load() ) { + std::cerr << "*** OOPS: ~stop_callback() while callback executed\n"; + } +#endif + if ( __state_ != nullptr ) { + __state_->__remove_callback(this); + } + } + + stop_callback& operator=(const stop_callback&) = delete; + stop_callback& operator=(stop_callback&&) = delete; + stop_callback(const stop_callback&) = delete; + stop_callback(stop_callback&&) = delete; + +private: + void __execute() noexcept { + // Executed in a noexcept context + // If it throws then we call std::terminate(). +#ifdef SAFE + __inExecute_.store(true); + __cb_(); + __inExecute_.store(false); +#else + __cb_(); +#endif + } + + __stop_state* __state_; + _Callback __cb_; +#ifdef SAFE + std::atomic __inExecute_{false}; +#endif +}; + +template +stop_callback(stop_token, _Callback) -> stop_callback<_Callback>; + +} // namespace std