diff --git a/build.gradle.kts b/build.gradle.kts index a5ae6e87..d73a7cdc 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -14,6 +14,15 @@ plugins { alias(libs.plugins.downloadPlugin) apply false } +// Having different versions of this leads to the issue mentioned here +// https://stackoverflow.com/questions/76479563/could-not-found-kotlinx-atomicfu-compose-multiplatform-ios +// This and the `apply(plugin = "kotlinx-atomicfu")` in allprojects below solve the issue but can be deleted in future when +// the issue is resolved https://github.com/Kotlin/kotlinx-atomicfu/issues/469 +buildscript { + dependencies { + classpath("org.jetbrains.kotlinx:atomicfu-gradle-plugin:0.23.1") + } +} allprojects { repositories { @@ -44,6 +53,8 @@ allprojects { exclude(group = "ai.grazie.nlp") } + // Part of the kotlinx-atomicfu workaround described at the top of this file + apply(plugin = "kotlinx-atomicfu") } subprojects { val GROUP: String by project diff --git a/core/build.gradle.kts b/core/build.gradle.kts index 78fbb70e..c46b0ef9 100644 --- a/core/build.gradle.kts +++ b/core/build.gradle.kts @@ -9,6 +9,7 @@ plugins { alias(libs.plugins.mavenPublishPlugin) alias(libs.plugins.downloadPlugin) id("com.powersync.plugins.sonatype") + alias(libs.plugins.mokkery) } val sqliteVersion = "3450000" @@ -115,6 +116,8 @@ kotlin { commonTest.dependencies { implementation(libs.kotlin.test) + implementation(libs.test.coroutines) + implementation(libs.kermit.test) } } } diff --git a/core/src/commonMain/kotlin/com/powersync/bucket/BucketStorage.kt b/core/src/commonMain/kotlin/com/powersync/bucket/BucketStorage.kt index fac87d87..7f55d180 100644 --- a/core/src/commonMain/kotlin/com/powersync/bucket/BucketStorage.kt +++ b/core/src/commonMain/kotlin/com/powersync/bucket/BucketStorage.kt @@ -1,298 +1,19 @@ package com.powersync.bucket -import app.cash.sqldelight.async.coroutines.awaitAsOneOrNull -import co.touchlab.kermit.Logger -import com.powersync.db.internal.PsInternalDatabase +import com.powersync.db.crud.CrudEntry import com.powersync.sync.SyncDataBatch import 
com.powersync.sync.SyncLocalDatabaseResult -import co.touchlab.stately.concurrency.AtomicBoolean -import kotlinx.serialization.encodeToString -import com.powersync.db.internal.InternalTable -import com.powersync.utils.JsonUtil -internal class BucketStorage( - private val db: PsInternalDatabase, - private val logger: Logger -) { - private val tableNames: MutableSet = mutableSetOf() - private var hasCompletedSync = AtomicBoolean(false) - private var pendingBucketDeletes = AtomicBoolean(false) - - /** - * Count up, and do a compact on startup. - */ - private var compactCounter = COMPACT_OPERATION_INTERVAL - - companion object { - const val MAX_OP_ID = "9223372036854775807" - const val COMPACT_OPERATION_INTERVAL = 1_000 - } - - init { - readTableNames() - } - - private fun readTableNames() { - tableNames.clear() - // Query to get existing table names - val names = db.getExistingTableNames("ps_data_*") - - tableNames.addAll(names) - } - - fun getMaxOpId(): String { - return MAX_OP_ID - } - - suspend fun getClientId(): String { - val id = db.getOptional("SELECT powersync_client_id() as client_id") { - it.getString(0)!! - } - return id ?: throw IllegalStateException("Client ID not found") - } - - suspend fun hasCrud(): Boolean { - return db.queries.hasCrud().awaitAsOneOrNull() == 1L - } - - suspend fun updateLocalTarget(checkpointCallback: suspend () -> String): Boolean { - db.getOptional( - "SELECT target_op FROM ${InternalTable.BUCKETS} WHERE name = '\$local' AND target_op = ?", - parameters = listOf(MAX_OP_ID), - mapper = { cursor -> cursor.getLong(0)!! } - ) - ?: // Nothing to update - return false - - val seqBefore = - db.getOptional("SELECT seq FROM sqlite_sequence WHERE name = '${InternalTable.CRUD}'") { - it.getLong(0)!! 
- } ?: // Nothing to update - return false - - val opId = checkpointCallback() - - logger.i { "[updateLocalTarget] Updating target to checkpoint $opId" } - - return db.writeTransaction { - if (hasCrud()) { - logger.w { "[updateLocalTarget] ps crud is not empty" } - return@writeTransaction false - } - - val seqAfter = - db.getOptional("SELECT seq FROM sqlite_sequence WHERE name = '${InternalTable.CRUD}'") { - it.getLong(0)!! - } - ?: // assert isNotEmpty - throw AssertionError("Sqlite Sequence should not be empty") - - if (seqAfter != seqBefore) { - logger.d("seqAfter != seqBefore seqAfter: $seqAfter seqBefore: $seqBefore") - // New crud data may have been uploaded since we got the checkpoint. Abort. - return@writeTransaction false - } - - db.execute( - "UPDATE ${InternalTable.BUCKETS} SET target_op = CAST(? as INTEGER) WHERE name='\$local'", - listOf(opId) - ) - - return@writeTransaction true - } - } - - suspend fun saveSyncData(syncDataBatch: SyncDataBatch) { - db.writeTransaction { tx -> - val jsonString = JsonUtil.json.encodeToString(syncDataBatch) - tx.execute( - "INSERT INTO powersync_operations(op, data) VALUES(?, ?)", - listOf("save", jsonString) - ) - } - this.compactCounter += syncDataBatch.buckets.sumOf { it.data.size } - } - - suspend fun getBucketStates(): List { - return db.getAll( - "SELECT name AS bucket, CAST(last_op AS TEXT) AS op_id FROM ${InternalTable.BUCKETS} WHERE pending_delete = 0", - mapper = { cursor -> - BucketState( - bucket = cursor.getString(0)!!, - opId = cursor.getString(1)!! 
- ) - }) - } - - suspend fun removeBuckets(bucketsToDelete: List) { - bucketsToDelete.forEach { bucketName -> - deleteBucket(bucketName) - } - } - - - private suspend fun deleteBucket(bucketName: String) { - - db.writeTransaction{ tx -> - tx.execute( - "INSERT INTO powersync_operations(op, data) VALUES(?, ?)", - listOf("delete_bucket", bucketName) - ) - } - - Logger.d("[deleteBucket] Done deleting") - - this.pendingBucketDeletes.value = true - } - - suspend fun hasCompletedSync(): Boolean { - if (hasCompletedSync.value) { - return true - } - - val completedSync = db.getOptional( - "SELECT powersync_last_synced_at()", - mapper = { cursor -> - cursor.getString(0)!! - }) - - return if (completedSync != null) { - hasCompletedSync.value = true - true - } else { - false - } - } - - suspend fun syncLocalDatabase(targetCheckpoint: Checkpoint): SyncLocalDatabaseResult { - val result = validateChecksums(targetCheckpoint) - - if (!result.checkpointValid) { - logger.w { "[SyncLocalDatabase] Checksums failed for ${result.checkpointFailures}" } - result.checkpointFailures?.forEach { bucketName -> - deleteBucket(bucketName) - } - result.ready = false - return result - } - - val bucketNames = targetCheckpoint.checksums.map { it.bucket } - - db.writeTransaction { tx -> - tx.execute( - "UPDATE ps_buckets SET last_op = ? WHERE name IN (SELECT json_each.value FROM json_each(?))", - listOf(targetCheckpoint.lastOpId, JsonUtil.json.encodeToString(bucketNames)) - ) - - if (targetCheckpoint.writeCheckpoint != null) { - tx.execute( - "UPDATE ps_buckets SET last_op = ? 
WHERE name = '\$local'", - listOf(targetCheckpoint.writeCheckpoint), - ) - } - } - - val valid = updateObjectsFromBuckets() - - if (!valid) { - return SyncLocalDatabaseResult( - ready = false, - checkpointValid = true, - ) - } - - this.forceCompact() - - return SyncLocalDatabaseResult( - ready = true, - ) - } - - private suspend fun validateChecksums(checkpoint: Checkpoint): SyncLocalDatabaseResult { - val res = db.getOptional( - "SELECT powersync_validate_checkpoint(?) AS result", - parameters = listOf(JsonUtil.json.encodeToString(checkpoint)), - mapper = { cursor -> - cursor.getString(0)!! - }) - ?: //no result - return SyncLocalDatabaseResult( - ready = false, - checkpointValid = false, - ) - - return JsonUtil.json.decodeFromString(res) - } - - /** - * Atomically update the local state. - * - * This includes creating new tables, dropping old tables, and copying data over from the oplog. - */ - private suspend fun updateObjectsFromBuckets(): Boolean { - return db.writeTransaction { tx -> - - tx.execute( - "INSERT INTO powersync_operations(op, data) VALUES(?, ?)", - listOf("sync_local", "") - ) - - val res = tx.get("select last_insert_rowid()") { cursor -> - cursor.getLong(0)!! - } - - return@writeTransaction res == 1L - } - } - - private suspend fun forceCompact() { - // Reset counter - this.compactCounter = COMPACT_OPERATION_INTERVAL - this.pendingBucketDeletes.value = true - - this.autoCompact() - } - - - private suspend fun autoCompact() { - // 1. Delete buckets - deletePendingBuckets() - - // 2. Clear REMOVE operations, only keeping PUT ones - clearRemoveOps() - } - - private suspend fun deletePendingBuckets() { - if (!this.pendingBucketDeletes.value) { - return - } - - db.writeTransaction { tx -> - tx.execute( - "INSERT INTO powersync_operations(op, data) VALUES (?, ?)", listOf("delete_pending_buckets","") - ) - - // Executed once after start-up, and again when there are pending deletes. 
- pendingBucketDeletes.value = false - } - } - - private suspend fun clearRemoveOps() { - if (this.compactCounter < COMPACT_OPERATION_INTERVAL) { - return - } - - db.writeTransaction { tx -> - tx.execute( - "INSERT INTO powersync_operations(op, data) VALUES (?, ?)", - listOf("clear_remove_ops", "") - ) - } - this.compactCounter = 0 - } - - @Suppress("UNUSED_PARAMETER") - fun setTargetCheckpoint(checkpoint: Checkpoint) { - // No-op for now - } -} +internal interface BucketStorage { + fun getMaxOpId(): String + suspend fun getClientId(): String + suspend fun nextCrudItem(): CrudEntry? + suspend fun hasCrud(): Boolean + suspend fun updateLocalTarget(checkpointCallback: suspend () -> String): Boolean + suspend fun saveSyncData(syncDataBatch: SyncDataBatch) + suspend fun getBucketStates(): List + suspend fun removeBuckets(bucketsToDelete: List) + suspend fun hasCompletedSync(): Boolean + suspend fun syncLocalDatabase(targetCheckpoint: Checkpoint): SyncLocalDatabaseResult + fun setTargetCheckpoint(checkpoint: Checkpoint) +} \ No newline at end of file diff --git a/core/src/commonMain/kotlin/com/powersync/bucket/BucketStorageImpl.kt b/core/src/commonMain/kotlin/com/powersync/bucket/BucketStorageImpl.kt new file mode 100644 index 00000000..3d528dfb --- /dev/null +++ b/core/src/commonMain/kotlin/com/powersync/bucket/BucketStorageImpl.kt @@ -0,0 +1,315 @@ +package com.powersync.bucket + +import co.touchlab.kermit.Logger +import com.powersync.sync.SyncDataBatch +import com.powersync.sync.SyncLocalDatabaseResult +import co.touchlab.stately.concurrency.AtomicBoolean +import com.powersync.db.crud.CrudEntry +import com.powersync.db.crud.CrudRow +import com.powersync.db.internal.InternalDatabase +import kotlinx.serialization.encodeToString +import com.powersync.db.internal.InternalTable +import com.powersync.utils.JsonUtil + +internal class BucketStorageImpl( + private val db: InternalDatabase, + private val logger: Logger +): BucketStorage { + private val tableNames: MutableSet = 
mutableSetOf() + private var hasCompletedSync = AtomicBoolean(false) + private var pendingBucketDeletes = AtomicBoolean(false) + + /** + * Count up, and do a compact on startup. + */ + private var compactCounter = COMPACT_OPERATION_INTERVAL + + companion object { + const val MAX_OP_ID = "9223372036854775807" + const val COMPACT_OPERATION_INTERVAL = 1_000 + } + + init { + readTableNames() + } + + private fun readTableNames() { + tableNames.clear() + // Query to get existing table names + val names = db.getExistingTableNames("ps_data_*") + + tableNames.addAll(names) + } + + override fun getMaxOpId(): String { + return MAX_OP_ID + } + + override suspend fun getClientId(): String { + val id = db.getOptional("SELECT powersync_client_id() as client_id") { + it.getString(0)!! + } + return id ?: throw IllegalStateException("Client ID not found") + } + + override suspend fun nextCrudItem(): CrudEntry? { + val crudItem = db.getOptional("SELECT id, tx_id, data FROM ${InternalTable.CRUD} ORDER BY id ASC LIMIT 1") { cursor -> + CrudEntry.fromRow( + CrudRow( + id = cursor.getString(0)!!, + txId = cursor.getString(1)?.toInt(), + data = cursor.getString(2)!! + ) + ) + } + + return crudItem + } + + override suspend fun hasCrud(): Boolean { + val res = db.getOptional("SELECT 1 FROM ps_crud LIMIT 1") { + it.getLong(0)!! + } + return res == 1L + } + + override suspend fun updateLocalTarget(checkpointCallback: suspend () -> String): Boolean { + db.getOptional( + "SELECT target_op FROM ${InternalTable.BUCKETS} WHERE name = '\$local' AND target_op = ?", + parameters = listOf(MAX_OP_ID), + mapper = { cursor -> cursor.getLong(0)!! } + ) + ?: // Nothing to update + return false + + val seqBefore = + db.getOptional("SELECT seq FROM sqlite_sequence WHERE name = '${InternalTable.CRUD}'") { + it.getLong(0)!! 
+ } ?: // Nothing to update + return false + + val opId = checkpointCallback() + + logger.i { "[updateLocalTarget] Updating target to checkpoint $opId" } + + return db.writeTransaction { + if (hasCrud()) { + logger.w { "[updateLocalTarget] ps crud is not empty" } + return@writeTransaction false + } + + val seqAfter = + db.getOptional("SELECT seq FROM sqlite_sequence WHERE name = '${InternalTable.CRUD}'") { + it.getLong(0)!! + } + ?: // assert isNotEmpty + throw AssertionError("Sqlite Sequence should not be empty") + + if (seqAfter != seqBefore) { + logger.d("seqAfter != seqBefore seqAfter: $seqAfter seqBefore: $seqBefore") + // New crud data may have been uploaded since we got the checkpoint. Abort. + return@writeTransaction false + } + + db.execute( + "UPDATE ${InternalTable.BUCKETS} SET target_op = CAST(? as INTEGER) WHERE name='\$local'", + listOf(opId) + ) + + return@writeTransaction true + } + } + + override suspend fun saveSyncData(syncDataBatch: SyncDataBatch) { + db.writeTransaction { tx -> + val jsonString = JsonUtil.json.encodeToString(syncDataBatch) + tx.execute( + "INSERT INTO powersync_operations(op, data) VALUES(?, ?)", + listOf("save", jsonString) + ) + } + this.compactCounter += syncDataBatch.buckets.sumOf { it.data.size } + } + + override suspend fun getBucketStates(): List { + return db.getAll( + "SELECT name AS bucket, CAST(last_op AS TEXT) AS op_id FROM ${InternalTable.BUCKETS} WHERE pending_delete = 0", + mapper = { cursor -> + BucketState( + bucket = cursor.getString(0)!!, + opId = cursor.getString(1)!! 
+ ) + }) + } + + override suspend fun removeBuckets(bucketsToDelete: List) { + bucketsToDelete.forEach { bucketName -> + deleteBucket(bucketName) + } + } + + private suspend fun deleteBucket(bucketName: String) { + + db.writeTransaction{ tx -> + tx.execute( + "INSERT INTO powersync_operations(op, data) VALUES(?, ?)", + listOf("delete_bucket", bucketName) + ) + } + + Logger.d("[deleteBucket] Done deleting") + + this.pendingBucketDeletes.value = true + } + + override suspend fun hasCompletedSync(): Boolean { + if (hasCompletedSync.value) { + return true + } + + val completedSync = db.getOptional( + "SELECT powersync_last_synced_at()", + mapper = { cursor -> + cursor.getString(0)!! + }) + + return if (completedSync != null) { + hasCompletedSync.value = true + true + } else { + false + } + } + + override suspend fun syncLocalDatabase(targetCheckpoint: Checkpoint): SyncLocalDatabaseResult { + val result = validateChecksums(targetCheckpoint) + + if (!result.checkpointValid) { + logger.w { "[SyncLocalDatabase] Checksums failed for ${result.checkpointFailures}" } + result.checkpointFailures?.forEach { bucketName -> + deleteBucket(bucketName) + } + result.ready = false + return result + } + + val bucketNames = targetCheckpoint.checksums.map { it.bucket } + + db.writeTransaction { tx -> + tx.execute( + "UPDATE ps_buckets SET last_op = ? WHERE name IN (SELECT json_each.value FROM json_each(?))", + listOf(targetCheckpoint.lastOpId, JsonUtil.json.encodeToString(bucketNames)) + ) + + if (targetCheckpoint.writeCheckpoint != null) { + tx.execute( + "UPDATE ps_buckets SET last_op = ? 
WHERE name = '\$local'", + listOf(targetCheckpoint.writeCheckpoint), + ) + } + } + + val valid = updateObjectsFromBuckets() + + if (!valid) { + return SyncLocalDatabaseResult( + ready = false, + checkpointValid = true, + ) + } + + this.forceCompact() + + return SyncLocalDatabaseResult( + ready = true, + ) + } + + private suspend fun validateChecksums(checkpoint: Checkpoint): SyncLocalDatabaseResult { + val res = db.getOptional( + "SELECT powersync_validate_checkpoint(?) AS result", + parameters = listOf(JsonUtil.json.encodeToString(checkpoint)), + mapper = { cursor -> + cursor.getString(0)!! + }) + ?: //no result + return SyncLocalDatabaseResult( + ready = false, + checkpointValid = false, + ) + + return JsonUtil.json.decodeFromString(res) + } + + /** + * Atomically update the local state. + * + * This includes creating new tables, dropping old tables, and copying data over from the oplog. + */ + private suspend fun updateObjectsFromBuckets(): Boolean { + return db.writeTransaction { tx -> + + tx.execute( + "INSERT INTO powersync_operations(op, data) VALUES(?, ?)", + listOf("sync_local", "") + ) + + val res = tx.get("select last_insert_rowid()") { cursor -> + cursor.getLong(0)!! + } + + return@writeTransaction res == 1L + } + } + + private suspend fun forceCompact() { + // Reset counter + this.compactCounter = COMPACT_OPERATION_INTERVAL + this.pendingBucketDeletes.value = true + + this.autoCompact() + } + + + private suspend fun autoCompact() { + // 1. Delete buckets + deletePendingBuckets() + + // 2. Clear REMOVE operations, only keeping PUT ones + clearRemoveOps() + } + + private suspend fun deletePendingBuckets() { + if (!this.pendingBucketDeletes.value) { + return + } + + db.writeTransaction { tx -> + tx.execute( + "INSERT INTO powersync_operations(op, data) VALUES (?, ?)", listOf("delete_pending_buckets","") + ) + + // Executed once after start-up, and again when there are pending deletes. 
+ pendingBucketDeletes.value = false + } + } + + private suspend fun clearRemoveOps() { + if (this.compactCounter < COMPACT_OPERATION_INTERVAL) { + return + } + + db.writeTransaction { tx -> + tx.execute( + "INSERT INTO powersync_operations(op, data) VALUES (?, ?)", + listOf("clear_remove_ops", "") + ) + } + this.compactCounter = 0 + } + + @Suppress("UNUSED_PARAMETER") + override fun setTargetCheckpoint(checkpoint: Checkpoint) { + // No-op for now + } +} diff --git a/core/src/commonMain/kotlin/com/powersync/connectors/PowerSyncBackendConnector.kt b/core/src/commonMain/kotlin/com/powersync/connectors/PowerSyncBackendConnector.kt index 43e24859..551bea61 100644 --- a/core/src/commonMain/kotlin/com/powersync/connectors/PowerSyncBackendConnector.kt +++ b/core/src/commonMain/kotlin/com/powersync/connectors/PowerSyncBackendConnector.kt @@ -26,7 +26,7 @@ public abstract class PowerSyncBackendConnector { * * These credentials may have expired already. */ - public suspend fun getCredentialsCached(): PowerSyncCredentials? { + public open suspend fun getCredentialsCached(): PowerSyncCredentials? { cachedCredentials?.let { return it } prefetchCredentials()?.join() return cachedCredentials @@ -37,7 +37,7 @@ public abstract class PowerSyncBackendConnector { * * This may be called when the current credentials have expired. */ - public fun invalidateCredentials() { + public open fun invalidateCredentials() { cachedCredentials = null } @@ -49,7 +49,7 @@ public abstract class PowerSyncBackendConnector { * * This may be called before the current credentials have expired. */ - public suspend fun prefetchCredentials(): Job? { + public open suspend fun prefetchCredentials(): Job? 
{ fetchRequest?.takeIf { it.isActive }?.let { return it } fetchRequest = scope.launch { diff --git a/core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt b/core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt index aa567bfc..c1854035 100644 --- a/core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt +++ b/core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt @@ -2,19 +2,19 @@ package com.powersync.db import app.cash.sqldelight.async.coroutines.awaitAsList import app.cash.sqldelight.async.coroutines.awaitAsOne -import app.cash.sqldelight.async.coroutines.awaitAsOneOrNull import app.cash.sqldelight.db.SqlCursor import co.touchlab.kermit.Logger import com.powersync.DatabaseDriverFactory import com.powersync.PowerSyncDatabase import com.powersync.PsSqlDriver import com.powersync.bucket.BucketStorage +import com.powersync.bucket.BucketStorageImpl import com.powersync.connectors.PowerSyncBackendConnector import com.powersync.db.crud.CrudBatch import com.powersync.db.crud.CrudEntry import com.powersync.db.crud.CrudRow import com.powersync.db.crud.CrudTransaction -import com.powersync.db.internal.PsInternalDatabase +import com.powersync.db.internal.InternalDatabaseImpl import com.powersync.db.internal.InternalTable import com.powersync.db.internal.PowerSyncTransaction import com.powersync.db.schema.Schema @@ -53,8 +53,8 @@ internal class PowerSyncDatabaseImpl( val logger: Logger = Logger, driver: PsSqlDriver = factory.createDriver(scope, dbFilename), ) : PowerSyncDatabase { - private val internalDb = PsInternalDatabase(driver, scope) - private val bucketStorage: BucketStorage = BucketStorage(internalDb, logger) + private val internalDb = InternalDatabaseImpl(driver, scope) + private val bucketStorage: BucketStorage = BucketStorageImpl(internalDb, logger) /** * The current sync status. 
@@ -73,6 +73,7 @@ internal class PowerSyncDatabaseImpl( logger.d { "SQLiteVersion: $sqliteVersion" } checkVersion() logger.d { "PowerSyncVersion: ${getPowerSyncVersion()}" } + internalDb.get("SELECT powersync_init()") {} applySchema() updateHasSynced() } @@ -164,23 +165,15 @@ internal class PowerSyncDatabaseImpl( override suspend fun getNextCrudTransaction(): CrudTransaction? { return this.readTransaction { - val firstEntry = internalDb.queries.getCrudFirstEntry().awaitAsOneOrNull() + val entry = bucketStorage.nextCrudItem() ?: return@readTransaction null - val first = CrudEntry.fromRow( - CrudRow( - id = firstEntry.id.toString(), - data = firstEntry.data_!!, - txId = firstEntry.tx_id?.toInt() - ) - ) - val txId = first.transactionId - val entries: List - if (txId == null) { - entries = listOf(first) + val txId = entry.transactionId + val entries: List = if (txId == null) { + listOf(entry) } else { - entries = internalDb.queries.getCrudEntryByTxId(txId.toLong()).awaitAsList().map { + internalDb.queries.getCrudEntryByTxId(txId.toLong()).awaitAsList().map { CrudEntry.fromRow( CrudRow( id = it.id.toString(), diff --git a/core/src/commonMain/kotlin/com/powersync/db/internal/InternalDatabase.kt b/core/src/commonMain/kotlin/com/powersync/db/internal/InternalDatabase.kt index 533b223e..5796e12f 100644 --- a/core/src/commonMain/kotlin/com/powersync/db/internal/InternalDatabase.kt +++ b/core/src/commonMain/kotlin/com/powersync/db/internal/InternalDatabase.kt @@ -1,273 +1,16 @@ package com.powersync.db.internal -import app.cash.sqldelight.ExecutableQuery -import app.cash.sqldelight.Query -import app.cash.sqldelight.async.coroutines.awaitAsList -import app.cash.sqldelight.async.coroutines.awaitAsOneOrNull -import app.cash.sqldelight.coroutines.asFlow -import app.cash.sqldelight.coroutines.mapToList -import app.cash.sqldelight.db.QueryResult -import app.cash.sqldelight.db.SqlCursor -import app.cash.sqldelight.db.SqlPreparedStatement -import com.powersync.PowerSyncTransaction 
+import com.persistence.PowersyncQueries import com.powersync.PsSqlDriver import com.powersync.db.Queries import com.powersync.persistence.PsDatabase -import com.powersync.utils.JsonUtil -import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.FlowPreview import kotlinx.coroutines.flow.Flow -import kotlinx.coroutines.flow.debounce -import kotlinx.coroutines.flow.onEach -import kotlinx.coroutines.launch -import kotlinx.serialization.encodeToString +internal interface InternalDatabase : Queries { + val driver: PsSqlDriver + val transactor: PsDatabase + val queries: PowersyncQueries -@OptIn(FlowPreview::class) -internal class PsInternalDatabase(val driver: PsSqlDriver, private val scope: CoroutineScope) : - Queries { - - val transactor: PsDatabase = PsDatabase(driver) - val queries = transactor.powersyncQueries - - companion object { - const val POWERSYNC_TABLE_MATCH = "(^ps_data__|^ps_data_local__)" - const val DEFAULT_WATCH_THROTTLE_MS = 30L - } - - init { - scope.launch { - val accumulatedUpdates = mutableSetOf() - tableUpdates() -// Debounce will discard any events which occur inside the debounce window -// This will accumulate those table updates - .onEach { tables -> accumulatedUpdates.addAll(tables) } - .debounce(DEFAULT_WATCH_THROTTLE_MS) - .collect { - val dataTables = accumulatedUpdates.map { toFriendlyTableName(it) }.filter { it.isNotBlank() } - driver.notifyListeners(queryKeys = dataTables.toTypedArray()) - accumulatedUpdates.clear() - } - } - } - - override suspend fun execute( - sql: String, - parameters: List? 
- ): Long { - val numParams = parameters?.size ?: 0 - - return driver.execute( - identifier = null, - sql = sql, - parameters = numParams, - binders = getBindersFromParams(parameters) - ).await() - } - - override suspend fun get( - sql: String, - parameters: List?, - mapper: (SqlCursor) -> RowType - ): RowType { - val result = this.createQuery( - query = sql, - parameters = parameters?.size ?: 0, - binders = getBindersFromParams(parameters), - mapper = mapper - ).awaitAsOneOrNull() - return requireNotNull(result) { "Query returned no result" } - } - - override suspend fun getAll( - sql: String, - parameters: List?, - mapper: (SqlCursor) -> RowType - ): List { - return this.createQuery( - query = sql, - parameters = parameters?.size ?: 0, - binders = getBindersFromParams(parameters), - mapper = mapper - ).awaitAsList() - } - - override suspend fun getOptional( - sql: String, - parameters: List?, - mapper: (SqlCursor) -> RowType - ): RowType? { - return this.createQuery( - query = sql, - parameters = parameters?.size ?: 0, - binders = getBindersFromParams(parameters), - mapper = mapper - ).awaitAsOneOrNull() - } - - override fun watch( - sql: String, - parameters: List?, - mapper: (SqlCursor) -> RowType - ): Flow> { - - val tables = getSourceTables(sql, parameters).map { toFriendlyTableName(it) } - .filter { it.isNotBlank() }.toSet() - return watchQuery( - query = sql, - parameters = parameters?.size ?: 0, - binders = getBindersFromParams(parameters), - mapper = mapper, - tables = tables - ).asFlow().mapToList(scope.coroutineContext) - } - - private fun createQuery( - query: String, - mapper: (SqlCursor) -> T, - parameters: Int = 0, - binders: (SqlPreparedStatement.() -> Unit)? 
= null, - ): ExecutableQuery { - return object : ExecutableQuery(mapper) { - override fun execute(mapper: (SqlCursor) -> QueryResult): QueryResult { - return driver.executeQuery(null, query, mapper, parameters, binders) - } - } - } - - private fun watchQuery( - query: String, - mapper: (SqlCursor) -> T, - parameters: Int = 0, - binders: (SqlPreparedStatement.() -> Unit)? = null, - tables: Set = setOf() - ): Query { - - return object : Query(mapper) { - override fun execute(mapper: (SqlCursor) -> QueryResult): QueryResult { - return driver.executeQuery(null, query, mapper, parameters, binders) - } - - override fun addListener(listener: Listener) { - driver.addListener(queryKeys = tables.toTypedArray(), listener = listener) - } - - override fun removeListener(listener: Listener) { - driver.removeListener(queryKeys = tables.toTypedArray(), listener = listener) - } - } - } - - override suspend fun readTransaction(callback: suspend (PowerSyncTransaction) -> R): R { - return transactor.transactionWithResult(noEnclosing = true) { - val transaction = PowerSyncTransaction(this@PsInternalDatabase) - callback(transaction) - } - } - - override suspend fun writeTransaction(callback: suspend (PowerSyncTransaction) -> R): R { - return transactor.transactionWithResult(noEnclosing = true) { - val transaction = PowerSyncTransaction(this@PsInternalDatabase) - callback(transaction) - } - } - - // Register callback for table updates - private fun tableUpdates(): Flow> { - return driver.tableUpdates() - } - - // Register callback for table updates on a specific table - fun updatesOnTable(tableName: String): Flow { - return driver.updatesOnTable(tableName) - } - - private fun toFriendlyTableName(tableName: String): String { - val regex = POWERSYNC_TABLE_MATCH.toRegex() - if (regex.containsMatchIn(tableName)) { - return tableName.replace(regex, "") - } - return tableName - } - - private fun getSourceTables( - sql: String, - parameters: List?, - ): Set { - val rows = createQuery( - query = 
"EXPLAIN $sql", - parameters = parameters?.size ?: 0, - binders = getBindersFromParams(parameters), - mapper = { - ExplainQueryResult( - addr = it.getString(0)!!, - opcode = it.getString(1)!!, - p1 = it.getLong(2)!!, - p2 = it.getLong(3)!!, - p3 = it.getLong(4)!! - ) - } - ).executeAsList() - - val rootPages = mutableListOf() - for (row in rows) { - if ((row.opcode == "OpenRead" || row.opcode == "OpenWrite") && row.p3 == 0L && row.p2 != 0L) { - rootPages.add(row.p2) - } - } - val params = listOf(JsonUtil.json.encodeToString(rootPages)) - val tableRows = createQuery( - "SELECT tbl_name FROM sqlite_master WHERE rootpage IN (SELECT json_each.value FROM json_each(?))", - parameters = params.size, - binders = { - bindString(0, params[0]) - }, mapper = { it.getString(0)!! } - ).executeAsList() - - return tableRows.toSet() - } - - fun getExistingTableNames(tableGlob: String): List { - val existingTableNames = createQuery( - "SELECT name FROM sqlite_master WHERE type='table' AND name GLOB ?", - parameters = 1, - binders = { - bindString(0, tableGlob) - }, - mapper = { cursor -> - cursor.getString(0)!! - } - ).executeAsList() - return existingTableNames - } - - internal data class ExplainQueryResult( - val addr: String, - val opcode: String, - val p1: Long, - val p2: Long, - val p3: Long, - ) -} - -internal fun getBindersFromParams(parameters: List?): (SqlPreparedStatement.() -> Unit)? 
{ - if (parameters.isNullOrEmpty()) { - return null - } - return { - parameters.forEachIndexed { index, parameter -> - when (parameter) { - is Boolean -> bindBoolean(index, parameter) - is String -> bindString(index, parameter) - is Long -> bindLong(index, parameter) - is Double -> bindDouble(index, parameter) - is ByteArray -> bindBytes(index, parameter) - else -> { - if(parameter != null) { - throw IllegalArgumentException("Unsupported parameter type: ${parameter::class}, at index $index") - } - } - } - } - } + fun getExistingTableNames(tableGlob: String): List + fun updatesOnTable(tableName: String): Flow } \ No newline at end of file diff --git a/core/src/commonMain/kotlin/com/powersync/db/internal/InternalDatabaseImpl.kt b/core/src/commonMain/kotlin/com/powersync/db/internal/InternalDatabaseImpl.kt new file mode 100644 index 00000000..a4a967fc --- /dev/null +++ b/core/src/commonMain/kotlin/com/powersync/db/internal/InternalDatabaseImpl.kt @@ -0,0 +1,272 @@ +package com.powersync.db.internal + +import app.cash.sqldelight.ExecutableQuery +import app.cash.sqldelight.Query +import app.cash.sqldelight.async.coroutines.awaitAsList +import app.cash.sqldelight.async.coroutines.awaitAsOneOrNull +import app.cash.sqldelight.coroutines.asFlow +import app.cash.sqldelight.coroutines.mapToList +import app.cash.sqldelight.db.QueryResult +import app.cash.sqldelight.db.SqlCursor +import app.cash.sqldelight.db.SqlPreparedStatement +import com.powersync.PowerSyncTransaction +import com.powersync.PsSqlDriver +import com.powersync.persistence.PsDatabase +import com.powersync.utils.JsonUtil +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.FlowPreview +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.debounce +import kotlinx.coroutines.flow.onEach +import kotlinx.coroutines.launch +import kotlinx.serialization.encodeToString + + +@OptIn(FlowPreview::class) +internal class InternalDatabaseImpl(override val driver: PsSqlDriver, private val scope: 
CoroutineScope): + InternalDatabase { + + override val transactor: PsDatabase = PsDatabase(driver) + override val queries = transactor.powersyncQueries + + companion object { + const val POWERSYNC_TABLE_MATCH = "(^ps_data__|^ps_data_local__)" + const val DEFAULT_WATCH_THROTTLE_MS = 30L + } + + init { + scope.launch { + val accumulatedUpdates = mutableSetOf() + tableUpdates() +// Debounce will discard any events which occur inside the debounce window +// This will accumulate those table updates + .onEach { tables -> accumulatedUpdates.addAll(tables) } + .debounce(DEFAULT_WATCH_THROTTLE_MS) + .collect { + val dataTables = accumulatedUpdates.map { toFriendlyTableName(it) }.filter { it.isNotBlank() } + driver.notifyListeners(queryKeys = dataTables.toTypedArray()) + accumulatedUpdates.clear() + } + } + } + + override suspend fun execute( + sql: String, + parameters: List? + ): Long { + val numParams = parameters?.size ?: 0 + + return driver.execute( + identifier = null, + sql = sql, + parameters = numParams, + binders = getBindersFromParams(parameters) + ).await() + } + + override suspend fun get( + sql: String, + parameters: List?, + mapper: (SqlCursor) -> RowType + ): RowType { + val result = this.createQuery( + query = sql, + parameters = parameters?.size ?: 0, + binders = getBindersFromParams(parameters), + mapper = mapper + ).awaitAsOneOrNull() + return requireNotNull(result) { "Query returned no result" } + } + + override suspend fun getAll( + sql: String, + parameters: List?, + mapper: (SqlCursor) -> RowType + ): List { + return this.createQuery( + query = sql, + parameters = parameters?.size ?: 0, + binders = getBindersFromParams(parameters), + mapper = mapper + ).awaitAsList() + } + + override suspend fun getOptional( + sql: String, + parameters: List?, + mapper: (SqlCursor) -> RowType + ): RowType? 
{ + return this.createQuery( + query = sql, + parameters = parameters?.size ?: 0, + binders = getBindersFromParams(parameters), + mapper = mapper + ).awaitAsOneOrNull() + } + + override fun watch( + sql: String, + parameters: List?, + mapper: (SqlCursor) -> RowType + ): Flow> { + + val tables = getSourceTables(sql, parameters).map { toFriendlyTableName(it) } + .filter { it.isNotBlank() }.toSet() + return watchQuery( + query = sql, + parameters = parameters?.size ?: 0, + binders = getBindersFromParams(parameters), + mapper = mapper, + tables = tables + ).asFlow().mapToList(scope.coroutineContext) + } + + private fun createQuery( + query: String, + mapper: (SqlCursor) -> T, + parameters: Int = 0, + binders: (SqlPreparedStatement.() -> Unit)? = null, + ): ExecutableQuery { + return object : ExecutableQuery(mapper) { + override fun execute(mapper: (SqlCursor) -> QueryResult): QueryResult { + return driver.executeQuery(null, query, mapper, parameters, binders) + } + } + } + + private fun watchQuery( + query: String, + mapper: (SqlCursor) -> T, + parameters: Int = 0, + binders: (SqlPreparedStatement.() -> Unit)? 
= null, + tables: Set = setOf() + ): Query { + + return object : Query(mapper) { + override fun execute(mapper: (SqlCursor) -> QueryResult): QueryResult { + return driver.executeQuery(null, query, mapper, parameters, binders) + } + + override fun addListener(listener: Listener) { + driver.addListener(queryKeys = tables.toTypedArray(), listener = listener) + } + + override fun removeListener(listener: Listener) { + driver.removeListener(queryKeys = tables.toTypedArray(), listener = listener) + } + } + } + + override suspend fun readTransaction(callback: suspend (PowerSyncTransaction) -> R): R { + return transactor.transactionWithResult(noEnclosing = true) { + val transaction = PowerSyncTransaction(this@InternalDatabaseImpl) + callback(transaction) + } + } + + override suspend fun writeTransaction(callback: suspend (PowerSyncTransaction) -> R): R { + return transactor.transactionWithResult(noEnclosing = true) { + val transaction = PowerSyncTransaction(this@InternalDatabaseImpl) + callback(transaction) + } + } + + // Register callback for table updates + private fun tableUpdates(): Flow> { + return driver.tableUpdates() + } + + // Register callback for table updates on a specific table + override fun updatesOnTable(tableName: String): Flow { + return driver.updatesOnTable(tableName) + } + + private fun toFriendlyTableName(tableName: String): String { + val regex = POWERSYNC_TABLE_MATCH.toRegex() + if (regex.containsMatchIn(tableName)) { + return tableName.replace(regex, "") + } + return tableName + } + + private fun getSourceTables( + sql: String, + parameters: List?, + ): Set { + val rows = createQuery( + query = "EXPLAIN $sql", + parameters = parameters?.size ?: 0, + binders = getBindersFromParams(parameters), + mapper = { + ExplainQueryResult( + addr = it.getString(0)!!, + opcode = it.getString(1)!!, + p1 = it.getLong(2)!!, + p2 = it.getLong(3)!!, + p3 = it.getLong(4)!! 
+ ) + } + ).executeAsList() + + val rootPages = mutableListOf() + for (row in rows) { + if ((row.opcode == "OpenRead" || row.opcode == "OpenWrite") && row.p3 == 0L && row.p2 != 0L) { + rootPages.add(row.p2) + } + } + val params = listOf(JsonUtil.json.encodeToString(rootPages)) + val tableRows = createQuery( + "SELECT tbl_name FROM sqlite_master WHERE rootpage IN (SELECT json_each.value FROM json_each(?))", + parameters = params.size, + binders = { + bindString(0, params[0]) + }, mapper = { it.getString(0)!! } + ).executeAsList() + + return tableRows.toSet() + } + + override fun getExistingTableNames(tableGlob: String): List { + val existingTableNames = createQuery( + "SELECT name FROM sqlite_master WHERE type='table' AND name GLOB ?", + parameters = 1, + binders = { + bindString(0, tableGlob) + }, + mapper = { cursor -> + cursor.getString(0)!! + } + ).executeAsList() + return existingTableNames + } + + internal data class ExplainQueryResult( + val addr: String, + val opcode: String, + val p1: Long, + val p2: Long, + val p3: Long, + ) +} + +internal fun getBindersFromParams(parameters: List?): (SqlPreparedStatement.() -> Unit)? 
{ + if (parameters.isNullOrEmpty()) { + return null + } + return { + parameters.forEachIndexed { index, parameter -> + when (parameter) { + is Boolean -> bindBoolean(index, parameter) + is String -> bindString(index, parameter) + is Long -> bindLong(index, parameter) + is Double -> bindDouble(index, parameter) + is ByteArray -> bindBytes(index, parameter) + else -> { + if(parameter != null) { + throw IllegalArgumentException("Unsupported parameter type: ${parameter::class}, at index $index") + } + } + } + } + } +} \ No newline at end of file diff --git a/core/src/commonMain/kotlin/com/powersync/db/internal/PowerSyncTransactionFactory.kt b/core/src/commonMain/kotlin/com/powersync/db/internal/PowerSyncTransactionFactory.kt index e562cbf2..8d2e11b9 100644 --- a/core/src/commonMain/kotlin/com/powersync/db/internal/PowerSyncTransactionFactory.kt +++ b/core/src/commonMain/kotlin/com/powersync/db/internal/PowerSyncTransactionFactory.kt @@ -2,10 +2,10 @@ package com.powersync import app.cash.sqldelight.db.SqlCursor import com.powersync.db.internal.PowerSyncTransaction -import com.powersync.db.internal.PsInternalDatabase +import com.powersync.db.internal.InternalDatabaseImpl internal fun PowerSyncTransaction( - internalDatabase: PsInternalDatabase, + internalDatabase: InternalDatabaseImpl, ): PowerSyncTransaction { val transaction = object : PowerSyncTransaction { diff --git a/core/src/commonMain/kotlin/com/powersync/sync/SyncStream.kt b/core/src/commonMain/kotlin/com/powersync/sync/SyncStream.kt index b3a126fd..9fae433a 100644 --- a/core/src/commonMain/kotlin/com/powersync/sync/SyncStream.kt +++ b/core/src/commonMain/kotlin/com/powersync/sync/SyncStream.kt @@ -3,14 +3,13 @@ package com.powersync.sync import co.touchlab.kermit.Logger import com.powersync.bucket.BucketChecksum import com.powersync.bucket.BucketRequest -import com.powersync.bucket.BucketStorage import com.powersync.bucket.Checkpoint import com.powersync.bucket.WriteCheckpointResponse import 
co.touchlab.stately.concurrency.AtomicBoolean +import com.powersync.bucket.BucketStorage import com.powersync.connectors.PowerSyncBackendConnector -import com.powersync.utils.JsonParam +import com.powersync.db.crud.CrudEntry import com.powersync.utils.JsonUtil -import com.powersync.utils.toJsonObject import io.ktor.client.HttpClient import io.ktor.client.call.body import io.ktor.client.plugins.HttpTimeout @@ -135,14 +134,30 @@ internal class SyncStream( } private suspend fun uploadAllCrud() { + var checkedCrudItem: CrudEntry? = null + while (true) { + status.update(uploading = true) + /** + * This is the first item in the FIFO CRUD queue. + */ try { - val done = uploadCrudBatch() - if (status.uploadError == true) { - status.update(clearUploadError = true) - } - - if (done) { + val nextCrudItem = bucketStorage.nextCrudItem() + if (nextCrudItem != null) { + if (nextCrudItem.clientId == checkedCrudItem?.clientId) { + logger.w( + """Potentially previously uploaded CRUD entries are still present in the upload queue. + Make sure to handle uploads and complete CRUD transactions or batches by calling and awaiting their [.complete()] method. 
+ The next upload iteration will be delayed.""" + ) + throw Exception("Delaying due to previously encountered CRUD item.") + } + + checkedCrudItem = nextCrudItem + uploadCrud() + } else { + // Uploading is completed + bucketStorage.updateLocalTarget { getWriteCheckpoint() } break } } catch (e: Exception) { @@ -155,17 +170,6 @@ internal class SyncStream( status.update(uploading = false) } - private suspend fun uploadCrudBatch(): Boolean { - if (bucketStorage.hasCrud()) { - status.update(uploading = true) - uploadCrud() - return false - } else { - // This isolate is the only one triggering - return bucketStorage.updateLocalTarget { getWriteCheckpoint() } - } - } - private suspend fun getWriteCheckpoint(): String { val credentials = connector.getCredentialsCached() require(credentials != null) { "Not logged in" } @@ -372,7 +376,7 @@ internal class SyncStream( val bucketsToDelete = checkpointDiff.removedBuckets if (bucketsToDelete.isNotEmpty()) { - logger.i { "Remove buckets $bucketsToDelete" } + logger.d { "Remove buckets $bucketsToDelete" } } bucketStorage.removeBuckets(bucketsToDelete) bucketStorage.setTargetCheckpoint(state.targetCheckpoint!!) 
diff --git a/core/src/commonTest/kotlin/com/powersync/bucket/BucketStorageTest.kt b/core/src/commonTest/kotlin/com/powersync/bucket/BucketStorageTest.kt new file mode 100644 index 00000000..4284a1b6 --- /dev/null +++ b/core/src/commonTest/kotlin/com/powersync/bucket/BucketStorageTest.kt @@ -0,0 +1,147 @@ +import kotlin.test.* +import com.powersync.bucket.BucketStorageImpl +import co.touchlab.kermit.Logger +import com.powersync.bucket.BucketState +import com.powersync.db.crud.CrudEntry +import com.powersync.db.crud.UpdateType +import com.powersync.db.internal.InternalDatabase +import dev.mokkery.answering.returns +import dev.mokkery.every +import dev.mokkery.everySuspend +import dev.mokkery.matcher.any +import dev.mokkery.mock +import kotlinx.coroutines.runBlocking +import kotlinx.coroutines.test.runTest + +class BucketStorageTest { + private lateinit var bucketStorage: BucketStorageImpl + private lateinit var mockDb: InternalDatabase + + @Test + fun testGetMaxOpId() { + mockDb = mock() { + every { getExistingTableNames("ps_data_*") } returns listOf("list_1", "list_2") + } + bucketStorage = BucketStorageImpl(mockDb, Logger) + assertEquals("9223372036854775807", bucketStorage.getMaxOpId()) + } + + @Test + fun testGetClientId() = runTest { + mockDb = mock() { + every { getExistingTableNames("ps_data_*") } returns listOf("list_1", "list_2") + everySuspend { getOptional( + any(), + any(), + any() + )} returns "test-client-id" + } + bucketStorage = BucketStorageImpl(mockDb, Logger) + val clientId = bucketStorage.getClientId() + assertEquals("test-client-id", clientId) + } + + @Test + fun testGetClientIdThrowsException() = runTest { + mockDb = mock() { + every { getExistingTableNames("ps_data_*") } returns listOf("list_1", "list_2") + everySuspend { getOptional( + any(), + any(), + any() + )} returns null + } + bucketStorage = BucketStorageImpl(mockDb, Logger) + + assertFailsWith { + bucketStorage.getClientId() + } + } + + @Test + fun testNextCrudItem() = runTest { + val 
mockCrudEntry = CrudEntry(id = "1", clientId = 1, op = UpdateType.PUT, table = "table1", transactionId = 1, opData = mapOf("key" to "value")) + mockDb = mock() { + every { getExistingTableNames("ps_data_*") } returns listOf("list_1", "list_2") + everySuspend { getOptional(any(),any(), any()) } returns mockCrudEntry + } + bucketStorage = BucketStorageImpl(mockDb, Logger) + + + val result = bucketStorage.nextCrudItem() + assertEquals(mockCrudEntry, result) + } + + @Test + fun testNullNextCrudItem() = runTest { + mockDb = mock() { + every { getExistingTableNames("ps_data_*") } returns listOf("list_1", "list_2") + everySuspend { getOptional(any(),any(), any()) } returns null + } + bucketStorage = BucketStorageImpl(mockDb, Logger) + + + val result = bucketStorage.nextCrudItem() + assertEquals(null, result) + } + + @Test + fun testHasCrud() = runTest { + mockDb = mock() { + every { getExistingTableNames("ps_data_*") } returns listOf("list_1", "list_2") + everySuspend { getOptional(any(),any(), any()) } returns 1L + } + bucketStorage = BucketStorageImpl(mockDb, Logger) + + assertTrue(bucketStorage.hasCrud()) + } + + @Test + fun testNullHasCrud() = runTest { + mockDb = mock() { + every { getExistingTableNames("ps_data_*") } returns listOf("list_1", "list_2") + everySuspend { getOptional(any(),any(), any()) } returns null + } + bucketStorage = BucketStorageImpl(mockDb, Logger) + + assertFalse(bucketStorage.hasCrud()) + } + + @Test + fun testUpdateLocalTarget() = runBlocking { + mockDb = mock() { + every { getExistingTableNames("ps_data_*") } returns listOf("list_1", "list_2") + everySuspend { getOptional( + any(), + any(), + any() + )} returns 1L + everySuspend { writeTransaction(any()) } returns true + } + bucketStorage = BucketStorageImpl(mockDb, Logger) + + val result = bucketStorage.updateLocalTarget { "new-checkpoint" } + assertTrue(result) + } + + @Test + fun testGetBucketStates() = runTest { + val mockBucketStates = listOf(BucketState("bucket1", "op1"), 
BucketState("bucket2", "op2")) + mockDb = mock() { + every { getExistingTableNames("ps_data_*") } returns listOf("list_1", "list_2") + everySuspend { getOptional( + any(), + any(), + any() + )} returns 1L + everySuspend { getAll(any(), any(), any()) } returns mockBucketStates + } + bucketStorage = BucketStorageImpl(mockDb, Logger) + + val result = bucketStorage.getBucketStates() + assertEquals(mockBucketStates, result) + } + + // TODO: Add tests for removeBuckets, hasCompletedSync, syncLocalDatabase currently not covered because + // currently the internal methods are private and cannot be accessed from the test class +} diff --git a/core/src/commonTest/kotlin/com/powersync/sync/SyncStreamTest.kt b/core/src/commonTest/kotlin/com/powersync/sync/SyncStreamTest.kt new file mode 100644 index 00000000..06fc6ceb --- /dev/null +++ b/core/src/commonTest/kotlin/com/powersync/sync/SyncStreamTest.kt @@ -0,0 +1,93 @@ +package com.powersync.sync + +import kotlin.test.* +import kotlinx.serialization.json.JsonObject +import co.touchlab.kermit.Logger +import co.touchlab.kermit.Severity +import co.touchlab.kermit.TestConfig +import co.touchlab.kermit.TestLogWriter +import com.powersync.bucket.BucketStorage +import com.powersync.connectors.PowerSyncBackendConnector +import com.powersync.db.crud.CrudEntry +import com.powersync.db.crud.UpdateType +import dev.mokkery.answering.returns +import dev.mokkery.everySuspend +import dev.mokkery.mock +import dev.mokkery.verify +import kotlinx.coroutines.test.runTest + +@OptIn(co.touchlab.kermit.ExperimentalKermitApi::class) +class SyncStreamTest { + private lateinit var bucketStorage: BucketStorage + private lateinit var connector: PowerSyncBackendConnector + private lateinit var syncStream: SyncStream + private val testLogWriter = TestLogWriter( + loggable = Severity.Verbose + ) + private val logger = Logger( + TestConfig( + minSeverity = Severity.Debug, + logWriterList = listOf(testLogWriter) + ) + ) + + @BeforeTest + fun setup() { + 
bucketStorage = mock() + connector = mock() + } + + @Test + fun testInvalidateCredentials() = runTest { + connector = mock() { + everySuspend { invalidateCredentials() } returns Unit + } + + syncStream = SyncStream( + bucketStorage = bucketStorage, + connector = connector, + uploadCrud = {}, + logger = logger, + params = JsonObject(emptyMap()) + ) + + syncStream.invalidateCredentials() + verify { connector.invalidateCredentials() } + } + + // TODO: Work on improving testing this without needing to test the logs are displayed + @Test + fun testTriggerCrudUploadWhenAlreadyUploading() = runTest { + val mockCrudEntry = CrudEntry(id = "1", clientId = 1, op = UpdateType.PUT, table = "table1", transactionId = 1, opData = mapOf("key" to "value")) + bucketStorage = mock() { + everySuspend { nextCrudItem() } returns mockCrudEntry + } + + syncStream = SyncStream( + bucketStorage = bucketStorage, + connector = connector, + uploadCrud = { }, + retryDelayMs = 10, + logger = logger, + params = JsonObject(emptyMap()) + ) + + syncStream.status.update(connected = true) + syncStream.triggerCrudUpload() + + testLogWriter.assertCount(2) + + with(testLogWriter.logs[0]) { + assertContains( + message, + "Potentially previously uploaded CRUD entries are still present in the upload queue." 
+ ) + assertEquals(Severity.Warn, severity) + } + + with(testLogWriter.logs[1]) { + assertEquals(message,"Error uploading crud") + assertEquals(Severity.Error, severity) + } + } +} \ No newline at end of file diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index 0211b2ff..c065d468 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -33,6 +33,7 @@ skie = "0.9.0-RC.5" maven-publish = "0.27.0" download-plugin = "5.5.0" grammerKit = "0.1.12" +mokkery = "2.4.0" # Sample - Android androidx-core = "1.13.1" @@ -46,6 +47,7 @@ androidx-test-junit = "1.2.1" [libraries] configuration-annotations = { module = "co.touchlab.skie:configuration-annotations", version.ref = "configurationAnnotations" } kermit = { module = "co.touchlab:kermit", version.ref = "kermit" } +kermit-test = { module = "co.touchlab:kermit-test", version.ref = "kermit" } powersync-sqlite-core = { module = "co.powersync:powersync-sqlite-core", version.ref = "powersync-core" } mavenPublishPlugin = { module = "com.vanniktech:gradle-maven-publish-plugin", version.ref = "maven-publish" } @@ -104,6 +106,7 @@ sqldelight = { id = "app.cash.sqldelight", version.ref = "sqlDelight" } grammarKitComposer = { id = "com.alecstrong.grammar.kit.composer", version.ref = "grammerKit" } mavenPublishPlugin = { id = "com.vanniktech.maven.publish", version.ref = "maven-publish" } downloadPlugin = { id = "de.undercouch.download", version.ref = "download-plugin" } +mokkery = { id = "dev.mokkery", version.ref = "mokkery" } [bundles] sqldelight = [ diff --git a/persistence/src/commonMain/sqldelight/com/persistence/Powersync.sq b/persistence/src/commonMain/sqldelight/com/persistence/Powersync.sq index f27815b6..2c8c88fe 100644 --- a/persistence/src/commonMain/sqldelight/com/persistence/Powersync.sq +++ b/persistence/src/commonMain/sqldelight/com/persistence/Powersync.sq @@ -12,15 +12,9 @@ powersyncClear: SELECT powersync_clear(?); -- CRUD operations -hasCrud: -SELECT 1 FROM ps_crud LIMIT 1; - 
getCrudEntries: SELECT id, tx_id, data FROM ps_crud ORDER BY id ASC LIMIT ?; -getCrudFirstEntry: -SELECT id, tx_id, data FROM ps_crud ORDER BY id ASC LIMIT 1; - getCrudEntryByTxId: SELECT id, tx_id, data FROM ps_crud WHERE tx_id = ? ORDER BY id ASC;