diff --git a/crates/core/src/error.rs b/crates/core/src/error.rs index a6974aa..3aa5dcf 100644 --- a/crates/core/src/error.rs +++ b/crates/core/src/error.rs @@ -1,12 +1,10 @@ use alloc::string::{String, ToString}; use core::error::Error; -use sqlite_nostd::{Connection, ResultCode, sqlite3}; - +use sqlite_nostd::{sqlite3, Connection, ResultCode}; #[derive(Debug)] pub struct SQLiteError(pub ResultCode, pub Option); - impl core::fmt::Display for SQLiteError { fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { write!(f, "{:?}", self) @@ -16,21 +14,10 @@ impl core::fmt::Display for SQLiteError { impl Error for SQLiteError {} pub trait PSResult { - fn into_result(self) -> Result; fn into_db_result(self, db: *mut sqlite3) -> Result; } impl PSResult for Result { - fn into_result(self) -> Result { - if let Err(code) = self { - Err(SQLiteError(code, None)) - } else if let Ok(r) = self { - Ok(r) - } else { - Err(SQLiteError(ResultCode::ABORT, None)) - } - } - fn into_db_result(self, db: *mut sqlite3) -> Result { if let Err(code) = self { let message = db.errmsg().unwrap_or(String::from("Conversion error")); diff --git a/crates/core/src/kv.rs b/crates/core/src/kv.rs new file mode 100644 index 0000000..040b65d --- /dev/null +++ b/crates/core/src/kv.rs @@ -0,0 +1,88 @@ +extern crate alloc; + +use alloc::format; +use alloc::string::{String, ToString}; +use core::ffi::c_int; +use core::slice; + +use sqlite::ResultCode; +use sqlite_nostd as sqlite; +use sqlite_nostd::{Connection, Context}; + +use crate::create_sqlite_optional_text_fn; +use crate::create_sqlite_text_fn; +use crate::error::SQLiteError; + +fn powersync_client_id_impl( + ctx: *mut sqlite::context, + _args: &[*mut sqlite::value], +) -> Result { + let db = ctx.db_handle(); + + // language=SQLite + let statement = db.prepare_v2("select value from ps_kv where key = 'client_id'")?; + + if statement.step()? == ResultCode::ROW { + let client_id = statement.column_text(0)?; + return Ok(client_id.to_string()); + } else { + return Err(SQLiteError( + ResultCode::ABORT, + Some(format!("No client_id found in ps_kv")), + )); + } +} + +create_sqlite_text_fn!( + powersync_client_id, + powersync_client_id_impl, + "powersync_last_synced_at" +); + +fn powersync_last_synced_at_impl( + ctx: *mut sqlite::context, + _args: &[*mut sqlite::value], +) -> Result, SQLiteError> { + let db = ctx.db_handle(); + + // language=SQLite + let statement = db.prepare_v2("select value from ps_kv where key = 'last_synced_at'")?; + + if statement.step()? == ResultCode::ROW { + let client_id = statement.column_text(0)?; + return Ok(Some(client_id.to_string())); + } else { + return Ok(None); + } +} + +create_sqlite_optional_text_fn!( + powersync_last_synced_at, + powersync_last_synced_at_impl, + "powersync_last_synced_at" +); + +pub fn register(db: *mut sqlite::sqlite3) -> Result<(), ResultCode> { + db.create_function_v2( + "powersync_client_id", + 0, + sqlite::UTF8 | sqlite::DETERMINISTIC, + None, + Some(powersync_client_id), + None, + None, + None, + )?; + db.create_function_v2( + "powersync_last_synced_at", + 0, + sqlite::UTF8 | sqlite::DETERMINISTIC, + None, + Some(powersync_last_synced_at), + None, + None, + None, + )?; + + Ok(()) +} diff --git a/crates/core/src/lib.rs b/crates/core/src/lib.rs index da3b2c8..5fd6dc8 100644 --- a/crates/core/src/lib.rs +++ b/crates/core/src/lib.rs @@ -1,5 +1,6 @@ #![no_std] #![feature(vec_into_raw_parts)] +#![allow(internal_features)] #![feature(core_intrinsics)] #![feature(error_in_core)] #![feature(assert_matches)] @@ -11,23 +12,24 @@ use core::ffi::{c_char, c_int}; use sqlite::ResultCode; use sqlite_nostd as sqlite; -mod util; -mod uuid; -mod views; -mod view_admin; -mod macros; +mod checkpoint; +mod crud_vtab; mod diff; -mod schema_management; -mod operations_vtab; -mod operations; -mod ext; mod error; -mod crud_vtab; -mod vtab_util; +mod ext; +mod kv; +mod macros; +mod operations; +mod operations_vtab; +mod schema_management; mod sync_local; -mod checkpoint; -mod version; mod sync_types; +mod util; +mod uuid; +mod version; +mod view_admin; +mod views; +mod vtab_util; #[no_mangle] pub extern "C" fn sqlite3_powersync_init( @@ -43,7 +45,7 @@ pub extern "C" fn sqlite3_powersync_init( code as c_int } else { ResultCode::OK as c_int - } + }; } fn init_extension(db: *mut sqlite::sqlite3) -> Result<(), ResultCode> { @@ -53,6 +55,7 @@ fn init_extension(db: *mut sqlite::sqlite3) -> Result<(), ResultCode> { crate::diff::register(db)?; crate::view_admin::register(db)?; crate::checkpoint::register(db)?; + crate::kv::register(db)?; crate::schema_management::register(db)?; crate::operations_vtab::register(db)?; @@ -61,12 +64,17 @@ fn init_extension(db: *mut sqlite::sqlite3) -> Result<(), ResultCode> { Ok(()) } - extern "C" { #[cfg(feature = "static")] #[allow(non_snake_case)] pub fn sqlite3_auto_extension( - xEntryPoint: Option c_int>, + xEntryPoint: Option< + extern "C" fn( + *mut sqlite::sqlite3, + *mut *mut c_char, + *mut sqlite::api_routines, + ) -> c_int, + >, ) -> ::core::ffi::c_int; } diff --git a/crates/core/src/macros.rs b/crates/core/src/macros.rs index dc37107..aeb1f1a 100644 --- a/crates/core/src/macros.rs +++ b/crates/core/src/macros.rs @@ -1,4 +1,3 @@ - #[macro_export] macro_rules! create_sqlite_text_fn { ($fn_name:ident, $fn_impl_name:ident, $description:literal) => { @@ -31,6 +30,41 @@ macro_rules! create_sqlite_text_fn { }; } +#[macro_export] +macro_rules! create_sqlite_optional_text_fn { + ($fn_name:ident, $fn_impl_name:ident, $description:literal) => { + extern "C" fn $fn_name( + ctx: *mut sqlite::context, + argc: c_int, + argv: *mut *mut sqlite::value, + ) { + let args = sqlite::args!(argc, argv); + + let result = $fn_impl_name(ctx, args); + + if let Err(err) = result { + let SQLiteError(code, message) = SQLiteError::from(err); + if message.is_some() { + ctx.result_error(&format!("{:} {:}", $description, message.unwrap())); + } else { + let error = ctx.db_handle().errmsg().unwrap(); + if error == "not an error" { + ctx.result_error(&format!("{:}", $description)); + } else { + ctx.result_error(&format!("{:} {:}", $description, error)); + } + } + ctx.result_error_code(code); + } else if let Ok(r) = result { + if let Some(s) = r { + ctx.result_text_transient(&s); + } else { + ctx.result_null(); + } + } + } + }; +} // Wrap a function in an auto-transaction. // Gives the equivalent of SQLite's auto-commit behaviour, except that applies to all statements diff --git a/crates/core/src/operations.rs b/crates/core/src/operations.rs index 6ed6784..e3802e0 100644 --- a/crates/core/src/operations.rs +++ b/crates/core/src/operations.rs @@ -1,15 +1,11 @@ use alloc::format; use alloc::string::{String, ToString}; -use alloc::vec::Vec; -use serde::{Deserialize, Deserializer, Serialize}; -use serde_json as json; use crate::error::{PSResult, SQLiteError}; use sqlite_nostd as sqlite; use sqlite_nostd::{Connection, ResultCode}; use crate::ext::SafeManagedStmt; -use crate::sync_types::{BucketChecksum, Checkpoint, StreamingSyncLine}; use crate::util::*; // Run inside a transaction @@ -142,7 +138,7 @@ INSERT INTO ps_oplog(bucket, op_id, op, key, row_type, row_id, data, hash, super supersede_statement.reset()?; let should_skip_remove = !superseded && op == "REMOVE"; - if (should_skip_remove) { + if should_skip_remove { // If a REMOVE statement did not replace (supersede) any previous // operations, we do not need to persist it. // The same applies if the bucket was not synced to the local db yet, @@ -307,9 +303,3 @@ pub fn delete_bucket(db: *mut sqlite::sqlite3, name: &str) -> Result<(), SQLiteE Ok(()) } - -pub fn stream_operation(db: *mut sqlite::sqlite3, data: &str) -> Result<(), SQLiteError> { - let line: StreamingSyncLine = serde_json::from_str(data)?; - - Ok(()) -} diff --git a/crates/core/src/operations_vtab.rs b/crates/core/src/operations_vtab.rs index 8fb128e..fbafaae 100644 --- a/crates/core/src/operations_vtab.rs +++ b/crates/core/src/operations_vtab.rs @@ -7,9 +7,10 @@ use core::slice; use sqlite::{Connection, ResultCode, Value}; use sqlite_nostd as sqlite; -use crate::operations::{clear_remove_ops, delete_bucket, delete_pending_buckets, insert_operation, stream_operation}; +use crate::operations::{ + clear_remove_ops, delete_bucket, delete_pending_buckets, insert_operation, +}; use crate::sync_local::sync_local; -use crate::sync_types::Checkpoint; use crate::vtab_util::*; #[repr(C)] @@ -17,12 +18,10 @@ struct VirtualTable { base: sqlite::vtab, db: *mut sqlite::sqlite3, - target_checkpoint: Option, target_applied: bool, - target_validated: bool + target_validated: bool, } - extern "C" fn connect( db: *mut sqlite::sqlite3, _aux: *mut c_void, @@ -31,7 +30,8 @@ extern "C" fn connect( vtab: *mut *mut sqlite::vtab, _err: *mut *mut c_char, ) -> c_int { - if let Err(rc) = sqlite::declare_vtab(db, "CREATE TABLE powersync_operations(op TEXT, data TEXT);") + if let Err(rc) = + sqlite::declare_vtab(db, "CREATE TABLE powersync_operations(op TEXT, data TEXT);") { return rc as c_int; } @@ -44,9 +44,8 @@ extern "C" fn connect( zErrMsg: core::ptr::null_mut(), }, db, - target_checkpoint: None, target_validated: false, - target_applied: false + target_applied: false, })); *vtab = tab.cast::(); let _ = sqlite::vtab_config(db, 0); @@ -102,10 +101,7 @@ extern "C" fn update( } else if op == "delete_bucket" { let result = delete_bucket(db, data); vtab_result(vtab, result) - } else if op == "stream" { - let result = stream_operation(db, data); - vtab_result(vtab, result) - } else { + } else { ResultCode::MISUSE as c_int } } else { diff --git a/crates/core/src/sync_local.rs b/crates/core/src/sync_local.rs index cc5f710..382e830 100644 --- a/crates/core/src/sync_local.rs +++ b/crates/core/src/sync_local.rs @@ -2,20 +2,21 @@ use alloc::collections::BTreeSet; use alloc::format; use alloc::string::String; +use crate::error::{PSResult, SQLiteError}; use sqlite_nostd as sqlite; use sqlite_nostd::{ColumnType, Connection, ResultCode}; -use crate::error::{SQLiteError, PSResult}; use crate::ext::SafeManagedStmt; use crate::util::{internal_table_name, quote_internal_name}; - pub fn can_update_local(db: *mut sqlite::sqlite3) -> Result { // language=SQLite - let statement = db.prepare_v2("\ + let statement = db.prepare_v2( + "\ SELECT group_concat(name) FROM ps_buckets -WHERE target_op > last_op AND (name = '$local' OR pending_delete = 0)")?; +WHERE target_op > last_op AND (name = '$local' OR pending_delete = 0)", + )?; if statement.step()? != ResultCode::ROW { return Err(SQLiteError::from(ResultCode::ABORT)); @@ -36,15 +37,15 @@ WHERE target_op > last_op AND (name = '$local' OR pending_delete = 0)")?; Ok(true) } -pub fn sync_local( - db: *mut sqlite::sqlite3, _data: &str) -> Result { - +pub fn sync_local(db: *mut sqlite::sqlite3, _data: &str) -> Result { if !can_update_local(db)? { return Ok(0); } // language=SQLite - let statement = db.prepare_v2("SELECT name FROM sqlite_master WHERE type='table' AND name GLOB 'ps_data_*'").into_db_result(db)?; + let statement = db + .prepare_v2("SELECT name FROM sqlite_master WHERE type='table' AND name GLOB 'ps_data_*'") + .into_db_result(db)?; let mut tables: BTreeSet = BTreeSet::new(); while statement.step()? == ResultCode::ROW { @@ -60,7 +61,9 @@ pub fn sync_local( // |--SEARCH r USING INDEX ps_oplog_by_row (row_type=? AND row_id=?) // `--USE TEMP B-TREE FOR GROUP BY // language=SQLite - let statement = db.prepare_v2("\ + let statement = db + .prepare_v2( + "\ -- 3. Group the objects from different buckets together into a single one (ops). SELECT r.row_type as type, r.row_id as id, @@ -79,7 +82,9 @@ FROM ps_buckets AS buckets WHERE r.superseded = 0 AND b.superseded = 0 -- Group for (3) -GROUP BY r.row_type, r.row_id").into_db_result(db)?; +GROUP BY r.row_type, r.row_id", + ) + .into_db_result(db)?; // TODO: cache statements @@ -96,12 +101,16 @@ GROUP BY r.row_type, r.row_id").into_db_result(db)?; if buckets == "[]" { // DELETE - let delete_statement = db.prepare_v2(&format!("DELETE FROM {} WHERE id = ?", quoted)).into_db_result(db)?; + let delete_statement = db + .prepare_v2(&format!("DELETE FROM {} WHERE id = ?", quoted)) + .into_db_result(db)?; delete_statement.bind_text(1, id, sqlite::Destructor::STATIC)?; delete_statement.exec()?; } else { // INSERT/UPDATE - let insert_statement = db.prepare_v2(&format!("REPLACE INTO {}(id, data) VALUES(?, ?)", quoted)).into_db_result(db)?; + let insert_statement = db + .prepare_v2(&format!("REPLACE INTO {}(id, data) VALUES(?, ?)", quoted)) + .into_db_result(db)?; insert_statement.bind_text(1, id, sqlite::Destructor::STATIC)?; insert_statement.bind_text(2, data?, sqlite::Destructor::STATIC)?; insert_statement.exec()?; @@ -110,14 +119,18 @@ GROUP BY r.row_type, r.row_id").into_db_result(db)?; if buckets == "[]" { // DELETE // language=SQLite - let delete_statement = db.prepare_v2("DELETE FROM ps_untyped WHERE type = ? AND id = ?").into_db_result(db)?; + let delete_statement = db + .prepare_v2("DELETE FROM ps_untyped WHERE type = ? AND id = ?") + .into_db_result(db)?; delete_statement.bind_text(1, type_name, sqlite::Destructor::STATIC)?; delete_statement.bind_text(2, id, sqlite::Destructor::STATIC)?; delete_statement.exec()?; } else { // INSERT/UPDATE // language=SQLite - let insert_statement = db.prepare_v2("REPLACE INTO ps_untyped(type, id, data) VALUES(?, ?, ?)").into_db_result(db)?; + let insert_statement = db + .prepare_v2("REPLACE INTO ps_untyped(type, id, data) VALUES(?, ?, ?)") + .into_db_result(db)?; insert_statement.bind_text(1, type_name, sqlite::Destructor::STATIC)?; insert_statement.bind_text(2, id, sqlite::Destructor::STATIC)?; insert_statement.bind_text(3, data?, sqlite::Destructor::STATIC)?; @@ -127,9 +140,16 @@ GROUP BY r.row_type, r.row_id").into_db_result(db)?; } // language=SQLite - db.exec_safe("UPDATE ps_buckets + db.exec_safe( + "UPDATE ps_buckets SET last_applied_op = last_op - WHERE last_applied_op != last_op").into_db_result(db)?; + WHERE last_applied_op != last_op", + ) + .into_db_result(db)?; + + // language=SQLite + db.exec_safe("insert or replace into ps_kv(key, value) values('last_synced_at', datetime())") + .into_db_result(db)?; Ok(1) } diff --git a/crates/core/src/sync_types.rs b/crates/core/src/sync_types.rs index ad45113..429980d 100644 --- a/crates/core/src/sync_types.rs +++ b/crates/core/src/sync_types.rs @@ -1,20 +1,8 @@ use alloc::string::String; use alloc::vec::Vec; -use serde::{de, Deserialize, Deserializer, Serialize}; -use serde_json as json; +use serde::{Deserialize, Serialize}; -use crate::util::{deserialize_string_to_i64, deserialize_optional_string_to_i64}; -use alloc::format; -use alloc::string::{ToString}; -use core::fmt; -use serde::de::{MapAccess, Visitor}; - -use sqlite_nostd as sqlite; -use sqlite_nostd::{Connection, ResultCode}; -use uuid::Uuid; -use crate::error::{SQLiteError, PSResult}; - -use crate::ext::SafeManagedStmt; +use crate::util::{deserialize_optional_string_to_i64, deserialize_string_to_i64}; #[derive(Serialize, Deserialize, Debug)] pub struct Checkpoint { @@ -31,154 +19,3 @@ pub struct BucketChecksum { pub bucket: String, pub checksum: i32, } - - -#[derive(Serialize, Deserialize, Debug)] -pub struct CheckpointComplete { - #[serde(deserialize_with = "deserialize_string_to_i64")] - last_op_id: i64 -} - -#[derive(Serialize, Deserialize, Debug)] -pub struct SyncBucketData { - // TODO: complete this - bucket: String -} - -#[derive(Serialize, Deserialize, Debug)] -pub struct Keepalive { - token_expires_in: i32 -} - -#[derive(Serialize, Deserialize, Debug)] -pub struct CheckpointDiff { - #[serde(deserialize_with = "deserialize_string_to_i64")] - last_op_id: i64, - updated_buckets: Vec, - removed_buckets: Vec, - #[serde(default)] - #[serde(deserialize_with = "deserialize_optional_string_to_i64")] - write_checkpoint: Option -} - - - -#[derive(Debug)] -pub enum StreamingSyncLine { - CheckpointLine(Checkpoint), - CheckpointDiffLine(CheckpointDiff), - CheckpointCompleteLine(CheckpointComplete), - SyncBucketDataLine(SyncBucketData), - KeepaliveLine(i32), - Unknown -} - -// Serde does not supporting ignoring unknown fields in externally-tagged enums, so we use our own -// serializer. - -struct StreamingSyncLineVisitor; - -impl<'de> Visitor<'de> for StreamingSyncLineVisitor { - type Value = StreamingSyncLine; - - fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result { - formatter.write_str("sync data") - } - - fn visit_map(self, mut access: A) -> Result - where - A: MapAccess<'de>, - { - let mut r = StreamingSyncLine::Unknown; - while let Some((key, value)) = access.next_entry::()? { - if !matches!(r, StreamingSyncLine::Unknown) { - // Generally, we don't expect to receive multiple in one line. - // But if it does happen, we keep the first one. - continue; - } - match key.as_str() { - "checkpoint" => { - r = StreamingSyncLine::CheckpointLine( - serde_json::from_value(value).map_err(de::Error::custom)?, - ); - } - "checkpoint_diff" => { - r = StreamingSyncLine::CheckpointDiffLine( - serde_json::from_value(value).map_err(de::Error::custom)?, - ); - } - "checkpoint_complete" => { - r = StreamingSyncLine::CheckpointCompleteLine( - serde_json::from_value(value).map_err(de::Error::custom)?, - ); - } - "data" => { - r = StreamingSyncLine::SyncBucketDataLine( - serde_json::from_value(value).map_err(de::Error::custom)?, - ); - } - "token_expires_in" => { - r = StreamingSyncLine::KeepaliveLine( - serde_json::from_value(value).map_err(de::Error::custom)?, - ); - } - _ => {} - } - } - - Ok(r) - } -} - -impl<'de> Deserialize<'de> for StreamingSyncLine { - fn deserialize(deserializer: D) -> Result - where - D: Deserializer<'de>, - { - deserializer.deserialize_map(StreamingSyncLineVisitor) - } -} - - -#[cfg(test)] -mod tests { - use core::assert_matches::assert_matches; - use super::*; - - #[test] - fn json_parsing_test() { - let line: StreamingSyncLine = serde_json::from_str(r#"{"token_expires_in": 42}"#).unwrap(); - assert_matches!(line, StreamingSyncLine::KeepaliveLine(42)); - - let line: StreamingSyncLine = serde_json::from_str(r#"{"checkpoint_complete": {"last_op_id": "123"}}"#).unwrap(); - assert_matches!(line, StreamingSyncLine::CheckpointCompleteLine(CheckpointComplete { last_op_id: 123 })); - - let line: StreamingSyncLine = serde_json::from_str(r#"{"checkpoint_complete": {"last_op_id": "123", "other": "foo"}}"#).unwrap(); - assert_matches!(line, StreamingSyncLine::CheckpointCompleteLine(CheckpointComplete { last_op_id: 123 })); - - let line: StreamingSyncLine = serde_json::from_str(r#"{"checkpoint": {"last_op_id": "123", "buckets": []}}"#).unwrap(); - assert_matches!(line, StreamingSyncLine::CheckpointLine(Checkpoint { last_op_id: 123, .. })); - - let line: StreamingSyncLine = serde_json::from_str(r#"{"checkpoint": {"last_op_id": "123", "write_checkpoint": "42", "buckets": []}}"#).unwrap(); - assert_matches!(line, StreamingSyncLine::CheckpointLine(Checkpoint { last_op_id: 123, write_checkpoint: Some(42), .. })); - - let line: StreamingSyncLine = serde_json::from_str(r#"{"checkpoint_diff": {"last_op_id": "123", "updated_buckets": [], "removed_buckets": []}}"#).unwrap(); - assert_matches!(line, StreamingSyncLine::CheckpointDiffLine(CheckpointDiff { last_op_id: 123, .. })); - - // Additional/unknown fields - let line: StreamingSyncLine = serde_json::from_str(r#"{"token_expires_in": 42, "foo": 1}"#).unwrap(); - assert_matches!(line, StreamingSyncLine::KeepaliveLine(42)); - let line: StreamingSyncLine = serde_json::from_str(r#"{}"#).unwrap(); - assert_matches!(line, StreamingSyncLine::Unknown); - let line: StreamingSyncLine = serde_json::from_str(r#"{"other":"test"}"#).unwrap(); - assert_matches!(line, StreamingSyncLine::Unknown); - - // Multiple - keep the first one - let line: StreamingSyncLine = serde_json::from_str(r#"{"token_expires_in": 42, "checkpoint_complete": {"last_op_id": "123"}}"#).unwrap(); - assert_matches!(line, StreamingSyncLine::KeepaliveLine(42)); - - // Test error handling - let line: Result = serde_json::from_str(r#"{"token_expires_in": "42"}"#); - assert!(line.is_err()); - } -} diff --git a/crates/core/src/util.rs b/crates/core/src/util.rs index b71fc12..c90ac88 100644 --- a/crates/core/src/util.rs +++ b/crates/core/src/util.rs @@ -3,13 +3,13 @@ extern crate alloc; use alloc::format; use alloc::string::String; -use serde::{Deserialize}; +use serde::Deserialize; use serde_json as json; use sqlite::{Connection, ResultCode}; use sqlite_nostd as sqlite; -use uuid::{Builder,Uuid}; use sqlite_nostd::ManagedStmt; +use uuid::Uuid; use crate::error::SQLiteError; @@ -41,13 +41,18 @@ pub fn quote_identifier_prefixed(prefix: &str, name: &str) -> String { return format!("\"{:}{:}\"", prefix, name.replace("\"", "\"\"")); } -pub fn extract_table_info(db: *mut sqlite::sqlite3, data: &str) -> Result { +pub fn extract_table_info( + db: *mut sqlite::sqlite3, + data: &str, +) -> Result { // language=SQLite - let statement = db.prepare_v2("SELECT + let statement = db.prepare_v2( + "SELECT json_extract(?1, '$.name') as name, ifnull(json_extract(?1, '$.view_name'), json_extract(?1, '$.name')) as view_name, json_extract(?1, '$.local_only') as local_only, - json_extract(?1, '$.insert_only') as insert_only")?; + json_extract(?1, '$.insert_only') as insert_only", + )?; statement.bind_text(1, data, sqlite::Destructor::STATIC)?; let step_result = statement.step()?; @@ -57,10 +62,9 @@ pub fn extract_table_info(db: *mut sqlite::sqlite3, data: &str) -> Result(deserializer: D) -> Result - where - D: serde::Deserializer<'de>, +where + D: serde::Deserializer<'de>, { let value = json::Value::deserialize(deserializer)?; @@ -71,16 +75,14 @@ pub fn deserialize_string_to_i64<'de, D>(deserializer: D) -> Result(deserializer: D) -> Result, D::Error> - where - D: serde::Deserializer<'de>, +where + D: serde::Deserializer<'de>, { let value = json::Value::deserialize(deserializer)?; match value { json::Value::Null => Ok(None), - json::Value::String(s) => s.parse::() - .map(Some) - .map_err(serde::de::Error::custom), + json::Value::String(s) => s.parse::().map(Some).map_err(serde::de::Error::custom), _ => Err(serde::de::Error::custom("Expected a string or null.")), } } @@ -115,7 +117,10 @@ mod tests { fn quote_identifier_test() { assert_eq!(quote_identifier("test"), "\"test\""); assert_eq!(quote_identifier("\"quote\""), "\"\"\"quote\"\"\""); - assert_eq!(quote_identifier("other characters."), "\"other characters.\""); + assert_eq!( + quote_identifier("other characters."), + "\"other characters.\"" + ); } #[test] diff --git a/crates/core/src/view_admin.rs b/crates/core/src/view_admin.rs index 1626776..bc05706 100644 --- a/crates/core/src/view_admin.rs +++ b/crates/core/src/view_admin.rs @@ -1,7 +1,8 @@ extern crate alloc; use alloc::format; -use alloc::string::String; +use alloc::string::{String, ToString}; +use alloc::vec::Vec; use core::ffi::c_int; use core::slice; @@ -9,9 +10,9 @@ use sqlite::{ResultCode, Value}; use sqlite_nostd as sqlite; use sqlite_nostd::{Connection, Context}; -use crate::{create_auto_tx_function, create_sqlite_text_fn}; use crate::error::{PSResult, SQLiteError}; use crate::util::quote_identifier; +use crate::{create_auto_tx_function, create_sqlite_text_fn}; fn powersync_drop_view_impl( ctx: *mut sqlite::context, @@ -62,7 +63,9 @@ fn powersync_internal_table_name_impl( let local_db = ctx.db_handle(); // language=SQLite - let stmt1 = local_db.prepare_v2("SELECT json_extract(?1, '$.name') as name, ifnull(json_extract(?1, '$.local_only'), 0)")?; + let stmt1 = local_db.prepare_v2( + "SELECT json_extract(?1, '$.name') as name, ifnull(json_extract(?1, '$.local_only'), 0)", + )?; stmt1.bind_text(1, schema, sqlite::Destructor::STATIC)?; let step_result = stmt1.step()?; @@ -115,26 +118,80 @@ fn powersync_init_impl( let local_db = ctx.db_handle(); // language=SQLite - local_db.exec_safe("\ -CREATE TABLE IF NOT EXISTS ps_migration(id INTEGER PRIMARY KEY, down_migrations TEXT)")?; + local_db.exec_safe( + "\ +CREATE TABLE IF NOT EXISTS ps_migration(id INTEGER PRIMARY KEY, down_migrations TEXT)", + )?; // language=SQLite - let stmt = local_db.prepare_v2("SELECT ifnull(max(id), 0) as version FROM ps_migration")?; - let rc = stmt.step()?; + let current_version_stmt = + local_db.prepare_v2("SELECT ifnull(max(id), 0) as version FROM ps_migration")?; + let rc = current_version_stmt.step()?; if rc != ResultCode::ROW { return Err(SQLiteError::from(ResultCode::ABORT)); } - let version = stmt.column_int(0)?; - - if version > 2 { - // We persist down migrations, but don't support running them yet - return Err(SQLiteError(ResultCode::MISUSE, Some(String::from("Downgrade not supported")))); + const CODE_VERSION: i32 = 3; + + let mut current_version = current_version_stmt.column_int(0)?; + + while current_version > CODE_VERSION { + // Run down migrations. + // This is rare, we don't worry about optimizing this. + + current_version_stmt.reset()?; + + let down_migrations_stmt = local_db.prepare_v2("select e.value ->> 'sql' as sql from (select id, down_migrations from ps_migration where id > ?1 order by id desc limit 1) m, json_each(m.down_migrations) e")?; + down_migrations_stmt.bind_int(1, CODE_VERSION)?; + + let mut down_sql: Vec = alloc::vec![]; + + while down_migrations_stmt.step()? == ResultCode::ROW { + let sql = down_migrations_stmt.column_text(0)?; + down_sql.push(sql.to_string()); + } + + for sql in down_sql { + let rs = local_db.exec_safe(&sql); + if let Err(code) = rs { + return Err(SQLiteError( + code, + Some(format!( + "Down migration failed for {:} {:}", + current_version, sql + )), + )); + } + } + + // Refresh the version + current_version_stmt.reset()?; + let rc = current_version_stmt.step()?; + if rc != ResultCode::ROW { + return Err(SQLiteError( + rc, + Some("Down migration failed - could not get version".to_string()), + )); + } + let new_version = current_version_stmt.column_int(0)?; + if new_version >= current_version { + // Database down from version $currentVersion to $version failed - version not updated after dow migration + return Err(SQLiteError( + ResultCode::ABORT, + Some(format!( + "Down migration failed - version not updated from {:}", + current_version + )), + )); + } + current_version = new_version; } - if version < 1 { + if current_version < 1 { // language=SQLite - local_db.exec_safe(" + local_db + .exec_safe( + " CREATE TABLE ps_oplog( bucket TEXT NOT NULL, op_id INTEGER NOT NULL, @@ -164,10 +221,12 @@ CREATE TABLE ps_untyped(type TEXT NOT NULL, id TEXT NOT NULL, data TEXT, PRIMARY CREATE TABLE ps_crud (id INTEGER PRIMARY KEY AUTOINCREMENT, data TEXT); INSERT INTO ps_migration(id, down_migrations) VALUES(1, NULL); -").into_db_result(local_db)?; +", + ) + .into_db_result(local_db)?; } - if version < 2 { + if current_version < 2 { // language=SQLite local_db.exec_safe("\ CREATE TABLE ps_tx(id INTEGER PRIMARY KEY NOT NULL, current_tx INTEGER, next_tx INTEGER); @@ -175,20 +234,80 @@ INSERT INTO ps_tx(id, current_tx, next_tx) VALUES(1, NULL, 1); ALTER TABLE ps_crud ADD COLUMN tx_id INTEGER; -INSERT INTO ps_migration(id, down_migrations) VALUES(2, json_array(json_object('sql', 'DELETE FROM ps_migrations WHERE id >= 2', 'params', json_array()), json_object('sql', 'DROP TABLE ps_tx', 'params', json_array()), json_object('sql', 'ALTER TABLE ps_crud DROP COLUMN tx_id', 'params', json_array()))); +INSERT INTO ps_migration(id, down_migrations) VALUES(2, json_array(json_object('sql', 'DELETE FROM ps_migration WHERE id >= 2', 'params', json_array()), json_object('sql', 'DROP TABLE ps_tx', 'params', json_array()), json_object('sql', 'ALTER TABLE ps_crud DROP COLUMN tx_id', 'params', json_array()))); ").into_db_result(local_db)?; } + if current_version < 3 { + // language=SQLite + local_db.exec_safe("\ +CREATE TABLE ps_kv(key TEXT PRIMARY KEY NOT NULL, value BLOB); +INSERT INTO ps_kv(key, value) values('client_id', uuid()); + +INSERT INTO ps_migration(id, down_migrations) VALUES(3, json_array(json_object('sql', 'DELETE FROM ps_migration WHERE id >= 3'), json_object('sql', 'DROP TABLE ps_kv'))); + ").into_db_result(local_db)?; + } + Ok(String::from("")) } - create_auto_tx_function!(powersync_init_tx, powersync_init_impl); -create_sqlite_text_fn!( - powersync_init, - powersync_init_tx, - "powersync_init" -); +create_sqlite_text_fn!(powersync_init, powersync_init_tx, "powersync_init"); + +fn powersync_clear_impl( + ctx: *mut sqlite::context, + args: &[*mut sqlite::value], +) -> Result { + let local_db = ctx.db_handle(); + + let clear_local = args[0].int(); + + // language=SQLite + local_db.exec_safe( + "\ +DELETE FROM ps_oplog; +DELETE FROM ps_crud; +DELETE FROM ps_buckets; +DELETE FROM ps_untyped; +DELETE FROM ps_kv WHERE key != 'client_id'; +", + )?; + + let table_glob = if clear_local != 0 { + "ps_data_*" + } else { + "ps_data__*" + }; + + let tables_stmt = local_db + .prepare_v2("SELECT name FROM sqlite_master WHERE type='table' AND name GLOB ?1")?; + tables_stmt.bind_text(1, table_glob, sqlite::Destructor::STATIC)?; + + let mut tables: Vec = alloc::vec![]; + + while tables_stmt.step()? == ResultCode::ROW { + let name = tables_stmt.column_text(0)?; + tables.push(name.to_string()); + } + + for name in tables { + let quoted = quote_identifier(&name); + // The first delete statement deletes a single row, to trigger an update notification for the table. + // The second delete statement uses the truncate optimization to delete the remainder of the data. + let delete_sql = format!( + "\ +DELETE FROM {table} WHERE rowid IN (SELECT rowid FROM {table} LIMIT 1); +DELETE FROM {table};", + table = quoted + ); + local_db.exec_safe(&delete_sql)?; + } + + Ok(String::from("")) +} + +create_auto_tx_function!(powersync_clear_tx, powersync_clear_impl); +create_sqlite_text_fn!(powersync_clear, powersync_clear_tx, "powersync_clear"); pub fn register(db: *mut sqlite::sqlite3) -> Result<(), ResultCode> { // This entire module is just making it easier to edit sqlite_master using queries. @@ -248,6 +367,18 @@ pub fn register(db: *mut sqlite::sqlite3) -> Result<(), ResultCode> { None, )?; + // Initialize the extension internal tables. + db.create_function_v2( + "powersync_clear", + 1, + sqlite::UTF8, + None, + Some(powersync_clear), + None, + None, + None, + )?; + db.create_function_v2( "powersync_external_table_name", 1, @@ -259,7 +390,6 @@ pub fn register(db: *mut sqlite::sqlite3) -> Result<(), ResultCode> { None, )?; - db.create_function_v2( "powersync_internal_table_name", 1, @@ -324,14 +454,16 @@ BEGIN END;")?; // language=SQLite - db.exec_safe("\ + db.exec_safe( + "\ CREATE TEMP VIEW powersync_tables(name, internal_name, local_only) AS SELECT powersync_external_table_name(name) as name, name as internal_name, name GLOB 'ps_data_local__*' as local_only FROM sqlite_master - WHERE type = 'table' AND name GLOB 'ps_data_*';")?; + WHERE type = 'table' AND name GLOB 'ps_data_*';", + )?; Ok(()) }