Skip to content

[libc++] fix counting_semaphore lost wakeups #79265

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Feb 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 26 additions & 10 deletions libcxx/include/__atomic/atomic_sync.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,17 +43,17 @@ __libcpp_atomic_monitor(__cxx_atomic_contention_t const volatile*);
_LIBCPP_AVAILABILITY_SYNC _LIBCPP_EXPORTED_FROM_ABI void
__libcpp_atomic_wait(__cxx_atomic_contention_t const volatile*, __cxx_contention_t);

template <class _Atp, class _Fn>
template <class _Atp, class _BackoffTest>
struct __libcpp_atomic_wait_backoff_impl {
_Atp* __a;
_Fn __test_fn;
_Atp* __a_;
_BackoffTest __backoff_test_;
_LIBCPP_AVAILABILITY_SYNC
_LIBCPP_HIDE_FROM_ABI bool operator()(chrono::nanoseconds __elapsed) const {
if (__elapsed > chrono::microseconds(64)) {
auto const __monitor = std::__libcpp_atomic_monitor(__a);
if (__test_fn())
auto __monitor = std::__libcpp_atomic_monitor(__a_);
if (__backoff_test_(__monitor))
return true;
std::__libcpp_atomic_wait(__a, __monitor);
std::__libcpp_atomic_wait(__a_, __monitor);
} else if (__elapsed > chrono::microseconds(4))
__libcpp_thread_yield();
else {
Expand All @@ -62,10 +62,26 @@ struct __libcpp_atomic_wait_backoff_impl {
}
};

template <class _Atp, class _Fn>
_LIBCPP_AVAILABILITY_SYNC _LIBCPP_HIDE_FROM_ABI bool __cxx_atomic_wait(_Atp* __a, _Fn&& __test_fn) {
__libcpp_atomic_wait_backoff_impl<_Atp, __decay_t<_Fn> > __backoff_fn = {__a, __test_fn};
return std::__libcpp_thread_poll_with_backoff(__test_fn, __backoff_fn);
template <class _Atp, class _Poll, class _BackoffTest>
_LIBCPP_AVAILABILITY_SYNC _LIBCPP_HIDE_FROM_ABI bool
__cxx_atomic_wait(_Atp* __a, _Poll&& __poll, _BackoffTest&& __backoff_test) {
__libcpp_atomic_wait_backoff_impl<_Atp, __decay_t<_BackoffTest> > __backoff_fn = {__a, __backoff_test};
return std::__libcpp_thread_poll_with_backoff(__poll, __backoff_fn);
}

template <class _Poll>
struct __libcpp_atomic_wait_poll_as_backoff_test {
_Poll __poll_;

_LIBCPP_AVAILABILITY_SYNC
_LIBCPP_HIDE_FROM_ABI bool operator()(__cxx_contention_t&) const { return __poll_(); }
};

template <class _Atp, class _Poll>
_LIBCPP_AVAILABILITY_SYNC _LIBCPP_HIDE_FROM_ABI bool __cxx_atomic_wait(_Atp* __a, _Poll&& __poll) {
__libcpp_atomic_wait_backoff_impl<_Atp, __libcpp_atomic_wait_poll_as_backoff_test<_Poll&> > __backoff_fn = {
__a, {__poll}};
return std::__libcpp_thread_poll_with_backoff(__poll, __backoff_fn);
}

#else // _LIBCPP_HAS_NO_THREADS
Expand Down
28 changes: 18 additions & 10 deletions libcxx/include/semaphore
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ using binary_semaphore = counting_semaphore<1>;
#include <__assert> // all public C++ headers provide the assertion handler
#include <__atomic/atomic_base.h>
#include <__atomic/atomic_sync.h>
#include <__atomic/contention_t.h>
#include <__atomic/memory_order.h>
#include <__availability>
#include <__chrono/time_point.h>
Expand Down Expand Up @@ -94,31 +95,38 @@ public:
auto __old = __a_.fetch_add(__update, memory_order_release);
_LIBCPP_ASSERT_ARGUMENT_WITHIN_DOMAIN(
__update <= _LIBCPP_SEMAPHORE_MAX - __old, "update is greater than the expected value");

if (__old > 0) {
// Nothing to do
} else if (__update > 1)
if (__old == 0) {
__a_.notify_all();
else
__a_.notify_one();
}
}
_LIBCPP_AVAILABILITY_SYNC _LIBCPP_HIDE_FROM_ABI void acquire() {
auto const __test_fn = [this]() -> bool {
auto const __poll_fn = [this]() -> bool {
auto __old = __a_.load(memory_order_relaxed);
return (__old != 0) && __a_.compare_exchange_strong(__old, __old - 1, memory_order_acquire, memory_order_relaxed);
};
__cxx_atomic_wait(&__a_.__a_, __test_fn);
auto const __backoff_test = [this](__cxx_contention_t& __monitor) -> bool {
ptrdiff_t __old = __monitor;
bool __r = __try_acquire_impl(__old);
__monitor = static_cast<__cxx_contention_t>(__old);
return __r;
};
__cxx_atomic_wait(&__a_.__a_, __poll_fn, __backoff_test);
}
template <class _Rep, class _Period>
_LIBCPP_AVAILABILITY_SYNC _LIBCPP_HIDE_FROM_ABI bool
try_acquire_for(chrono::duration<_Rep, _Period> const& __rel_time) {
if (__rel_time == chrono::duration<_Rep, _Period>::zero())
return try_acquire();
auto const __test_fn = [this]() { return try_acquire(); };
return std::__libcpp_thread_poll_with_backoff(__test_fn, __libcpp_timed_backoff_policy(), __rel_time);
auto const __poll_fn = [this]() { return try_acquire(); };
return std::__libcpp_thread_poll_with_backoff(__poll_fn, __libcpp_timed_backoff_policy(), __rel_time);
}
_LIBCPP_AVAILABILITY_SYNC _LIBCPP_HIDE_FROM_ABI bool try_acquire() {
auto __old = __a_.load(memory_order_acquire);
return __try_acquire_impl(__old);
}

private:
_LIBCPP_AVAILABILITY_SYNC _LIBCPP_HIDE_FROM_ABI bool __try_acquire_impl(ptrdiff_t& __old) {
while (true) {
if (__old == 0)
return false;
Expand Down
64 changes: 64 additions & 0 deletions libcxx/test/std/thread/thread.semaphore/lost_wakeup.pass.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
//===----------------------------------------------------------------------===//
//
// Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
// See https://llvm.org/LICENSE.txt for license information.
// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
//
//===----------------------------------------------------------------------===//

// UNSUPPORTED: no-threads
// UNSUPPORTED: c++03, c++11, c++14, c++17

// XFAIL: availability-synchronization_library-missing

// This is a regression test for https://llvm.org/PR47013.

// <semaphore>

#include <barrier>
#include <semaphore>
#include <thread>
#include <vector>

#include "make_test_thread.h"

static std::counting_semaphore<> s(0);
static std::barrier<> b(8 + 1);

void acquire() {
for (int i = 0; i < 10'000; ++i) {
s.acquire();
b.arrive_and_wait();
}
}

void release() {
for (int i = 0; i < 10'000; ++i) {
s.release(1);
s.release(1);
s.release(1);
s.release(1);

s.release(1);
s.release(1);
s.release(1);
s.release(1);

b.arrive_and_wait();
}
}

int main(int, char**) {
for (int run = 0; run < 20; ++run) {
std::vector<std::thread> threads;
for (int i = 0; i < 8; ++i)
threads.push_back(support::make_test_thread(acquire));

threads.push_back(support::make_test_thread(release));

for (auto& thread : threads)
thread.join();
}

return 0;
}