Skip to content

Use serde to read tables and indexes from schema #81

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jun 2, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 0 additions & 7 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion crates/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ num-traits = { version = "0.2.15", default-features = false }
num-derive = "0.3"
serde_json = { version = "1.0", default-features = false, features = ["alloc"] }
serde = { version = "1.0", default-features = false, features = ["alloc", "derive"] }
streaming-iterator = { version = "0.1.9", default-features = false, features = ["alloc"] }
const_format = "0.2.34"

[dependencies.uuid]
Expand Down
6 changes: 6 additions & 0 deletions crates/core/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,3 +66,9 @@ impl From<serde_json::Error> for SQLiteError {
SQLiteError(ResultCode::ABORT, Some(value.to_string()))
}
}

impl From<core::fmt::Error> for SQLiteError {
fn from(value: core::fmt::Error) -> Self {
SQLiteError(ResultCode::INTERNAL, Some(format!("{}", value)))
}
}
2 changes: 1 addition & 1 deletion crates/core/src/fix_data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ fn remove_duplicate_key_encoding(key: &str) -> Option<String> {
}

fn powersync_remove_duplicate_key_encoding_impl(
ctx: *mut sqlite::context,
_ctx: *mut sqlite::context,
args: &[*mut sqlite::value],
) -> Result<Option<String>, SQLiteError> {
let arg = args.get(0).ok_or(ResultCode::MISUSE)?;
Expand Down
122 changes: 60 additions & 62 deletions crates/core/src/schema/management.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
extern crate alloc;

use alloc::format;
use alloc::string::String;
use alloc::vec::Vec;
use alloc::{format, vec};
use core::ffi::c_int;

use sqlite::{Connection, ResultCode, Value};
Expand All @@ -14,6 +14,8 @@ use crate::ext::ExtendedDatabase;
use crate::util::{quote_identifier, quote_json_path};
use crate::{create_auto_tx_function, create_sqlite_text_fn};

use super::Schema;

fn update_tables(db: *mut sqlite::sqlite3, schema: &str) -> Result<(), SQLiteError> {
{
// In a block so that the statement is finalized before dropping tables
Expand Down Expand Up @@ -138,87 +140,83 @@ SELECT name, internal_name, local_only FROM powersync_tables WHERE name NOT IN (

fn update_indexes(db: *mut sqlite::sqlite3, schema: &str) -> Result<(), SQLiteError> {
let mut statements: Vec<String> = alloc::vec![];
let schema = serde_json::from_str::<Schema>(schema)?;
let mut expected_index_names: Vec<String> = vec![];

{
// In a block so that the statement is finalized before dropping indexes
// language=SQLite
let statement = db.prepare_v2("\
SELECT
powersync_internal_table_name(tables.value) as table_name,
(powersync_internal_table_name(tables.value) || '__' || json_extract(indexes.value, '$.name')) as index_name,
json_extract(indexes.value, '$.columns') as index_columns,
ifnull(sqlite_master.sql, '') as sql
FROM json_each(json_extract(?, '$.tables')) tables
CROSS JOIN json_each(json_extract(tables.value, '$.indexes')) indexes
LEFT JOIN sqlite_master ON sqlite_master.name = index_name AND sqlite_master.type = 'index'
").into_db_result(db)?;
statement.bind_text(1, schema, sqlite::Destructor::STATIC)?;
let find_index =
db.prepare_v2("SELECT sql FROM sqlite_master WHERE name = ? AND type = 'index'")?;

while statement.step().into_db_result(db)? == ResultCode::ROW {
let table_name = statement.column_text(0)?;
let index_name = statement.column_text(1)?;
let columns = statement.column_text(2)?;
let existing_sql = statement.column_text(3)?;

// language=SQLite
let stmt2 = db.prepare_v2("select json_extract(e.value, '$.name') as name, json_extract(e.value, '$.type') as type, json_extract(e.value, '$.ascending') as ascending from json_each(?) e")?;
stmt2.bind_text(1, columns, sqlite::Destructor::STATIC)?;

let mut column_values: Vec<String> = alloc::vec![];
while stmt2.step()? == ResultCode::ROW {
let name = stmt2.column_text(0)?;
let type_name = stmt2.column_text(1)?;
let ascending = stmt2.column_int(2) != 0;

if ascending {
let value = format!(
for table in &schema.tables {
let table_name = table.internal_name();

for index in &table.indexes {
let index_name = format!("{}__{}", table_name, &index.name);

let existing_sql = {
find_index.reset()?;
find_index.bind_text(1, &index_name, sqlite::Destructor::STATIC)?;

let result = if let ResultCode::ROW = find_index.step()? {
Some(find_index.column_text(0)?)
} else {
None
};

result
};

let mut column_values: Vec<String> = alloc::vec![];
for indexed_column in &index.columns {
let mut value = format!(
"CAST(json_extract(data, {:}) as {:})",
quote_json_path(name),
type_name
);
column_values.push(value);
} else {
let value = format!(
"CAST(json_extract(data, {:}) as {:}) DESC",
quote_json_path(name),
type_name
quote_json_path(&indexed_column.name),
&indexed_column.type_name
);

if !indexed_column.ascending {
value += " DESC";
}

column_values.push(value);
}
}

let sql = format!(
"CREATE INDEX {} ON {}({})",
quote_identifier(index_name),
quote_identifier(table_name),
column_values.join(", ")
);
if existing_sql == "" {
statements.push(sql);
} else if existing_sql != sql {
statements.push(format!("DROP INDEX {}", quote_identifier(index_name)));
statements.push(sql);
let sql = format!(
"CREATE INDEX {} ON {}({})",
quote_identifier(&index_name),
quote_identifier(&table_name),
column_values.join(", ")
);

if existing_sql.is_none() {
statements.push(sql);
} else if existing_sql != Some(&sql) {
statements.push(format!("DROP INDEX {}", quote_identifier(&index_name)));
statements.push(sql);
}

expected_index_names.push(index_name);
}
}

// In a block so that the statement is finalized before dropping indexes
// language=SQLite
let statement = db.prepare_v2("\
WITH schema_indexes AS (
SELECT
powersync_internal_table_name(tables.value) as table_name,
(powersync_internal_table_name(tables.value) || '__' || json_extract(indexes.value, '$.name')) as index_name
FROM json_each(json_extract(?1, '$.tables')) tables
CROSS JOIN json_each(json_extract(tables.value, '$.indexes')) indexes
)
let statement = db
.prepare_v2(
"\
SELECT
sqlite_master.name as index_name
FROM sqlite_master
WHERE sqlite_master.type = 'index'
AND sqlite_master.name GLOB 'ps_data_*'
AND sqlite_master.name NOT IN (SELECT index_name FROM schema_indexes)
").into_db_result(db)?;
statement.bind_text(1, schema, sqlite::Destructor::STATIC)?;
AND sqlite_master.name NOT IN (SELECT value FROM json_each(?))
",
)
.into_db_result(db)?;
let json_names = serde_json::to_string(&expected_index_names)?;
statement.bind_text(1, &json_names, sqlite::Destructor::STATIC)?;

while statement.step()? == ResultCode::ROW {
let name = statement.column_text(0)?;
Expand Down
11 changes: 8 additions & 3 deletions crates/core/src/schema/mod.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
mod management;
mod table_info;

use alloc::vec::Vec;
use serde::Deserialize;
use sqlite::ResultCode;
use sqlite_nostd as sqlite;
pub use table_info::{
ColumnInfo, ColumnNameAndTypeStatement, DiffIncludeOld, TableInfo, TableInfoFlags,
};
pub use table_info::{Column, DiffIncludeOld, Table, TableInfoFlags};

#[derive(Deserialize)]
pub struct Schema {
tables: Vec<table_info::Table>,
}

pub fn register(db: *mut sqlite::sqlite3) -> Result<(), ResultCode> {
management::register(db)
Expand Down
Loading