diff --git a/crates/core/src/sync/storage_adapter.rs b/crates/core/src/sync/storage_adapter.rs index a68d891a..105a7fc5 100644 --- a/crates/core/src/sync/storage_adapter.rs +++ b/crates/core/src/sync/storage_adapter.rs @@ -10,7 +10,10 @@ use crate::{ operations::delete_bucket, schema::Schema, state::DatabaseState, - sync::checkpoint::{validate_checkpoint, ChecksumMismatch}, + sync::{ + checkpoint::{validate_checkpoint, ChecksumMismatch}, + sync_status::SyncPriorityStatus, + }, sync_local::{PartialSyncOperation, SyncOperation}, }; @@ -68,6 +71,32 @@ impl StorageAdapter { Ok(requests) } + pub fn collect_sync_state(&self) -> Result, PowerSyncError> { + // language=SQLite + let statement = self + .db + .prepare_v2( + "SELECT priority, unixepoch(last_synced_at) FROM ps_sync_state ORDER BY priority", + ) + .into_db_result(self.db)?; + + let mut items = Vec::::new(); + while statement.step()? == ResultCode::ROW { + let priority = BucketPriority { + number: statement.column_int(0), + }; + let timestamp = statement.column_int64(1); + + items.push(SyncPriorityStatus { + priority, + last_synced_at: Some(Timestamp(timestamp)), + has_synced: Some(true), + }); + } + + return Ok(items); + } + pub fn delete_buckets<'a>( &self, buckets: impl IntoIterator, diff --git a/crates/core/src/sync/streaming_sync.rs b/crates/core/src/sync/streaming_sync.rs index a344cb68..7e122d63 100644 --- a/crates/core/src/sync/streaming_sync.rs +++ b/crates/core/src/sync/streaming_sync.rs @@ -577,8 +577,11 @@ impl StreamingSyncIteration { )); }; - self.status - .update(|s| s.start_connecting(), &mut event.instructions); + let sync_state = self.adapter.collect_sync_state()?; + self.status.update( + move |s| s.start_connecting(sync_state), + &mut event.instructions, + ); let requests = self.adapter.collect_bucket_requests()?; let local_bucket_names: Vec = requests.iter().map(|s| s.name.clone()).collect(); diff --git a/crates/core/src/sync/sync_status.rs b/crates/core/src/sync/sync_status.rs index 89a86a3b..9fc2c164 100644 --- a/crates/core/src/sync/sync_status.rs +++ b/crates/core/src/sync/sync_status.rs @@ -46,10 +46,12 @@ impl DownloadSyncStatus { self.downloading = None; } - pub fn start_connecting(&mut self) { + pub fn start_connecting(&mut self, status: Vec) { self.connected = false; self.downloading = None; self.connecting = true; + self.priority_status = status; + self.debug_assert_priority_status_is_sorted(); } pub fn mark_connected(&mut self) { @@ -161,9 +163,9 @@ pub struct Timestamp(pub i64); #[derive(Serialize, Hash)] pub struct SyncPriorityStatus { - priority: BucketPriority, - last_synced_at: Option, - has_synced: Option, + pub priority: BucketPriority, + pub last_synced_at: Option, + pub has_synced: Option, } /// Per-bucket download progress information. diff --git a/dart/test/sync_test.dart b/dart/test/sync_test.dart index a39d9538..3a0fe27a 100644 --- a/dart/test/sync_test.dart +++ b/dart/test/sync_test.dart @@ -301,6 +301,46 @@ void _syncTests({ } }); + syncTest('remembers sync state', (controller) { + invokeControl('start', null); + + pushCheckpoint(buckets: priorityBuckets); + pushCheckpointComplete(); + + controller.elapse(Duration(minutes: 10)); + pushCheckpoint(buckets: priorityBuckets); + pushCheckpointComplete(priority: 2); + invokeControl('stop', null); + + final instructions = invokeControl('start', null); + expect( + instructions, + contains( + containsPair( + 'UpdateSyncStatus', + containsPair( + 'status', + containsPair( + 'priority_status', + [ + { + 'priority': 2, + 'last_synced_at': 1740823800, + 'has_synced': true + }, + { + 'priority': 2147483647, + 'last_synced_at': 1740823200, + 'has_synced': true + }, + ], + ), + ), + ), + ), + ); + }); + test('clearing database clears sync status', () { invokeControl('start', null); pushCheckpoint(buckets: priorityBuckets);