diff --git a/Package.swift b/Package.swift index 7e80f853e..db458583c 100644 --- a/Package.swift +++ b/Package.swift @@ -29,6 +29,7 @@ let package = Package( .package(url: "https://github.com/apple/swift-log.git", from: "1.4.4"), .package(url: "https://github.com/apple/swift-atomics.git", from: "1.0.2"), .package(url: "https://github.com/apple/swift-docc-plugin", from: "1.0.0"), + .package(url: "https://github.com/apple/swift-algorithms", from: "1.0.0"), ], targets: [ .target(name: "CAsyncHTTPClient"), @@ -48,6 +49,7 @@ let package = Package( .product(name: "NIOTransportServices", package: "swift-nio-transport-services"), .product(name: "Logging", package: "swift-log"), .product(name: "Atomics", package: "swift-atomics"), + .product(name: "Algorithms", package: "swift-algorithms"), ] ), .testTarget( @@ -64,6 +66,7 @@ let package = Package( .product(name: "NIOSOCKS", package: "swift-nio-extras"), .product(name: "Logging", package: "swift-log"), .product(name: "Atomics", package: "swift-atomics"), + .product(name: "Algorithms", package: "swift-algorithms"), ], resources: [ .copy("Resources/self_signed_cert.pem"), diff --git a/Sources/AsyncHTTPClient/AsyncAwait/HTTPClientRequest.swift b/Sources/AsyncHTTPClient/AsyncAwait/HTTPClientRequest.swift index 278be7f84..a5b0a0061 100644 --- a/Sources/AsyncHTTPClient/AsyncAwait/HTTPClientRequest.swift +++ b/Sources/AsyncHTTPClient/AsyncAwait/HTTPClientRequest.swift @@ -12,9 +12,23 @@ // //===----------------------------------------------------------------------===// +import Algorithms import NIOCore import NIOHTTP1 +@usableFromInline +let bagOfBytesToByteBufferConversionChunkSize = 1024 * 1024 * 4 + +#if arch(arm) || arch(i386) +// on 32-bit platforms we can't make use of a whole UInt32.max (as it doesn't fit in an Int) +@usableFromInline +let byteBufferMaxSize = Int.max +#else +// on 64-bit platforms we're good +@usableFromInline +let byteBufferMaxSize = Int(UInt32.max) +#endif + /// A representation of an HTTP request for the Swift Concurrency HTTPClient API. /// /// This object is similar to ``HTTPClient/Request``, but used for the Swift Concurrency API. @@ -93,9 +107,9 @@ extension HTTPClientRequest.Body { /// Create an ``HTTPClientRequest/Body-swift.struct`` from a `RandomAccessCollection` of bytes. /// - /// This construction will flatten the bytes into a `ByteBuffer`. As a result, the peak memory - /// usage of this construction will be double the size of the original collection. The construction - /// of the `ByteBuffer` will be delayed until it's needed. + /// This construction will flatten the `bytes` into a `ByteBuffer` in chunks of ~4MB. + /// As a result, the peak memory usage of this construction will be a small multiple of ~4MB. + /// The construction of the `ByteBuffer` will be delayed until it's needed. /// /// - parameter bytes: The bytes of the request body. @inlinable @@ -103,24 +117,7 @@ extension HTTPClientRequest.Body { public static func bytes( _ bytes: Bytes ) -> Self where Bytes.Element == UInt8 { - Self._bytes(bytes) - } - - @inlinable - static func _bytes( - _ bytes: Bytes - ) -> Self where Bytes.Element == UInt8 { - self.init(.sequence( - length: .known(bytes.count), - canBeConsumedMultipleTimes: true - ) { allocator in - if let buffer = bytes.withContiguousStorageIfAvailable({ allocator.buffer(bytes: $0) }) { - // fastpath - return buffer - } - // potentially really slow path - return allocator.buffer(bytes: bytes) - }) + self.bytes(bytes, length: .known(bytes.count)) } /// Create an ``HTTPClientRequest/Body-swift.struct`` from a `Sequence` of bytes. @@ -146,32 +143,77 @@ extension HTTPClientRequest.Body { _ bytes: Bytes, length: Length ) -> Self where Bytes.Element == UInt8 { - Self._bytes(bytes, length: length) + Self._bytes( + bytes, + length: length, + bagOfBytesToByteBufferConversionChunkSize: bagOfBytesToByteBufferConversionChunkSize, + byteBufferMaxSize: byteBufferMaxSize + ) } + /// internal method to test chunking @inlinable - static func _bytes( + @preconcurrency + static func _bytes( _ bytes: Bytes, - length: Length + length: Length, + bagOfBytesToByteBufferConversionChunkSize: Int, + byteBufferMaxSize: Int ) -> Self where Bytes.Element == UInt8 { - self.init(.sequence( - length: length.storage, - canBeConsumedMultipleTimes: false - ) { allocator in - if let buffer = bytes.withContiguousStorageIfAvailable({ allocator.buffer(bytes: $0) }) { - // fastpath - return buffer + // fast path + let body: Self? = bytes.withContiguousStorageIfAvailable { bufferPointer -> Self in + // `some Sequence` is special as it can't be efficiently chunked lazily. + // Therefore we need to do the chunking eagerly if it implements the fast path withContiguousStorageIfAvailable + // If we do it eagerly, it doesn't make sense to do a bunch of small chunks, so we only chunk if it exceeds + // the maximum size of a ByteBuffer. + if bufferPointer.count <= byteBufferMaxSize { + let buffer = ByteBuffer(bytes: bufferPointer) + return Self(.sequence( + length: length.storage, + canBeConsumedMultipleTimes: true, + makeCompleteBody: { _ in buffer } + )) + } else { + // we need to copy `bufferPointer` eagerly as the pointer is only valid during the call to `withContiguousStorageIfAvailable` + let buffers: Array = bufferPointer.chunks(ofCount: byteBufferMaxSize).map { ByteBuffer(bytes: $0) } + return Self(.asyncSequence( + length: length.storage, + makeAsyncIterator: { + var iterator = buffers.makeIterator() + return { _ in + iterator.next() + } + } + )) + } + } + if let body = body { + return body + } + + // slow path + return Self(.asyncSequence( + length: length.storage + ) { + var iterator = bytes.makeIterator() + return { allocator in + var buffer = allocator.buffer(capacity: bagOfBytesToByteBufferConversionChunkSize) + while buffer.writableBytes > 0, let byte = iterator.next() { + buffer.writeInteger(byte) + } + if buffer.readableBytes > 0 { + return buffer + } + return nil } - // potentially really slow path - return allocator.buffer(bytes: bytes) }) } /// Create an ``HTTPClientRequest/Body-swift.struct`` from a `Collection` of bytes. /// - /// This construction will flatten the bytes into a `ByteBuffer`. As a result, the peak memory - /// usage of this construction will be double the size of the original collection. The construction - /// of the `ByteBuffer` will be delayed until it's needed. + /// This construction will flatten the `bytes` into a `ByteBuffer` in chunks of ~4MB. + /// As a result, the peak memory usage of this construction will be a small multiple of ~4MB. + /// The construction of the `ByteBuffer` will be delayed until it's needed. /// /// Caution should be taken with this method to ensure that the `length` is correct. Incorrect lengths /// will cause unnecessary runtime failures. Setting `length` to ``Length/unknown`` will trigger the upload @@ -186,25 +228,27 @@ extension HTTPClientRequest.Body { _ bytes: Bytes, length: Length ) -> Self where Bytes.Element == UInt8 { - Self._bytes(bytes, length: length) - } - - @inlinable - static func _bytes( - _ bytes: Bytes, - length: Length - ) -> Self where Bytes.Element == UInt8 { - self.init(.sequence( - length: length.storage, - canBeConsumedMultipleTimes: true - ) { allocator in - if let buffer = bytes.withContiguousStorageIfAvailable({ allocator.buffer(bytes: $0) }) { - // fastpath - return buffer - } - // potentially really slow path - return allocator.buffer(bytes: bytes) - }) + if bytes.count <= bagOfBytesToByteBufferConversionChunkSize { + return self.init(.sequence( + length: length.storage, + canBeConsumedMultipleTimes: true + ) { allocator in + allocator.buffer(bytes: bytes) + }) + } else { + return self.init(.asyncSequence( + length: length.storage, + makeAsyncIterator: { + var iterator = bytes.chunks(ofCount: bagOfBytesToByteBufferConversionChunkSize).makeIterator() + return { allocator in + guard let chunk = iterator.next() else { + return nil + } + return allocator.buffer(bytes: chunk) + } + } + )) + } } /// Create an ``HTTPClientRequest/Body-swift.struct`` from an `AsyncSequence` of `ByteBuffer`s. @@ -223,14 +267,6 @@ extension HTTPClientRequest.Body { public static func stream( _ sequenceOfBytes: SequenceOfBytes, length: Length - ) -> Self where SequenceOfBytes.Element == ByteBuffer { - Self._stream(sequenceOfBytes, length: length) - } - - @inlinable - static func _stream( - _ sequenceOfBytes: SequenceOfBytes, - length: Length ) -> Self where SequenceOfBytes.Element == ByteBuffer { let body = self.init(.asyncSequence(length: length.storage) { var iterator = sequenceOfBytes.makeAsyncIterator() @@ -243,7 +279,7 @@ extension HTTPClientRequest.Body { /// Create an ``HTTPClientRequest/Body-swift.struct`` from an `AsyncSequence` of bytes. /// - /// This construction will consume 1kB chunks from the `Bytes` and send them at once. This optimizes for + /// This construction will consume 4MB chunks from the `Bytes` and send them at once. This optimizes for /// `AsyncSequence`s where larger chunks are buffered up and available without actually suspending, such /// as those provided by `FileHandle`. /// @@ -259,19 +295,11 @@ extension HTTPClientRequest.Body { public static func stream( _ bytes: Bytes, length: Length - ) -> Self where Bytes.Element == UInt8 { - Self._stream(bytes, length: length) - } - - @inlinable - static func _stream( - _ bytes: Bytes, - length: Length ) -> Self where Bytes.Element == UInt8 { let body = self.init(.asyncSequence(length: length.storage) { var iterator = bytes.makeAsyncIterator() return { allocator -> ByteBuffer? in - var buffer = allocator.buffer(capacity: 1024) // TODO: Magic number + var buffer = allocator.buffer(capacity: bagOfBytesToByteBufferConversionChunkSize) while buffer.writableBytes > 0, let byte = try await iterator.next() { buffer.writeInteger(byte) } @@ -313,3 +341,53 @@ extension HTTPClientRequest.Body { internal var storage: RequestBodyLength } } + +@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) +extension HTTPClientRequest.Body: AsyncSequence { + public typealias Element = ByteBuffer + + @inlinable + public func makeAsyncIterator() -> AsyncIterator { + switch self.mode { + case .asyncSequence(_, let makeAsyncIterator): + return .init(storage: .makeNext(makeAsyncIterator())) + case .sequence(_, _, let makeCompleteBody): + return .init(storage: .byteBuffer(makeCompleteBody(AsyncIterator.allocator))) + case .byteBuffer(let byteBuffer): + return .init(storage: .byteBuffer(byteBuffer)) + } + } +} + +@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) +extension HTTPClientRequest.Body { + public struct AsyncIterator: AsyncIteratorProtocol { + @usableFromInline + static let allocator = ByteBufferAllocator() + + @usableFromInline + enum Storage { + case byteBuffer(ByteBuffer?) + case makeNext((ByteBufferAllocator) async throws -> ByteBuffer?) + } + + @usableFromInline + var storage: Storage + + @inlinable + init(storage: Storage) { + self.storage = storage + } + + @inlinable + public mutating func next() async throws -> ByteBuffer? { + switch self.storage { + case .byteBuffer(let buffer): + self.storage = .byteBuffer(nil) + return buffer + case .makeNext(let makeNext): + return try await makeNext(Self.allocator) + } + } + } +} diff --git a/Sources/AsyncHTTPClient/HTTPHandler.swift b/Sources/AsyncHTTPClient/HTTPHandler.swift index c5a3b3a4a..33e68995e 100644 --- a/Sources/AsyncHTTPClient/HTTPHandler.swift +++ b/Sources/AsyncHTTPClient/HTTPHandler.swift @@ -12,6 +12,7 @@ // //===----------------------------------------------------------------------===// +import Algorithms import Foundation import Logging import NIOConcurrencyHelpers @@ -44,6 +45,27 @@ extension HTTPClient { public func write(_ data: IOData) -> EventLoopFuture { return self.closure(data) } + + @inlinable + func writeChunks(of bytes: Bytes, maxChunkSize: Int) -> EventLoopFuture where Bytes.Element == UInt8 { + let iterator = UnsafeMutableTransferBox(bytes.chunks(ofCount: maxChunkSize).makeIterator()) + guard let chunk = iterator.wrappedValue.next() else { + return self.write(IOData.byteBuffer(.init())) + } + + @Sendable // can't use closure here as we recursively call ourselves which closures can't do + func writeNextChunk(_ chunk: Bytes.SubSequence) -> EventLoopFuture { + if let nextChunk = iterator.wrappedValue.next() { + return self.write(.byteBuffer(ByteBuffer(bytes: chunk))).flatMap { + writeNextChunk(nextChunk) + } + } else { + return self.write(.byteBuffer(ByteBuffer(bytes: chunk))) + } + } + + return writeNextChunk(chunk) + } } /// Body size. If nil,`Transfer-Encoding` will automatically be set to `chunked`. Otherwise a `Content-Length` @@ -90,7 +112,11 @@ extension HTTPClient { @inlinable public static func bytes(_ bytes: Bytes) -> Body where Bytes: RandomAccessCollection, Bytes: Sendable, Bytes.Element == UInt8 { return Body(length: bytes.count) { writer in - writer.write(.byteBuffer(ByteBuffer(bytes: bytes))) + if bytes.count <= bagOfBytesToByteBufferConversionChunkSize { + return writer.write(.byteBuffer(ByteBuffer(bytes: bytes))) + } else { + return writer.writeChunks(of: bytes, maxChunkSize: bagOfBytesToByteBufferConversionChunkSize) + } } } @@ -100,7 +126,11 @@ extension HTTPClient { /// - string: Body `String` representation. public static func string(_ string: String) -> Body { return Body(length: string.utf8.count) { writer in - writer.write(.byteBuffer(ByteBuffer(string: string))) + if string.utf8.count <= bagOfBytesToByteBufferConversionChunkSize { + return writer.write(.byteBuffer(ByteBuffer(string: string))) + } else { + return writer.writeChunks(of: string.utf8, maxChunkSize: bagOfBytesToByteBufferConversionChunkSize) + } } } } diff --git a/Tests/AsyncHTTPClientTests/HTTPClientRequestTests.swift b/Tests/AsyncHTTPClientTests/HTTPClientRequestTests.swift index aa1071de6..3536160fd 100644 --- a/Tests/AsyncHTTPClientTests/HTTPClientRequestTests.swift +++ b/Tests/AsyncHTTPClientTests/HTTPClientRequestTests.swift @@ -12,6 +12,7 @@ // //===----------------------------------------------------------------------===// +import Algorithms @testable import AsyncHTTPClient import NIOCore import XCTest @@ -393,7 +394,7 @@ class HTTPClientRequestTests: XCTestCase { request.method = .POST let asyncSequence = ByteBuffer(string: "post body") .readableBytesView - .chunked(maxChunkSize: 2) + .chunks(ofCount: 2) .async .map { ByteBuffer($0) } @@ -433,7 +434,7 @@ class HTTPClientRequestTests: XCTestCase { request.method = .POST let asyncSequence = ByteBuffer(string: "post body") .readableBytesView - .chunked(maxChunkSize: 2) + .chunks(ofCount: 2) .async .map { ByteBuffer($0) } @@ -465,6 +466,166 @@ class HTTPClientRequestTests: XCTestCase { XCTAssertEqual(buffer, .init(string: "post body")) } } + + func testChunkingRandomAccessCollection() async throws { + let body = try await HTTPClientRequest.Body.bytes( + Array(repeating: 0, count: bagOfBytesToByteBufferConversionChunkSize) + + Array(repeating: 1, count: bagOfBytesToByteBufferConversionChunkSize) + + Array(repeating: 2, count: bagOfBytesToByteBufferConversionChunkSize) + ).collect() + + let expectedChunks = [ + ByteBuffer(repeating: 0, count: bagOfBytesToByteBufferConversionChunkSize), + ByteBuffer(repeating: 1, count: bagOfBytesToByteBufferConversionChunkSize), + ByteBuffer(repeating: 2, count: bagOfBytesToByteBufferConversionChunkSize), + ] + + XCTAssertEqual(body, expectedChunks) + } + + func testChunkingCollection() async throws { + let body = try await HTTPClientRequest.Body.bytes( + ( + String(repeating: "0", count: bagOfBytesToByteBufferConversionChunkSize) + + String(repeating: "1", count: bagOfBytesToByteBufferConversionChunkSize) + + String(repeating: "2", count: bagOfBytesToByteBufferConversionChunkSize) + ).utf8, + length: .known(bagOfBytesToByteBufferConversionChunkSize * 3) + ).collect() + + let expectedChunks = [ + ByteBuffer(repeating: UInt8(ascii: "0"), count: bagOfBytesToByteBufferConversionChunkSize), + ByteBuffer(repeating: UInt8(ascii: "1"), count: bagOfBytesToByteBufferConversionChunkSize), + ByteBuffer(repeating: UInt8(ascii: "2"), count: bagOfBytesToByteBufferConversionChunkSize), + ] + + XCTAssertEqual(body, expectedChunks) + } + + func testChunkingSequenceThatDoesNotImplementWithContiguousStorageIfAvailable() async throws { + let bagOfBytesToByteBufferConversionChunkSize = 8 + let body = try await HTTPClientRequest.Body._bytes( + AnySequence( + Array(repeating: 0, count: bagOfBytesToByteBufferConversionChunkSize) + + Array(repeating: 1, count: bagOfBytesToByteBufferConversionChunkSize) + ), + length: .known(bagOfBytesToByteBufferConversionChunkSize * 3), + bagOfBytesToByteBufferConversionChunkSize: bagOfBytesToByteBufferConversionChunkSize, + byteBufferMaxSize: byteBufferMaxSize + ).collect() + + let expectedChunks = [ + ByteBuffer(repeating: 0, count: bagOfBytesToByteBufferConversionChunkSize), + ByteBuffer(repeating: 1, count: bagOfBytesToByteBufferConversionChunkSize), + ] + + XCTAssertEqual(body, expectedChunks) + } + + #if swift(>=5.7) + func testChunkingSequenceFastPath() async throws { + func makeBytes() -> some Sequence & Sendable { + Array(repeating: 0, count: bagOfBytesToByteBufferConversionChunkSize) + + Array(repeating: 1, count: bagOfBytesToByteBufferConversionChunkSize) + + Array(repeating: 2, count: bagOfBytesToByteBufferConversionChunkSize) + } + let body = try await HTTPClientRequest.Body.bytes( + makeBytes(), + length: .known(bagOfBytesToByteBufferConversionChunkSize * 3) + ).collect() + + var firstChunk = ByteBuffer(repeating: 0, count: bagOfBytesToByteBufferConversionChunkSize) + firstChunk.writeImmutableBuffer(ByteBuffer(repeating: 1, count: bagOfBytesToByteBufferConversionChunkSize)) + firstChunk.writeImmutableBuffer(ByteBuffer(repeating: 2, count: bagOfBytesToByteBufferConversionChunkSize)) + let expectedChunks = [ + firstChunk, + ] + + XCTAssertEqual(body, expectedChunks) + } + + func testChunkingSequenceFastPathExceedingByteBufferMaxSize() async throws { + let bagOfBytesToByteBufferConversionChunkSize = 8 + let byteBufferMaxSize = 16 + func makeBytes() -> some Sequence & Sendable { + Array(repeating: 0, count: bagOfBytesToByteBufferConversionChunkSize) + + Array(repeating: 1, count: bagOfBytesToByteBufferConversionChunkSize) + + Array(repeating: 2, count: bagOfBytesToByteBufferConversionChunkSize) + } + let body = try await HTTPClientRequest.Body._bytes( + makeBytes(), + length: .known(bagOfBytesToByteBufferConversionChunkSize * 3), + bagOfBytesToByteBufferConversionChunkSize: bagOfBytesToByteBufferConversionChunkSize, + byteBufferMaxSize: byteBufferMaxSize + ).collect() + + var firstChunk = ByteBuffer(repeating: 0, count: bagOfBytesToByteBufferConversionChunkSize) + firstChunk.writeImmutableBuffer(ByteBuffer(repeating: 1, count: bagOfBytesToByteBufferConversionChunkSize)) + let secondChunk = ByteBuffer(repeating: 2, count: bagOfBytesToByteBufferConversionChunkSize) + let expectedChunks = [ + firstChunk, + secondChunk, + ] + + XCTAssertEqual(body, expectedChunks) + } + #endif + + func testBodyStringChunking() throws { + let body = try HTTPClient.Body.string( + String(repeating: "0", count: bagOfBytesToByteBufferConversionChunkSize) + + String(repeating: "1", count: bagOfBytesToByteBufferConversionChunkSize) + + String(repeating: "2", count: bagOfBytesToByteBufferConversionChunkSize) + ).collect().wait() + + let expectedChunks = [ + ByteBuffer(repeating: UInt8(ascii: "0"), count: bagOfBytesToByteBufferConversionChunkSize), + ByteBuffer(repeating: UInt8(ascii: "1"), count: bagOfBytesToByteBufferConversionChunkSize), + ByteBuffer(repeating: UInt8(ascii: "2"), count: bagOfBytesToByteBufferConversionChunkSize), + ] + + XCTAssertEqual(body, expectedChunks) + } + + func testBodyChunkingRandomAccessCollection() throws { + let body = try HTTPClient.Body.bytes( + Array(repeating: 0, count: bagOfBytesToByteBufferConversionChunkSize) + + Array(repeating: 1, count: bagOfBytesToByteBufferConversionChunkSize) + + Array(repeating: 2, count: bagOfBytesToByteBufferConversionChunkSize) + ).collect().wait() + + let expectedChunks = [ + ByteBuffer(repeating: 0, count: bagOfBytesToByteBufferConversionChunkSize), + ByteBuffer(repeating: 1, count: bagOfBytesToByteBufferConversionChunkSize), + ByteBuffer(repeating: 2, count: bagOfBytesToByteBufferConversionChunkSize), + ] + + XCTAssertEqual(body, expectedChunks) + } +} + +extension AsyncSequence { + func collect() async throws -> [Element] { + try await self.reduce(into: []) { $0 += CollectionOfOne($1) } + } +} + +extension HTTPClient.Body { + func collect() -> EventLoopFuture<[ByteBuffer]> { + let eelg = EmbeddedEventLoopGroup(loops: 1) + let el = eelg.next() + var body = [ByteBuffer]() + let writer = StreamWriter { + switch $0 { + case .byteBuffer(let byteBuffer): + body.append(byteBuffer) + case .fileRegion: + fatalError("file region not supported") + } + return el.makeSucceededVoidFuture() + } + return self.stream(writer).map { _ in body } + } } private struct LengthMismatch: Error { @@ -502,35 +663,3 @@ extension Optional where Wrapped == HTTPClientRequest.Prepared.Body { } } } - -struct ChunkedSequence: Sequence { - struct Iterator: IteratorProtocol { - fileprivate var remainingElements: Wrapped.SubSequence - fileprivate let maxChunkSize: Int - mutating func next() -> Wrapped.SubSequence? { - guard !self.remainingElements.isEmpty else { - return nil - } - let chunk = self.remainingElements.prefix(self.maxChunkSize) - self.remainingElements = self.remainingElements.dropFirst(self.maxChunkSize) - return chunk - } - } - - fileprivate let wrapped: Wrapped - fileprivate let maxChunkSize: Int - - func makeIterator() -> Iterator { - .init(remainingElements: self.wrapped[...], maxChunkSize: self.maxChunkSize) - } -} - -extension ChunkedSequence: Sendable where Wrapped: Sendable {} - -extension Collection { - /// Lazily splits `self` into `SubSequence`s with `maxChunkSize` elements. - /// - Parameter maxChunkSize: size of each chunk except the last one which can be smaller if not enough elements are remaining. - func chunked(maxChunkSize: Int) -> ChunkedSequence { - .init(wrapped: self, maxChunkSize: maxChunkSize) - } -}