|
3 | 3 | * SPDX-License-Identifier: Apache-2.0.
|
4 | 4 | */
|
5 | 5 |
|
6 |
| -#include <aws/core/http/curl/CurlHttpClient.h> |
7 | 6 | #include <aws/core/http/HttpRequest.h>
|
| 7 | +#include <aws/core/http/curl/CurlHttpClient.h> |
8 | 8 | #include <aws/core/http/standard/StandardHttpResponse.h>
|
9 |
| -#include <aws/core/utils/StringUtils.h> |
| 9 | +#include <aws/core/monitoring/HttpClientMetrics.h> |
| 10 | +#include <aws/core/utils/DateTime.h> |
10 | 11 | #include <aws/core/utils/HashingUtils.h>
|
| 12 | +#include <aws/core/utils/Outcome.h> |
| 13 | +#include <aws/core/utils/StringUtils.h> |
| 14 | +#include <aws/core/utils/crypto/Hash.h> |
11 | 15 | #include <aws/core/utils/logging/LogMacros.h>
|
12 | 16 | #include <aws/core/utils/ratelimiter/RateLimiterInterface.h>
|
13 |
| -#include <aws/core/utils/DateTime.h> |
14 |
| -#include <aws/core/utils/crypto/Hash.h> |
15 |
| -#include <aws/core/utils/Outcome.h> |
16 |
| -#include <aws/core/monitoring/HttpClientMetrics.h> |
17 |
| -#include <cassert> |
| 17 | +#include <aws/core/utils/stream/AwsChunkedStream.h> |
| 18 | + |
18 | 19 | #include <algorithm>
|
| 20 | +#include <cassert> |
19 | 21 | #include <thread>
|
20 | 22 |
|
21 |
| - |
22 | 23 | using namespace Aws::Client;
|
23 | 24 | using namespace Aws::Http;
|
24 | 25 | using namespace Aws::Http::Standard;
|
25 | 26 | using namespace Aws::Utils;
|
26 | 27 | using namespace Aws::Utils::Logging;
|
| 28 | +using namespace Aws::Utils::Stream; |
27 | 29 | using namespace Aws::Monitoring;
|
28 | 30 |
|
29 | 31 | #ifdef USE_AWS_MEMORY_MANAGEMENT
|
@@ -144,25 +146,28 @@ struct CurlWriteCallbackContext
|
144 | 146 | int64_t m_numBytesResponseReceived;
|
145 | 147 | };
|
146 | 148 |
|
| 149 | +static const char* CURL_HTTP_CLIENT_TAG = "CurlHttpClient"; |
| 150 | + |
147 | 151 | struct CurlReadCallbackContext
|
148 | 152 | {
|
149 |
| - CurlReadCallbackContext(const CurlHttpClient* client, CURL* curlHandle, HttpRequest* request, Aws::Utils::RateLimits::RateLimiterInterface* limiter) : |
150 |
| - m_client(client), |
| 153 | + CurlReadCallbackContext(const CurlHttpClient* client, CURL* curlHandle, HttpRequest* request, |
| 154 | + Aws::Utils::RateLimits::RateLimiterInterface* limiter, |
| 155 | + std::shared_ptr<AwsChunkedStream<>> chunkedStream = nullptr) |
| 156 | + : m_client(client), |
151 | 157 | m_curlHandle(curlHandle),
|
152 | 158 | m_rateLimiter(limiter),
|
153 | 159 | m_request(request),
|
154 |
| - m_chunkEnd(false) |
155 |
| - {} |
156 |
| - |
157 |
| - const CurlHttpClient* m_client; |
158 |
| - CURL* m_curlHandle; |
159 |
| - Aws::Utils::RateLimits::RateLimiterInterface* m_rateLimiter; |
160 |
| - HttpRequest* m_request; |
161 |
| - bool m_chunkEnd; |
| 160 | + m_chunkEnd(false), |
| 161 | + m_chunkedStream{std::move(chunkedStream)} {} |
| 162 | + |
| 163 | + const CurlHttpClient* m_client; |
| 164 | + CURL* m_curlHandle; |
| 165 | + Aws::Utils::RateLimits::RateLimiterInterface* m_rateLimiter; |
| 166 | + HttpRequest* m_request; |
| 167 | + bool m_chunkEnd; |
| 168 | + std::shared_ptr<Stream::AwsChunkedStream<>> m_chunkedStream; |
162 | 169 | };
|
163 | 170 |
|
164 |
| -static const char* CURL_HTTP_CLIENT_TAG = "CurlHttpClient"; |
165 |
| - |
166 | 171 | static int64_t GetContentLengthFromHeader(CURL* connectionHandle,
|
167 | 172 | bool& hasContentLength) {
|
168 | 173 | #if LIBCURL_VERSION_NUM >= 0x073700 // 7.55.0
|
@@ -293,67 +298,24 @@ static size_t ReadBody(char* ptr, size_t size, size_t nmemb, void* userdata, boo
|
293 | 298 | size_t amountToRead = size * nmemb;
|
294 | 299 | bool isAwsChunked = request->HasHeader(Aws::Http::CONTENT_ENCODING_HEADER) &&
|
295 | 300 | request->GetHeaderValue(Aws::Http::CONTENT_ENCODING_HEADER) == Aws::Http::AWS_CHUNKED_VALUE;
|
296 |
| - // aws-chunk = hex(chunk-size) + CRLF + chunk-data + CRLF |
297 |
| - // Needs to reserve bytes of sizeof(hex(chunk-size)) + sizeof(CRLF) + sizeof(CRLF) |
298 |
| - if (isAwsChunked) |
299 |
| - { |
300 |
| - Aws::String amountToReadHexString = Aws::Utils::StringUtils::ToHexString(amountToRead); |
301 |
| - amountToRead -= (amountToReadHexString.size() + 4); |
302 |
| - } |
303 | 301 |
|
304 | 302 | if (ioStream != nullptr && amountToRead > 0)
|
305 | 303 | {
|
306 | 304 | size_t amountRead = 0;
|
307 |
| - if (isStreaming) |
308 |
| - { |
309 |
| - if (!ioStream->eof() && ioStream->peek() != EOF) |
310 |
| - { |
311 |
| - amountRead = (size_t) ioStream->readsome(ptr, amountToRead); |
312 |
| - } |
313 |
| - if (amountRead == 0 && !ioStream->eof()) |
314 |
| - { |
315 |
| - return CURL_READFUNC_PAUSE; |
316 |
| - } |
317 |
| - } |
318 |
| - else |
319 |
| - { |
320 |
| - ioStream->read(ptr, amountToRead); |
321 |
| - amountRead = static_cast<size_t>(ioStream->gcount()); |
322 |
| - } |
323 |
| - |
324 |
| - if (isAwsChunked) |
325 |
| - { |
326 |
| - if (amountRead > 0) |
327 |
| - { |
328 |
| - if (request->GetRequestHash().second != nullptr) |
329 |
| - { |
330 |
| - request->GetRequestHash().second->Update(reinterpret_cast<unsigned char*>(ptr), amountRead); |
331 |
| - } |
332 |
| - |
333 |
| - Aws::String hex = Aws::Utils::StringUtils::ToHexString(amountRead); |
334 |
| - memmove(ptr + hex.size() + 2, ptr, amountRead); |
335 |
| - memmove(ptr + hex.size() + 2 + amountRead, "\r\n", 2); |
336 |
| - memmove(ptr, hex.c_str(), hex.size()); |
337 |
| - memmove(ptr + hex.size(), "\r\n", 2); |
338 |
| - amountRead += hex.size() + 4; |
339 |
| - } |
340 |
| - else if (!context->m_chunkEnd) |
341 |
| - { |
342 |
| - Aws::StringStream chunkedTrailer; |
343 |
| - chunkedTrailer << "0\r\n"; |
344 |
| - if (request->GetRequestHash().second != nullptr) |
345 |
| - { |
346 |
| - chunkedTrailer << "x-amz-checksum-" |
347 |
| - << request->GetRequestHash().first |
348 |
| - << ":" |
349 |
| - << HashingUtils::Base64Encode(request->GetRequestHash().second->GetHash().GetResult()) |
350 |
| - << "\r\n"; |
351 |
| - } |
352 |
| - chunkedTrailer << "\r\n"; |
353 |
| - amountRead = chunkedTrailer.str().size(); |
354 |
| - memcpy(ptr, chunkedTrailer.str().c_str(), amountRead); |
355 |
| - context->m_chunkEnd = true; |
356 |
| - } |
| 305 | + if (isStreaming) { |
| 306 | + if (!ioStream->eof() && ioStream->peek() != EOF) { |
| 307 | + amountRead = (size_t)ioStream->readsome(ptr, amountToRead); |
| 308 | + } |
| 309 | + if (amountRead == 0 && !ioStream->eof()) { |
| 310 | + return CURL_READFUNC_PAUSE; |
| 311 | + } |
| 312 | + } else if (isAwsChunked && context->m_chunkedStream != nullptr) { |
| 313 | + AWS_LOGSTREAM_ERROR(CURL_HTTP_CLIENT_TAG, "Called with size: " << amountToRead); |
| 314 | + amountRead = context->m_chunkedStream->BufferedRead(ptr, amountToRead); |
| 315 | + AWS_LOGSTREAM_ERROR(CURL_HTTP_CLIENT_TAG, "read: " << amountRead); |
| 316 | + } else { |
| 317 | + ioStream->read(ptr, amountToRead); |
| 318 | + amountRead = static_cast<size_t>(ioStream->gcount()); |
357 | 319 | }
|
358 | 320 |
|
359 | 321 | auto& sentHandler = request->GetDataSentEventHandler();
|
@@ -724,7 +686,14 @@ std::shared_ptr<HttpResponse> CurlHttpClient::MakeRequest(const std::shared_ptr<
|
724 | 686 | }
|
725 | 687 |
|
726 | 688 | CurlWriteCallbackContext writeContext(this, request.get(), response.get(), readLimiter);
|
727 |
| - CurlReadCallbackContext readContext(this, connectionHandle, request.get(), writeLimiter); |
| 689 | + |
| 690 | + const auto readContext = [this, &connectionHandle, &request, &writeLimiter]() -> CurlReadCallbackContext { |
| 691 | + if (request->GetContentBody() != nullptr) { |
| 692 | + auto chunkedBodyPtr = Aws::MakeShared<AwsChunkedStream<>>(CURL_HTTP_CLIENT_TAG, request.get(), request->GetContentBody()); |
| 693 | + return {this, connectionHandle, request.get(), writeLimiter, std::move(chunkedBodyPtr)}; |
| 694 | + } |
| 695 | + return {this, connectionHandle, request.get(), writeLimiter}; |
| 696 | + }(); |
728 | 697 |
|
729 | 698 | SetOptCodeForHttpMethod(connectionHandle, request);
|
730 | 699 |
|
|
0 commit comments