Skip to content

Commit c31b516

Browse files
author
Alain Mazy
committed
added TransferMode for S3 (currently affected by aws/aws-sdk-cpp#2319 since we are using version 1.9.45)
1 parent 7234a7e commit c31b516

File tree

4 files changed

+180
-25
lines changed

4 files changed

+180
-25
lines changed

Aws/AwsS3StoragePlugin.cpp

Lines changed: 166 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,18 @@
2525
#include <aws/s3/model/ListObjectsRequest.h>
2626
#include <aws/s3/model/DeleteObjectRequest.h>
2727
#include <aws/core/auth/AWSCredentialsProvider.h>
28-
#include <aws/core/utils/memory/stl/AWSStringStream.h>
2928
#include <aws/core/utils/HashingUtils.h>
29+
#include <aws/core/utils/memory/stl/AWSStreamFwd.h>
30+
#include <aws/core/utils/memory/stl/AWSStringStream.h>
31+
#include <aws/core/utils/memory/AWSMemory.h>
32+
#include <aws/core/utils/stream/PreallocatedStreamBuf.h>
33+
#include <aws/core/utils/StringUtils.h>
34+
#include <aws/transfer/TransferManager.h>
3035
#include <aws/crt/Api.h>
36+
#include <fstream>
3137

3238
#include <boost/lexical_cast.hpp>
39+
#include <boost/interprocess/streams/bufferstream.hpp>
3340
#include <iostream>
3441
#include <fstream>
3542

@@ -39,13 +46,16 @@ class AwsS3StoragePlugin : public BaseStorage
3946
{
4047
public:
4148

42-
Aws::S3::S3Client client_;
4349
std::string bucketName_;
4450
bool storageContainsUnknownFiles_;
51+
bool useTransferManager_;
52+
std::shared_ptr<Aws::S3::S3Client> client_;
53+
std::shared_ptr<Aws::Utils::Threading::Executor> executor_;
54+
std::shared_ptr<Aws::Transfer::TransferManager> transferManager_;
4555

4656
public:
4757

48-
AwsS3StoragePlugin(const std::string& nameForLogs, const Aws::S3::S3Client& client, const std::string& bucketName, bool enableLegacyStorageStructure, bool storageContainsUnknownFiles);
58+
AwsS3StoragePlugin(const std::string& nameForLogs, std::shared_ptr<Aws::S3::S3Client> client, const std::string& bucketName, bool enableLegacyStorageStructure, bool storageContainsUnknownFiles, bool useTransferManager);
4959

5060
virtual ~AwsS3StoragePlugin();
5161

@@ -55,21 +65,21 @@ class AwsS3StoragePlugin : public BaseStorage
5565
};
5666

5767

58-
class Writer : public IStorage::IWriter
68+
class DirectWriter : public IStorage::IWriter
5969
{
6070
std::string path_;
61-
Aws::S3::S3Client client_;
71+
std::shared_ptr<Aws::S3::S3Client> client_;
6272
std::string bucketName_;
6373

6474
public:
65-
Writer(const Aws::S3::S3Client& client, const std::string& bucketName, const std::string& path)
75+
DirectWriter(std::shared_ptr<Aws::S3::S3Client> client, const std::string& bucketName, const std::string& path)
6676
: path_(path),
6777
client_(client),
6878
bucketName_(bucketName)
6979
{
7080
}
7181

72-
virtual ~Writer()
82+
virtual ~DirectWriter()
7383
{
7484
}
7585

@@ -89,7 +99,7 @@ class Writer : public IStorage::IWriter
8999
putObjectRequest.SetBody(stream);
90100
putObjectRequest.SetContentMD5(Aws::Utils::HashingUtils::Base64Encode(Aws::Utils::HashingUtils::CalculateMD5(*stream)));
91101

92-
auto result = client_.PutObject(putObjectRequest);
102+
auto result = client_->PutObject(putObjectRequest);
93103

94104
if (!result.IsSuccess())
95105
{
@@ -99,23 +109,24 @@ class Writer : public IStorage::IWriter
99109
};
100110

101111

102-
class Reader : public IStorage::IReader
112+
class DirectReader : public IStorage::IReader
103113
{
104-
Aws::S3::S3Client client_;
114+
protected:
115+
std::shared_ptr<Aws::S3::S3Client> client_;
105116
std::string bucketName_;
106117
std::list<std::string> paths_;
107118
std::string uuid_;
108119

109120
public:
110-
Reader(const Aws::S3::S3Client& client, const std::string& bucketName, const std::list<std::string>& paths, const char* uuid)
121+
DirectReader(std::shared_ptr<Aws::S3::S3Client> client, const std::string& bucketName, const std::list<std::string>& paths, const char* uuid)
111122
: client_(client),
112123
bucketName_(bucketName),
113124
paths_(paths),
114125
uuid_(uuid)
115126
{
116127
}
117128

118-
virtual ~Reader()
129+
virtual ~DirectReader()
119130
{
120131

121132
}
@@ -160,7 +171,7 @@ class Reader : public IStorage::IReader
160171
listObjectRequest.SetBucket(bucketName_.c_str());
161172
listObjectRequest.SetPrefix(path.c_str());
162173

163-
auto result = client_.ListObjects(listObjectRequest);
174+
auto result = client_->ListObjects(listObjectRequest);
164175

165176
if (result.IsSuccess())
166177
{
@@ -232,7 +243,7 @@ class Reader : public IStorage::IReader
232243
});
233244

234245
// Get the object
235-
auto result = client_.GetObject(getObjectRequest);
246+
auto result = client_->GetObject(getObjectRequest);
236247
if (result.IsSuccess())
237248
{
238249
}
@@ -246,6 +257,111 @@ class Reader : public IStorage::IReader
246257

247258

248259

260+
class TransferWriter : public IStorage::IWriter
261+
{
262+
std::string path_;
263+
std::shared_ptr<Aws::Transfer::TransferManager> transferManager_;
264+
std::string bucketName_;
265+
266+
public:
267+
TransferWriter(std::shared_ptr<Aws::Transfer::TransferManager> transferManager, const std::string& bucketName, const std::string& path)
268+
: path_(path),
269+
transferManager_(transferManager),
270+
bucketName_(bucketName)
271+
{
272+
}
273+
274+
virtual ~TransferWriter()
275+
{
276+
}
277+
278+
virtual void Write(const char* data, size_t size)
279+
{
280+
boost::interprocess::bufferstream buffer(const_cast<char*>(static_cast<const char*>(data)), static_cast<size_t>(size));
281+
std::shared_ptr<Aws::IOStream> body = Aws::MakeShared<Aws::IOStream>(ALLOCATION_TAG, buffer.rdbuf());
282+
283+
std::shared_ptr<Aws::Transfer::TransferHandle> transferHandle = transferManager_->UploadFile(body, bucketName_, path_.c_str(), "application/binary", Aws::Map<Aws::String, Aws::String>());
284+
transferHandle->WaitUntilFinished();
285+
286+
if (transferHandle->GetStatus() != Aws::Transfer::TransferStatus::COMPLETED)
287+
{
288+
throw StoragePluginException(std::string("error while writing file ") + path_ + ": response code = " + boost::lexical_cast<std::string>(static_cast<int>(transferHandle->GetLastError().GetResponseCode())) + " " + transferHandle->GetLastError().GetMessage());
289+
}
290+
}
291+
};
292+
293+
294+
class TransferReader : public DirectReader
295+
{
296+
std::shared_ptr<Aws::Transfer::TransferManager> transferManager_;
297+
298+
public:
299+
TransferReader(std::shared_ptr<Aws::Transfer::TransferManager> transferManager, std::shared_ptr<Aws::S3::S3Client> client, const std::string& bucketName, const std::list<std::string>& paths, const char* uuid)
300+
: DirectReader(client, bucketName, paths, uuid),
301+
transferManager_(transferManager)
302+
{
303+
}
304+
305+
virtual ~TransferReader()
306+
{
307+
308+
}
309+
310+
virtual void ReadWhole(char* data, size_t size)
311+
{
312+
std::string firstExceptionMessage;
313+
314+
for (auto& path: paths_)
315+
{
316+
try
317+
{
318+
// The local variable 'streamBuffer' is captured by reference in a lambda.
319+
// It must persist until all downloading by the 'transfer_manager' is complete.
320+
Aws::Utils::Stream::PreallocatedStreamBuf streamBuffer(reinterpret_cast<unsigned char*>(data), size);
321+
322+
std::shared_ptr<Aws::Transfer::TransferHandle> downloadHandler = transferManager_->DownloadFile(bucketName_, path, [&]() { //Define a lambda expression for the callback method parameter to stream back the data.
323+
return Aws::New<Aws::IOStream>(ALLOCATION_TAG, &streamBuffer);
324+
});
325+
326+
downloadHandler->WaitUntilFinished();
327+
328+
if (downloadHandler->GetStatus() == Aws::Transfer::TransferStatus::COMPLETED)
329+
{
330+
return;
331+
}
332+
else if (firstExceptionMessage.empty())
333+
{
334+
firstExceptionMessage = downloadHandler->GetLastError().GetMessage();
335+
}
336+
// getObjectRequest.SetResponseStreamFactory(
337+
// [data, size]()
338+
// {
339+
// std::unique_ptr<Aws::StringStream>
340+
// istream(Aws::New<Aws::StringStream>(ALLOCATION_TAG));
341+
342+
// istream->rdbuf()->pubsetbuf(static_cast<char*>(data),
343+
// size);
344+
345+
// return istream.release();
346+
// });
347+
348+
349+
}
350+
catch (StoragePluginException& ex)
351+
{
352+
if (firstExceptionMessage.empty())
353+
{
354+
firstExceptionMessage = ex.what();
355+
}
356+
//ignore to retry
357+
}
358+
}
359+
throw StoragePluginException(firstExceptionMessage);
360+
}
361+
362+
};
363+
364+
249365

250366

251367
const char* AwsS3StoragePluginFactory::GetStoragePluginName()
@@ -340,26 +456,29 @@ IStorage* AwsS3StoragePluginFactory::CreateStorage(const std::string& nameForLog
340456
configuration.caFile = caFile;
341457
}
342458

459+
bool useTransferManager = true; // new in v 2.3.0
460+
pluginSection.LookupBooleanValue(useTransferManager, "UseTransferManager");
461+
343462
if (pluginSection.LookupStringValue(accessKey, "AccessKey") && pluginSection.LookupStringValue(secretKey, "SecretKey"))
344463
{
345464
OrthancPlugins::LogInfo("AWS S3 Storage: using credentials from the configuration file");
346465
Aws::Auth::AWSCredentials credentials(accessKey.c_str(), secretKey.c_str());
347466

348-
Aws::S3::S3Client client(credentials, configuration, Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never, virtualAddressing);
467+
std::shared_ptr<Aws::S3::S3Client> client = Aws::MakeShared<Aws::S3::S3Client>(ALLOCATION_TAG, credentials, configuration, Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never, virtualAddressing);
349468

350469
OrthancPlugins::LogInfo("AWS S3 storage initialized");
351470

352-
return new AwsS3StoragePlugin(nameForLogs, client, bucketName, enableLegacyStorageStructure, storageContainsUnknownFiles);
471+
return new AwsS3StoragePlugin(nameForLogs, client, bucketName, enableLegacyStorageStructure, storageContainsUnknownFiles, useTransferManager);
353472
}
354473
else
355474
{
356475
// when using default credentials, credentials are not checked at startup but only the first time you try to access the bucket !
357476
OrthancPlugins::LogInfo("AWS S3 Storage: using default credentials provider");
358-
Aws::S3::S3Client client(configuration, Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never, virtualAddressing);
477+
std::shared_ptr<Aws::S3::S3Client> client = Aws::MakeShared<Aws::S3::S3Client>(ALLOCATION_TAG, configuration, Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never, virtualAddressing);
359478

360479
OrthancPlugins::LogInfo("AWS S3 storage initialized");
361480

362-
return new AwsS3StoragePlugin(nameForLogs, client, bucketName, enableLegacyStorageStructure, storageContainsUnknownFiles);
481+
return new AwsS3StoragePlugin(nameForLogs, client, bucketName, enableLegacyStorageStructure, storageContainsUnknownFiles, useTransferManager);
363482
}
364483
}
365484
catch (const std::exception& e)
@@ -379,17 +498,33 @@ AwsS3StoragePlugin::~AwsS3StoragePlugin()
379498
}
380499

