diff --git a/Sources/AsyncHTTPClient/AsyncAwait/HTTPClient+execute.swift b/Sources/AsyncHTTPClient/AsyncAwait/HTTPClient+execute.swift new file mode 100644 index 000000000..fd108eb98 --- /dev/null +++ b/Sources/AsyncHTTPClient/AsyncAwait/HTTPClient+execute.swift @@ -0,0 +1,168 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the AsyncHTTPClient open source project +// +// Copyright (c) 2021 Apple Inc. and the AsyncHTTPClient project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of AsyncHTTPClient project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +#if compiler(>=5.5) && canImport(_Concurrency) +import struct Foundation.URL +import Logging +import NIOCore +import NIOHTTP1 + +@available(macOS 12.0, iOS 15.0, watchOS 8.0, tvOS 15.0, *) +extension HTTPClient { + /// Execute arbitrary HTTP requests. + /// + /// - Parameters: + /// - request: HTTP request to execute. + /// - deadline: Point in time by which the request must complete. + /// - logger: The logger to use for this request. + /// - Returns: The response to the request. Note that the `body` of the response may not yet have been fully received. + func execute( + _ request: HTTPClientRequest, + deadline: NIODeadline, + logger: Logger + ) async throws -> HTTPClientResponse { + try await self.executeAndFollowRedirectsIfNeeded( + request, + deadline: deadline, + logger: logger, + redirectState: RedirectState(self.configuration.redirectConfiguration.mode, initialURL: request.url) + ) + } +} + +@available(macOS 12.0, iOS 15.0, watchOS 8.0, tvOS 15.0, *) +extension HTTPClient { + private func executeAndFollowRedirectsIfNeeded( + _ request: HTTPClientRequest, + deadline: NIODeadline, + logger: Logger, + redirectState: RedirectState? + ) async throws -> HTTPClientResponse { + var currentRequest = request + var currentRedirectState = redirectState + + // this loop is there to follow potential redirects + while true { + let preparedRequest = try HTTPClientRequest.Prepared(currentRequest) + let response = try await executeCancellable(preparedRequest, deadline: deadline, logger: logger) + + guard var redirectState = currentRedirectState else { + // a `nil` redirectState means we should not follow redirects + return response + } + + guard let redirectURL = response.headers.extractRedirectTarget( + status: response.status, + originalURL: preparedRequest.url, + originalScheme: preparedRequest.poolKey.scheme + ) else { + // response does not want a redirect + return response + } + + // validate that we do not exceed any limits or are running circles + try redirectState.redirect(to: redirectURL.absoluteString) + currentRedirectState = redirectState + + let newRequest = preparedRequest.followingRedirect(to: redirectURL, status: response.status) + + guard newRequest.body.canBeConsumedMultipleTimes else { + // we already send the request body and it cannot be send again + return response + } + + currentRequest = newRequest + } + } + + private func executeCancellable( + _ request: HTTPClientRequest.Prepared, + deadline: NIODeadline, + logger: Logger + ) async throws -> HTTPClientResponse { + let cancelHandler = TransactionCancelHandler() + + return try await withTaskCancellationHandler(operation: { () async throws -> HTTPClientResponse in + try await withCheckedThrowingContinuation { (continuation: CheckedContinuation) -> Void in + let transaction = Transaction( + request: request, + requestOptions: .init(idleReadTimeout: nil), + logger: logger, + connectionDeadline: deadline, + preferredEventLoop: self.eventLoopGroup.next(), + responseContinuation: continuation + ) + + cancelHandler.registerTransaction(transaction) + + self.poolManager.executeRequest(transaction) + } + }, onCancel: { + cancelHandler.cancel() + }) + } +} + +/// There is currently no good way to asynchronously cancel an object that is initiated inside the `body` closure of `with*Continuation`. +/// As a workaround we use `TransactionCancelHandler` which will take care of the race between instantiation of `Transaction` +/// in the `body` closure and cancelation from the `onCancel` closure of `withTaskCancellationHandler`. +@available(macOS 12.0, iOS 15.0, watchOS 8.0, tvOS 15.0, *) +private actor TransactionCancelHandler { + private enum State { + case initialised + case register(Transaction) + case cancelled + } + + private var state: State = .initialised + + init() {} + + private func _registerTransaction(_ transaction: Transaction) { + switch self.state { + case .initialised: + self.state = .register(transaction) + case .cancelled: + transaction.cancel() + case .register: + preconditionFailure("transaction already set") + } + } + + nonisolated func registerTransaction(_ transaction: Transaction) { + Task { + await self._registerTransaction(transaction) + } + } + + private func _cancel() { + switch self.state { + case .register(let bag): + self.state = .cancelled + bag.cancel() + case .cancelled: + break + case .initialised: + self.state = .cancelled + } + } + + nonisolated func cancel() { + Task { + await self._cancel() + } + } +} + +#endif diff --git a/Sources/AsyncHTTPClient/AsyncAwait/HTTPClientRequest+Prepared.swift b/Sources/AsyncHTTPClient/AsyncAwait/HTTPClientRequest+Prepared.swift index 8eacb4f47..974616143 100644 --- a/Sources/AsyncHTTPClient/AsyncAwait/HTTPClientRequest+Prepared.swift +++ b/Sources/AsyncHTTPClient/AsyncAwait/HTTPClientRequest+Prepared.swift @@ -13,11 +13,13 @@ //===----------------------------------------------------------------------===// #if compiler(>=5.5) && canImport(_Concurrency) +import struct Foundation.URL import NIOHTTP1 @available(macOS 12.0, iOS 15.0, watchOS 8.0, tvOS 15.0, *) extension HTTPClientRequest { struct Prepared { + var url: URL var poolKey: ConnectionPool.Key var requestFramingMetadata: RequestFramingMetadata var head: HTTPRequestHead @@ -28,22 +30,27 @@ extension HTTPClientRequest { @available(macOS 12.0, iOS 15.0, watchOS 8.0, tvOS 15.0, *) extension HTTPClientRequest.Prepared { init(_ request: HTTPClientRequest) throws { - let url = try DeconstructedURL(url: request.url) + guard let url = URL(string: request.url) else { + throw HTTPClientError.invalidURL + } + + let deconstructedURL = try DeconstructedURL(url: url) var headers = request.headers - headers.addHostIfNeeded(for: url) + headers.addHostIfNeeded(for: deconstructedURL) let metadata = try headers.validateAndSetTransportFraming( method: request.method, bodyLength: .init(request.body) ) self.init( - poolKey: .init(url: url, tlsConfiguration: nil), + url: url, + poolKey: .init(url: deconstructedURL, tlsConfiguration: nil), requestFramingMetadata: metadata, head: .init( version: .http1_1, method: request.method, - uri: url.uri, + uri: deconstructedURL.uri, headers: headers ), body: request.body @@ -59,12 +66,31 @@ extension RequestBodyLength { self = .fixed(length: 0) case .byteBuffer(let buffer): self = .fixed(length: buffer.readableBytes) - case .sequence(nil, _), .asyncSequence(nil, _): + case .sequence(nil, _, _), .asyncSequence(nil, _): self = .dynamic - case .sequence(.some(let length), _), .asyncSequence(.some(let length), _): + case .sequence(.some(let length), _, _), .asyncSequence(.some(let length), _): self = .fixed(length: length) } } } +@available(macOS 12.0, iOS 15.0, watchOS 8.0, tvOS 15.0, *) +extension HTTPClientRequest.Prepared { + func followingRedirect(to redirectURL: URL, status: HTTPResponseStatus) -> HTTPClientRequest { + let (method, headers, body) = transformRequestForRedirect( + from: self.url, + method: self.head.method, + headers: self.head.headers, + body: self.body, + to: redirectURL, + status: status + ) + var newRequest = HTTPClientRequest(url: redirectURL.absoluteString) + newRequest.method = method + newRequest.headers = headers + newRequest.body = body + return newRequest + } +} + #endif diff --git a/Sources/AsyncHTTPClient/AsyncAwait/HTTPClientRequest.swift b/Sources/AsyncHTTPClient/AsyncAwait/HTTPClientRequest.swift index a7d40d468..3c729b75c 100644 --- a/Sources/AsyncHTTPClient/AsyncAwait/HTTPClientRequest.swift +++ b/Sources/AsyncHTTPClient/AsyncAwait/HTTPClientRequest.swift @@ -37,7 +37,7 @@ extension HTTPClientRequest { struct Body { internal enum Mode { case asyncSequence(length: Int?, (ByteBufferAllocator) async throws -> ByteBuffer?) - case sequence(length: Int?, (ByteBufferAllocator) -> ByteBuffer) + case sequence(length: Int?, canBeConsumedMultipleTimes: Bool, (ByteBufferAllocator) -> ByteBuffer) case byteBuffer(ByteBuffer) } @@ -57,10 +57,25 @@ extension HTTPClientRequest.Body { @inlinable static func bytes( - length: Int? = nil, + length: Int?, _ bytes: Bytes ) -> Self where Bytes.Element == UInt8 { - self.init(.sequence(length: length) { allocator in + self.init(.sequence(length: length, canBeConsumedMultipleTimes: false) { allocator in + if let buffer = bytes.withContiguousStorageIfAvailable({ allocator.buffer(bytes: $0) }) { + // fastpath + return buffer + } + // potentially really slow path + return allocator.buffer(bytes: bytes) + }) + } + + @inlinable + static func bytes( + length: Int?, + _ bytes: Bytes + ) -> Self where Bytes.Element == UInt8 { + self.init(.sequence(length: length, canBeConsumedMultipleTimes: true) { allocator in if let buffer = bytes.withContiguousStorageIfAvailable({ allocator.buffer(bytes: $0) }) { // fastpath return buffer @@ -74,7 +89,7 @@ extension HTTPClientRequest.Body { static func bytes( _ bytes: Bytes ) -> Self where Bytes.Element == UInt8 { - self.init(.sequence(length: bytes.count) { allocator in + self.init(.sequence(length: bytes.count, canBeConsumedMultipleTimes: true) { allocator in if let buffer = bytes.withContiguousStorageIfAvailable({ allocator.buffer(bytes: $0) }) { // fastpath return buffer @@ -86,7 +101,7 @@ extension HTTPClientRequest.Body { @inlinable static func stream( - length: Int? = nil, + length: Int?, _ sequenceOfBytes: SequenceOfBytes ) -> Self where SequenceOfBytes.Element == ByteBuffer { var iterator = sequenceOfBytes.makeAsyncIterator() @@ -98,11 +113,11 @@ extension HTTPClientRequest.Body { @inlinable static func stream( - length: Int? = nil, + length: Int?, _ bytes: Bytes ) -> Self where Bytes.Element == UInt8 { var iterator = bytes.makeAsyncIterator() - let body = self.init(.asyncSequence(length: nil) { allocator -> ByteBuffer? in + let body = self.init(.asyncSequence(length: length) { allocator -> ByteBuffer? in var buffer = allocator.buffer(capacity: 1024) // TODO: Magic number while buffer.writableBytes > 0, let byte = try await iterator.next() { buffer.writeInteger(byte) @@ -116,4 +131,16 @@ extension HTTPClientRequest.Body { } } +@available(macOS 12.0, iOS 15.0, watchOS 8.0, tvOS 15.0, *) +extension Optional where Wrapped == HTTPClientRequest.Body { + internal var canBeConsumedMultipleTimes: Bool { + switch self?.mode { + case .none: return true + case .byteBuffer: return true + case .sequence(_, let canBeConsumedMultipleTimes, _): return canBeConsumedMultipleTimes + case .asyncSequence: return false + } + } +} + #endif diff --git a/Sources/AsyncHTTPClient/AsyncAwait/Transaction.swift b/Sources/AsyncHTTPClient/AsyncAwait/Transaction.swift index 2596fb978..c231d45e4 100644 --- a/Sources/AsyncHTTPClient/AsyncAwait/Transaction.swift +++ b/Sources/AsyncHTTPClient/AsyncAwait/Transaction.swift @@ -203,7 +203,7 @@ extension Transaction: HTTPExecutableRequest { case .none: break - case .sequence(_, let create): + case .sequence(_, _, let create): let byteBuffer = create(allocator) self.writeOnceAndOneTimeOnly(byteBuffer: byteBuffer) } diff --git a/Tests/AsyncHTTPClientTests/HTTPClientRequestTests.swift b/Tests/AsyncHTTPClientTests/HTTPClientRequestTests.swift index 2abb9b4f3..082f37761 100644 --- a/Tests/AsyncHTTPClientTests/HTTPClientRequestTests.swift +++ b/Tests/AsyncHTTPClientTests/HTTPClientRequestTests.swift @@ -411,7 +411,7 @@ class HTTPClientRequestTests: XCTestCase { .asAsyncSequence() .map { ByteBuffer($0) } - request.body = .stream(asyncSequence) + request.body = .stream(length: nil, asyncSequence) var preparedRequest: PreparedRequest? XCTAssertNoThrow(preparedRequest = try PreparedRequest(request)) guard let preparedRequest = preparedRequest else { return } @@ -498,7 +498,7 @@ extension Optional where Wrapped == HTTPClientRequest.Body { return ByteBuffer() case .byteBuffer(let buffer): return buffer - case .sequence(let announcedLength, let generate): + case .sequence(let announcedLength, _, let generate): let buffer = generate(ByteBufferAllocator()) if let announcedLength = announcedLength, announcedLength != buffer.readableBytes { diff --git a/Tests/AsyncHTTPClientTests/TransactionTests.swift b/Tests/AsyncHTTPClientTests/TransactionTests.swift index 38b17463e..9513605f7 100644 --- a/Tests/AsyncHTTPClientTests/TransactionTests.swift +++ b/Tests/AsyncHTTPClientTests/TransactionTests.swift @@ -189,7 +189,7 @@ final class TransactionTests: XCTestCase { var request = HTTPClientRequest(url: "https://localhost/") request.method = .POST - request.body = .stream(streamWriter) + request.body = .stream(length: nil, streamWriter) var maybePreparedRequest: PreparedRequest? XCTAssertNoThrow(maybePreparedRequest = try PreparedRequest(request)) @@ -318,7 +318,7 @@ final class TransactionTests: XCTestCase { var request = HTTPClientRequest(url: "https://localhost/") request.method = .POST - request.body = .bytes("Hello world!".utf8) + request.body = .bytes(length: nil, "Hello world!".utf8) var maybePreparedRequest: PreparedRequest? XCTAssertNoThrow(maybePreparedRequest = try PreparedRequest(request)) guard let preparedRequest = maybePreparedRequest else {