@@ -195,28 +195,33 @@ extension MultiProvider {
195195 /// - Parameter body: A closure that receives an async sequence of ``MultiSnapshot`` updates.
196196 /// - Returns: The value returned by the body closure.
197197 /// - Throws: Any error thrown by the nested providers or the body closure.
198- func watchSnapshot< Return> (
199- _ body: ( ConfigUpdatesAsyncSequence < MultiSnapshot , Never > ) async throws -> Return
200- ) async throws -> Return {
198+ nonisolated ( nonsending)
199+ func watchSnapshot< Return> (
200+ _ body: ( ConfigUpdatesAsyncSequence < MultiSnapshot , Never > ) async throws -> Return
201+ ) async throws -> Return
202+ {
201203 let providers = storage. providers
202- let sources :
203- [ @Sendable (
204- ( ConfigUpdatesAsyncSequence < any ConfigSnapshotProtocol , Never > ) async throws -> Void
205- ) async throws -> Void ] = providers. map { $0. watchSnapshot }
206- return try await combineLatestOneOrMore (
207- elementType: ( any ConfigSnapshotProtocol ) . self,
208- sources: sources,
209- updatesHandler: { updateArrays in
210- try await body (
211- ConfigUpdatesAsyncSequence (
212- updateArrays
213- . map { array in
214- MultiSnapshot ( snapshots: array)
215- }
216- )
204+ typealias UpdatesSequence = any ( AsyncSequence < any ConfigSnapshotProtocol , Never > & Sendable )
205+ var updateSequences : [ UpdatesSequence ] = [ ]
206+ updateSequences. reserveCapacity ( providers. count)
207+ return try await withProvidersWatchingSnapshot (
208+ providers: ArraySlice ( providers) ,
209+ updateSequences: & updateSequences,
210+ ) { providerUpdateSequences in
211+ let updateArrays = combineLatestMany (
212+ elementType: ( any ConfigSnapshotProtocol ) . self,
213+ failureType: Never . self,
214+ providerUpdateSequences
215+ )
216+ return try await body (
217+ ConfigUpdatesAsyncSequence (
218+ updateArrays
219+ . map { array in
220+ MultiSnapshot ( snapshots: array)
221+ }
217222 )
218- }
219- )
223+ )
224+ }
220225 }
221226
222227 /// Asynchronously resolves a configuration value from nested providers.
@@ -281,52 +286,97 @@ extension MultiProvider {
281286 /// - updatesHandler: A closure that receives an async sequence of combined updates from all providers.
282287 /// - Throws: Any error thrown by the nested providers or the handler closure.
283288 /// - Returns: The value returned by the handler.
284- func watchValue< Return> (
285- forKey key: AbsoluteConfigKey ,
286- type: ConfigType ,
287- updatesHandler: (
288- ConfigUpdatesAsyncSequence < ( [ AccessEvent . ProviderResult ] , Result < ConfigValue ? , any Error > ) , Never >
289+ nonisolated ( nonsending)
290+ func watchValue< Return> (
291+ forKey key: AbsoluteConfigKey ,
292+ type: ConfigType ,
293+ updatesHandler: (
294+ ConfigUpdatesAsyncSequence < ( [ AccessEvent . ProviderResult ] , Result < ConfigValue ? , any Error > ) , Never >
295+ ) async throws -> Return
289296 ) async throws -> Return
290- ) async throws -> Return {
297+ {
291298 let providers = storage. providers
292299 let providerNames = providers. map ( \. providerName)
293- let sources :
294- [ @Sendable (
295- (
296- ConfigUpdatesAsyncSequence < Result < LookupResult , any Error > , Never >
297- ) async throws -> Void
298- ) async throws -> Void ] = providers. map { provider in
299- { handler in
300- _ = try await provider. watchValue ( forKey: key, type: type, updatesHandler: handler)
301- }
302- }
303- return try await combineLatestOneOrMore (
304- elementType: Result < LookupResult , any Error > . self,
305- sources: sources,
306- updatesHandler: { updateArrays in
307- try await updatesHandler (
308- ConfigUpdatesAsyncSequence (
309- updateArrays
310- . map { array in
311- var results : [ AccessEvent . ProviderResult ] = [ ]
312- for (providerIndex, lookupResult) in array. enumerated ( ) {
313- let providerName = providerNames [ providerIndex]
314- results. append ( . init( providerName: providerName, result: lookupResult) )
315- switch lookupResult {
316- case . success( let value) where value. value == nil :
317- // Got a success + nil from a nested provider, keep iterating.
318- continue
319- default :
320- // Got a success + non-nil or an error from a nested provider, propagate that up.
321- return ( results, lookupResult. map { $0. value } )
322- }
300+ typealias UpdatesSequence = any ( AsyncSequence < Result < LookupResult , any Error > , Never > & Sendable )
301+ var updateSequences : [ UpdatesSequence ] = [ ]
302+ updateSequences. reserveCapacity ( providers. count)
303+ return try await withProvidersWatchingValue (
304+ providers: ArraySlice ( providers) ,
305+ updateSequences: & updateSequences,
306+ key: key,
307+ configType: type,
308+ ) { providerUpdateSequences in
309+ let updateArrays = combineLatestMany (
310+ elementType: Result < LookupResult , any Error > . self,
311+ failureType: Never . self,
312+ providerUpdateSequences
313+ )
314+ return try await updatesHandler (
315+ ConfigUpdatesAsyncSequence (
316+ updateArrays
317+ . map { array in
318+ var results : [ AccessEvent . ProviderResult ] = [ ]
319+ for (providerIndex, lookupResult) in array. enumerated ( ) {
320+ let providerName = providerNames [ providerIndex]
321+ results. append ( . init( providerName: providerName, result: lookupResult) )
322+ switch lookupResult {
323+ case . success( let value) where value. value == nil :
324+ // Got a success + nil from a nested provider, keep iterating.
325+ continue
326+ default :
327+ // Got a success + non-nil or an error from a nested provider, propagate that up.
328+ return ( results, lookupResult. map { $0. value } )
323329 }
324- // If all nested results were success + nil, return the same.
325- return ( results, . success( nil ) )
326330 }
327- )
331+ // If all nested results were success + nil, return the same.
332+ return ( results, . success( nil ) )
333+ }
328334 )
329- }
335+ )
336+ }
337+ }
338+ }
339+
340+ @available ( Configuration 1 . 0 , * )
341+ nonisolated ( nonsending) private func withProvidersWatchingValue< ReturnInner> (
342+ providers: ArraySlice < any ConfigProvider > ,
343+ updateSequences: inout [ any ( AsyncSequence < Result < LookupResult , any Error > , Never > & Sendable ) ] ,
344+ key: AbsoluteConfigKey ,
345+ configType: ConfigType ,
346+ body: ( [ any ( AsyncSequence < Result < LookupResult , any Error > , Never > & Sendable ) ] ) async throws -> ReturnInner
347+ ) async throws -> ReturnInner {
348+ guard let provider = providers. first else {
349+ // Recursion termination, once we've collected all update sequences, execute the body.
350+ return try await body ( updateSequences)
351+ }
352+ return try await provider. watchValue ( forKey: key, type: configType) { updates in
353+ updateSequences. append ( updates)
354+ return try await withProvidersWatchingValue (
355+ providers: providers. dropFirst ( ) ,
356+ updateSequences: & updateSequences,
357+ key: key,
358+ configType: configType,
359+ body: body
360+ )
361+ }
362+ }
363+
364+ @available ( Configuration 1 . 0 , * )
365+ nonisolated ( nonsending) private func withProvidersWatchingSnapshot< ReturnInner> (
366+ providers: ArraySlice < any ConfigProvider > ,
367+ updateSequences: inout [ any ( AsyncSequence < any ConfigSnapshotProtocol , Never > & Sendable ) ] ,
368+ body: ( [ any ( AsyncSequence < any ConfigSnapshotProtocol , Never > & Sendable ) ] ) async throws -> ReturnInner
369+ ) async throws -> ReturnInner {
370+ guard let provider = providers. first else {
371+ // Recursion termination, once we've collected all update sequences, execute the body.
372+ return try await body ( updateSequences)
373+ }
374+ return try await provider. watchSnapshot { updates in
375+ updateSequences. append ( updates)
376+ return try await withProvidersWatchingSnapshot (
377+ providers: providers. dropFirst ( ) ,
378+ updateSequences: & updateSequences,
379+ body: body
330380 )
331381 }
332382}
0 commit comments