381500

382-
AwsS3StoragePlugin::AwsS3StoragePlugin(const std::string& nameForLogs, const Aws::S3::S3Client& client, const std::string& bucketName, bool enableLegacyStorageStructure, bool storageContainsUnknownFiles)
501+
AwsS3StoragePlugin::AwsS3StoragePlugin(const std::string& nameForLogs, std::shared_ptr<Aws::S3::S3Client> client, const std::string& bucketName, bool enableLegacyStorageStructure, bool storageContainsUnknownFiles, bool useTransferManager)
383502
: BaseStorage(nameForLogs, enableLegacyStorageStructure),
384-
client_(client),
385503
bucketName_(bucketName),
386-
storageContainsUnknownFiles_(storageContainsUnknownFiles)
504+
storageContainsUnknownFiles_(storageContainsUnknownFiles),
505+
useTransferManager_(useTransferManager),
506+
client_(client)
387507
{
508+
if (useTransferManager_)
509+
{
510+
executor_ = Aws::MakeShared<Aws::Utils::Threading::PooledThreadExecutor>(ALLOCATION_TAG, 10);
511+
Aws::Transfer::TransferManagerConfiguration transferConfig(executor_.get());
512+
transferConfig.s3Client = client_;
513+
514+
transferManager_ = Aws::Transfer::TransferManager::Create(transferConfig);
515+
}
388516
}
389517

