Skip to content

Commit 242636f

Browse files
DominicGBauerDominicGBauer
andauthored
fix: attachment queue duplicating requests (#68)
* chore: improve attachment queue * chore: update changelog * chore: pr feedback --------- Co-authored-by: DominicGBauer <[email protected]>
1 parent bcb3bea commit 242636f

File tree

9 files changed

+105
-122
lines changed

9 files changed

+105
-122
lines changed

demos/supabase-todolist/lib/attachments/queue.dart

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -74,9 +74,9 @@ class PhotoAttachmentQueue extends AbstractAttachmentQueue {
7474
return results.map((row) => row['photo_id'] as String).toList();
7575
}).listen((ids) async {
7676
List<String> idsInQueue = await attachmentsService.getAttachmentIds();
77-
for (String id in ids) {
78-
await syncingService.reconcileId(id, idsInQueue, fileExtension);
79-
}
77+
List<String> relevantIds =
78+
ids.where((element) => !idsInQueue.contains(element)).toList();
79+
syncingService.processIds(relevantIds, fileExtension);
8080
});
8181
}
8282
}

demos/supabase-todolist/pubspec.lock

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -429,7 +429,7 @@ packages:
429429
path: "../../packages/powersync_attachments_helper"
430430
relative: true
431431
source: path
432-
version: "0.2.1"
432+
version: "0.3.0"
433433
realtime_client:
434434
dependency: transitive
435435
description:

packages/powersync_attachments_helper/CHANGELOG.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,8 @@
1+
## 0.3.0
2+
3+
- BREAKING CHANGE: `reconcileId` has been removed in favour of `reconcileIds`. This will require a change to `watchIds` implementation which is shown in `example/getting_started.dart`
4+
- Improved queue so that uploads, downloads and deletes do not happen multiple times
5+
16
## 0.2.1
27

38
- Added `onUploadError` as an optional function that can be set when setting up the queue to handle upload errors

packages/powersync_attachments_helper/README.md

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -62,17 +62,17 @@ class PhotoAttachmentQueue extends AbstractAttachmentQueue {
6262
// This watcher will handle adding items to the queue based on
6363
// a users table element receiving a photoId
6464
@override
65-
StreamSubscription<void> watchIds() {
65+
StreamSubscription<void> watchIds({String fileExtension = 'jpg'}) {
6666
return db.watch('''
6767
SELECT photo_id FROM users
6868
WHERE photo_id IS NOT NULL
6969
''').map((results) {
7070
return results.map((row) => row['photo_id'] as String).toList();
7171
}).listen((ids) async {
7272
List<String> idsInQueue = await attachmentsService.getAttachmentIds();
73-
for (String id in ids) {
74-
await syncingService.reconcileId(id, idsInQueue);
75-
}
73+
List<String> relevantIds =
74+
ids.where((element) => !idsInQueue.contains(element)).toList();
75+
syncingService.processIds(relevantIds, fileExtension);
7676
});
7777
}
7878
}

packages/powersync_attachments_helper/example/getting_started.dart

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -52,9 +52,9 @@ class PhotoAttachmentQueue extends AbstractAttachmentQueue {
5252
return results.map((row) => row['photo_id'] as String).toList();
5353
}).listen((ids) async {
5454
List<String> idsInQueue = await attachmentsService.getAttachmentIds();
55-
for (String id in ids) {
56-
await syncingService.reconcileId(id, idsInQueue, fileExtension);
57-
}
55+
List<String> relevantIds =
56+
ids.where((element) => !idsInQueue.contains(element)).toList();
57+
syncingService.processIds(relevantIds, fileExtension);
5858
});
5959
}
6060
}

