@@ -28,35 +28,43 @@ import NIOCore
28
28
/// The source must be finished exactly once by calling ``finish()`` or ``finish(throwing:)`` to
29
29
/// indicate that the sequence should end with an error.
30
30
@available ( macOS 12 , iOS 15 , tvOS 15 , watchOS 8 , * )
31
+ @usableFromInline
31
32
internal final class PassthroughMessageSource < Element, Failure: Error > {
32
- private typealias ContinuationResult = Result < Element ? , Error >
33
+ @usableFromInline
34
+ internal typealias _ContinuationResult = Result < Element ? , Error >
33
35
34
36
/// All state in this class must be accessed via the lock.
35
37
///
36
38
/// - Important: We use a `class` with a lock rather than an `actor` as we must guarantee that
37
39
/// calls to ``yield(_:)`` are not reordered.
38
- private let lock : Lock
40
+ @usableFromInline
41
+ internal let _lock : Lock
39
42
40
43
/// A queue of elements which may be consumed as soon as there is demand.
41
- private var continuationResults : CircularBuffer < ContinuationResult >
44
+ @usableFromInline
45
+ internal var _continuationResults : CircularBuffer < _ContinuationResult >
42
46
43
47
/// A continuation which will be resumed in the future. The continuation must be `nil`
44
48
/// if ``continuationResults`` is not empty.
45
- private var continuation : Optional < CheckedContinuation < Element ? , Error > >
49
+ @usableFromInline
50
+ internal var _continuation : Optional < CheckedContinuation < Element ? , Error > >
46
51
47
52
/// True if a terminal continuation result (`.success(nil)` or `.failure()`) has been seen.
48
53
/// No more values may be enqueued to `continuationResults` if this is `true`.
49
- private var isTerminated : Bool
54
+ @usableFromInline
55
+ internal var _isTerminated : Bool
50
56
57
+ @usableFromInline
51
58
internal init ( initialBufferCapacity: Int = 16 ) {
52
- self . lock = Lock ( )
53
- self . continuationResults = CircularBuffer ( initialCapacity: initialBufferCapacity)
54
- self . continuation = nil
55
- self . isTerminated = false
59
+ self . _lock = Lock ( )
60
+ self . _continuationResults = CircularBuffer ( initialCapacity: initialBufferCapacity)
61
+ self . _continuation = nil
62
+ self . _isTerminated = false
56
63
}
57
64
58
65
// MARK: - Append / Yield
59
66
67
+ @usableFromInline
60
68
internal enum YieldResult : Hashable {
61
69
/// The value was accepted. The `queueDepth` indicates how many elements are waiting to be
62
70
/// consumed.
@@ -68,17 +76,20 @@ internal final class PassthroughMessageSource<Element, Failure: Error> {
68
76
case dropped
69
77
}
70
78
79
+ @inlinable
71
80
internal func yield( _ element: Element ) -> YieldResult {
72
- let continuationResult : ContinuationResult = . success( element)
73
- return self . yield ( continuationResult, isTerminator: false )
81
+ let continuationResult : _ContinuationResult = . success( element)
82
+ return self . _yield ( continuationResult, isTerminator: false )
74
83
}
75
84
85
+ @inlinable
76
86
internal func finish( throwing error: Failure ? = nil ) -> YieldResult {
77
- let continuationResult : ContinuationResult = error. map { . failure( $0) } ?? . success( nil )
78
- return self . yield ( continuationResult, isTerminator: true )
87
+ let continuationResult : _ContinuationResult = error. map { . failure( $0) } ?? . success( nil )
88
+ return self . _yield ( continuationResult, isTerminator: true )
79
89
}
80
90
81
- private enum _YieldResult {
91
+ @usableFromInline
92
+ internal enum _YieldResult {
82
93
/// The sequence has already been terminated; drop the element.
83
94
case alreadyTerminated
84
95
/// The element was added to the queue to be consumed later.
@@ -88,10 +99,25 @@ internal final class PassthroughMessageSource<Element, Failure: Error> {
88
99
case resume( CheckedContinuation < Element ? , Error > )
89
100
}
90
101
91
- private func yield( _ continuationResult: ContinuationResult , isTerminator: Bool ) -> YieldResult {
92
- let yieldResult : YieldResult
102
+ @inlinable
103
+ internal func _yield(
104
+ _ continuationResult: _ContinuationResult , isTerminator: Bool
105
+ ) -> YieldResult {
106
+ let result : _YieldResult = self . _lock. withLock {
107
+ if self . _isTerminated {
108
+ return . alreadyTerminated
109
+ } else if let continuation = self . _continuation {
110
+ self . _continuation = nil
111
+ return . resume( continuation)
112
+ } else {
113
+ self . _isTerminated = isTerminator
114
+ self . _continuationResults. append ( continuationResult)
115
+ return . queued( self . _continuationResults. count)
116
+ }
117
+ }
93
118
94
- switch self . _yield ( continuationResult, isTerminator: isTerminator) {
119
+ let yieldResult : YieldResult
120
+ switch result {
95
121
case let . queued( size) :
96
122
yieldResult = . accepted( queueDepth: size)
97
123
case let . resume( continuation) :
@@ -105,40 +131,24 @@ internal final class PassthroughMessageSource<Element, Failure: Error> {
105
131
return yieldResult
106
132
}
107
133
108
- private func _yield(
109
- _ continuationResult: ContinuationResult ,
110
- isTerminator: Bool
111
- ) -> _YieldResult {
112
- return self . lock. withLock {
113
- if self . isTerminated {
114
- return . alreadyTerminated
115
- } else if let continuation = self . continuation {
116
- self . continuation = nil
117
- return . resume( continuation)
118
- } else {
119
- self . isTerminated = isTerminator
120
- self . continuationResults. append ( continuationResult)
121
- return . queued( self . continuationResults. count)
122
- }
123
- }
124
- }
125
-
126
134
// MARK: - Next
127
135
136
+ @inlinable
128
137
internal func consumeNextElement( ) async throws -> Element ? {
129
138
return try await withCheckedThrowingContinuation {
130
- self . consumeNextElement ( continuation: $0)
139
+ self . _consumeNextElement ( continuation: $0)
131
140
}
132
141
}
133
142
134
- private func consumeNextElement( continuation: CheckedContinuation < Element ? , Error > ) {
135
- let continuationResult : ContinuationResult ? = self . lock. withLock {
136
- if let nextResult = self . continuationResults. popFirst ( ) {
143
+ @inlinable
144
+ internal func _consumeNextElement( continuation: CheckedContinuation < Element ? , Error > ) {
145
+ let continuationResult : _ContinuationResult ? = self . _lock. withLock {
146
+ if let nextResult = self . _continuationResults. popFirst ( ) {
137
147
return nextResult
138
148
} else {
139
149
// Nothing buffered and not terminated yet: save the continuation for later.
140
- assert ( self . continuation == nil )
141
- self . continuation = continuation
150
+ assert ( self . _continuation == nil )
151
+ self . _continuation = continuation
142
152
return nil
143
153
}
144
154
}
0 commit comments