Skip to content

Commit 6b2c80b

Browse files
fix race conditions in watches
1 parent 9e18996 commit 6b2c80b

File tree

5 files changed

+114
-61
lines changed

5 files changed

+114
-61
lines changed

core-tests-android/src/androidTest/java/com/powersync/AndroidDatabaseTest.kt

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,11 @@ import com.powersync.db.schema.Schema
77
import com.powersync.testutils.UserRow
88
import kotlinx.coroutines.runBlocking
99
import kotlinx.coroutines.test.runTest
10+
import kotlinx.coroutines.test.advanceTimeBy
1011
import org.junit.After
12+
import kotlinx.coroutines.delay
13+
import kotlinx.coroutines.*
14+
1115

1216
import org.junit.Test
1317
import org.junit.runner.RunWith
@@ -91,4 +95,60 @@ class AndroidDatabaseTest {
9195
query.cancel()
9296
}
9397
}
98+
99+
@Test
100+
fun testThrottledTableUpdates() =
101+
runTest {
102+
turbineScope {
103+
// Avoids skipping delays
104+
withContext(Dispatchers.Default) {
105+
val query =
106+
database.watch(
107+
"SELECT * FROM users",
108+
throttleMs = 1000
109+
) { UserRow.from(it) }.testIn(this)
110+
111+
// Wait for initial query
112+
assertEquals(0, query.awaitItem().size)
113+
114+
database.execute(
115+
"INSERT INTO users (id, name, email) VALUES (uuid(), ?, ?)",
116+
listOf("Test", "[email protected]"),
117+
)
118+
119+
database.writeTransaction {
120+
it.execute(
121+
"INSERT INTO users (id, name, email) VALUES (uuid(), ?, ?)",
122+
listOf("Test2", "[email protected]"),
123+
)
124+
it.execute(
125+
"INSERT INTO users (id, name, email) VALUES (uuid(), ?, ?)",
126+
listOf("Test3", "[email protected]"),
127+
)
128+
}
129+
130+
assertEquals(3, query.awaitItem().size)
131+
132+
try {
133+
database.writeTransaction {
134+
it.execute("DELETE FROM users;")
135+
it.execute("syntax error, revert please")
136+
}
137+
} catch (e: Exception) {
138+
// Ignore
139+
}
140+
141+
database.execute(
142+
"INSERT INTO users (id, name, email) VALUES (uuid(), ?, ?)",
143+
listOf("Test4", "[email protected]"),
144+
)
145+
assertEquals(4, query.awaitItem().size)
146+
147+
query.expectNoEvents()
148+
query.cancel()
149+
150+
}
151+
152+
}
153+
}
94154
}

core/src/commonMain/kotlin/com/powersync/PsSqlDriver.kt

Lines changed: 16 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,11 @@ package com.powersync
22

33
import app.cash.sqldelight.db.SqlDriver
44
import com.powersync.utils.AtomicMutableSet
5-
import com.powersync.utils.throttle
65
import kotlinx.coroutines.CoroutineScope
7-
import kotlinx.coroutines.flow.Flow
86
import kotlinx.coroutines.flow.MutableSharedFlow
7+
import kotlinx.coroutines.flow.SharedFlow
98
import kotlinx.coroutines.flow.asSharedFlow
10-
import kotlinx.coroutines.flow.filter
11-
import kotlinx.coroutines.flow.map
12-
import kotlinx.coroutines.launch
9+
import kotlinx.coroutines.runBlocking
1310

