Skip to content

Commit bae3349

Browse files
authored
Add an internal pausable writer (#1245)
Motivation: Async RPCs which stream outbound messages (i.e. requests on the client, responses on the server) will be provided with a means to asynchronously send messages. This object should suspend if the underlying channel is not currently writable. Modifications: Add an 'AsyncWriter' as the underlying type for these objects which suspends if the writer is 'paused' and resumes when the writer is resumed. Result: We have a writer capable of suspending writes.
1 parent a14c464 commit bae3349

File tree

2 files changed

+606
-0
lines changed

2 files changed

+606
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,312 @@
1+
/*
2+
* Copyright 2021, gRPC Authors All rights reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
#if compiler(>=5.5)
17+
import NIOCore
18+
19+
/// An asynchronous writer which forwards messages to a delegate.
20+
///
21+
/// Forwarding of messages to the delegate may be paused and resumed by controlling the writability
22+
/// of the writer. This may be controlled by calls to ``toggleWritability()``. When the writer is
23+
/// paused (by becoming unwritable) calls to ``write(_:)`` may suspend. When the writer is resumed
24+
/// (by becoming writable) any calls which are suspended may be resumed.
25+
///
26+
/// The writer must also be "finished" with a final value: as for writing, calls to ``finish(_:)``
27+
/// may suspend if the writer has been paused.
28+
@available(macOS 12, iOS 15, tvOS 15, watchOS 8, *)
29+
@usableFromInline
30+
internal final actor AsyncWriter<Delegate: AsyncWriterDelegate> {
31+
@usableFromInline
32+
internal typealias Element = Delegate.Element
33+
34+
@usableFromInline
35+
internal typealias End = Delegate.End
36+
37+
/// A value pending a write.
38+
@usableFromInline
39+
internal struct _Pending<Value> {
40+
@usableFromInline
41+
var value: Value
42+
43+
@usableFromInline
44+
var continuation: CheckedContinuation<Void, Error>
45+
46+
@inlinable
47+
internal init(_ value: Value, continuation: CheckedContinuation<Void, Error>) {
48+
self.value = value
49+
self.continuation = continuation
50+
}
51+
}
52+
53+
@usableFromInline
54+
typealias PendingElement = _Pending<Element>
55+
56+
@usableFromInline
57+
typealias PendingEnd = _Pending<End>
58+
59+
@usableFromInline
60+
internal enum _CompletionState {
61+
/// Finish hasn't been called yet. May move to `pending` or `completed`.
62+
case incomplete
63+
/// Finish has been called but the writer is paused. May move to `completed`.
64+
case pending(PendingEnd)
65+
/// The completion message has been sent to the delegate. This is a terminal state.
66+
case completed
67+
68+
/// Move from `pending` to `completed` and return the `PendingCompletion`. Returns `nil` if
69+
/// the state was not `pending`.
70+
@inlinable
71+
mutating func completeIfPending() -> PendingEnd? {
72+
switch self {
73+
case let .pending(pending):
74+
self = .completed
75+
return pending
76+
case .incomplete, .completed:
77+
return nil
78+
}
79+
}
80+
81+
@usableFromInline
82+
var isPendingOrCompleted: Bool {
83+
switch self {
84+
case .incomplete:
85+
return false
86+
case .pending, .completed:
87+
return true
88+
}
89+
}
90+
}
91+
92+
/// The maximum number of pending elements. `pendingElements` must not grow beyond this limit.
93+
@usableFromInline
94+
internal let _maxPendingElements: Int
95+
96+
/// The maximum number of writes to the delegate made in `resume` before yielding to allow other
97+
/// values to be queued.
98+
@usableFromInline
99+
internal let _maxWritesBeforeYield: Int
100+
101+
/// Elements and continuations which have been buffered but are awaiting consumption by the
102+
/// delegate.
103+
@usableFromInline
104+
internal var _pendingElements: CircularBuffer<PendingElement>
105+
106+
/// The completion state of the writer.
107+
@usableFromInline
108+
internal var _completionState: _CompletionState
109+
110+
/// Whether the writer is paused.
111+
@usableFromInline
112+
internal var _isPaused: Bool = false
113+
114+
/// The delegate to process elements. By convention we call the delegate before resuming any
115+
/// continuation.
116+
@usableFromInline
117+
internal let _delegate: Delegate
118+
119+
@inlinable
120+
internal init(
121+
maxPendingElements: Int = 16,
122+
maxWritesBeforeYield: Int = 5,
123+
delegate: Delegate
124+
) {
125+
self._maxPendingElements = maxPendingElements
126+
self._maxWritesBeforeYield = maxWritesBeforeYield
127+
self._pendingElements = CircularBuffer(initialCapacity: maxPendingElements)
128+
self._completionState = .incomplete
129+
self._delegate = delegate
130+
}
131+
132+
deinit {
133+
switch self._completionState {
134+
case .completed:
135+
()
136+
case .incomplete, .pending:
137+
assertionFailure("writer has not completed is pending completion")
138+
}
139+
}
140+
141+
/// As ``toggleWritability()`` but executed asynchronously.
142+
@usableFromInline
143+
internal nonisolated func toggleWritabilityAsynchronously() {
144+
Task {
145+
await self.toggleWritability()
146+
}
147+
}
148+
149+
/// Toggles whether the writer is writable or not. The writer is initially writable.
150+
///
151+
/// If the writer becomes writable then it may resume writes to the delegate. If it becomes
152+
/// unwritable then calls to `write` may suspend until the writability changes again.
153+
///
154+
/// This API does not offer explicit control over the writability state so the caller must ensure
155+
/// calls to this function correspond with changes in writability. The reason for this is that the
156+
/// underlying type is an `actor` and updating its state is therefore asynchronous. However,
157+
/// this functions is not called from an asynchronous context so it is not possible to `await`
158+
/// state updates to complete. Instead, changing the state is via a `nonisolated` function on
159+
/// the `actor` which spawns a new task. If this or a similar API allowed the writability to be
160+
/// explicitly set then calls to that API are not guaranteed to be ordered which may lead to
161+
/// deadlock.
162+
@usableFromInline
163+
internal func toggleWritability() async {
164+
if self._isPaused {
165+
self._isPaused = false
166+
await self.resumeWriting()
167+
} else {
168+
self._isPaused = true
169+
}
170+
}
171+
172+
private func resumeWriting() async {
173+
var writes = 0
174+
175+
while !self._isPaused {
176+
if let pendingElement = self._pendingElements.popFirst() {
177+
self._delegate.write(pendingElement.value)
178+
pendingElement.continuation.resume()
179+
} else if let pendingCompletion = self._completionState.completeIfPending() {
180+
self._delegate.writeEnd(pendingCompletion.value)
181+
pendingCompletion.continuation.resume()
182+
} else {
183+
break
184+
}
185+
186+
// `writes` will never exceed `maxWritesBeforeYield` so unchecked arithmetic is okay here.
187+
writes &+= 1
188+
if writes == self._maxWritesBeforeYield {
189+
writes = 0
190+
// We yield every so often to let the delegate (i.e. 'NIO.Channel') catch up since it may
191+
// decide it is no longer writable.
192+
await Task.yield()
193+
}
194+
}
195+
}
196+
197+
/// As ``cancel()`` but executed asynchronously.
198+
@usableFromInline
199+
internal nonisolated func cancelAsynchronously() {
200+
Task {
201+
await self.cancel()
202+
}
203+
}
204+
205+
/// Cancel all pending writes.
206+
///
207+
/// Any pending writes will be dropped and their continuations will be resumed with
208+
/// a `CancellationError`. Any writes after cancellation has completed will also fail.
209+
@usableFromInline
210+
internal func cancel() {
211+
// If there's an end we should fail that last.
212+
let pendingEnd: PendingEnd?
213+
214+
// Mark our state as completed before resuming any continuations (any future writes should fail
215+
// immediately).
216+
switch self._completionState {
217+
case .incomplete:
218+
pendingEnd = nil
219+
self._completionState = .completed
220+
221+
case let .pending(pending):
222+
pendingEnd = pending
223+
self._completionState = .completed
224+
225+
case .completed:
226+
pendingEnd = nil
227+
}
228+
229+
let cancellationError = CancellationError()
230+
231+
while let pending = self._pendingElements.popFirst() {
232+
pending.continuation.resume(throwing: cancellationError)
233+
}
234+
235+
pendingEnd?.continuation.resume(throwing: cancellationError)
236+
}
237+
238+
/// Write an `element`.
239+
///
240+
/// The call may be suspend if the writer is paused.
241+
///
242+
/// Throws: ``AsyncWriterError`` if the writer has already been finished or too many write tasks
243+
/// have been suspended.
244+
@inlinable
245+
internal func write(_ element: Element) async throws {
246+
try await withCheckedThrowingContinuation { continuation in
247+
self._write(element, continuation: continuation)
248+
}
249+
}
250+
251+
@inlinable
252+
internal func _write(_ element: Element, continuation: CheckedContinuation<Void, Error>) {
253+
// There are three outcomes of writing:
254+
// - write the element directly (if the writer isn't paused and no writes are pending)
255+
// - queue the element (the writer is paused or there are writes already pending)
256+
// - error (the writer is complete or the queue is full).
257+
258+
if self._completionState.isPendingOrCompleted {
259+
continuation.resume(throwing: AsyncWriterError.alreadyFinished)
260+
} else if !self._isPaused, self._pendingElements.isEmpty {
261+
self._delegate.write(element)
262+
continuation.resume()
263+
} else if self._pendingElements.count < self._maxPendingElements {
264+
// The continuation will be resumed later.
265+
self._pendingElements.append(PendingElement(element, continuation: continuation))
266+
} else {
267+
continuation.resume(throwing: AsyncWriterError.tooManyPendingWrites)
268+
}
269+
}
270+
271+
/// Write the final element
272+
@inlinable
273+
internal func finish(_ end: End) async throws {
274+
try await withCheckedThrowingContinuation { continuation in
275+
self._finish(end, continuation: continuation)
276+
}
277+
}
278+
279+
@inlinable
280+
internal func _finish(_ end: End, continuation: CheckedContinuation<Void, Error>) {
281+
if self._completionState.isPendingOrCompleted {
282+
continuation.resume(throwing: AsyncWriterError.alreadyFinished)
283+
} else if !self._isPaused, self._pendingElements.isEmpty {
284+
self._completionState = .completed
285+
self._delegate.writeEnd(end)
286+
continuation.resume()
287+
} else {
288+
// Either we're paused or there are pending writes which must be consumed first.
289+
self._completionState = .pending(PendingEnd(end, continuation: continuation))
290+
}
291+
}
292+
}
293+
294+
@usableFromInline
295+
internal enum AsyncWriterError: Error, Hashable {
296+
case tooManyPendingWrites
297+
case alreadyFinished
298+
}
299+
300+
@usableFromInline
301+
internal protocol AsyncWriterDelegate: AnyObject {
302+
associatedtype Element
303+
associatedtype End
304+
305+
@inlinable
306+
func write(_ element: Element)
307+
308+
@inlinable
309+
func writeEnd(_ end: End)
310+
}
311+
312+
#endif // compiler(>=5.5)

0 commit comments

Comments
 (0)