diff --git a/core/src/commonMain/kotlin/com/powersync/bucket/BucketStorage.kt b/core/src/commonMain/kotlin/com/powersync/bucket/BucketStorage.kt index 443743a9..fac87d87 100644 --- a/core/src/commonMain/kotlin/com/powersync/bucket/BucketStorage.kt +++ b/core/src/commonMain/kotlin/com/powersync/bucket/BucketStorage.kt @@ -74,10 +74,10 @@ internal class BucketStorage( logger.i { "[updateLocalTarget] Updating target to checkpoint $opId" } - return db.readTransaction { + return db.writeTransaction { if (hasCrud()) { logger.w { "[updateLocalTarget] ps crud is not empty" } - return@readTransaction false + return@writeTransaction false } val seqAfter = @@ -88,15 +88,17 @@ internal class BucketStorage( throw AssertionError("Sqlite Sequence should not be empty") if (seqAfter != seqBefore) { + logger.d("seqAfter != seqBefore seqAfter: $seqAfter seqBefore: $seqBefore") // New crud data may have been uploaded since we got the checkpoint. Abort. - return@readTransaction false + return@writeTransaction false } db.execute( - "UPDATE ${InternalTable.BUCKETS} SET target_op = ? WHERE name='\$local'", + "UPDATE ${InternalTable.BUCKETS} SET target_op = CAST(? as INTEGER) WHERE name='\$local'", listOf(opId) ) - return@readTransaction true + + return@writeTransaction true } } @@ -113,7 +115,7 @@ internal class BucketStorage( suspend fun getBucketStates(): List { return db.getAll( - "SELECT name as bucket, cast(last_op as TEXT) as op_id FROM ${InternalTable.BUCKETS} WHERE pending_delete = 0", + "SELECT name AS bucket, CAST(last_op AS TEXT) AS op_id FROM ${InternalTable.BUCKETS} WHERE pending_delete = 0", mapper = { cursor -> BucketState( bucket = cursor.getString(0)!!, @@ -138,6 +140,8 @@ internal class BucketStorage( ) } + Logger.d("[deleteBucket] Done deleting") + this.pendingBucketDeletes.value = true } @@ -206,7 +210,7 @@ internal class BucketStorage( private suspend fun validateChecksums(checkpoint: Checkpoint): SyncLocalDatabaseResult { val res = db.getOptional( - "SELECT powersync_validate_checkpoint(?) as result", + "SELECT powersync_validate_checkpoint(?) AS result", parameters = listOf(JsonUtil.json.encodeToString(checkpoint)), mapper = { cursor -> cursor.getString(0)!! @@ -227,11 +231,16 @@ internal class BucketStorage( */ private suspend fun updateObjectsFromBuckets(): Boolean { return db.writeTransaction { tx -> - val res = tx.execute( + + tx.execute( "INSERT INTO powersync_operations(op, data) VALUES(?, ?)", listOf("sync_local", "") ) + val res = tx.get("select last_insert_rowid()") { cursor -> + cursor.getLong(0)!! + } + return@writeTransaction res == 1L } } @@ -260,12 +269,9 @@ internal class BucketStorage( db.writeTransaction { tx -> tx.execute( - "DELETE FROM ps_oplog WHERE bucket IN (SELECT name FROM ps_buckets WHERE pending_delete = 1 AND last_applied_op = last_op AND last_op >= target_op)", + "INSERT INTO powersync_operations(op, data) VALUES (?, ?)", listOf("delete_pending_buckets","") ) - tx.execute( - "DELETE FROM ps_buckets WHERE pending_delete = 1 AND last_applied_op = last_op AND last_op >= target_op", - ) // Executed once after start-up, and again when there are pending deletes. pendingBucketDeletes.value = false } diff --git a/core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt b/core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt index 5edcfc1c..eef61a4a 100644 --- a/core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt +++ b/core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt @@ -192,7 +192,8 @@ internal class PowerSyncDatabaseImpl( } return@readTransaction CrudTransaction( - crud = entries, transactionId = txId, + crud = entries, + transactionId = txId, complete = { writeCheckpoint -> logger.i { "[CrudTransaction::complete] Completing transaction with checkpoint $writeCheckpoint" } handleWriteCheckpoint(entries.last().clientId, writeCheckpoint) @@ -260,12 +261,12 @@ internal class PowerSyncDatabaseImpl( if (writeCheckpoint != null && bucketStorage.hasCrud()) { tx.execute( - "UPDATE ps_buckets SET target_op = CAST(? as INTEGER) WHERE name='\$local'", + "UPDATE ps_buckets SET target_op = CAST(? AS INTEGER) WHERE name='\$local'", listOf(writeCheckpoint), ) } else { tx.execute( - "UPDATE ps_buckets SET target_op = CAST(? as INTEGER) WHERE name='\$local'", + "UPDATE ps_buckets SET target_op = CAST(? AS INTEGER) WHERE name='\$local'", listOf(bucketStorage.getMaxOpId()), ) } diff --git a/core/src/commonMain/kotlin/com/powersync/sync/SyncStream.kt b/core/src/commonMain/kotlin/com/powersync/sync/SyncStream.kt index 6b0c18c9..b3a126fd 100644 --- a/core/src/commonMain/kotlin/com/powersync/sync/SyncStream.kt +++ b/core/src/commonMain/kotlin/com/powersync/sync/SyncStream.kt @@ -162,15 +162,14 @@ internal class SyncStream( return false } else { // This isolate is the only one triggering - bucketStorage.updateLocalTarget { getWriteCheckpoint() } - return true + return bucketStorage.updateLocalTarget { getWriteCheckpoint() } } } private suspend fun getWriteCheckpoint(): String { val credentials = connector.getCredentialsCached() require(credentials != null) { "Not logged in" } - val uri = credentials.endpointUri("write-checkpoint2.json?client_id=$clientId'") + val uri = credentials.endpointUri("write-checkpoint2.json?client_id=$clientId") val response = httpClient.get(uri) { contentType(ContentType.Application.Json) @@ -263,9 +262,7 @@ internal class SyncStream( jsonString: String, state: SyncStreamState ): SyncStreamState { - logger.i { "[handleInstruction] Received Instruction: $jsonString" } val obj = JsonUtil.json.parseToJsonElement(jsonString).jsonObject - // TODO: Clean up when { isStreamingSyncCheckpoint(obj) -> return handleStreamingSyncCheckpoint(obj, state) @@ -293,7 +290,6 @@ internal class SyncStream( ): SyncStreamState { val checkpoint = JsonUtil.json.decodeFromJsonElement(jsonObj["checkpoint"] as JsonElement) - state.targetCheckpoint = checkpoint val bucketsToDelete = state.bucketSet!!.toMutableList() val newBuckets = mutableSetOf() @@ -388,8 +384,6 @@ internal class SyncStream( jsonObj: JsonObject, state: SyncStreamState ): SyncStreamState { - - val syncBuckets = listOf(JsonUtil.json.decodeFromJsonElement(jsonObj["data"] as JsonElement)) diff --git a/demos/android-supabase-todolist/app/src/main/java/com/powersync/androidexample/powersync/List.kt b/demos/android-supabase-todolist/app/src/main/java/com/powersync/androidexample/powersync/List.kt index 585a7d51..33c378c4 100644 --- a/demos/android-supabase-todolist/app/src/main/java/com/powersync/androidexample/powersync/List.kt +++ b/demos/android-supabase-todolist/app/src/main/java/com/powersync/androidexample/powersync/List.kt @@ -1,10 +1,12 @@ package com.powersync.demos.powersync import androidx.lifecycle.ViewModel +import androidx.lifecycle.viewModelScope import com.powersync.PowerSyncDatabase import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.MutableStateFlow import kotlinx.coroutines.flow.StateFlow +import kotlinx.coroutines.launch import kotlinx.coroutines.runBlocking internal class ListContent( @@ -37,7 +39,7 @@ internal class ListContent( } fun onItemDeleteClicked(item: ListItem) { - runBlocking { + viewModelScope.launch { db.writeTransaction { tx -> tx.execute("DELETE FROM $LISTS_TABLE WHERE id = ?", listOf(item.id)) tx.execute("DELETE FROM $TODOS_TABLE WHERE list_id = ?", listOf(item.id)) @@ -48,7 +50,7 @@ internal class ListContent( fun onAddItemClicked() { if (_inputText.value.isBlank()) return - runBlocking { + viewModelScope.launch { db.writeTransaction { tx -> tx.execute( "INSERT INTO $LISTS_TABLE (id, created_at, name, owner_id) VALUES (uuid(), datetime(), ?, ?)", diff --git a/demos/supabase-todolist/shared/src/commonMain/kotlin/com/powersync/demos/powersync/List.kt b/demos/supabase-todolist/shared/src/commonMain/kotlin/com/powersync/demos/powersync/List.kt index 585a7d51..33c378c4 100644 --- a/demos/supabase-todolist/shared/src/commonMain/kotlin/com/powersync/demos/powersync/List.kt +++ b/demos/supabase-todolist/shared/src/commonMain/kotlin/com/powersync/demos/powersync/List.kt @@ -1,10 +1,12 @@ package com.powersync.demos.powersync import androidx.lifecycle.ViewModel +import androidx.lifecycle.viewModelScope import com.powersync.PowerSyncDatabase import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.MutableStateFlow import kotlinx.coroutines.flow.StateFlow +import kotlinx.coroutines.launch import kotlinx.coroutines.runBlocking internal class ListContent( @@ -37,7 +39,7 @@ internal class ListContent( } fun onItemDeleteClicked(item: ListItem) { - runBlocking { + viewModelScope.launch { db.writeTransaction { tx -> tx.execute("DELETE FROM $LISTS_TABLE WHERE id = ?", listOf(item.id)) tx.execute("DELETE FROM $TODOS_TABLE WHERE list_id = ?", listOf(item.id)) @@ -48,7 +50,7 @@ internal class ListContent( fun onAddItemClicked() { if (_inputText.value.isBlank()) return - runBlocking { + viewModelScope.launch { db.writeTransaction { tx -> tx.execute( "INSERT INTO $LISTS_TABLE (id, created_at, name, owner_id) VALUES (uuid(), datetime(), ?, ?)",