Skip to content

Commit 75d7f63

Browse files
authored
Automatically chunk large request bodies (#710)
* ChunkedCollection * Use swift-algorithms * SwiftFormat * test chunking * add documentation * SwiftFormat * fix old swift versions * fix older swift versions second attempt * fix old swift versions third attempt * fix old swift versions fourth attempt * update documentation
1 parent 16f7e62 commit 75d7f63

File tree

4 files changed

+349
-109
lines changed

4 files changed

+349
-109
lines changed

Package.swift

+3
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ let package = Package(
2929
.package(url: "https://github.com/apple/swift-log.git", from: "1.4.4"),
3030
.package(url: "https://github.com/apple/swift-atomics.git", from: "1.0.2"),
3131
.package(url: "https://github.com/apple/swift-docc-plugin", from: "1.0.0"),
32+
.package(url: "https://github.com/apple/swift-algorithms", from: "1.0.0"),
3233
],
3334
targets: [
3435
.target(name: "CAsyncHTTPClient"),
@@ -48,6 +49,7 @@ let package = Package(
4849
.product(name: "NIOTransportServices", package: "swift-nio-transport-services"),
4950
.product(name: "Logging", package: "swift-log"),
5051
.product(name: "Atomics", package: "swift-atomics"),
52+
.product(name: "Algorithms", package: "swift-algorithms"),
5153
]
5254
),
5355
.testTarget(
@@ -64,6 +66,7 @@ let package = Package(
6466
.product(name: "NIOSOCKS", package: "swift-nio-extras"),
6567
.product(name: "Logging", package: "swift-log"),
6668
.product(name: "Atomics", package: "swift-atomics"),
69+
.product(name: "Algorithms", package: "swift-algorithms"),
6770
],
6871
resources: [
6972
.copy("Resources/self_signed_cert.pem"),

Sources/AsyncHTTPClient/AsyncAwait/HTTPClientRequest.swift

+151-73
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,23 @@
1212
//
1313
//===----------------------------------------------------------------------===//
1414

15+
import Algorithms
1516
import NIOCore
1617
import NIOHTTP1
1718

19+
/// Chunk size in bytes (4 MiB) used when a bag of bytes is converted into `ByteBuffer`s piece by piece.
@usableFromInline
let bagOfBytesToByteBufferConversionChunkSize = 1024 * 1024 * 4

/// Largest number of bytes that is copied into a single `ByteBuffer` when the source exposes contiguous storage.
///
/// This is `UInt32.max` wherever that value fits into an `Int` (64-bit platforms) and `Int.max` everywhere else.
/// Clamping instead of enumerating architectures with `#if arch(...)` keeps the value correct on every
/// 32-bit target (for example `arm64_32` or `wasm32`), where `Int(UInt32.max)` would overflow.
@usableFromInline
let byteBufferMaxSize = Int(clamping: UInt32.max)
31+
1832
/// A representation of an HTTP request for the Swift Concurrency HTTPClient API.
1933
///
2034
/// This object is similar to ``HTTPClient/Request``, but used for the Swift Concurrency API.
@@ -93,34 +107,17 @@ extension HTTPClientRequest.Body {
93107

94108
/// Create an ``HTTPClientRequest/Body-swift.struct`` from a `RandomAccessCollection` of bytes.
///
/// This construction will flatten the `bytes` into a `ByteBuffer` in chunks of ~4MB.
/// As a result, the peak memory usage of this construction will be a small multiple of ~4MB.
/// The construction of the `ByteBuffer` will be delayed until it's needed.
///
/// The length of the body is taken from `bytes.count`.
///
/// - parameter bytes: The bytes of the request body.
@inlinable
@preconcurrency
public static func bytes<Bytes: RandomAccessCollection & Sendable>(
    _ bytes: Bytes
) -> Self where Bytes.Element == UInt8 {
    // A random access collection knows its size up front, so the length is always `.known`;
    // everything else is delegated to the length-taking `bytes(_:length:)` overload.
    let knownLength = Length.known(bytes.count)
    return Self.bytes(bytes, length: knownLength)
}
125122

126123
/// Create an ``HTTPClientRequest/Body-swift.struct`` from a `Sequence` of bytes.
@@ -146,32 +143,77 @@ extension HTTPClientRequest.Body {
146143
_ bytes: Bytes,
147144
length: Length
148145
) -> Self where Bytes.Element == UInt8 {
149-
Self._bytes(bytes, length: length)
146+
Self._bytes(
147+
bytes,
148+
length: length,
149+
bagOfBytesToByteBufferConversionChunkSize: bagOfBytesToByteBufferConversionChunkSize,
150+
byteBufferMaxSize: byteBufferMaxSize
151+
)
150152
}
151153

154+
/// Internal implementation for creating a body from a `Sequence` of bytes.
///
/// The chunking limits are parameters (instead of the module-level constants being read directly)
/// so that chunking can be tested with small values.
///
/// Two strategies are used:
/// - Fast path: if `bytes` exposes contiguous storage, that storage is copied eagerly. A single
///   `ByteBuffer` is created when it holds at most `byteBufferMaxSize` bytes; otherwise it is split
///   into `ByteBuffer`s of at most `byteBufferMaxSize` bytes each.
/// - Slow path: the bytes are pulled lazily from one iterator of `bytes`, producing buffers of at
///   most `bagOfBytesToByteBufferConversionChunkSize` bytes each.
///
/// - parameters:
///   - bytes: The bytes of the request body.
///   - length: The length of the request body.
///   - bagOfBytesToByteBufferConversionChunkSize: Maximum number of bytes per buffer on the slow path.
///   - byteBufferMaxSize: Maximum number of bytes per buffer on the fast path.
@inlinable
@preconcurrency
static func _bytes<Bytes: Sequence & Sendable>(
    _ bytes: Bytes,
    length: Length,
    bagOfBytesToByteBufferConversionChunkSize: Int,
    byteBufferMaxSize: Int
) -> Self where Bytes.Element == UInt8 {
    // fast path
    let body: Self? = bytes.withContiguousStorageIfAvailable { bufferPointer -> Self in
        // `some Sequence<UInt8>` is special as it can't be efficiently chunked lazily.
        // Therefore we need to do the chunking eagerly if it implements the fast path withContiguousStorageIfAvailable
        // If we do it eagerly, it doesn't make sense to do a bunch of small chunks, so we only chunk if it exceeds
        // the maximum size of a ByteBuffer.
        if bufferPointer.count <= byteBufferMaxSize {
            let buffer = ByteBuffer(bytes: bufferPointer)
            return Self(.sequence(
                length: length.storage,
                canBeConsumedMultipleTimes: true,
                makeCompleteBody: { _ in buffer }
            ))
        } else {
            // we need to copy `bufferPointer` eagerly as the pointer is only valid during the call to `withContiguousStorageIfAvailable`
            let buffers: [ByteBuffer] = bufferPointer.chunks(ofCount: byteBufferMaxSize).map { ByteBuffer(bytes: $0) }
            return Self(.asyncSequence(
                length: length.storage,
                makeAsyncIterator: {
                    var iterator = buffers.makeIterator()
                    return { _ in
                        iterator.next()
                    }
                }
            ))
        }
    }
    if let body = body {
        return body
    }

    // slow path
    return Self(.asyncSequence(
        length: length.storage
    ) {
        var iterator = bytes.makeIterator()
        return { allocator in
            var buffer = allocator.buffer(capacity: bagOfBytesToByteBufferConversionChunkSize)
            // Bound the chunk by the requested size rather than by `writableBytes`: the allocator
            // may hand back a buffer with more capacity than was asked for, which would otherwise
            // produce chunks larger than `bagOfBytesToByteBufferConversionChunkSize`.
            while buffer.readableBytes < bagOfBytesToByteBufferConversionChunkSize, let byte = iterator.next() {
                buffer.writeInteger(byte)
            }
            if buffer.readableBytes > 0 {
                return buffer
            }
            // nothing was written: the sequence is exhausted
            return nil
        }
    })
}
169211

170212
/// Create an ``HTTPClientRequest/Body-swift.struct`` from a `Collection` of bytes.
171213
///
172-
/// This construction will flatten the bytes into a `ByteBuffer`. As a result, the peak memory
173-
/// usage of this construction will be double the size of the original collection. The construction
174-
/// of the `ByteBuffer` will be delayed until it's needed.
214+
/// This construction will flatten the `bytes` into a `ByteBuffer` in chunks of ~4MB.
215+
/// As a result, the peak memory usage of this construction will be a small multiple of ~4MB.
216+
/// The construction of the `ByteBuffer` will be delayed until it's needed.
175217
///
176218
/// Caution should be taken with this method to ensure that the `length` is correct. Incorrect lengths
177219
/// will cause unnecessary runtime failures. Setting `length` to ``Length/unknown`` will trigger the upload
@@ -186,25 +228,27 @@ extension HTTPClientRequest.Body {
186228
_ bytes: Bytes,
187229
length: Length
188230
) -> Self where Bytes.Element == UInt8 {
189-
Self._bytes(bytes, length: length)
190-
}
191-
192-
@inlinable
193-
static func _bytes<Bytes: Collection>(
194-
_ bytes: Bytes,
195-
length: Length
196-
) -> Self where Bytes.Element == UInt8 {
197-
self.init(.sequence(
198-
length: length.storage,
199-
canBeConsumedMultipleTimes: true
200-
) { allocator in
201-
if let buffer = bytes.withContiguousStorageIfAvailable({ allocator.buffer(bytes: $0) }) {
202-
// fastpath
203-
return buffer
204-
}
205-
// potentially really slow path
206-
return allocator.buffer(bytes: bytes)
207-
})
231+
if bytes.count <= bagOfBytesToByteBufferConversionChunkSize {
232+
return self.init(.sequence(
233+
length: length.storage,
234+
canBeConsumedMultipleTimes: true
235+
) { allocator in
236+
allocator.buffer(bytes: bytes)
237+
})
238+
} else {
239+
return self.init(.asyncSequence(
240+
length: length.storage,
241+
makeAsyncIterator: {
242+
var iterator = bytes.chunks(ofCount: bagOfBytesToByteBufferConversionChunkSize).makeIterator()
243+
return { allocator in
244+
guard let chunk = iterator.next() else {
245+
return nil
246+
}
247+
return allocator.buffer(bytes: chunk)
248+
}
249+
}
250+
))
251+
}
208252
}
209253

210254
/// Create an ``HTTPClientRequest/Body-swift.struct`` from an `AsyncSequence` of `ByteBuffer`s.
@@ -223,14 +267,6 @@ extension HTTPClientRequest.Body {
223267
public static func stream<SequenceOfBytes: AsyncSequence & Sendable>(
224268
_ sequenceOfBytes: SequenceOfBytes,
225269
length: Length
226-
) -> Self where SequenceOfBytes.Element == ByteBuffer {
227-
Self._stream(sequenceOfBytes, length: length)
228-
}
229-
230-
@inlinable
231-
static func _stream<SequenceOfBytes: AsyncSequence>(
232-
_ sequenceOfBytes: SequenceOfBytes,
233-
length: Length
234270
) -> Self where SequenceOfBytes.Element == ByteBuffer {
235271
let body = self.init(.asyncSequence(length: length.storage) {
236272
var iterator = sequenceOfBytes.makeAsyncIterator()
@@ -243,7 +279,7 @@ extension HTTPClientRequest.Body {
243279

244280
/// Create an ``HTTPClientRequest/Body-swift.struct`` from an `AsyncSequence` of bytes.
245281
///
246-
/// This construction will consume 1kB chunks from the `Bytes` and send them at once. This optimizes for
282+
/// This construction will consume 4MB chunks from the `Bytes` and send them at once. This optimizes for
247283
/// `AsyncSequence`s where larger chunks are buffered up and available without actually suspending, such
248284
/// as those provided by `FileHandle`.
249285
///
@@ -259,19 +295,11 @@ extension HTTPClientRequest.Body {
259295
public static func stream<Bytes: AsyncSequence & Sendable>(
260296
_ bytes: Bytes,
261297
length: Length
262-
) -> Self where Bytes.Element == UInt8 {
263-
Self._stream(bytes, length: length)
264-
}
265-
266-
@inlinable
267-
static func _stream<Bytes: AsyncSequence>(
268-
_ bytes: Bytes,
269-
length: Length
270298
) -> Self where Bytes.Element == UInt8 {
271299
let body = self.init(.asyncSequence(length: length.storage) {
272300
var iterator = bytes.makeAsyncIterator()
273301
return { allocator -> ByteBuffer? in
274-
var buffer = allocator.buffer(capacity: 1024) // TODO: Magic number
302+
var buffer = allocator.buffer(capacity: bagOfBytesToByteBufferConversionChunkSize)
275303
while buffer.writableBytes > 0, let byte = try await iterator.next() {
276304
buffer.writeInteger(byte)
277305
}
@@ -313,3 +341,53 @@ extension HTTPClientRequest.Body {
313341
internal var storage: RequestBodyLength
314342
}
315343
}
344+
345+
// Conforming to `AsyncSequence` allows a request body to be iterated as a series of `ByteBuffer`s.
@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
extension HTTPClientRequest.Body: AsyncSequence {
    public typealias Element = ByteBuffer

    /// Creates an iterator over the buffers of this body.
    ///
    /// - A body backed by a single `ByteBuffer` yields that buffer.
    /// - A body backed by a sequence builds its complete buffer right here, using the
    ///   iterator's shared allocator, and yields it.
    /// - A body backed by an async sequence forwards every `next()` call to the closure
    ///   returned by the body's own iterator factory.
    @inlinable
    public func makeAsyncIterator() -> AsyncIterator {
        let iteratorStorage: AsyncIterator.Storage
        switch self.mode {
        case .byteBuffer(let byteBuffer):
            iteratorStorage = .byteBuffer(byteBuffer)
        case .sequence(_, _, let makeCompleteBody):
            iteratorStorage = .byteBuffer(makeCompleteBody(AsyncIterator.allocator))
        case .asyncSequence(_, let makeAsyncIterator):
            iteratorStorage = .makeNext(makeAsyncIterator())
        }
        return AsyncIterator(storage: iteratorStorage)
    }
}
361+
362+
@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
extension HTTPClientRequest.Body {
    /// The iterator produced when a request body is iterated as an `AsyncSequence` of `ByteBuffer`s.
    public struct AsyncIterator: AsyncIteratorProtocol {
        /// Allocator shared by all iterators; it is passed to the closures that create the buffers.
        @usableFromInline
        static let allocator = ByteBufferAllocator()

        /// Where the next element comes from.
        @usableFromInline
        enum Storage {
            /// A single buffer that is handed out once; `nil` once it has been handed out.
            case byteBuffer(ByteBuffer?)
            /// A closure that is asked for the next buffer on every call to `next()`.
            case makeNext((ByteBufferAllocator) async throws -> ByteBuffer?)
        }

        @usableFromInline
        var storage: Storage

        @inlinable
        init(storage: Storage) {
            self.storage = storage
        }

        /// Returns the next buffer of the body, or `nil` when there is nothing more to yield.
        @inlinable
        public mutating func next() async throws -> ByteBuffer? {
            switch self.storage {
            case .makeNext(let produceNext):
                return try await produceNext(Self.allocator)
            case .byteBuffer(let pending):
                // Clear the slot so that every following call returns `nil`.
                defer { self.storage = .byteBuffer(nil) }
                return pending
            }
        }
    }
}

Sources/AsyncHTTPClient/HTTPHandler.swift

+32-2
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
//
1313
//===----------------------------------------------------------------------===//
1414

15+
import Algorithms
1516
import Foundation
1617
import Logging
1718
import NIOConcurrencyHelpers
@@ -44,6 +45,27 @@ extension HTTPClient {
4445
public func write(_ data: IOData) -> EventLoopFuture<Void> {
4546
return self.closure(data)
4647
}
48+
49+
/// Writes `bytes` as consecutive chunks of at most `maxChunkSize` bytes, using one `write` call per chunk.
///
/// The write of a chunk is started from the `flatMap` continuation of the previous chunk's write
/// future, so chunks are written strictly one after another. If `bytes` is empty, a single empty
/// `ByteBuffer` is written.
///
/// - parameters:
///   - bytes: The bytes to write.
///   - maxChunkSize: The maximum number of bytes passed to a single `write` call.
/// - returns: The future of the last write in the chain.
@inlinable
func writeChunks<Bytes: Collection>(of bytes: Bytes, maxChunkSize: Int) -> EventLoopFuture<Void> where Bytes.Element == UInt8 {
    // The iterator is boxed because it is mutated from within the `@Sendable` local function below.
    let remainingChunks = UnsafeMutableTransferBox(bytes.chunks(ofCount: maxChunkSize).makeIterator())
    guard let firstChunk = remainingChunks.wrappedValue.next() else {
        return self.write(IOData.byteBuffer(.init()))
    }

    @Sendable // can't use closure here as we recursively call ourselves which closures can't do
    func writeNextChunk(_ chunk: Bytes.SubSequence) -> EventLoopFuture<Void> {
        // Look ahead before writing, so we know whether `chunk` is the last one.
        let upcomingChunk = remainingChunks.wrappedValue.next()
        let currentWrite = self.write(.byteBuffer(ByteBuffer(bytes: chunk)))
        guard let followingChunk = upcomingChunk else {
            return currentWrite
        }
        return currentWrite.flatMap {
            writeNextChunk(followingChunk)
        }
    }

    return writeNextChunk(firstChunk)
}
4769
}
4870

4971
/// Body size. If nil,`Transfer-Encoding` will automatically be set to `chunked`. Otherwise a `Content-Length`
@@ -90,7 +112,11 @@ extension HTTPClient {
90112
@inlinable
public static func bytes<Bytes>(_ bytes: Bytes) -> Body where Bytes: RandomAccessCollection, Bytes: Sendable, Bytes.Element == UInt8 {
    return Body(length: bytes.count) { writer in
        // Anything larger than the conversion chunk size is written chunk by chunk instead of
        // being copied into one big `ByteBuffer`.
        guard bytes.count <= bagOfBytesToByteBufferConversionChunkSize else {
            return writer.writeChunks(of: bytes, maxChunkSize: bagOfBytesToByteBufferConversionChunkSize)
        }
        // Small enough: a single buffer and a single write.
        return writer.write(.byteBuffer(ByteBuffer(bytes: bytes)))
    }
}
96122

@@ -100,7 +126,11 @@ extension HTTPClient {
100126
/// - string: Body `String` representation.
101127
public static func string(_ string: String) -> Body {
    return Body(length: string.utf8.count) { writer in
        // Strings whose UTF-8 representation exceeds the conversion chunk size are written
        // chunk by chunk from their UTF-8 view.
        guard string.utf8.count <= bagOfBytesToByteBufferConversionChunkSize else {
            return writer.writeChunks(of: string.utf8, maxChunkSize: bagOfBytesToByteBufferConversionChunkSize)
        }
        // Small enough: a single buffer and a single write.
        return writer.write(.byteBuffer(ByteBuffer(string: string)))
    }
}
106136
}

0 commit comments

Comments
 (0)