Skip to content

Make HTTPClientResponse.init public #632

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Oct 11, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 48 additions & 0 deletions Sources/AsyncHTTPClient/AsyncAwait/AnyAsyncSequence.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the AsyncHTTPClient open source project
//
// Copyright (c) 2022 Apple Inc. and the AsyncHTTPClient project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of AsyncHTTPClient project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//

@usableFromInline
@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
struct AnyAsyncSequence<Element>: Sendable, AsyncSequence {
@usableFromInline typealias AsyncIteratorNextCallback = () async throws -> Element?

@usableFromInline struct AsyncIterator: AsyncIteratorProtocol {
@usableFromInline let nextCallback: AsyncIteratorNextCallback

@inlinable init(nextCallback: @escaping AsyncIteratorNextCallback) {
self.nextCallback = nextCallback
}

@inlinable mutating func next() async throws -> Element? {
try await self.nextCallback()
}
}

@usableFromInline var makeAsyncIteratorCallback: @Sendable () -> AsyncIteratorNextCallback

@inlinable init<SequenceOfBytes>(
_ asyncSequence: SequenceOfBytes
) where SequenceOfBytes: AsyncSequence & Sendable, SequenceOfBytes.Element == Element {
self.makeAsyncIteratorCallback = {
var iterator = asyncSequence.makeAsyncIterator()
return {
try await iterator.next()
}
}
}

@inlinable func makeAsyncIterator() -> AsyncIterator {
.init(nextCallback: self.makeAsyncIteratorCallback())
}
}
52 changes: 52 additions & 0 deletions Sources/AsyncHTTPClient/AsyncAwait/AsyncLazySequence.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the AsyncHTTPClient open source project
//
// Copyright (c) 2022 Apple Inc. and the AsyncHTTPClient project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of AsyncHTTPClient project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//

@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
@usableFromInline
struct AsyncLazySequence<Base: Sequence>: AsyncSequence {
@usableFromInline typealias Element = Base.Element
@usableFromInline struct AsyncIterator: AsyncIteratorProtocol {
@usableFromInline var iterator: Base.Iterator
@inlinable init(iterator: Base.Iterator) {
self.iterator = iterator
}

@inlinable mutating func next() async throws -> Base.Element? {
self.iterator.next()
}
}

@usableFromInline var base: Base

@inlinable init(base: Base) {
self.base = base
}

@inlinable func makeAsyncIterator() -> AsyncIterator {
.init(iterator: self.base.makeIterator())
}
}

@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
extension AsyncLazySequence: Sendable where Base: Sendable {}
@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
extension AsyncLazySequence.AsyncIterator: Sendable where Base.Iterator: Sendable {}

@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
extension Sequence {
/// Turns `self` into an `AsyncSequence` by vending each element of `self` asynchronously.
@inlinable var async: AsyncLazySequence<Self> {
.init(base: self)
}
}
139 changes: 83 additions & 56 deletions Sources/AsyncHTTPClient/AsyncAwait/HTTPClientResponse.swift
Original file line number Diff line number Diff line change
Expand Up @@ -33,98 +33,125 @@ public struct HTTPClientResponse: Sendable {
/// The body of this HTTP response.
public var body: Body

/// A representation of the response body for an HTTP response.
///
/// The body is streamed as an `AsyncSequence` of `ByteBuffer`, where each `ByteBuffer` contains
/// an arbitrarily large chunk of data. The boundaries between `ByteBuffer` objects in the sequence
/// are entirely synthetic and have no semantic meaning.
public struct Body: Sendable {
private let bag: Transaction
private let reference: ResponseRef

fileprivate init(_ transaction: Transaction) {
self.bag = transaction
self.reference = ResponseRef(transaction: transaction)
}
}

init(
bag: Transaction,
version: HTTPVersion,
status: HTTPResponseStatus,
headers: HTTPHeaders
) {
self.body = Body(bag)
self.version = version
self.status = status
self.headers = headers
self.body = Body(TransactionBody(bag))
}

@inlinable public init(
version: HTTPVersion = .http1_1,
status: HTTPResponseStatus = .ok,
headers: HTTPHeaders = [:],
body: Body = Body()
) {
self.version = version
self.status = status
self.headers = headers
self.body = body
}
}