packages/powersync_attachments_helper/lib/src/attachments_queue.dart

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -67,9 +67,7 @@ abstract class AbstractAttachmentQueue {
6767
await localStorage.makeDir(await getStorageDirectory());
6868

6969
watchIds();
70-
syncingService.watchUploads();
71-
syncingService.watchDownloads();
72-
syncingService.watchDeletes();
70+
syncingService.watchAttachments();
7371

7472
db.statusStream.listen((status) {
7573
if (db.currentStatus.connected) {
@@ -79,9 +77,7 @@ abstract class AbstractAttachmentQueue {
7977
}
8078

8179
_trigger() async {
82-
await syncingService.runDownloads();
83-
await syncingService.runDeletes();
84-
await syncingService.runUploads();
80+
await syncingService.runSync();
8581
}
8682

8783
/// Returns the local file path for the given filename, used to store in the database.

packages/powersync_attachments_helper/lib/src/attachments_service.dart

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,47 @@ class AttachmentsService {
5959
return updatedRecord;
6060
}
6161

62+
/// Save the attachments to the attachment queue.
63+
Future<void> saveAttachments(List<Attachment> attachments) async {
64+
if (attachments.isEmpty) {
65+
return;
66+
}
67+
List<List<String>> ids = List.empty(growable: true);
68+
69+
RegExp extractObjectValueRegEx = RegExp(r': (.*?)(?:,|$)');
70+
71+
// This adds a timestamp to the attachments and
72+
// extracts the values from the attachment object
73+
// e.g "foo: bar, baz: qux" => ["bar", "qux"]
74+
// TODO: Extract value without needing to use regex
75+
List<List<String?>> updatedRecords = attachments
76+
.map((attachment) {
77+
ids.add([attachment.id]);
78+
return attachment.copyWith(
79+
timestamp: DateTime.now().millisecondsSinceEpoch,
80+
);
81+
})
82+
.toList()
83+
.map((attachment) {
84+
return extractObjectValueRegEx
85+
.allMatches(attachment.toString().replaceAll('}', ''))
86+
.map((match) => match.group(1))
87+
.toList();
88+
})
89+
.toList();
90+
91+
await db.executeBatch('''
92+
INSERT OR REPLACE INTO $table
93+
(id, filename, local_uri, media_type, size, timestamp, state) VALUES (?, ?, ?, ?, ?, ?, ?)
94+
''', updatedRecords);
95+
96+
await db.executeBatch('''
97+
DELETE FROM $table WHERE id = ?
98+
''', ids);
99+
100+
return;
101+
}
102+
62103
/// Get all the ID's of attachments in the attachment queue.
63104
Future<List<String>> getAttachmentIds() async {
64105
ResultSet results =

packages/powersync_attachments_helper/lib/src/syncing_service.dart

Lines changed: 42 additions & 101 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ class SyncingService {
1919
onDownloadError;
2020
final Future<bool> Function(Attachment attachment, Object exception)?
2121
onUploadError;
22+
bool isProcessing = false;
2223

2324
SyncingService(this.db, this.remoteStorage, this.localStorage,
2425
this.attachmentsService, this.getLocalUri,
@@ -103,139 +104,79 @@ class SyncingService {
103104
}
104105
}
105106

106-
/// Function to manually run downloads for attachments marked for download
107-
/// in the attachment queue.
108-
/// Once a an attachment marked for download is found it will initiate a
109-
/// download of the file to local storage.
110-
StreamSubscription<void> watchDownloads() {
111-
log.info('Watching downloads...');
112-
return db.watch('''
113-
SELECT * FROM ${attachmentsService.table}
114-
WHERE state = ${AttachmentState.queuedDownload.index}
115-
''').map((results) {
116-
return results.map((row) => Attachment.fromRow(row));
117-
}).listen((attachments) async {
118-
for (Attachment attachment in attachments) {
119-
log.info('Downloading ${attachment.filename}');
120-
await downloadAttachment(attachment);
121-
}
122-
});
123-
}
107+
/// Handle downloading, uploading or deleting of attachments
108+
Future<void> handleSync(Iterable<Attachment> attachments) async {
109+
if (isProcessing == true) {
110+
return;
111+
}
124112

125-
/// Watcher for attachments marked for download in the attachment queue.
126-
/// Once a an attachment marked for download is found it will initiate a
127-
/// download of the file to local storage.
128-
Future<void> runDownloads() async {
129-
List<Attachment> attachments = await db.execute('''
130-
SELECT * FROM ${attachmentsService.table}
131-
WHERE state = ${AttachmentState.queuedDownload.index}
132-
''').then((results) {
133-
return results.map((row) => Attachment.fromRow(row)).toList();
134-
});
113+
isProcessing = true;
135114

136115
for (Attachment attachment in attachments) {
137-
log.info('Downloading ${attachment.filename}');
138-
await downloadAttachment(attachment);
139-
}
140-
}
141-
142-
/// Watcher for attachments marked for upload in the attachment queue.
143-
/// Once a an attachment marked for upload is found it will initiate an
144-
/// upload of the file to remote storage.
145-
StreamSubscription<void> watchUploads() {
146-
log.info('Watching uploads...');
147-
return db.watch('''
148-
SELECT * FROM ${attachmentsService.table}
149-
WHERE local_uri IS NOT NULL
150-
AND state = ${AttachmentState.queuedUpload.index}
151-
''').map((results) {
152-
return results.map((row) => Attachment.fromRow(row));
153-
}).listen((attachments) async {
154-
for (Attachment attachment in attachments) {
116+
if (AttachmentState.queuedDownload.index == attachment.state) {
117+
log.info('Downloading ${attachment.filename}');
118+
await downloadAttachment(attachment);
119+
}
120+
if (AttachmentState.queuedUpload.index == attachment.state) {
155121
log.info('Uploading ${attachment.filename}');
156122
await uploadAttachment(attachment);
157123
}
158-
});
159-
}
160-
161-
/// Function to manually run uploads for attachments marked for upload
162-
/// in the attachment queue.
163-
/// Once a an attachment marked for deletion is found it will initiate an
164-
/// upload of the file to remote storage
165-
Future<void> runUploads() async {
166-
List<Attachment> attachments = await db.execute('''
167-
SELECT * FROM ${attachmentsService.table}
168-
WHERE local_uri IS NOT NULL
169-
AND state = ${AttachmentState.queuedUpload.index}
170-
''').then((results) {
171-
return results.map((row) => Attachment.fromRow(row)).toList();
172-
});
173-
174-
for (Attachment attachment in attachments) {
175-
log.info('Uploading ${attachment.filename}');
176-
await uploadAttachment(attachment);
124+
if (AttachmentState.queuedDelete.index == attachment.state) {
125+
log.info('Deleting ${attachment.filename}');
126+
await deleteAttachment(attachment);
127+
}
177128
}
129+
130+
isProcessing = false;
178131
}
179132

180-
/// Watcher for attachments marked for deletion in the attachment queue.
181-
/// Once a an attachment marked for deletion is found it will initiate remote
182-
/// and local deletions of the file.
183-
StreamSubscription<void> watchDeletes() {
184-
log.info('Watching deletes...');
133+
/// Watcher for changes to attachments table
134+
/// Once a change is detected it will initiate a sync of the attachments
135+
StreamSubscription<void> watchAttachments() {
136+
log.info('Watching attachments...');
185137
return db.watch('''
186138
SELECT * FROM ${attachmentsService.table}
187-
WHERE state = ${AttachmentState.queuedDelete.index}
139+
WHERE state != ${AttachmentState.archived.index}
188140
''').map((results) {
189141
return results.map((row) => Attachment.fromRow(row));
190142
}).listen((attachments) async {
191-
for (Attachment attachment in attachments) {
192-
log.info('Deleting ${attachment.filename}');
193-
await deleteAttachment(attachment);
194-
}
143+
await handleSync(attachments);
195144
});
196145
}
197146

198-
/// Function to manually run deletes for attachments marked for deletion
199-
/// in the attachment queue.
200-
/// Once a an attachment marked for deletion is found it will initiate remote
201-
/// and local deletions of the file.
202-
Future<void> runDeletes() async {
147+
/// Run the sync process on all attachments
148+
Future<void> runSync() async {
203149
List<Attachment> attachments = await db.execute('''
204150
SELECT * FROM ${attachmentsService.table}
205-
WHERE state = ${AttachmentState.queuedDelete.index}
151+
WHERE state != ${AttachmentState.archived.index}
206152
''').then((results) {
207153
return results.map((row) => Attachment.fromRow(row)).toList();
208154
});
209155

210-
for (Attachment attachment in attachments) {
211-
log.info('Deleting ${attachment.filename}');
212-
await deleteAttachment(attachment);
213-
}
156+
await handleSync(attachments);
214157
}
215158

216-
/// Reconcile an ID with ID's in the attachment queue.
217-
/// If the ID is not in the queue, but the file exists locally then it is
218-
/// in local and remote storage.
219-
/// If the ID is in the queue, but the file does not exist locally then it is
220-
/// marked for download.
221-
reconcileId(String id, List<String> idsInQueue, String fileExtension) async {
222-
bool idIsInQueue = idsInQueue.contains(id);
159+
/// Process ID's to be included in the attachment queue.
160+
processIds(List<String> ids, String fileExtension) async {
161+
List<Attachment> attachments = List.empty(growable: true);
223162

224-
String path = await getLocalUri('$id.$fileExtension');
225-
File file = File(path);
226-
bool fileExists = await file.exists();
163+
for (String id in ids) {
164+
String path = await getLocalUri('$id.$fileExtension');
165+
File file = File(path);
166+
bool fileExists = await file.exists();
227167

228-
if (!idIsInQueue) {
229168
if (fileExists) {
230169
log.info('ignore file $id.$fileExtension as it already exists');
231170
return;
232171
}
172+
233173
log.info('Adding $id to queue');
234-
return await attachmentsService.saveAttachment(Attachment(
235-
id: id,
236-
filename: '$id.$fileExtension',
237-
state: AttachmentState.queuedDownload.index,
238-
));
174+
attachments.add(Attachment(
175+
id: id,
176+
filename: '$id.$fileExtension',
177+
state: AttachmentState.queuedDownload.index));
239178
}
179+
180+
await attachmentsService.saveAttachments(attachments);
240181
}
241182
}

packages/powersync_attachments_helper/pubspec.yaml

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
name: powersync_attachments_helper
22
description: A helper library for handling attachments when using PowerSync.
3-
version: 0.2.1
3+
version: 0.3.0
44
repository: https://github.com/powersync-ja/powersync.dart
55
homepage: https://www.powersync.com/
66
environment:
@@ -10,14 +10,14 @@ dependencies:
1010
flutter:
1111
sdk: flutter
1212

13-
powersync: ^1.1.0
13+
powersync: ^1.2.2
1414
logging: ^1.2.0
1515
sqlite_async: ^0.6.0
16-
path_provider: ^2.1.1
16+
path_provider: ^2.1.2
1717

1818
dev_dependencies:
1919
lints: ^3.0.0
20-
test: ^1.25.0
20+
test: ^1.25.2
2121

2222
platforms:
2323
android:

0 commit comments

Comments
 (0)