Skip to content

Fix DefaultExecutor destruction deadlock if worker thread owns the Executor #3288

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Feb 12, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion generated/src/aws-cpp-sdk-s3-crt/source/S3CrtClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -361,7 +361,7 @@ void S3CrtClient::init(const S3Crt::ClientConfiguration& config,
m_crtCredProvider = Aws::Crt::Auth::CredentialsProvider::CreateCredentialsProviderDelegate({
std::bind([](const std::shared_ptr<AWSCredentialsProvider>& provider) {
if (provider == nullptr) {
AWS_LOGSTREAM_FATAL(ALLOCATION_TAG, "No provider provided, using anonymous provider")
AWS_LOGSTREAM_FATAL(ALLOCATION_TAG, "No provider provided, using anonymous provider");
return Aws::MakeShared<Aws::Crt::Auth::Credentials>(ALLOCATION_TAG);
}
AWSCredentials credentials = provider->GetAWSCredentials();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,11 @@
} \
}

#define AWS_LOGSTREAM_FATAL(tag, streamExpression) AWS_LOGSTREAM(Aws::Utils::Logging::LogLevel::Fatal, tag, streamExpression)
#define AWS_LOGSTREAM_FATAL(tag, streamExpression) \
do { \
AWS_LOGSTREAM(Aws::Utils::Logging::LogLevel::Fatal, tag, streamExpression) \
AWS_LOG_FLUSH() \
} while(0)
#define AWS_LOGSTREAM_ERROR(tag, streamExpression) AWS_LOGSTREAM(Aws::Utils::Logging::LogLevel::Error, tag, streamExpression)
#define AWS_LOGSTREAM_WARN(tag, streamExpression) AWS_LOGSTREAM(Aws::Utils::Logging::LogLevel::Warn, tag, streamExpression)
#define AWS_LOGSTREAM_INFO(tag, streamExpression) AWS_LOGSTREAM(Aws::Utils::Logging::LogLevel::Info, tag, streamExpression)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,41 +5,40 @@

#pragma once

#include <aws/core/utils/threading/Executor.h>

#include <aws/core/utils/memory/stl/AWSMap.h>
#include <aws/core/utils/threading/Executor.h>

#include <atomic>
#include <functional>
#include <mutex>
#include <thread>

namespace Aws
{
namespace Utils
{
namespace Threading
{
/**
* Default Executor implementation. Simply spawns a thread and detaches it.
*/
class AWS_CORE_API DefaultExecutor : public Executor
{
public:
DefaultExecutor() : m_state(State::Free) {}
~DefaultExecutor();

void WaitUntilStopped() override;
protected:
enum class State
{
Free, Locked, Shutdown
};
bool SubmitToThread(std::function<void()>&&) override;
void Detach(std::thread::id id);
std::atomic<State> m_state;
Aws::UnorderedMap<std::thread::id, std::thread> m_threads;
};
} // namespace Threading
} // namespace Utils
} // namespace Aws

namespace Aws {
namespace Utils {
namespace Threading {
/**
* Default Executor implementation. Simply spawns a thread and detaches it.
*/
class AWS_CORE_API DefaultExecutor : public Executor {
// API contract
public:
DefaultExecutor();
DefaultExecutor(const DefaultExecutor& other);
DefaultExecutor& operator=(const DefaultExecutor&);
DefaultExecutor(DefaultExecutor&& other) = default;
DefaultExecutor& operator=(DefaultExecutor&&) = default;

virtual ~DefaultExecutor();

void WaitUntilStopped() override;

protected:
bool SubmitToThread(std::function<void()>&&) override;

// implementation details
public:
struct impl;

private:
std::shared_ptr<impl> pImpl;
};
} // namespace Threading
} // namespace Utils
} // namespace Aws
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,18 @@ namespace Aws
ThreadTask(ThreadTask&&) = delete;
ThreadTask& operator =(ThreadTask&&) = delete;

void StopProcessingWork();
void StopProcessingWork();

std::thread::id GetThreadId() const;
void DetachFromExecutor();
protected:
void MainTaskRunner();

private:
private:
std::atomic<bool> m_continue;
PooledThreadExecutor& m_executor;
std::thread m_thread;
bool m_detached = false;
};
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ namespace Aws
this->credentialsValidUntilMillis = DateTime::Now().Millis();

if (!m_ec2metadataClient) {
AWS_LOGSTREAM_FATAL(EC2_INSTANCE_PROFILE_LOG_TAG, "EC2MetadataClient is a nullptr!")
AWS_LOGSTREAM_FATAL(EC2_INSTANCE_PROFILE_LOG_TAG, "EC2MetadataClient is a nullptr!");
return false;
}
auto credentialsStr = m_ec2metadataClient->GetDefaultCredentialsSecurely();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,10 @@

#include <aws/core/utils/logging/DefaultCRTLogSystem.h>
#include <aws/core/utils/logging/AWSLogging.h>
#include <aws/core/utils/logging/LogMacros.h>
#include <aws/core/utils/logging/LogSystemInterface.h>
#include <aws/core/utils/Array.h>
#include <aws/common/common.h>
#include <cstdarg>
#include <chrono>
#include <thread>
#include <mutex>

using namespace Aws::Utils;
using namespace Aws::Utils::Logging;
Expand Down Expand Up @@ -48,6 +45,10 @@ namespace Aws
if (pLogSystem)
{
pLogSystem->vaLog(logLevel, subjectName, formatStr, args);
if (LogLevel::Fatal == logLevel)
{
AWS_LOG_FLUSH();
}
}
}
}
Expand Down
Loading
Loading