Skip to content

Commit fd0c03f

Browse files
add ability to throttle watched queries
1 parent f5d0ae7 commit fd0c03f

File tree

3 files changed

+30
-13
lines changed

3 files changed

+30
-13
lines changed

core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt

Lines changed: 18 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -139,11 +139,12 @@ internal class PowerSyncDatabaseImpl(
139139
}
140140
}
141141

142-
uploadJob =
142+
uploadJob =
143143
scope.launch {
144-
internalDb.updatesOnTables(setOf(InternalTable.CRUD.toString())).debounce(crudThrottleMs).collect {
145-
syncStream!!.triggerCrudUpload()
146-
}
144+
internalDb.updatesOnTables(setOf(InternalTable.CRUD.toString()))
145+
.debounce(crudThrottleMs).collect {
146+
syncStream!!.triggerCrudUpload()
147+
}
147148
}
148149
}
149150

@@ -210,7 +211,8 @@ internal class PowerSyncDatabaseImpl(
210211
}
211212
}
212213

213-
override suspend fun getPowerSyncVersion(): String = internalDb.queries.powerSyncVersion().executeAsOne()
214+
override suspend fun getPowerSyncVersion(): String =
215+
internalDb.queries.powerSyncVersion().executeAsOne()
214216

215217
override suspend fun <RowType : Any> get(
216218
sql: String,
@@ -233,12 +235,15 @@ internal class PowerSyncDatabaseImpl(
233235
override fun <RowType : Any> watch(
234236
sql: String,
235237
parameters: List<Any?>?,
238+
throttleMs: Long?,
236239
mapper: (SqlCursor) -> RowType,
237-
): Flow<List<RowType>> = internalDb.watch(sql, parameters, mapper)
240+
): Flow<List<RowType>> = internalDb.watch(sql, parameters, throttleMs, mapper)
238241

239-
override suspend fun <R> readTransaction(callback: ThrowableTransactionCallback<R>): R = internalDb.writeTransaction(callback)
242+
override suspend fun <R> readTransaction(callback: ThrowableTransactionCallback<R>): R =
243+
internalDb.writeTransaction(callback)
240244

241-
override suspend fun <R> writeTransaction(callback: ThrowableTransactionCallback<R>): R = internalDb.writeTransaction(callback)
245+
override suspend fun <R> writeTransaction(callback: ThrowableTransactionCallback<R>): R =
246+
internalDb.writeTransaction(callback)
242247

243248
override suspend fun execute(
244249
sql: String,
@@ -280,7 +285,11 @@ internal class PowerSyncDatabaseImpl(
280285
syncStream = null
281286
}
282287

283-
currentStatus.update(connected = false, connecting = false, lastSyncedAt = currentStatus.lastSyncedAt)
288+
currentStatus.update(
289+
connected = false,
290+
connecting = false,
291+
lastSyncedAt = currentStatus.lastSyncedAt
292+
)
284293
}
285294

286295
override suspend fun disconnectAndClear(clearLocal: Boolean) {

core/src/commonMain/kotlin/com/powersync/db/Queries.kt

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,10 @@ public interface Queries {
5454
public fun <RowType : Any> watch(
5555
sql: String,
5656
parameters: List<Any?>? = listOf(),
57+
/**
58+
* Specify the minimum interval, in milliseconds, between queries.
59+
*/
60+
throttleMs: Long? = null,
5761
mapper: (SqlCursor) -> RowType,
5862
): Flow<List<RowType>>
5963

core/src/commonMain/kotlin/com/powersync/db/internal/InternalDatabaseImpl.kt

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -51,13 +51,15 @@ internal class InternalDatabaseImpl(
5151
sql: String,
5252
parameters: List<Any?>?,
5353
mapper: (SqlCursor) -> RowType,
54-
): List<RowType> = this@InternalDatabaseImpl.getAllSync(sql, parameters ?: emptyList(), mapper)
54+
): List<RowType> =
55+
this@InternalDatabaseImpl.getAllSync(sql, parameters ?: emptyList(), mapper)
5556

5657
override fun <RowType : Any> getOptional(
5758
sql: String,
5859
parameters: List<Any?>?,
5960
mapper: (SqlCursor) -> RowType,
60-
): RowType? = this@InternalDatabaseImpl.getOptionalSync(sql, parameters ?: emptyList(), mapper)
61+
): RowType? =
62+
this@InternalDatabaseImpl.getOptionalSync(sql, parameters ?: emptyList(), mapper)
6163
}
6264

6365
companion object {
@@ -154,6 +156,7 @@ internal class InternalDatabaseImpl(
154156
override fun <RowType : Any> watch(
155157
sql: String,
156158
parameters: List<Any?>?,
159+
throttleMs: Long?,
157160
mapper: (SqlCursor) -> RowType,
158161
): Flow<List<RowType>> =
159162
flow {
@@ -165,7 +168,7 @@ internal class InternalDatabaseImpl(
165168

166169
emitAll(
167170
updatesOnTables(tables)
168-
.debounce(DEFAULT_WATCH_THROTTLE_MS)
171+
.debounce(throttleMs ?: DEFAULT_WATCH_THROTTLE_MS)
169172
.map {
170173
getAll(sql, parameters = parameters, mapper = mapper)
171174
}.onStart {
@@ -220,7 +223,8 @@ internal class InternalDatabaseImpl(
220223
}
221224

222225
// Register callback for table updates on a specific table
223-
override fun updatesOnTables(tableNames: Set<String>): Flow<Unit> = driver.updatesOnTables(tableNames)
226+
override fun updatesOnTables(tableNames: Set<String>): Flow<Unit> =
227+
driver.updatesOnTables(tableNames)
224228

225229
private suspend fun getSourceTables(
226230
sql: String,

0 commit comments

Comments
 (0)