@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
extension HTTPClientResponse.Body: AsyncSequence {
public typealias Element = AsyncIterator.Element
extension HTTPClientResponse {
/// A representation of the response body for an HTTP response.
///
/// The body is streamed as an `AsyncSequence` of `ByteBuffer`, where each `ByteBuffer` contains
/// an arbitrarily large chunk of data. The boundaries between `ByteBuffer` objects in the sequence
/// are entirely synthetic and have no semantic meaning.
public struct Body: AsyncSequence, Sendable {
public typealias Element = ByteBuffer
public struct AsyncIterator: AsyncIteratorProtocol {
@usableFromInline var storage: Storage.AsyncIterator

public struct AsyncIterator: AsyncIteratorProtocol {
private let stream: IteratorStream
@inlinable init(storage: Storage.AsyncIterator) {
self.storage = storage
}

fileprivate init(stream: IteratorStream) {
self.stream = stream
@inlinable public mutating func next() async throws -> ByteBuffer? {
try await self.storage.next()
}
}

public mutating func next() async throws -> ByteBuffer? {
try await self.stream.next()
@usableFromInline var storage: Storage

@inlinable public func makeAsyncIterator() -> AsyncIterator {
.init(storage: self.storage.makeAsyncIterator())
}
}
}

public func makeAsyncIterator() -> AsyncIterator {
AsyncIterator(stream: IteratorStream(bag: self.bag))
@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
extension HTTPClientResponse.Body {
@usableFromInline enum Storage: Sendable {
case transaction(TransactionBody)
case anyAsyncSequence(AnyAsyncSequence<ByteBuffer>)
}
}

@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
extension HTTPClientResponse.Body {
/// The purpose of this object is to inform the transaction about the response body being deinitialized.
/// If the users has not called `makeAsyncIterator` on the body, before it is deinited, the http
/// request needs to be cancelled.
fileprivate final class ResponseRef: Sendable {
private let transaction: Transaction

init(transaction: Transaction) {
self.transaction = transaction
extension HTTPClientResponse.Body.Storage: AsyncSequence {
@usableFromInline typealias Element = ByteBuffer

@inlinable func makeAsyncIterator() -> AsyncIterator {
switch self {
case .transaction(let transaction):
return .transaction(transaction.makeAsyncIterator())
case .anyAsyncSequence(let anyAsyncSequence):
return .anyAsyncSequence(anyAsyncSequence.makeAsyncIterator())
}
}
}

@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
extension HTTPClientResponse.Body.Storage {
@usableFromInline enum AsyncIterator {
case transaction(TransactionBody.AsyncIterator)
case anyAsyncSequence(AnyAsyncSequence<ByteBuffer>.AsyncIterator)
}
}

deinit {
self.transaction.responseBodyDeinited()
@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
extension HTTPClientResponse.Body.Storage.AsyncIterator: AsyncIteratorProtocol {
@inlinable mutating func next() async throws -> ByteBuffer? {
switch self {
case .transaction(let iterator):
return try await iterator.next()
case .anyAsyncSequence(var iterator):
defer { self = .anyAsyncSequence(iterator) }
return try await iterator.next()
}
}
}

@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
extension HTTPClientResponse.Body {
internal class IteratorStream {
struct ID: Hashable {
private let objectID: ObjectIdentifier

init(_ object: IteratorStream) {
self.objectID = ObjectIdentifier(object)
}
}
init(_ body: TransactionBody) {
self.init(.transaction(body))
}

private var id: ID { ID(self) }
private let bag: Transaction
@usableFromInline init(_ storage: Storage) {
self.storage = storage
}

init(bag: Transaction) {
self.bag = bag
}
public init() {
self = .stream(EmptyCollection<ByteBuffer>().async)
}

deinit {
self.bag.responseBodyIteratorDeinited(streamID: self.id)
}
@inlinable public static func stream<SequenceOfBytes>(
_ sequenceOfBytes: SequenceOfBytes
) -> Self where SequenceOfBytes: AsyncSequence & Sendable, SequenceOfBytes.Element == ByteBuffer {
self.init(.anyAsyncSequence(AnyAsyncSequence(sequenceOfBytes.singleIteratorPrecondition)))
}

func next() async throws -> ByteBuffer? {
try await self.bag.nextResponsePart(streamID: self.id)
}
public static func bytes(_ byteBuffer: ByteBuffer) -> Self {
.stream(CollectionOfOne(byteBuffer).async)
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the AsyncHTTPClient open source project
//
// Copyright (c) 2022 Apple Inc. and the AsyncHTTPClient project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of AsyncHTTPClient project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//

import Atomics

/// Makes sure that a consumer of this `AsyncSequence` only calls `makeAsyncIterator()` at most once.
/// If `makeAsyncIterator()` is called multiple times, the program crashes.
@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
@usableFromInline struct SingleIteratorPrecondition<Base: AsyncSequence>: AsyncSequence {
@usableFromInline let base: Base
@usableFromInline let didCreateIterator: ManagedAtomic<Bool> = .init(false)
@usableFromInline typealias Element = Base.Element
@inlinable init(base: Base) {
self.base = base
}

@inlinable func makeAsyncIterator() -> Base.AsyncIterator {
precondition(
self.didCreateIterator.exchange(true, ordering: .relaxed) == false,
"makeAsyncIterator() is only allowed to be called at most once."
)
return self.base.makeAsyncIterator()
}
}

@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
extension SingleIteratorPrecondition: @unchecked Sendable where Base: Sendable {}

@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
extension AsyncSequence {
@inlinable var singleIteratorPrecondition: SingleIteratorPrecondition<Self> {
.init(base: self)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ extension Transaction {
case queued(CheckedContinuation<HTTPClientResponse, Error>, HTTPRequestScheduler)
case deadlineExceededWhileQueued(CheckedContinuation<HTTPClientResponse, Error>)
case executing(ExecutionContext, RequestStreamState, ResponseStreamState)
case finished(error: Error?, HTTPClientResponse.Body.IteratorStream.ID?)
case finished(error: Error?, TransactionBody.AsyncIterator.ID?)
}

fileprivate enum RequestStreamState {
Expand All @@ -52,9 +52,9 @@ extension Transaction {
// We are waiting for the user to create a response body iterator and to call next on
// it for the first time.
case waitingForResponseIterator(CircularBuffer<ByteBuffer>, next: Next)
case buffering(HTTPClientResponse.Body.IteratorStream.ID, CircularBuffer<ByteBuffer>, next: Next)
case waitingForRemote(HTTPClientResponse.Body.IteratorStream.ID, CheckedContinuation<ByteBuffer?, Error>)
case finished(HTTPClientResponse.Body.IteratorStream.ID, CheckedContinuation<ByteBuffer?, Error>)
case buffering(TransactionBody.AsyncIterator.ID, CircularBuffer<ByteBuffer>, next: Next)
case waitingForRemote(TransactionBody.AsyncIterator.ID, CheckedContinuation<ByteBuffer?, Error>)
case finished(TransactionBody.AsyncIterator.ID, CheckedContinuation<ByteBuffer?, Error>)
}

private var state: State
Expand Down Expand Up @@ -510,7 +510,7 @@ extension Transaction {
}
}

mutating func responseBodyIteratorDeinited(streamID: HTTPClientResponse.Body.IteratorStream.ID) -> FailAction {
mutating func responseBodyIteratorDeinited(streamID: TransactionBody.AsyncIterator.ID) -> FailAction {
switch self.state {
case .initialized, .queued, .deadlineExceededWhileQueued, .executing(_, _, .waitingForResponseHead):
preconditionFailure("Got notice about a deinited response body iterator, before we even received a response. Invalid state: \(self.state)")
Expand All @@ -536,7 +536,7 @@ extension Transaction {
}

mutating func consumeNextResponsePart(
streamID: HTTPClientResponse.Body.IteratorStream.ID,
streamID: TransactionBody.AsyncIterator.ID,
continuation: CheckedContinuation<ByteBuffer?, Error>
) -> ConsumeAction {
switch self.state {
Expand Down Expand Up @@ -639,8 +639,8 @@ extension Transaction {
}

private func verifyStreamIDIsEqual(
registered: HTTPClientResponse.Body.IteratorStream.ID,
this: HTTPClientResponse.Body.IteratorStream.ID,
registered: TransactionBody.AsyncIterator.ID,
this: TransactionBody.AsyncIterator.ID,
file: StaticString = #file,
line: UInt = #line
) {
Expand Down
Loading