Skip to content

Automatically chunk large request bodies #710

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 11 commits into from
Sep 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Package.swift
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ let package = Package(
.package(url: "https://github.com/apple/swift-log.git", from: "1.4.4"),
.package(url: "https://github.com/apple/swift-atomics.git", from: "1.0.2"),
.package(url: "https://github.com/apple/swift-docc-plugin", from: "1.0.0"),
.package(url: "https://github.com/apple/swift-algorithms", from: "1.0.0"),
],
targets: [
.target(name: "CAsyncHTTPClient"),
Expand All @@ -48,6 +49,7 @@ let package = Package(
.product(name: "NIOTransportServices", package: "swift-nio-transport-services"),
.product(name: "Logging", package: "swift-log"),
.product(name: "Atomics", package: "swift-atomics"),
.product(name: "Algorithms", package: "swift-algorithms"),
]
),
.testTarget(
Expand All @@ -64,6 +66,7 @@ let package = Package(
.product(name: "NIOSOCKS", package: "swift-nio-extras"),
.product(name: "Logging", package: "swift-log"),
.product(name: "Atomics", package: "swift-atomics"),
.product(name: "Algorithms", package: "swift-algorithms"),
],
resources: [
.copy("Resources/self_signed_cert.pem"),
Expand Down
224 changes: 151 additions & 73 deletions Sources/AsyncHTTPClient/AsyncAwait/HTTPClientRequest.swift
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,23 @@
//
//===----------------------------------------------------------------------===//

import Algorithms
import NIOCore
import NIOHTTP1

@usableFromInline
let bagOfBytesToByteBufferConversionChunkSize = 1024 * 1024 * 4

#if arch(arm) || arch(i386)
// on 32-bit platforms we can't make use of a whole UInt32.max (as it doesn't fit in an Int)
@usableFromInline
let byteBufferMaxSize = Int.max
#else
// on 64-bit platforms we're good
@usableFromInline
let byteBufferMaxSize = Int(UInt32.max)
#endif

/// A representation of an HTTP request for the Swift Concurrency HTTPClient API.
///
/// This object is similar to ``HTTPClient/Request``, but used for the Swift Concurrency API.
Expand Down Expand Up @@ -93,34 +107,17 @@ extension HTTPClientRequest.Body {

/// Create an ``HTTPClientRequest/Body-swift.struct`` from a `RandomAccessCollection` of bytes.
///
/// This construction will flatten the bytes into a `ByteBuffer`. As a result, the peak memory
/// usage of this construction will be double the size of the original collection. The construction
/// of the `ByteBuffer` will be delayed until it's needed.
/// This construction will flatten the `bytes` into a `ByteBuffer` in chunks of ~4MB.
/// As a result, the peak memory usage of this construction will be a small multiple of ~4MB.
/// The construction of the `ByteBuffer` will be delayed until it's needed.
///
/// - parameter bytes: The bytes of the request body.
@inlinable
@preconcurrency
public static func bytes<Bytes: RandomAccessCollection & Sendable>(
_ bytes: Bytes
) -> Self where Bytes.Element == UInt8 {
Self._bytes(bytes)
}

@inlinable
static func _bytes<Bytes: RandomAccessCollection>(
_ bytes: Bytes
) -> Self where Bytes.Element == UInt8 {
self.init(.sequence(
length: .known(bytes.count),
canBeConsumedMultipleTimes: true
) { allocator in
if let buffer = bytes.withContiguousStorageIfAvailable({ allocator.buffer(bytes: $0) }) {
// fastpath
return buffer
}
// potentially really slow path
return allocator.buffer(bytes: bytes)
})
self.bytes(bytes, length: .known(bytes.count))
}

/// Create an ``HTTPClientRequest/Body-swift.struct`` from a `Sequence` of bytes.
Expand All @@ -146,32 +143,77 @@ extension HTTPClientRequest.Body {
_ bytes: Bytes,
length: Length
) -> Self where Bytes.Element == UInt8 {
Self._bytes(bytes, length: length)
Self._bytes(
bytes,
length: length,
bagOfBytesToByteBufferConversionChunkSize: bagOfBytesToByteBufferConversionChunkSize,
byteBufferMaxSize: byteBufferMaxSize
)
}

/// internal method to test chunking
@inlinable
static func _bytes<Bytes: Sequence>(
@preconcurrency
static func _bytes<Bytes: Sequence & Sendable>(
_ bytes: Bytes,
length: Length
length: Length,
bagOfBytesToByteBufferConversionChunkSize: Int,
byteBufferMaxSize: Int
) -> Self where Bytes.Element == UInt8 {
self.init(.sequence(
length: length.storage,
canBeConsumedMultipleTimes: false
) { allocator in
if let buffer = bytes.withContiguousStorageIfAvailable({ allocator.buffer(bytes: $0) }) {
// fastpath
return buffer
// fast path
let body: Self? = bytes.withContiguousStorageIfAvailable { bufferPointer -> Self in
// `some Sequence<UInt8>` is special as it can't be efficiently chunked lazily.
// Therefore we need to do the chunking eagerly if it implements the fast path withContiguousStorageIfAvailable
// If we do it eagerly, it doesn't make sense to do a bunch of small chunks, so we only chunk if it exceeds
// the maximum size of a ByteBuffer.
if bufferPointer.count <= byteBufferMaxSize {
let buffer = ByteBuffer(bytes: bufferPointer)
return Self(.sequence(
length: length.storage,
canBeConsumedMultipleTimes: true,
makeCompleteBody: { _ in buffer }
))
} else {
// we need to copy `bufferPointer` eagerly as the pointer is only valid during the call to `withContiguousStorageIfAvailable`
let buffers: Array<ByteBuffer> = bufferPointer.chunks(ofCount: byteBufferMaxSize).map { ByteBuffer(bytes: $0) }
return Self(.asyncSequence(
length: length.storage,
makeAsyncIterator: {
var iterator = buffers.makeIterator()
return { _ in
iterator.next()
}
}
))
}
}
if let body = body {
return body
}

// slow path
return Self(.asyncSequence(
length: length.storage
) {
var iterator = bytes.makeIterator()
return { allocator in
var buffer = allocator.buffer(capacity: bagOfBytesToByteBufferConversionChunkSize)
while buffer.writableBytes > 0, let byte = iterator.next() {
buffer.writeInteger(byte)
}
if buffer.readableBytes > 0 {
return buffer
}
return nil
}
// potentially really slow path
return allocator.buffer(bytes: bytes)
})
}

/// Create an ``HTTPClientRequest/Body-swift.struct`` from a `Collection` of bytes.
///
/// This construction will flatten the bytes into a `ByteBuffer`. As a result, the peak memory
/// usage of this construction will be double the size of the original collection. The construction
/// of the `ByteBuffer` will be delayed until it's needed.
/// This construction will flatten the `bytes` into a `ByteBuffer` in chunks of ~4MB.
/// As a result, the peak memory usage of this construction will be a small multiple of ~4MB.
/// The construction of the `ByteBuffer` will be delayed until it's needed.
///
/// Caution should be taken with this method to ensure that the `length` is correct. Incorrect lengths
/// will cause unnecessary runtime failures. Setting `length` to ``Length/unknown`` will trigger the upload
Expand All @@ -186,25 +228,27 @@ extension HTTPClientRequest.Body {
_ bytes: Bytes,
length: Length
) -> Self where Bytes.Element == UInt8 {
Self._bytes(bytes, length: length)
}

@inlinable
static func _bytes<Bytes: Collection>(
_ bytes: Bytes,
length: Length
) -> Self where Bytes.Element == UInt8 {
self.init(.sequence(
length: length.storage,
canBeConsumedMultipleTimes: true
) { allocator in
if let buffer = bytes.withContiguousStorageIfAvailable({ allocator.buffer(bytes: $0) }) {
// fastpath
return buffer
}
// potentially really slow path
return allocator.buffer(bytes: bytes)
})
if bytes.count <= bagOfBytesToByteBufferConversionChunkSize {
return self.init(.sequence(
length: length.storage,
canBeConsumedMultipleTimes: true
) { allocator in
allocator.buffer(bytes: bytes)
})
} else {
return self.init(.asyncSequence(
length: length.storage,
makeAsyncIterator: {
var iterator = bytes.chunks(ofCount: bagOfBytesToByteBufferConversionChunkSize).makeIterator()
return { allocator in
guard let chunk = iterator.next() else {
return nil
}
return allocator.buffer(bytes: chunk)
}
}
))
}
}

/// Create an ``HTTPClientRequest/Body-swift.struct`` from an `AsyncSequence` of `ByteBuffer`s.
Expand All @@ -223,14 +267,6 @@ extension HTTPClientRequest.Body {
public static func stream<SequenceOfBytes: AsyncSequence & Sendable>(
_ sequenceOfBytes: SequenceOfBytes,
length: Length
) -> Self where SequenceOfBytes.Element == ByteBuffer {
Self._stream(sequenceOfBytes, length: length)
}

@inlinable
static func _stream<SequenceOfBytes: AsyncSequence>(
_ sequenceOfBytes: SequenceOfBytes,
length: Length
) -> Self where SequenceOfBytes.Element == ByteBuffer {
let body = self.init(.asyncSequence(length: length.storage) {
var iterator = sequenceOfBytes.makeAsyncIterator()
Expand All @@ -243,7 +279,7 @@ extension HTTPClientRequest.Body {

/// Create an ``HTTPClientRequest/Body-swift.struct`` from an `AsyncSequence` of bytes.
///
/// This construction will consume 1kB chunks from the `Bytes` and send them at once. This optimizes for
/// This construction will consume 4MB chunks from the `Bytes` and send them at once. This optimizes for
/// `AsyncSequence`s where larger chunks are buffered up and available without actually suspending, such
/// as those provided by `FileHandle`.
///
Expand All @@ -259,19 +295,11 @@ extension HTTPClientRequest.Body {
public static func stream<Bytes: AsyncSequence & Sendable>(
_ bytes: Bytes,
length: Length
) -> Self where Bytes.Element == UInt8 {
Self._stream(bytes, length: length)
}

@inlinable
static func _stream<Bytes: AsyncSequence>(
_ bytes: Bytes,
length: Length
) -> Self where Bytes.Element == UInt8 {
let body = self.init(.asyncSequence(length: length.storage) {
var iterator = bytes.makeAsyncIterator()
return { allocator -> ByteBuffer? in
var buffer = allocator.buffer(capacity: 1024) // TODO: Magic number
var buffer = allocator.buffer(capacity: bagOfBytesToByteBufferConversionChunkSize)
while buffer.writableBytes > 0, let byte = try await iterator.next() {
buffer.writeInteger(byte)
}
Expand Down Expand Up @@ -313,3 +341,53 @@ extension HTTPClientRequest.Body {
internal var storage: RequestBodyLength
}
}

@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
extension HTTPClientRequest.Body: AsyncSequence {
public typealias Element = ByteBuffer

@inlinable
public func makeAsyncIterator() -> AsyncIterator {
switch self.mode {
case .asyncSequence(_, let makeAsyncIterator):
return .init(storage: .makeNext(makeAsyncIterator()))
case .sequence(_, _, let makeCompleteBody):
return .init(storage: .byteBuffer(makeCompleteBody(AsyncIterator.allocator)))
case .byteBuffer(let byteBuffer):
return .init(storage: .byteBuffer(byteBuffer))
}
}
}

@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
extension HTTPClientRequest.Body {
public struct AsyncIterator: AsyncIteratorProtocol {
@usableFromInline
static let allocator = ByteBufferAllocator()

@usableFromInline
enum Storage {
case byteBuffer(ByteBuffer?)
case makeNext((ByteBufferAllocator) async throws -> ByteBuffer?)
}

@usableFromInline
var storage: Storage

@inlinable
init(storage: Storage) {
self.storage = storage
}

@inlinable
public mutating func next() async throws -> ByteBuffer? {
switch self.storage {
case .byteBuffer(let buffer):
self.storage = .byteBuffer(nil)
return buffer
case .makeNext(let makeNext):
return try await makeNext(Self.allocator)
}
}
}
}
34 changes: 32 additions & 2 deletions Sources/AsyncHTTPClient/HTTPHandler.swift
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
//
//===----------------------------------------------------------------------===//

import Algorithms
import Foundation
import Logging
import NIOConcurrencyHelpers
Expand Down Expand Up @@ -44,6 +45,27 @@ extension HTTPClient {
public func write(_ data: IOData) -> EventLoopFuture<Void> {
return self.closure(data)
}

@inlinable
func writeChunks<Bytes: Collection>(of bytes: Bytes, maxChunkSize: Int) -> EventLoopFuture<Void> where Bytes.Element == UInt8 {
let iterator = UnsafeMutableTransferBox(bytes.chunks(ofCount: maxChunkSize).makeIterator())
guard let chunk = iterator.wrappedValue.next() else {
return self.write(IOData.byteBuffer(.init()))
}

@Sendable // can't use closure here as we recursively call ourselves which closures can't do
func writeNextChunk(_ chunk: Bytes.SubSequence) -> EventLoopFuture<Void> {
if let nextChunk = iterator.wrappedValue.next() {
return self.write(.byteBuffer(ByteBuffer(bytes: chunk))).flatMap {
writeNextChunk(nextChunk)
}
} else {
return self.write(.byteBuffer(ByteBuffer(bytes: chunk)))
}
}

return writeNextChunk(chunk)
}
}

/// Body size. If nil,`Transfer-Encoding` will automatically be set to `chunked`. Otherwise a `Content-Length`
Expand Down Expand Up @@ -90,7 +112,11 @@ extension HTTPClient {
@inlinable
public static func bytes<Bytes>(_ bytes: Bytes) -> Body where Bytes: RandomAccessCollection, Bytes: Sendable, Bytes.Element == UInt8 {
return Body(length: bytes.count) { writer in
writer.write(.byteBuffer(ByteBuffer(bytes: bytes)))
if bytes.count <= bagOfBytesToByteBufferConversionChunkSize {
return writer.write(.byteBuffer(ByteBuffer(bytes: bytes)))
} else {
return writer.writeChunks(of: bytes, maxChunkSize: bagOfBytesToByteBufferConversionChunkSize)
}
}
}

Expand All @@ -100,7 +126,11 @@ extension HTTPClient {
/// - string: Body `String` representation.
public static func string(_ string: String) -> Body {
return Body(length: string.utf8.count) { writer in
writer.write(.byteBuffer(ByteBuffer(string: string)))
if string.utf8.count <= bagOfBytesToByteBufferConversionChunkSize {
return writer.write(.byteBuffer(ByteBuffer(string: string)))
} else {
return writer.writeChunks(of: string.utf8, maxChunkSize: bagOfBytesToByteBufferConversionChunkSize)
}
}
}
}
Expand Down
Loading