Skip to content

Commit 3dd7e43

Browse files
fix race conditions
1 parent 6b2c80b commit 3dd7e43

File tree

3 files changed

+34
-18
lines changed

3 files changed

+34
-18
lines changed

core/src/commonMain/kotlin/com/powersync/PsSqlDriver.kt

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -27,20 +27,18 @@ internal class PsSqlDriver(
2727
}
2828

2929
fun clearTableUpdates() {
30-
// This should only ever be executed on rollback which should be executed via the
31-
// IO Dispatcher.
30+
// This should only ever be executed by an execute operation which should
31+
// always be executed with the IO Dispatcher
3232
runBlocking {
3333
pendingUpdates.clear()
3434
}
3535
}
3636

3737
// Flows on any table change
38-
// This specifically returns a SharedFlow for timing considerations
39-
fun updatesOnTables(): SharedFlow<Set<String>> {
40-
// Spread the input table names in order to account for internal views
41-
return tableUpdatesFlow
38+
// This specifically returns a SharedFlow for downstream timing considerations
39+
fun updatesOnTables(): SharedFlow<Set<String>> =
40+
tableUpdatesFlow
4241
.asSharedFlow()
43-
}
4442

4543
suspend fun fireTableUpdates() {
4644
val updates = pendingUpdates.toSet(true)

core/src/commonMain/kotlin/com/powersync/db/internal/InternalDatabaseImpl.kt

Lines changed: 2 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -166,25 +166,16 @@ internal class InternalDatabaseImpl(
166166
.filter { it.isNotBlank() }
167167
.toSet()
168168

169-
// Register a listener before fetching the initial result
170-
val updateFlow =
171-
updatesOnTables()
172-
173-
val initialResult = getAll(sql, parameters = parameters, mapper = mapper)
174-
// Listen for updates before emitting the initial result
175-
176-
updateFlow
169+
updatesOnTables()
177170
// onSubscription here is very important.
178171
// This ensures that the initial result and all updates are emitted.
179172
.onSubscription {
180-
println("emitting initial result")
181-
send(initialResult)
173+
send(getAll(sql, parameters = parameters, mapper = mapper))
182174
}.filter {
183175
// Only trigger updates on relevant tables
184176
it.intersect(tables).isNotEmpty()
185177
}.throttle(throttleMs ?: DEFAULT_WATCH_THROTTLE_MS)
186178
.collect {
187-
println("mapping update to result")
188179
send(getAll(sql, parameters = parameters, mapper = mapper))
189180
}
190181
}

core/src/commonTest/kotlin/com/powersync/utils/JsonTest.kt

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,10 @@
11
package com.powersync.utils
22

3+
import kotlinx.coroutines.delay
4+
import kotlinx.coroutines.flow.flow
5+
import kotlinx.coroutines.flow.map
6+
import kotlinx.coroutines.flow.toList
7+
import kotlinx.coroutines.test.runTest
38
import kotlinx.serialization.json.JsonArray
49
import kotlinx.serialization.json.JsonNull
510
import kotlinx.serialization.json.JsonObject
@@ -36,6 +41,28 @@ class JsonTest {
3641
assertEquals("test", jsonElement.content)
3742
}
3843

44+
@Test
45+
fun testThrottle() {
46+
runTest {
47+
val t =
48+
flow {
49+
emit(1)
50+
delay(10)
51+
emit(2)
52+
delay(20)
53+
emit(3)
54+
delay(100)
55+
emit(4)
56+
}.throttle(100)
57+
.map {
58+
// Adding a delay here to simulate a slow consumer
59+
delay(1000)
60+
it
61+
}.toList()
62+
assertEquals(t, listOf(1, 4))
63+
}
64+
}
65+
3966
@Test
4067
fun testBooleanToJsonElement() {
4168
val boolean = JsonParam.Boolean(true)

0 commit comments

Comments
 (0)