Skip to content

Commit 554c0d5

Browse files
committed
Add a shared file IO thread pool per HTTPClient
1 parent c5c0e7b commit 554c0d5

File tree

3 files changed

+53
-46
lines changed

3 files changed

+53
-46
lines changed

Sources/AsyncHTTPClient/FileDownloadDelegate.swift

+24-40
Original file line numberDiff line numberDiff line change
@@ -30,18 +30,10 @@ public final class FileDownloadDelegate: HTTPClientResponseDelegate {
3030
public typealias Response = Progress
3131

3232
private let filePath: String
33-
private let io: NonBlockingFileIO
33+
private var io: NonBlockingFileIO?
3434
private let reportHead: ((HTTPResponseHead) -> Void)?
3535
private let reportProgress: ((Progress) -> Void)?
3636

37-
private enum ThreadPool {
38-
case unowned
39-
// if we own the thread pool we also need to shut it down
40-
case owned(NIOThreadPool)
41-
}
42-
43-
private let threadPool: ThreadPool
44-
4537
private var fileHandleFuture: EventLoopFuture<NIOFileHandle>?
4638
private var writeFuture: EventLoopFuture<Void>?
4739

@@ -61,7 +53,7 @@ public final class FileDownloadDelegate: HTTPClientResponseDelegate {
6153
reportHead: ((HTTPResponseHead) -> Void)? = nil,
6254
reportProgress: ((Progress) -> Void)? = nil
6355
) throws {
64-
try self.init(path: path, sharedThreadPool: pool, reportHead: reportHead, reportProgress: reportProgress)
56+
try self.init(path: path, pool: .some(pool), reportHead: reportHead, reportProgress: reportProgress)
6557
}
6658

6759
/// Initializes a new file download delegate and spawns a new thread for file I/O.
@@ -78,31 +70,36 @@ public final class FileDownloadDelegate: HTTPClientResponseDelegate {
7870
reportHead: ((HTTPResponseHead) -> Void)? = nil,
7971
reportProgress: ((Progress) -> Void)? = nil
8072
) throws {
81-
try self.init(path: path, sharedThreadPool: nil, reportHead: reportHead, reportProgress: reportProgress)
73+
try self.init(path: path, pool: nil, reportHead: reportHead, reportProgress: reportProgress)
8274
}
8375

8476
private init(
8577
path: String,
86-
sharedThreadPool: NIOThreadPool?,
78+
pool: NIOThreadPool?,
8779
reportHead: ((HTTPResponseHead) -> Void)? = nil,
8880
reportProgress: ((Progress) -> Void)? = nil
8981
) throws {
90-
let pool: NIOThreadPool
91-
if let sharedThreadPool = sharedThreadPool {
92-
pool = sharedThreadPool
93-
self.threadPool = .unowned
82+
if let pool = pool {
83+
self.io = NonBlockingFileIO(threadPool: pool)
9484
} else {
95-
pool = NIOThreadPool(numberOfThreads: 1)
96-
self.threadPool = .owned(pool)
85+
// we should use the shared thread pool from the HTTPClient which
86+
// we will get shortly through a call to provideSharedThreadPool(fileIOPool:)
87+
self.io = nil
9788
}
98-
99-
pool.start()
100-
self.io = NonBlockingFileIO(threadPool: pool)
89+
10190
self.filePath = path
10291

10392
self.reportHead = reportHead
10493
self.reportProgress = reportProgress
10594
}
95+
96+
public func provideSharedThreadPool(fileIOPool: NIOThreadPool) {
97+
guard self.io == nil else {
98+
// user has provided their own thread pool
99+
return
100+
}
101+
self.io = NonBlockingFileIO(threadPool: fileIOPool)
102+
}
106103

107104
public func didReceiveHead(
108105
task: HTTPClient.Task<Response>,
@@ -122,24 +119,27 @@ public final class FileDownloadDelegate: HTTPClientResponseDelegate {
122119
task: HTTPClient.Task<Response>,
123120
_ buffer: ByteBuffer
124121
) -> EventLoopFuture<Void> {
122+
guard let io = io else {
123+
preconditionFailure("thread pool not provided by HTTPClient before calling \(#function)")
124+
}
125125
self.progress.receivedBytes += buffer.readableBytes
126126
self.reportProgress?(self.progress)
127127

128128
let writeFuture: EventLoopFuture<Void>
129129
if let fileHandleFuture = self.fileHandleFuture {
130130
writeFuture = fileHandleFuture.flatMap {
131-
self.io.write(fileHandle: $0, buffer: buffer, eventLoop: task.eventLoop)
131+
io.write(fileHandle: $0, buffer: buffer, eventLoop: task.eventLoop)
132132
}
133133
} else {
134-
let fileHandleFuture = self.io.openFile(
134+
let fileHandleFuture = io.openFile(
135135
path: self.filePath,
136136
mode: .write,
137137
flags: .allowFileCreation(),
138138
eventLoop: task.eventLoop
139139
)
140140
self.fileHandleFuture = fileHandleFuture
141141
writeFuture = fileHandleFuture.flatMap {
142-
self.io.write(fileHandle: $0, buffer: buffer, eventLoop: task.eventLoop)
142+
io.write(fileHandle: $0, buffer: buffer, eventLoop: task.eventLoop)
143143
}
144144
}
145145

@@ -150,12 +150,6 @@ public final class FileDownloadDelegate: HTTPClientResponseDelegate {
150150
private func close(fileHandle: NIOFileHandle) {
151151
try! fileHandle.close()
152152
self.fileHandleFuture = nil
153-
switch self.threadPool {
154-
case .unowned:
155-
break
156-
case .owned(let pool):
157-
try! pool.syncShutdownGracefully()
158-
}
159153
}
160154

161155
private func finalize() {
@@ -177,14 +171,4 @@ public final class FileDownloadDelegate: HTTPClientResponseDelegate {
177171
self.finalize()
178172
return self.progress
179173
}
180-
181-
deinit {
182-
switch threadPool {
183-
case .unowned:
184-
break
185-
case .owned(let pool):
186-
// if the delegate is unused we still need to shutdown the thread pool
187-
try! pool.syncShutdownGracefully()
188-
}
189-
}
190174
}

Sources/AsyncHTTPClient/HTTPClient.swift

+13-4
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,9 @@ public class HTTPClient {
7272
let eventLoopGroupProvider: EventLoopGroupProvider
7373
let configuration: Configuration
7474
let poolManager: HTTPConnectionPool.Manager
75+
76+
/// Shared thread pool used for file IO. It is given to the user through ``HTTPClientResponseDelegate/provideSharedThreadPool(fileIOPool:)-6phmu``
77+
private let fileIOThreadPool = NIOThreadPool(numberOfThreads: 1)
7578
private var state: State
7679
private let stateLock = Lock()
7780

@@ -97,6 +100,7 @@ public class HTTPClient {
97100
public required init(eventLoopGroupProvider: EventLoopGroupProvider,
98101
configuration: Configuration = Configuration(),
99102
backgroundActivityLogger: Logger) {
103+
self.fileIOThreadPool.start()
100104
self.eventLoopGroupProvider = eventLoopGroupProvider
101105
switch self.eventLoopGroupProvider {
102106
case .shared(let group):
@@ -241,11 +245,13 @@ public class HTTPClient {
241245
let error: Error? = (requiresClean && unclean) ? HTTPClientError.uncleanShutdown : nil
242246
return (callback, error)
243247
}
244-
245-
self.shutdownEventLoop(queue: queue) { error in
246-
let reportedError = error ?? uncleanError
247-
callback(reportedError)
248+
self.fileIOThreadPool.shutdownGracefully(queue: queue) { ioThreadPoolError in
249+
self.shutdownEventLoop(queue: queue) { error in
250+
let reportedError = error ?? ioThreadPoolError ?? uncleanError
251+
callback(reportedError)
252+
}
248253
}
254+
249255
}
250256
}
251257
}
@@ -562,9 +568,12 @@ public class HTTPClient {
562568
case .testOnly_exact(_, delegateOn: let delegateEL):
563569
taskEL = delegateEL
564570
}
571+
565572
logger.trace("selected EventLoop for task given the preference",
566573
metadata: ["ahc-eventloop": "\(taskEL)",
567574
"ahc-el-preference": "\(eventLoopPreference)"])
575+
576+
delegate.provideSharedThreadPool(fileIOPool: self.fileIOThreadPool)
568577

569578
let failedTask: Task<Delegate.Response>? = self.stateLock.withLock {
570579
switch state {

Sources/AsyncHTTPClient/HTTPHandler.swift

+16-2
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import Foundation
1616
import Logging
1717
import NIOConcurrencyHelpers
1818
import NIOCore
19+
import NIOPosix
1920
import NIOHTTP1
2021
import NIOSSL
2122

@@ -430,7 +431,14 @@ public class ResponseAccumulator: HTTPClientResponseDelegate {
430431
/// object that implements this protocol, but may do so if needed.
431432
public protocol HTTPClientResponseDelegate: AnyObject {
432433
associatedtype Response
433-
434+
435+
/// Called exactly once before any other methods of this delegate are called.
436+
/// It is used to give access to the shared thread pool of the ``HTTPClient`` the request is executed on.
437+
/// Use this thread pool to do file IO associated with the request e.g. to save a response to disk.
438+
///
439+
/// - Parameter fileIOPool: File IO Pool
440+
func provideSharedThreadPool(fileIOPool: NIOThreadPool)
441+
434442
/// Called when the request head is sent. Will be called once.
435443
///
436444
/// - parameters:
@@ -502,7 +510,13 @@ public protocol HTTPClientResponseDelegate: AnyObject {
502510
}
503511

504512
extension HTTPClientResponseDelegate {
505-
/// Default implementation of ``HTTPClientResponseDelegate/didSendRequestHead(task:_:)-6khai``.
513+
514+
/// Default implementation of ``HTTPClientResponseDelegate/provideSharedThreadPool(fileIOPool:)-8y1b``
515+
///
516+
/// By default, this does nothing.
517+
public func provideSharedThreadPool(fileIOPool: NIOThreadPool) {}
518+
519+
/// Default implementation of ``HTTPClientResponseDelegate/didSendRequest(task:)-9od5p``.
506520
///
507521
/// By default, this does nothing.
508522
public func didSendRequestHead(task: HTTPClient.Task<Response>, _ head: HTTPRequestHead) {}

0 commit comments

Comments
 (0)