Skip to content

Commit 42e0703

Browse files
Make use of new stream source and pausable writer
Signed-off-by: Si Beaumont <[email protected]>
1 parent bc6bc0d commit 42e0703

6 files changed

+315
-114
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
/*
2+
* Copyright 2021, gRPC Authors All rights reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
#if compiler(>=5.5)
18+
19+
@available(macOS 12, iOS 15, tvOS 15, watchOS 8, *)
20+
extension CancellationError: GRPCStatusTransformable {
21+
public func makeGRPCStatus() -> GRPCStatus {
22+
return GRPCStatus(code: .unavailable, message: nil)
23+
}
24+
}
25+
26+
#endif

Sources/GRPC/AsyncAwaitSupport/GRPCAsyncRequestStream.swift

+1-1
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
@available(macOS 12, iOS 15, tvOS 15, watchOS 8, *)
2222
public struct GRPCAsyncRequestStream<Element>: AsyncSequence {
2323
@usableFromInline
24-
internal typealias _WrappedStream = AsyncThrowingStream<Element, Error>
24+
internal typealias _WrappedStream = PassthroughMessageSequence<Element, Error>
2525

2626
@usableFromInline
2727
internal let _stream: _WrappedStream

Sources/GRPC/AsyncAwaitSupport/GRPCAsyncResponseStreamWriter.swift

+42-8
Original file line numberDiff line numberDiff line change
@@ -17,27 +17,48 @@
1717
#if compiler(>=5.5)
1818

1919
/// Writer for server-streaming RPC handlers to provide responses.
20-
///
21-
/// NOTE: This will be replaced by a pausible writer that is currently being worked on in parallel.
2220
@available(macOS 12, iOS 15, tvOS 15, watchOS 8, *)
2321
public struct GRPCAsyncResponseStreamWriter<Response> {
22+
@usableFromInline
23+
internal typealias Delegate = AsyncResponseStreamWriterDelegate<Response>
24+
25+
@usableFromInline
26+
internal let _asyncWriter: AsyncWriter<Delegate>
27+
28+
@inlinable
29+
internal init(wrapping asyncWriter: AsyncWriter<Delegate>) {
30+
self._asyncWriter = asyncWriter
31+
}
32+
33+
@inlinable
34+
public func send(
35+
_ response: Response,
36+
compression: Compression = .deferToCallDefault
37+
) async throws {
38+
try await _asyncWriter.write((response, compression))
39+
}
40+
}
41+
42+
@available(macOS 12, iOS 15, tvOS 15, watchOS 8, *)
43+
@usableFromInline
44+
internal final class AsyncResponseStreamWriterDelegate<Response>: AsyncWriterDelegate {
2445
@usableFromInline
2546
internal let _context: GRPCAsyncServerCallContext
2647

2748
@usableFromInline
28-
internal let _send: (Response, MessageMetadata) async throws -> Void
49+
internal let _send: (Response, MessageMetadata) -> Void
2950

3051
@usableFromInline
3152
internal let _compressionEnabledOnServer: Bool
3253

33-
// Create a new AsyncResponseStreamWriter.
54+
// Create a new AsyncResponseStreamWriterDelegate.
3455
//
3556
// - Important: the `send` closure must be thread-safe.
3657
@inlinable
3758
internal init(
3859
context: GRPCAsyncServerCallContext,
3960
compressionIsEnabled: Bool,
40-
send: @escaping (Response, MessageMetadata) async throws -> Void
61+
send: @escaping (Response, MessageMetadata) -> Void
4162
) {
4263
self._context = context
4364
self._compressionEnabledOnServer = compressionIsEnabled
@@ -53,12 +74,25 @@ public struct GRPCAsyncResponseStreamWriter<Response> {
5374
}
5475

5576
@inlinable
56-
public func send(
77+
internal func send(
5778
_ response: Response,
5879
compression: Compression = .deferToCallDefault
59-
) async throws {
80+
) {
6081
let compress = self.shouldCompress(compression)
61-
try await self._send(response, .init(compress: compress, flush: true))
82+
self._send(response, .init(compress: compress, flush: true))
83+
}
84+
85+
// MARK: - AsyncWriterDelegate conformance.
86+
87+
@inlinable
88+
internal func write(_ response: (Response, Compression)) {
89+
self.send(response.0, compression: response.1)
90+
}
91+
92+
@inlinable
93+
internal func writeEnd(_ end: Void) {
94+
// meh.
95+
// TODO: is this where will move the state on to completed somehow?
6296
}
6397
}
6498

0 commit comments

Comments
 (0)