Skip to content

Commit 03010c7

Browse files
authored
Extend lifetime of client interceptor pipeline (grpc#1265)
Motivation: A client call (i.e. the object the user holds) may live longer than the transport associated with it (roughly speaking, the http/2 stream channel). An example of this is when interceptors are use to retry and RPC and redirect responses back to the original call. However, the interceptor pipeline is held by the transport and is currently set to nil when the transport is removed from the channel. This means events invoked from the call object (such as cancellation) which go via the transport (holding the interceptor pipeline) are incorrectly failed. Modifications: - Have the client interceptor pipeline break the ref cycle between the transport and itself when the interceptor pipeline closes rather than when the transport is closed - Emit a cancellation status rater than error on cancellation - Update the ordering of when close is called in the interceptor pipeline. - Add and update tests Result: "sub"-RPCs may be cancelled.
1 parent 8189eee commit 03010c7

File tree

6 files changed

+324
-25
lines changed

6 files changed

+324
-25
lines changed

Sources/GRPC/Interceptor/ClientInterceptorPipeline.swift

Lines changed: 27 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -79,16 +79,17 @@ internal final class ClientInterceptorPipeline<Request, Response> {
7979
internal let _errorDelegate: ClientErrorDelegate?
8080

8181
@usableFromInline
82-
internal let _onError: (Error) -> Void
82+
internal private(set) var _onError: ((Error) -> Void)?
8383

8484
@usableFromInline
85-
internal let _onCancel: (EventLoopPromise<Void>?) -> Void
85+
internal private(set) var _onCancel: ((EventLoopPromise<Void>?) -> Void)?
8686

8787
@usableFromInline
88-
internal let _onRequestPart: (GRPCClientRequestPart<Request>, EventLoopPromise<Void>?) -> Void
88+
internal private(set) var _onRequestPart:
89+
((GRPCClientRequestPart<Request>, EventLoopPromise<Void>?) -> Void)?
8990

9091
@usableFromInline
91-
internal let _onResponsePart: (GRPCClientResponsePart<Response>) -> Void
92+
internal private(set) var _onResponsePart: ((GRPCClientResponsePart<Response>) -> Void)?
9293

9394
/// The index after the last user interceptor context index. (i.e. `_userContexts.endIndex`).
9495
@usableFromInline
@@ -217,9 +218,13 @@ internal final class ClientInterceptorPipeline<Request, Response> {
217218

218219
case self._tailIndex:
219220
if part.isEnd {
221+
// Update our state before handling the response part.
222+
self._isOpen = false
223+
self._onResponsePart?(part)
220224
self.close()
225+
} else {
226+
self._onResponsePart?(part)
221227
}
222-
self._onResponsePart(part)
223228

224229
default:
225230
self._userContexts[index].invokeReceive(part)
@@ -275,9 +280,8 @@ internal final class ClientInterceptorPipeline<Request, Response> {
275280
/// Handles a caught error which has traversed the interceptor pipeline.
276281
@usableFromInline
277282
internal func _errorCaught(_ error: Error) {
278-
// We're about to complete, close the pipeline.
279-
self.close()
280-
283+
// We're about to call out to an error handler: update our state first.
284+
self._isOpen = false
281285
var unwrappedError: Error
282286

283287
// Unwrap the error, if possible.
@@ -295,7 +299,10 @@ internal final class ClientInterceptorPipeline<Request, Response> {
295299
}
296300

297301
// Emit the unwrapped error.
298-
self._onError(unwrappedError)
302+
self._onError?(unwrappedError)
303+
304+
// Close the pipeline.
305+
self.close()
299306
}
300307

301308
/// Writes a request message into the interceptor pipeline.
@@ -351,7 +358,7 @@ internal final class ClientInterceptorPipeline<Request, Response> {
351358
) {
352359
switch index {
353360
case self._headIndex:
354-
self._onRequestPart(part, promise)
361+
self._onRequestPart?(part, promise)
355362

356363
case self._tailIndex:
357364
self._invokeSend(
@@ -407,7 +414,7 @@ internal final class ClientInterceptorPipeline<Request, Response> {
407414
) {
408415
switch index {
409416
case self._headIndex:
410-
self._onCancel(promise)
417+
self._onCancel?(promise)
411418

412419
case self._tailIndex:
413420
self._invokeCancel(
@@ -425,7 +432,7 @@ internal final class ClientInterceptorPipeline<Request, Response> {
425432

426433
extension ClientInterceptorPipeline {
427434
/// Closes the pipeline. This should be called once, by the tail interceptor, to indicate that
428-
/// the RPC has completed.
435+
/// the RPC has completed. If this is not called, we will leak.
429436
/// - Important: This *must* to be called from the `eventLoop`.
430437
@inlinable
431438
internal func close() {
@@ -437,7 +444,14 @@ extension ClientInterceptorPipeline {
437444
self._scheduledClose = nil
438445

439446
// Cancel the transport.
440-
self._onCancel(nil)
447+
self._onCancel?(nil)
448+
449+
// `ClientTransport` holds a reference to us and references to itself via these callbacks. Break
450+
// these references now by replacing the callbacks.
451+
self._onError = nil
452+
self._onCancel = nil
453+
self._onRequestPart = nil
454+
self._onResponsePart = nil
441455
}
442456

443457
/// Sets up a deadline for the pipeline.

Sources/GRPC/Interceptor/ClientTransport.swift

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -85,8 +85,8 @@ internal final class ClientTransport<Request, Response> {
8585
// trailers here and only forward them when we receive the status.
8686
private var trailers: HPACKHeaders?
8787

88-
/// The interceptor pipeline connected to this transport. This must be set to `nil` when removed
89-
/// from the `ChannelPipeline` in order to break reference cycles.
88+
/// The interceptor pipeline connected to this transport. The pipeline also holds references
89+
/// to `self` which are dropped when the interceptor pipeline is closed.
9090
@usableFromInline
9191
internal var _pipeline: ClientInterceptorPipeline<Request, Response>?
9292

@@ -118,6 +118,7 @@ internal final class ClientTransport<Request, Response> {
118118
self.logger = logger
119119
self.serializer = serializer
120120
self.deserializer = deserializer
121+
// The references to self held by the pipeline are dropped when it is closed.
121122
self._pipeline = ClientInterceptorPipeline(
122123
eventLoop: eventLoop,
123124
details: details,
@@ -241,7 +242,8 @@ extension ClientTransport {
241242

242243
if self.state.cancel() {
243244
let error = GRPCError.RPCCancelledByClient()
244-
self.forwardErrorToInterceptors(error)
245+
let status = error.makeGRPCStatus()
246+
self.forwardToInterceptors(.end(status, [:]))
245247
self.failBufferedWrites(with: error)
246248
self.channel?.close(mode: .all, promise: nil)
247249
self.channelPromise?.fail(error)
@@ -363,11 +365,9 @@ extension ClientTransport {
363365
private func dropReferences() {
364366
if self.callEventLoop.inEventLoop {
365367
self.channel = nil
366-
self._pipeline = nil
367368
} else {
368369
self.callEventLoop.execute {
369370
self.channel = nil
370-
self._pipeline = nil
371371
}
372372
}
373373
}

Tests/GRPCTests/ClientCallTests.swift

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -197,11 +197,7 @@ class ClientCallTests: GRPCTestCase {
197197
// Cancellation should succeed.
198198
assertThat(try get.cancel().wait(), .doesNotThrow())
199199

200-
// The status promise will fail.
201-
assertThat(
202-
try promise.futureResult.wait(),
203-
.throws(.instanceOf(GRPCError.RPCCancelledByClient.self))
204-
)
200+
assertThat(try promise.futureResult.wait(), .hasCode(.cancelled))
205201

206202
// Cancellation should now fail, we've already cancelled.
207203
assertThat(try get.cancel().wait(), .throws(.instanceOf(GRPCError.AlreadyComplete.self)))

Tests/GRPCTests/ClientCancellingTests.swift

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ class ClientCancellingTests: EchoTestCaseBase {
2727
call.cancel(promise: nil)
2828

2929
call.response.whenFailure { error in
30-
XCTAssertTrue(error is GRPCError.RPCCancelledByClient)
30+
XCTAssertEqual((error as? GRPCStatus)?.code, .cancelled)
3131
responseReceived.fulfill()
3232
}
3333

@@ -47,7 +47,7 @@ class ClientCancellingTests: EchoTestCaseBase {
4747
call.cancel(promise: nil)
4848

4949
call.response.whenFailure { error in
50-
XCTAssertTrue(error is GRPCError.RPCCancelledByClient)
50+
XCTAssertEqual((error as? GRPCStatus)?.code, .cancelled)
5151
responseReceived.fulfill()
5252
}
5353

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
/*
2+
* Copyright 2021, gRPC Authors All rights reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
import EchoModel
17+
import GRPC
18+
19+
// MARK: - Client
20+
21+
internal final class EchoClientInterceptors: Echo_EchoClientInterceptorFactoryProtocol {
22+
internal typealias Factory = () -> ClientInterceptor<Echo_EchoRequest, Echo_EchoResponse>
23+
private var factories: [Factory] = []
24+
25+
internal init(_ factories: Factory...) {
26+
self.factories = factories
27+
}
28+
29+
internal func register(_ factory: @escaping Factory) {
30+
self.factories.append(factory)
31+
}
32+
33+
private func makeInterceptors() -> [ClientInterceptor<Echo_EchoRequest, Echo_EchoResponse>] {
34+
return self.factories.map { $0() }
35+
}
36+
37+
func makeGetInterceptors() -> [ClientInterceptor<Echo_EchoRequest, Echo_EchoResponse>] {
38+
return self.makeInterceptors()
39+
}
40+
41+
func makeExpandInterceptors() -> [ClientInterceptor<Echo_EchoRequest, Echo_EchoResponse>] {
42+
return self.makeInterceptors()
43+
}
44+
45+
func makeCollectInterceptors() -> [ClientInterceptor<Echo_EchoRequest, Echo_EchoResponse>] {
46+
return self.makeInterceptors()
47+
}
48+
49+
func makeUpdateInterceptors() -> [ClientInterceptor<Echo_EchoRequest, Echo_EchoResponse>] {
50+
return self.makeInterceptors()
51+
}
52+
}
53+
54+
// MARK: - Server
55+
56+
internal final class EchoServerInterceptors: Echo_EchoServerInterceptorFactoryProtocol {
57+
internal typealias Factory = () -> ServerInterceptor<Echo_EchoRequest, Echo_EchoResponse>
58+
private var factories: [Factory] = []
59+
60+
internal init(_ factories: Factory...) {
61+
self.factories = factories
62+
}
63+
64+
internal func register(_ factory: @escaping Factory) {
65+
self.factories.append(factory)
66+
}
67+
68+
private func makeInterceptors() -> [ServerInterceptor<Echo_EchoRequest, Echo_EchoResponse>] {
69+
return self.factories.map { $0() }
70+
}
71+
72+
func makeGetInterceptors() -> [ServerInterceptor<Echo_EchoRequest, Echo_EchoResponse>] {
73+
return self.makeInterceptors()
74+
}
75+
76+
func makeExpandInterceptors() -> [ServerInterceptor<Echo_EchoRequest, Echo_EchoResponse>] {
77+
return self.makeInterceptors()
78+
}
79+
80+
func makeCollectInterceptors() -> [ServerInterceptor<Echo_EchoRequest, Echo_EchoResponse>] {
81+
return self.makeInterceptors()
82+
}
83+
84+
func makeUpdateInterceptors() -> [ServerInterceptor<Echo_EchoRequest, Echo_EchoResponse>] {
85+
return self.makeInterceptors()
86+
}
87+
}

0 commit comments

Comments
 (0)