@@ -12,227 +12,111 @@ import Combine
1212// MARK: - Operator methods
1313@available ( OSX 10 . 15 , iOS 13 . 0 , tvOS 13 . 0 , watchOS 6 . 0 , * )
1414public extension Publisher {
15- /// Merges two publishers into a single publisher by combining each value
16- /// from self with the latest value from the second publisher, if any.
17- ///
18- /// - parameter other: A second publisher source.
19- /// - parameter resultSelector: Function to invoke for each value from the self combined
20- /// with the latest value from the second source, if any.
21- ///
22- /// - returns: A publisher containing the result of combining each value of the self
23- /// with the latest value from the second publisher, if any, using the
24- /// specified result selector function.
25- func withLatestFrom< Other: Publisher , Result> ( _ other: Other ,
26- resultSelector: @escaping ( Output , Other . Output ) -> Result )
27- -> Publishers . WithLatestFrom < Self , Other , Result > {
28- return . init( upstream: self , second: other, resultSelector: resultSelector)
29- }
30-
31- /// Merges three publishers into a single publisher by combining each value
32- /// from self with the latest value from the second and third publisher, if any.
33- ///
34- /// - parameter other: A second publisher source.
35- /// - parameter other1: A third publisher source.
36- /// - parameter resultSelector: Function to invoke for each value from the self combined
37- /// with the latest value from the second and third source, if any.
38- ///
39- /// - returns: A publisher containing the result of combining each value of the self
40- /// with the latest value from the second and third publisher, if any, using the
41- /// specified result selector function.
42- func withLatestFrom< Other: Publisher , Other1: Publisher , Result> ( _ other: Other ,
43- _ other1: Other1 ,
44- resultSelector: @escaping ( Output , ( Other . Output , Other1 . Output ) ) -> Result )
45- -> Publishers . WithLatestFrom < Self , AnyPublisher < ( Other . Output , Other1 . Output ) , Self . Failure > , Result >
46- where Other. Failure == Failure , Other1. Failure == Failure {
47- let combined = other. combineLatest ( other1)
48- . eraseToAnyPublisher ( )
49- return . init( upstream: self , second: combined, resultSelector: resultSelector)
50- }
51-
52- /// Merges four publishers into a single publisher by combining each value
53- /// from self with the latest value from the second, third and fourth publisher, if any.
54- ///
55- /// - parameter other: A second publisher source.
56- /// - parameter other1: A third publisher source.
57- /// - parameter other2: A fourth publisher source.
58- /// - parameter resultSelector: Function to invoke for each value from the self combined
59- /// with the latest value from the second, third and fourth source, if any.
60- ///
61- /// - returns: A publisher containing the result of combining each value of the self
62- /// with the latest value from the second, third and fourth publisher, if any, using the
63- /// specified result selector function.
64- func withLatestFrom< Other: Publisher , Other1: Publisher , Other2: Publisher , Result> ( _ other: Other ,
65- _ other1: Other1 ,
66- _ other2: Other2 ,
67- resultSelector: @escaping ( Output , ( Other . Output , Other1 . Output , Other2 . Output ) ) -> Result )
68- -> Publishers . WithLatestFrom < Self , AnyPublisher < ( Other . Output , Other1 . Output , Other2 . Output ) , Self . Failure > , Result >
69- where Other. Failure == Failure , Other1. Failure == Failure , Other2. Failure == Failure {
70- let combined = other. combineLatest ( other1, other2)
71- . eraseToAnyPublisher ( )
72- return . init( upstream: self , second: combined, resultSelector: resultSelector)
73- }
74-
75- /// Upon an emission from self, emit the latest value from the
76- /// second publisher, if any exists.
77- ///
78- /// - parameter other: A second publisher source.
79- ///
80- /// - returns: A publisher containing the latest value from the second publisher, if any.
81- func withLatestFrom< Other: Publisher > ( _ other: Other )
82- -> Publishers . WithLatestFrom < Self , Other , Other . Output > {
83- return . init( upstream: self , second: other) { $1 }
84- }
85-
86- /// Upon an emission from self, emit the latest value from the
87- /// second and third publisher, if any exists.
88- ///
89- /// - parameter other: A second publisher source.
90- /// - parameter other1: A third publisher source.
91- ///
92- /// - returns: A publisher containing the latest value from the second and third publisher, if any.
93- func withLatestFrom< Other: Publisher , Other1: Publisher > ( _ other: Other ,
94- _ other1: Other1 )
95- -> Publishers . WithLatestFrom < Self , AnyPublisher < ( Other . Output , Other1 . Output ) , Self . Failure > , ( Other . Output , Other1 . Output ) >
96- where Other. Failure == Failure , Other1. Failure == Failure {
97- withLatestFrom ( other, other1) { $1 }
98- }
99-
100- /// Upon an emission from self, emit the latest value from the
101- /// second, third and forth publisher, if any exists.
102- ///
103- /// - parameter other: A second publisher source.
104- /// - parameter other1: A third publisher source.
105- /// - parameter other2: A forth publisher source.
106- ///
107- /// - returns: A publisher containing the latest value from the second, third and forth publisher, if any.
108- func withLatestFrom< Other: Publisher , Other1: Publisher , Other2: Publisher > ( _ other: Other ,
109- _ other1: Other1 ,
110- _ other2: Other2 )
111- -> Publishers . WithLatestFrom < Self , AnyPublisher < ( Other . Output , Other1 . Output , Other2 . Output ) , Self . Failure > , ( Other . Output , Other1 . Output , Other2 . Output ) >
112- where Other. Failure == Failure , Other1. Failure == Failure , Other2. Failure == Failure {
113- withLatestFrom ( other, other1, other2) { $1 }
114- }
115- }
116-
117- // MARK: - Publisher
118- @available ( OSX 10 . 15 , iOS 13 . 0 , tvOS 13 . 0 , watchOS 6 . 0 , * )
119- public extension Publishers {
120- struct WithLatestFrom < Upstream: Publisher ,
121- Other: Publisher ,
122- Output> : Publisher where Upstream. Failure == Other . Failure {
123- public typealias Failure = Upstream . Failure
124- public typealias ResultSelector = ( Upstream . Output , Other . Output ) -> Output
125-
126- private let upstream : Upstream
127- private let second : Other
128- private let resultSelector : ResultSelector
129- private var latestValue : Other . Output ?
130-
131- init ( upstream: Upstream ,
132- second: Other ,
133- resultSelector: @escaping ResultSelector ) {
134- self . upstream = upstream
135- self . second = second
136- self . resultSelector = resultSelector
137- }
138-
139- public func receive< S: Subscriber > ( subscriber: S ) where Failure == S . Failure , Output == S . Input {
140- subscriber. receive ( subscription: Subscription ( upstream: upstream,
141- downstream: subscriber,
142- second: second,
143- resultSelector: resultSelector) )
15+ /// Merges two publishers into a single publisher by combining each value
16+ /// from self with the latest value from the second publisher, if any.
17+ ///
18+ /// - parameter other: A second publisher source.
19+ /// - parameter resultSelector: Function to invoke for each value from the self combined
20+ /// with the latest value from the second source, if any.
21+ ///
22+ /// - returns: A publisher containing the result of combining each value of the self
23+ /// with the latest value from the second publisher, if any, using the
24+ /// specified result selector function.
25+ func withLatestFrom< Other: Publisher , Result> ( _ other: Other ,
26+ resultSelector: @escaping ( Output , Other . Output ) -> Result )
27+ -> AnyPublisher < Result , Failure >
28+ where Other. Failure == Failure {
29+ let upstream = share ( )
30+
31+ return other
32+ . map { second in upstream. map { resultSelector ( $0, second) } }
33+ . switchToLatest ( )
34+ . zip ( upstream) // `zip`ping and discarding `\.1` allows for
35+ // upstream completions to be projected down immediately.
36+ . map ( \. 0 )
37+ . eraseToAnyPublisher ( )
14438 }
145- }
146- }
14739
148- // MARK: - Subscription
149- @available ( OSX 10 . 15 , iOS 13 . 0 , tvOS 13 . 0 , watchOS 6 . 0 , * )
150- private extension Publishers . WithLatestFrom {
151- class Subscription < Downstream: Subscriber > : Combine . Subscription , CustomStringConvertible where Downstream. Input == Output , Downstream. Failure == Failure {
152- private let resultSelector : ResultSelector
153- private var sink : Sink < Upstream , Downstream > ?
154-
155- private let upstream : Upstream
156- private let downstream : Downstream
157- private let second : Other
158-
159- // Secondary (other) publisher
160- private var latestValue : Other . Output ?
161- private var otherSubscription : Cancellable ?
162- private var preInitialDemand = Subscribers . Demand. none
163-
164- init ( upstream: Upstream ,
165- downstream: Downstream ,
166- second: Other ,
167- resultSelector: @escaping ResultSelector ) {
168- self . upstream = upstream
169- self . second = second
170- self . downstream = downstream
171- self . resultSelector = resultSelector
172-
173- trackLatestFromSecond { [ weak self] in
174- guard let self = self else { return }
175- self . request ( self . preInitialDemand)
176- self . preInitialDemand = . none
177- }
40+ /// Merges three publishers into a single publisher by combining each value
41+ /// from self with the latest value from the second and third publisher, if any.
42+ ///
43+ /// - parameter other: A second publisher source.
44+ /// - parameter other1: A third publisher source.
45+ /// - parameter resultSelector: Function to invoke for each value from the self combined
46+ /// with the latest value from the second and third source, if any.
47+ ///
48+ /// - returns: A publisher containing the result of combining each value of the self
49+ /// with the latest value from the second and third publisher, if any, using the
50+ /// specified result selector function.
51+ func withLatestFrom< Other: Publisher , Other1: Publisher , Result> ( _ other: Other ,
52+ _ other1: Other1 ,
53+ resultSelector: @escaping ( Output , ( Other . Output , Other1 . Output ) ) -> Result )
54+ -> AnyPublisher < Result , Failure >
55+ where Other. Failure == Failure , Other1. Failure == Failure {
56+ withLatestFrom ( other. combineLatest ( other1) , resultSelector: resultSelector)
17857 }
17958
180- func request( _ demand: Subscribers . Demand ) {
181- guard latestValue != nil else {
182- preInitialDemand += demand
183- return
184- }
185-
186- self . sink? . demand ( demand)
59+ /// Merges four publishers into a single publisher by combining each value
60+ /// from self with the latest value from the second, third and fourth publisher, if any.
61+ ///
62+ /// - parameter other: A second publisher source.
63+ /// - parameter other1: A third publisher source.
64+ /// - parameter other2: A fourth publisher source.
65+ /// - parameter resultSelector: Function to invoke for each value from the self combined
66+ /// with the latest value from the second, third and fourth source, if any.
67+ ///
68+ /// - returns: A publisher containing the result of combining each value of the self
69+ /// with the latest value from the second, third and fourth publisher, if any, using the
70+ /// specified result selector function.
71+ func withLatestFrom< Other: Publisher , Other1: Publisher , Other2: Publisher , Result> ( _ other: Other ,
72+ _ other1: Other1 ,
73+ _ other2: Other2 ,
74+ resultSelector: @escaping ( Output , ( Other . Output , Other1 . Output , Other2 . Output ) ) -> Result )
75+ -> AnyPublisher < Result , Failure >
76+ where Other. Failure == Failure , Other1. Failure == Failure , Other2. Failure == Failure {
77+ withLatestFrom ( other. combineLatest ( other1, other2) , resultSelector: resultSelector)
18778 }
18879
189- // Create an internal subscription to the `Other` publisher,
190- // constantly tracking its latest value
191- private func trackLatestFromSecond( onInitialValue: @escaping ( ) -> Void ) {
192- var gotInitialValue = false
193-
194- let subscriber = AnySubscriber < Other . Output , Other . Failure > (
195- receiveSubscription: { [ weak self] subscription in
196- self ? . otherSubscription = subscription
197- subscription. request ( . unlimited)
198- } ,
199- receiveValue: { [ weak self] value in
200- guard let self = self else { return . none }
201- self . latestValue = value
202-
203- if !gotInitialValue {
204- // When getting initial value, start pulling values
205- // from upstream in the main sink
206- self . sink = Sink ( upstream: self . upstream,
207- downstream: self . downstream,
208- transformOutput: { [ weak self] value in
209- guard let self = self ,
210- let other = self . latestValue else { return nil }
211-
212- return self . resultSelector ( value, other)
213- } ,
214- transformFailure: { $0 } )
215-
216- // Signal initial value to start fulfilling downstream demand
217- gotInitialValue = true
218- onInitialValue ( )
219- }
220-
221- return . unlimited
222- } ,
223- receiveCompletion: nil )
224-
225- self . second. subscribe ( subscriber)
80+ /// Upon an emission from self, emit the latest value from the
81+ /// second publisher, if any exists.
82+ ///
83+ /// - parameter other: A second publisher source.
84+ ///
85+ /// - returns: A publisher containing the latest value from the second publisher, if any.
86+ func withLatestFrom< Other: Publisher > ( _ other: Other )
87+ -> AnyPublisher < Other . Output , Failure >
88+ where Other. Failure == Failure {
89+ withLatestFrom ( other) { $1 }
22690 }
22791
228- var description : String {
229- return " WithLatestFrom.Subscription< \( Output . self) , \( Failure . self) > "
92+ /// Upon an emission from self, emit the latest value from the
93+ /// second and third publisher, if any exists.
94+ ///
95+ /// - parameter other: A second publisher source.
96+ /// - parameter other1: A third publisher source.
97+ ///
98+ /// - returns: A publisher containing the latest value from the second and third publisher, if any.
99+ func withLatestFrom< Other: Publisher , Other1: Publisher > ( _ other: Other ,
100+ _ other1: Other1 )
101+ -> AnyPublisher < ( Other . Output , Other1 . Output ) , Failure >
102+ where Other. Failure == Failure , Other1. Failure == Failure {
103+ withLatestFrom ( other, other1) { $1 }
230104 }
231105
232- func cancel( ) {
233- sink = nil
234- otherSubscription? . cancel ( )
106+ /// Upon an emission from self, emit the latest value from the
107+ /// second, third and forth publisher, if any exists.
108+ ///
109+ /// - parameter other: A second publisher source.
110+ /// - parameter other1: A third publisher source.
111+ /// - parameter other2: A forth publisher source.
112+ ///
113+ /// - returns: A publisher containing the latest value from the second, third and forth publisher, if any.
114+ func withLatestFrom< Other: Publisher , Other1: Publisher , Other2: Publisher > ( _ other: Other ,
115+ _ other1: Other1 ,
116+ _ other2: Other2 )
117+ -> AnyPublisher < ( Other . Output , Other1 . Output , Other2 . Output ) , Failure >
118+ where Other. Failure == Failure , Other1. Failure == Failure , Other2. Failure == Failure {
119+ withLatestFrom ( other, other1, other2) { $1 }
235120 }
236- }
237121}
238122#endif
0 commit comments