Skip to content

async/await execute #524

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 12 commits into from
Dec 14, 2021
168 changes: 168 additions & 0 deletions Sources/AsyncHTTPClient/AsyncAwait/HTTPClient+execute.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the AsyncHTTPClient open source project
//
// Copyright (c) 2021 Apple Inc. and the AsyncHTTPClient project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of AsyncHTTPClient project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//

#if compiler(>=5.5) && canImport(_Concurrency)
import struct Foundation.URL
import Logging
import NIOCore
import NIOHTTP1

@available(macOS 12.0, iOS 15.0, watchOS 8.0, tvOS 15.0, *)
extension HTTPClient {
/// Execute arbitrary HTTP requests.
///
/// - Parameters:
/// - request: HTTP request to execute.
/// - deadline: Point in time by which the request must complete.
/// - logger: The logger to use for this request.
/// - Returns: The response to the request. Note that the `body` of the response may not yet have been fully received.
func execute(
_ request: HTTPClientRequest,
deadline: NIODeadline,
logger: Logger
) async throws -> HTTPClientResponse {
try await self.executeAndFollowRedirectsIfNeeded(
request,
deadline: deadline,
logger: logger,
redirectState: RedirectState(self.configuration.redirectConfiguration.mode, initialURL: request.url)
)
}
}

@available(macOS 12.0, iOS 15.0, watchOS 8.0, tvOS 15.0, *)
extension HTTPClient {
private func executeAndFollowRedirectsIfNeeded(
_ request: HTTPClientRequest,
deadline: NIODeadline,
logger: Logger,
redirectState: RedirectState?
) async throws -> HTTPClientResponse {
var currentRequest = request
var currentRedirectState = redirectState

// this loop is there to follow potential redirects
while true {
let preparedRequest = try HTTPClientRequest.Prepared(currentRequest)
let response = try await executeCancellable(preparedRequest, deadline: deadline, logger: logger)

guard var redirectState = currentRedirectState else {
// a `nil` redirectState means we should not follow redirects
return response
}

guard let redirectURL = response.headers.extractRedirectTarget(
status: response.status,
originalURL: preparedRequest.url,
originalScheme: preparedRequest.poolKey.scheme
) else {
// response does not want a redirect
return response
}

// validate that we do not exceed any limits or are running circles
try redirectState.redirect(to: redirectURL.absoluteString)
currentRedirectState = redirectState

let newRequest = preparedRequest.followingRedirect(to: redirectURL, status: response.status)

guard newRequest.body.canBeConsumedMultipleTimes else {
// we already send the request body and it cannot be send again
return response
}

currentRequest = newRequest
}
}

private func executeCancellable(
_ request: HTTPClientRequest.Prepared,
deadline: NIODeadline,
logger: Logger
) async throws -> HTTPClientResponse {
let cancelHandler = TransactionCancelHandler()

return try await withTaskCancellationHandler(operation: { () async throws -> HTTPClientResponse in
try await withCheckedThrowingContinuation { (continuation: CheckedContinuation<HTTPClientResponse, Swift.Error>) -> Void in
let transaction = Transaction(
request: request,
requestOptions: .init(idleReadTimeout: nil),
logger: logger,
connectionDeadline: deadline,
preferredEventLoop: self.eventLoopGroup.next(),
responseContinuation: continuation
)

cancelHandler.registerTransaction(transaction)

self.poolManager.executeRequest(transaction)
}
}, onCancel: {
cancelHandler.cancel()
})
}
}

/// There is currently no good way to asynchronously cancel an object that is initiated inside the `body` closure of `with*Continuation`.
/// As a workaround we use `TransactionCancelHandler` which will take care of the race between instantiation of `Transaction`
/// in the `body` closure and cancelation from the `onCancel` closure of `withTaskCancellationHandler`.
@available(macOS 12.0, iOS 15.0, watchOS 8.0, tvOS 15.0, *)
private actor TransactionCancelHandler {
private enum State {
case initialised
case register(Transaction)
case cancelled
}

private var state: State = .initialised

init() {}

private func _registerTransaction(_ transaction: Transaction) {
switch self.state {
case .initialised:
self.state = .register(transaction)
case .cancelled:
transaction.cancel()
case .register:
preconditionFailure("transaction already set")
}
}

nonisolated func registerTransaction(_ transaction: Transaction) {
Task {
await self._registerTransaction(transaction)
}
}

private func _cancel() {
switch self.state {
case .register(let bag):
self.state = .cancelled
bag.cancel()
case .cancelled:
break
case .initialised:
self.state = .cancelled
}
}

nonisolated func cancel() {
Task {
await self._cancel()
}
}
}

