From f8547cb6f23f3d675552b1826132d12fcdd9c17b Mon Sep 17 00:00:00 2001 From: SergeyRyabinin Date: Thu, 6 Feb 2025 00:05:05 +0000 Subject: [PATCH 1/3] Fix Executors destruction deadlock if worker thread owns the Executor --- .../aws/core/utils/logging/LogMacros.h | 4 +- .../core/utils/threading/DefaultExecutor.h | 26 +++++- .../aws/core/utils/threading/ThreadTask.h | 7 +- .../utils/logging/DefaultCRTLogSystem.cpp | 9 +- .../utils/threading/DefaultExecutor.cpp | 91 ++++++++++++++----- .../utils/threading/PooledThreadExecutor.cpp | 12 ++- .../source/utils/threading/ThreadTask.cpp | 19 +++- .../utils/threading/DefaultExecutorTest.cpp | 69 ++++++++++++++ .../threading/PooledThreadExecutorTest.cpp | 83 +++++++++++++++++ 9 files changed, 287 insertions(+), 33 deletions(-) create mode 100644 tests/aws-cpp-sdk-core-tests/utils/threading/PooledThreadExecutorTest.cpp diff --git a/src/aws-cpp-sdk-core/include/aws/core/utils/logging/LogMacros.h b/src/aws-cpp-sdk-core/include/aws/core/utils/logging/LogMacros.h index ccc5e580db2..b1d530da622 100644 --- a/src/aws-cpp-sdk-core/include/aws/core/utils/logging/LogMacros.h +++ b/src/aws-cpp-sdk-core/include/aws/core/utils/logging/LogMacros.h @@ -76,7 +76,9 @@ } \ } - #define AWS_LOGSTREAM_FATAL(tag, streamExpression) AWS_LOGSTREAM(Aws::Utils::Logging::LogLevel::Fatal, tag, streamExpression) + #define AWS_LOGSTREAM_FATAL(tag, streamExpression) \ + AWS_LOGSTREAM(Aws::Utils::Logging::LogLevel::Fatal, tag, streamExpression) \ + AWS_LOG_FLUSH() #define AWS_LOGSTREAM_ERROR(tag, streamExpression) AWS_LOGSTREAM(Aws::Utils::Logging::LogLevel::Error, tag, streamExpression) #define AWS_LOGSTREAM_WARN(tag, streamExpression) AWS_LOGSTREAM(Aws::Utils::Logging::LogLevel::Warn, tag, streamExpression) #define AWS_LOGSTREAM_INFO(tag, streamExpression) AWS_LOGSTREAM(Aws::Utils::Logging::LogLevel::Info, tag, streamExpression) diff --git a/src/aws-cpp-sdk-core/include/aws/core/utils/threading/DefaultExecutor.h b/src/aws-cpp-sdk-core/include/aws/core/utils/threading/DefaultExecutor.h index 1e8c118b223..2e90799fda3 100644 --- a/src/aws-cpp-sdk-core/include/aws/core/utils/threading/DefaultExecutor.h +++ b/src/aws-cpp-sdk-core/include/aws/core/utils/threading/DefaultExecutor.h @@ -31,6 +31,28 @@ namespace Aws void WaitUntilStopped() override; protected: + class DefaultExecutorTask { + public: + DefaultExecutorTask(std::function&& task, DefaultExecutor* executor); + DefaultExecutorTask(const DefaultExecutorTask&) = delete; + DefaultExecutorTask& operator=(const DefaultExecutorTask&) = delete; + DefaultExecutorTask(DefaultExecutorTask&&) = default; + + /** + * Detaches the task from the executor + */ + void DoNotDetach(); + /** + * Starts task execution on a new thread + */ + static std::pair Launch(DefaultExecutorTask* task); + private: + void Execute(); + + std::function m_task; + DefaultExecutor* m_executor = nullptr; + }; + enum class State { Free, Locked, Shutdown @@ -38,7 +60,9 @@ namespace Aws bool SubmitToThread(std::function&&) override; void Detach(std::thread::id id); std::atomic m_state; - Aws::UnorderedMap m_threads; + + using DefaultExecutorTaskPair = std::pair; + Aws::UnorderedMap m_tasks; }; } // namespace Threading } // namespace Utils diff --git a/src/aws-cpp-sdk-core/include/aws/core/utils/threading/ThreadTask.h b/src/aws-cpp-sdk-core/include/aws/core/utils/threading/ThreadTask.h index 79326b3a151..1a5234b4a57 100644 --- a/src/aws-cpp-sdk-core/include/aws/core/utils/threading/ThreadTask.h +++ b/src/aws-cpp-sdk-core/include/aws/core/utils/threading/ThreadTask.h @@ -33,15 +33,18 @@ namespace Aws ThreadTask(ThreadTask&&) = delete; ThreadTask& operator =(ThreadTask&&) = delete; - void StopProcessingWork(); + void StopProcessingWork(); + std::thread::id GetThreadId() const; + void DetachFromExecutor(); protected: void MainTaskRunner(); - private: + private: std::atomic m_continue; PooledThreadExecutor& m_executor; std::thread m_thread; + bool m_detached = false; }; } } diff --git a/src/aws-cpp-sdk-core/source/utils/logging/DefaultCRTLogSystem.cpp b/src/aws-cpp-sdk-core/source/utils/logging/DefaultCRTLogSystem.cpp index 27f39c2335f..46c36fbb8f4 100644 --- a/src/aws-cpp-sdk-core/source/utils/logging/DefaultCRTLogSystem.cpp +++ b/src/aws-cpp-sdk-core/source/utils/logging/DefaultCRTLogSystem.cpp @@ -5,13 +5,10 @@ #include #include +#include #include -#include #include #include -#include -#include -#include using namespace Aws::Utils; using namespace Aws::Utils::Logging; @@ -48,6 +45,10 @@ namespace Aws if (pLogSystem) { pLogSystem->vaLog(logLevel, subjectName, formatStr, args); + if (LogLevel::Fatal == logLevel) + { + AWS_LOG_FLUSH(); + } } } } diff --git a/src/aws-cpp-sdk-core/source/utils/threading/DefaultExecutor.cpp b/src/aws-cpp-sdk-core/source/utils/threading/DefaultExecutor.cpp index fb79271edb4..18fe509366d 100644 --- a/src/aws-cpp-sdk-core/source/utils/threading/DefaultExecutor.cpp +++ b/src/aws-cpp-sdk-core/source/utils/threading/DefaultExecutor.cpp @@ -3,6 +3,7 @@ * SPDX-License-Identifier: Apache-2.0. */ +#include #include #include @@ -10,17 +11,48 @@ using namespace Aws::Utils::Threading; -bool DefaultExecutor::SubmitToThread(std::function&& fx) +static const char DEFAULT_EXECUTOR_LOG_TAG[] = "DefaultExecutor"; + + +DefaultExecutor::DefaultExecutorTask::DefaultExecutorTask(std::function&& task, DefaultExecutor* executor) + : m_task(std::move(task)), m_executor(executor) { + assert(m_task); +} + +void DefaultExecutor::DefaultExecutorTask::Execute() { + assert(m_task); + m_task(); + if (m_executor) { + m_executor->Detach(std::this_thread::get_id()); + } + Aws::Delete(this); +} + +void DefaultExecutor::DefaultExecutorTask::DoNotDetach() { + m_executor = nullptr; +} + +std::pair DefaultExecutor::DefaultExecutorTask::Launch(DefaultExecutorTask* task) { + if(!task) { + assert(task); + AWS_LOGSTREAM_FATAL(DEFAULT_EXECUTOR_LOG_TAG, "Attempted to launch a nullptr task"); + return {}; + } + return {std::thread(&DefaultExecutorTask::Execute, task), task}; +} + +bool DefaultExecutor::SubmitToThread(std::function&& fx) { - // Generalized lambda capture is C++14, using std::bind as a workaround to force moving fx (instead of copying) - std::function main = std::bind( - [this](std::function& storedFx) - { - storedFx(); - Detach(std::this_thread::get_id()); - }, - std::move(fx) - ); + if(State::Shutdown == m_state.load()) { + AWS_LOGSTREAM_ERROR(DEFAULT_EXECUTOR_LOG_TAG, "Unable to submit async task: the executor is shut down!"); + return false; + } + + auto* task = Aws::New(DEFAULT_EXECUTOR_LOG_TAG, std::move(fx), this); + if(!task) { + AWS_LOGSTREAM_ERROR(DEFAULT_EXECUTOR_LOG_TAG, "Unable to allocate async task!"); + return false; + } State expected; do @@ -28,9 +60,9 @@ bool DefaultExecutor::SubmitToThread(std::function&& fx) expected = State::Free; if(m_state.compare_exchange_strong(expected, State::Locked)) { - std::thread t(std::move(main)); - const auto id = t.get_id(); // copy the id before we std::move the thread - m_threads.emplace(id, std::move(t)); + DefaultExecutorTaskPair taskPair = DefaultExecutorTask::Launch(task); + const auto id = taskPair.first.get_id(); + m_tasks.emplace(id, std::move(taskPair)); m_state = State::Free; return true; } @@ -47,10 +79,10 @@ void DefaultExecutor::Detach(std::thread::id id) expected = State::Free; if(m_state.compare_exchange_strong(expected, State::Locked)) { - auto it = m_threads.find(id); - assert(it != m_threads.end()); - it->second.detach(); - m_threads.erase(it); + auto it = m_tasks.find(id); + assert(it != m_tasks.end()); + it->second.first.detach(); + m_tasks.erase(it); m_state = State::Free; return; } @@ -71,12 +103,25 @@ void DefaultExecutor::WaitUntilStopped() DefaultExecutor::~DefaultExecutor() { - WaitUntilStopped(); + DefaultExecutor::WaitUntilStopped(); // virtual call is resolved at compile time + const auto thisThreadId = std::this_thread::get_id(); + bool workerOwnsThis = false; - auto it = m_threads.begin(); - while(!m_threads.empty()) - { - it->second.join(); - it = m_threads.erase(it); + for(auto& taskItem : m_tasks) { + if (thisThreadId != taskItem.first) { + taskItem.second.first.join(); + } else { + workerOwnsThis = true; + taskItem.second.second->DoNotDetach(); // prevent task from self-detaching from Executor + } + } + + if(workerOwnsThis) { + std::thread toDetach = std::move(m_tasks[thisThreadId].first); + AWS_LOGSTREAM_WARN(DEFAULT_EXECUTOR_LOG_TAG, "DefaultExecutor is getting destructed from one of it's worker threads!"); + AWS_LOGSTREAM_FLUSH(); // we are in UB zone and may crash soon. + + m_tasks.clear(); + toDetach.detach(); } } diff --git a/src/aws-cpp-sdk-core/source/utils/threading/PooledThreadExecutor.cpp b/src/aws-cpp-sdk-core/source/utils/threading/PooledThreadExecutor.cpp index 0eec58d26f4..1845b67b123 100644 --- a/src/aws-cpp-sdk-core/source/utils/threading/PooledThreadExecutor.cpp +++ b/src/aws-cpp-sdk-core/source/utils/threading/PooledThreadExecutor.cpp @@ -3,6 +3,7 @@ * SPDX-License-Identifier: Apache-2.0. */ +#include #include #include #include @@ -11,6 +12,8 @@ static const char* POOLED_CLASS_TAG = "PooledThreadExecutor"; using namespace Aws::Utils::Threading; +static const char POOLED_THREAD_EXECUTOR_LOG_TAG[] = "PooledThreadExecutor"; + PooledThreadExecutor::PooledThreadExecutor(size_t poolSize, OverflowPolicy overflowPolicy) : m_sync(0, poolSize), m_poolSize(poolSize), m_overflowPolicy(overflowPolicy) { @@ -38,9 +41,16 @@ void PooledThreadExecutor::WaitUntilStopped() m_sync.ReleaseAll(); + const auto thisThreadId = std::this_thread::get_id(); for (auto threadTask : m_threadTaskHandles) { - Aws::Delete(threadTask); + if (threadTask->GetThreadId() != thisThreadId) { + Aws::Delete(threadTask); + } else { + AWS_LOGSTREAM_WARN(POOLED_THREAD_EXECUTOR_LOG_TAG, "PooledThreadExecutor is getting destructed from one of it's worker threads!"); + AWS_LOGSTREAM_FLUSH(); // we are in UB zone and may crash soon. + threadTask->DetachFromExecutor(); + } } m_threadTaskHandles.clear(); diff --git a/src/aws-cpp-sdk-core/source/utils/threading/ThreadTask.cpp b/src/aws-cpp-sdk-core/source/utils/threading/ThreadTask.cpp index 77121ca5717..18bb7543cad 100644 --- a/src/aws-cpp-sdk-core/source/utils/threading/ThreadTask.cpp +++ b/src/aws-cpp-sdk-core/source/utils/threading/ThreadTask.cpp @@ -16,7 +16,11 @@ ThreadTask::ThreadTask(PooledThreadExecutor& executor) : m_continue(true), m_exe ThreadTask::~ThreadTask() { StopProcessingWork(); - m_thread.join(); + if (!m_detached) { + m_thread.join(); + } else { + m_thread.detach(); + } } void ThreadTask::MainTaskRunner() @@ -38,9 +42,22 @@ void ThreadTask::MainTaskRunner() m_executor.m_sync.WaitOne(); } } + + if (m_detached) { + Aws::Delete(this); + } } void ThreadTask::StopProcessingWork() { m_continue = false; } + +std::thread::id ThreadTask::GetThreadId() const { + return m_thread.get_id(); +} + +void ThreadTask::DetachFromExecutor() { + m_detached = true; +} + diff --git a/tests/aws-cpp-sdk-core-tests/utils/threading/DefaultExecutorTest.cpp b/tests/aws-cpp-sdk-core-tests/utils/threading/DefaultExecutorTest.cpp index 2cd2f051a33..fe840db54f2 100644 --- a/tests/aws-cpp-sdk-core-tests/utils/threading/DefaultExecutorTest.cpp +++ b/tests/aws-cpp-sdk-core-tests/utils/threading/DefaultExecutorTest.cpp @@ -42,3 +42,72 @@ TEST_F(DefaultExecutorTest, ThreadsDetachIfNotShuttingDown) i = i * 10; ASSERT_EQ(20, i.load()); } + +TEST_F(DefaultExecutorTest, WorkerThreadTheOnlyOwner) +{ + // If somehow the shared_ptr of the Executor gets owned by a worker thread of that Executor - Executor shutdown must not deadlock + auto pExec = Aws::MakeShared("WorkerThreadTheOnlyOwner"); + + std::mutex mtx; + std::condition_variable cv; + bool taskStarted = false; + bool taskCanContinue = false; + bool taskFinished = false; + + pExec->Submit([pExec, &mtx, &cv, &taskStarted, &taskCanContinue, &taskFinished]() mutable { + { + std::unique_lock lock(mtx); + taskStarted = true; + cv.notify_one(); + } + + if (!taskCanContinue) { + std::unique_lock lock(mtx); + cv.wait_for(lock, std::chrono::seconds(60), [&taskCanContinue]() { return taskCanContinue; }); + } + + ASSERT_TRUE(taskCanContinue) << "Async task has not been allowed to continue withing 60 seconds!"; + ASSERT_TRUE(pExec); + ASSERT_EQ(1, pExec.use_count()); + + pExec.reset(); // focal point of the test + ASSERT_FALSE(pExec); + + { + std::unique_lock lock(mtx); + taskFinished = true; + cv.notify_one(); + } + }); + + if (!taskStarted) { + std::unique_lock lock(mtx); + cv.wait_for(lock, std::chrono::seconds(60), [&taskStarted]() { return taskStarted; }); + } + ASSERT_TRUE(taskStarted) << "Async task has not started within 60 seconds!"; + if (!taskStarted) { + std::terminate(); // avoid hanging tests + } + + ASSERT_EQ(2, pExec.use_count()); + pExec.reset(); + ASSERT_FALSE(pExec); + // Now async task is the only owner of the Executor + + { + std::unique_lock lock(mtx); + taskCanContinue = true; + cv.notify_one(); + } + + if (!taskFinished) { + std::unique_lock lock(mtx); + cv.wait_for(lock, std::chrono::seconds(60), [&taskFinished]() { return taskFinished; }); + } + ASSERT_TRUE(taskFinished) << "Async task has not finished within 60 seconds!"; + if (!taskFinished) { + std::terminate(); // avoid hanging tests + } + + ASSERT_FALSE(pExec); // (just in case) executor pointer cannot magically resurrect. +} diff --git a/tests/aws-cpp-sdk-core-tests/utils/threading/PooledThreadExecutorTest.cpp b/tests/aws-cpp-sdk-core-tests/utils/threading/PooledThreadExecutorTest.cpp new file mode 100644 index 00000000000..b16ef622a6e --- /dev/null +++ b/tests/aws-cpp-sdk-core-tests/utils/threading/PooledThreadExecutorTest.cpp @@ -0,0 +1,83 @@ +/** + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0. + */ + +#include +#include +#include + +using namespace Aws::Utils::Threading; + +class PooledThreadExecutorTest : public Aws::Testing::AwsCppSdkGTestSuite +{ +}; + +TEST_F(PooledThreadExecutorTest, WorkerThreadTheOnlyOwner) +{ + // If somehow the shared_ptr of the Executor gets owned by a worker thread of that Executor - Executor shutdown must not deadlock + auto pExec = Aws::MakeShared("WorkerThreadTheOnlyOwner", 4); + + std::mutex mtx; + std::condition_variable cv; + bool taskStarted = false; + bool taskCanContinue = false; + bool taskFinished = false; + + pExec->Submit([pExec, &mtx, &cv, &taskStarted, &taskCanContinue, &taskFinished]() mutable { + { + std::unique_lock lock(mtx); + taskStarted = true; + cv.notify_one(); + } + + if (!taskCanContinue) { + std::unique_lock lock(mtx); + cv.wait_for(lock, std::chrono::seconds(60), [&taskCanContinue]() { return taskCanContinue; }); + } + + ASSERT_TRUE(taskCanContinue) << "Async task has not been allowed to continue withing 60 seconds!"; + ASSERT_TRUE(pExec); + ASSERT_EQ(1, pExec.use_count()); + + pExec.reset(); // focal point of the test + ASSERT_FALSE(pExec); + + { + std::unique_lock lock(mtx); + taskFinished = true; + cv.notify_one(); + } + }); + + if (!taskStarted) { + std::unique_lock lock(mtx); + cv.wait_for(lock, std::chrono::seconds(60), [&taskStarted]() { return taskStarted; }); + } + ASSERT_TRUE(taskStarted) << "Async task has not started within 60 seconds!"; + if (!taskStarted) { + std::terminate(); // avoid hanging tests + } + + ASSERT_EQ(2, pExec.use_count()); + pExec.reset(); + ASSERT_FALSE(pExec); + // Now async task is the only owner of the Executor + + { + std::unique_lock lock(mtx); + taskCanContinue = true; + cv.notify_one(); + } + + if (!taskFinished) { + std::unique_lock lock(mtx); + cv.wait_for(lock, std::chrono::seconds(60), [&taskFinished]() { return taskFinished; }); + } + ASSERT_TRUE(taskFinished) << "Async task has not finished within 60 seconds!"; + if (!taskFinished) { + std::terminate(); // avoid hanging tests + } + + ASSERT_FALSE(pExec); // (just in case) executor pointer cannot magically resurrect. +} From 816c2136409db871ab611ed5b8b9d8f25801130b Mon Sep 17 00:00:00 2001 From: SergeyRyabinin Date: Thu, 6 Feb 2025 21:16:01 +0000 Subject: [PATCH 2/3] Refactor DefaultExecutor into PIMPL --- .../core/utils/threading/DefaultExecutor.h | 91 +++---- .../utils/threading/DefaultExecutor.cpp | 242 +++++++++++------- .../utils/threading/DefaultExecutorTest.cpp | 60 ++--- .../threading/PooledThreadExecutorTest.cpp | 67 ++--- 4 files changed, 224 insertions(+), 236 deletions(-) diff --git a/src/aws-cpp-sdk-core/include/aws/core/utils/threading/DefaultExecutor.h b/src/aws-cpp-sdk-core/include/aws/core/utils/threading/DefaultExecutor.h index 2e90799fda3..cdf3c4db6f5 100644 --- a/src/aws-cpp-sdk-core/include/aws/core/utils/threading/DefaultExecutor.h +++ b/src/aws-cpp-sdk-core/include/aws/core/utils/threading/DefaultExecutor.h @@ -5,65 +5,40 @@ #pragma once -#include - #include +#include -#include #include -#include -#include - -namespace Aws -{ - namespace Utils - { - namespace Threading - { - /** - * Default Executor implementation. Simply spawns a thread and detaches it. - */ - class AWS_CORE_API DefaultExecutor : public Executor - { - public: - DefaultExecutor() : m_state(State::Free) {} - ~DefaultExecutor(); - - void WaitUntilStopped() override; - protected: - class DefaultExecutorTask { - public: - DefaultExecutorTask(std::function&& task, DefaultExecutor* executor); - DefaultExecutorTask(const DefaultExecutorTask&) = delete; - DefaultExecutorTask& operator=(const DefaultExecutorTask&) = delete; - DefaultExecutorTask(DefaultExecutorTask&&) = default; - /** - * Detaches the task from the executor - */ - void DoNotDetach(); - /** - * Starts task execution on a new thread - */ - static std::pair Launch(DefaultExecutorTask* task); - private: - void Execute(); - - std::function m_task; - DefaultExecutor* m_executor = nullptr; - }; - - enum class State - { - Free, Locked, Shutdown - }; - bool SubmitToThread(std::function&&) override; - void Detach(std::thread::id id); - std::atomic m_state; - - using DefaultExecutorTaskPair = std::pair; - Aws::UnorderedMap m_tasks; - }; - } // namespace Threading - } // namespace Utils -} // namespace Aws +namespace Aws { +namespace Utils { +namespace Threading { +/** + * Default Executor implementation. Simply spawns a thread and detaches it. + */ +class AWS_CORE_API DefaultExecutor : public Executor { + // API contract + public: + DefaultExecutor(); + DefaultExecutor(const DefaultExecutor& other); + DefaultExecutor& operator=(const DefaultExecutor&); + DefaultExecutor(DefaultExecutor&& other) = default; + DefaultExecutor& operator=(DefaultExecutor&&) = default; + + virtual ~DefaultExecutor(); + + void WaitUntilStopped() override; + + protected: + bool SubmitToThread(std::function&&) override; + + // implementation details + public: + struct impl; + + private: + std::shared_ptr pImpl; +}; +} // namespace Threading +} // namespace Utils +} // namespace Aws diff --git a/src/aws-cpp-sdk-core/source/utils/threading/DefaultExecutor.cpp b/src/aws-cpp-sdk-core/source/utils/threading/DefaultExecutor.cpp index 18fe509366d..2294ba6dffa 100644 --- a/src/aws-cpp-sdk-core/source/utils/threading/DefaultExecutor.cpp +++ b/src/aws-cpp-sdk-core/source/utils/threading/DefaultExecutor.cpp @@ -9,119 +9,181 @@ #include -using namespace Aws::Utils::Threading; +namespace Aws { +namespace Utils { +namespace Threading { static const char DEFAULT_EXECUTOR_LOG_TAG[] = "DefaultExecutor"; +class DefaultExecutorTask; +using DefaultExecutorTaskPair = std::pair; -DefaultExecutor::DefaultExecutorTask::DefaultExecutorTask(std::function&& task, DefaultExecutor* executor) - : m_task(std::move(task)), m_executor(executor) { - assert(m_task); -} +struct DefaultExecutor::impl { + impl() = default; + impl(const impl&) = delete; + impl(impl&&) = delete; + impl& operator=(const impl&) = delete; + impl& operator=(impl&&) = delete; + + enum class State { Free, Locked, Shutdown }; + + ~impl(); + + bool SubmitToThread(const std::shared_ptr& spThis, std::function&& fx); + void WaitUntilStopped(); + void Detach(std::thread::id id); + + private: + std::atomic m_state{State::Free}; + Aws::UnorderedMap m_tasks; +}; -void DefaultExecutor::DefaultExecutorTask::Execute() { - assert(m_task); - m_task(); - if (m_executor) { - m_executor->Detach(std::this_thread::get_id()); +class DefaultExecutorTask { + public: + DefaultExecutorTask(std::function&& task, const std::shared_ptr& pExecutor) + : m_task(std::move(task)), m_executor(pExecutor) { + assert(m_task); } - Aws::Delete(this); -} + DefaultExecutorTask(const DefaultExecutorTask&) = delete; + DefaultExecutorTask& operator=(const DefaultExecutorTask&) = delete; + DefaultExecutorTask(DefaultExecutorTask&&) = default; + + /** + * Detaches the task from the executor + */ + void DoNotDetach() { m_executor.reset(); } + /** + * Starts task execution on a new thread + */ + static DefaultExecutorTaskPair Launch(DefaultExecutorTask* task) { + if (!task) { + AWS_LOGSTREAM_FATAL(DEFAULT_EXECUTOR_LOG_TAG, "Attempted to launch an empty task"); + return {}; + } + return {std::thread(&DefaultExecutorTask::Execute, task), task}; + } + + private: + void Execute() { + assert(m_task); + m_task(); + if (const auto spExecutor = m_executor.lock()) { + spExecutor->Detach(std::this_thread::get_id()); + } + Aws::Delete(this); + } + + std::function m_task; + std::weak_ptr m_executor; +}; -void DefaultExecutor::DefaultExecutorTask::DoNotDetach() { - m_executor = nullptr; +bool DefaultExecutor::SubmitToThread(std::function&& fx) { + if (!pImpl) { + AWS_LOGSTREAM_ERROR(DEFAULT_EXECUTOR_LOG_TAG, "Unable to submit async task: the executor is shut down!"); + return false; + } + return pImpl->SubmitToThread(pImpl, std::move(fx)); } -std::pair DefaultExecutor::DefaultExecutorTask::Launch(DefaultExecutorTask* task) { - if(!task) { - assert(task); - AWS_LOGSTREAM_FATAL(DEFAULT_EXECUTOR_LOG_TAG, "Attempted to launch a nullptr task"); - return {}; +void DefaultExecutor::WaitUntilStopped() { + if (pImpl) { + pImpl->WaitUntilStopped(); } - return {std::thread(&DefaultExecutorTask::Execute, task), task}; } -bool DefaultExecutor::SubmitToThread(std::function&& fx) -{ - if(State::Shutdown == m_state.load()) { - AWS_LOGSTREAM_ERROR(DEFAULT_EXECUTOR_LOG_TAG, "Unable to submit async task: the executor is shut down!"); - return false; - } +bool DefaultExecutor::impl::SubmitToThread(const std::shared_ptr& spThis, std::function&& fx) { + if (State::Shutdown == m_state.load()) { + AWS_LOGSTREAM_ERROR(DEFAULT_EXECUTOR_LOG_TAG, "Unable to submit async task: the executor is shut down!"); + return false; + } - auto* task = Aws::New(DEFAULT_EXECUTOR_LOG_TAG, std::move(fx), this); - if(!task) { - AWS_LOGSTREAM_ERROR(DEFAULT_EXECUTOR_LOG_TAG, "Unable to allocate async task!"); - return false; + auto* task = Aws::New(DEFAULT_EXECUTOR_LOG_TAG, std::move(fx), spThis); + if (!task) { + AWS_LOGSTREAM_ERROR(DEFAULT_EXECUTOR_LOG_TAG, "Unable to allocate async task!"); + return false; + } + + State expected; + do { + expected = State::Free; + if (m_state.compare_exchange_strong(expected, State::Locked)) { + DefaultExecutorTaskPair taskPair = DefaultExecutorTask::Launch(task); + const auto id = taskPair.first.get_id(); + m_tasks.emplace(id, std::move(taskPair)); + m_state = State::Free; + return true; } + } while (expected != State::Shutdown); + return false; +} - State expected; - do - { - expected = State::Free; - if(m_state.compare_exchange_strong(expected, State::Locked)) - { - DefaultExecutorTaskPair taskPair = DefaultExecutorTask::Launch(task); - const auto id = taskPair.first.get_id(); - m_tasks.emplace(id, std::move(taskPair)); - m_state = State::Free; - return true; - } +void DefaultExecutor::impl::Detach(std::thread::id id) { + State expected; + do { + expected = State::Free; + if (m_state.compare_exchange_strong(expected, State::Locked)) { + auto it = m_tasks.find(id); + assert(it != m_tasks.end()); + it->second.first.detach(); + m_tasks.erase(it); + m_state = State::Free; + return; } - while(expected != State::Shutdown); - return false; + } while (expected != State::Shutdown); } -void DefaultExecutor::Detach(std::thread::id id) -{ - State expected; - do - { - expected = State::Free; - if(m_state.compare_exchange_strong(expected, State::Locked)) - { - auto it = m_tasks.find(id); - assert(it != m_tasks.end()); - it->second.first.detach(); - m_tasks.erase(it); - m_state = State::Free; - return; - } - } - while(expected != State::Shutdown); +void DefaultExecutor::impl::WaitUntilStopped() { + auto expected = State::Free; + while (!m_state.compare_exchange_strong(expected, State::Shutdown)) { + // spin while currently detaching threads finish + assert(expected == State::Locked); + expected = State::Free; + } } -void DefaultExecutor::WaitUntilStopped() -{ - auto expected = State::Free; - while(!m_state.compare_exchange_strong(expected, State::Shutdown)) - { - //spin while currently detaching threads finish - assert(expected == State::Locked); - expected = State::Free; - } +DefaultExecutor::DefaultExecutor() { pImpl = MakeShared(DEFAULT_EXECUTOR_LOG_TAG); } + +DefaultExecutor::DefaultExecutor(const DefaultExecutor& other) { + AWS_UNREFERENCED_PARAM(other); + pImpl = MakeShared(DEFAULT_EXECUTOR_LOG_TAG); } -DefaultExecutor::~DefaultExecutor() -{ - DefaultExecutor::WaitUntilStopped(); // virtual call is resolved at compile time - const auto thisThreadId = std::this_thread::get_id(); - bool workerOwnsThis = false; - - for(auto& taskItem : m_tasks) { - if (thisThreadId != taskItem.first) { - taskItem.second.first.join(); - } else { - workerOwnsThis = true; - taskItem.second.second->DoNotDetach(); // prevent task from self-detaching from Executor - } - } +DefaultExecutor& DefaultExecutor::operator=(const DefaultExecutor& other) { + if (this != &other) { + WaitUntilStopped(); + pImpl.reset(); + } + return *this; +} + +DefaultExecutor::~DefaultExecutor() { + DefaultExecutor::WaitUntilStopped(); // virtual call is resolved at compile time + pImpl.reset(); +} - if(workerOwnsThis) { - std::thread toDetach = std::move(m_tasks[thisThreadId].first); - AWS_LOGSTREAM_WARN(DEFAULT_EXECUTOR_LOG_TAG, "DefaultExecutor is getting destructed from one of it's worker threads!"); - AWS_LOGSTREAM_FLUSH(); // we are in UB zone and may crash soon. +DefaultExecutor::impl::~impl() { + const auto thisThreadId = std::this_thread::get_id(); + bool workerOwnsThis = false; - m_tasks.clear(); - toDetach.detach(); + for (auto& taskItem : m_tasks) { + if (thisThreadId != taskItem.first) { + taskItem.second.first.join(); + } else { + workerOwnsThis = true; + taskItem.second.second->DoNotDetach(); // prevent task from self-detaching from Executor } + } + + if (workerOwnsThis) { + std::thread toDetach = std::move(m_tasks[thisThreadId].first); + AWS_LOGSTREAM_WARN(DEFAULT_EXECUTOR_LOG_TAG, "DefaultExecutor is getting destructed from one of it's worker threads!"); + AWS_LOGSTREAM_FLUSH(); // we are in UB zone and may crash soon. + + m_tasks.clear(); + toDetach.detach(); + } } + +} // namespace Threading +} // namespace Utils +} // namespace Aws \ No newline at end of file diff --git a/tests/aws-cpp-sdk-core-tests/utils/threading/DefaultExecutorTest.cpp b/tests/aws-cpp-sdk-core-tests/utils/threading/DefaultExecutorTest.cpp index fe840db54f2..4eba7a3a7a5 100644 --- a/tests/aws-cpp-sdk-core-tests/utils/threading/DefaultExecutorTest.cpp +++ b/tests/aws-cpp-sdk-core-tests/utils/threading/DefaultExecutorTest.cpp @@ -43,8 +43,7 @@ TEST_F(DefaultExecutorTest, ThreadsDetachIfNotShuttingDown) ASSERT_EQ(20, i.load()); } -TEST_F(DefaultExecutorTest, WorkerThreadTheOnlyOwner) -{ +TEST_F(DefaultExecutorTest, WorkerThreadTheOnlyOwner) { // If somehow the shared_ptr of the Executor gets owned by a worker thread of that Executor - Executor shutdown must not deadlock auto pExec = Aws::MakeShared("WorkerThreadTheOnlyOwner"); @@ -54,60 +53,37 @@ TEST_F(DefaultExecutorTest, WorkerThreadTheOnlyOwner) bool taskCanContinue = false; bool taskFinished = false; - pExec->Submit([pExec, &mtx, &cv, &taskStarted, &taskCanContinue, &taskFinished]() mutable { - { - std::unique_lock lock(mtx); - taskStarted = true; - cv.notify_one(); - } + auto sendSignal = [&mtx, &cv](bool& boolSignal) { + std::unique_lock lock(mtx); + boolSignal = true; + cv.notify_one(); + }; - if (!taskCanContinue) { + auto waitForSignal = [&mtx, &cv](bool& boolSignal, const Aws::String& msg) { + if (!boolSignal) { std::unique_lock lock(mtx); - cv.wait_for(lock, std::chrono::seconds(60), [&taskCanContinue]() { return taskCanContinue; }); + cv.wait_for(lock, std::chrono::seconds(60), [&boolSignal]() { return boolSignal; }); } + ASSERT_TRUE(boolSignal) << msg; + }; - ASSERT_TRUE(taskCanContinue) << "Async task has not been allowed to continue withing 60 seconds!"; + pExec->Submit([pExec, &sendSignal, &waitForSignal, &taskStarted, &taskCanContinue, &taskFinished]() mutable { + sendSignal(taskStarted); + waitForSignal(taskCanContinue, "Async task has not been allowed to continue within 60 seconds!"); ASSERT_TRUE(pExec); ASSERT_EQ(1, pExec.use_count()); - pExec.reset(); // focal point of the test ASSERT_FALSE(pExec); - - { - std::unique_lock lock(mtx); - taskFinished = true; - cv.notify_one(); - } + sendSignal(taskFinished); }); - if (!taskStarted) { - std::unique_lock lock(mtx); - cv.wait_for(lock, std::chrono::seconds(60), [&taskStarted]() { return taskStarted; }); - } - ASSERT_TRUE(taskStarted) << "Async task has not started within 60 seconds!"; - if (!taskStarted) { - std::terminate(); // avoid hanging tests - } - + waitForSignal(taskStarted, "Async task has not started within 60 seconds!"); ASSERT_EQ(2, pExec.use_count()); pExec.reset(); ASSERT_FALSE(pExec); // Now async task is the only owner of the Executor + sendSignal(taskCanContinue); - { - std::unique_lock lock(mtx); - taskCanContinue = true; - cv.notify_one(); - } - - if (!taskFinished) { - std::unique_lock lock(mtx); - cv.wait_for(lock, std::chrono::seconds(60), [&taskFinished]() { return taskFinished; }); - } - ASSERT_TRUE(taskFinished) << "Async task has not finished within 60 seconds!"; - if (!taskFinished) { - std::terminate(); // avoid hanging tests - } - + waitForSignal(taskFinished, "Async task has not finished within 60 seconds!"); ASSERT_FALSE(pExec); // (just in case) executor pointer cannot magically resurrect. } diff --git a/tests/aws-cpp-sdk-core-tests/utils/threading/PooledThreadExecutorTest.cpp b/tests/aws-cpp-sdk-core-tests/utils/threading/PooledThreadExecutorTest.cpp index b16ef622a6e..30821ab75ff 100644 --- a/tests/aws-cpp-sdk-core-tests/utils/threading/PooledThreadExecutorTest.cpp +++ b/tests/aws-cpp-sdk-core-tests/utils/threading/PooledThreadExecutorTest.cpp @@ -3,18 +3,16 @@ * SPDX-License-Identifier: Apache-2.0. */ -#include #include +#include + #include using namespace Aws::Utils::Threading; -class PooledThreadExecutorTest : public Aws::Testing::AwsCppSdkGTestSuite -{ -}; +class PooledThreadExecutorTest : public Aws::Testing::AwsCppSdkGTestSuite {}; -TEST_F(PooledThreadExecutorTest, WorkerThreadTheOnlyOwner) -{ +TEST_F(PooledThreadExecutorTest, WorkerThreadTheOnlyOwner) { // If somehow the shared_ptr of the Executor gets owned by a worker thread of that Executor - Executor shutdown must not deadlock auto pExec = Aws::MakeShared("WorkerThreadTheOnlyOwner", 4); @@ -24,60 +22,37 @@ TEST_F(PooledThreadExecutorTest, WorkerThreadTheOnlyOwner) bool taskCanContinue = false; bool taskFinished = false; - pExec->Submit([pExec, &mtx, &cv, &taskStarted, &taskCanContinue, &taskFinished]() mutable { - { - std::unique_lock lock(mtx); - taskStarted = true; - cv.notify_one(); - } + auto sendSignal = [&mtx, &cv](bool& boolSignal) { + std::unique_lock lock(mtx); + boolSignal = true; + cv.notify_one(); + }; - if (!taskCanContinue) { + auto waitForSignal = [&mtx, &cv](bool& boolSignal, const Aws::String& msg) { + if (!boolSignal) { std::unique_lock lock(mtx); - cv.wait_for(lock, std::chrono::seconds(60), [&taskCanContinue]() { return taskCanContinue; }); + cv.wait_for(lock, std::chrono::seconds(60), [&boolSignal]() { return boolSignal; }); } + ASSERT_TRUE(boolSignal) << msg; + }; - ASSERT_TRUE(taskCanContinue) << "Async task has not been allowed to continue withing 60 seconds!"; + pExec->Submit([pExec, &sendSignal, &waitForSignal, &taskStarted, &taskCanContinue, &taskFinished]() mutable { + sendSignal(taskStarted); + waitForSignal(taskCanContinue, "Async task has not been allowed to continue within 60 seconds!"); ASSERT_TRUE(pExec); ASSERT_EQ(1, pExec.use_count()); - pExec.reset(); // focal point of the test ASSERT_FALSE(pExec); - - { - std::unique_lock lock(mtx); - taskFinished = true; - cv.notify_one(); - } + sendSignal(taskFinished); }); - if (!taskStarted) { - std::unique_lock lock(mtx); - cv.wait_for(lock, std::chrono::seconds(60), [&taskStarted]() { return taskStarted; }); - } - ASSERT_TRUE(taskStarted) << "Async task has not started within 60 seconds!"; - if (!taskStarted) { - std::terminate(); // avoid hanging tests - } - + waitForSignal(taskStarted, "Async task has not started within 60 seconds!"); ASSERT_EQ(2, pExec.use_count()); pExec.reset(); ASSERT_FALSE(pExec); // Now async task is the only owner of the Executor + sendSignal(taskCanContinue); - { - std::unique_lock lock(mtx); - taskCanContinue = true; - cv.notify_one(); - } - - if (!taskFinished) { - std::unique_lock lock(mtx); - cv.wait_for(lock, std::chrono::seconds(60), [&taskFinished]() { return taskFinished; }); - } - ASSERT_TRUE(taskFinished) << "Async task has not finished within 60 seconds!"; - if (!taskFinished) { - std::terminate(); // avoid hanging tests - } - + waitForSignal(taskFinished, "Async task has not finished within 60 seconds!"); ASSERT_FALSE(pExec); // (just in case) executor pointer cannot magically resurrect. } From c05068ec88a218367fad5e86c03c57d3b832fa54 Mon Sep 17 00:00:00 2001 From: SergeyRyabinin Date: Tue, 11 Feb 2025 20:55:18 +0000 Subject: [PATCH 3/3] Remove spin lock from thread executor, catch concurrent task submission and shutdown --- .../aws-cpp-sdk-s3-crt/source/S3CrtClient.cpp | 2 +- .../aws/core/utils/logging/LogMacros.h | 4 +- .../config/EC2InstanceProfileConfigLoader.cpp | 2 +- .../utils/threading/DefaultExecutor.cpp | 120 +++++++++--------- .../utils/threading/DefaultExecutorTest.cpp | 68 +++++++++- .../s3/s3-crt/S3CrtServiceClientSourceInit.vm | 2 +- 6 files changed, 132 insertions(+), 66 deletions(-) diff --git a/generated/src/aws-cpp-sdk-s3-crt/source/S3CrtClient.cpp b/generated/src/aws-cpp-sdk-s3-crt/source/S3CrtClient.cpp index 45099e2ed00..9fcf255ebbe 100644 --- a/generated/src/aws-cpp-sdk-s3-crt/source/S3CrtClient.cpp +++ b/generated/src/aws-cpp-sdk-s3-crt/source/S3CrtClient.cpp @@ -361,7 +361,7 @@ void S3CrtClient::init(const S3Crt::ClientConfiguration& config, m_crtCredProvider = Aws::Crt::Auth::CredentialsProvider::CreateCredentialsProviderDelegate({ std::bind([](const std::shared_ptr& provider) { if (provider == nullptr) { - AWS_LOGSTREAM_FATAL(ALLOCATION_TAG, "No provider provided, using anonymous provider") + AWS_LOGSTREAM_FATAL(ALLOCATION_TAG, "No provider provided, using anonymous provider"); return Aws::MakeShared(ALLOCATION_TAG); } AWSCredentials credentials = provider->GetAWSCredentials(); diff --git a/src/aws-cpp-sdk-core/include/aws/core/utils/logging/LogMacros.h b/src/aws-cpp-sdk-core/include/aws/core/utils/logging/LogMacros.h index b1d530da622..3c811983742 100644 --- a/src/aws-cpp-sdk-core/include/aws/core/utils/logging/LogMacros.h +++ b/src/aws-cpp-sdk-core/include/aws/core/utils/logging/LogMacros.h @@ -77,8 +77,10 @@ } #define AWS_LOGSTREAM_FATAL(tag, streamExpression) \ + do { \ AWS_LOGSTREAM(Aws::Utils::Logging::LogLevel::Fatal, tag, streamExpression) \ - AWS_LOG_FLUSH() + AWS_LOG_FLUSH() \ + } while(0) #define AWS_LOGSTREAM_ERROR(tag, streamExpression) AWS_LOGSTREAM(Aws::Utils::Logging::LogLevel::Error, tag, streamExpression) #define AWS_LOGSTREAM_WARN(tag, streamExpression) AWS_LOGSTREAM(Aws::Utils::Logging::LogLevel::Warn, tag, streamExpression) #define AWS_LOGSTREAM_INFO(tag, streamExpression) AWS_LOGSTREAM(Aws::Utils::Logging::LogLevel::Info, tag, streamExpression) diff --git a/src/aws-cpp-sdk-core/source/config/EC2InstanceProfileConfigLoader.cpp b/src/aws-cpp-sdk-core/source/config/EC2InstanceProfileConfigLoader.cpp index 0b505b2c003..883d45cc695 100644 --- a/src/aws-cpp-sdk-core/source/config/EC2InstanceProfileConfigLoader.cpp +++ b/src/aws-cpp-sdk-core/source/config/EC2InstanceProfileConfigLoader.cpp @@ -49,7 +49,7 @@ namespace Aws this->credentialsValidUntilMillis = DateTime::Now().Millis(); if (!m_ec2metadataClient) { - AWS_LOGSTREAM_FATAL(EC2_INSTANCE_PROFILE_LOG_TAG, "EC2MetadataClient is a nullptr!") + AWS_LOGSTREAM_FATAL(EC2_INSTANCE_PROFILE_LOG_TAG, "EC2MetadataClient is a nullptr!"); return false; } auto credentialsStr = m_ec2metadataClient->GetDefaultCredentialsSecurely(); diff --git a/src/aws-cpp-sdk-core/source/utils/threading/DefaultExecutor.cpp b/src/aws-cpp-sdk-core/source/utils/threading/DefaultExecutor.cpp index 2294ba6dffa..bd8c5682c1f 100644 --- a/src/aws-cpp-sdk-core/source/utils/threading/DefaultExecutor.cpp +++ b/src/aws-cpp-sdk-core/source/utils/threading/DefaultExecutor.cpp @@ -6,6 +6,7 @@ #include #include #include +#include #include @@ -34,14 +35,16 @@ struct DefaultExecutor::impl { void Detach(std::thread::id id); private: - std::atomic m_state{State::Free}; + std::mutex m_mutex; + std::condition_variable m_cv; + State m_state = State::Free; Aws::UnorderedMap m_tasks; }; class DefaultExecutorTask { public: DefaultExecutorTask(std::function&& task, const std::shared_ptr& pExecutor) - : m_task(std::move(task)), m_executor(pExecutor) { + : m_task(std::move(task)), m_impl(pExecutor) { assert(m_task); } DefaultExecutorTask(const DefaultExecutorTask&) = delete; @@ -51,7 +54,7 @@ class DefaultExecutorTask { /** * Detaches the task from the executor */ - void DoNotDetach() { m_executor.reset(); } + void DoNotDetach() { m_impl.reset(); } /** * Starts task execution on a new thread */ @@ -67,14 +70,14 @@ class DefaultExecutorTask { void Execute() { assert(m_task); m_task(); - if (const auto spExecutor = m_executor.lock()) { + if (const auto spExecutor = m_impl.lock()) { spExecutor->Detach(std::this_thread::get_id()); } Aws::Delete(this); } std::function m_task; - std::weak_ptr m_executor; + std::weak_ptr m_impl; }; bool DefaultExecutor::SubmitToThread(std::function&& fx) { @@ -92,52 +95,65 @@ void DefaultExecutor::WaitUntilStopped() { } bool DefaultExecutor::impl::SubmitToThread(const std::shared_ptr& spThis, std::function&& fx) { - if (State::Shutdown == m_state.load()) { - AWS_LOGSTREAM_ERROR(DEFAULT_EXECUTOR_LOG_TAG, "Unable to submit async task: the executor is shut down!"); - return false; - } - + // allocate outside critical section auto* task = Aws::New(DEFAULT_EXECUTOR_LOG_TAG, std::move(fx), spThis); if (!task) { AWS_LOGSTREAM_ERROR(DEFAULT_EXECUTOR_LOG_TAG, "Unable to allocate async task!"); return false; } - State expected; - do { - expected = State::Free; - if (m_state.compare_exchange_strong(expected, State::Locked)) { - DefaultExecutorTaskPair taskPair = DefaultExecutorTask::Launch(task); - const auto id = taskPair.first.get_id(); - m_tasks.emplace(id, std::move(taskPair)); - m_state = State::Free; - return true; - } - } while (expected != State::Shutdown); - return false; + // lock and launch task + std::unique_lock lock(m_mutex); + if (State::Shutdown == m_state) { + AWS_LOGSTREAM_ERROR(DEFAULT_EXECUTOR_LOG_TAG, "Unable to submit async task: the executor is shut down!"); + Aws::Delete(task); + return false; + } + + DefaultExecutorTaskPair taskPair = DefaultExecutorTask::Launch(task); + const auto id = taskPair.first.get_id(); + m_tasks.emplace(id, std::move(taskPair)); + m_state = State::Free; + return true; } void DefaultExecutor::impl::Detach(std::thread::id id) { - State expected; - do { - expected = State::Free; - if (m_state.compare_exchange_strong(expected, State::Locked)) { - auto it = m_tasks.find(id); - assert(it != m_tasks.end()); - it->second.first.detach(); - m_tasks.erase(it); - m_state = State::Free; - return; - } - } while (expected != State::Shutdown); + std::unique_lock lock(m_mutex); + if (State::Shutdown == m_state) { + AWS_LOGSTREAM_ERROR(DEFAULT_EXECUTOR_LOG_TAG, "Unable to Detach async task: the executor is shut down!"); + } + auto it = m_tasks.find(id); + assert(it != m_tasks.end()); + it->second.first.detach(); + m_tasks.erase(it); + m_state = State::Free; + m_cv.notify_one(); } void DefaultExecutor::impl::WaitUntilStopped() { - auto expected = State::Free; - while (!m_state.compare_exchange_strong(expected, State::Shutdown)) { - // spin while currently detaching threads finish - assert(expected == State::Locked); - expected = State::Free; + std::unique_lock lock(m_mutex); + m_state = State::Shutdown; + + // if worker thread in m_tasks is actually this thread running the Executor Shutdown - detach it after waiting for all other threads + Crt::Optional toDetach = [&]() { + Crt::Optional ret; + const auto thisWorkerThreadIt = m_tasks.find(std::this_thread::get_id()); + if (thisWorkerThreadIt != m_tasks.end()) { + thisWorkerThreadIt->second.second->DoNotDetach(); + ret.emplace(std::move(thisWorkerThreadIt->second.first)); + m_tasks.erase(std::this_thread::get_id()); + } + return ret; + }(); + + // wait for all running tasks to finish and detach themselves + m_cv.wait(lock, [this]() { return m_tasks.empty(); }); + + if (toDetach) { + AWS_LOGSTREAM_WARN(DEFAULT_EXECUTOR_LOG_TAG, "DefaultExecutor is getting destructed from one of it's worker threads!"); + AWS_LOGSTREAM_FLUSH(); // we are in UB zone and may crash soon. + + toDetach->detach(); } } @@ -151,37 +167,19 @@ DefaultExecutor::DefaultExecutor(const DefaultExecutor& other) { DefaultExecutor& DefaultExecutor::operator=(const DefaultExecutor& other) { if (this != &other) { WaitUntilStopped(); - pImpl.reset(); + pImpl = MakeShared(DEFAULT_EXECUTOR_LOG_TAG); } return *this; } DefaultExecutor::~DefaultExecutor() { - DefaultExecutor::WaitUntilStopped(); // virtual call is resolved at compile time + DefaultExecutor::WaitUntilStopped(); pImpl.reset(); } DefaultExecutor::impl::~impl() { - const auto thisThreadId = std::this_thread::get_id(); - bool workerOwnsThis = false; - - for (auto& taskItem : m_tasks) { - if (thisThreadId != taskItem.first) { - taskItem.second.first.join(); - } else { - workerOwnsThis = true; - taskItem.second.second->DoNotDetach(); // prevent task from self-detaching from Executor - } - } - - if (workerOwnsThis) { - std::thread toDetach = std::move(m_tasks[thisThreadId].first); - AWS_LOGSTREAM_WARN(DEFAULT_EXECUTOR_LOG_TAG, "DefaultExecutor is getting destructed from one of it's worker threads!"); - AWS_LOGSTREAM_FLUSH(); // we are in UB zone and may crash soon. - - m_tasks.clear(); - toDetach.detach(); - } + WaitUntilStopped(); + assert(m_state == State::Shutdown && m_tasks.empty()); } } // namespace Threading diff --git a/tests/aws-cpp-sdk-core-tests/utils/threading/DefaultExecutorTest.cpp b/tests/aws-cpp-sdk-core-tests/utils/threading/DefaultExecutorTest.cpp index 4eba7a3a7a5..c4e6c2fe067 100644 --- a/tests/aws-cpp-sdk-core-tests/utils/threading/DefaultExecutorTest.cpp +++ b/tests/aws-cpp-sdk-core-tests/utils/threading/DefaultExecutorTest.cpp @@ -3,9 +3,10 @@ * SPDX-License-Identifier: Apache-2.0. */ -#include #include #include +#include + #include #include @@ -87,3 +88,68 @@ TEST_F(DefaultExecutorTest, WorkerThreadTheOnlyOwner) { waitForSignal(taskFinished, "Async task has not finished within 60 seconds!"); ASSERT_FALSE(pExec); // (just in case) executor pointer cannot magically resurrect. } + +TEST_F(DefaultExecutorTest, ConcurrentTaskLaunchAndShutdown) { + // Try to force a race condition on task submission and shutdown + // one threads submit a new task, another starts the executor shutdown + std::mutex mtx; + std::condition_variable cv; + auto sendSignal = [&mtx, &cv](bool& boolSignal) { + std::unique_lock lock(mtx); + boolSignal = true; + cv.notify_all(); + }; + + auto waitForSignal = [&mtx, &cv](bool& boolSignal, const Aws::String& msg) { + if (!boolSignal) { + std::unique_lock lock(mtx); + cv.wait_for(lock, std::chrono::seconds(60), [&boolSignal]() { return boolSignal; }); + } + ASSERT_TRUE(boolSignal) << msg; + }; + + static const size_t TEST_REPEATS = 50; // test verifies race condition behavior, repeat it. + for (size_t testRepeat = 0; testRepeat < TEST_REPEATS; ++testRepeat) { + auto pExec = Aws::MakeShared("ConcurrentTaskLaunchAndShutdown"); + bool continueTest = false; + std::atomic someVar; + auto executorShutdown = [&](bool& executorShutdownReady) { + sendSignal(executorShutdownReady); + waitForSignal(continueTest, "Executor destroyer task did not resume after 60s!"); + + pExec->WaitUntilStopped(); + + return true; + }; + auto executorSubmitTask = [&](bool& executorSubmitterReady) { + sendSignal(executorSubmitterReady); + waitForSignal(continueTest, "Executor submit task was not resumed after 60s!"); + + pExec->Submit([&]() mutable { + // some new dummy async task; + ++someVar; + }); + + return true; + }; + bool executorShutdownReady = false; + std::future executorShutdownFuture = std::async(std::launch::async, executorShutdown, std::ref(executorShutdownReady)); + static const size_t PARALLEL_SUBMITTERS = 8; + Aws::Vector> submitterFutures(PARALLEL_SUBMITTERS); + bool executorSubmittersReady[PARALLEL_SUBMITTERS] = {false}; + for (size_t i = 0; i < PARALLEL_SUBMITTERS; ++i) { + submitterFutures[i] = std::async(std::launch::async, executorSubmitTask, std::ref(executorSubmittersReady[i])); + } + waitForSignal(executorShutdownReady, "Executor destroyer task did not get ready after 60s"); + for (size_t i = 0; i < PARALLEL_SUBMITTERS; ++i) { + waitForSignal(executorSubmittersReady[i], "Executor submitter tasks did not get ready after 60s"); + } + + sendSignal(continueTest); + + ASSERT_TRUE(executorShutdownFuture.get()); + for (size_t i = 0; i < PARALLEL_SUBMITTERS; ++i) { + ASSERT_TRUE(submitterFutures[i].get()); + } + } +} diff --git a/tools/code-generation/generator/src/main/resources/com/amazonaws/util/awsclientgenerator/velocity/cpp/s3/s3-crt/S3CrtServiceClientSourceInit.vm b/tools/code-generation/generator/src/main/resources/com/amazonaws/util/awsclientgenerator/velocity/cpp/s3/s3-crt/S3CrtServiceClientSourceInit.vm index b8f8f385620..4ea5062dee7 100644 --- a/tools/code-generation/generator/src/main/resources/com/amazonaws/util/awsclientgenerator/velocity/cpp/s3/s3-crt/S3CrtServiceClientSourceInit.vm +++ b/tools/code-generation/generator/src/main/resources/com/amazonaws/util/awsclientgenerator/velocity/cpp/s3/s3-crt/S3CrtServiceClientSourceInit.vm @@ -385,7 +385,7 @@ void ${className}::init(const ${clientConfigurationNamespace}::ClientConfigurati m_crtCredProvider = Aws::Crt::Auth::CredentialsProvider::CreateCredentialsProviderDelegate({ std::bind([](const std::shared_ptr& provider) { if (provider == nullptr) { - AWS_LOGSTREAM_FATAL(ALLOCATION_TAG, "No provider provided, using anonymous provider") + AWS_LOGSTREAM_FATAL(ALLOCATION_TAG, "No provider provided, using anonymous provider"); return Aws::MakeShared(ALLOCATION_TAG); } AWSCredentials credentials = provider->GetAWSCredentials();