1411
internal class PsSqlDriver(
1512
private val driver: SqlDriver,
@@ -22,47 +19,31 @@ internal class PsSqlDriver(
2219
private val pendingUpdates = AtomicMutableSet<String>()
2320

2421
fun updateTable(tableName: String) {
25-
scope.launch {
22+
// This should only ever be executed by an execute operation which should
23+
// always be executed with the IO Dispatcher
24+
runBlocking {
2625
pendingUpdates.add(tableName)
2726
}
2827
}
2928

3029
fun clearTableUpdates() {
31-
scope.launch {
30+
// This should only ever be executed on rollback which should be executed via the
31+
// IO Dispatcher.
32+
runBlocking {
3233
pendingUpdates.clear()
3334
}
3435
}
3536

36-
// Flows on table updates containing tables
37-
fun updatesOnTables(
38-
tableNames: Set<String>,
39-
throttleMs: Long?,
40-
): Flow<Unit> {
37+
// Flows on any table change
38+
// This specifically returns a SharedFlow for timing considerations
39+
fun updatesOnTables(): SharedFlow<Set<String>> {
4140
// Spread the input table names in order to account for internal views
42-
val resolvedTableNames =
43-
tableNames
44-
.flatMap { t -> setOf("ps_data__$t", "ps_data_local__$t", t) }
45-
.toSet()
46-
var flow =
47-
tableUpdatesFlow
48-
.asSharedFlow()
49-
.filter {
50-
it
51-
.intersect(
52-
resolvedTableNames,
53-
).isNotEmpty()
54-
}
55-
56-
if (throttleMs != null) {
57-
flow = flow.throttle(throttleMs)
58-
}
59-
60-
return flow.map { }
41+
return tableUpdatesFlow
42+
.asSharedFlow()
6143
}
6244

63-
fun fireTableUpdates() {
64-
scope.launch {
65-
tableUpdatesFlow.emit(pendingUpdates.toSet(true))
66-
}
45+
suspend fun fireTableUpdates() {
46+
val updates = pendingUpdates.toSet(true)
47+
tableUpdatesFlow.emit(updates)
6748
}
6849
}

core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,14 @@ import com.powersync.sync.SyncStatusData
2222
import com.powersync.sync.SyncStream
2323
import com.powersync.utils.JsonParam
2424
import com.powersync.utils.JsonUtil
25+
import com.powersync.utils.throttle
2526
import com.powersync.utils.toJsonObject
2627
import kotlinx.coroutines.CoroutineScope
2728
import kotlinx.coroutines.FlowPreview
2829
import kotlinx.coroutines.Job
2930
import kotlinx.coroutines.cancelAndJoin
3031
import kotlinx.coroutines.flow.Flow
32+
import kotlinx.coroutines.flow.filter
3133
import kotlinx.coroutines.flow.first
3234
import kotlinx.coroutines.launch
3335
import kotlinx.coroutines.runBlocking
@@ -141,10 +143,10 @@ internal class PowerSyncDatabaseImpl(
141143
uploadJob =
142144
scope.launch {
143145
internalDb
144-
.updatesOnTables(
145-
setOf(InternalTable.CRUD.toString()),
146-
throttleMs = crudThrottleMs,
147-
).collect {
146+
.updatesOnTables()
147+
.filter { it.contains(InternalTable.CRUD.toString()) }
148+
.throttle(crudThrottleMs)
149+
.collect {
148150
syncStream!!.triggerCrudUpload()
149151
}
150152
}

core/src/commonMain/kotlin/com/powersync/db/internal/InternalDatabase.kt

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,15 +5,13 @@ import com.persistence.PowersyncQueries
55
import com.powersync.db.Queries
66
import com.powersync.persistence.PsDatabase
77
import kotlinx.coroutines.flow.Flow
8+
import kotlinx.coroutines.flow.SharedFlow
89

910
internal interface InternalDatabase :
1011
Queries,
1112
Closeable {
1213
val transactor: PsDatabase
1314
val queries: PowersyncQueries
1415

15-
fun updatesOnTables(
16-
tableNames: Set<String>,
17-
throttleMs: Long?,
18-
): Flow<Unit>
16+
fun updatesOnTables(): SharedFlow<Set<String>>
1917
}

core/src/commonMain/kotlin/com/powersync/db/internal/InternalDatabaseImpl.kt

Lines changed: 30 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -10,16 +10,17 @@ import com.powersync.db.SqlCursor
1010
import com.powersync.db.runWrapped
1111
import com.powersync.persistence.PsDatabase
1212
import com.powersync.utils.JsonUtil
13+
import com.powersync.utils.throttle
1314
import kotlinx.coroutines.CancellationException
1415
import kotlinx.coroutines.CoroutineScope
1516
import kotlinx.coroutines.Dispatchers
1617
import kotlinx.coroutines.FlowPreview
1718
import kotlinx.coroutines.IO
1819
import kotlinx.coroutines.flow.Flow
19-
import kotlinx.coroutines.flow.emitAll
20-
import kotlinx.coroutines.flow.flow
21-
import kotlinx.coroutines.flow.map
22-
import kotlinx.coroutines.flow.onStart
20+
import kotlinx.coroutines.flow.SharedFlow
21+
import kotlinx.coroutines.flow.channelFlow
22+
import kotlinx.coroutines.flow.filter
23+
import kotlinx.coroutines.flow.onSubscription
2324
import kotlinx.coroutines.withContext
2425
import kotlinx.serialization.encodeToString
2526

@@ -156,22 +157,36 @@ internal class InternalDatabaseImpl(
156157
throttleMs: Long?,
157158
mapper: (SqlCursor) -> RowType,
158159
): Flow<List<RowType>> =
159-
flow {
160+
// Use a channel flow here since we throttle (buffer used under the hood)
161+
// This causes some emissions to be from different scopes.
162+
channelFlow {
160163
// Fetch the tables asynchronously with getAll
161164
val tables =
162165
getSourceTables(sql, parameters)
163166
.filter { it.isNotBlank() }
164167
.toSet()
165168

166-
emitAll(
167-
updatesOnTables(tables, throttleMs = throttleMs ?: DEFAULT_WATCH_THROTTLE_MS)
168-
.map {
169-
getAll(sql, parameters = parameters, mapper = mapper)
170-
}.onStart {
171-
// Emit the initial query result
172-
emit(getAll(sql, parameters = parameters, mapper = mapper))
173-
},
174-
)
169+
// Register a listener before fetching the initial result
170+
val updateFlow =
171+
updatesOnTables()
172+
173+
val initialResult = getAll(sql, parameters = parameters, mapper = mapper)
174+
// Listen for updates before emitting the initial result
175+
176+
updateFlow
177+
// onSubscription here is very important.
178+
// This ensures that the initial result and all updates are emitted.
179+
.onSubscription {
180+
println("emitting initial result")
181+
send(initialResult)
182+
}.filter {
183+
// Only trigger updates on relevant tables
184+
it.intersect(tables).isNotEmpty()
185+
}.throttle(throttleMs ?: DEFAULT_WATCH_THROTTLE_MS)
186+
.collect {
187+
println("mapping update to result")
188+
send(getAll(sql, parameters = parameters, mapper = mapper))
189+
}
175190
}
176191

177192
private fun <T : Any> createQuery(
@@ -219,10 +234,7 @@ internal class InternalDatabaseImpl(
219234
}
220235

221236
// Register callback for table updates on a specific table
222-
override fun updatesOnTables(
223-
tableNames: Set<String>,
224-
throttleMs: Long?,
225-
): Flow<Unit> = driver.updatesOnTables(tableNames, throttleMs)
237+
override fun updatesOnTables(): SharedFlow<Set<String>> = driver.updatesOnTables()
226238

227239
private suspend fun getSourceTables(
228240
sql: String,

0 commit comments

Comments
 (0)