From 2abc97bd43d3a96e80e09879c097ccf30c325bba Mon Sep 17 00:00:00 2001 From: DominicGBauer Date: Fri, 4 Oct 2024 18:48:30 +0200 Subject: [PATCH 1/8] feat: add warning if crud transactions are not completed --- .../com/powersync/bucket/BucketStorage.kt | 19 ++++++++ .../kotlin/com/powersync/sync/SyncStream.kt | 44 ++++++++++--------- 2 files changed, 43 insertions(+), 20 deletions(-) diff --git a/core/src/commonMain/kotlin/com/powersync/bucket/BucketStorage.kt b/core/src/commonMain/kotlin/com/powersync/bucket/BucketStorage.kt index 443743a9..1d0b6c56 100644 --- a/core/src/commonMain/kotlin/com/powersync/bucket/BucketStorage.kt +++ b/core/src/commonMain/kotlin/com/powersync/bucket/BucketStorage.kt @@ -6,6 +6,8 @@ import com.powersync.db.internal.PsInternalDatabase import com.powersync.sync.SyncDataBatch import com.powersync.sync.SyncLocalDatabaseResult import co.touchlab.stately.concurrency.AtomicBoolean +import com.powersync.db.crud.CrudEntry +import com.powersync.db.crud.CrudRow import kotlinx.serialization.encodeToString import com.powersync.db.internal.InternalTable import com.powersync.utils.JsonUtil @@ -51,6 +53,23 @@ internal class BucketStorage( return id ?: throw IllegalStateException("Client ID not found") } + suspend fun nextCrudItem(): CrudEntry? { + val next = db.queries.getCrudFirstEntry().awaitAsOneOrNull() + logger.i("Next Item") + logger.i(next.toString()) + val crudItem = next?.let { CrudEntry.fromRow( + CrudRow( + id = it.id.toString(), + data = it.data_!!, + txId = it.tx_id?.toInt() + ) + ) } + logger.i("Crud Item") + logger.i(crudItem.toString()) + + return crudItem + } + suspend fun hasCrud(): Boolean { return db.queries.hasCrud().awaitAsOneOrNull() == 1L } diff --git a/core/src/commonMain/kotlin/com/powersync/sync/SyncStream.kt b/core/src/commonMain/kotlin/com/powersync/sync/SyncStream.kt index 6b0c18c9..87776b20 100644 --- a/core/src/commonMain/kotlin/com/powersync/sync/SyncStream.kt +++ b/core/src/commonMain/kotlin/com/powersync/sync/SyncStream.kt @@ -8,9 +8,8 @@ import com.powersync.bucket.Checkpoint import com.powersync.bucket.WriteCheckpointResponse import co.touchlab.stately.concurrency.AtomicBoolean import com.powersync.connectors.PowerSyncBackendConnector -import com.powersync.utils.JsonParam +import com.powersync.db.crud.CrudEntry import com.powersync.utils.JsonUtil -import com.powersync.utils.toJsonObject import io.ktor.client.HttpClient import io.ktor.client.call.body import io.ktor.client.plugins.HttpTimeout @@ -135,14 +134,31 @@ internal class SyncStream( } private suspend fun uploadAllCrud() { + var checkedCrudItem: CrudEntry? = null + while (true) { + status.update(uploading = true) + /** + * This is the first item in the FIFO CRUD queue. + */ try { - val done = uploadCrudBatch() - if (status.uploadError == true) { - status.update(clearUploadError = true) - } - - if (done) { + val nextCrudItem = bucketStorage.nextCrudItem() + if (nextCrudItem != null) { + if (nextCrudItem.clientId != checkedCrudItem?.clientId) { + // This will force a higher log level than exceptions which are caught here. + logger.w( + """Potentially previously uploaded CRUD entries are still present in the upload queue. + Make sure to handle uploads and complete CRUD transactions or batches by calling and awaiting their [.complete()] method. + The next upload iteration will be delayed.""" + ) + throw Exception("Delaying due to previously encountered CRUD item.") + } + + checkedCrudItem = nextCrudItem + uploadCrud() + } else { + // Uploading is completed + bucketStorage.updateLocalTarget { getWriteCheckpoint() } break } } catch (e: Exception) { @@ -155,18 +171,6 @@ internal class SyncStream( status.update(uploading = false) } - private suspend fun uploadCrudBatch(): Boolean { - if (bucketStorage.hasCrud()) { - status.update(uploading = true) - uploadCrud() - return false - } else { - // This isolate is the only one triggering - bucketStorage.updateLocalTarget { getWriteCheckpoint() } - return true - } - } - private suspend fun getWriteCheckpoint(): String { val credentials = connector.getCredentialsCached() require(credentials != null) { "Not logged in" } From 96249acb57116a2008b95a0268608b38b77bbb44 Mon Sep 17 00:00:00 2001 From: DominicGBauer Date: Fri, 4 Oct 2024 18:51:33 +0200 Subject: [PATCH 2/8] chore: remove logs --- .../commonMain/kotlin/com/powersync/bucket/BucketStorage.kt | 4 ---- 1 file changed, 4 deletions(-) diff --git a/core/src/commonMain/kotlin/com/powersync/bucket/BucketStorage.kt b/core/src/commonMain/kotlin/com/powersync/bucket/BucketStorage.kt index 1d0b6c56..682b3fcc 100644 --- a/core/src/commonMain/kotlin/com/powersync/bucket/BucketStorage.kt +++ b/core/src/commonMain/kotlin/com/powersync/bucket/BucketStorage.kt @@ -55,8 +55,6 @@ internal class BucketStorage( suspend fun nextCrudItem(): CrudEntry? { val next = db.queries.getCrudFirstEntry().awaitAsOneOrNull() - logger.i("Next Item") - logger.i(next.toString()) val crudItem = next?.let { CrudEntry.fromRow( CrudRow( id = it.id.toString(), @@ -64,8 +62,6 @@ internal class BucketStorage( txId = it.tx_id?.toInt() ) ) } - logger.i("Crud Item") - logger.i(crudItem.toString()) return crudItem } From 68332943e5f9faa4d0279d8f914488d6c08bf93b Mon Sep 17 00:00:00 2001 From: DominicGBauer Date: Mon, 7 Oct 2024 10:29:56 +0200 Subject: [PATCH 3/8] fix: logic error --- core/src/commonMain/kotlin/com/powersync/sync/SyncStream.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/commonMain/kotlin/com/powersync/sync/SyncStream.kt b/core/src/commonMain/kotlin/com/powersync/sync/SyncStream.kt index 87776b20..85dc795d 100644 --- a/core/src/commonMain/kotlin/com/powersync/sync/SyncStream.kt +++ b/core/src/commonMain/kotlin/com/powersync/sync/SyncStream.kt @@ -144,7 +144,7 @@ internal class SyncStream( try { val nextCrudItem = bucketStorage.nextCrudItem() if (nextCrudItem != null) { - if (nextCrudItem.clientId != checkedCrudItem?.clientId) { + if (nextCrudItem.clientId == checkedCrudItem?.clientId) { // This will force a higher log level than exceptions which are caught here. logger.w( """Potentially previously uploaded CRUD entries are still present in the upload queue. From ead9327992c0d06a42601b3d2f1f8c53c584d3c2 Mon Sep 17 00:00:00 2001 From: DominicGBauer Date: Thu, 10 Oct 2024 06:50:58 +0200 Subject: [PATCH 4/8] chore: add tests --- core/build.gradle.kts | 2 + .../com/powersync/bucket/BucketStorage.kt | 4 +- .../com/powersync/db/PowerSyncDatabaseImpl.kt | 5 +- .../powersync/db/internal/InternalDatabase.kt | 271 +---------------- .../db/internal/InternalDatabaseImpl.kt | 272 ++++++++++++++++++ .../internal/PowerSyncTransactionFactory.kt | 4 +- .../com/powersync/bucket/BucketStorageTest.kt | 151 ++++++++++ 7 files changed, 439 insertions(+), 270 deletions(-) create mode 100644 core/src/commonMain/kotlin/com/powersync/db/internal/InternalDatabaseImpl.kt create mode 100644 core/src/commonTest/kotlin/com/powersync/bucket/BucketStorageTest.kt diff --git a/core/build.gradle.kts b/core/build.gradle.kts index 78fbb70e..f8b88a4f 100644 --- a/core/build.gradle.kts +++ b/core/build.gradle.kts @@ -9,6 +9,7 @@ plugins { alias(libs.plugins.mavenPublishPlugin) alias(libs.plugins.downloadPlugin) id("com.powersync.plugins.sonatype") + id("dev.mokkery") version "2.4.0" } val sqliteVersion = "3450000" @@ -115,6 +116,7 @@ kotlin { commonTest.dependencies { implementation(libs.kotlin.test) + implementation(libs.test.coroutines) } } } diff --git a/core/src/commonMain/kotlin/com/powersync/bucket/BucketStorage.kt b/core/src/commonMain/kotlin/com/powersync/bucket/BucketStorage.kt index 794da621..935b21b0 100644 --- a/core/src/commonMain/kotlin/com/powersync/bucket/BucketStorage.kt +++ b/core/src/commonMain/kotlin/com/powersync/bucket/BucketStorage.kt @@ -2,18 +2,18 @@ package com.powersync.bucket import app.cash.sqldelight.async.coroutines.awaitAsOneOrNull import co.touchlab.kermit.Logger -import com.powersync.db.internal.PsInternalDatabase import com.powersync.sync.SyncDataBatch import com.powersync.sync.SyncLocalDatabaseResult import co.touchlab.stately.concurrency.AtomicBoolean import com.powersync.db.crud.CrudEntry import com.powersync.db.crud.CrudRow +import com.powersync.db.internal.InternalDatabase import kotlinx.serialization.encodeToString import com.powersync.db.internal.InternalTable import com.powersync.utils.JsonUtil internal class BucketStorage( - private val db: PsInternalDatabase, + private val db: InternalDatabase, private val logger: Logger ) { private val tableNames: MutableSet = mutableSetOf() diff --git a/core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt b/core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt index eef61a4a..03850db4 100644 --- a/core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt +++ b/core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt @@ -14,7 +14,7 @@ import com.powersync.db.crud.CrudBatch import com.powersync.db.crud.CrudEntry import com.powersync.db.crud.CrudRow import com.powersync.db.crud.CrudTransaction -import com.powersync.db.internal.PsInternalDatabase +import com.powersync.db.internal.InternalDatabaseImpl import com.powersync.db.internal.InternalTable import com.powersync.db.internal.PowerSyncTransaction import com.powersync.db.schema.Schema @@ -53,7 +53,7 @@ internal class PowerSyncDatabaseImpl( val logger: Logger = Logger, driver: PsSqlDriver = factory.createDriver(scope, dbFilename), ) : PowerSyncDatabase { - private val internalDb = PsInternalDatabase(driver, scope) + private val internalDb = InternalDatabaseImpl(driver, scope) private val bucketStorage: BucketStorage = BucketStorage(internalDb, logger) /** @@ -73,6 +73,7 @@ internal class PowerSyncDatabaseImpl( logger.d { "SQLiteVersion: $sqliteVersion" } checkVersion() logger.d { "PowerSyncVersion: ${getPowerSyncVersion()}" } + internalDb.get("SELECT powersync_init()") {} applySchema() updateHasSynced() } diff --git a/core/src/commonMain/kotlin/com/powersync/db/internal/InternalDatabase.kt b/core/src/commonMain/kotlin/com/powersync/db/internal/InternalDatabase.kt index 533b223e..5796e12f 100644 --- a/core/src/commonMain/kotlin/com/powersync/db/internal/InternalDatabase.kt +++ b/core/src/commonMain/kotlin/com/powersync/db/internal/InternalDatabase.kt @@ -1,273 +1,16 @@ package com.powersync.db.internal -import app.cash.sqldelight.ExecutableQuery -import app.cash.sqldelight.Query -import app.cash.sqldelight.async.coroutines.awaitAsList -import app.cash.sqldelight.async.coroutines.awaitAsOneOrNull -import app.cash.sqldelight.coroutines.asFlow -import app.cash.sqldelight.coroutines.mapToList -import app.cash.sqldelight.db.QueryResult -import app.cash.sqldelight.db.SqlCursor -import app.cash.sqldelight.db.SqlPreparedStatement -import com.powersync.PowerSyncTransaction +import com.persistence.PowersyncQueries import com.powersync.PsSqlDriver import com.powersync.db.Queries import com.powersync.persistence.PsDatabase -import com.powersync.utils.JsonUtil -import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.FlowPreview import kotlinx.coroutines.flow.Flow -import kotlinx.coroutines.flow.debounce -import kotlinx.coroutines.flow.onEach -import kotlinx.coroutines.launch -import kotlinx.serialization.encodeToString +internal interface InternalDatabase : Queries { + val driver: PsSqlDriver + val transactor: PsDatabase + val queries: PowersyncQueries -@OptIn(FlowPreview::class) -internal class PsInternalDatabase(val driver: PsSqlDriver, private val scope: CoroutineScope) : - Queries { - - val transactor: PsDatabase = PsDatabase(driver) - val queries = transactor.powersyncQueries - - companion object { - const val POWERSYNC_TABLE_MATCH = "(^ps_data__|^ps_data_local__)" - const val DEFAULT_WATCH_THROTTLE_MS = 30L - } - - init { - scope.launch { - val accumulatedUpdates = mutableSetOf() - tableUpdates() -// Debounce will discard any events which occur inside the debounce window -// This will accumulate those table updates - .onEach { tables -> accumulatedUpdates.addAll(tables) } - .debounce(DEFAULT_WATCH_THROTTLE_MS) - .collect { - val dataTables = accumulatedUpdates.map { toFriendlyTableName(it) }.filter { it.isNotBlank() } - driver.notifyListeners(queryKeys = dataTables.toTypedArray()) - accumulatedUpdates.clear() - } - } - } - - override suspend fun execute( - sql: String, - parameters: List? - ): Long { - val numParams = parameters?.size ?: 0 - - return driver.execute( - identifier = null, - sql = sql, - parameters = numParams, - binders = getBindersFromParams(parameters) - ).await() - } - - override suspend fun get( - sql: String, - parameters: List?, - mapper: (SqlCursor) -> RowType - ): RowType { - val result = this.createQuery( - query = sql, - parameters = parameters?.size ?: 0, - binders = getBindersFromParams(parameters), - mapper = mapper - ).awaitAsOneOrNull() - return requireNotNull(result) { "Query returned no result" } - } - - override suspend fun getAll( - sql: String, - parameters: List?, - mapper: (SqlCursor) -> RowType - ): List { - return this.createQuery( - query = sql, - parameters = parameters?.size ?: 0, - binders = getBindersFromParams(parameters), - mapper = mapper - ).awaitAsList() - } - - override suspend fun getOptional( - sql: String, - parameters: List?, - mapper: (SqlCursor) -> RowType - ): RowType? { - return this.createQuery( - query = sql, - parameters = parameters?.size ?: 0, - binders = getBindersFromParams(parameters), - mapper = mapper - ).awaitAsOneOrNull() - } - - override fun watch( - sql: String, - parameters: List?, - mapper: (SqlCursor) -> RowType - ): Flow> { - - val tables = getSourceTables(sql, parameters).map { toFriendlyTableName(it) } - .filter { it.isNotBlank() }.toSet() - return watchQuery( - query = sql, - parameters = parameters?.size ?: 0, - binders = getBindersFromParams(parameters), - mapper = mapper, - tables = tables - ).asFlow().mapToList(scope.coroutineContext) - } - - private fun createQuery( - query: String, - mapper: (SqlCursor) -> T, - parameters: Int = 0, - binders: (SqlPreparedStatement.() -> Unit)? = null, - ): ExecutableQuery { - return object : ExecutableQuery(mapper) { - override fun execute(mapper: (SqlCursor) -> QueryResult): QueryResult { - return driver.executeQuery(null, query, mapper, parameters, binders) - } - } - } - - private fun watchQuery( - query: String, - mapper: (SqlCursor) -> T, - parameters: Int = 0, - binders: (SqlPreparedStatement.() -> Unit)? = null, - tables: Set = setOf() - ): Query { - - return object : Query(mapper) { - override fun execute(mapper: (SqlCursor) -> QueryResult): QueryResult { - return driver.executeQuery(null, query, mapper, parameters, binders) - } - - override fun addListener(listener: Listener) { - driver.addListener(queryKeys = tables.toTypedArray(), listener = listener) - } - - override fun removeListener(listener: Listener) { - driver.removeListener(queryKeys = tables.toTypedArray(), listener = listener) - } - } - } - - override suspend fun readTransaction(callback: suspend (PowerSyncTransaction) -> R): R { - return transactor.transactionWithResult(noEnclosing = true) { - val transaction = PowerSyncTransaction(this@PsInternalDatabase) - callback(transaction) - } - } - - override suspend fun writeTransaction(callback: suspend (PowerSyncTransaction) -> R): R { - return transactor.transactionWithResult(noEnclosing = true) { - val transaction = PowerSyncTransaction(this@PsInternalDatabase) - callback(transaction) - } - } - - // Register callback for table updates - private fun tableUpdates(): Flow> { - return driver.tableUpdates() - } - - // Register callback for table updates on a specific table - fun updatesOnTable(tableName: String): Flow { - return driver.updatesOnTable(tableName) - } - - private fun toFriendlyTableName(tableName: String): String { - val regex = POWERSYNC_TABLE_MATCH.toRegex() - if (regex.containsMatchIn(tableName)) { - return tableName.replace(regex, "") - } - return tableName - } - - private fun getSourceTables( - sql: String, - parameters: List?, - ): Set { - val rows = createQuery( - query = "EXPLAIN $sql", - parameters = parameters?.size ?: 0, - binders = getBindersFromParams(parameters), - mapper = { - ExplainQueryResult( - addr = it.getString(0)!!, - opcode = it.getString(1)!!, - p1 = it.getLong(2)!!, - p2 = it.getLong(3)!!, - p3 = it.getLong(4)!! - ) - } - ).executeAsList() - - val rootPages = mutableListOf() - for (row in rows) { - if ((row.opcode == "OpenRead" || row.opcode == "OpenWrite") && row.p3 == 0L && row.p2 != 0L) { - rootPages.add(row.p2) - } - } - val params = listOf(JsonUtil.json.encodeToString(rootPages)) - val tableRows = createQuery( - "SELECT tbl_name FROM sqlite_master WHERE rootpage IN (SELECT json_each.value FROM json_each(?))", - parameters = params.size, - binders = { - bindString(0, params[0]) - }, mapper = { it.getString(0)!! } - ).executeAsList() - - return tableRows.toSet() - } - - fun getExistingTableNames(tableGlob: String): List { - val existingTableNames = createQuery( - "SELECT name FROM sqlite_master WHERE type='table' AND name GLOB ?", - parameters = 1, - binders = { - bindString(0, tableGlob) - }, - mapper = { cursor -> - cursor.getString(0)!! - } - ).executeAsList() - return existingTableNames - } - - internal data class ExplainQueryResult( - val addr: String, - val opcode: String, - val p1: Long, - val p2: Long, - val p3: Long, - ) -} - -internal fun getBindersFromParams(parameters: List?): (SqlPreparedStatement.() -> Unit)? { - if (parameters.isNullOrEmpty()) { - return null - } - return { - parameters.forEachIndexed { index, parameter -> - when (parameter) { - is Boolean -> bindBoolean(index, parameter) - is String -> bindString(index, parameter) - is Long -> bindLong(index, parameter) - is Double -> bindDouble(index, parameter) - is ByteArray -> bindBytes(index, parameter) - else -> { - if(parameter != null) { - throw IllegalArgumentException("Unsupported parameter type: ${parameter::class}, at index $index") - } - } - } - } - } + fun getExistingTableNames(tableGlob: String): List + fun updatesOnTable(tableName: String): Flow } \ No newline at end of file diff --git a/core/src/commonMain/kotlin/com/powersync/db/internal/InternalDatabaseImpl.kt b/core/src/commonMain/kotlin/com/powersync/db/internal/InternalDatabaseImpl.kt new file mode 100644 index 00000000..a4a967fc --- /dev/null +++ b/core/src/commonMain/kotlin/com/powersync/db/internal/InternalDatabaseImpl.kt @@ -0,0 +1,272 @@ +package com.powersync.db.internal + +import app.cash.sqldelight.ExecutableQuery +import app.cash.sqldelight.Query +import app.cash.sqldelight.async.coroutines.awaitAsList +import app.cash.sqldelight.async.coroutines.awaitAsOneOrNull +import app.cash.sqldelight.coroutines.asFlow +import app.cash.sqldelight.coroutines.mapToList +import app.cash.sqldelight.db.QueryResult +import app.cash.sqldelight.db.SqlCursor +import app.cash.sqldelight.db.SqlPreparedStatement +import com.powersync.PowerSyncTransaction +import com.powersync.PsSqlDriver +import com.powersync.persistence.PsDatabase +import com.powersync.utils.JsonUtil +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.FlowPreview +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.debounce +import kotlinx.coroutines.flow.onEach +import kotlinx.coroutines.launch +import kotlinx.serialization.encodeToString + + +@OptIn(FlowPreview::class) +internal class InternalDatabaseImpl(override val driver: PsSqlDriver, private val scope: CoroutineScope): + InternalDatabase { + + override val transactor: PsDatabase = PsDatabase(driver) + override val queries = transactor.powersyncQueries + + companion object { + const val POWERSYNC_TABLE_MATCH = "(^ps_data__|^ps_data_local__)" + const val DEFAULT_WATCH_THROTTLE_MS = 30L + } + + init { + scope.launch { + val accumulatedUpdates = mutableSetOf() + tableUpdates() +// Debounce will discard any events which occur inside the debounce window +// This will accumulate those table updates + .onEach { tables -> accumulatedUpdates.addAll(tables) } + .debounce(DEFAULT_WATCH_THROTTLE_MS) + .collect { + val dataTables = accumulatedUpdates.map { toFriendlyTableName(it) }.filter { it.isNotBlank() } + driver.notifyListeners(queryKeys = dataTables.toTypedArray()) + accumulatedUpdates.clear() + } + } + } + + override suspend fun execute( + sql: String, + parameters: List? + ): Long { + val numParams = parameters?.size ?: 0 + + return driver.execute( + identifier = null, + sql = sql, + parameters = numParams, + binders = getBindersFromParams(parameters) + ).await() + } + + override suspend fun get( + sql: String, + parameters: List?, + mapper: (SqlCursor) -> RowType + ): RowType { + val result = this.createQuery( + query = sql, + parameters = parameters?.size ?: 0, + binders = getBindersFromParams(parameters), + mapper = mapper + ).awaitAsOneOrNull() + return requireNotNull(result) { "Query returned no result" } + } + + override suspend fun getAll( + sql: String, + parameters: List?, + mapper: (SqlCursor) -> RowType + ): List { + return this.createQuery( + query = sql, + parameters = parameters?.size ?: 0, + binders = getBindersFromParams(parameters), + mapper = mapper + ).awaitAsList() + } + + override suspend fun getOptional( + sql: String, + parameters: List?, + mapper: (SqlCursor) -> RowType + ): RowType? { + return this.createQuery( + query = sql, + parameters = parameters?.size ?: 0, + binders = getBindersFromParams(parameters), + mapper = mapper + ).awaitAsOneOrNull() + } + + override fun watch( + sql: String, + parameters: List?, + mapper: (SqlCursor) -> RowType + ): Flow> { + + val tables = getSourceTables(sql, parameters).map { toFriendlyTableName(it) } + .filter { it.isNotBlank() }.toSet() + return watchQuery( + query = sql, + parameters = parameters?.size ?: 0, + binders = getBindersFromParams(parameters), + mapper = mapper, + tables = tables + ).asFlow().mapToList(scope.coroutineContext) + } + + private fun createQuery( + query: String, + mapper: (SqlCursor) -> T, + parameters: Int = 0, + binders: (SqlPreparedStatement.() -> Unit)? = null, + ): ExecutableQuery { + return object : ExecutableQuery(mapper) { + override fun execute(mapper: (SqlCursor) -> QueryResult): QueryResult { + return driver.executeQuery(null, query, mapper, parameters, binders) + } + } + } + + private fun watchQuery( + query: String, + mapper: (SqlCursor) -> T, + parameters: Int = 0, + binders: (SqlPreparedStatement.() -> Unit)? = null, + tables: Set = setOf() + ): Query { + + return object : Query(mapper) { + override fun execute(mapper: (SqlCursor) -> QueryResult): QueryResult { + return driver.executeQuery(null, query, mapper, parameters, binders) + } + + override fun addListener(listener: Listener) { + driver.addListener(queryKeys = tables.toTypedArray(), listener = listener) + } + + override fun removeListener(listener: Listener) { + driver.removeListener(queryKeys = tables.toTypedArray(), listener = listener) + } + } + } + + override suspend fun readTransaction(callback: suspend (PowerSyncTransaction) -> R): R { + return transactor.transactionWithResult(noEnclosing = true) { + val transaction = PowerSyncTransaction(this@InternalDatabaseImpl) + callback(transaction) + } + } + + override suspend fun writeTransaction(callback: suspend (PowerSyncTransaction) -> R): R { + return transactor.transactionWithResult(noEnclosing = true) { + val transaction = PowerSyncTransaction(this@InternalDatabaseImpl) + callback(transaction) + } + } + + // Register callback for table updates + private fun tableUpdates(): Flow> { + return driver.tableUpdates() + } + + // Register callback for table updates on a specific table + override fun updatesOnTable(tableName: String): Flow { + return driver.updatesOnTable(tableName) + } + + private fun toFriendlyTableName(tableName: String): String { + val regex = POWERSYNC_TABLE_MATCH.toRegex() + if (regex.containsMatchIn(tableName)) { + return tableName.replace(regex, "") + } + return tableName + } + + private fun getSourceTables( + sql: String, + parameters: List?, + ): Set { + val rows = createQuery( + query = "EXPLAIN $sql", + parameters = parameters?.size ?: 0, + binders = getBindersFromParams(parameters), + mapper = { + ExplainQueryResult( + addr = it.getString(0)!!, + opcode = it.getString(1)!!, + p1 = it.getLong(2)!!, + p2 = it.getLong(3)!!, + p3 = it.getLong(4)!! + ) + } + ).executeAsList() + + val rootPages = mutableListOf() + for (row in rows) { + if ((row.opcode == "OpenRead" || row.opcode == "OpenWrite") && row.p3 == 0L && row.p2 != 0L) { + rootPages.add(row.p2) + } + } + val params = listOf(JsonUtil.json.encodeToString(rootPages)) + val tableRows = createQuery( + "SELECT tbl_name FROM sqlite_master WHERE rootpage IN (SELECT json_each.value FROM json_each(?))", + parameters = params.size, + binders = { + bindString(0, params[0]) + }, mapper = { it.getString(0)!! } + ).executeAsList() + + return tableRows.toSet() + } + + override fun getExistingTableNames(tableGlob: String): List { + val existingTableNames = createQuery( + "SELECT name FROM sqlite_master WHERE type='table' AND name GLOB ?", + parameters = 1, + binders = { + bindString(0, tableGlob) + }, + mapper = { cursor -> + cursor.getString(0)!! + } + ).executeAsList() + return existingTableNames + } + + internal data class ExplainQueryResult( + val addr: String, + val opcode: String, + val p1: Long, + val p2: Long, + val p3: Long, + ) +} + +internal fun getBindersFromParams(parameters: List?): (SqlPreparedStatement.() -> Unit)? { + if (parameters.isNullOrEmpty()) { + return null + } + return { + parameters.forEachIndexed { index, parameter -> + when (parameter) { + is Boolean -> bindBoolean(index, parameter) + is String -> bindString(index, parameter) + is Long -> bindLong(index, parameter) + is Double -> bindDouble(index, parameter) + is ByteArray -> bindBytes(index, parameter) + else -> { + if(parameter != null) { + throw IllegalArgumentException("Unsupported parameter type: ${parameter::class}, at index $index") + } + } + } + } + } +} \ No newline at end of file diff --git a/core/src/commonMain/kotlin/com/powersync/db/internal/PowerSyncTransactionFactory.kt b/core/src/commonMain/kotlin/com/powersync/db/internal/PowerSyncTransactionFactory.kt index e562cbf2..8d2e11b9 100644 --- a/core/src/commonMain/kotlin/com/powersync/db/internal/PowerSyncTransactionFactory.kt +++ b/core/src/commonMain/kotlin/com/powersync/db/internal/PowerSyncTransactionFactory.kt @@ -2,10 +2,10 @@ package com.powersync import app.cash.sqldelight.db.SqlCursor import com.powersync.db.internal.PowerSyncTransaction -import com.powersync.db.internal.PsInternalDatabase +import com.powersync.db.internal.InternalDatabaseImpl internal fun PowerSyncTransaction( - internalDatabase: PsInternalDatabase, + internalDatabase: InternalDatabaseImpl, ): PowerSyncTransaction { val transaction = object : PowerSyncTransaction { diff --git a/core/src/commonTest/kotlin/com/powersync/bucket/BucketStorageTest.kt b/core/src/commonTest/kotlin/com/powersync/bucket/BucketStorageTest.kt new file mode 100644 index 00000000..392c4be9 --- /dev/null +++ b/core/src/commonTest/kotlin/com/powersync/bucket/BucketStorageTest.kt @@ -0,0 +1,151 @@ +import app.cash.sqldelight.async.coroutines.awaitAsOneOrNull +import kotlin.test.* +import com.powersync.bucket.BucketStorage +import co.touchlab.kermit.Logger +import com.persistence.GetCrudFirstEntry +import com.powersync.bucket.BucketState +import com.powersync.db.internal.InternalDatabase +import dev.mokkery.answering.returns +import dev.mokkery.every +import dev.mokkery.everySuspend +import dev.mokkery.matcher.any +import dev.mokkery.mock +import kotlinx.coroutines.runBlocking +import kotlinx.coroutines.test.runTest + +class BucketStorageTest { + private lateinit var bucketStorage: BucketStorage + private lateinit var mockDb: InternalDatabase + + @Test + fun testGetMaxOpId() { + mockDb = mock() { + every { getExistingTableNames("ps_data_*") } returns listOf("list_1", "list_2") + } + bucketStorage = BucketStorage(mockDb, Logger) + assertEquals("9223372036854775807", bucketStorage.getMaxOpId()) + } + + @Test + fun testGetClientId() = runTest { + mockDb = mock() { + every { getExistingTableNames("ps_data_*") } returns listOf("list_1", "list_2") + everySuspend { getOptional( + any(), + any(), + any() + )} returns "test-client-id" + } + bucketStorage = BucketStorage(mockDb, Logger) + val clientId = bucketStorage.getClientId() + assertEquals("test-client-id", clientId) + } + + @Test + fun testGetClientIdThrowsException() = runTest { + mockDb = mock() { + every { getExistingTableNames("ps_data_*") } returns listOf("list_1", "list_2") + everySuspend { getOptional( + any(), + any(), + any() + )} returns null + } + bucketStorage = BucketStorage(mockDb, Logger) + + assertFailsWith { + bucketStorage.getClientId() + } + } + + @Test + fun testNextCrudItem() = runTest { + mockDb = mock() { + val mockFirstCrudEntry = GetCrudFirstEntry(1, 1, "test-data") + everySuspend { queries.getCrudFirstEntry().awaitAsOneOrNull() } returns mockFirstCrudEntry + } + bucketStorage = BucketStorage(mockDb, Logger) + + + val result = bucketStorage.nextCrudItem() + assertNotNull(result) + } + + @Test + fun testHasCrud() = runTest { + mockDb = mock() { + everySuspend { queries.hasCrud().awaitAsOneOrNull() } returns 1L + } + bucketStorage = BucketStorage(mockDb, Logger) + + assertTrue(bucketStorage.hasCrud()) + } + + @Test + fun testUpdateLocalTarget() = runBlocking { + mockDb = mock() { + every { getExistingTableNames("ps_data_*") } returns listOf("list_1", "list_2") + everySuspend { getOptional( + any(), + any(), + any() + )} returns 1L + everySuspend { writeTransaction(any()) } returns true + } + bucketStorage = BucketStorage(mockDb, Logger) + + val result = bucketStorage.updateLocalTarget { "new-checkpoint" } + assertTrue(result) + } + +// @Test +// fun testSaveSyncData() = runTest { +// val mockSyncDataBatch = mock() +// every { mockSyncDataBatch.buckets } returns listOf() +// every { mockDb.writeTransaction(any()) } returns Unit +// +// bucketStorage.saveSyncData(mockSyncDataBatch) +// // Assert that the transaction was called (you might need to use a spy or different mocking strategy to verify this) +// } + + @Test + fun testGetBucketStates() = runTest { + val mockBucketStates = listOf(BucketState("bucket1", "op1"), BucketState("bucket2", "op2")) + mockDb = mock() { + every { getExistingTableNames("ps_data_*") } returns listOf("list_1", "list_2") + everySuspend { getOptional( + any(), + any(), + any() + )} returns 1L + everySuspend { getAll(any(), any(), any()) } returns mockBucketStates + } + bucketStorage = BucketStorage(mockDb, Logger) + + val result = bucketStorage.getBucketStates() + assertEquals(mockBucketStates, result) + } + + @Test + fun testRemoveBuckets() = runBlocking { + mockDb = mock() { + every { getExistingTableNames("ps_data_*") } returns listOf("list_1", "list_2") + everySuspend { getOptional( + any(), + any(), + any() + )} returns 1L + everySuspend { writeTransaction(any()) } returns Unit + } + + bucketStorage.removeBuckets(listOf("bucket1", "bucket2")) + // Assert that the transaction was called for each bucket (you might need to use a spy or different mocking strategy to verify this) + } + + @Test + fun testHasCompletedSync() = runBlocking { + every { mockDb.getOptional(any(), null, any()) } returns "2023-01-01" + + assertTrue(bucketStorage.hasCompletedSync()) + } +} From 4ed6dd2375dc9bf7dae1cb613584db80e0079773 Mon Sep 17 00:00:00 2001 From: Dominic Date: Thu, 10 Oct 2024 10:35:28 +0200 Subject: [PATCH 5/8] chore: update functions to make testing easier --- .../com/powersync/bucket/BucketStorage.kt | 22 +++--- .../com/powersync/db/PowerSyncDatabaseImpl.kt | 19 ++--- .../com/powersync/bucket/BucketStorageTest.kt | 72 +++++++++---------- .../sqldelight/com/persistence/Powersync.sq | 6 -- 4 files changed, 51 insertions(+), 68 deletions(-) diff --git a/core/src/commonMain/kotlin/com/powersync/bucket/BucketStorage.kt b/core/src/commonMain/kotlin/com/powersync/bucket/BucketStorage.kt index 935b21b0..47518a26 100644 --- a/core/src/commonMain/kotlin/com/powersync/bucket/BucketStorage.kt +++ b/core/src/commonMain/kotlin/com/powersync/bucket/BucketStorage.kt @@ -1,6 +1,5 @@ package com.powersync.bucket -import app.cash.sqldelight.async.coroutines.awaitAsOneOrNull import co.touchlab.kermit.Logger import com.powersync.sync.SyncDataBatch import com.powersync.sync.SyncLocalDatabaseResult @@ -54,20 +53,24 @@ internal class BucketStorage( } suspend fun nextCrudItem(): CrudEntry? { - val next = db.queries.getCrudFirstEntry().awaitAsOneOrNull() - val crudItem = next?.let { CrudEntry.fromRow( - CrudRow( - id = it.id.toString(), - data = it.data_!!, - txId = it.tx_id?.toInt() + val crudItem = db.getOptional("SELECT id, tx_id, data FROM ${InternalTable.CRUD} ORDER BY id ASC LIMIT 1") { cursor -> + CrudEntry.fromRow( + CrudRow( + id = cursor.getString(0)!!, + txId = cursor.getString(1)?.toInt(), + data = cursor.getString(2)!! + ) ) - ) } + } return crudItem } suspend fun hasCrud(): Boolean { - return db.queries.hasCrud().awaitAsOneOrNull() == 1L + val res = db.getOptional("SELECT 1 FROM ps_crud LIMIT 1") { + it.getLong(0)!! + } + return res == 1L } suspend fun updateLocalTarget(checkpointCallback: suspend () -> String): Boolean { @@ -145,7 +148,6 @@ internal class BucketStorage( } } - private suspend fun deleteBucket(bucketName: String) { db.writeTransaction{ tx -> diff --git a/core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt b/core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt index 03850db4..73e50e56 100644 --- a/core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt +++ b/core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt @@ -2,7 +2,6 @@ package com.powersync.db import app.cash.sqldelight.async.coroutines.awaitAsList import app.cash.sqldelight.async.coroutines.awaitAsOne -import app.cash.sqldelight.async.coroutines.awaitAsOneOrNull import app.cash.sqldelight.db.SqlCursor import co.touchlab.kermit.Logger import com.powersync.DatabaseDriverFactory @@ -165,23 +164,15 @@ internal class PowerSyncDatabaseImpl( override suspend fun getNextCrudTransaction(): CrudTransaction? { return this.readTransaction { - val firstEntry = internalDb.queries.getCrudFirstEntry().awaitAsOneOrNull() + val entry = bucketStorage.nextCrudItem() ?: return@readTransaction null - val first = CrudEntry.fromRow( - CrudRow( - id = firstEntry.id.toString(), - data = firstEntry.data_!!, - txId = firstEntry.tx_id?.toInt() - ) - ) - val txId = first.transactionId - val entries: List - if (txId == null) { - entries = listOf(first) + val txId = entry.transactionId + val entries: List = if (txId == null) { + listOf(entry) } else { - entries = internalDb.queries.getCrudEntryByTxId(txId.toLong()).awaitAsList().map { + internalDb.queries.getCrudEntryByTxId(txId.toLong()).awaitAsList().map { CrudEntry.fromRow( CrudRow( id = it.id.toString(), diff --git a/core/src/commonTest/kotlin/com/powersync/bucket/BucketStorageTest.kt b/core/src/commonTest/kotlin/com/powersync/bucket/BucketStorageTest.kt index 392c4be9..583c274b 100644 --- a/core/src/commonTest/kotlin/com/powersync/bucket/BucketStorageTest.kt +++ b/core/src/commonTest/kotlin/com/powersync/bucket/BucketStorageTest.kt @@ -1,9 +1,9 @@ -import app.cash.sqldelight.async.coroutines.awaitAsOneOrNull import kotlin.test.* import com.powersync.bucket.BucketStorage import co.touchlab.kermit.Logger -import com.persistence.GetCrudFirstEntry import com.powersync.bucket.BucketState +import com.powersync.db.crud.CrudEntry +import com.powersync.db.crud.UpdateType import com.powersync.db.internal.InternalDatabase import dev.mokkery.answering.returns import dev.mokkery.every @@ -60,27 +60,53 @@ class BucketStorageTest { @Test fun testNextCrudItem() = runTest { + val mockCrudEntry = CrudEntry(id = "1", clientId = 1, op = UpdateType.PUT, table = "table1", transactionId = 1, opData = mapOf("key" to "value")) mockDb = mock() { - val mockFirstCrudEntry = GetCrudFirstEntry(1, 1, "test-data") - everySuspend { queries.getCrudFirstEntry().awaitAsOneOrNull() } returns mockFirstCrudEntry + every { getExistingTableNames("ps_data_*") } returns listOf("list_1", "list_2") + everySuspend { getOptional(any(),any(), any()) } returns mockCrudEntry + } + bucketStorage = BucketStorage(mockDb, Logger) + + + val result = bucketStorage.nextCrudItem() + assertEquals(mockCrudEntry, result) + } + + @Test + fun testNullNextCrudItem() = runTest { + mockDb = mock() { + every { getExistingTableNames("ps_data_*") } returns listOf("list_1", "list_2") + everySuspend { getOptional(any(),any(), any()) } returns null } bucketStorage = BucketStorage(mockDb, Logger) val result = bucketStorage.nextCrudItem() - assertNotNull(result) + assertEquals(null, result) } @Test fun testHasCrud() = runTest { mockDb = mock() { - everySuspend { queries.hasCrud().awaitAsOneOrNull() } returns 1L + every { getExistingTableNames("ps_data_*") } returns listOf("list_1", "list_2") + everySuspend { getOptional(any(),any(), any()) } returns 1L } bucketStorage = BucketStorage(mockDb, Logger) assertTrue(bucketStorage.hasCrud()) } + @Test + fun testNullHasCrud() = runTest { + mockDb = mock() { + every { getExistingTableNames("ps_data_*") } returns listOf("list_1", "list_2") + everySuspend { getOptional(any(),any(), any()) } returns null + } + bucketStorage = BucketStorage(mockDb, Logger) + + assertFalse(bucketStorage.hasCrud()) + } + @Test fun testUpdateLocalTarget() = runBlocking { mockDb = mock() { @@ -98,16 +124,6 @@ class BucketStorageTest { assertTrue(result) } -// @Test -// fun testSaveSyncData() = runTest { -// val mockSyncDataBatch = mock() -// every { mockSyncDataBatch.buckets } returns listOf() -// every { mockDb.writeTransaction(any()) } returns Unit -// -// bucketStorage.saveSyncData(mockSyncDataBatch) -// // Assert that the transaction was called (you might need to use a spy or different mocking strategy to verify this) -// } - @Test fun testGetBucketStates() = runTest { val mockBucketStates = listOf(BucketState("bucket1", "op1"), BucketState("bucket2", "op2")) @@ -126,26 +142,6 @@ class BucketStorageTest { assertEquals(mockBucketStates, result) } - @Test - fun testRemoveBuckets() = runBlocking { - mockDb = mock() { - every { getExistingTableNames("ps_data_*") } returns listOf("list_1", "list_2") - everySuspend { getOptional( - any(), - any(), - any() - )} returns 1L - everySuspend { writeTransaction(any()) } returns Unit - } - - bucketStorage.removeBuckets(listOf("bucket1", "bucket2")) - // Assert that the transaction was called for each bucket (you might need to use a spy or different mocking strategy to verify this) - } - - @Test - fun testHasCompletedSync() = runBlocking { - every { mockDb.getOptional(any(), null, any()) } returns "2023-01-01" - - assertTrue(bucketStorage.hasCompletedSync()) - } + // TODO: Add tests for removeBuckets, hasCompletedSync, syncLocalDatabase currently not covered because + // currently the internal methods are private and cannot be accessed from the test class } diff --git a/persistence/src/commonMain/sqldelight/com/persistence/Powersync.sq b/persistence/src/commonMain/sqldelight/com/persistence/Powersync.sq index f27815b6..2c8c88fe 100644 --- a/persistence/src/commonMain/sqldelight/com/persistence/Powersync.sq +++ b/persistence/src/commonMain/sqldelight/com/persistence/Powersync.sq @@ -12,15 +12,9 @@ powersyncClear: SELECT powersync_clear(?); -- CRUD operations -hasCrud: -SELECT 1 FROM ps_crud LIMIT 1; - getCrudEntries: SELECT id, tx_id, data FROM ps_crud ORDER BY id ASC LIMIT ?; -getCrudFirstEntry: -SELECT id, tx_id, data FROM ps_crud ORDER BY id ASC LIMIT 1; - getCrudEntryByTxId: SELECT id, tx_id, data FROM ps_crud WHERE tx_id = ? ORDER BY id ASC; From d137156953ca479a34140cfc067ced3d8dfcf60d Mon Sep 17 00:00:00 2001 From: Dominic Date: Thu, 10 Oct 2024 15:36:45 +0200 Subject: [PATCH 6/8] chore: add tests --- build.gradle.kts | 11 + core/build.gradle.kts | 1 + .../com/powersync/bucket/BucketStorage.kt | 324 +----------------- .../com/powersync/bucket/BucketStorageImpl.kt | 315 +++++++++++++++++ .../connectors/PowerSyncBackendConnector.kt | 6 +- .../com/powersync/db/PowerSyncDatabaseImpl.kt | 3 +- .../kotlin/com/powersync/sync/SyncStream.kt | 18 +- .../com/powersync/bucket/BucketStorageTest.kt | 22 +- .../com/powersync/sync/SyncStreamTest.kt | 92 +++++ gradle/libs.versions.toml | 1 + 10 files changed, 453 insertions(+), 340 deletions(-) create mode 100644 core/src/commonMain/kotlin/com/powersync/bucket/BucketStorageImpl.kt create mode 100644 core/src/commonTest/kotlin/com/powersync/sync/SyncStreamTest.kt diff --git a/build.gradle.kts b/build.gradle.kts index a5ae6e87..d73a7cdc 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -14,6 +14,15 @@ plugins { alias(libs.plugins.downloadPlugin) apply false } +// Having different versions of this lead to the issue mentioned here +// https://stackoverflow.com/questions/76479563/could-not-found-kotlinx-atomicfu-compose-multiplatform-ios +// This and the `apply(plugin = "kotlinx-atomicfu")` in allprojects below solve the issue but can be deleted in future when +// the issue is resolved https://github.com/Kotlin/kotlinx-atomicfu/issues/469 +buildscript { + dependencies { + classpath("org.jetbrains.kotlinx:atomicfu-gradle-plugin:0.23.1") + } +} allprojects { repositories { @@ -44,6 +53,8 @@ allprojects { exclude(group = "ai.grazie.nlp") } + // + apply(plugin = "kotlinx-atomicfu") } subprojects { val GROUP: String by project diff --git a/core/build.gradle.kts b/core/build.gradle.kts index f8b88a4f..5d0f7b16 100644 --- a/core/build.gradle.kts +++ b/core/build.gradle.kts @@ -117,6 +117,7 @@ kotlin { commonTest.dependencies { implementation(libs.kotlin.test) implementation(libs.test.coroutines) + implementation(libs.kermit.test) } } } diff --git a/core/src/commonMain/kotlin/com/powersync/bucket/BucketStorage.kt b/core/src/commonMain/kotlin/com/powersync/bucket/BucketStorage.kt index 47518a26..7f55d180 100644 --- a/core/src/commonMain/kotlin/com/powersync/bucket/BucketStorage.kt +++ b/core/src/commonMain/kotlin/com/powersync/bucket/BucketStorage.kt @@ -1,315 +1,19 @@ package com.powersync.bucket -import co.touchlab.kermit.Logger +import com.powersync.db.crud.CrudEntry import com.powersync.sync.SyncDataBatch import com.powersync.sync.SyncLocalDatabaseResult -import co.touchlab.stately.concurrency.AtomicBoolean -import com.powersync.db.crud.CrudEntry -import com.powersync.db.crud.CrudRow -import com.powersync.db.internal.InternalDatabase -import kotlinx.serialization.encodeToString -import com.powersync.db.internal.InternalTable -import com.powersync.utils.JsonUtil - -internal class BucketStorage( - private val db: InternalDatabase, - private val logger: Logger -) { - private val tableNames: MutableSet = mutableSetOf() - private var hasCompletedSync = AtomicBoolean(false) - private var pendingBucketDeletes = AtomicBoolean(false) - - /** - * Count up, and do a compact on startup. - */ - private var compactCounter = COMPACT_OPERATION_INTERVAL - - companion object { - const val MAX_OP_ID = "9223372036854775807" - const val COMPACT_OPERATION_INTERVAL = 1_000 - } - - init { - readTableNames() - } - - private fun readTableNames() { - tableNames.clear() - // Query to get existing table names - val names = db.getExistingTableNames("ps_data_*") - - tableNames.addAll(names) - } - - fun getMaxOpId(): String { - return MAX_OP_ID - } - - suspend fun getClientId(): String { - val id = db.getOptional("SELECT powersync_client_id() as client_id") { - it.getString(0)!! - } - return id ?: throw IllegalStateException("Client ID not found") - } - - suspend fun nextCrudItem(): CrudEntry? { - val crudItem = db.getOptional("SELECT id, tx_id, data FROM ${InternalTable.CRUD} ORDER BY id ASC LIMIT 1") { cursor -> - CrudEntry.fromRow( - CrudRow( - id = cursor.getString(0)!!, - txId = cursor.getString(1)?.toInt(), - data = cursor.getString(2)!! - ) - ) - } - - return crudItem - } - - suspend fun hasCrud(): Boolean { - val res = db.getOptional("SELECT 1 FROM ps_crud LIMIT 1") { - it.getLong(0)!! - } - return res == 1L - } - - suspend fun updateLocalTarget(checkpointCallback: suspend () -> String): Boolean { - db.getOptional( - "SELECT target_op FROM ${InternalTable.BUCKETS} WHERE name = '\$local' AND target_op = ?", - parameters = listOf(MAX_OP_ID), - mapper = { cursor -> cursor.getLong(0)!! } - ) - ?: // Nothing to update - return false - - val seqBefore = - db.getOptional("SELECT seq FROM sqlite_sequence WHERE name = '${InternalTable.CRUD}'") { - it.getLong(0)!! - } ?: // Nothing to update - return false - - val opId = checkpointCallback() - - logger.i { "[updateLocalTarget] Updating target to checkpoint $opId" } - - return db.writeTransaction { - if (hasCrud()) { - logger.w { "[updateLocalTarget] ps crud is not empty" } - return@writeTransaction false - } - - val seqAfter = - db.getOptional("SELECT seq FROM sqlite_sequence WHERE name = '${InternalTable.CRUD}'") { - it.getLong(0)!! - } - ?: // assert isNotEmpty - throw AssertionError("Sqlite Sequence should not be empty") - - if (seqAfter != seqBefore) { - logger.d("seqAfter != seqBefore seqAfter: $seqAfter seqBefore: $seqBefore") - // New crud data may have been uploaded since we got the checkpoint. Abort. - return@writeTransaction false - } - - db.execute( - "UPDATE ${InternalTable.BUCKETS} SET target_op = CAST(? as INTEGER) WHERE name='\$local'", - listOf(opId) - ) - - return@writeTransaction true - } - } - - suspend fun saveSyncData(syncDataBatch: SyncDataBatch) { - db.writeTransaction { tx -> - val jsonString = JsonUtil.json.encodeToString(syncDataBatch) - tx.execute( - "INSERT INTO powersync_operations(op, data) VALUES(?, ?)", - listOf("save", jsonString) - ) - } - this.compactCounter += syncDataBatch.buckets.sumOf { it.data.size } - } - - suspend fun getBucketStates(): List { - return db.getAll( - "SELECT name AS bucket, CAST(last_op AS TEXT) AS op_id FROM ${InternalTable.BUCKETS} WHERE pending_delete = 0", - mapper = { cursor -> - BucketState( - bucket = cursor.getString(0)!!, - opId = cursor.getString(1)!! - ) - }) - } - - suspend fun removeBuckets(bucketsToDelete: List) { - bucketsToDelete.forEach { bucketName -> - deleteBucket(bucketName) - } - } - - private suspend fun deleteBucket(bucketName: String) { - - db.writeTransaction{ tx -> - tx.execute( - "INSERT INTO powersync_operations(op, data) VALUES(?, ?)", - listOf("delete_bucket", bucketName) - ) - } - - Logger.d("[deleteBucket] Done deleting") - - this.pendingBucketDeletes.value = true - } - - suspend fun hasCompletedSync(): Boolean { - if (hasCompletedSync.value) { - return true - } - - val completedSync = db.getOptional( - "SELECT powersync_last_synced_at()", - mapper = { cursor -> - cursor.getString(0)!! - }) - - return if (completedSync != null) { - hasCompletedSync.value = true - true - } else { - false - } - } - - suspend fun syncLocalDatabase(targetCheckpoint: Checkpoint): SyncLocalDatabaseResult { - val result = validateChecksums(targetCheckpoint) - - if (!result.checkpointValid) { - logger.w { "[SyncLocalDatabase] Checksums failed for ${result.checkpointFailures}" } - result.checkpointFailures?.forEach { bucketName -> - deleteBucket(bucketName) - } - result.ready = false - return result - } - - val bucketNames = targetCheckpoint.checksums.map { it.bucket } - - db.writeTransaction { tx -> - tx.execute( - "UPDATE ps_buckets SET last_op = ? WHERE name IN (SELECT json_each.value FROM json_each(?))", - listOf(targetCheckpoint.lastOpId, JsonUtil.json.encodeToString(bucketNames)) - ) - - if (targetCheckpoint.writeCheckpoint != null) { - tx.execute( - "UPDATE ps_buckets SET last_op = ? WHERE name = '\$local'", - listOf(targetCheckpoint.writeCheckpoint), - ) - } - } - - val valid = updateObjectsFromBuckets() - - if (!valid) { - return SyncLocalDatabaseResult( - ready = false, - checkpointValid = true, - ) - } - - this.forceCompact() - - return SyncLocalDatabaseResult( - ready = true, - ) - } - - private suspend fun validateChecksums(checkpoint: Checkpoint): SyncLocalDatabaseResult { - val res = db.getOptional( - "SELECT powersync_validate_checkpoint(?) AS result", - parameters = listOf(JsonUtil.json.encodeToString(checkpoint)), - mapper = { cursor -> - cursor.getString(0)!! - }) - ?: //no result - return SyncLocalDatabaseResult( - ready = false, - checkpointValid = false, - ) - - return JsonUtil.json.decodeFromString(res) - } - - /** - * Atomically update the local state. - * - * This includes creating new tables, dropping old tables, and copying data over from the oplog. - */ - private suspend fun updateObjectsFromBuckets(): Boolean { - return db.writeTransaction { tx -> - - tx.execute( - "INSERT INTO powersync_operations(op, data) VALUES(?, ?)", - listOf("sync_local", "") - ) - - val res = tx.get("select last_insert_rowid()") { cursor -> - cursor.getLong(0)!! - } - - return@writeTransaction res == 1L - } - } - - private suspend fun forceCompact() { - // Reset counter - this.compactCounter = COMPACT_OPERATION_INTERVAL - this.pendingBucketDeletes.value = true - - this.autoCompact() - } - - - private suspend fun autoCompact() { - // 1. Delete buckets - deletePendingBuckets() - - // 2. Clear REMOVE operations, only keeping PUT ones - clearRemoveOps() - } - - private suspend fun deletePendingBuckets() { - if (!this.pendingBucketDeletes.value) { - return - } - - db.writeTransaction { tx -> - tx.execute( - "INSERT INTO powersync_operations(op, data) VALUES (?, ?)", listOf("delete_pending_buckets","") - ) - - // Executed once after start-up, and again when there are pending deletes. - pendingBucketDeletes.value = false - } - } - - private suspend fun clearRemoveOps() { - if (this.compactCounter < COMPACT_OPERATION_INTERVAL) { - return - } - - db.writeTransaction { tx -> - tx.execute( - "INSERT INTO powersync_operations(op, data) VALUES (?, ?)", - listOf("clear_remove_ops", "") - ) - } - this.compactCounter = 0 - } - @Suppress("UNUSED_PARAMETER") - fun setTargetCheckpoint(checkpoint: Checkpoint) { - // No-op for now - } -} +internal interface BucketStorage { + fun getMaxOpId(): String + suspend fun getClientId(): String + suspend fun nextCrudItem(): CrudEntry? + suspend fun hasCrud(): Boolean + suspend fun updateLocalTarget(checkpointCallback: suspend () -> String): Boolean + suspend fun saveSyncData(syncDataBatch: SyncDataBatch) + suspend fun getBucketStates(): List + suspend fun removeBuckets(bucketsToDelete: List) + suspend fun hasCompletedSync(): Boolean + suspend fun syncLocalDatabase(targetCheckpoint: Checkpoint): SyncLocalDatabaseResult + fun setTargetCheckpoint(checkpoint: Checkpoint) +} \ No newline at end of file diff --git a/core/src/commonMain/kotlin/com/powersync/bucket/BucketStorageImpl.kt b/core/src/commonMain/kotlin/com/powersync/bucket/BucketStorageImpl.kt new file mode 100644 index 00000000..3d528dfb --- /dev/null +++ b/core/src/commonMain/kotlin/com/powersync/bucket/BucketStorageImpl.kt @@ -0,0 +1,315 @@ +package com.powersync.bucket + +import co.touchlab.kermit.Logger +import com.powersync.sync.SyncDataBatch +import com.powersync.sync.SyncLocalDatabaseResult +import co.touchlab.stately.concurrency.AtomicBoolean +import com.powersync.db.crud.CrudEntry +import com.powersync.db.crud.CrudRow +import com.powersync.db.internal.InternalDatabase +import kotlinx.serialization.encodeToString +import com.powersync.db.internal.InternalTable +import com.powersync.utils.JsonUtil + +internal class BucketStorageImpl( + private val db: InternalDatabase, + private val logger: Logger +): BucketStorage { + private val tableNames: MutableSet = mutableSetOf() + private var hasCompletedSync = AtomicBoolean(false) + private var pendingBucketDeletes = AtomicBoolean(false) + + /** + * Count up, and do a compact on startup. + */ + private var compactCounter = COMPACT_OPERATION_INTERVAL + + companion object { + const val MAX_OP_ID = "9223372036854775807" + const val COMPACT_OPERATION_INTERVAL = 1_000 + } + + init { + readTableNames() + } + + private fun readTableNames() { + tableNames.clear() + // Query to get existing table names + val names = db.getExistingTableNames("ps_data_*") + + tableNames.addAll(names) + } + + override fun getMaxOpId(): String { + return MAX_OP_ID + } + + override suspend fun getClientId(): String { + val id = db.getOptional("SELECT powersync_client_id() as client_id") { + it.getString(0)!! + } + return id ?: throw IllegalStateException("Client ID not found") + } + + override suspend fun nextCrudItem(): CrudEntry? { + val crudItem = db.getOptional("SELECT id, tx_id, data FROM ${InternalTable.CRUD} ORDER BY id ASC LIMIT 1") { cursor -> + CrudEntry.fromRow( + CrudRow( + id = cursor.getString(0)!!, + txId = cursor.getString(1)?.toInt(), + data = cursor.getString(2)!! + ) + ) + } + + return crudItem + } + + override suspend fun hasCrud(): Boolean { + val res = db.getOptional("SELECT 1 FROM ps_crud LIMIT 1") { + it.getLong(0)!! + } + return res == 1L + } + + override suspend fun updateLocalTarget(checkpointCallback: suspend () -> String): Boolean { + db.getOptional( + "SELECT target_op FROM ${InternalTable.BUCKETS} WHERE name = '\$local' AND target_op = ?", + parameters = listOf(MAX_OP_ID), + mapper = { cursor -> cursor.getLong(0)!! } + ) + ?: // Nothing to update + return false + + val seqBefore = + db.getOptional("SELECT seq FROM sqlite_sequence WHERE name = '${InternalTable.CRUD}'") { + it.getLong(0)!! + } ?: // Nothing to update + return false + + val opId = checkpointCallback() + + logger.i { "[updateLocalTarget] Updating target to checkpoint $opId" } + + return db.writeTransaction { + if (hasCrud()) { + logger.w { "[updateLocalTarget] ps crud is not empty" } + return@writeTransaction false + } + + val seqAfter = + db.getOptional("SELECT seq FROM sqlite_sequence WHERE name = '${InternalTable.CRUD}'") { + it.getLong(0)!! + } + ?: // assert isNotEmpty + throw AssertionError("Sqlite Sequence should not be empty") + + if (seqAfter != seqBefore) { + logger.d("seqAfter != seqBefore seqAfter: $seqAfter seqBefore: $seqBefore") + // New crud data may have been uploaded since we got the checkpoint. Abort. + return@writeTransaction false + } + + db.execute( + "UPDATE ${InternalTable.BUCKETS} SET target_op = CAST(? as INTEGER) WHERE name='\$local'", + listOf(opId) + ) + + return@writeTransaction true + } + } + + override suspend fun saveSyncData(syncDataBatch: SyncDataBatch) { + db.writeTransaction { tx -> + val jsonString = JsonUtil.json.encodeToString(syncDataBatch) + tx.execute( + "INSERT INTO powersync_operations(op, data) VALUES(?, ?)", + listOf("save", jsonString) + ) + } + this.compactCounter += syncDataBatch.buckets.sumOf { it.data.size } + } + + override suspend fun getBucketStates(): List { + return db.getAll( + "SELECT name AS bucket, CAST(last_op AS TEXT) AS op_id FROM ${InternalTable.BUCKETS} WHERE pending_delete = 0", + mapper = { cursor -> + BucketState( + bucket = cursor.getString(0)!!, + opId = cursor.getString(1)!! + ) + }) + } + + override suspend fun removeBuckets(bucketsToDelete: List) { + bucketsToDelete.forEach { bucketName -> + deleteBucket(bucketName) + } + } + + private suspend fun deleteBucket(bucketName: String) { + + db.writeTransaction{ tx -> + tx.execute( + "INSERT INTO powersync_operations(op, data) VALUES(?, ?)", + listOf("delete_bucket", bucketName) + ) + } + + Logger.d("[deleteBucket] Done deleting") + + this.pendingBucketDeletes.value = true + } + + override suspend fun hasCompletedSync(): Boolean { + if (hasCompletedSync.value) { + return true + } + + val completedSync = db.getOptional( + "SELECT powersync_last_synced_at()", + mapper = { cursor -> + cursor.getString(0)!! + }) + + return if (completedSync != null) { + hasCompletedSync.value = true + true + } else { + false + } + } + + override suspend fun syncLocalDatabase(targetCheckpoint: Checkpoint): SyncLocalDatabaseResult { + val result = validateChecksums(targetCheckpoint) + + if (!result.checkpointValid) { + logger.w { "[SyncLocalDatabase] Checksums failed for ${result.checkpointFailures}" } + result.checkpointFailures?.forEach { bucketName -> + deleteBucket(bucketName) + } + result.ready = false + return result + } + + val bucketNames = targetCheckpoint.checksums.map { it.bucket } + + db.writeTransaction { tx -> + tx.execute( + "UPDATE ps_buckets SET last_op = ? WHERE name IN (SELECT json_each.value FROM json_each(?))", + listOf(targetCheckpoint.lastOpId, JsonUtil.json.encodeToString(bucketNames)) + ) + + if (targetCheckpoint.writeCheckpoint != null) { + tx.execute( + "UPDATE ps_buckets SET last_op = ? WHERE name = '\$local'", + listOf(targetCheckpoint.writeCheckpoint), + ) + } + } + + val valid = updateObjectsFromBuckets() + + if (!valid) { + return SyncLocalDatabaseResult( + ready = false, + checkpointValid = true, + ) + } + + this.forceCompact() + + return SyncLocalDatabaseResult( + ready = true, + ) + } + + private suspend fun validateChecksums(checkpoint: Checkpoint): SyncLocalDatabaseResult { + val res = db.getOptional( + "SELECT powersync_validate_checkpoint(?) AS result", + parameters = listOf(JsonUtil.json.encodeToString(checkpoint)), + mapper = { cursor -> + cursor.getString(0)!! + }) + ?: //no result + return SyncLocalDatabaseResult( + ready = false, + checkpointValid = false, + ) + + return JsonUtil.json.decodeFromString(res) + } + + /** + * Atomically update the local state. + * + * This includes creating new tables, dropping old tables, and copying data over from the oplog. + */ + private suspend fun updateObjectsFromBuckets(): Boolean { + return db.writeTransaction { tx -> + + tx.execute( + "INSERT INTO powersync_operations(op, data) VALUES(?, ?)", + listOf("sync_local", "") + ) + + val res = tx.get("select last_insert_rowid()") { cursor -> + cursor.getLong(0)!! + } + + return@writeTransaction res == 1L + } + } + + private suspend fun forceCompact() { + // Reset counter + this.compactCounter = COMPACT_OPERATION_INTERVAL + this.pendingBucketDeletes.value = true + + this.autoCompact() + } + + + private suspend fun autoCompact() { + // 1. Delete buckets + deletePendingBuckets() + + // 2. Clear REMOVE operations, only keeping PUT ones + clearRemoveOps() + } + + private suspend fun deletePendingBuckets() { + if (!this.pendingBucketDeletes.value) { + return + } + + db.writeTransaction { tx -> + tx.execute( + "INSERT INTO powersync_operations(op, data) VALUES (?, ?)", listOf("delete_pending_buckets","") + ) + + // Executed once after start-up, and again when there are pending deletes. + pendingBucketDeletes.value = false + } + } + + private suspend fun clearRemoveOps() { + if (this.compactCounter < COMPACT_OPERATION_INTERVAL) { + return + } + + db.writeTransaction { tx -> + tx.execute( + "INSERT INTO powersync_operations(op, data) VALUES (?, ?)", + listOf("clear_remove_ops", "") + ) + } + this.compactCounter = 0 + } + + @Suppress("UNUSED_PARAMETER") + override fun setTargetCheckpoint(checkpoint: Checkpoint) { + // No-op for now + } +} diff --git a/core/src/commonMain/kotlin/com/powersync/connectors/PowerSyncBackendConnector.kt b/core/src/commonMain/kotlin/com/powersync/connectors/PowerSyncBackendConnector.kt index 43e24859..551bea61 100644 --- a/core/src/commonMain/kotlin/com/powersync/connectors/PowerSyncBackendConnector.kt +++ b/core/src/commonMain/kotlin/com/powersync/connectors/PowerSyncBackendConnector.kt @@ -26,7 +26,7 @@ public abstract class PowerSyncBackendConnector { * * These credentials may have expired already. */ - public suspend fun getCredentialsCached(): PowerSyncCredentials? { + public open suspend fun getCredentialsCached(): PowerSyncCredentials? { cachedCredentials?.let { return it } prefetchCredentials()?.join() return cachedCredentials @@ -37,7 +37,7 @@ public abstract class PowerSyncBackendConnector { * * This may be called when the current credentials have expired. */ - public fun invalidateCredentials() { + public open fun invalidateCredentials() { cachedCredentials = null } @@ -49,7 +49,7 @@ public abstract class PowerSyncBackendConnector { * * This may be called before the current credentials have expired. */ - public suspend fun prefetchCredentials(): Job? { + public open suspend fun prefetchCredentials(): Job? { fetchRequest?.takeIf { it.isActive }?.let { return it } fetchRequest = scope.launch { diff --git a/core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt b/core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt index 73e50e56..0c5d8cf7 100644 --- a/core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt +++ b/core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt @@ -8,6 +8,7 @@ import com.powersync.DatabaseDriverFactory import com.powersync.PowerSyncDatabase import com.powersync.PsSqlDriver import com.powersync.bucket.BucketStorage +import com.powersync.bucket.BucketStorageImpl import com.powersync.connectors.PowerSyncBackendConnector import com.powersync.db.crud.CrudBatch import com.powersync.db.crud.CrudEntry @@ -53,7 +54,7 @@ internal class PowerSyncDatabaseImpl( driver: PsSqlDriver = factory.createDriver(scope, dbFilename), ) : PowerSyncDatabase { private val internalDb = InternalDatabaseImpl(driver, scope) - private val bucketStorage: BucketStorage = BucketStorage(internalDb, logger) + private val bucketStorage: BucketStorage = BucketStorageImpl(internalDb, logger) /** * The current sync status. diff --git a/core/src/commonMain/kotlin/com/powersync/sync/SyncStream.kt b/core/src/commonMain/kotlin/com/powersync/sync/SyncStream.kt index fdc0c4e1..e57af35a 100644 --- a/core/src/commonMain/kotlin/com/powersync/sync/SyncStream.kt +++ b/core/src/commonMain/kotlin/com/powersync/sync/SyncStream.kt @@ -3,10 +3,10 @@ package com.powersync.sync import co.touchlab.kermit.Logger import com.powersync.bucket.BucketChecksum import com.powersync.bucket.BucketRequest -import com.powersync.bucket.BucketStorage import com.powersync.bucket.Checkpoint import com.powersync.bucket.WriteCheckpointResponse import co.touchlab.stately.concurrency.AtomicBoolean +import com.powersync.bucket.BucketStorage import com.powersync.connectors.PowerSyncBackendConnector import com.powersync.db.crud.CrudEntry import com.powersync.utils.JsonUtil @@ -135,7 +135,7 @@ internal class SyncStream( private suspend fun uploadAllCrud() { var checkedCrudItem: CrudEntry? = null - + logger.i(checkedCrudItem.toString()) while (true) { status.update(uploading = true) /** @@ -145,7 +145,6 @@ internal class SyncStream( val nextCrudItem = bucketStorage.nextCrudItem() if (nextCrudItem != null) { if (nextCrudItem.clientId == checkedCrudItem?.clientId) { - // This will force a higher log level than exceptions which are caught here. logger.w( """Potentially previously uploaded CRUD entries are still present in the upload queue. Make sure to handle uploads and complete CRUD transactions or batches by calling and awaiting their [.complete()] method. @@ -171,17 +170,6 @@ internal class SyncStream( status.update(uploading = false) } - private suspend fun uploadCrudBatch(): Boolean { - if (bucketStorage.hasCrud()) { - status.update(uploading = true) - uploadCrud() - return false - } else { - // This isolate is the only one triggering - return bucketStorage.updateLocalTarget { getWriteCheckpoint() } - } - } - private suspend fun getWriteCheckpoint(): String { val credentials = connector.getCredentialsCached() require(credentials != null) { "Not logged in" } @@ -388,7 +376,7 @@ internal class SyncStream( val bucketsToDelete = checkpointDiff.removedBuckets if (bucketsToDelete.isNotEmpty()) { - logger.i { "Remove buckets $bucketsToDelete" } + logger.d { "Remove buckets $bucketsToDelete" } } bucketStorage.removeBuckets(bucketsToDelete) bucketStorage.setTargetCheckpoint(state.targetCheckpoint!!) diff --git a/core/src/commonTest/kotlin/com/powersync/bucket/BucketStorageTest.kt b/core/src/commonTest/kotlin/com/powersync/bucket/BucketStorageTest.kt index 583c274b..4284a1b6 100644 --- a/core/src/commonTest/kotlin/com/powersync/bucket/BucketStorageTest.kt +++ b/core/src/commonTest/kotlin/com/powersync/bucket/BucketStorageTest.kt @@ -1,5 +1,5 @@ import kotlin.test.* -import com.powersync.bucket.BucketStorage +import com.powersync.bucket.BucketStorageImpl import co.touchlab.kermit.Logger import com.powersync.bucket.BucketState import com.powersync.db.crud.CrudEntry @@ -14,7 +14,7 @@ import kotlinx.coroutines.runBlocking import kotlinx.coroutines.test.runTest class BucketStorageTest { - private lateinit var bucketStorage: BucketStorage + private lateinit var bucketStorage: BucketStorageImpl private lateinit var mockDb: InternalDatabase @Test @@ -22,7 +22,7 @@ class BucketStorageTest { mockDb = mock() { every { getExistingTableNames("ps_data_*") } returns listOf("list_1", "list_2") } - bucketStorage = BucketStorage(mockDb, Logger) + bucketStorage = BucketStorageImpl(mockDb, Logger) assertEquals("9223372036854775807", bucketStorage.getMaxOpId()) } @@ -36,7 +36,7 @@ class BucketStorageTest { any() )} returns "test-client-id" } - bucketStorage = BucketStorage(mockDb, Logger) + bucketStorage = BucketStorageImpl(mockDb, Logger) val clientId = bucketStorage.getClientId() assertEquals("test-client-id", clientId) } @@ -51,7 +51,7 @@ class BucketStorageTest { any() )} returns null } - bucketStorage = BucketStorage(mockDb, Logger) + bucketStorage = BucketStorageImpl(mockDb, Logger) assertFailsWith { bucketStorage.getClientId() @@ -65,7 +65,7 @@ class BucketStorageTest { every { getExistingTableNames("ps_data_*") } returns listOf("list_1", "list_2") everySuspend { getOptional(any(),any(), any()) } returns mockCrudEntry } - bucketStorage = BucketStorage(mockDb, Logger) + bucketStorage = BucketStorageImpl(mockDb, Logger) val result = bucketStorage.nextCrudItem() @@ -78,7 +78,7 @@ class BucketStorageTest { every { getExistingTableNames("ps_data_*") } returns listOf("list_1", "list_2") everySuspend { getOptional(any(),any(), any()) } returns null } - bucketStorage = BucketStorage(mockDb, Logger) + bucketStorage = BucketStorageImpl(mockDb, Logger) val result = bucketStorage.nextCrudItem() @@ -91,7 +91,7 @@ class BucketStorageTest { every { getExistingTableNames("ps_data_*") } returns listOf("list_1", "list_2") everySuspend { getOptional(any(),any(), any()) } returns 1L } - bucketStorage = BucketStorage(mockDb, Logger) + bucketStorage = BucketStorageImpl(mockDb, Logger) assertTrue(bucketStorage.hasCrud()) } @@ -102,7 +102,7 @@ class BucketStorageTest { every { getExistingTableNames("ps_data_*") } returns listOf("list_1", "list_2") everySuspend { getOptional(any(),any(), any()) } returns null } - bucketStorage = BucketStorage(mockDb, Logger) + bucketStorage = BucketStorageImpl(mockDb, Logger) assertFalse(bucketStorage.hasCrud()) } @@ -118,7 +118,7 @@ class BucketStorageTest { )} returns 1L everySuspend { writeTransaction(any()) } returns true } - bucketStorage = BucketStorage(mockDb, Logger) + bucketStorage = BucketStorageImpl(mockDb, Logger) val result = bucketStorage.updateLocalTarget { "new-checkpoint" } assertTrue(result) @@ -136,7 +136,7 @@ class BucketStorageTest { )} returns 1L everySuspend { getAll(any(), any(), any()) } returns mockBucketStates } - bucketStorage = BucketStorage(mockDb, Logger) + bucketStorage = BucketStorageImpl(mockDb, Logger) val result = bucketStorage.getBucketStates() assertEquals(mockBucketStates, result) diff --git a/core/src/commonTest/kotlin/com/powersync/sync/SyncStreamTest.kt b/core/src/commonTest/kotlin/com/powersync/sync/SyncStreamTest.kt new file mode 100644 index 00000000..4dc27844 --- /dev/null +++ b/core/src/commonTest/kotlin/com/powersync/sync/SyncStreamTest.kt @@ -0,0 +1,92 @@ +package com.powersync.sync + +import kotlin.test.* +import kotlinx.serialization.json.JsonObject +import co.touchlab.kermit.Logger +import co.touchlab.kermit.Severity +import co.touchlab.kermit.TestConfig +import co.touchlab.kermit.TestLogWriter +import com.powersync.bucket.BucketStorage +import com.powersync.connectors.PowerSyncBackendConnector +import com.powersync.db.crud.CrudEntry +import com.powersync.db.crud.UpdateType +import dev.mokkery.answering.returns +import dev.mokkery.everySuspend +import dev.mokkery.mock +import dev.mokkery.resetAnswers +import dev.mokkery.verify +import kotlinx.coroutines.test.runTest + +class SyncStreamTest { + private lateinit var bucketStorage: BucketStorage + private lateinit var connector: PowerSyncBackendConnector + private lateinit var syncStream: SyncStream + private val testLogWriter = TestLogWriter( + loggable = Severity.Verbose + ) + private val logger = Logger( + TestConfig( + minSeverity = Severity.Debug, + logWriterList = listOf(testLogWriter) + ) + ) + + @BeforeTest + fun setup() { + bucketStorage = mock() + connector = mock() + } + + @Test + fun testInvalidateCredentials() = runTest { + connector = mock() { + everySuspend { invalidateCredentials() } returns Unit + } + + syncStream = SyncStream( + bucketStorage = bucketStorage, + connector = connector, + uploadCrud = {}, + logger = logger, + params = JsonObject(emptyMap()) + ) + + syncStream.invalidateCredentials() + verify { connector.invalidateCredentials() } + } + + // TODO: Work on improving testing this without needing to test the logs are displayed + @Test + fun testTriggerCrudUploadWhenAlreadyUploading() = runTest { + val mockCrudEntry = CrudEntry(id = "1", clientId = 1, op = UpdateType.PUT, table = "table1", transactionId = 1, opData = mapOf("key" to "value")) + bucketStorage = mock() { + everySuspend { nextCrudItem() } returns mockCrudEntry + } + + syncStream = SyncStream( + bucketStorage = bucketStorage, + connector = connector, + uploadCrud = { }, + retryDelayMs = 10, + logger = logger, + params = JsonObject(emptyMap()) + ) + + syncStream.status.update(connected = true) + syncStream.triggerCrudUpload() + + testLogWriter.assertCount(2) + + with(testLogWriter.logs[0]) { + assertContains( + message, + "Potentially previously uploaded CRUD entries are still present in the upload queue." + ) + assertEquals(Severity.Warn, severity) + } + with(testLogWriter.logs[1]) { + assertEquals(message,"Error uploading crud") + assertEquals(Severity.Error, severity) + } + } +} \ No newline at end of file diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index 4e75d140..d4bee08c 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -46,6 +46,7 @@ androidx-test-junit = "1.2.1" [libraries] configuration-annotations = { module = "co.touchlab.skie:configuration-annotations", version.ref = "configurationAnnotations" } kermit = { module = "co.touchlab:kermit", version.ref = "kermit" } +kermit-test = { module = "co.touchlab:kermit-test", version.ref = "kermit" } powersync-sqlite-core = { module = "co.powersync:powersync-sqlite-core", version.ref = "powersync-core" } mavenPublishPlugin = { module = "com.vanniktech:gradle-maven-publish-plugin", version.ref = "maven-publish" } From 0a37f530203e30d51fb18b64b510db5ab99da2bb Mon Sep 17 00:00:00 2001 From: Dominic Date: Thu, 10 Oct 2024 15:42:49 +0200 Subject: [PATCH 7/8] chore: remove log --- core/src/commonMain/kotlin/com/powersync/sync/SyncStream.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/commonMain/kotlin/com/powersync/sync/SyncStream.kt b/core/src/commonMain/kotlin/com/powersync/sync/SyncStream.kt index e57af35a..9fae433a 100644 --- a/core/src/commonMain/kotlin/com/powersync/sync/SyncStream.kt +++ b/core/src/commonMain/kotlin/com/powersync/sync/SyncStream.kt @@ -135,7 +135,7 @@ internal class SyncStream( private suspend fun uploadAllCrud() { var checkedCrudItem: CrudEntry? = null - logger.i(checkedCrudItem.toString()) + while (true) { status.update(uploading = true) /** From 5fe8cf9cdc64d34addfc3167bb91f5ef1da75804 Mon Sep 17 00:00:00 2001 From: Dominic Date: Thu, 10 Oct 2024 16:33:22 +0200 Subject: [PATCH 8/8] chore: versions --- core/build.gradle.kts | 2 +- .../commonTest/kotlin/com/powersync/sync/SyncStreamTest.kt | 5 +++-- gradle/libs.versions.toml | 2 ++ 3 files changed, 6 insertions(+), 3 deletions(-) diff --git a/core/build.gradle.kts b/core/build.gradle.kts index 5d0f7b16..c46b0ef9 100644 --- a/core/build.gradle.kts +++ b/core/build.gradle.kts @@ -9,7 +9,7 @@ plugins { alias(libs.plugins.mavenPublishPlugin) alias(libs.plugins.downloadPlugin) id("com.powersync.plugins.sonatype") - id("dev.mokkery") version "2.4.0" + alias(libs.plugins.mokkery) } val sqliteVersion = "3450000" diff --git a/core/src/commonTest/kotlin/com/powersync/sync/SyncStreamTest.kt b/core/src/commonTest/kotlin/com/powersync/sync/SyncStreamTest.kt index 4dc27844..06fc6ceb 100644 --- a/core/src/commonTest/kotlin/com/powersync/sync/SyncStreamTest.kt +++ b/core/src/commonTest/kotlin/com/powersync/sync/SyncStreamTest.kt @@ -13,10 +13,10 @@ import com.powersync.db.crud.UpdateType import dev.mokkery.answering.returns import dev.mokkery.everySuspend import dev.mokkery.mock -import dev.mokkery.resetAnswers import dev.mokkery.verify import kotlinx.coroutines.test.runTest +@OptIn(co.touchlab.kermit.ExperimentalKermitApi::class) class SyncStreamTest { private lateinit var bucketStorage: BucketStorage private lateinit var connector: PowerSyncBackendConnector @@ -76,7 +76,7 @@ class SyncStreamTest { syncStream.triggerCrudUpload() testLogWriter.assertCount(2) - + with(testLogWriter.logs[0]) { assertContains( message, @@ -84,6 +84,7 @@ class SyncStreamTest { ) assertEquals(Severity.Warn, severity) } + with(testLogWriter.logs[1]) { assertEquals(message,"Error uploading crud") assertEquals(Severity.Error, severity) diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index d4bee08c..ee24ce6b 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -33,6 +33,7 @@ skie = "0.9.0-RC.5" maven-publish = "0.27.0" download-plugin = "5.5.0" grammerKit = "0.1.12" +mokkery = "2.4.0" # Sample - Android androidx-core = "1.13.1" @@ -105,6 +106,7 @@ sqldelight = { id = "app.cash.sqldelight", version.ref = "sqlDelight" } grammarKitComposer = { id = "com.alecstrong.grammar.kit.composer", version.ref = "grammerKit" } mavenPublishPlugin = { id = "com.vanniktech.maven.publish", version.ref = "maven-publish" } downloadPlugin = { id = "de.undercouch.download", version.ref = "download-plugin" } +mokkery = { id = "dev.mokkery", version.ref = "mokkery" } [bundles] sqldelight = [