diff --git a/CHANGELOG.md b/CHANGELOG.md index db4f36da..e95827c2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,7 @@ * [Supabase Connector] Fixed issue where only `400` HTTP status code errors where reported as connection errors. The connector now reports errors for codes `>=400`. * Update PowerSync core extension to `0.4.1`, fixing an issue with the new Rust client. +* Rust sync client: Fix writes made while offline not being uploaded reliably. ## 1.2.0 diff --git a/core/src/commonIntegrationTest/kotlin/com/powersync/sync/SyncIntegrationTest.kt b/core/src/commonIntegrationTest/kotlin/com/powersync/sync/SyncIntegrationTest.kt index b6b214eb..f780a0d7 100644 --- a/core/src/commonIntegrationTest/kotlin/com/powersync/sync/SyncIntegrationTest.kt +++ b/core/src/commonIntegrationTest/kotlin/com/powersync/sync/SyncIntegrationTest.kt @@ -563,6 +563,65 @@ abstract class BaseSyncIntegrationTest( database.expectUserCount(2) } + @Test + fun `handles write made while offline`() = + databaseTest { + connector = TestConnector() + val uploadCompleted = CompletableDeferred() + checkpointResponse = { + uploadCompleted.complete(Unit) + WriteCheckpointResponse(WriteCheckpointData("1")) + } + + database.execute("INSERT INTO users (id, name) VALUES (uuid(), ?)", listOf("local write")) + database.connect(connector, options = options) + + turbineScope(timeout = 10.0.seconds) { + val turbine = database.currentStatus.asFlow().testIn(scope) + turbine.waitFor { it.connected } + + val query = database.watch("SELECT name FROM users") { it.getString(0)!! }.testIn(scope) + query.awaitItem() shouldBe listOf("local write") + + syncLines.send(SyncLine.KeepAlive(tokenExpiresIn = 1234)) + syncLines.send( + SyncLine.FullCheckpoint( + Checkpoint( + writeCheckpoint = "1", + lastOpId = "1", + checksums = listOf(BucketChecksum("a", checksum = 0)), + ), + ), + ) + syncLines.send( + SyncLine.SyncDataBucket( + bucket = "a", + data = + listOf( + OplogEntry( + checksum = 0, + opId = "1", + op = OpType.PUT, + rowId = "1", + rowType = "users", + data = """{"id": "test1", "name": "from server"}""", + ), + ), + after = null, + nextAfter = null, + ), + ) + + uploadCompleted.await() + syncLines.send(SyncLine.CheckpointComplete("1")) + + query.awaitItem() shouldBe listOf("from server") + + turbine.cancelAndIgnoreRemainingEvents() + query.cancelAndIgnoreRemainingEvents() + } + } + @Test fun testTokenExpired() = databaseTest { diff --git a/core/src/commonMain/kotlin/com/powersync/sync/SyncStream.kt b/core/src/commonMain/kotlin/com/powersync/sync/SyncStream.kt index 8d0dbc43..9559f6fb 100644 --- a/core/src/commonMain/kotlin/com/powersync/sync/SyncStream.kt +++ b/core/src/commonMain/kotlin/com/powersync/sync/SyncStream.kt @@ -48,7 +48,6 @@ import kotlinx.coroutines.launch import kotlinx.coroutines.withContext import kotlinx.datetime.Clock import kotlinx.serialization.Serializable -import kotlinx.serialization.encodeToString import kotlinx.serialization.json.JsonElement import kotlinx.serialization.json.JsonObject import kotlinx.serialization.json.encodeToJsonElement @@ -133,6 +132,7 @@ internal class SyncStream( uploadAllCrud() } finally { if (holdingUploadLock) { + logger.v { "crud upload: notify completion" } completedCrudUploads.send(Unit) isUploadingCrud.set(null) } @@ -297,6 +297,7 @@ internal class SyncStream( */ private inner class ActiveIteration( val scope: CoroutineScope, + var hadSyncLine: Boolean = false, var fetchLinesJob: Job? = null, var credentialsInvalidation: Job? = null, ) { @@ -342,6 +343,8 @@ internal class SyncStream( fetchLinesJob = scope.launch { launch { + logger.v { "listening for completed uploads" } + for (completion in completedCrudUploads) { control("completed_upload") } @@ -406,16 +409,36 @@ internal class SyncStream( } } + /** + * Triggers a crud upload when called for the first time. + * + * We could have pending local writes made while disconnected, so in addition to listening + * on updates to `ps_crud`, we also need to trigger a CRUD upload in some other cases. We + * do this on the first sync line because the client is likely to be online in that case. + */ + private fun triggerCrudUploadIfFirstLine() { + if (!hadSyncLine) { + triggerCrudUploadAsync() + hadSyncLine = true + } + } + + private suspend fun line(text: String) { + triggerCrudUploadIfFirstLine() + control("line_text", text) + } + + private suspend fun line(blob: ByteArray) { + triggerCrudUploadIfFirstLine() + control("line_binary", blob) + } + private suspend fun connect(start: Instruction.EstablishSyncStream) { when (val method = options.method) { ConnectionMethod.Http -> - connectViaHttp(start.request).collect { rawLine -> - control("line_text", rawLine) - } + connectViaHttp(start.request).collect(this::line) is ConnectionMethod.WebSocket -> - connectViaWebSocket(start.request, method).collect { binaryLine -> - control("line_binary", binaryLine) - } + connectViaWebSocket(start.request, method).collect(this::line) } } }