#endif
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,13 @@
//===----------------------------------------------------------------------===//

#if compiler(>=5.5) && canImport(_Concurrency)
import struct Foundation.URL
import NIOHTTP1

@available(macOS 12.0, iOS 15.0, watchOS 8.0, tvOS 15.0, *)
extension HTTPClientRequest {
struct Prepared {
var url: URL
var poolKey: ConnectionPool.Key
var requestFramingMetadata: RequestFramingMetadata
var head: HTTPRequestHead
Expand All @@ -28,22 +30,27 @@ extension HTTPClientRequest {
@available(macOS 12.0, iOS 15.0, watchOS 8.0, tvOS 15.0, *)
extension HTTPClientRequest.Prepared {
init(_ request: HTTPClientRequest) throws {
let url = try DeconstructedURL(url: request.url)
guard let url = URL(string: request.url) else {
throw HTTPClientError.invalidURL
}

let deconstructedURL = try DeconstructedURL(url: url)

var headers = request.headers
headers.addHostIfNeeded(for: url)
headers.addHostIfNeeded(for: deconstructedURL)
let metadata = try headers.validateAndSetTransportFraming(
method: request.method,
bodyLength: .init(request.body)
)

self.init(
poolKey: .init(url: url, tlsConfiguration: nil),
url: url,
poolKey: .init(url: deconstructedURL, tlsConfiguration: nil),
requestFramingMetadata: metadata,
head: .init(
version: .http1_1,
method: request.method,
uri: url.uri,
uri: deconstructedURL.uri,
headers: headers
),
body: request.body
Expand All @@ -59,12 +66,31 @@ extension RequestBodyLength {
self = .fixed(length: 0)
case .byteBuffer(let buffer):
self = .fixed(length: buffer.readableBytes)
case .sequence(nil, _), .asyncSequence(nil, _):
case .sequence(nil, _, _), .asyncSequence(nil, _):
self = .dynamic
case .sequence(.some(let length), _), .asyncSequence(.some(let length), _):
case .sequence(.some(let length), _, _), .asyncSequence(.some(let length), _):
self = .fixed(length: length)
}
}
}

@available(macOS 12.0, iOS 15.0, watchOS 8.0, tvOS 15.0, *)
extension HTTPClientRequest.Prepared {
func followingRedirect(to redirectURL: URL, status: HTTPResponseStatus) -> HTTPClientRequest {
let (method, headers, body) = transformRequestForRedirect(
from: self.url,
method: self.head.method,
headers: self.head.headers,
body: self.body,
to: redirectURL,
status: status
)
var newRequest = HTTPClientRequest(url: redirectURL.absoluteString)
newRequest.method = method
newRequest.headers = headers
newRequest.body = body
return newRequest
}
}

#endif
41 changes: 34 additions & 7 deletions Sources/AsyncHTTPClient/AsyncAwait/HTTPClientRequest.swift
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ extension HTTPClientRequest {
struct Body {
internal enum Mode {
case asyncSequence(length: Int?, (ByteBufferAllocator) async throws -> ByteBuffer?)
case sequence(length: Int?, (ByteBufferAllocator) -> ByteBuffer)
case sequence(length: Int?, canBeConsumedMultipleTimes: Bool, (ByteBufferAllocator) -> ByteBuffer)
case byteBuffer(ByteBuffer)
}

Expand All @@ -57,10 +57,25 @@ extension HTTPClientRequest.Body {

@inlinable
static func bytes<Bytes: Sequence>(
length: Int? = nil,
length: Int?,
_ bytes: Bytes
) -> Self where Bytes.Element == UInt8 {
self.init(.sequence(length: length) { allocator in
self.init(.sequence(length: length, canBeConsumedMultipleTimes: false) { allocator in
if let buffer = bytes.withContiguousStorageIfAvailable({ allocator.buffer(bytes: $0) }) {
// fastpath
return buffer
}
// potentially really slow path
return allocator.buffer(bytes: bytes)
})
}

@inlinable
static func bytes<Bytes: Collection>(
length: Int?,
_ bytes: Bytes
) -> Self where Bytes.Element == UInt8 {
self.init(.sequence(length: length, canBeConsumedMultipleTimes: true) { allocator in
if let buffer = bytes.withContiguousStorageIfAvailable({ allocator.buffer(bytes: $0) }) {
// fastpath
return buffer
Expand All @@ -74,7 +89,7 @@ extension HTTPClientRequest.Body {
static func bytes<Bytes: RandomAccessCollection>(
_ bytes: Bytes
) -> Self where Bytes.Element == UInt8 {
self.init(.sequence(length: bytes.count) { allocator in
self.init(.sequence(length: bytes.count, canBeConsumedMultipleTimes: true) { allocator in
if let buffer = bytes.withContiguousStorageIfAvailable({ allocator.buffer(bytes: $0) }) {
// fastpath
return buffer
Expand All @@ -86,7 +101,7 @@ extension HTTPClientRequest.Body {

@inlinable
static func stream<SequenceOfBytes: AsyncSequence>(
length: Int? = nil,
length: Int?,
_ sequenceOfBytes: SequenceOfBytes
) -> Self where SequenceOfBytes.Element == ByteBuffer {
var iterator = sequenceOfBytes.makeAsyncIterator()
Expand All @@ -98,11 +113,11 @@ extension HTTPClientRequest.Body {

@inlinable
static func stream<Bytes: AsyncSequence>(
length: Int? = nil,
length: Int?,
_ bytes: Bytes
) -> Self where Bytes.Element == UInt8 {
var iterator = bytes.makeAsyncIterator()
let body = self.init(.asyncSequence(length: nil) { allocator -> ByteBuffer? in
let body = self.init(.asyncSequence(length: length) { allocator -> ByteBuffer? in
var buffer = allocator.buffer(capacity: 1024) // TODO: Magic number
while buffer.writableBytes > 0, let byte = try await iterator.next() {
buffer.writeInteger(byte)
Expand All @@ -116,4 +131,16 @@ extension HTTPClientRequest.Body {
}
}

@available(macOS 12.0, iOS 15.0, watchOS 8.0, tvOS 15.0, *)
extension Optional where Wrapped == HTTPClientRequest.Body {
internal var canBeConsumedMultipleTimes: Bool {
switch self?.mode {
case .none: return true
case .byteBuffer: return true
case .sequence(_, let canBeConsumedMultipleTimes, _): return canBeConsumedMultipleTimes
case .asyncSequence: return false
}
}
}

#endif
2 changes: 1 addition & 1 deletion Sources/AsyncHTTPClient/AsyncAwait/Transaction.swift
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ extension Transaction: HTTPExecutableRequest {
case .none:
break

case .sequence(_, let create):
case .sequence(_, _, let create):
let byteBuffer = create(allocator)
self.writeOnceAndOneTimeOnly(byteBuffer: byteBuffer)
}
Expand Down
4 changes: 2 additions & 2 deletions Tests/AsyncHTTPClientTests/HTTPClientRequestTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -411,7 +411,7 @@ class HTTPClientRequestTests: XCTestCase {
.asAsyncSequence()
.map { ByteBuffer($0) }

request.body = .stream(asyncSequence)
request.body = .stream(length: nil, asyncSequence)
var preparedRequest: PreparedRequest?
XCTAssertNoThrow(preparedRequest = try PreparedRequest(request))
guard let preparedRequest = preparedRequest else { return }
Expand Down Expand Up @@ -498,7 +498,7 @@ extension Optional where Wrapped == HTTPClientRequest.Body {
return ByteBuffer()
case .byteBuffer(let buffer):
return buffer
case .sequence(let announcedLength, let generate):
case .sequence(let announcedLength, _, let generate):
let buffer = generate(ByteBufferAllocator())
if let announcedLength = announcedLength,
announcedLength != buffer.readableBytes {
Expand Down
4 changes: 2 additions & 2 deletions Tests/AsyncHTTPClientTests/TransactionTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ final class TransactionTests: XCTestCase {

var request = HTTPClientRequest(url: "https://localhost/")
request.method = .POST
request.body = .stream(streamWriter)
request.body = .stream(length: nil, streamWriter)

var maybePreparedRequest: PreparedRequest?
XCTAssertNoThrow(maybePreparedRequest = try PreparedRequest(request))
Expand Down Expand Up @@ -318,7 +318,7 @@ final class TransactionTests: XCTestCase {

var request = HTTPClientRequest(url: "https://localhost/")
request.method = .POST
request.body = .bytes("Hello world!".utf8)
request.body = .bytes(length: nil, "Hello world!".utf8)
var maybePreparedRequest: PreparedRequest?
XCTAssertNoThrow(maybePreparedRequest = try PreparedRequest(request))
guard let preparedRequest = maybePreparedRequest else {
Expand Down