diff --git a/crates/core/src/checkpoint.rs b/crates/core/src/checkpoint.rs index 671c334..10e434d 100644 --- a/crates/core/src/checkpoint.rs +++ b/crates/core/src/checkpoint.rs @@ -1,8 +1,7 @@ extern crate alloc; - use alloc::format; -use alloc::string::{String}; +use alloc::string::String; use alloc::vec::Vec; use core::ffi::c_int; use core::slice; @@ -34,7 +33,8 @@ fn powersync_validate_checkpoint_impl( let db = ctx.db_handle(); // language=SQLite - let statement = db.prepare_v2("WITH + let statement = db.prepare_v2( + "WITH bucket_list(bucket, lower_op_id, checksum) AS ( SELECT json_extract(json_each.value, '$.bucket') as bucket, @@ -57,7 +57,8 @@ FROM bucket_list bucket_list.bucket = oplog.bucket AND oplog.op_id <= CAST(json_extract(?1, '$.last_op_id') as INTEGER) AND oplog.op_id > bucket_list.lower_op_id -GROUP BY bucket_list.bucket")?; +GROUP BY bucket_list.bucket", + )?; statement.bind_text(1, data, sqlite::Destructor::STATIC)?; @@ -65,6 +66,7 @@ GROUP BY bucket_list.bucket")?; while statement.step()? == ResultCode::ROW { let name = statement.column_text(0)?; + // checksums with column_int are wrapped to i32 by SQLite let add_checksum = statement.column_int(1)?; let oplog_checksum = statement.column_int(2)?; let _count = statement.column_int(3)?; @@ -72,7 +74,8 @@ GROUP BY bucket_list.bucket")?; let _last_applied_op = statement.column_int64(5)?; let expected_checksum = statement.column_int(6)?; - let checksum = oplog_checksum + add_checksum; + // wrapping add is like +, but safely overflows + let checksum = oplog_checksum.wrapping_add(add_checksum); if checksum != expected_checksum { failures.push(String::from(name)); @@ -87,7 +90,11 @@ GROUP BY bucket_list.bucket")?; Ok(json::to_string(&result)?) } -create_sqlite_text_fn!(powersync_validate_checkpoint, powersync_validate_checkpoint_impl, "powersync_validate_checkpoint"); +create_sqlite_text_fn!( + powersync_validate_checkpoint, + powersync_validate_checkpoint_impl, + "powersync_validate_checkpoint" +); pub fn register(db: *mut sqlite::sqlite3) -> Result<(), ResultCode> { db.create_function_v2( @@ -103,5 +110,3 @@ pub fn register(db: *mut sqlite::sqlite3) -> Result<(), ResultCode> { Ok(()) } - - diff --git a/crates/core/src/operations.rs b/crates/core/src/operations.rs index 8f38f93..6ed6784 100644 --- a/crates/core/src/operations.rs +++ b/crates/core/src/operations.rs @@ -4,26 +4,27 @@ use alloc::vec::Vec; use serde::{Deserialize, Deserializer, Serialize}; use serde_json as json; +use crate::error::{PSResult, SQLiteError}; use sqlite_nostd as sqlite; use sqlite_nostd::{Connection, ResultCode}; -use crate::error::{SQLiteError, PSResult}; use crate::ext::SafeManagedStmt; use crate::sync_types::{BucketChecksum, Checkpoint, StreamingSyncLine}; use crate::util::*; // Run inside a transaction -pub fn insert_operation( - db: *mut sqlite::sqlite3, data: &str) -> Result<(), SQLiteError> { +pub fn insert_operation(db: *mut sqlite::sqlite3, data: &str) -> Result<(), SQLiteError> { // language=SQLite - let statement = db.prepare_v2("\ + let statement = db.prepare_v2( + "\ SELECT json_extract(e.value, '$.bucket') as bucket, json_extract(e.value, '$.data') as data, json_extract(e.value, '$.has_more') as has_more, json_extract(e.value, '$.after') as after, json_extract(e.value, '$.next_after') as next_after -FROM json_each(json_extract(?, '$.buckets')) e")?; +FROM json_each(json_extract(?, '$.buckets')) e", + )?; statement.bind_text(1, data, sqlite::Destructor::STATIC)?; while statement.step()? == ResultCode::ROW { @@ -39,9 +40,15 @@ FROM json_each(json_extract(?, '$.buckets')) e")?; Ok(()) } -pub fn insert_bucket_operations(db: *mut sqlite::sqlite3, bucket: &str, data: &str) -> Result<(), SQLiteError> { +pub fn insert_bucket_operations( + db: *mut sqlite::sqlite3, + bucket: &str, + data: &str, +) -> Result<(), SQLiteError> { + // Statement to insert new operations (only for PUT and REMOVE). // language=SQLite - let iterate_statement = db.prepare_v2("\ + let iterate_statement = db.prepare_v2( + "\ SELECT json_extract(e.value, '$.op_id') as op_id, json_extract(e.value, '$.op') as op, @@ -50,39 +57,50 @@ SELECT json_extract(e.value, '$.checksum') as checksum, json_extract(e.value, '$.data') as data, json_extract(e.value, '$.subkey') as subkey -FROM json_each(?) e")?; +FROM json_each(?) e", + )?; iterate_statement.bind_text(1, data, sqlite::Destructor::STATIC)?; + // Statement to supersede (replace) operations with the same key. // language=SQLite - let supersede_statement = db.prepare_v2("\ -UPDATE ps_oplog SET - superseded = 1, - op = 2, - data = NULL + let supersede_statement = db.prepare_v2( + "\ +DELETE FROM ps_oplog WHERE ps_oplog.superseded = 0 AND unlikely(ps_oplog.bucket = ?1) - AND ps_oplog.key = ?2")?; + AND ps_oplog.key = ?2 +RETURNING op_id, hash", + )?; supersede_statement.bind_text(1, bucket, sqlite::Destructor::STATIC)?; // language=SQLite let insert_statement = db.prepare_v2("\ -INSERT INTO ps_oplog(bucket, op_id, op, key, row_type, row_id, data, hash, superseded) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)")?; +INSERT INTO ps_oplog(bucket, op_id, op, key, row_type, row_id, data, hash, superseded) VALUES (?, ?, ?, ?, ?, ?, ?, ?, 0)")?; insert_statement.bind_text(1, bucket, sqlite::Destructor::STATIC)?; + // We do an ON CONFLICT UPDATE simply so that the RETURNING bit works for existing rows. + // We can consider splitting this into separate SELECT and INSERT statements. // language=SQLite - let bucket_statement = db.prepare_v2("INSERT OR IGNORE INTO ps_buckets(name) VALUES(?)")?; + let bucket_statement = db.prepare_v2( + "INSERT INTO ps_buckets(name) + VALUES(?) + ON CONFLICT DO UPDATE + SET last_applied_op = last_applied_op + RETURNING last_applied_op", + )?; bucket_statement.bind_text(1, bucket, sqlite::Destructor::STATIC)?; - bucket_statement.exec()?; + bucket_statement.step()?; - // language=SQLite - let bucket_target_statement = db.prepare_v2("\ -UPDATE ps_buckets - SET target_op = MAX(ifnull(cast(json_extract(?, '$.target') as integer), 0), ps_buckets.target_op) - WHERE name = ?")?; - bucket_target_statement.bind_text(2, bucket, sqlite::Destructor::STATIC)?; + // This is an optimization for initial sync - we can avoid persisting individual REMOVE + // operations when last_applied_op = 0. + // We do still need to do the "supersede_statement" step for this case, since a REMOVE + // operation can supersede another PUT operation we're syncing at the same time. + let mut last_applied_op = bucket_statement.column_int64(0)?; + + bucket_statement.reset()?; - let mut first_op: Option = None; let mut last_op: Option = None; + let mut add_checksum: i32 = 0; while iterate_statement.step()? == ResultCode::ROW { let op_id = iterate_statement.column_int64(0)?; @@ -93,29 +111,54 @@ UPDATE ps_buckets let op_data = iterate_statement.column_text(5); last_op = Some(op_id); - if first_op.is_none() { - first_op = Some(op_id); - } - if op == "PUT" || op == "REMOVE" || op == "MOVE" { + if op == "PUT" || op == "REMOVE" { let key: String; if let (Ok(object_type), Ok(object_id)) = (object_type.as_ref(), object_id.as_ref()) { let subkey = iterate_statement.column_text(6).unwrap_or("null"); key = format!("{}/{}/{}", &object_type, &object_id, subkey); - supersede_statement.bind_text(2, &key, sqlite::Destructor::STATIC)?; - supersede_statement.exec()?; } else { key = String::from(""); } - let superseded = if op == "MOVE" { 1 } else { 0 }; - let opi = if op == "MOVE" { 2 } else if op == "PUT" { 3 } else { 4 }; + supersede_statement.bind_text(2, &key, sqlite::Destructor::STATIC)?; + + let mut superseded = false; + + while supersede_statement.step()? == ResultCode::ROW { + // Superseded (deleted) a previous operation, add the checksum + let superseded_op = supersede_statement.column_int64(0)?; + let supersede_checksum = supersede_statement.column_int(1)?; + add_checksum = add_checksum.wrapping_add(supersede_checksum); + + if superseded_op <= last_applied_op { + // Superseded an operation previously applied - we cannot skip removes + // For initial sync, last_applied_op = 0, so this is always false. + // For subsequent sync, this is only true if the row was previously + // synced, not when it was first synced in the current batch. + superseded = true; + } + } + supersede_statement.reset()?; + + let should_skip_remove = !superseded && op == "REMOVE"; + if (should_skip_remove) { + // If a REMOVE statement did not replace (supersede) any previous + // operations, we do not need to persist it. + // The same applies if the bucket was not synced to the local db yet, + // even if it did supersede another operation. + // Handle the same as MOVE. + add_checksum = add_checksum.wrapping_add(checksum); + continue; + } + + let opi = if op == "PUT" { 3 } else { 4 }; insert_statement.bind_int64(2, op_id)?; insert_statement.bind_int(3, opi)?; - if key == "" { - insert_statement.bind_null(4)?; - } else { + if key != "" { insert_statement.bind_text(4, &key, sqlite::Destructor::STATIC)?; + } else { + insert_statement.bind_null(4)?; } if let (Ok(object_type), Ok(object_id)) = (object_type, object_id) { @@ -132,15 +175,9 @@ UPDATE ps_buckets } insert_statement.bind_int(8, checksum)?; - insert_statement.bind_int(9, superseded)?; insert_statement.exec()?; - - if op == "MOVE" { - if let Ok(data) = op_data { - bucket_target_statement.bind_text(1, data, sqlite::Destructor::STATIC)?; - bucket_target_statement.exec()?; - } - } + } else if op == "MOVE" { + add_checksum = add_checksum.wrapping_add(checksum); } else if op == "CLEAR" { // Any remaining PUT operations should get an implicit REMOVE // language=SQLite @@ -151,77 +188,60 @@ UPDATE ps_buckets // And we need to re-apply all of those. // We also replace the checksum with the checksum of the CLEAR op. // language=SQLite - let clear_statement2 = db.prepare_v2("UPDATE ps_buckets SET last_applied_op = 0, add_checksum = ?1 WHERE name = ?2")?; + let clear_statement2 = db.prepare_v2( + "UPDATE ps_buckets SET last_applied_op = 0, add_checksum = ?1 WHERE name = ?2", + )?; clear_statement2.bind_text(2, bucket, sqlite::Destructor::STATIC)?; clear_statement2.bind_int(1, checksum)?; clear_statement2.exec()?; + + add_checksum = 0; + last_applied_op = 0; } } if let Some(last_op) = &last_op { // language=SQLite - let statement = db.prepare_v2("UPDATE ps_buckets SET last_op = ?1 WHERE name = ?2")?; - statement.bind_text(2, bucket, sqlite::Destructor::STATIC)?; - statement.bind_int64(1, *last_op)?; - statement.exec()?; - } - - - // Compact superseded ops immediately - if let (Some(first_op), Some(last_op)) = (&first_op, &last_op) { - // language=SQLite - let statement = db.prepare_v2("UPDATE ps_buckets - SET add_checksum = add_checksum + (SELECT IFNULL(SUM(hash), 0) - FROM ps_oplog AS oplog - WHERE superseded = 1 - AND oplog.bucket = ?1 - AND oplog.op_id >= ?2 - AND oplog.op_id <= ?3) - WHERE ps_buckets.name = ?1")?; + let statement = db.prepare_v2( + "UPDATE ps_buckets + SET last_op = ?2, + add_checksum = add_checksum + ?3 + WHERE name = ?1", + )?; statement.bind_text(1, bucket, sqlite::Destructor::STATIC)?; - statement.bind_int64(2, *first_op)?; - statement.bind_int64(3, *last_op)?; - statement.exec()?; + statement.bind_int64(2, *last_op)?; + statement.bind_int(3, add_checksum)?; - // language=SQLite - let statement = db.prepare_v2("DELETE - FROM ps_oplog - WHERE superseded = 1 - AND bucket = ? - AND op_id >= ? - AND op_id <= ?")?; - statement.bind_text(1, bucket, sqlite::Destructor::STATIC)?; - statement.bind_int64(2, *first_op)?; - statement.bind_int64(3, *last_op)?; statement.exec()?; } Ok(()) } -pub fn clear_remove_ops( - db: *mut sqlite::sqlite3, _data: &str) -> Result<(), SQLiteError> { - +pub fn clear_remove_ops(db: *mut sqlite::sqlite3, _data: &str) -> Result<(), SQLiteError> { // language=SQLite - let statement = db.prepare_v2( - "SELECT name, last_applied_op FROM ps_buckets WHERE pending_delete = 0")?; + let statement = + db.prepare_v2("SELECT name, last_applied_op FROM ps_buckets WHERE pending_delete = 0")?; // language=SQLite - let update_statement = db.prepare_v2("UPDATE ps_buckets + let update_statement = db.prepare_v2( + "UPDATE ps_buckets SET add_checksum = add_checksum + (SELECT IFNULL(SUM(hash), 0) FROM ps_oplog AS oplog WHERE (superseded = 1 OR op != 3) AND oplog.bucket = ?1 AND oplog.op_id <= ?2) - WHERE ps_buckets.name = ?1")?; + WHERE ps_buckets.name = ?1", + )?; // language=SQLite - let delete_statement = db.prepare_v2("DELETE + let delete_statement = db.prepare_v2( + "DELETE FROM ps_oplog WHERE (superseded = 1 OR op != 3) AND bucket = ?1 - AND op_id <= ?2")?; - + AND op_id <= ?2", + )?; while statement.step()? == ResultCode::ROW { // Note: Each iteration here may be run in a separate transaction. @@ -242,10 +262,7 @@ pub fn clear_remove_ops( Ok(()) } - -pub fn delete_pending_buckets( - db: *mut sqlite::sqlite3, _data: &str) -> Result<(), SQLiteError> { - +pub fn delete_pending_buckets(db: *mut sqlite::sqlite3, _data: &str) -> Result<(), SQLiteError> { // language=SQLite let statement = db.prepare_v2( "DELETE FROM ps_oplog WHERE bucket IN (SELECT name FROM ps_buckets WHERE pending_delete = 1 AND last_applied_op = last_op AND last_op >= target_op)")?; @@ -258,31 +275,27 @@ pub fn delete_pending_buckets( Ok(()) } - -pub fn delete_bucket( - db: *mut sqlite::sqlite3, name: &str) -> Result<(), SQLiteError> { - +pub fn delete_bucket(db: *mut sqlite::sqlite3, name: &str) -> Result<(), SQLiteError> { let id = gen_uuid(); let new_name = format!("$delete_{}_{}", name, id.hyphenated().to_string()); // language=SQLite let statement = db.prepare_v2( - "UPDATE ps_oplog SET op=4, data=NULL, bucket=?1 WHERE op=3 AND superseded=0 AND bucket=?2")?; + "UPDATE ps_oplog SET op=4, data=NULL, bucket=?1 WHERE op=3 AND superseded=0 AND bucket=?2", + )?; statement.bind_text(1, &new_name, sqlite::Destructor::STATIC)?; statement.bind_text(2, &name, sqlite::Destructor::STATIC)?; statement.exec()?; // Rename bucket // language=SQLite - let statement = db.prepare_v2( - "UPDATE ps_oplog SET bucket=?1 WHERE bucket=?2")?; + let statement = db.prepare_v2("UPDATE ps_oplog SET bucket=?1 WHERE bucket=?2")?; statement.bind_text(1, &new_name, sqlite::Destructor::STATIC)?; statement.bind_text(2, name, sqlite::Destructor::STATIC)?; statement.exec()?; // language=SQLite - let statement = db.prepare_v2( - "DELETE FROM ps_buckets WHERE name = ?1")?; + let statement = db.prepare_v2("DELETE FROM ps_buckets WHERE name = ?1")?; statement.bind_text(1, name, sqlite::Destructor::STATIC)?; statement.exec()?; @@ -295,13 +308,8 @@ pub fn delete_bucket( Ok(()) } - -pub fn stream_operation( - db: *mut sqlite::sqlite3, data: &str) -> Result<(), SQLiteError> { - +pub fn stream_operation(db: *mut sqlite::sqlite3, data: &str) -> Result<(), SQLiteError> { let line: StreamingSyncLine = serde_json::from_str(data)?; Ok(()) } - -