Skip to content

fix: down syncs not being reflected in client app #66

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Oct 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 18 additions & 12 deletions core/src/commonMain/kotlin/com/powersync/bucket/BucketStorage.kt
Original file line number Diff line number Diff line change
Expand Up @@ -74,10 +74,10 @@ internal class BucketStorage(

logger.i { "[updateLocalTarget] Updating target to checkpoint $opId" }

return db.readTransaction {
return db.writeTransaction {
if (hasCrud()) {
logger.w { "[updateLocalTarget] ps crud is not empty" }
return@readTransaction false
return@writeTransaction false
}

val seqAfter =
Expand All @@ -88,15 +88,17 @@ internal class BucketStorage(
throw AssertionError("Sqlite Sequence should not be empty")

if (seqAfter != seqBefore) {
logger.d("seqAfter != seqBefore seqAfter: $seqAfter seqBefore: $seqBefore")
// New crud data may have been uploaded since we got the checkpoint. Abort.
return@readTransaction false
return@writeTransaction false
}

db.execute(
"UPDATE ${InternalTable.BUCKETS} SET target_op = ? WHERE name='\$local'",
"UPDATE ${InternalTable.BUCKETS} SET target_op = CAST(? as INTEGER) WHERE name='\$local'",
listOf(opId)
)
return@readTransaction true

return@writeTransaction true
}
}

Expand All @@ -113,7 +115,7 @@ internal class BucketStorage(

suspend fun getBucketStates(): List<BucketState> {
return db.getAll(
"SELECT name as bucket, cast(last_op as TEXT) as op_id FROM ${InternalTable.BUCKETS} WHERE pending_delete = 0",
"SELECT name AS bucket, CAST(last_op AS TEXT) AS op_id FROM ${InternalTable.BUCKETS} WHERE pending_delete = 0",
mapper = { cursor ->
BucketState(
bucket = cursor.getString(0)!!,
Expand All @@ -138,6 +140,8 @@ internal class BucketStorage(
)
}

Logger.d("[deleteBucket] Done deleting")

this.pendingBucketDeletes.value = true
}

Expand Down Expand Up @@ -206,7 +210,7 @@ internal class BucketStorage(

private suspend fun validateChecksums(checkpoint: Checkpoint): SyncLocalDatabaseResult {
val res = db.getOptional(
"SELECT powersync_validate_checkpoint(?) as result",
"SELECT powersync_validate_checkpoint(?) AS result",
parameters = listOf(JsonUtil.json.encodeToString(checkpoint)),
mapper = { cursor ->
cursor.getString(0)!!
Expand All @@ -227,11 +231,16 @@ internal class BucketStorage(
*/
private suspend fun updateObjectsFromBuckets(): Boolean {
return db.writeTransaction { tx ->
val res = tx.execute(

tx.execute(
"INSERT INTO powersync_operations(op, data) VALUES(?, ?)",
listOf("sync_local", "")
)

val res = tx.get("select last_insert_rowid()") { cursor ->
cursor.getLong(0)!!
}

return@writeTransaction res == 1L
}
}
Expand Down Expand Up @@ -260,12 +269,9 @@ internal class BucketStorage(

db.writeTransaction { tx ->
tx.execute(
"DELETE FROM ps_oplog WHERE bucket IN (SELECT name FROM ps_buckets WHERE pending_delete = 1 AND last_applied_op = last_op AND last_op >= target_op)",
"INSERT INTO powersync_operations(op, data) VALUES (?, ?)", listOf("delete_pending_buckets","")
)

tx.execute(
"DELETE FROM ps_buckets WHERE pending_delete = 1 AND last_applied_op = last_op AND last_op >= target_op",
)
// Executed once after start-up, and again when there are pending deletes.
pendingBucketDeletes.value = false
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,8 @@ internal class PowerSyncDatabaseImpl(
}

return@readTransaction CrudTransaction(
crud = entries, transactionId = txId,
crud = entries,
transactionId = txId,
complete = { writeCheckpoint ->
logger.i { "[CrudTransaction::complete] Completing transaction with checkpoint $writeCheckpoint" }
handleWriteCheckpoint(entries.last().clientId, writeCheckpoint)
Expand Down Expand Up @@ -260,12 +261,12 @@ internal class PowerSyncDatabaseImpl(

if (writeCheckpoint != null && bucketStorage.hasCrud()) {
tx.execute(
"UPDATE ps_buckets SET target_op = CAST(? as INTEGER) WHERE name='\$local'",
"UPDATE ps_buckets SET target_op = CAST(? AS INTEGER) WHERE name='\$local'",
listOf(writeCheckpoint),
)
} else {
tx.execute(
"UPDATE ps_buckets SET target_op = CAST(? as INTEGER) WHERE name='\$local'",
"UPDATE ps_buckets SET target_op = CAST(? AS INTEGER) WHERE name='\$local'",
listOf(bucketStorage.getMaxOpId()),
)
}
Expand Down
10 changes: 2 additions & 8 deletions core/src/commonMain/kotlin/com/powersync/sync/SyncStream.kt
Original file line number Diff line number Diff line change
Expand Up @@ -162,15 +162,14 @@ internal class SyncStream(
return false
} else {
// This isolate is the only one triggering
bucketStorage.updateLocalTarget { getWriteCheckpoint() }
return true
return bucketStorage.updateLocalTarget { getWriteCheckpoint() }
}
}

private suspend fun getWriteCheckpoint(): String {
val credentials = connector.getCredentialsCached()
require(credentials != null) { "Not logged in" }
val uri = credentials.endpointUri("write-checkpoint2.json?client_id=$clientId'")
val uri = credentials.endpointUri("write-checkpoint2.json?client_id=$clientId")

val response = httpClient.get(uri) {
contentType(ContentType.Application.Json)
Expand Down Expand Up @@ -263,9 +262,7 @@ internal class SyncStream(
jsonString: String,
state: SyncStreamState
): SyncStreamState {
logger.i { "[handleInstruction] Received Instruction: $jsonString" }
val obj = JsonUtil.json.parseToJsonElement(jsonString).jsonObject

// TODO: Clean up
when {
isStreamingSyncCheckpoint(obj) -> return handleStreamingSyncCheckpoint(obj, state)
Expand Down Expand Up @@ -293,7 +290,6 @@ internal class SyncStream(
): SyncStreamState {
val checkpoint =
JsonUtil.json.decodeFromJsonElement<Checkpoint>(jsonObj["checkpoint"] as JsonElement)

state.targetCheckpoint = checkpoint
val bucketsToDelete = state.bucketSet!!.toMutableList()
val newBuckets = mutableSetOf<String>()
Expand Down Expand Up @@ -388,8 +384,6 @@ internal class SyncStream(
jsonObj: JsonObject,
state: SyncStreamState
): SyncStreamState {


val syncBuckets =
listOf<SyncDataBucket>(JsonUtil.json.decodeFromJsonElement(jsonObj["data"] as JsonElement))

Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
package com.powersync.demos.powersync

import androidx.lifecycle.ViewModel
import androidx.lifecycle.viewModelScope
import com.powersync.PowerSyncDatabase
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

internal class ListContent(
Expand Down Expand Up @@ -37,7 +39,7 @@ internal class ListContent(
}

fun onItemDeleteClicked(item: ListItem) {
runBlocking {
viewModelScope.launch {
db.writeTransaction { tx ->
tx.execute("DELETE FROM $LISTS_TABLE WHERE id = ?", listOf(item.id))
tx.execute("DELETE FROM $TODOS_TABLE WHERE list_id = ?", listOf(item.id))
Expand All @@ -48,7 +50,7 @@ internal class ListContent(
fun onAddItemClicked() {
if (_inputText.value.isBlank()) return

runBlocking {
viewModelScope.launch {
db.writeTransaction { tx ->
tx.execute(
"INSERT INTO $LISTS_TABLE (id, created_at, name, owner_id) VALUES (uuid(), datetime(), ?, ?)",
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
package com.powersync.demos.powersync

import androidx.lifecycle.ViewModel
import androidx.lifecycle.viewModelScope
import com.powersync.PowerSyncDatabase
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

internal class ListContent(
Expand Down Expand Up @@ -37,7 +39,7 @@ internal class ListContent(
}

fun onItemDeleteClicked(item: ListItem) {
runBlocking {
viewModelScope.launch {
db.writeTransaction { tx ->
tx.execute("DELETE FROM $LISTS_TABLE WHERE id = ?", listOf(item.id))
tx.execute("DELETE FROM $TODOS_TABLE WHERE list_id = ?", listOf(item.id))
Expand All @@ -48,7 +50,7 @@ internal class ListContent(
fun onAddItemClicked() {
if (_inputText.value.isBlank()) return

runBlocking {
viewModelScope.launch {
db.writeTransaction { tx ->
tx.execute(
"INSERT INTO $LISTS_TABLE (id, created_at, name, owner_id) VALUES (uuid(), datetime(), ?, ?)",
Expand Down
Loading