Skip to content

Commit 7ad0403

Browse files
committed
Add an internal pausable writer
Motivation: Async RPCs which stream outbound messages (i.e. requests on the client, responses on the server) will be provided with a means to asynchronously send messages. This object should suspend if the underlying channel is not currently writable. Modifications: Add an 'AsyncWriter' as the underlying type for these objects which suspends if the writer is 'paused' and resumes when the writer is resumed. Result: We have a writer capable of suspending writes.
1 parent a14c464 commit 7ad0403

File tree

2 files changed

+519
-0
lines changed

2 files changed

+519
-0
lines changed
Lines changed: 314 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,314 @@
1+
/*
2+
* Copyright 2021, gRPC Authors All rights reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
#if compiler(>=5.5)
18+
import NIOConcurrencyHelpers
19+
import NIOCore
20+
21+
/// An asynchronous writer which forwards messages to a delegate.
22+
///
23+
/// Forwarding of messages to the delegate may be paused and resumed by controlling the writability
24+
/// of the writer. This may be controlled by calls to ``toggleWritability()``. When the writer is
25+
/// paused (by becoming unwritable) calls to ``write(_:)`` may suspend. When the writer is resumed
26+
/// (by becoming writable) any calls which are suspended may be resumed.
27+
///
28+
/// The writer must also be "finished" with a final value: as for writing, calls to ``finish(_:)``
29+
/// may suspend if the writer has been paused.
30+
@available(macOS 12, iOS 15, tvOS 15, watchOS 8, *)
31+
@usableFromInline
32+
internal struct AsyncWriter<Delegate: AsyncWriterDelegate> {
33+
@usableFromInline
34+
internal typealias Element = Delegate.Element
35+
36+
@usableFromInline
37+
internal typealias End = Delegate.End
38+
39+
@usableFromInline
40+
internal let _storage: AsyncWriterStorage<Delegate>
41+
42+
@inlinable
43+
internal init(maxPendingElements: Int = 16, maxWritesBeforeYield: Int = 5, delegate: Delegate) {
44+
self._storage = AsyncWriterStorage(
45+
maxPendingElements: maxPendingElements,
46+
maxWritesBeforeYield: maxWritesBeforeYield,
47+
delegate: delegate
48+
)
49+
}
50+
51+
/// Toggles whether the writer is writable or not. The writer is initially writable.
52+
///
53+
/// If the writer becomes writable then it may resume writes to the delegate. If it becomes
54+
/// unwritable then calls to `write` may suspend until the writability changes again.
55+
///
56+
/// This API does not offer explicit control over the writability state so the caller must ensure
57+
/// calls to this function correspond with changes in writability. The reason for this is that the
58+
/// underlying type is an `actor` and updating its state is therefore asynchronous. However,
59+
/// this functions is not called from an asynchronous context so it is not possible to `await`
60+
/// state updates to complete. Instead, changing the state on the underlying type is via
61+
/// a `nonisolated` function on the `actor` which spawns a new task. If this or a similar API
62+
/// allowed the writability to be explicitly set then calls to that API are not guaranteed to be
63+
/// ordered which may lead to deadlock.
64+
@usableFromInline
65+
internal func toggleWritability() {
66+
self._storage.toggleWritability()
67+
}
68+
69+
/// As ``toggleWritability`` but `async` -- intended for testing only.
70+
internal func testingOnly_toggleWritability() async {
71+
await self._storage._toggleWritability()
72+
}
73+
74+
/// Write an `element`.
75+
///
76+
/// The call may be suspend if the writer is paused.
77+
///
78+
/// Throws: ``AyncWriterError`` if the writer has already been finished or too many write tasks
79+
/// have been suspended.
80+
@inlinable
81+
internal func write(_ element: Element) async throws {
82+
try await self._storage.write(element)
83+
}
84+
85+
/// Write the final element
86+
@inlinable
87+
internal func finish(_ end: End) async throws {
88+
try await self._storage.finish(end)
89+
}
90+
}
91+
92+
@available(macOS 12, iOS 15, tvOS 15, watchOS 8, *)
93+
@usableFromInline
94+
internal final actor AsyncWriterStorage<Delegate: AsyncWriterDelegate> {
95+
@usableFromInline
96+
internal typealias Element = Delegate.Element
97+
98+
@usableFromInline
99+
internal typealias End = Delegate.End
100+
101+
/// A value pending a write.
102+
@usableFromInline
103+
internal struct _Pending<Value> {
104+
@usableFromInline
105+
var value: Value
106+
107+
@usableFromInline
108+
var continuation: CheckedContinuation<Void, Error>
109+
110+
@inlinable
111+
internal init(_ value: Value, continuation: CheckedContinuation<Void, Error>) {
112+
self.value = value
113+
self.continuation = continuation
114+
}
115+
}
116+
117+
@usableFromInline
118+
typealias PendingElement = _Pending<Element>
119+
120+
@usableFromInline
121+
typealias PendingEnd = _Pending<End>
122+
123+
@usableFromInline
124+
internal enum _CompletionState {
125+
/// Finish hasn't been called yet. May move to `pending` or `completed`.
126+
case incomplete
127+
/// Finish has been called but the writer is paused. May move to `completed`.
128+
case pending(PendingEnd)
129+
/// The completion message has been sent to the delegate. This is a terminal state.
130+
case completed
131+
132+
/// Move from `pending` to `completed` and return the `PendingCompletion`. Returns `nil` if
133+
/// the state was not `pending`.
134+
@inlinable
135+
mutating func completeIfPending() -> PendingEnd? {
136+
switch self {
137+
case let .pending(pending):
138+
self = .completed
139+
return pending
140+
case .incomplete, .completed:
141+
return nil
142+
}
143+
}
144+
145+
@usableFromInline
146+
var isPendingOrCompleted: Bool {
147+
switch self {
148+
case .incomplete:
149+
return false
150+
case .pending, .completed:
151+
return true
152+
}
153+
}
154+
}
155+
156+
/// The maximum number of pending elements. `pendingElements` must not grow beyond this limit.
157+
@usableFromInline
158+
internal let _maxPendingElements: Int
159+
160+
/// The maximum number of writes to the delegate made in `resume` before yielding to allow other
161+
/// values to be queued.
162+
@usableFromInline
163+
internal let _maxWritesBeforeYield: Int
164+
165+
/// Elements and continuations which have been buffered but are awaiting consumption by the
166+
/// delegate.
167+
@usableFromInline
168+
internal var _pendingElements: CircularBuffer<PendingElement>
169+
170+
/// The completion state of the writer.
171+
@usableFromInline
172+
internal var _completionState: _CompletionState
173+
174+
/// Whether the writer is paused.
175+
@usableFromInline
176+
internal var _isPaused: Bool = false
177+
178+
/// The delegate to process elements. By convention we call the delegate before resuming any
179+
/// continuation.
180+
@usableFromInline
181+
internal let _delegate: Delegate
182+
183+
@inlinable
184+
internal init(
185+
maxPendingElements: Int,
186+
maxWritesBeforeYield: Int,
187+
delegate: Delegate
188+
) {
189+
self._maxPendingElements = maxPendingElements
190+
self._maxWritesBeforeYield = maxWritesBeforeYield
191+
self._pendingElements = CircularBuffer(initialCapacity: maxPendingElements)
192+
self._completionState = .incomplete
193+
self._delegate = delegate
194+
}
195+
196+
deinit {
197+
switch self._completionState {
198+
case .completed:
199+
()
200+
case .incomplete, .pending:
201+
assertionFailure("writer has not completed is pending completion")
202+
}
203+
}
204+
205+
@usableFromInline
206+
internal nonisolated func toggleWritability() {
207+
Task {
208+
await self._toggleWritability()
209+
}
210+
}
211+
212+
@usableFromInline
213+
internal func _toggleWritability() async {
214+
if self._isPaused {
215+
self._isPaused = false
216+
await self.resumeWriting()
217+
} else {
218+
self._isPaused = true
219+
}
220+
}
221+
222+
private func resumeWriting() async {
223+
var writes = 0
224+
225+
while !self._isPaused {
226+
if let pendingElement = self._pendingElements.popFirst() {
227+
self._delegate.write(pendingElement.value)
228+
pendingElement.continuation.resume()
229+
} else if let pendingCompletion = self._completionState.completeIfPending() {
230+
self._delegate.writeEnd(pendingCompletion.value)
231+
pendingCompletion.continuation.resume()
232+
} else {
233+
break
234+
}
235+
236+
// `writes` will never exceed `maxWritesBeforeYield` so unchecked arithmetic is okay here.
237+
writes &+= 1
238+
if writes == self._maxWritesBeforeYield {
239+
writes = 0
240+
// We yield every so often to let the delegate (i.e. 'NIO.Channel') catch up since it may
241+
// decide it is no longer writable.
242+
await Task.yield()
243+
}
244+
}
245+
}
246+
247+
@inlinable
248+
internal func write(_ element: Element) async throws {
249+
try await withCheckedThrowingContinuation { continuation in
250+
self._write(element, continuation: continuation)
251+
}
252+
}
253+
254+
@inlinable
255+
internal func _write(_ element: Element, continuation: CheckedContinuation<Void, Error>) {
256+
// There are three outcomes of writing:
257+
// - write the element directly (if the writer isn't paused and no writes are pending)
258+
// - queue the element (the writer is paused or there are writes already pending)
259+
// - error (the writer is complete or the queue is full).
260+
261+
if self._completionState.isPendingOrCompleted {
262+
continuation.resume(throwing: AsyncWriterError.alreadyFinished)
263+
} else if !self._isPaused, self._pendingElements.isEmpty {
264+
self._delegate.write(element)
265+
continuation.resume()
266+
} else if self._pendingElements.count < self._maxPendingElements {
267+
// The continuation will be resumed later.
268+
self._pendingElements.append(PendingElement(element, continuation: continuation))
269+
} else {
270+
continuation.resume(throwing: AsyncWriterError.tooManyPendingWrites)
271+
}
272+
}
273+
274+
@inlinable
275+
internal func finish(_ end: End) async throws {
276+
try await withCheckedThrowingContinuation { continuation in
277+
self._finish(end, continuation: continuation)
278+
}
279+
}
280+
281+
@inlinable
282+
internal func _finish(_ end: End, continuation: CheckedContinuation<Void, Error>) {
283+
if self._completionState.isPendingOrCompleted {
284+
continuation.resume(throwing: AsyncWriterError.alreadyFinished)
285+
} else if !self._isPaused, self._pendingElements.isEmpty {
286+
self._completionState = .completed
287+
self._delegate.writeEnd(end)
288+
continuation.resume()
289+
} else {
290+
// Either we're paused or there are pending writes which must be consumed first.
291+
self._completionState = .pending(PendingEnd(end, continuation: continuation))
292+
}
293+
}
294+
}
295+
296+
@usableFromInline
297+
internal enum AsyncWriterError: Error, Hashable {
298+
case tooManyPendingWrites
299+
case alreadyFinished
300+
}
301+
302+
@usableFromInline
303+
internal protocol AsyncWriterDelegate: AnyObject {
304+
associatedtype Element
305+
associatedtype End
306+
307+
@inlinable
308+
func write(_ element: Element)
309+
310+
@inlinable
311+
func writeEnd(_ end: End)
312+
}
313+
314+
#endif // compiler(>=5.5)

0 commit comments

Comments
 (0)