390518
IStorage::IWriter* AwsS3StoragePlugin::GetWriterForObject(const char* uuid, OrthancPluginContentType type, bool encryptionEnabled)
391519
{
392-
return new Writer(client_, bucketName_, GetPath(uuid, type, encryptionEnabled));
520+
if (useTransferManager_)
521+
{
522+
return new TransferWriter(transferManager_, bucketName_, GetPath(uuid, type, encryptionEnabled));
523+
}
524+
else
525+
{
526+
return new DirectWriter(client_, bucketName_, GetPath(uuid, type, encryptionEnabled));
527+
}
393528
}
394529

395530
IStorage::IReader* AwsS3StoragePlugin::GetReaderForObject(const char* uuid, OrthancPluginContentType type, bool encryptionEnabled)
@@ -401,7 +536,14 @@ IStorage::IReader* AwsS3StoragePlugin::GetReaderForObject(const char* uuid, Orth
401536
paths.push_back(GetPath(uuid, type, encryptionEnabled, true));
402537
}
403538

404-
return new Reader(client_, bucketName_, paths, uuid);
539+
if (useTransferManager_)
540+
{
541+
return new TransferReader(transferManager_, client_, bucketName_, paths, uuid);
542+
}
543+
else
544+
{
545+
return new DirectReader(client_, bucketName_, paths, uuid);
546+
}
405547
}
406548

