Skip to content

Commit ae85b57

Browse files
committed
Add asynchronous event source
1 parent d3e59a0 commit ae85b57

File tree

2 files changed

+217
-0
lines changed

2 files changed

+217
-0
lines changed
Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
// Copyright 2019-2024 Spotify AB.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
import Foundation
16+
17+
@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
18+
public extension CompositeEventSourceBuilder where Event: Sendable {
19+
/// Returns a new `CompositeEventSourceBuilder` with the specified `AsyncSequence` added to it.
20+
///
21+
/// - Note: The `consumerQueue` parameter is intended to be used when building a `MobiusLoop`.
22+
/// It can safely be omitted when building a `MobiusController`, which automatically handles sending events to the loop queue.
23+
///
24+
/// - Parameter sequence: An `AsyncSequence` producing `Event`s.
25+
/// - Parameter consumerQueue: An optional callback queue to consume events on.
26+
/// - Returns: A `CompositeEventSourceBuilder` that includes the given event source.
27+
func addEventSource<Sequence: AsyncSequence & Sendable>(
28+
_ sequence: Sequence,
29+
receiveOn consumerQueue: DispatchQueue? = nil
30+
) -> CompositeEventSourceBuilder<Event> where Sequence.Element == Event {
31+
addEventSource(AsyncSequenceEventSource(sequence: sequence, consumerQueue: consumerQueue))
32+
}
33+
}
34+
35+
@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
36+
private final class AsyncSequenceEventSource<Sequence: AsyncSequence & Sendable>: EventSource where Sequence.Element: Sendable {
37+
private struct SendableConsumer<Output>: @unchecked Sendable {
38+
let wrappedValue: Consumer<Output>
39+
}
40+
41+
private let sequence: Sequence
42+
private let consumerQueue: DispatchQueue?
43+
44+
init(sequence: Sequence, consumerQueue: DispatchQueue? = nil) {
45+
self.sequence = sequence
46+
self.consumerQueue = consumerQueue
47+
}
48+
49+
func subscribe(consumer: @escaping Consumer<Sequence.Element>) -> Disposable {
50+
// Prevents sending events after dispose by wrapping the consumer to enforce synchronous access.
51+
let sendableConsumer = SendableConsumer(wrappedValue: consumer)
52+
let protectedConsumer = Synchronized<SendableConsumer<Sequence.Element>?>(value: sendableConsumer)
53+
let threadSafeConsumer: @Sendable (Sequence.Element) -> Void = { event in
54+
protectedConsumer.read { consumer in consumer?.wrappedValue(event) }
55+
}
56+
57+
let task = Task { [sequence, consumerQueue] in
58+
for try await event in sequence {
59+
if let consumerQueue {
60+
consumerQueue.async { threadSafeConsumer(event) }
61+
} else {
62+
threadSafeConsumer(event)
63+
}
64+
}
65+
}
66+
67+
return AnonymousDisposable {
68+
protectedConsumer.value = nil
69+
task.cancel()
70+
}
71+
}
72+
}
73+
74+
@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
75+
extension Synchronized: @unchecked Sendable where Value: Sendable {}
Lines changed: 142 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,142 @@
1+
// Copyright 2019-2024 Spotify AB.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
import Foundation
16+
import MobiusCore
17+
import Nimble
18+
import Quick
19+
20+
@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
21+
final class CompositeEventSourceBuilder_ConcurrencyTests: QuickSpec {
22+
// swiftlint:disable:next function_body_length
23+
override func spec() {
24+
describe("CompositeEventSourceBuilder") {
25+
var sequence: AsyncStream<String>!
26+
var elementProducer: AsyncStream<String>.Continuation!
27+
28+
beforeEach {
29+
sequence = AsyncStream<String> { continuation in
30+
elementProducer = continuation
31+
}
32+
}
33+
34+
context("when configuring the composite event source builder") {
35+
var compositeEventSource: AnyEventSource<String>!
36+
var disposable: Disposable!
37+
var receivedEvents: [String]!
38+
39+
context("with an AsyncSequence event source") {
40+
beforeEach {
41+
let sut = CompositeEventSourceBuilder<String>()
42+
.addEventSource(sequence, receiveOn: .main)
43+
44+
compositeEventSource = sut.build()
45+
receivedEvents = []
46+
47+
disposable = compositeEventSource.subscribe {
48+
receivedEvents.append($0)
49+
}
50+
}
51+
52+
afterEach {
53+
disposable.dispose()
54+
}
55+
56+
it("should receive events from the sequence") {
57+
elementProducer.yield("foo")
58+
expect(receivedEvents).toEventually(equal(["foo"]))
59+
60+
elementProducer.yield("bar")
61+
expect(receivedEvents).toEventually(equal(["foo", "bar"]))
62+
}
63+
}
64+
}
65+
66+
describe("DelayedSequence") {
67+
context("MobiusLoop with an AsyncSequence event source") {
68+
var loop: MobiusLoop<String, String, String>!
69+
var receivedModels: [String]!
70+
71+
beforeEach {
72+
let effectHandler = EffectRouter<String, String>()
73+
.asConnectable
74+
75+
let eventSource = CompositeEventSourceBuilder<String>()
76+
.addEventSource(sequence, receiveOn: .main)
77+
.build()
78+
79+
loop = Mobius
80+
.loop(update: { _, event in .next(event) }, effectHandler: effectHandler)
81+
.withEventSource(eventSource)
82+
.start(from: "foo")
83+
84+
receivedModels = []
85+
loop.addObserver { model in receivedModels.append(model) }
86+
}
87+
88+
afterEach {
89+
loop.dispose()
90+
}
91+
92+
it("should prevent events from being submitted after dispose") {
93+
elementProducer.yield("bar")
94+
expect(receivedModels).toEventually(equal(["foo", "bar"]))
95+
96+
loop.dispose()
97+
98+
elementProducer.yield("baz")
99+
expect(receivedModels).toNever(equal(["foo", "bar", "baz"]))
100+
}
101+
}
102+
103+
context("MobiusController with an AsyncSequence event source") {
104+
let loopQueue = DispatchQueue(label: "loop queue")
105+
let viewQueue = DispatchQueue(label: "view queue")
106+
107+
var controller: MobiusController<String, String, String>!
108+
var view: RecordingTestConnectable!
109+
110+
beforeEach {
111+
let effectHandler = EffectRouter<String, String>()
112+
.asConnectable
113+
114+
let eventSource = CompositeEventSourceBuilder<String>()
115+
.addEventSource(sequence)
116+
.build()
117+
118+
controller = Mobius
119+
.loop(update: { _, event in .next(String(event)) }, effectHandler: effectHandler)
120+
.withEventSource(eventSource)
121+
.makeController(from: "foo", loopQueue: loopQueue, viewQueue: viewQueue)
122+
123+
view = RecordingTestConnectable(expectedQueue: viewQueue)
124+
controller.connectView(view)
125+
}
126+
127+
it("should prevent events from being submitted after dispose") {
128+
controller.start()
129+
130+
elementProducer.yield("bar")
131+
expect(view.recorder.items).toEventually(equal(["foo", "bar"]))
132+
133+
controller.stop()
134+
135+
elementProducer.yield("baz")
136+
expect(view.recorder.items).toNever(equal(["foo", "bar", "baz"]))
137+
}
138+
}
139+
}
140+
}
141+
}
142+
}

0 commit comments

Comments
 (0)