diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index c0f47a46b..c58e4685a 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -119,12 +119,16 @@ jobs: run: ./bin/inv_wrapper.sh dev.cc faabric_tests - name: "Run tests" run: ./bin/inv_wrapper.sh tests + timeout-minutes: 10 dist-tests: if: github.event.pull_request.draft == false needs: [conan-cache] - runs-on: ubuntu-latest + runs-on: self-hosted env: + # Make a unique per-job cluster name, so that different instances can + # run in parallel + COMPOSE_PROJECT_NAME: faabric-gha-${{ github.job }}-${{ github.run_id }}-${{ github.run_attempt }} CONAN_CACHE_MOUNT_SOURCE: ~/.conan/ steps: # --- Code update --- @@ -136,9 +140,13 @@ jobs: run: ./dist-test/build.sh - name: "Run the distributed tests" run: ./dist-test/run.sh + timeout-minutes: 10 - name: "Print planner logs" if: always() run: docker compose logs planner + - name: "Chown all files to avoid docker-related root-owned files" + if: always() + run: sudo chown -R $(id -u):$(id -g) . examples: if: github.event.pull_request.draft == false diff --git a/include/faabric/atomic_queue/atomic_queue.h b/include/faabric/atomic_queue/atomic_queue.h new file mode 100644 index 000000000..904e25761 --- /dev/null +++ b/include/faabric/atomic_queue/atomic_queue.h @@ -0,0 +1,646 @@ +/* -*- mode: c++; c-basic-offset: 4; indent-tabs-mode: nil; tab-width: 4 -*- */ +#ifndef ATOMIC_QUEUE_ATOMIC_QUEUE_H_INCLUDED +#define ATOMIC_QUEUE_ATOMIC_QUEUE_H_INCLUDED + +// Copyright (c) 2019 Maxim Egorushkin. MIT License. See the full licence in file LICENSE. + +#include "defs.h" + +#include +#include +#include +#include +#include +#include + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +namespace atomic_queue { + +using std::uint32_t; +using std::uint64_t; +using std::uint8_t; + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +namespace details { + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +template struct GetCacheLineIndexBits { static int constexpr value = 0; }; +template<> struct GetCacheLineIndexBits<256> { static int constexpr value = 8; }; +template<> struct GetCacheLineIndexBits<128> { static int constexpr value = 7; }; +template<> struct GetCacheLineIndexBits< 64> { static int constexpr value = 6; }; +template<> struct GetCacheLineIndexBits< 32> { static int constexpr value = 5; }; +template<> struct GetCacheLineIndexBits< 16> { static int constexpr value = 4; }; +template<> struct GetCacheLineIndexBits< 8> { static int constexpr value = 3; }; +template<> struct GetCacheLineIndexBits< 4> { static int constexpr value = 2; }; +template<> struct GetCacheLineIndexBits< 2> { static int constexpr value = 1; }; + +template +struct GetIndexShuffleBits { + static int constexpr bits = GetCacheLineIndexBits::value; + static unsigned constexpr min_size = 1u << (bits * 2); + static int constexpr value = array_size < min_size ? 0 : bits; +}; + +template +struct GetIndexShuffleBits { + static int constexpr value = 0; +}; + +// Multiple writers/readers contend on the same cache line when storing/loading elements at +// subsequent indexes, aka false sharing. 
For power of 2 ring buffer size it is possible to re-map +// the index in such a way that each subsequent element resides on another cache line, which +// minimizes contention. This is done by swapping the lowest order N bits (which are the index of +// the element within the cache line) with the next N bits (which are the index of the cache line) +// of the element index. +template +constexpr unsigned remap_index(unsigned index) noexcept { + unsigned constexpr mix_mask{(1u << BITS) - 1}; + unsigned const mix{(index ^ (index >> BITS)) & mix_mask}; + return index ^ mix ^ (mix << BITS); +} + +template<> +constexpr unsigned remap_index<0>(unsigned index) noexcept { + return index; +} + +template +constexpr T& map(T* elements, unsigned index) noexcept { + return elements[remap_index(index)]; +} + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +// Implement a "bit-twiddling hack" for finding the next power of 2 in either 32 bits or 64 bits +// in C++11 compatible constexpr functions. The library no longer maintains C++11 compatibility. + +// "Runtime" version for 32 bits +// --a; +// a |= a >> 1; +// a |= a >> 2; +// a |= a >> 4; +// a |= a >> 8; +// a |= a >> 16; +// ++a; + +template +constexpr T decrement(T x) noexcept { + return x - 1; +} + +template +constexpr T increment(T x) noexcept { + return x + 1; +} + +template +constexpr T or_equal(T x, unsigned u) noexcept { + return x | x >> u; +} + +template +constexpr T or_equal(T x, unsigned u, Args... rest) noexcept { + return or_equal(or_equal(x, u), rest...); +} + +constexpr uint32_t round_up_to_power_of_2(uint32_t a) noexcept { + return increment(or_equal(decrement(a), 1, 2, 4, 8, 16)); +} + +constexpr uint64_t round_up_to_power_of_2(uint64_t a) noexcept { + return increment(or_equal(decrement(a), 1, 2, 4, 8, 16, 32)); +} + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +template +constexpr T nil() noexcept { +#if __cpp_lib_atomic_is_always_lock_free // Better compile-time error message requires C++17. + static_assert(std::atomic::is_always_lock_free, "Queue element type T is not atomic. Use AtomicQueue2/AtomicQueueB2 for such element types."); +#endif + return {}; +} + +template +inline void destroy_n(T* p, unsigned n) noexcept { + for(auto q = p + n; p != q;) + (p++)->~T(); +} + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +} // namespace details + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +template +class AtomicQueueCommon { +protected: + // Put these on different cache lines to avoid false sharing between readers and writers. + alignas(CACHE_LINE_SIZE) std::atomic head_ = {}; + alignas(CACHE_LINE_SIZE) std::atomic tail_ = {}; + + // The special member functions are not thread-safe. 
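A quick worked illustration of the cache-line index remapping implemented by details::remap_index above; this is a standalone sketch rather than part of the patch, and the 4-byte element size and 64-byte cache line (16 elements per line, so BITS == 4) are illustrative assumptions:

    // Swap the low BITS bits of the index with the next BITS bits, so that
    // consecutive logical indexes land on different cache lines.
    constexpr unsigned remap(unsigned index, unsigned bits)
    {
        unsigned mask = (1u << bits) - 1;
        unsigned mix = (index ^ (index >> bits)) & mask;
        return index ^ mix ^ (mix << bits);
    }

    // With 16 elements per cache line, logical indexes 0, 1, 2, ... are spread
    // to slots 0, 16, 32, ..., each on a different cache line.
    static_assert(remap(0, 4) == 0 && remap(1, 4) == 16 && remap(2, 4) == 32,
                  "consecutive indexes map to distinct cache lines");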
+ + AtomicQueueCommon() noexcept = default; + + AtomicQueueCommon(AtomicQueueCommon const& b) noexcept + : head_(b.head_.load(X)) + , tail_(b.tail_.load(X)) {} + + AtomicQueueCommon& operator=(AtomicQueueCommon const& b) noexcept { + head_.store(b.head_.load(X), X); + tail_.store(b.tail_.load(X), X); + return *this; + } + + void swap(AtomicQueueCommon& b) noexcept { + unsigned h = head_.load(X); + unsigned t = tail_.load(X); + head_.store(b.head_.load(X), X); + tail_.store(b.tail_.load(X), X); + b.head_.store(h, X); + b.tail_.store(t, X); + } + + template + static T do_pop_atomic(std::atomic& q_element) noexcept { + if(Derived::spsc_) { + for(;;) { + T element = q_element.load(A); + if(ATOMIC_QUEUE_LIKELY(element != NIL)) { + q_element.store(NIL, X); + return element; + } + if(Derived::maximize_throughput_) + spin_loop_pause(); + } + } + else { + for(;;) { + T element = q_element.exchange(NIL, A); // (2) The store to wait for. + if(ATOMIC_QUEUE_LIKELY(element != NIL)) + return element; + // Do speculative loads while busy-waiting to avoid broadcasting RFO messages. + do + spin_loop_pause(); + while(Derived::maximize_throughput_ && q_element.load(X) == NIL); + } + } + } + + template + static void do_push_atomic(T element, std::atomic& q_element) noexcept { + assert(element != NIL); + if(Derived::spsc_) { + while(ATOMIC_QUEUE_UNLIKELY(q_element.load(X) != NIL)) + if(Derived::maximize_throughput_) + spin_loop_pause(); + q_element.store(element, R); + } + else { + for(T expected = NIL; ATOMIC_QUEUE_UNLIKELY(!q_element.compare_exchange_weak(expected, element, R, X)); expected = NIL) { + do + spin_loop_pause(); // (1) Wait for store (2) to complete. + while(Derived::maximize_throughput_ && q_element.load(X) != NIL); + } + } + } + + enum State : unsigned char { EMPTY, STORING, STORED, LOADING }; + + template + static T do_pop_any(std::atomic& state, T& q_element) noexcept { + if(Derived::spsc_) { + while(ATOMIC_QUEUE_UNLIKELY(state.load(A) != STORED)) + if(Derived::maximize_throughput_) + spin_loop_pause(); + T element{std::move(q_element)}; + state.store(EMPTY, R); + return element; + } + else { + for(;;) { + unsigned char expected = STORED; + if(ATOMIC_QUEUE_LIKELY(state.compare_exchange_weak(expected, LOADING, A, X))) { + T element{std::move(q_element)}; + state.store(EMPTY, R); + return element; + } + // Do speculative loads while busy-waiting to avoid broadcasting RFO messages. + do + spin_loop_pause(); + while(Derived::maximize_throughput_ && state.load(X) != STORED); + } + } + } + + template + static void do_push_any(U&& element, std::atomic& state, T& q_element) noexcept { + if(Derived::spsc_) { + while(ATOMIC_QUEUE_UNLIKELY(state.load(A) != EMPTY)) + if(Derived::maximize_throughput_) + spin_loop_pause(); + q_element = std::forward(element); + state.store(STORED, R); + } + else { + for(;;) { + unsigned char expected = EMPTY; + if(ATOMIC_QUEUE_LIKELY(state.compare_exchange_weak(expected, STORING, A, X))) { + q_element = std::forward(element); + state.store(STORED, R); + return; + } + // Do speculative loads while busy-waiting to avoid broadcasting RFO messages. 
+ do + spin_loop_pause(); + while(Derived::maximize_throughput_ && state.load(X) != EMPTY); + } + } + } + +public: + template + bool try_push(T&& element) noexcept { + auto head = head_.load(X); + if(Derived::spsc_) { + if(static_cast(head - tail_.load(X)) >= static_cast(static_cast(*this).size_)) + return false; + head_.store(head + 1, X); + } + else { + do { + if(static_cast(head - tail_.load(X)) >= static_cast(static_cast(*this).size_)) + return false; + } while(ATOMIC_QUEUE_UNLIKELY(!head_.compare_exchange_weak(head, head + 1, X, X))); // This loop is not FIFO. + } + + static_cast(*this).do_push(std::forward(element), head); + return true; + } + + template + bool try_pop(T& element) noexcept { + auto tail = tail_.load(X); + if(Derived::spsc_) { + if(static_cast(head_.load(X) - tail) <= 0) + return false; + tail_.store(tail + 1, X); + } + else { + do { + if(static_cast(head_.load(X) - tail) <= 0) + return false; + } while(ATOMIC_QUEUE_UNLIKELY(!tail_.compare_exchange_weak(tail, tail + 1, X, X))); // This loop is not FIFO. + } + + element = static_cast(*this).do_pop(tail); + return true; + } + + template + void push(T&& element) noexcept { + unsigned head; + if(Derived::spsc_) { + head = head_.load(X); + head_.store(head + 1, X); + } + else { + constexpr auto memory_order = Derived::total_order_ ? std::memory_order_seq_cst : std::memory_order_relaxed; + head = head_.fetch_add(1, memory_order); // FIFO and total order on Intel regardless, as of 2019. + } + static_cast(*this).do_push(std::forward(element), head); + } + + auto pop() noexcept { + unsigned tail; + if(Derived::spsc_) { + tail = tail_.load(X); + tail_.store(tail + 1, X); + } + else { + constexpr auto memory_order = Derived::total_order_ ? std::memory_order_seq_cst : std::memory_order_relaxed; + tail = tail_.fetch_add(1, memory_order); // FIFO and total order on Intel regardless, as of 2019. + } + return static_cast(*this).do_pop(tail); + } + + bool was_empty() const noexcept { + return !was_size(); + } + + bool was_full() const noexcept { + return was_size() >= static_cast(static_cast(*this).size_); + } + + unsigned was_size() const noexcept { + // tail_ can be greater than head_ because of consumers doing pop, rather that try_pop, when the queue is empty. + return std::max(static_cast(head_.load(X) - tail_.load(X)), 0); + } + + unsigned capacity() const noexcept { + return static_cast(*this).size_; + } +}; + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +template(), bool MINIMIZE_CONTENTION = true, bool MAXIMIZE_THROUGHPUT = true, bool TOTAL_ORDER = false, bool SPSC = false> +class AtomicQueue : public AtomicQueueCommon> { + using Base = AtomicQueueCommon>; + friend Base; + + static constexpr unsigned size_ = MINIMIZE_CONTENTION ? 
details::round_up_to_power_of_2(SIZE) : SIZE; + static constexpr int SHUFFLE_BITS = details::GetIndexShuffleBits)>::value; + static constexpr bool total_order_ = TOTAL_ORDER; + static constexpr bool spsc_ = SPSC; + static constexpr bool maximize_throughput_ = MAXIMIZE_THROUGHPUT; + + alignas(CACHE_LINE_SIZE) std::atomic elements_[size_]; + + T do_pop(unsigned tail) noexcept { + std::atomic& q_element = details::map(elements_, tail % size_); + return Base::template do_pop_atomic(q_element); + } + + void do_push(T element, unsigned head) noexcept { + std::atomic& q_element = details::map(elements_, head % size_); + Base::template do_push_atomic(element, q_element); + } + +public: + using value_type = T; + + AtomicQueue() noexcept { + assert(std::atomic{NIL}.is_lock_free()); // Queue element type T is not atomic. Use AtomicQueue2/AtomicQueueB2 for such element types. + for(auto p = elements_, q = elements_ + size_; p != q; ++p) + p->store(NIL, X); + } + + AtomicQueue(AtomicQueue const&) = delete; + AtomicQueue& operator=(AtomicQueue const&) = delete; +}; + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +template +class AtomicQueue2 : public AtomicQueueCommon> { + using Base = AtomicQueueCommon>; + using State = typename Base::State; + friend Base; + + static constexpr unsigned size_ = MINIMIZE_CONTENTION ? details::round_up_to_power_of_2(SIZE) : SIZE; + static constexpr int SHUFFLE_BITS = details::GetIndexShuffleBits::value; + static constexpr bool total_order_ = TOTAL_ORDER; + static constexpr bool spsc_ = SPSC; + static constexpr bool maximize_throughput_ = MAXIMIZE_THROUGHPUT; + + alignas(CACHE_LINE_SIZE) std::atomic states_[size_] = {}; + alignas(CACHE_LINE_SIZE) T elements_[size_] = {}; + + T do_pop(unsigned tail) noexcept { + unsigned index = details::remap_index(tail % size_); + return Base::template do_pop_any(states_[index], elements_[index]); + } + + template + void do_push(U&& element, unsigned head) noexcept { + unsigned index = details::remap_index(head % size_); + Base::template do_push_any(std::forward(element), states_[index], elements_[index]); + } + +public: + using value_type = T; + + AtomicQueue2() noexcept = default; + AtomicQueue2(AtomicQueue2 const&) = delete; + AtomicQueue2& operator=(AtomicQueue2 const&) = delete; +}; + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +template, T NIL = details::nil(), bool MAXIMIZE_THROUGHPUT = true, bool TOTAL_ORDER = false, bool SPSC = false> +class AtomicQueueB : private std::allocator_traits::template rebind_alloc>, + public AtomicQueueCommon> { + using AllocatorElements = typename std::allocator_traits::template rebind_alloc>; + using Base = AtomicQueueCommon>; + friend Base; + + static constexpr bool total_order_ = TOTAL_ORDER; + static constexpr bool spsc_ = SPSC; + static constexpr bool maximize_throughput_ = MAXIMIZE_THROUGHPUT; + + static constexpr auto ELEMENTS_PER_CACHE_LINE = CACHE_LINE_SIZE / sizeof(std::atomic); + static_assert(ELEMENTS_PER_CACHE_LINE, "Unexpected ELEMENTS_PER_CACHE_LINE."); + + static constexpr auto SHUFFLE_BITS = details::GetCacheLineIndexBits::value; + static_assert(SHUFFLE_BITS, "Unexpected SHUFFLE_BITS."); + + // AtomicQueueCommon members are stored into by readers and writers. + // Allocate these immutable members on another cache line which never gets invalidated by stores. 
+ alignas(CACHE_LINE_SIZE) unsigned size_; + std::atomic* elements_; + + T do_pop(unsigned tail) noexcept { + std::atomic& q_element = details::map(elements_, tail & (size_ - 1)); + return Base::template do_pop_atomic(q_element); + } + + void do_push(T element, unsigned head) noexcept { + std::atomic& q_element = details::map(elements_, head & (size_ - 1)); + Base::template do_push_atomic(element, q_element); + } + +public: + using value_type = T; + using allocator_type = A; + + // The special member functions are not thread-safe. + + AtomicQueueB(unsigned size, A const& allocator = A{}) + : AllocatorElements(allocator) + , size_(std::max(details::round_up_to_power_of_2(size), 1u << (SHUFFLE_BITS * 2))) + , elements_(AllocatorElements::allocate(size_)) { + assert(std::atomic{NIL}.is_lock_free()); // Queue element type T is not atomic. Use AtomicQueue2/AtomicQueueB2 for such element types. + std::uninitialized_fill_n(elements_, size_, NIL); + assert(get_allocator() == allocator); // The standard requires the original and rebound allocators to manage the same state. + } + + AtomicQueueB(AtomicQueueB&& b) noexcept + : AllocatorElements(static_cast(b)) // TODO: This must be noexcept, static_assert that. + , Base(static_cast(b)) + , size_(std::exchange(b.size_, 0)) + , elements_(std::exchange(b.elements_, nullptr)) + {} + + AtomicQueueB& operator=(AtomicQueueB&& b) noexcept { + b.swap(*this); + return *this; + } + + ~AtomicQueueB() noexcept { + if(elements_) { + details::destroy_n(elements_, size_); + AllocatorElements::deallocate(elements_, size_); // TODO: This must be noexcept, static_assert that. + } + } + + A get_allocator() const noexcept { + return *this; // The standard requires implicit conversion between rebound allocators. + } + + void swap(AtomicQueueB& b) noexcept { + using std::swap; + swap(static_cast(*this), static_cast(b)); + Base::swap(b); + swap(size_, b.size_); + swap(elements_, b.elements_); + } + + friend void swap(AtomicQueueB& a, AtomicQueueB& b) noexcept { + a.swap(b); + } +}; + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +template, bool MAXIMIZE_THROUGHPUT = true, bool TOTAL_ORDER = false, bool SPSC = false> +class AtomicQueueB2 : private std::allocator_traits::template rebind_alloc, + public AtomicQueueCommon> { + using StorageAllocator = typename std::allocator_traits::template rebind_alloc; + using Base = AtomicQueueCommon>; + using State = typename Base::State; + using AtomicState = std::atomic; + friend Base; + + static constexpr bool total_order_ = TOTAL_ORDER; + static constexpr bool spsc_ = SPSC; + static constexpr bool maximize_throughput_ = MAXIMIZE_THROUGHPUT; + + // AtomicQueueCommon members are stored into by readers and writers. + // Allocate these immutable members on another cache line which never gets invalidated by stores. 
+ alignas(CACHE_LINE_SIZE) unsigned size_; + AtomicState* states_; + T* elements_; + + static constexpr auto STATES_PER_CACHE_LINE = CACHE_LINE_SIZE / sizeof(AtomicState); + static_assert(STATES_PER_CACHE_LINE, "Unexpected STATES_PER_CACHE_LINE."); + + static constexpr auto SHUFFLE_BITS = details::GetCacheLineIndexBits::value; + static_assert(SHUFFLE_BITS, "Unexpected SHUFFLE_BITS."); + + T do_pop(unsigned tail) noexcept { + unsigned index = details::remap_index(tail & (size_ - 1)); + return Base::template do_pop_any(states_[index], elements_[index]); + } + + template + void do_push(U&& element, unsigned head) noexcept { + unsigned index = details::remap_index(head & (size_ - 1)); + Base::template do_push_any(std::forward(element), states_[index], elements_[index]); + } + + template + U* allocate_() { + U* p = reinterpret_cast(StorageAllocator::allocate(size_ * sizeof(U))); + assert(reinterpret_cast(p) % alignof(U) == 0); // Allocated storage must be suitably aligned for U. + return p; + } + + template + void deallocate_(U* p) noexcept { + StorageAllocator::deallocate(reinterpret_cast(p), size_ * sizeof(U)); // TODO: This must be noexcept, static_assert that. + } + +public: + using value_type = T; + using allocator_type = A; + + // The special member functions are not thread-safe. + + AtomicQueueB2(unsigned size, A const& allocator = A{}) + : StorageAllocator(allocator) + , size_(std::max(details::round_up_to_power_of_2(size), 1u << (SHUFFLE_BITS * 2))) + , states_(allocate_()) + , elements_(allocate_()) { + std::uninitialized_fill_n(states_, size_, Base::EMPTY); + A a = get_allocator(); + assert(a == allocator); // The standard requires the original and rebound allocators to manage the same state. + for(auto p = elements_, q = elements_ + size_; p < q; ++p) + std::allocator_traits::construct(a, p); + } + + AtomicQueueB2(AtomicQueueB2&& b) noexcept + : StorageAllocator(static_cast(b)) // TODO: This must be noexcept, static_assert that. + , Base(static_cast(b)) + , size_(std::exchange(b.size_, 0)) + , states_(std::exchange(b.states_, nullptr)) + , elements_(std::exchange(b.elements_, nullptr)) + {} + + AtomicQueueB2& operator=(AtomicQueueB2&& b) noexcept { + b.swap(*this); + return *this; + } + + ~AtomicQueueB2() noexcept { + if(elements_) { + A a = get_allocator(); + for(auto p = elements_, q = elements_ + size_; p < q; ++p) + std::allocator_traits::destroy(a, p); + deallocate_(elements_); + details::destroy_n(states_, size_); + deallocate_(states_); + } + } + + A get_allocator() const noexcept { + return *this; // The standard requires implicit conversion between rebound allocators. 
+ } + + void swap(AtomicQueueB2& b) noexcept { + using std::swap; + swap(static_cast(*this), static_cast(b)); + Base::swap(b); + swap(size_, b.size_); + swap(states_, b.states_); + swap(elements_, b.elements_); + } + + friend void swap(AtomicQueueB2& a, AtomicQueueB2& b) noexcept { + a.swap(b); + } +}; + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +template +struct RetryDecorator : Queue { + using T = typename Queue::value_type; + + using Queue::Queue; + + void push(T element) noexcept { + while(!this->try_push(element)) + spin_loop_pause(); + } + + T pop() noexcept { + T element; + while(!this->try_pop(element)) + spin_loop_pause(); + return element; + } +}; + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +} // namespace atomic_queue + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +#endif // ATOMIC_QUEUE_ATOMIC_QUEUE_H_INCLUDED diff --git a/include/faabric/atomic_queue/atomic_queue_mutex.h b/include/faabric/atomic_queue/atomic_queue_mutex.h new file mode 100644 index 000000000..ea6731f7c --- /dev/null +++ b/include/faabric/atomic_queue/atomic_queue_mutex.h @@ -0,0 +1,92 @@ +/* -*- mode: c++; c-basic-offset: 4; indent-tabs-mode: nil; tab-width: 4 -*- */ +#ifndef ATOMIC_QUEUE_ATOMIC_QUEUE_SPIN_LOCK_H_INCLUDED +#define ATOMIC_QUEUE_ATOMIC_QUEUE_SPIN_LOCK_H_INCLUDED + +// Copyright (c) 2019 Maxim Egorushkin. MIT License. See the full licence in file LICENSE. + +#include "atomic_queue.h" +#include "spinlock.h" + +#include +#include + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +namespace atomic_queue { + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +template +struct ScopedLockType { + using type = typename M::scoped_lock; +}; + +template<> +struct ScopedLockType { + using type = std::unique_lock; +}; + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +template +class AtomicQueueMutexT { + static constexpr unsigned size_ = MINIMIZE_CONTENTION ? 
details::round_up_to_power_of_2(SIZE) : SIZE; + + Mutex mutex_; + alignas(CACHE_LINE_SIZE) unsigned head_ = 0; + alignas(CACHE_LINE_SIZE) unsigned tail_ = 0; + alignas(CACHE_LINE_SIZE) T q_[size_] = {}; + + static constexpr int SHUFFLE_BITS = details::GetIndexShuffleBits::value; + + using ScopedLock = typename ScopedLockType::type; + +public: + using value_type = T; + + template + bool try_push(U&& element) noexcept { + ScopedLock lock(mutex_); + if(ATOMIC_QUEUE_LIKELY(head_ - tail_ < size_)) { + q_[details::remap_index(head_ % size_)] = std::forward(element); + ++head_; + return true; + } + return false; + } + + bool try_pop(T& element) noexcept { + ScopedLock lock(mutex_); + if(ATOMIC_QUEUE_LIKELY(head_ != tail_)) { + element = std::move(q_[details::remap_index(tail_ % size_)]); + ++tail_; + return true; + } + return false; + } + + bool was_empty() const noexcept { + return static_cast(head_ - tail_) <= 0; + } + + bool was_full() const noexcept { + return static_cast(head_ - tail_) >= static_cast(size_); + } +}; + +template +using AtomicQueueMutex = AtomicQueueMutexT; + +template +using AtomicQueueSpinlock = AtomicQueueMutexT; + +// template +// using AtomicQueueSpinlockHle = AtomicQueueMutexT; + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +} // namespace atomic_queue + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +#endif // ATOMIC_QUEUE_ATOMIC_QUEUE_SPIN_LOCK_H_INCLUDED diff --git a/include/faabric/atomic_queue/barrier.h b/include/faabric/atomic_queue/barrier.h new file mode 100644 index 000000000..23ca0dccc --- /dev/null +++ b/include/faabric/atomic_queue/barrier.h @@ -0,0 +1,38 @@ +/* -*- mode: c++; c-basic-offset: 4; indent-tabs-mode: nil; tab-width: 4 -*- */ +#ifndef BARRIER_H_INCLUDED +#define BARRIER_H_INCLUDED + +// Copyright (c) 2019 Maxim Egorushkin. MIT License. See the full licence in file LICENSE. + +#include "defs.h" + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +namespace atomic_queue { + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +class Barrier { + std::atomic counter_ = {}; + +public: + void wait() noexcept { + counter_.fetch_add(1, std::memory_order_acquire); + while(counter_.load(std::memory_order_relaxed)) + spin_loop_pause(); + } + + void release(unsigned expected_counter) noexcept { + while(expected_counter != counter_.load(std::memory_order_relaxed)) + spin_loop_pause(); + counter_.store(0, std::memory_order_release); + } +}; + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +} // namespace atomic_queue + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +#endif // BARRIER_H_INCLUDED diff --git a/include/faabric/atomic_queue/defs.h b/include/faabric/atomic_queue/defs.h new file mode 100644 index 000000000..4601b1d46 --- /dev/null +++ b/include/faabric/atomic_queue/defs.h @@ -0,0 +1,99 @@ +/* -*- mode: c++; c-basic-offset: 4; indent-tabs-mode: nil; tab-width: 4 -*- */ +#ifndef ATOMIC_QUEUE_DEFS_H_INCLUDED +#define ATOMIC_QUEUE_DEFS_H_INCLUDED + +// Copyright (c) 2019 Maxim Egorushkin. MIT License. See the full licence in file LICENSE. 
+ +#include + +#if defined(__x86_64__) || defined(_M_X64) || defined(__i386__) || defined(_M_IX86) +#include +namespace atomic_queue { +constexpr int CACHE_LINE_SIZE = 64; +static inline void spin_loop_pause() noexcept { + _mm_pause(); +} +} // namespace atomic_queue +#elif defined(__arm__) || defined(__aarch64__) || defined(_M_ARM64) +namespace atomic_queue { +constexpr int CACHE_LINE_SIZE = 64; +static inline void spin_loop_pause() noexcept { +#if (defined(__ARM_ARCH_6K__) || \ + defined(__ARM_ARCH_6Z__) || \ + defined(__ARM_ARCH_6ZK__) || \ + defined(__ARM_ARCH_6T2__) || \ + defined(__ARM_ARCH_7__) || \ + defined(__ARM_ARCH_7A__) || \ + defined(__ARM_ARCH_7R__) || \ + defined(__ARM_ARCH_7M__) || \ + defined(__ARM_ARCH_7S__) || \ + defined(__ARM_ARCH_8A__) || \ + defined(__aarch64__)) + asm volatile ("yield" ::: "memory"); +#elif defined(_M_ARM64) + __yield(); +#else + asm volatile ("nop" ::: "memory"); +#endif +} +} // namespace atomic_queue +#elif defined(__ppc64__) || defined(__powerpc64__) +namespace atomic_queue { +constexpr int CACHE_LINE_SIZE = 128; // TODO: Review that this is the correct value. +static inline void spin_loop_pause() noexcept { + asm volatile("or 31,31,31 # very low priority"); // TODO: Review and benchmark that this is the right instruction. +} +} // namespace atomic_queue +#elif defined(__s390x__) +namespace atomic_queue { +constexpr int CACHE_LINE_SIZE = 256; // TODO: Review that this is the correct value. +static inline void spin_loop_pause() noexcept {} // TODO: Find the right instruction to use here, if any. +} // namespace atomic_queue +#elif defined(__riscv) +namespace atomic_queue { +constexpr int CACHE_LINE_SIZE = 64; +static inline void spin_loop_pause() noexcept { + asm volatile (".insn i 0x0F, 0, x0, x0, 0x010"); +} +} // namespace atomic_queue +#else +#ifdef _MSC_VER +#pragma message("Unknown CPU architecture. Using L1 cache line size of 64 bytes and no spinloop pause instruction.") +#else +#warning "Unknown CPU architecture. Using L1 cache line size of 64 bytes and no spinloop pause instruction." +#endif +namespace atomic_queue { +constexpr int CACHE_LINE_SIZE = 64; // TODO: Review that this is the correct value. 
+static inline void spin_loop_pause() noexcept {} +} // namespace atomic_queue +#endif + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +namespace atomic_queue { + +#if defined(__GNUC__) || defined(__clang__) +#define ATOMIC_QUEUE_LIKELY(expr) __builtin_expect(static_cast(expr), 1) +#define ATOMIC_QUEUE_UNLIKELY(expr) __builtin_expect(static_cast(expr), 0) +#define ATOMIC_QUEUE_NOINLINE __attribute__((noinline)) +#else +#define ATOMIC_QUEUE_LIKELY(expr) (expr) +#define ATOMIC_QUEUE_UNLIKELY(expr) (expr) +#define ATOMIC_QUEUE_NOINLINE +#endif + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +auto constexpr A = std::memory_order_acquire; +auto constexpr R = std::memory_order_release; +auto constexpr X = std::memory_order_relaxed; +auto constexpr C = std::memory_order_seq_cst; +auto constexpr AR = std::memory_order_acq_rel; + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +} // namespace atomic_queue + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +#endif // ATOMIC_QUEUE_DEFS_H_INCLUDED diff --git a/include/faabric/atomic_queue/spinlock.h b/include/faabric/atomic_queue/spinlock.h new file mode 100644 index 000000000..802ea4676 --- /dev/null +++ b/include/faabric/atomic_queue/spinlock.h @@ -0,0 +1,203 @@ +/* -*- mode: c++; c-basic-offset: 4; indent-tabs-mode: nil; tab-width: 4 -*- */ +#ifndef ATOMIC_QUEUE_SPIN_LOCK_H_INCLUDED +#define ATOMIC_QUEUE_SPIN_LOCK_H_INCLUDED + +// Copyright (c) 2019 Maxim Egorushkin. MIT License. See the full licence in file LICENSE. 
+ +#include "defs.h" + +#include +#include +#include + +#include + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +namespace atomic_queue { + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +class Spinlock { + pthread_spinlock_t s_; + +public: + using scoped_lock = std::lock_guard; + + Spinlock() noexcept { + if(ATOMIC_QUEUE_UNLIKELY(::pthread_spin_init(&s_, 0))) + std::abort(); + } + + Spinlock(Spinlock const&) = delete; + Spinlock& operator=(Spinlock const&) = delete; + + ~Spinlock() noexcept { + ::pthread_spin_destroy(&s_); + } + + void lock() noexcept { + if(ATOMIC_QUEUE_UNLIKELY(::pthread_spin_lock(&s_))) + std::abort(); + } + + void unlock() noexcept { + if(ATOMIC_QUEUE_UNLIKELY(::pthread_spin_unlock(&s_))) + std::abort(); + } +}; + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +class TicketSpinlock { + alignas(CACHE_LINE_SIZE) std::atomic ticket_{0}; + alignas(CACHE_LINE_SIZE) std::atomic next_{0}; + +public: + class LockGuard { + TicketSpinlock* const m_; + unsigned const ticket_; + public: + LockGuard(TicketSpinlock& m) noexcept + : m_(&m) + , ticket_(m.lock()) + {} + + LockGuard(LockGuard const&) = delete; + LockGuard& operator=(LockGuard const&) = delete; + + ~LockGuard() noexcept { + m_->unlock(ticket_); + } + }; + + using scoped_lock = LockGuard; + + TicketSpinlock() noexcept = default; + TicketSpinlock(TicketSpinlock const&) = delete; + TicketSpinlock& operator=(TicketSpinlock const&) = delete; + + ATOMIC_QUEUE_NOINLINE unsigned lock() noexcept { + auto ticket = ticket_.fetch_add(1, std::memory_order_relaxed); + for(;;) { + auto position = ticket - next_.load(std::memory_order_acquire); + if(ATOMIC_QUEUE_LIKELY(!position)) + break; + do + spin_loop_pause(); + while(--position); + } + return ticket; + } + + void unlock() noexcept { + unlock(next_.load(std::memory_order_relaxed) + 1); + } + + void unlock(unsigned ticket) noexcept { + next_.store(ticket + 1, std::memory_order_release); + } +}; + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +class UnfairSpinlock { + std::atomic lock_{0}; + +public: + using scoped_lock = std::lock_guard; + + UnfairSpinlock(UnfairSpinlock const&) = delete; + UnfairSpinlock& operator=(UnfairSpinlock const&) = delete; + + void lock() noexcept { + for(;;) { + if(!lock_.load(std::memory_order_relaxed) && !lock_.exchange(1, std::memory_order_acquire)) + return; + spin_loop_pause(); + } + } + + void unlock() noexcept { + lock_.store(0, std::memory_order_release); + } +}; + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +// class SpinlockHle { +// int lock_ = 0; + +// #ifdef __gcc__ +// static constexpr int HLE_ACQUIRE = __ATOMIC_HLE_ACQUIRE; +// static constexpr int HLE_RELEASE = __ATOMIC_HLE_RELEASE; +// #else +// static constexpr int HLE_ACQUIRE = 0; +// static constexpr int HLE_RELEASE = 0; +// #endif + +// public: +// using scoped_lock = std::lock_guard; + +// SpinlockHle(SpinlockHle const&) = delete; +// SpinlockHle& operator=(SpinlockHle const&) = delete; + +// void lock() noexcept { +// for(int expected = 0; +// !__atomic_compare_exchange_n(&lock_, &expected, 1, false, __ATOMIC_ACQUIRE | HLE_ACQUIRE, __ATOMIC_RELAXED); +// 
expected = 0) +// spin_loop_pause(); +// } + +// void unlock() noexcept { +// __atomic_store_n(&lock_, 0, __ATOMIC_RELEASE | HLE_RELEASE); +// } +// }; + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +// class AdaptiveMutex { +// pthread_mutex_t m_; + +// public: +// using scoped_lock = std::lock_guard; + +// AdaptiveMutex() noexcept { +// pthread_mutexattr_t a; +// if(ATOMIC_QUEUE_UNLIKELY(::pthread_mutexattr_init(&a))) +// std::abort(); +// if(ATOMIC_QUEUE_UNLIKELY(::pthread_mutexattr_settype(&a, PTHREAD_MUTEX_ADAPTIVE_NP))) +// std::abort(); +// if(ATOMIC_QUEUE_UNLIKELY(::pthread_mutex_init(&m_, &a))) +// std::abort(); +// if(ATOMIC_QUEUE_UNLIKELY(::pthread_mutexattr_destroy(&a))) +// std::abort(); +// m_.__data.__spins = 32767; +// } + +// AdaptiveMutex(AdaptiveMutex const&) = delete; +// AdaptiveMutex& operator=(AdaptiveMutex const&) = delete; + +// ~AdaptiveMutex() noexcept { +// if(ATOMIC_QUEUE_UNLIKELY(::pthread_mutex_destroy(&m_))) +// std::abort(); +// } + +// void lock() noexcept { +// if(ATOMIC_QUEUE_UNLIKELY(::pthread_mutex_lock(&m_))) +// std::abort(); +// } + +// void unlock() noexcept { +// if(ATOMIC_QUEUE_UNLIKELY(::pthread_mutex_unlock(&m_))) +// std::abort(); +// } +// }; + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +} // namespace atomic_queue + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +#endif // ATOMIC_QUEUE_SPIN_LOCK_H_INCLUDED diff --git a/include/faabric/mpi/MpiWorld.h b/include/faabric/mpi/MpiWorld.h index 465b0dacf..930b4b463 100644 --- a/include/faabric/mpi/MpiWorld.h +++ b/include/faabric/mpi/MpiWorld.h @@ -26,7 +26,7 @@ namespace faabric::mpi { // as the broker already has mocking capabilities std::vector getMpiMockedMessages(int sendRank); -typedef faabric::util::FixedCapacityQueue InMemoryMpiQueue; +typedef faabric::util::SpinLockQueue InMemoryMpiQueue; class MpiWorld { @@ -186,8 +186,6 @@ class MpiWorld std::shared_ptr getLocalQueue(int sendRank, int recvRank); - long getLocalQueueSize(int sendRank, int recvRank); - void overrideHost(const std::string& newHost); double getWTime(); diff --git a/include/faabric/util/queue.h b/include/faabric/util/queue.h index 6d89aab18..f04b50939 100644 --- a/include/faabric/util/queue.h +++ b/include/faabric/util/queue.h @@ -4,9 +4,12 @@ #include #include +#include #include #include #include +// TODO: install properly rather than just copy headers in the source tree +#include #define DEFAULT_QUEUE_TIMEOUT_MS 5000 #define DEFAULT_QUEUE_SIZE 1024 @@ -215,6 +218,80 @@ class FixedCapacityQueue moodycamel::BlockingReaderWriterCircularBuffer mq; }; +// High-performance, spin-lock single-producer, single-consumer queue. This +// queue spin-locks, so use at your own risk! 
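The SpinLockQueue added below ignores its timeoutMs arguments: both enqueue and dequeue busy-wait on a fixed-capacity boost::lockfree::spsc_queue until the push or pop succeeds, so a full queue stalls the producer and an empty queue stalls the consumer. The trade-off relative to the existing FixedCapacityQueue is that waiting spins on the CPU instead of blocking. A minimal standalone sketch of the same pattern (the SpinQueueSketch name and the 1024 capacity are illustrative assumptions, not the patch's exact declaration):

    #include <boost/lockfree/spsc_queue.hpp>

    // Single-producer/single-consumer ring buffer with a compile-time capacity;
    // push and pop spin instead of blocking on a condition variable.
    template<typename T, unsigned Capacity = 1024>
    class SpinQueueSketch
    {
      public:
        void enqueue(T value)
        {
            // Busy-wait until the ring buffer has a free slot.
            while (!ring.push(value)) {
            }
        }

        T dequeue()
        {
            T value;
            // Busy-wait until an element is available.
            while (!ring.pop(value)) {
            }
            return value;
        }

      private:
        boost::lockfree::spsc_queue<T, boost::lockfree::capacity<Capacity>> ring;
    };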
+template +class SpinLockQueue +{ + public: + void enqueue(T& value, long timeoutMs = DEFAULT_QUEUE_TIMEOUT_MS) + { + while (!mq.push(value)) { + ; + }; + } + + T dequeue(long timeoutMs = DEFAULT_QUEUE_TIMEOUT_MS) + { + T value; + + while (!mq.pop(value)) { + ; + } + + return value; + } + + long size() + { + throw std::runtime_error("Size for fast queue unimplemented!"); + } + + void drain() + { + while (mq.pop()) { + ; + } + } + + void reset() { ; } + + private: + boost::lockfree::spsc_queue> mq; +}; + +template +class NewQueue +{ + public: + void enqueue(T& value, long timeoutMs = DEFAULT_QUEUE_TIMEOUT_MS) + { + mq.push(value); + } + + T dequeue(long timeoutMs = DEFAULT_QUEUE_TIMEOUT_MS) + { + return mq.pop(); + } + + long size() + { + throw std::runtime_error("Size for fast queue unimplemented!"); + } + + void drain() + { + while (mq.pop()) { + ; + } + } + + void reset() { ; } + + private: + atomic_queue::AtomicQueue2 mq; +}; + class TokenPool { public: diff --git a/src/mpi/MpiWorld.cpp b/src/mpi/MpiWorld.cpp index 4c707432f..02f1b5c70 100644 --- a/src/mpi/MpiWorld.cpp +++ b/src/mpi/MpiWorld.cpp @@ -2015,13 +2015,6 @@ int MpiWorld::getIndexForRanks(int sendRank, int recvRank) const return index; } -long MpiWorld::getLocalQueueSize(int sendRank, int recvRank) -{ - const std::shared_ptr& queue = - getLocalQueue(sendRank, recvRank); - return queue->size(); -} - double MpiWorld::getWTime() { double t = faabric::util::getTimeDiffMillis(creationTime); diff --git a/tests/test/mpi/test_multiple_mpi_worlds.cpp b/tests/test/mpi/test_multiple_mpi_worlds.cpp index 2ff2b835c..3ca7f932f 100644 --- a/tests/test/mpi/test_multiple_mpi_worlds.cpp +++ b/tests/test/mpi/test_multiple_mpi_worlds.cpp @@ -154,29 +154,6 @@ TEST_CASE_METHOD(MultiWorldMpiTestFixture, worldB.send( rankA1, rankA2, BYTES(messageData.data()), MPI_INT, messageData.size()); - SECTION("Test queueing") - { - // Check for world A - REQUIRE(worldA.getLocalQueueSize(rankA1, rankA2) == 1); - REQUIRE(worldA.getLocalQueueSize(rankA2, rankA1) == 0); - REQUIRE(worldA.getLocalQueueSize(rankA1, 2) == 0); - REQUIRE(worldA.getLocalQueueSize(rankA2, 2) == 0); - const std::shared_ptr& queueA2 = - worldA.getLocalQueue(rankA1, rankA2); - MpiMessage actualMessage = queueA2->dequeue(); - // checkMessage(actualMessage, worldId, rankA1, rankA2, messageData); - - // Check for world B - REQUIRE(worldB.getLocalQueueSize(rankA1, rankA2) == 1); - REQUIRE(worldB.getLocalQueueSize(rankA2, rankA1) == 0); - REQUIRE(worldB.getLocalQueueSize(rankA1, 2) == 0); - REQUIRE(worldB.getLocalQueueSize(rankA2, 2) == 0); - const std::shared_ptr& queueA2B = - worldB.getLocalQueue(rankA1, rankA2); - actualMessage = queueA2B->dequeue(); - // checkMessage(actualMessage, worldId, rankA1, rankA2, messageData); - } - SECTION("Test recv") { MPI_Status status{};
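For reference, a minimal usage sketch of the vendored atomic_queue library that the NewQueue wrapper builds on, assuming the copied header is reachable as <faabric/atomic_queue/atomic_queue.h> on the include path; the Msg struct and the 1024 capacity are illustrative:

    #include <faabric/atomic_queue/atomic_queue.h>

    #include <cassert>

    struct Msg
    {
        int rank;
        int tag;
    };

    int main()
    {
        // Fixed-capacity MPMC queue for non-atomic element types; the capacity
        // is rounded up to a power of two when MINIMIZE_CONTENTION is enabled.
        atomic_queue::AtomicQueue2<Msg, 1024> q;

        q.push(Msg{ 1, 7 }); // Blocking push: busy-waits for a free slot
        Msg m = q.pop();     // Blocking pop: busy-waits for an element
        assert(m.rank == 1 && m.tag == 7);

        Msg other{};
        bool ok = q.try_pop(other); // Non-blocking: false when the queue is empty
        assert(!ok);

        return 0;
    }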