407549
void AwsS3StoragePlugin::DeleteObject(const char* uuid, OrthancPluginContentType type, bool encryptionEnabled)
@@ -422,7 +564,7 @@ void AwsS3StoragePlugin::DeleteObject(const char* uuid, OrthancPluginContentType
422564
deleteObjectRequest.SetBucket(bucketName_.c_str());
423565
deleteObjectRequest.SetKey(path.c_str());
424566

425-
auto result = client_.DeleteObject(deleteObjectRequest);
567+
auto result = client_->DeleteObject(deleteObjectRequest);
426568

427569
if (!result.IsSuccess() && firstExceptionMessage.empty())
428570
{

Aws/AwsStaticConfiguration.cmake

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,7 @@ list(APPEND AWS_SOURCES_SUBDIRS
156156
${AWS_SDK_CPP_SOURCES_DIR}/aws-cpp-sdk-core/source/utils/xml/
157157
${AWS_SDK_CPP_SOURCES_DIR}/aws-cpp-sdk-s3/source/
158158
${AWS_SDK_CPP_SOURCES_DIR}/aws-cpp-sdk-s3/source/model/
159+
${AWS_SDK_CPP_SOURCES_DIR}/aws-cpp-sdk-transfer/source/transfer/
159160
)
160161

161162

Aws/CMakeLists.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,7 @@ if (NOT STATIC_BUILD AND USE_VCPKG_PACKAGES)
112112
# Use vcpkg by Microsoft
113113
option(BUILD_SHARED_LIBS "Build shared libraries" ON)
114114
find_package(cryptopp CONFIG REQUIRED)
115-
find_package(AWSSDK REQUIRED COMPONENTS s3)
115+
find_package(AWSSDK REQUIRED COMPONENTS s3 transfer)
116116
set(CRYPTOPP_LIBRARIES cryptopp-static)
117117

118118
else()

NEWS

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,15 @@
1+
Pending changes in the mainline
2+
===============================
3+
4+
* AWS plugin:
5+
* Added a new configuration "UseTransferMode" (true by default).
6+
When set to true, the Transfer Manager mode is used to upload/download
7+
whole files to/from the bucket. If set to false, the default "object"
8+
mode is used. The Transfer Manager mode is supposed to be faster,
9+
especially for large files.
10+
11+
12+
113
2023-07-20 - v 2.2.0
214
====================
315

0 commit comments

Comments
 (0)