Skip to content

Async-await: Base types for client implementation #1243

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 27 commits into from
Aug 31, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
20511ca
Async-await: Implement client call types
simonjbeaumont Jul 6, 2021
09a8d2f
fixup: Apply some of swiftformat suggestions
simonjbeaumont Aug 20, 2021
3cd7a9a
fixup: Ignore redundantGet swiftformat rule on effectful properties
simonjbeaumont Aug 20, 2021
54fd85e
fixup: Add missing compiler guards to AsyncClientCall.swift
simonjbeaumont Aug 20, 2021
d37ace0
fixup: Remove subchannel property from Call APIs
simonjbeaumont Aug 25, 2021
a5d683b
fixup: Prefix all new types with GRPC
simonjbeaumont Aug 25, 2021
6f4a32f
fixup: Put case statements on own line
simonjbeaumont Aug 25, 2021
a94d5cd
fixup: Add TODO to look at the suspension in sendMessage
simonjbeaumont Aug 25, 2021
43eab0b
fixup: Rename {Server,Bidirectional}StreamingCall.responseStream to .…
simonjbeaumont Aug 25, 2021
7b88a7f
fixup: Drop Payload suffix from {Request,Response}Payload type params
simonjbeaumont Aug 25, 2021
c4ad3d8
fixup: Remove vestigial comment about future
simonjbeaumont Aug 25, 2021
9c5f430
fixup: Use map reduce on async sequence in tests
simonjbeaumont Aug 25, 2021
d639a32
fixup: Remove sleeps in tests
simonjbeaumont Aug 25, 2021
a04ef2a
fixup: remove public from XCTAsyncTest and move to XCTestHelpers
simonjbeaumont Aug 25, 2021
e3e3f07
fixup: Improve XCTAsyncTest logging and timeout
simonjbeaumont Aug 25, 2021
770106e
fixup: Rename GRPCClient+Extensions and GRPCChannel+Extensions
simonjbeaumont Aug 25, 2021
6896061
fixup: Rerun swiftformat
simonjbeaumont Aug 25, 2021
da1cd58
fixup: Use makeAndInvoke pattern across all RPC types
simonjbeaumont Aug 26, 2021
6a34d96
fixup: Rename GRPCAsyncStream -> GRPCAsyncRequestStream
simonjbeaumont Aug 26, 2021
99a6292
fixup: Remove all the protocols
simonjbeaumont Aug 26, 2021
497c8a0
fixup: Rename GRPCAsyncRequestStream to GRPCAsyncResponseStream
simonjbeaumont Aug 26, 2021
135935c
fixup: Add API docs incl. warning about awaiting status/trailers
simonjbeaumont Aug 26, 2021
e8c1059
fixup: Drop __consuming from makeAsyncIterator
simonjbeaumont Aug 31, 2021
8babddc
fixup: Move #if compiler guards above imports
simonjbeaumont Aug 31, 2021
ebb4715
fixup: Remove duplicate test
simonjbeaumont Aug 31, 2021
21a06b6
fixup: Remove all the unnecessary imports
simonjbeaumont Aug 31, 2021
4c99b1f
fixup: Extend tests with checks for response headers and trailers
simonjbeaumont Aug 31, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Package.swift
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ let package = Package(
.product(name: "NIOPosix", package: "swift-nio"),
.product(name: "NIOEmbedded", package: "swift-nio"),
.product(name: "NIOFoundationCompat", package: "swift-nio"),
.product(name: "_NIOConcurrency", package: "swift-nio"),
.product(name: "NIOTransportServices", package: "swift-nio-transport-services"),
.product(name: "NIOHTTP1", package: "swift-nio"),
.product(name: "NIOHTTP2", package: "swift-nio-http2"),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
/*
* Copyright 2021, gRPC Authors All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#if compiler(>=5.5)

import _NIOConcurrency
import NIOHPACK

/// Async-await variant of BidirectionalStreamingCall.
@available(macOS 12, iOS 15, tvOS 15, watchOS 8, *)
public struct GRPCAsyncBidirectionalStreamingCall<Request, Response> {
private let call: Call<Request, Response>
private let responseParts: StreamingResponseParts<Response>

/// The stream of responses from the server.
public let responses: GRPCAsyncResponseStream<Response>

/// The options used to make the RPC.
public var options: CallOptions {
return self.call.options
}

/// Cancel this RPC if it hasn't already completed.
public func cancel() async throws {
try await self.call.cancel().get()
}

// MARK: - Response Parts

/// The initial metadata returned from the server.
public var initialMetadata: HPACKHeaders {
// swiftformat:disable:next redundantGet
get async throws {
try await self.responseParts.initialMetadata.get()
}
}

/// The trailing metadata returned from the server.
///
/// - Important: Awaiting this property will suspend until the responses have been consumed.
public var trailingMetadata: HPACKHeaders {
// swiftformat:disable:next redundantGet
get async throws {
try await self.responseParts.trailingMetadata.get()
}
}

/// The final status of the the RPC.
///
/// - Important: Awaiting this property will suspend until the responses have been consumed.
public var status: GRPCStatus {
// swiftformat:disable:next redundantGet
get async {
// force-try acceptable because any error is encapsulated in a successful GRPCStatus future.
try! await self.responseParts.status.get()
}
}

private init(call: Call<Request, Response>) {
self.call = call
// Initialise `responseParts` with an empty response handler because we
// provide the responses as an AsyncSequence in `responseStream`.
self.responseParts = StreamingResponseParts(on: call.eventLoop) { _ in }

// Call and StreamingResponseParts are reference types so we grab a
// referecence to them here to avoid capturing mutable self in the closure
// passed to the AsyncThrowingStream initializer.
//
// The alternative would be to declare the responseStream as:
// ```
// public private(set) var responseStream: AsyncThrowingStream<ResponsePayload>!
// ```
//
// UPDATE: Additionally we expect to replace this soon with an AsyncSequence
// implementation that supports yielding values from outside the closure.
let call = self.call
let responseParts = self.responseParts
let responseStream = AsyncThrowingStream(Response.self) { continuation in
call.invokeStreamingRequests { error in
responseParts.handleError(error)
continuation.finish(throwing: error)
} onResponsePart: { responsePart in
responseParts.handle(responsePart)
switch responsePart {
case let .message(response):
continuation.yield(response)
case .metadata:
break
case .end:
continuation.finish()
}
}
}
self.responses = .init(responseStream)
}

/// We expose this as the only non-private initializer so that the caller
/// knows that invocation is part of initialisation.
internal static func makeAndInvoke(call: Call<Request, Response>) -> Self {
Self(call: call)
}

// MARK: - Requests

/// Sends a message to the service.
///
/// - Important: Callers must terminate the stream of messages by calling `sendEnd()`.
///
/// - Parameters:
/// - message: The message to send.
/// - compression: Whether compression should be used for this message. Ignored if compression
/// was not enabled for the RPC.
public func sendMessage(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where did we land with GRPCAsync{Client,Bidirectional}StreamingCall having a request-stream or similar?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's an unresolved thread above which I found a reply that I wrote but forgot to send. It's here: #1243 (comment).

I thought we were deferring this until we had the writer that supports backpressure?

_ message: Request,
compression: Compression = .deferToCallDefault
) async throws {
let compress = self.call.compress(compression)
let promise = self.call.eventLoop.makePromise(of: Void.self)
self.call.send(.message(message, .init(compress: compress, flush: true)), promise: promise)
// TODO: This waits for the message to be written to the socket. We should probably just wait for it to be written to the channel?
try await promise.futureResult.get()
}

/// Sends a sequence of messages to the service.
///
/// - Important: Callers must terminate the stream of messages by calling `sendEnd()`.
///
/// - Parameters:
/// - messages: The sequence of messages to send.
/// - compression: Whether compression should be used for this message. Ignored if compression
/// was not enabled for the RPC.
public func sendMessages<S>(
_ messages: S,
compression: Compression = .deferToCallDefault
) async throws where S: Sequence, S.Element == Request {
let promise = self.call.eventLoop.makePromise(of: Void.self)
self.call.sendMessages(messages, compression: compression, promise: promise)
try await promise.futureResult.get()
}

/// Terminates a stream of messages sent to the service.
///
/// - Important: This should only ever be called once.
public func sendEnd() async throws {
let promise = self.call.eventLoop.makePromise(of: Void.self)
self.call.send(.end, promise: promise)
try await promise.futureResult.get()
}
}

#endif
138 changes: 138 additions & 0 deletions Sources/GRPC/AsyncAwaitSupport/GRPCAsyncClientStreamingCall.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
/*
* Copyright 2021, gRPC Authors All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#if compiler(>=5.5)

import NIOHPACK

/// Async-await variant of `ClientStreamingCall`.
@available(macOS 12, iOS 15, tvOS 15, watchOS 8, *)
public struct GRPCAsyncClientStreamingCall<Request, Response> {
private let call: Call<Request, Response>
private let responseParts: UnaryResponseParts<Response>

/// The options used to make the RPC.
public var options: CallOptions {
return self.call.options
}

/// Cancel this RPC if it hasn't already completed.
public func cancel() async throws {
try await self.call.cancel().get()
}

// MARK: - Response Parts

/// The initial metadata returned from the server.
public var initialMetadata: HPACKHeaders {
// swiftformat:disable:next redundantGet
get async throws {
try await self.responseParts.initialMetadata.get()
}
}

/// The response returned by the server.
public var response: Response {
// swiftformat:disable:next redundantGet
get async throws {
try await self.responseParts.response.get()
}
}

/// The trailing metadata returned from the server.
///
/// - Important: Awaiting this property will suspend until the responses have been consumed.
public var trailingMetadata: HPACKHeaders {
// swiftformat:disable:next redundantGet
get async throws {
try await self.responseParts.trailingMetadata.get()
}
}

/// The final status of the the RPC.
///
/// - Important: Awaiting this property will suspend until the responses have been consumed.
public var status: GRPCStatus {
// swiftformat:disable:next redundantGet
get async {
// force-try acceptable because any error is encapsulated in a successful GRPCStatus future.
try! await self.responseParts.status.get()
}
}

private init(call: Call<Request, Response>) {
self.call = call
self.responseParts = UnaryResponseParts(on: call.eventLoop)
self.call.invokeStreamingRequests(
onError: self.responseParts.handleError(_:),
onResponsePart: self.responseParts.handle(_:)
)
}

/// We expose this as the only non-private initializer so that the caller
/// knows that invocation is part of initialisation.
internal static func makeAndInvoke(call: Call<Request, Response>) -> Self {
Self(call: call)
}

// MARK: - Requests

/// Sends a message to the service.
///
/// - Important: Callers must terminate the stream of messages by calling `sendEnd()`.
///
/// - Parameters:
/// - message: The message to send.
/// - compression: Whether compression should be used for this message. Ignored if compression
/// was not enabled for the RPC.
public func sendMessage(
_ message: Request,
compression: Compression = .deferToCallDefault
) async throws {
let compress = self.call.compress(compression)
let promise = self.call.eventLoop.makePromise(of: Void.self)
self.call.send(.message(message, .init(compress: compress, flush: true)), promise: promise)
// TODO: This waits for the message to be written to the socket. We should probably just wait for it to be written to the channel?
try await promise.futureResult.get()
}

/// Sends a sequence of messages to the service.
///
/// - Important: Callers must terminate the stream of messages by calling `sendEnd()`.
///
/// - Parameters:
/// - messages: The sequence of messages to send.
/// - compression: Whether compression should be used for this message. Ignored if compression
/// was not enabled for the RPC.
public func sendMessages<S>(
_ messages: S,
compression: Compression = .deferToCallDefault
) async throws where S: Sequence, S.Element == Request {
let promise = self.call.eventLoop.makePromise(of: Void.self)
self.call.sendMessages(messages, compression: compression, promise: promise)
try await promise.futureResult.get()
}

/// Terminates a stream of messages sent to the service.
///
/// - Important: This should only ever be called once.
public func sendEnd() async throws {
let promise = self.call.eventLoop.makePromise(of: Void.self)
self.call.send(.end, promise: promise)
try await promise.futureResult.get()
}
}

#endif
52 changes: 52 additions & 0 deletions Sources/GRPC/AsyncAwaitSupport/GRPCAsyncResponseStream.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Copyright 2021, gRPC Authors All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#if compiler(>=5.5)

/// This is currently a wrapper around AsyncThrowingStream because we want to be
/// able to swap out the implementation for something else in the future.
@available(macOS 12, iOS 15, tvOS 15, watchOS 8, *)
public struct GRPCAsyncResponseStream<Element>: AsyncSequence {
@usableFromInline
internal typealias WrappedStream = AsyncThrowingStream<Element, Error>

@usableFromInline
internal let stream: WrappedStream

@inlinable
internal init(_ stream: WrappedStream) {
self.stream = stream
}

public func makeAsyncIterator() -> Iterator {
Self.AsyncIterator(self.stream)
}

public struct Iterator: AsyncIteratorProtocol {
@usableFromInline
internal var iterator: WrappedStream.AsyncIterator

fileprivate init(_ stream: WrappedStream) {
self.iterator = stream.makeAsyncIterator()
}

@inlinable
public mutating func next() async throws -> Element? {
try await self.iterator.next()
}
}
}

#endif
Loading