diff --git a/demos/supabase-todolist/android/app/src/main/kotlin/com/powersync/powersync_flutter_demo/MainActivity.kt b/demos/supabase-todolist/android/app/src/main/kotlin/com/powersync/powersync_flutter_demo/MainActivity.kt new file mode 100644 index 00000000..46cf42a6 --- /dev/null +++ b/demos/supabase-todolist/android/app/src/main/kotlin/com/powersync/powersync_flutter_demo/MainActivity.kt @@ -0,0 +1,5 @@ +package com.powersync.powersync_flutter_demo + +import io.flutter.embedding.android.FlutterActivity + +class MainActivity : FlutterActivity() diff --git a/demos/supabase-todolist/lib/attachments/photo_capture_widget.dart b/demos/supabase-todolist/lib/attachments/photo_capture_widget.dart index 38838dd7..61d26ea6 100644 --- a/demos/supabase-todolist/lib/attachments/photo_capture_widget.dart +++ b/demos/supabase-todolist/lib/attachments/photo_capture_widget.dart @@ -1,18 +1,16 @@ import 'dart:async'; - +import 'dart:io'; +import 'dart:typed_data'; import 'package:camera/camera.dart'; import 'package:flutter/material.dart'; -import 'package:powersync/powersync.dart' as powersync; +import 'package:logging/logging.dart'; import 'package:powersync_flutter_demo/attachments/queue.dart'; -import 'package:powersync_flutter_demo/models/todo_item.dart'; -import 'package:powersync_flutter_demo/powersync.dart'; class TakePhotoWidget extends StatefulWidget { final String todoId; final CameraDescription camera; - const TakePhotoWidget( - {super.key, required this.todoId, required this.camera}); + const TakePhotoWidget({super.key, required this.todoId, required this.camera}); @override State createState() { @@ -23,6 +21,7 @@ class TakePhotoWidget extends StatefulWidget { class _TakePhotoWidgetState extends State { late CameraController _cameraController; late Future _initializeControllerFuture; + final log = Logger('TakePhotoWidget'); @override void initState() { @@ -37,7 +36,6 @@ class _TakePhotoWidgetState extends State { } @override - // Dispose of the camera controller when the widget is disposed void dispose() { _cameraController.dispose(); super.dispose(); @@ -45,25 +43,26 @@ class _TakePhotoWidgetState extends State { Future _takePhoto(context) async { try { - // Ensure the camera is initialized before taking a photo + log.info('Taking photo for todo: ${widget.todoId}'); await _initializeControllerFuture; - final XFile photo = await _cameraController.takePicture(); - // copy photo to new directory with ID as name - String photoId = powersync.uuid.v4(); - String storageDirectory = await attachmentQueue.getStorageDirectory(); - await attachmentQueue.localStorage - .copyFile(photo.path, '$storageDirectory/$photoId.jpg'); - - int photoSize = await photo.length(); - - TodoItem.addPhoto(photoId, widget.todoId); - attachmentQueue.saveFile(photoId, photoSize); + + // Read the photo data as a stream + final photoFile = File(photo.path); + if (!await photoFile.exists()) { + log.warning('Photo file does not exist: ${photo.path}'); + return; + } + + final photoDataStream = photoFile.openRead().cast(); + + // Save the photo attachment directly with the data stream + final attachment = await savePhotoAttachment(photoDataStream, widget.todoId); + + log.info('Photo attachment saved with ID: ${attachment.id}'); } catch (e) { - log.info('Error taking photo: $e'); + log.severe('Error taking photo: $e'); } - - // After taking the photo, navigate back to the previous screen Navigator.pop(context); } diff --git a/demos/supabase-todolist/lib/attachments/photo_widget.dart b/demos/supabase-todolist/lib/attachments/photo_widget.dart index f034ef5b..ecf5a94e 100644 --- a/demos/supabase-todolist/lib/attachments/photo_widget.dart +++ b/demos/supabase-todolist/lib/attachments/photo_widget.dart @@ -1,7 +1,8 @@ import 'dart:io'; - +import 'package:path_provider/path_provider.dart'; +import 'package:path/path.dart' as p; import 'package:flutter/material.dart'; -import 'package:powersync_attachments_helper/powersync_attachments_helper.dart'; +import 'package:powersync_attachments_stream/powersync_attachments_stream.dart'; import 'package:powersync_flutter_demo/attachments/camera_helpers.dart'; import 'package:powersync_flutter_demo/attachments/photo_capture_widget.dart'; import 'package:powersync_flutter_demo/attachments/queue.dart'; @@ -37,7 +38,8 @@ class _PhotoWidgetState extends State { if (photoId == null) { return _ResolvedPhotoState(photoPath: null, fileExists: false); } - photoPath = await attachmentQueue.getLocalUri('$photoId.jpg'); + final appDocDir = await getApplicationDocumentsDirectory(); + photoPath = p.join(appDocDir.path, '$photoId.jpg'); bool fileExists = await File(photoPath).exists(); diff --git a/demos/supabase-todolist/lib/attachments/queue.dart b/demos/supabase-todolist/lib/attachments/queue.dart index 2a8dd9ca..179c104f 100644 --- a/demos/supabase-todolist/lib/attachments/queue.dart +++ b/demos/supabase-todolist/lib/attachments/queue.dart @@ -1,90 +1,62 @@ import 'dart:async'; - +import 'dart:io'; +import 'dart:typed_data'; +import 'package:logging/logging.dart'; +import 'package:path_provider/path_provider.dart'; import 'package:powersync/powersync.dart'; -import 'package:powersync_attachments_helper/powersync_attachments_helper.dart'; -import 'package:powersync_flutter_demo/app_config.dart'; +import 'package:powersync_attachments_stream/powersync_attachments_stream.dart'; import 'package:powersync_flutter_demo/attachments/remote_storage_adapter.dart'; -import 'package:powersync_flutter_demo/models/schema.dart'; - -/// Global reference to the queue -late final PhotoAttachmentQueue attachmentQueue; +late AttachmentQueue attachmentQueue; final remoteStorage = SupabaseStorageAdapter(); - -/// Function to handle errors when downloading attachments -/// Return false if you want to archive the attachment -Future onDownloadError(Attachment attachment, Object exception) async { - if (exception.toString().contains('Object not found')) { - return false; - } - return true; +final logger = Logger('AttachmentQueue'); + +Future initializeAttachmentQueue(PowerSyncDatabase db) async { + // Use the app's document directory for local storage + final Directory appDocDir = await getApplicationDocumentsDirectory(); + + attachmentQueue = AttachmentQueue( + db: db, + remoteStorage: remoteStorage, + logger: logger, + localStorage: IOLocalStorage(appDocDir.path), + watchAttachments: () => db.watch(''' + SELECT photo_id as id FROM todos WHERE photo_id IS NOT NULL + ''').map((results) => results + .map((row) => WatchedAttachmentItem( + id: row['id'] as String, + fileExtension: 'jpg', + )) + .toList()), + ); + + await attachmentQueue.startSync(); } -class PhotoAttachmentQueue extends AbstractAttachmentQueue { - PhotoAttachmentQueue(db, remoteStorage) - : super( - db: db, - remoteStorage: remoteStorage, - onDownloadError: onDownloadError); - - @override - init() async { - if (AppConfig.supabaseStorageBucket.isEmpty) { - log.info( - 'No Supabase bucket configured, skip setting up PhotoAttachmentQueue watches'); - return; - } - - await super.init(); - } - - @override - Future saveFile(String fileId, int size, - {mediaType = 'image/jpeg'}) async { - String filename = '$fileId.jpg'; - - Attachment photoAttachment = Attachment( - id: fileId, - filename: filename, - state: AttachmentState.queuedUpload.index, - mediaType: mediaType, - localUri: getLocalFilePathSuffix(filename), - size: size, - ); - - return attachmentsService.saveAttachment(photoAttachment); - } - - @override - Future deleteFile(String fileId) async { - String filename = '$fileId.jpg'; - - Attachment photoAttachment = Attachment( - id: fileId, - filename: filename, - state: AttachmentState.queuedDelete.index); - - return attachmentsService.saveAttachment(photoAttachment); - } - - @override - StreamSubscription watchIds({String fileExtension = 'jpg'}) { - log.info('Watching photos in $todosTable...'); - return db.watch(''' - SELECT photo_id FROM $todosTable - WHERE photo_id IS NOT NULL - ''').map((results) { - return results.map((row) => row['photo_id'] as String).toList(); - }).listen((ids) async { - List idsInQueue = await attachmentsService.getAttachmentIds(); - List relevantIds = - ids.where((element) => !idsInQueue.contains(element)).toList(); - syncingService.processIds(relevantIds, fileExtension); - }); - } +Future savePhotoAttachment( + Stream photoData, String todoId, + {String mediaType = 'image/jpeg'}) async { + // Save the file using the AttachmentQueue API + return await attachmentQueue.saveFile( + data: photoData, + mediaType: mediaType, + fileExtension: 'jpg', + metaData: 'Photo attachment for todo: $todoId', + updateHook: (context, attachment) async { + // Update the todo item to reference this attachment + await context.execute( + 'UPDATE todos SET photo_id = ? WHERE id = ?', + [attachment.id, todoId], + ); + }, + ); } -initializeAttachmentQueue(PowerSyncDatabase db) async { - attachmentQueue = PhotoAttachmentQueue(db, remoteStorage); - await attachmentQueue.init(); +Future deletePhotoAttachment(String fileId) async { + return await attachmentQueue.deleteFile( + attachmentId: fileId, + updateHook: (context, attachment) async { + // Optionally update relationships in the same transaction + }, + ); } diff --git a/demos/supabase-todolist/lib/attachments/remote_storage_adapter.dart b/demos/supabase-todolist/lib/attachments/remote_storage_adapter.dart index 596c5da5..0a4da3b7 100644 --- a/demos/supabase-todolist/lib/attachments/remote_storage_adapter.dart +++ b/demos/supabase-todolist/lib/attachments/remote_storage_adapter.dart @@ -1,49 +1,95 @@ import 'dart:io'; import 'dart:typed_data'; -import 'package:powersync_attachments_helper/powersync_attachments_helper.dart'; +import 'package:powersync_attachments_stream/powersync_attachments_stream.dart'; import 'package:powersync_flutter_demo/app_config.dart'; import 'package:supabase_flutter/supabase_flutter.dart'; -import 'package:image/image.dart' as img; +import 'package:logging/logging.dart'; class SupabaseStorageAdapter implements AbstractRemoteStorageAdapter { + static final _log = Logger('SupabaseStorageAdapter'); + @override - Future uploadFile(String filename, File file, - {String mediaType = 'text/plain'}) async { + Future uploadFile( + Stream> fileData, Attachment attachment) async { _checkSupabaseBucketIsConfigured(); + // Check if attachment size is specified (required for buffer allocation) + final byteSize = attachment.size; + if (byteSize == null) { + throw Exception('Cannot upload a file with no byte size specified'); + } + + _log.info('uploadFile: ${attachment.filename} (size: $byteSize bytes)'); + + // Collect all stream data into a single Uint8List buffer + final buffer = Uint8List(byteSize); + var position = 0; + + await for (final chunk in fileData) { + if (position + chunk.length > byteSize) { + throw Exception('File data exceeds specified size'); + } + buffer.setRange(position, position + chunk.length, chunk); + position += chunk.length; + } + + if (position != byteSize) { + throw Exception( + 'File data size ($position) does not match specified size ($byteSize)'); + } + + // Create a temporary file from the buffer for upload + final tempFile = + File('${Directory.systemTemp.path}/${attachment.filename}'); try { + await tempFile.writeAsBytes(buffer); + await Supabase.instance.client.storage .from(AppConfig.supabaseStorageBucket) - .upload(filename, file, - fileOptions: FileOptions(contentType: mediaType)); + .upload(attachment.filename, tempFile, + fileOptions: FileOptions( + contentType: + attachment.mediaType ?? 'application/octet-stream')); + + _log.info('Successfully uploaded ${attachment.filename}'); } catch (error) { + _log.severe('Error uploading ${attachment.filename}', error); throw Exception(error); + } finally { + if (await tempFile.exists()) { + await tempFile.delete(); + } } } @override - Future downloadFile(String filePath) async { + Future>> downloadFile(Attachment attachment) async { _checkSupabaseBucketIsConfigured(); try { + _log.info('downloadFile: ${attachment.filename}'); + Uint8List fileBlob = await Supabase.instance.client.storage .from(AppConfig.supabaseStorageBucket) - .download(filePath); - final image = img.decodeImage(fileBlob); - Uint8List blob = img.JpegEncoder().encode(image!); - return blob; + .download(attachment.filename); + + _log.info( + 'Successfully downloaded ${attachment.filename} (${fileBlob.length} bytes)'); + + // Return the raw file data as a stream + return Stream.value(fileBlob); } catch (error) { + _log.severe('Error downloading ${attachment.filename}', error); throw Exception(error); } } @override - Future deleteFile(String filename) async { + Future deleteFile(Attachment attachment) async { _checkSupabaseBucketIsConfigured(); - try { await Supabase.instance.client.storage .from(AppConfig.supabaseStorageBucket) - .remove([filename]); + .remove([attachment.filename]); } catch (error) { throw Exception(error); } diff --git a/demos/supabase-todolist/lib/models/schema.dart b/demos/supabase-todolist/lib/models/schema.dart index 89b69b0c..0ca9722d 100644 --- a/demos/supabase-todolist/lib/models/schema.dart +++ b/demos/supabase-todolist/lib/models/schema.dart @@ -1,5 +1,5 @@ import 'package:powersync/powersync.dart'; -import 'package:powersync_attachments_helper/powersync_attachments_helper.dart'; +import 'package:powersync_attachments_stream/powersync_attachments_stream.dart'; const todosTable = 'todos'; diff --git a/demos/supabase-todolist/lib/widgets/todo_item_widget.dart b/demos/supabase-todolist/lib/widgets/todo_item_widget.dart index a59812ed..c8d82a5f 100644 --- a/demos/supabase-todolist/lib/widgets/todo_item_widget.dart +++ b/demos/supabase-todolist/lib/widgets/todo_item_widget.dart @@ -23,7 +23,13 @@ class TodoItemWidget extends StatelessWidget { Future deleteTodo(TodoItem todo) async { if (todo.photoId != null) { - attachmentQueue.deleteFile(todo.photoId!); + + await attachmentQueue.deleteFile( + attachmentId: todo.photoId!, + updateHook: (context, attachment) async { + // await context.execute("UPDATE todos SET photo_id = NULL WHERE id = ?", [todo.id]); + }, + ); } await todo.delete(); } diff --git a/demos/supabase-todolist/pubspec.lock b/demos/supabase-todolist/pubspec.lock index cea2023c..0237ed2a 100644 --- a/demos/supabase-todolist/pubspec.lock +++ b/demos/supabase-todolist/pubspec.lock @@ -1,6 +1,22 @@ # Generated by pub # See https://dart.dev/tools/pub/glossary#lockfile packages: + _fe_analyzer_shared: + dependency: transitive + description: + name: _fe_analyzer_shared + sha256: da0d9209ca76bde579f2da330aeb9df62b6319c834fa7baae052021b0462401f + url: "https://pub.dev" + source: hosted + version: "85.0.0" + analyzer: + dependency: transitive + description: + name: analyzer + sha256: b1ade5707ab7a90dfd519eaac78a7184341d19adb6096c68d499b59c7c6cf880 + url: "https://pub.dev" + source: hosted + version: "7.7.0" app_links: dependency: transitive description: @@ -121,6 +137,14 @@ packages: url: "https://pub.dev" source: hosted version: "2.0.3" + cli_config: + dependency: transitive + description: + name: cli_config + sha256: ac20a183a07002b700f0c25e61b7ee46b23c309d76ab7b7640a028f18e4d99ec + url: "https://pub.dev" + source: hosted + version: "0.2.0" clock: dependency: transitive description: @@ -137,6 +161,22 @@ packages: url: "https://pub.dev" source: hosted version: "1.19.1" + convert: + dependency: transitive + description: + name: convert + sha256: b30acd5944035672bc15c6b7a8b47d773e41e2f17de064350988c5d02adb1c68 + url: "https://pub.dev" + source: hosted + version: "3.1.2" + coverage: + dependency: transitive + description: + name: coverage + sha256: "5da775aa218eaf2151c721b16c01c7676fbfdd99cebba2bf64e8b807a28ff94d" + url: "https://pub.dev" + source: hosted + version: "1.15.0" cross_file: dependency: transitive description: @@ -216,6 +256,14 @@ packages: description: flutter source: sdk version: "0.0.0" + frontend_server_client: + dependency: transitive + description: + name: frontend_server_client + sha256: f64a0333a82f30b0cca061bc3d143813a486dc086b574bfb233b7c1372427694 + url: "https://pub.dev" + source: hosted + version: "4.0.0" functions_client: dependency: transitive description: @@ -224,6 +272,14 @@ packages: url: "https://pub.dev" source: hosted version: "2.4.2" + glob: + dependency: transitive + description: + name: glob + sha256: c3f1ee72c96f8f78935e18aa8cecced9ab132419e8625dc187e1c2408efc20de + url: "https://pub.dev" + source: hosted + version: "2.1.3" gotrue: dependency: transitive description: @@ -248,6 +304,14 @@ packages: url: "https://pub.dev" source: hosted version: "1.4.0" + http_multi_server: + dependency: transitive + description: + name: http_multi_server + sha256: aa6199f908078bb1c5efb8d8638d4ae191aac11b311132c3ef48ce352fb52ef8 + url: "https://pub.dev" + source: hosted + version: "3.2.2" http_parser: dependency: transitive description: @@ -264,6 +328,22 @@ packages: url: "https://pub.dev" source: hosted version: "4.5.4" + io: + dependency: transitive + description: + name: io + sha256: dfd5a80599cf0165756e3181807ed3e77daf6dd4137caaad72d0b7931597650b + url: "https://pub.dev" + source: hosted + version: "1.0.5" + js: + dependency: transitive + description: + name: js + sha256: "53385261521cc4a0c4658fd0ad07a7d14591cf8fc33abbceae306ddb974888dc" + url: "https://pub.dev" + source: hosted + version: "0.7.2" json_annotation: dependency: transitive description: @@ -360,6 +440,22 @@ packages: url: "https://pub.dev" source: hosted version: "3.1.0" + node_preamble: + dependency: transitive + description: + name: node_preamble + sha256: "6e7eac89047ab8a8d26cf16127b5ed26de65209847630400f9aefd7cd5c730db" + url: "https://pub.dev" + source: hosted + version: "2.0.2" + package_config: + dependency: transitive + description: + name: package_config + sha256: f096c55ebb7deb7e384101542bfba8c52696c1b56fca2eb62827989ef2353bbc + url: "https://pub.dev" + source: hosted + version: "2.2.0" path: dependency: "direct main" description: @@ -440,6 +536,14 @@ packages: url: "https://pub.dev" source: hosted version: "2.1.8" + pool: + dependency: transitive + description: + name: pool + sha256: "20fe868b6314b322ea036ba325e6fc0711a22948856475e2c2b6306e8ab39c2a" + url: "https://pub.dev" + source: hosted + version: "1.5.1" posix: dependency: transitive description: @@ -464,12 +568,19 @@ packages: source: path version: "1.15.0" powersync_attachments_helper: - dependency: "direct main" + dependency: "direct overridden" description: path: "../../packages/powersync_attachments_helper" relative: true source: path version: "0.6.18+11" + powersync_attachments_stream: + dependency: "direct main" + description: + path: "../../packages/powersync_attachments_stream" + relative: true + source: path + version: "0.0.1" powersync_core: dependency: "direct overridden" description: @@ -580,11 +691,59 @@ packages: url: "https://pub.dev" source: hosted version: "2.4.1" + shelf: + dependency: transitive + description: + name: shelf + sha256: e7dd780a7ffb623c57850b33f43309312fc863fb6aa3d276a754bb299839ef12 + url: "https://pub.dev" + source: hosted + version: "1.4.2" + shelf_packages_handler: + dependency: transitive + description: + name: shelf_packages_handler + sha256: "89f967eca29607c933ba9571d838be31d67f53f6e4ee15147d5dc2934fee1b1e" + url: "https://pub.dev" + source: hosted + version: "3.0.2" + shelf_static: + dependency: transitive + description: + name: shelf_static + sha256: c87c3875f91262785dade62d135760c2c69cb217ac759485334c5857ad89f6e3 + url: "https://pub.dev" + source: hosted + version: "1.1.3" + shelf_web_socket: + dependency: transitive + description: + name: shelf_web_socket + sha256: "3632775c8e90d6c9712f883e633716432a27758216dfb61bd86a8321c0580925" + url: "https://pub.dev" + source: hosted + version: "3.0.0" sky_engine: dependency: transitive description: flutter source: sdk version: "0.0.0" + source_map_stack_trace: + dependency: transitive + description: + name: source_map_stack_trace + sha256: c0713a43e323c3302c2abe2a1cc89aa057a387101ebd280371d6a6c9fa68516b + url: "https://pub.dev" + source: hosted + version: "2.1.2" + source_maps: + dependency: transitive + description: + name: source_maps + sha256: "190222579a448b03896e0ca6eca5998fa810fda630c1d65e2f78b3f638f54812" + url: "https://pub.dev" + source: hosted + version: "0.10.13" source_span: dependency: transitive description: @@ -697,6 +856,14 @@ packages: url: "https://pub.dev" source: hosted version: "1.2.2" + test: + dependency: "direct dev" + description: + name: test + sha256: "301b213cd241ca982e9ba50266bd3f5bd1ea33f1455554c5abb85d1be0e2d87e" + url: "https://pub.dev" + source: hosted + version: "1.25.15" test_api: dependency: transitive description: @@ -705,6 +872,14 @@ packages: url: "https://pub.dev" source: hosted version: "0.7.4" + test_core: + dependency: transitive + description: + name: test_core + sha256: "84d17c3486c8dfdbe5e12a50c8ae176d15e2a771b96909a9442b40173649ccaa" + url: "https://pub.dev" + source: hosted + version: "0.6.8" typed_data: dependency: transitive description: @@ -809,6 +984,14 @@ packages: url: "https://pub.dev" source: hosted version: "15.0.0" + watcher: + dependency: transitive + description: + name: watcher + sha256: "0b7fd4a0bbc4b92641dbf20adfd7e3fd1398fe17102d94b674234563e110088a" + url: "https://pub.dev" + source: hosted + version: "1.1.2" web: dependency: transitive description: @@ -833,6 +1016,14 @@ packages: url: "https://pub.dev" source: hosted version: "3.0.3" + webkit_inspection_protocol: + dependency: transitive + description: + name: webkit_inspection_protocol + sha256: "87d3f2333bb240704cd3f1c6b5b7acd8a10e7f0bc28c28dcf14e782014f4a572" + url: "https://pub.dev" + source: hosted + version: "1.2.1" xdg_directories: dependency: transitive description: @@ -866,5 +1057,5 @@ packages: source: hosted version: "2.1.0" sdks: - dart: ">=3.7.0 <4.0.0" + dart: ">=3.8.1 <4.0.0" flutter: ">=3.27.0" diff --git a/demos/supabase-todolist/pubspec.yaml b/demos/supabase-todolist/pubspec.yaml index 725e4d64..abd5987b 100644 --- a/demos/supabase-todolist/pubspec.yaml +++ b/demos/supabase-todolist/pubspec.yaml @@ -10,7 +10,8 @@ environment: dependencies: flutter: sdk: flutter - powersync_attachments_helper: ^0.6.18+11 + powersync_attachments_stream: + path: ../../packages/powersync_attachments_stream powersync: ^1.15.0 path_provider: ^2.1.1 supabase_flutter: ^2.0.1 @@ -26,6 +27,7 @@ dev_dependencies: sdk: flutter flutter_lints: ^3.0.1 + test: ^1.25.15 flutter: uses-material-design: true diff --git a/demos/supabase-todolist/test/widget_test.dart b/demos/supabase-todolist/test/widget_test.dart new file mode 100644 index 00000000..77913e2e --- /dev/null +++ b/demos/supabase-todolist/test/widget_test.dart @@ -0,0 +1,30 @@ +// This is a basic Flutter widget test. +// +// To perform an interaction with a widget in your test, use the WidgetTester +// utility in the flutter_test package. For example, you can send tap and scroll +// gestures. You can also use WidgetTester to find child widgets in the widget +// tree, read text, and verify that the values of widget properties are correct. + +import 'package:flutter/material.dart'; +import 'package:flutter_test/flutter_test.dart'; + +import 'package:powersync_flutter_demo/main.dart'; + +void main() { + testWidgets('Counter increments smoke test', (WidgetTester tester) async { + // Build our app and trigger a frame. + await tester.pumpWidget(const MyApp(loggedIn: true,)); + + // Verify that our counter starts at 0. + expect(find.text('0'), findsOneWidget); + expect(find.text('1'), findsNothing); + + // Tap the '+' icon and trigger a frame. + await tester.tap(find.byIcon(Icons.add)); + await tester.pump(); + + // Verify that our counter has incremented. + expect(find.text('0'), findsNothing); + expect(find.text('1'), findsOneWidget); + }); +} diff --git a/packages/powersync_attachments_stream/.gitignore b/packages/powersync_attachments_stream/.gitignore new file mode 100644 index 00000000..eb6c05cd --- /dev/null +++ b/packages/powersync_attachments_stream/.gitignore @@ -0,0 +1,31 @@ +# Miscellaneous +*.class +*.log +*.pyc +*.swp +.DS_Store +.atom/ +.buildlog/ +.history +.svn/ +migrate_working_dir/ + +# IntelliJ related +*.iml +*.ipr +*.iws +.idea/ + +# The .vscode folder contains launch configuration and tasks you configure in +# VS Code which you may wish to be included in version control, so this line +# is commented out by default. +#.vscode/ + +# Flutter/Dart/Pub related +# Libraries should not include pubspec.lock, per https://dart.dev/guides/libraries/private-files#pubspeclock. +/pubspec.lock +**/doc/api/ +.dart_tool/ +.flutter-plugins +.flutter-plugins-dependencies +build/ diff --git a/packages/powersync_attachments_stream/.metadata b/packages/powersync_attachments_stream/.metadata new file mode 100644 index 00000000..8c47ce66 --- /dev/null +++ b/packages/powersync_attachments_stream/.metadata @@ -0,0 +1,10 @@ +# This file tracks properties of this Flutter project. +# Used by Flutter tool to assess capabilities and perform upgrades etc. +# +# This file should be version controlled and should not be manually edited. + +version: + revision: "d7b523b356d15fb81e7d340bbe52b47f93937323" + channel: "stable" + +project_type: package diff --git a/packages/powersync_attachments_stream/CHANGELOG.md b/packages/powersync_attachments_stream/CHANGELOG.md new file mode 100644 index 00000000..41cc7d81 --- /dev/null +++ b/packages/powersync_attachments_stream/CHANGELOG.md @@ -0,0 +1,3 @@ +## 0.0.1 + +* TODO: Describe initial release. diff --git a/packages/powersync_attachments_stream/LICENSE b/packages/powersync_attachments_stream/LICENSE new file mode 100644 index 00000000..ba75c69f --- /dev/null +++ b/packages/powersync_attachments_stream/LICENSE @@ -0,0 +1 @@ +TODO: Add your license here. diff --git a/packages/powersync_attachments_stream/README.md b/packages/powersync_attachments_stream/README.md new file mode 100644 index 00000000..f706ba5d --- /dev/null +++ b/packages/powersync_attachments_stream/README.md @@ -0,0 +1,261 @@ +# PowerSync Attachment Stream + +A [PowerSync](https://powersync.com) library to manage attachments (such as images or files) in Dart apps. + + +### Alpha Release + +Attachment stream is currently in an alpha state, intended strictly for testing. Expect breaking changes and instability as development continues. + +Do not rely on this package for production use. + +## Usage + +An `AttachmentQueue` is used to manage and sync attachments in your app. The attachments' state is stored in a local-only attachments table. + +### Key Assumptions + +- Each attachment is identified by a unique ID +- Attachments are immutable once created +- Relational data should reference attachments using a foreign key column +- Relational data should reflect the holistic state of attachments at any given time. An existing local attachment will be deleted locally if no relational data references it. + +### Example Implementation + +See the [Flutter Supabase Demo](../../demos/supabase-todolist/README.md) for a basic example of attachment syncing. + +In the example below, the user captures photos when checklist items are completed as part of an inspection workflow. + +1. First, define your schema including the `checklist` table and the local-only attachments table: + +```dart +Schema schema = Schema(([ + const Table('checklists', [ + Column.text('description'), + Column.integer('completed'), + Column.text('photo_id'), + ]), + AttachmentsQueueTable( + attachmentsQueueTableName: defaultAttachmentsQueueTableName) +])); +``` + +2. Create an `AttachmentQueue` instance. This class provides default syncing utilities and implements a default sync strategy. This class is open and can be overridden for custom functionality: + +```dart +final Directory appDocDir = await getApplicationDocumentsDirectory(); +final localStorage = IOLocalStorage('${appDocDir.path}/attachments'); + +final queue = AttachmentQueue( + db: db, + remoteStorage: remoteStorage, + localStorage: localStorage, + watchAttachments: () => db.watch(''' + SELECT photo_id as id FROM todos WHERE photo_id IS NOT NULL + ''').map((results) => results + .map((row) => WatchedAttachmentItem( + id: row['id'] as String, + fileExtension: 'jpg', + )) + .toList()), + ); +``` + +* The `localStorage` is an implementation of `AbstractLocalStorageAdapter` that specifies where and how local attachment files should be stored. For mobile and desktop apps, `IOLocalStorage` can be used, which requires a directory path. In Flutter, `path_provider`'s `getApplicationDocumentsDirectory()` with a subdirectory like `/attachments` is a good choice. +* The `remoteStorage` is responsible for connecting to the attachments backend. See the `RemoteStorageAdapter` interface definition [here](https://github.com/powersync-ja/powersync.dart/blob/main/packages/powersync_attachments_stream/lib/src/abstractions/remote_storage.dart). +* `watchAttachments` is a `Stream` of `WatchedAttachmentItem`. The `WatchedAttachmentItem`s represent the attachments which should be present in the application. We recommend using `PowerSync`'s `watch` query as shown above. In this example, we provide the `fileExtension` for all photos. This information could also be obtained from the query if necessary. + +3. Implement a `RemoteStorageAdapter` which interfaces with a remote storage provider. This will be used for downloading, uploading, and deleting attachments: + +```dart +final remote = _RemoteStorageAdapter(); + +class _RemoteStorageAdapter implements AbstractRemoteStorageAdapter { + @override + Future uploadFile(Stream> fileData, Attachment attachment) async { + // TODO: Implement upload to your backend + } + + @override + Future>> downloadFile(Attachment attachment) async { + // TODO: Implement download from your backend + } + + @override + Future deleteFile(Attachment attachment) async { + // TODO: Implement delete in your backend + } +} +``` + +4. Start the sync process: + +```dart +await queue.startSync(); +``` + +5. Create and save attachments using `saveFile()`. This method will save the file to local storage, create an attachment record which queues the file for upload to the remote storage, and allows assigning the newly created attachment ID to a checklist item: + +```dart +await queue.saveFile( + data: photoData, + mediaType: 'image/jpg', + fileExtension: 'jpg', + metaData: 'Test meta data', + updateHook: (context, attachment) async { + // Update the todo item to reference this attachment + await context.execute( + 'UPDATE checklists SET photo_id = ? WHERE id = ?', + [attachment.id, checklistId], + ); + }, + ); +``` + +## Implementation Details + +### Attachment Table Structure + +The `AttachmentsQueueTable` class creates a **local-only table** for tracking the states and metadata of file attachments. It allows customization of the table name, additional columns, indexes, and optionally a view name. + +An attachments table definition can be created with the following options: + +| Option | Description | Default | +| ---------------------- | -------------------------------| ----------------------------| +| `attachmentsQueueTableName` | The name of the table | `defaultAttachmentsQueueTableName` | +| `additionalColumns` | Extra columns to add to the table | `[]` (empty list) | +| `indexes` | Indexes to optimize queries | `[]` (empty list) | +| `viewName` | Optional associated view name | `null` | + +The default columns included in the table are: + +| Column Name | Type | Description | +| ------------ | --------- | -------------------------------------------------------------------------------- | +| `filename` | `TEXT` | The filename of the attachment | +| `local_uri` | `TEXT` | Local file URI or path | +| `timestamp` | `INTEGER` | The timestamp of the last update to the attachment | +| `size` | `INTEGER` | File size in bytes | +| `media_type` | `TEXT` | The media (MIME) type of the attachment | +| `state` | `INTEGER` | Current state of the attachment (e.g., queued, syncing, synced) | +| `has_synced` | `INTEGER` | Internal flag indicating if the attachment has ever been synced (for caching) | +| `meta_data` | `TEXT` | Additional metadata stored as JSON | + +The class extends a base `Table` class using a `localOnly` constructor, so this table exists **only locally** on the device and is not synchronized with a remote database. + +This design allows flexible tracking and management of attachment syncing state and metadata within the local database. | + +### Attachment States + +Attachments are managed through the following states, which represent their current synchronization status with remote storage: + +| State | Description | +| ----------------- | ---------------------------------------------------------------------- | +| `queuedUpload` | Attachment is queued for upload to remote/cloud storage | +| `queuedDelete` | Attachment is queued for deletion from both remote and local storage | +| `queuedDownload` | Attachment is queued for download from remote/cloud storage | +| `synced` | Attachment is fully synchronized with remote storage | +| `archived` | Attachment is archived — no longer actively synchronized or referenced | + +--- + +The `AttachmentState` enum also provides helper methods for converting between the enum and its integer representation: + +- `AttachmentState.fromInt(int value)` — Constructs an `AttachmentState` from its corresponding integer index. Throws an `ArgumentError` if the value is out of range. +- `toInt()` — Returns the integer index of the current `AttachmentState` instance. + +### Sync Process + +The `AttachmentQueue` implements a sync process with these components: + +1. **State Monitoring**: The queue watches the attachments table for records in `queuedUpload`, `queuedDelete`, and `queuedDownload` states. An event loop triggers calls to the remote storage for these operations. + +2. **Periodic Sync**: By default, the queue triggers a sync every 30 seconds to retry failed uploads/downloads, in particular after the app was offline. This interval can be configured by setting `syncInterval` in the `AttachmentQueue` constructor options, or disabled by setting the interval to `0`. + +3. **Watching State**: The `watchAttachments` stream in the `AttachmentQueue` constructor is used to maintain consistency between local and remote states: + - New items trigger downloads - see the Download Process below. + - Missing items trigger archiving - see Cache Management below. + +### Upload Process + +The `saveFile` method handles attachment creation and upload: + +1. The attachment is saved to local storage +2. An `AttachmentRecord` is created with `queuedUpload` state, linked to the local file using `localUri` +3. The attachment must be assigned to relational data in the same transaction, since this data is constantly watched and should always represent the attachment queue state +4. The `RemoteStorageAdapter` `uploadFile` function is called +5. On successful upload, the state changes to `synced` +6. If upload fails, the record stays in `queuedUpload` state for retry + +### Download Process + +Attachments are scheduled for download when the stream from `watchAttachments` emits a new item that is not present locally: + +1. An `AttachmentRecord` is created with `queuedDownload` state +2. The `RemoteStorageAdapter` `downloadFile` function is called +3. The received data is saved to local storage +4. On successful download, the state changes to `synced` +5. If download fails, the operation is retried in the next sync cycle + +### Delete Process + +The `deleteFile` method deletes attachments from both local and remote storage: + +1. The attachment record moves to `queuedDelete` state +2. The attachment must be unassigned from relational data in the same transaction, since this data is constantly watched and should always represent the attachment queue state +3. On successful deletion, the record is removed +4. If deletion fails, the operation is retried in the next sync cycle + +### Cache Management + +The `AttachmentQueue` implements a caching system for archived attachments: + +1. Local attachments are marked as `archived` if the stream from `watchAttachments` no longer references them +2. Archived attachments are kept in the cache for potential future restoration +3. The cache size is controlled by the `archivedCacheLimit` parameter in the `AttachmentQueue` constructor +4. By default, the queue keeps the last 100 archived attachment records +5. When the cache limit is reached, the oldest archived attachments are permanently deleted +6. If an archived attachment is referenced again while still in the cache, it can be restored +7. The cache limit can be configured in the `AttachmentQueue` constructor + +### Error Handling + +1. **Automatic Retries**: + - Failed uploads/downloads/deletes are automatically retried + - The sync interval (default 30 seconds) ensures periodic retry attempts + - Retries continue indefinitely until successful + +2. **Custom Error Handling**: + - A `SyncErrorHandler` can be implemented to customize retry behavior (see example below) + - The handler can decide whether to retry or archive failed operations + - Different handlers can be provided for upload, download, and delete operations + +Example of a custom `SyncErrorHandler`: + +```dart +final errorHandler = _SyncErrorHandler(); + +class _SyncErrorHandler implements AbstractSyncErrorHandler { + @override + Future onDownloadError(Attachment attachment, Object exception) async { + // TODO: Return if the attachment sync should be retried + return false; + } + + @override + Future onUploadError(Attachment attachment, Object exception) async { + // TODO: Return if the attachment sync should be retried + return false; + } + + @override + Future onDeleteError(Attachment attachment, Object exception) async { + // TODO: Return if the attachment sync should be retried + return false; + } +} + +final queue = AttachmentQueue( + // ... other parameters ... + errorHandler: errorHandler, +); +``` \ No newline at end of file diff --git a/packages/powersync_attachments_stream/analysis_options.yaml b/packages/powersync_attachments_stream/analysis_options.yaml new file mode 100644 index 00000000..79238a7a --- /dev/null +++ b/packages/powersync_attachments_stream/analysis_options.yaml @@ -0,0 +1,10 @@ +include: package:flutter_lints/flutter.yaml + +analyzer: + language: + strict-casts: true + strict-inference: true + strict-raw-types: true + +# Additional information about this file can be found at +# https://dart.dev/guides/language/analysis-options diff --git a/packages/powersync_attachments_stream/lib/common.dart b/packages/powersync_attachments_stream/lib/common.dart new file mode 100644 index 00000000..e83f6928 --- /dev/null +++ b/packages/powersync_attachments_stream/lib/common.dart @@ -0,0 +1,8 @@ +/// Platform-agnostic exports for all platforms (including web). +export 'src/attachment.dart'; +export 'src/abstractions/sync_error_handler.dart'; +export 'src/abstractions/attachment_service.dart'; +export 'src/abstractions/attachment_context.dart'; +export 'src/abstractions/local_storage.dart'; +export 'src/abstractions/remote_storage.dart'; +export 'src/attachment_queue_service.dart'; \ No newline at end of file diff --git a/packages/powersync_attachments_stream/lib/powersync_attachments_stream.dart b/packages/powersync_attachments_stream/lib/powersync_attachments_stream.dart new file mode 100644 index 00000000..26356fc2 --- /dev/null +++ b/packages/powersync_attachments_stream/lib/powersync_attachments_stream.dart @@ -0,0 +1,4 @@ +/// Default exports for native platforms (dart:io). For web, use 'common.dart'. +export 'common.dart'; +export 'src/storage/io_local_storage.dart'; +export 'src/attachment_queue_service.dart'; \ No newline at end of file diff --git a/packages/powersync_attachments_stream/lib/src/abstractions/attachment_context.dart b/packages/powersync_attachments_stream/lib/src/abstractions/attachment_context.dart new file mode 100644 index 00000000..268ed0b8 --- /dev/null +++ b/packages/powersync_attachments_stream/lib/src/abstractions/attachment_context.dart @@ -0,0 +1,49 @@ +import '../attachment.dart'; + +/// Context for performing Attachment operations. +/// +/// This is typically provided through a locking/exclusivity method and allows +/// safe, transactional operations on the attachment queue. +abstract class AbstractAttachmentContext { + /// Delete the attachment from the attachment queue. + /// + /// [id]: The ID of the attachment to delete. + /// [tx]: The database context to use for the operation. + Future deleteAttachment(String id, dynamic context); + + /// Set the state of the attachment to ignore. + Future ignoreAttachment(String id); + + /// Get the attachment from the attachment queue using an ID. + Future getAttachment(String id); + + /// Save the attachment to the attachment queue. + Future saveAttachment(Attachment attachment); + + /// Save the attachments to the attachment queue. + Future saveAttachments(List attachments); + + /// Get all the IDs of attachments in the attachment queue. + Future> getAttachmentIds(); + + /// Get all Attachment records present in the database. + Future> getAttachments(); + + /// Gets all the active attachments which require an operation to be performed. + Future> getActiveAttachments(); + + /// Helper function to clear the attachment queue. Currently only used for testing purposes. + Future clearQueue(); + + /// Delete attachments which have been archived. + /// + /// Returns true if all items have been deleted. Returns false if there might be more archived items remaining. + Future deleteArchivedAttachments(Future Function(List) callback); + + /// Upserts an attachment record given a database connection context. + /// + /// [attachment]: The attachment to upsert. + /// [context]: The database transaction/context to use for the operation. + /// Returns the upserted [Attachment]. + Future upsertAttachment(Attachment attachment, dynamic context); +} \ No newline at end of file diff --git a/packages/powersync_attachments_stream/lib/src/abstractions/attachment_service.dart b/packages/powersync_attachments_stream/lib/src/abstractions/attachment_service.dart new file mode 100644 index 00000000..d8d26d9c --- /dev/null +++ b/packages/powersync_attachments_stream/lib/src/abstractions/attachment_service.dart @@ -0,0 +1,12 @@ +import 'attachment_context.dart'; + +/// Service for interacting with the local attachment records. +abstract class AbstractAttachmentService { + /// Watcher for changes to attachments table. + /// Once a change is detected it will initiate a sync of the attachments. + Stream watchActiveAttachments(); + + /// Executes a callback with an exclusive lock on all attachment operations. + /// This helps prevent race conditions between different updates. + Future withContext(Future Function(AbstractAttachmentContext context) action); +} \ No newline at end of file diff --git a/packages/powersync_attachments_stream/lib/src/abstractions/local_storage.dart b/packages/powersync_attachments_stream/lib/src/abstractions/local_storage.dart new file mode 100644 index 00000000..ddc85b8b --- /dev/null +++ b/packages/powersync_attachments_stream/lib/src/abstractions/local_storage.dart @@ -0,0 +1,34 @@ +import 'dart:typed_data'; + +abstract class AbstractLocalStorageAdapter { + /// Saves binary data stream to storage at the specified file path + /// + /// [filePath] - Path where the file will be stored + /// [data] - Stream of binary data to store + /// Returns the total size of the written data in bytes + Future saveFile(String filePath, Stream data); + + /// Retrieves binary data stream from storage at the specified file path + /// + /// [filePath] - Path of the file to read + /// [mediaType] - Optional MIME type of the data + /// Returns a stream of binary data + Stream readFile(String filePath, {String? mediaType}); + + /// Deletes a file at the specified path + /// + /// [filePath] - Path of the file to delete + Future deleteFile(String filePath); + + /// Checks if a file exists at the specified path + /// + /// [filePath] - Path to check + /// Returns true if the file exists, false otherwise + Future fileExists(String filePath); + + /// Initializes the storage, performing any necessary setup. + Future initialize(); + + /// Clears all data from the storage. + Future clear(); +} diff --git a/packages/powersync_attachments_stream/lib/src/abstractions/remote_storage.dart b/packages/powersync_attachments_stream/lib/src/abstractions/remote_storage.dart new file mode 100644 index 00000000..80256a2f --- /dev/null +++ b/packages/powersync_attachments_stream/lib/src/abstractions/remote_storage.dart @@ -0,0 +1,25 @@ +import 'dart:async'; +import '../attachment.dart'; + +/// Adapter for interfacing with remote attachment storage. +abstract class AbstractRemoteStorageAdapter { + /// Uploads a file to remote storage. + /// + /// [fileData] is a stream of byte arrays representing the file data. + /// [attachment] is the attachment record associated with the file. + Future uploadFile( + Stream> fileData, + Attachment attachment, + ); + + /// Downloads a file from remote storage. + /// + /// [attachment] is the attachment record associated with the file. + /// Returns a stream of byte arrays representing the file data. + Future>> downloadFile(Attachment attachment); + + /// Deletes a file from remote storage. + /// + /// [attachment] is the attachment record associated with the file. + Future deleteFile(Attachment attachment); +} diff --git a/packages/powersync_attachments_stream/lib/src/abstractions/sync_error_handler.dart b/packages/powersync_attachments_stream/lib/src/abstractions/sync_error_handler.dart new file mode 100644 index 00000000..cc2e3cab --- /dev/null +++ b/packages/powersync_attachments_stream/lib/src/abstractions/sync_error_handler.dart @@ -0,0 +1,36 @@ +import '../attachment.dart'; + +/// Interface for handling errors during attachment operations. +/// Implementations determine whether failed operations should be retried. +/// Attachment records are archived if an operation fails and should not be retried. +abstract class AbstractSyncErrorHandler { + /// Determines whether the provided attachment download operation should be retried. + /// + /// [attachment] The attachment involved in the failed download operation. + /// [exception] The exception that caused the download failure. + /// Returns `true` if the download operation should be retried, `false` otherwise. + Future onDownloadError( + Attachment attachment, + Object exception, + ); + + /// Determines whether the provided attachment upload operation should be retried. + /// + /// [attachment] The attachment involved in the failed upload operation. + /// [exception] The exception that caused the upload failure. + /// Returns `true` if the upload operation should be retried, `false` otherwise. + Future onUploadError( + Attachment attachment, + Object exception, + ); + + /// Determines whether the provided attachment delete operation should be retried. + /// + /// [attachment] The attachment involved in the failed delete operation. + /// [exception] The exception that caused the delete failure. + /// Returns `true` if the delete operation should be retried, `false` otherwise. + Future onDeleteError( + Attachment attachment, + Object exception, + ); +} \ No newline at end of file diff --git a/packages/powersync_attachments_stream/lib/src/attachment.dart b/packages/powersync_attachments_stream/lib/src/attachment.dart new file mode 100644 index 00000000..62e3eef4 --- /dev/null +++ b/packages/powersync_attachments_stream/lib/src/attachment.dart @@ -0,0 +1,156 @@ +/// Defines attachment states and the Attachment model for the PowerSync attachments system. +/// +/// Includes metadata, state, and utility methods for working with attachments. + +import 'package:powersync_core/sqlite3_common.dart' show Row; +import 'package:powersync_core/powersync_core.dart'; +import './attachment_queue_service.dart'; + +/// Represents the state of an attachment. +enum AttachmentState { + /// The attachment is queued for download from the remote storage. + queuedDownload, + + /// The attachment is queued for upload to the remote storage. + queuedUpload, + + /// The attachment is queued for deletion from the remote storage. + queuedDelete, + + /// The attachment is fully synchronized with the remote storage. + synced, + + /// The attachment is archived and no longer actively synchronized. + archived; + + /// Constructs an [AttachmentState] from the corresponding integer value. + /// + /// Throws [ArgumentError] if the value does not match any [AttachmentState]. + static AttachmentState fromInt(int value) { + if (value < 0 || value >= AttachmentState.values.length) { + throw ArgumentError('Invalid value for AttachmentState: $value'); + } + return AttachmentState.values[value]; + } + + /// Returns the ordinal value of this [AttachmentState]. + int toInt() => index; +} + +const defaultAttachmentsQueueTableName = AttachmentQueue.defaultTableName; + +/// Represents an attachment with metadata and state information. +/// +/// {@category Attachments} +/// +/// Properties: +/// - [id]: Unique identifier for the attachment. +/// - [timestamp]: Timestamp of the last record update. +/// - [filename]: Name of the attachment file, e.g., `[id].jpg`. +/// - [state]: Current state of the attachment, represented as an ordinal of [AttachmentState]. +/// - [localUri]: Local URI pointing to the attachment file, if available. +/// - [mediaType]: Media type of the attachment, typically represented as a MIME type. +/// - [size]: Size of the attachment in bytes, if available. +/// - [hasSynced]: Indicates whether the attachment has been synced locally before. +/// - [metaData]: Additional metadata associated with the attachment. +class Attachment { + /// Unique identifier for the attachment. + final String id; + /// Timestamp of the last record update. + final int timestamp; + /// Name of the attachment file, e.g., `[id].jpg`. + final String filename; + /// Current state of the attachment, represented as an ordinal of [AttachmentState]. + final AttachmentState state; + /// Local URI pointing to the attachment file, if available. + final String? localUri; + /// Media type of the attachment, typically represented as a MIME type. + final String? mediaType; + /// Size of the attachment in bytes, if available. + final int? size; + /// Indicates whether the attachment has been synced locally before. + final bool hasSynced; + /// Additional metadata associated with the attachment. + final String? metaData; + + /// Creates an [Attachment] instance. + const Attachment({ + required this.id, + this.timestamp = 0, + required this.filename, + this.state = AttachmentState.queuedDownload, + this.localUri, + this.mediaType, + this.size, + this.hasSynced = false, + this.metaData, + }); + + /// Creates an [Attachment] instance from a database row. + /// + /// [row]: The [Row] containing attachment data. + /// Returns an [Attachment] instance populated with data from the row. + factory Attachment.fromRow(Row row) { + return Attachment( + id: row['id'] as String, + timestamp: row['timestamp'] as int? ?? 0, + filename: row['filename'] as String, + localUri: row['local_uri'] as String?, + mediaType: row['media_type'] as String?, + size: row['size'] as int?, + state: AttachmentState.fromInt(row['state'] as int), + hasSynced: (row['has_synced'] as int? ?? 0) > 0, + metaData: row['meta_data']?.toString(), + ); + } + + /// Returns a copy of this attachment with the given fields replaced. + Attachment copyWith({ + String? id, + int? timestamp, + String? filename, + AttachmentState? state, + String? localUri, + String? mediaType, + int? size, + bool? hasSynced, + String? metaData, + }) { + return Attachment( + id: id ?? this.id, + timestamp: timestamp ?? this.timestamp, + filename: filename ?? this.filename, + state: state ?? this.state, + localUri: localUri ?? this.localUri, + mediaType: mediaType ?? this.mediaType, + size: size ?? this.size, + hasSynced: hasSynced ?? this.hasSynced, + metaData: metaData ?? this.metaData, + ); + } +} + +/// Table definition for the attachments queue. +class AttachmentsQueueTable extends Table { + AttachmentsQueueTable({ + String attachmentsQueueTableName = defaultAttachmentsQueueTableName, + List additionalColumns = const [], + List indexes = const [], + String? viewName, + }) : super.localOnly( + attachmentsQueueTableName, + [ + const Column.text('filename'), + const Column.text('local_uri'), + const Column.integer('timestamp'), + const Column.integer('size'), + const Column.text('media_type'), + const Column.integer('state'), + const Column.integer('has_synced'), + const Column.text('meta_data'), + ...additionalColumns, + ], + viewName: viewName, + indexes: indexes, + ); +} diff --git a/packages/powersync_attachments_stream/lib/src/attachment_queue_service.dart b/packages/powersync_attachments_stream/lib/src/attachment_queue_service.dart new file mode 100644 index 00000000..9081f63e --- /dev/null +++ b/packages/powersync_attachments_stream/lib/src/attachment_queue_service.dart @@ -0,0 +1,391 @@ +// Implements the attachment queue for PowerSync attachments. +// +// This class manages the lifecycle of attachment records, including watching for new attachments, +// syncing with remote storage, handling uploads, downloads, and deletes, and managing local storage. +// It provides hooks for error handling, cache management, and custom filename resolution. + +import 'dart:async'; +import 'dart:typed_data'; +import 'package:logging/logging.dart'; +import 'package:powersync_core/powersync_core.dart'; +import 'package:sqlite_async/mutex.dart'; +import 'attachment.dart'; +import 'abstractions/attachment_service.dart'; +import 'abstractions/attachment_context.dart'; +import 'abstractions/local_storage.dart'; +import 'abstractions/remote_storage.dart'; +import 'abstractions/sync_error_handler.dart'; +import 'implementations/attachment_service.dart'; +import 'sync/syncing_service.dart'; + +/// A watched attachment record item. +/// +/// This is usually returned from watching all relevant attachment IDs. +/// +/// - [id]: Id for the attachment record. +/// - [fileExtension]: File extension used to determine an internal filename for storage if no [filename] is provided. +/// - [filename]: Filename to store the attachment with. +/// - [metaData]: Optional metadata for the attachment record. +class WatchedAttachmentItem { + /// Id for the attachment record. + final String id; + + /// File extension used to determine an internal filename for storage if no [filename] is provided. + final String? fileExtension; + + /// Filename to store the attachment with. + final String? filename; + + /// Optional metadata for the attachment record. + final String? metaData; + + /// Creates a [WatchedAttachmentItem]. + /// + /// Either [fileExtension] or [filename] must be provided. + WatchedAttachmentItem({ + required this.id, + this.fileExtension, + this.filename, + this.metaData, + }) : assert( + fileExtension != null || filename != null, + 'Either fileExtension or filename must be provided.', + ); +} + +/// Class used to implement the attachment queue. +/// +/// Manages the lifecycle of attachment records, including watching for new attachments, +/// syncing with remote storage, handling uploads, downloads, and deletes, and managing local storage. +/// +/// Properties: +/// - [db]: PowerSync database client. +/// - [remoteStorage]: Adapter which interfaces with the remote storage backend. +/// - [watchAttachments]: A stream generator for the current state of local attachments. +/// - [localStorage]: Provides access to local filesystem storage methods. +/// - [attachmentsQueueTableName]: SQLite table where attachment state will be recorded. +/// - [errorHandler]: Attachment operation error handler. Specifies if failed attachment operations should be retried. +/// - [syncInterval]: Periodic interval to trigger attachment sync operations. +/// - [archivedCacheLimit]: Defines how many archived records are retained as a cache. +/// - [syncThrottleDuration]: Throttles remote sync operations triggering. +/// - [downloadAttachments]: Should attachments be downloaded. +/// - [logger]: Logging interface used for all log operations. +class AttachmentQueue { + final PowerSyncDatabase db; + final AbstractRemoteStorageAdapter remoteStorage; + final Stream> Function() watchAttachments; + final AbstractLocalStorageAdapter localStorage; + final String attachmentsQueueTableName; + final AbstractSyncErrorHandler? errorHandler; + final Duration syncInterval; + final int archivedCacheLimit; + final Duration syncThrottleDuration; + final bool downloadAttachments; + final Logger logger; + + static const String defaultTableName = 'attachments_queue'; + + final Mutex _mutex = Mutex(); + bool _closed = false; + StreamSubscription>? _syncStatusSubscription; + late final AbstractAttachmentService attachmentsService; + late final SyncingService syncingService; + + AttachmentQueue({ + required this.db, + required this.remoteStorage, + required this.watchAttachments, + required this.localStorage, + this.attachmentsQueueTableName = defaultTableName, + this.errorHandler, + this.syncInterval = const Duration(seconds: 30), + this.archivedCacheLimit = 100, + this.syncThrottleDuration = const Duration(seconds: 1), + this.downloadAttachments = true, + Logger? logger, + }) : logger = logger ?? Logger('AttachmentQueue') { + attachmentsService = AttachmentServiceImpl( + db: db, + logger: logger ?? Logger('AttachmentQueue'), + maxArchivedCount: archivedCacheLimit, + attachmentsQueueTableName: attachmentsQueueTableName, + ); + + syncingService = SyncingService( + remoteStorage: remoteStorage, + localStorage: localStorage, + attachmentsService: attachmentsService, + errorHandler: errorHandler, + syncThrottle: syncThrottleDuration, + period: syncInterval, + ); + } + + /// Initialize the attachment queue by: + /// 1. Creating the attachments directory. + /// 2. Adding watches for uploads, downloads, and deletes. + /// 3. Adding a trigger to run uploads, downloads, and deletes when the device is online after being offline. + Future startSync() async { + await _mutex.lock(() async { + if (_closed) { + throw Exception('Attachment queue has been closed'); + } + + await _stopSyncingInternal(); + + await localStorage.initialize(); + + await attachmentsService.withContext((context) async { + await _verifyAttachments(context); + }); + + await syncingService.startSync(); + + // Listen for connectivity changes and watched attachments + _syncStatusSubscription = watchAttachments().listen((items) async { + await _processWatchedAttachments(items); + }); + + logger.info('AttachmentQueue started syncing.'); + }); + } + + /// Stops syncing. Syncing may be resumed with [startSync]. + Future stopSyncing() async { + await _mutex.lock(() async { + await _stopSyncingInternal(); + }); + } + + Future _stopSyncingInternal() async { + if (_closed) return; + + await _syncStatusSubscription?.cancel(); + _syncStatusSubscription = null; + await syncingService.stopSync(); + + logger.info('AttachmentQueue stopped syncing.'); + } + + /// Closes the queue. The queue cannot be used after closing. + Future close() async { + await _mutex.lock(() async { + if (_closed) return; + + await _syncStatusSubscription?.cancel(); + await syncingService.close(); + _closed = true; + + logger.info('AttachmentQueue closed.'); + }); + } + + /// Resolves the filename for new attachment items. + /// Concatenates the attachment ID and extension by default. + Future resolveNewAttachmentFilename( + String attachmentId, + String? fileExtension, + ) async { + return fileExtension != null + ? '$attachmentId.$fileExtension' + : '$attachmentId.dat'; + } + + /// Processes attachment items returned from [watchAttachments]. + /// The default implementation asserts the items returned from [watchAttachments] as the definitive + /// state for local attachments. + Future _processWatchedAttachments( + List items, + ) async { + await attachmentsService.withContext((context) async { + final currentAttachments = await context.getAttachments(); + final List attachmentUpdates = []; + + for (final item in items) { + final existingQueueItem = currentAttachments + .cast() + .firstWhere( + (a) => a != null && a.id == item.id, + orElse: () => null, + ); + + if (existingQueueItem == null) { + if (!downloadAttachments) continue; + + // This item should be added to the queue. + // This item is assumed to be coming from an upstream sync. + final String filename = + item.filename ?? + await resolveNewAttachmentFilename(item.id, item.fileExtension); + + attachmentUpdates.add( + Attachment( + id: item.id, + filename: filename, + state: AttachmentState.queuedDownload, + metaData: item.metaData, + ), + ); + } else if (existingQueueItem.state == AttachmentState.archived) { + // The attachment is present again. Need to queue it for sync. + if (existingQueueItem.hasSynced) { + // No remote action required, we can restore the record (avoids deletion). + attachmentUpdates.add( + existingQueueItem.copyWith(state: AttachmentState.synced), + ); + } else { + // The localURI should be set if the record was meant to be downloaded + // and has been synced. If it's missing and hasSynced is false then + // it must be an upload operation. + attachmentUpdates.add( + existingQueueItem.copyWith( + state: existingQueueItem.localUri == null + ? AttachmentState.queuedDownload + : AttachmentState.queuedUpload, + ), + ); + } + } + } + + // Archive any items not specified in the watched items. + // For queuedDelete or queuedUpload states, archive only if hasSynced is true. + // For other states, archive if the record is not found in the items. + for (final attachment in currentAttachments) { + final notInWatchedItems = items.every( + (update) => update.id != attachment.id, + ); + + if (notInWatchedItems) { + switch (attachment.state) { + case AttachmentState.queuedDelete: + case AttachmentState.queuedUpload: + if (attachment.hasSynced) { + attachmentUpdates.add( + attachment.copyWith(state: AttachmentState.archived), + ); + } + break; + default: + attachmentUpdates.add( + attachment.copyWith(state: AttachmentState.archived), + ); + } + } + } + + await context.saveAttachments(attachmentUpdates); + }); + } + + /// Creates a new attachment locally and queues it for upload. + /// The filename is resolved using [resolveNewAttachmentFilename]. + Future saveFile({ + required Stream data, + required String mediaType, + String? fileExtension, + String? metaData, + required Future Function(dynamic context, Attachment attachment) + updateHook, + }) async { + final row = await db.get('SELECT uuid() as id'); + final id = row['id'] as String; + final String filename = await resolveNewAttachmentFilename( + id, + fileExtension, + ); + + // Write the file to the filesystem. + final fileSize = await localStorage.saveFile(filename, data); + + return await attachmentsService.withContext((attachmentContext) async { + return await db.writeTransaction((tx) async { + final attachment = Attachment( + id: id, + filename: filename, + size: fileSize, + mediaType: mediaType, + state: AttachmentState.queuedUpload, + localUri: filename, + metaData: metaData, + ); + + // Allow consumers to set relationships to this attachment ID. + await updateHook(tx, attachment); + + return await attachmentContext.upsertAttachment(attachment, tx); + }); + }); + } + + /// Queues an attachment for delete. + /// The default implementation assumes the attachment record already exists locally. + Future deleteFile({ + required String attachmentId, + required Future Function(dynamic context, Attachment attachment) + updateHook, + }) async { + return await attachmentsService.withContext((attachmentContext) async { + final attachment = await attachmentContext.getAttachment(attachmentId); + if (attachment == null) { + throw Exception( + 'Attachment record with id $attachmentId was not found.', + ); + } + + return await db.writeTransaction((tx) async { + await updateHook(tx, attachment); + return await attachmentContext.upsertAttachment( + attachment.copyWith( + state: AttachmentState.queuedDelete, + hasSynced: false, + ), + tx, + ); + }); + }); + } + + /// Removes all archived items. + Future expireCache() async { + await attachmentsService.withContext((context) async { + bool done; + do { + done = await syncingService.deleteArchivedAttachments(context); + } while (!done); + }); + } + + /// Clears the attachment queue and deletes all attachment files. + Future clearQueue() async { + await attachmentsService.withContext((context) async { + await context.clearQueue(); + }); + await localStorage.clear(); + } + + /// Cleans up stale attachments. + Future _verifyAttachments(AbstractAttachmentContext context) async { + final attachments = await context.getActiveAttachments(); + final List updates = []; + + for (final attachment in attachments) { + // Only check attachments that should have local files + if (attachment.localUri == null) { + // Skip attachments that don't have localUri (like queued downloads) + continue; + } + + final exists = await localStorage.fileExists(attachment.localUri!); + if ((attachment.state == AttachmentState.synced || + attachment.state == AttachmentState.queuedUpload) && + !exists) { + updates.add( + attachment.copyWith(state: AttachmentState.archived, localUri: null), + ); + } + } + + await context.saveAttachments(updates); + } +} diff --git a/packages/powersync_attachments_stream/lib/src/implementations/attachment_context.dart b/packages/powersync_attachments_stream/lib/src/implementations/attachment_context.dart new file mode 100644 index 00000000..8c7176af --- /dev/null +++ b/packages/powersync_attachments_stream/lib/src/implementations/attachment_context.dart @@ -0,0 +1,162 @@ +import '../abstractions/attachment_context.dart'; +import '../attachment.dart'; +import 'package:powersync_core/powersync_core.dart'; +import 'package:powersync_core/sqlite3_common.dart'; +import 'package:logging/logging.dart'; + +class AttachmentContextImpl implements AbstractAttachmentContext { + final PowerSyncDatabase db; + final Logger log; + final int maxArchivedCount; + final String attachmentsQueueTableName; + + AttachmentContextImpl( + this.db, + this.log, + this.maxArchivedCount, + this.attachmentsQueueTableName, + ); + + /// Table used for storing attachments in the attachment queue. + String get table { + return attachmentsQueueTableName; + } + + @override + Future deleteAttachment(String id, dynamic context) async { + log.info('deleteAttachment: $id'); + await context.execute('DELETE FROM $table WHERE id = ?', [id]); + } + + @override + Future ignoreAttachment(String id) async { + await db.execute( + 'UPDATE $table SET state = ${AttachmentState.archived.index} WHERE id = ?', + [id], + ); + } + + @override + Future getAttachment(String id) async { + final row = await db.getOptional('SELECT * FROM $table WHERE id = ?', [id]); + if (row == null) { + return null; + } + return Attachment.fromRow(row); + } + + @override + Future saveAttachment(Attachment attachment) async { + return await db.writeLock((ctx) async { + return await upsertAttachment(attachment, ctx); + }); + } + + @override + Future saveAttachments(List attachments) async { + if (attachments.isEmpty) { + log.info('No attachments to save.'); + return; + } + await db.writeTransaction((tx) async { + for (final attachment in attachments) { + await upsertAttachment(attachment, tx); + } + }); + } + + @override + Future> getAttachmentIds() async { + ResultSet results = await db.getAll( + 'SELECT id FROM $table WHERE id IS NOT NULL', + ); + + List ids = results.map((row) => row['id'] as String).toList(); + + return ids; + } + + @override + Future> getAttachments() async { + final results = await db.getAll('SELECT * FROM $table'); + return results.map((row) => Attachment.fromRow(row)).toList(); + } + + @override + Future> getActiveAttachments() async { + // Return all attachments that are not archived (i.e., state != AttachmentState.archived) + final results = await db.getAll('SELECT * FROM $table WHERE state != ?', [ + AttachmentState.archived.index, + ]); + return results.map((row) => Attachment.fromRow(row)).toList(); + } + + @override + Future clearQueue() async { + log.info('Clearing attachment queue...'); + await db.execute('DELETE FROM $table'); + } + + @override + Future deleteArchivedAttachments( + Future Function(List) callback, + ) async { + // Only delete archived attachments exceeding the maxArchivedCount, ordered by timestamp DESC + const limit = 1000; + final results = await db.getAll( + 'SELECT * FROM $table WHERE state = ? ORDER BY timestamp DESC LIMIT ? OFFSET ?', + [ + AttachmentState.archived.index, + limit, + maxArchivedCount, + ], + ); + final archivedAttachments = results.map((row) => Attachment.fromRow(row)).toList(); + + if (archivedAttachments.isEmpty) { + return false; + } + + log.info('Deleting ${archivedAttachments.length} archived attachments (exceeding maxArchivedCount=$maxArchivedCount)...'); + // Call the callback with the list of archived attachments before deletion + await callback(archivedAttachments); + + // Delete the archived attachments from the table + final ids = archivedAttachments.map((a) => a.id).toList(); + if (ids.isNotEmpty) { + await db.executeBatch('DELETE FROM $table WHERE id = ?', [ + for (final id in ids) [id], + ]); + } + + log.info('Deleted ${archivedAttachments.length} archived attachments.'); + return archivedAttachments.length < limit; + } + + @override + Future upsertAttachment( + Attachment attachment, + dynamic context, + ) async { + + await context.execute( + '''INSERT OR REPLACE INTO + $table (id, timestamp, filename, local_uri, media_type, size, state, has_synced, meta_data) + VALUES + (?, ?, ?, ?, ?, ?, ?, ?, ?)''', + [ + attachment.id, + attachment.timestamp, + attachment.filename, + attachment.localUri, + attachment.mediaType, + attachment.size, + attachment.state.index, + attachment.hasSynced ? 1 : 0, + attachment.metaData, + ], + ); + + return attachment; + } +} diff --git a/packages/powersync_attachments_stream/lib/src/implementations/attachment_service.dart b/packages/powersync_attachments_stream/lib/src/implementations/attachment_service.dart new file mode 100644 index 00000000..9cb263aa --- /dev/null +++ b/packages/powersync_attachments_stream/lib/src/implementations/attachment_service.dart @@ -0,0 +1,75 @@ +import 'dart:async'; +import 'package:logging/logging.dart'; +import 'package:powersync_core/powersync_core.dart'; +import 'package:sqlite_async/sqlite_async.dart'; + +import '../abstractions/attachment_service.dart'; +import '../abstractions/attachment_context.dart'; +import '../attachment.dart'; +import 'attachment_context.dart'; + +class AttachmentServiceImpl implements AbstractAttachmentService { + final PowerSyncDatabase db; + final Logger logger; + final int maxArchivedCount; + final String attachmentsQueueTableName; + final Mutex _mutex = Mutex(); + + late final AbstractAttachmentContext _context; + + AttachmentServiceImpl({ + required this.db, + required this.logger, + required this.maxArchivedCount, + required this.attachmentsQueueTableName, + }) { + _context = AttachmentContextImpl( + db, + logger, + maxArchivedCount, + attachmentsQueueTableName, + ); + } + + @override + Stream watchActiveAttachments() async* { + logger.info('Watching attachments...'); + + // Watch for attachments with active states (queued for upload, download, or delete) + final stream = db.watch( + ''' + SELECT + id + FROM + $attachmentsQueueTableName + WHERE + state = ? + OR state = ? + OR state = ? + ORDER BY + timestamp ASC + ''', + parameters: [ + AttachmentState.queuedUpload.index, + AttachmentState.queuedDownload.index, + AttachmentState.queuedDelete.index, + ], + ); + + yield* stream; + } + + @override + Future withContext( + Future Function(AbstractAttachmentContext ctx) action, + ) async { + return await _mutex.lock(() async { + try { + return await action(_context); + } catch (e, stackTrace) { + // Re-throw the error to be handled by the caller + Error.throwWithStackTrace(e, stackTrace); + } + }); + } +} diff --git a/packages/powersync_attachments_stream/lib/src/storage/io_local_storage.dart b/packages/powersync_attachments_stream/lib/src/storage/io_local_storage.dart new file mode 100644 index 00000000..9a118178 --- /dev/null +++ b/packages/powersync_attachments_stream/lib/src/storage/io_local_storage.dart @@ -0,0 +1,104 @@ +/// Local storage adapter for handling file operations on the device filesystem. +/// +/// This file implements the [IOLocalStorage] class, which provides methods for +/// saving, reading, deleting, copying, and managing files and directories using +/// the Dart IO library. It is used as the local storage backend for attachments +/// in the PowerSync attachments system. +/// +/// Features: +/// - Save files from streams (creates directories and all necessary parents dynamically if they do not exist) +/// - Read files as streams +/// - Delete files and their metadata +/// - Copy files and their metadata +/// - Create and remove directories (creates all necessary parents dynamically) +/// - Check file existence + +import 'dart:async'; +import 'dart:io'; +import 'dart:typed_data'; +import 'package:path/path.dart' as p; +import '../abstractions/local_storage.dart'; + +/// Implements [AbstractLocalStorageAdapter] for device filesystem using Dart IO. +/// +/// Handles file and directory operations for attachments. +class IOLocalStorage implements AbstractLocalStorageAdapter { + final String attachmentsDirectory; + late final Directory baseDir; + + IOLocalStorage(this.attachmentsDirectory) { + baseDir = Directory(attachmentsDirectory); + } + + File _fileFor(String filePath) => File(p.join(baseDir.path, filePath)); + File _metaFileFor(String filePath) => + File(p.join(baseDir.path, '$filePath.meta.json')); + + /// Saves a file from a stream of [Uint8List] chunks. + /// Creates the file's directory and all necessary parent directories dynamically if they do not exist. + /// Returns the total number of bytes written. + @override + Future saveFile(String filePath, Stream data) async { + final file = _fileFor(filePath); + await file.parent.create(recursive: true); + var totalSize = 0; + final sink = file.openWrite(); + try { + await for (final chunk in data) { + sink.add(chunk); + totalSize += chunk.length; + } + await sink.flush(); + } finally { + await sink.close(); + } + return totalSize; + } + + /// Reads a file as a stream of [Uint8List] chunks. + /// Throws if the file does not exist. + @override + Stream readFile(String filePath, {String? mediaType}) async* { + final file = _fileFor(filePath); + if (!await file.exists()) { + throw FileSystemException('File does not exist', filePath); + } + final source = file.openRead(); + await for (final chunk in source) { + yield chunk is Uint8List ? chunk : Uint8List.fromList(chunk); + } + } + + /// Deletes a file and its metadata file. + @override + Future deleteFile(String filePath) async { + final file = _fileFor(filePath); + if (await file.exists()) { + await file.delete(); + } + final metaFile = _metaFileFor(filePath); + if (await metaFile.exists()) { + await metaFile.delete(); + } + } + + /// Checks if a file exists. + @override + Future fileExists(String filePath) async { + return await _fileFor(filePath).exists(); + } + + /// Creates a directory and all necessary parent directories dynamically if they do not exist. + @override + Future initialize() async { + await baseDir.create(recursive: true); + } + + @override + Future clear() async { + if (await baseDir.exists()) { + await baseDir.delete(recursive: true); + } + await baseDir.create(recursive: true); + } +} diff --git a/packages/powersync_attachments_stream/lib/src/sync/syncing_service.dart b/packages/powersync_attachments_stream/lib/src/sync/syncing_service.dart new file mode 100644 index 00000000..b3f97e1b --- /dev/null +++ b/packages/powersync_attachments_stream/lib/src/sync/syncing_service.dart @@ -0,0 +1,324 @@ +// Service responsible for syncing attachments between local and remote storage. +// +// This service handles downloading, uploading, and deleting attachments, as well as +// periodically syncing attachment states. It ensures proper lifecycle management +// of sync operations and provides mechanisms for error handling and retries. +// +// The class provides a default implementation for syncing operations, which can be +// extended or customized as needed. + +import 'dart:async'; +import 'dart:typed_data'; +import 'package:logging/logging.dart'; +import 'package:async/async.dart'; + +import '../abstractions/attachment_service.dart'; +import '../abstractions/attachment_context.dart'; +import '../attachment.dart'; +import '../abstractions/local_storage.dart'; +import '../abstractions/remote_storage.dart'; +import '../abstractions/sync_error_handler.dart'; + +/// SyncingService is responsible for syncing attachments between local and remote storage. +/// +/// This service handles downloading, uploading, and deleting attachments, as well as +/// periodically syncing attachment states. It ensures proper lifecycle management +/// of sync operations and provides mechanisms for error handling and retries. +/// +/// Properties: +/// - [remoteStorage]: The remote storage implementation for handling file operations. +/// - [localStorage]: The local storage implementation for managing files locally. +/// - [attachmentsService]: The service for managing attachment states and operations. +/// - [getLocalUri]: A function to resolve the local URI for a given filename. +/// - [onDownloadError], [onUploadError], [onDeleteError]: Optional error handlers for managing sync-related errors. +class SyncingService { + final AbstractRemoteStorageAdapter remoteStorage; + final AbstractLocalStorageAdapter localStorage; + final AbstractAttachmentService attachmentsService; + final AbstractSyncErrorHandler? errorHandler; + final Duration syncThrottle; + final Duration period; + final Logger logger; + + StreamSubscription? _syncSubscription; + StreamSubscription? _periodicSubscription; + bool _isClosed = false; + final _syncTriggerController = StreamController.broadcast(); + + SyncingService({ + required this.remoteStorage, + required this.localStorage, + required this.attachmentsService, + this.errorHandler, + this.syncThrottle = const Duration(seconds: 5), + this.period = const Duration(seconds: 30), + Logger? logger, + }) : logger = logger ?? Logger('SyncingService'); + + /// Starts the syncing process, including periodic and event-driven sync operations. + /// + /// [period] is the interval at which periodic sync operations are triggered. + Future startSync({Duration period = const Duration(seconds: 30)}) async { + if (_isClosed) return; + + _syncSubscription?.cancel(); + _periodicSubscription?.cancel(); + + // Create a merged stream of manual triggers and attachment changes + final attachmentChanges = attachmentsService.watchActiveAttachments(); + final manualTriggers = _syncTriggerController.stream; + + // Merge both streams and apply throttling + final mergedStream = + StreamGroup.merge([attachmentChanges, manualTriggers]) + .transform(_throttleTransformer(syncThrottle)) + .listen((_) async { + try { + await attachmentsService.withContext((context) async { + final attachments = await context.getActiveAttachments(); + logger.info( + 'Found ${attachments.length} active attachments', + ); + await handleSync(attachments, context); + await deleteArchivedAttachments(context); + }); + } catch (e, st) { + if (e is! StateError && e.toString().contains('cancelled')) { + logger.severe( + 'Caught exception when processing attachments', + e, + st, + ); + } else { + rethrow; + } + } + }); + + _syncSubscription = mergedStream; + + // Start periodic sync + _periodicSubscription = + Stream.periodic(period, (_) => null).listen((_) { + logger.info('Periodically syncing attachments'); + triggerSync(); + }); + } + + StreamTransformer _throttleTransformer(Duration throttle) { + return StreamTransformer.fromHandlers( + handleData: (data, sink) { + sink.add(data); + // Simple throttle implementation - just delay the next event + Future.delayed(throttle); + }, + ); + } + + /// Enqueues a sync operation (manual trigger). + Future triggerSync() async { + if (_isClosed) return; + _syncTriggerController.add(null); + } + + /// Stops all ongoing sync operations. + Future stopSync() async { + await _syncSubscription?.cancel(); + await _periodicSubscription?.cancel(); + } + + /// Closes the syncing service, stopping all operations and releasing resources. + Future close() async { + _isClosed = true; + await stopSync(); + await _syncTriggerController.close(); + } + + /// Handles syncing operations for a list of attachments, including downloading, + /// uploading, and deleting files based on their states. + /// + /// [attachments]: The list of attachments to process. + /// [context]: The attachment context used for managing attachment states. + Future handleSync( + List attachments, + AbstractAttachmentContext context, + ) async { + logger.info( + 'Starting handleSync with ${attachments.length} attachments', + ); + final updatedAttachments = []; + + for (final attachment in attachments) { + logger.info( + 'Processing attachment ${attachment.id} with state: ${attachment.state}', + ); + try { + switch (attachment.state) { + case AttachmentState.queuedDownload: + logger.info('Downloading [${attachment.filename}]'); + updatedAttachments.add(await downloadAttachment(attachment)); + break; + case AttachmentState.queuedUpload: + logger.info('Uploading [${attachment.filename}]'); + updatedAttachments.add(await uploadAttachment(attachment)); + break; + case AttachmentState.queuedDelete: + logger.info('Deleting [${attachment.filename}]'); + updatedAttachments.add(await deleteAttachment(attachment)); + break; + case AttachmentState.synced: + logger.info( + 'Attachment ${attachment.id} is already synced', + ); + break; + case AttachmentState.archived: + logger.info( + 'Attachment ${attachment.id} is archived', + ); + break; + } + } catch (e, st) { + logger.warning( + 'Error during sync for ${attachment.id}', + e, + st, + ); + } + } + + if (updatedAttachments.isNotEmpty) { + logger.info( + 'Saving ${updatedAttachments.length} updated attachments', + ); + await context.saveAttachments(updatedAttachments); + } + } + + /// Uploads an attachment from local storage to remote storage. + /// + /// [attachment]: The attachment to upload. + /// Returns the updated attachment with its new state. + Future uploadAttachment(Attachment attachment) async { + logger.info( + 'Starting upload for attachment ${attachment.id}', + ); + try { + if (attachment.localUri == null) { + throw Exception('No localUri for attachment $attachment'); + } + await remoteStorage.uploadFile( + localStorage.readFile(attachment.localUri!), + attachment, + ); + logger.info( + 'Successfully uploaded attachment "${attachment.id}" to Cloud Storage', + ); + return attachment.copyWith( + state: AttachmentState.synced, + hasSynced: true, + ); + } catch (e, st) { + logger.warning( + 'Upload attachment error for attachment $attachment', + e, + st, + ); + if (errorHandler != null) { + final shouldRetry = await errorHandler!.onUploadError(attachment, e); + if (!shouldRetry) { + logger.info( + 'Attachment with ID ${attachment.id} has been archived', + ); + return attachment.copyWith(state: AttachmentState.archived); + } + } + return attachment; + } + } + + /// Downloads an attachment from remote storage and saves it to local storage. + /// + /// [attachment]: The attachment to download. + /// Returns the updated attachment with its new state. + Future downloadAttachment(Attachment attachment) async { + logger.info( + 'Starting download for attachment ${attachment.id}', + ); + final attachmentPath = attachment.filename; + try { + final fileStream = await remoteStorage.downloadFile(attachment); + await localStorage.saveFile( + attachmentPath, + fileStream.map((chunk) => Uint8List.fromList(chunk)), + ); + logger.info( + 'Successfully downloaded file "${attachment.id}"', + ); + + return attachment.copyWith( + localUri: attachmentPath, + state: AttachmentState.synced, + hasSynced: true, + ); + } catch (e, st) { + if (errorHandler != null) { + final shouldRetry = await errorHandler!.onDownloadError(attachment, e); + if (!shouldRetry) { + logger.info( + 'Attachment with ID ${attachment.id} has been archived', + ); + return attachment.copyWith(state: AttachmentState.archived); + } + } + logger.warning( + 'Download attachment error for attachment $attachment', + e, + st, + ); + return attachment; + } + } + + /// Deletes an attachment from remote and local storage, and removes it from the queue. + /// + /// [attachment]: The attachment to delete. + /// Returns the updated attachment with its new state. + Future deleteAttachment(Attachment attachment) async { + try { + logger.info( + 'Deleting attachment ${attachment.id} from remote storage', + ); + await remoteStorage.deleteFile(attachment); + + if (attachment.localUri != null && + await localStorage.fileExists(attachment.localUri!)) { + await localStorage.deleteFile(attachment.localUri!); + } + return attachment.copyWith(state: AttachmentState.archived); + } catch (e, st) { + if (errorHandler != null) { + final shouldRetry = await errorHandler!.onDeleteError(attachment, e); + if (!shouldRetry) { + logger.info('Attachment with ID ${attachment.id} has been archived'); + return attachment.copyWith(state: AttachmentState.archived); + } + } + logger.warning('Error deleting attachment: $e', e, st); + return attachment; + } + } + + /// Deletes archived attachments from local storage. + /// + /// [context]: The attachment context used to retrieve and manage archived attachments. + /// Returns `true` if all archived attachments were successfully deleted, `false` otherwise. + Future deleteArchivedAttachments(AbstractAttachmentContext context) async { + return context.deleteArchivedAttachments((pendingDelete) async { + for (final attachment in pendingDelete) { + if (attachment.localUri == null) continue; + if (!await localStorage.fileExists(attachment.localUri!)) continue; + await localStorage.deleteFile(attachment.localUri!); + } + }); + } +} diff --git a/packages/powersync_attachments_stream/pubspec.lock b/packages/powersync_attachments_stream/pubspec.lock index ae003e13..c10c760c 100644 --- a/packages/powersync_attachments_stream/pubspec.lock +++ b/packages/powersync_attachments_stream/pubspec.lock @@ -26,7 +26,7 @@ packages: source: hosted version: "2.7.0" async: - dependency: transitive + dependency: "direct main" description: name: async sha256: "758e6d74e971c3e5aceb4110bfd6698efc7f501675bcfe0c775459a8140750eb" @@ -398,10 +398,9 @@ packages: powersync_core: dependency: "direct main" description: - name: powersync_core - sha256: d8ae292bc77f0a96f44c6cc2911d1b781760a87a919e5045e75458cba83bb759 - url: "https://pub.dev" - source: hosted + path: "../powersync_core" + relative: true + source: path version: "1.5.0" pub_semver: dependency: transitive @@ -568,6 +567,14 @@ packages: url: "https://pub.dev" source: hosted version: "0.6.8" + test_descriptor: + dependency: "direct dev" + description: + name: test_descriptor + sha256: "9ce468c97ae396e8440d26bb43763f84e2a2a5331813ee5a397cb4da481aaf9a" + url: "https://pub.dev" + source: hosted + version: "2.0.2" typed_data: dependency: transitive description: diff --git a/packages/powersync_attachments_stream/pubspec.yaml b/packages/powersync_attachments_stream/pubspec.yaml new file mode 100644 index 00000000..cdb51763 --- /dev/null +++ b/packages/powersync_attachments_stream/pubspec.yaml @@ -0,0 +1,25 @@ +name: powersync_attachments_stream +description: "A helper library for handling attachments when using PowerSync." +version: 0.0.1 +repository: https://github.com/powersync-ja/powersync.dart +homepage: https://www.powersync.com/ + +environment: + sdk: ^3.8.1 + +dependencies: + flutter: + sdk: flutter + path: ^1.8.0 + async: ^2.11.0 + powersync_core: ^1.4.1 + logging: ^1.2.0 + sqlite_async: ^0.11.0 + path_provider: ^2.0.13 + +dev_dependencies: + flutter_test: + sdk: flutter + test: ^1.25.15 + test_descriptor: ^2.0.2 + flutter_lints: ^5.0.0 diff --git a/packages/powersync_attachments_stream/test/local_storage_test.dart b/packages/powersync_attachments_stream/test/local_storage_test.dart new file mode 100644 index 00000000..fb1bfc49 --- /dev/null +++ b/packages/powersync_attachments_stream/test/local_storage_test.dart @@ -0,0 +1,369 @@ +import 'dart:async'; +import 'dart:io'; +import 'dart:typed_data'; +import 'package:test/test.dart'; +import 'package:path/path.dart' as p; +import 'package:powersync_attachments_stream/src/storage/io_local_storage.dart'; +import 'package:test_descriptor/test_descriptor.dart' as d; + +void main() { + group('IOLocalStorage', () { + late IOLocalStorage storage; + + setUp(() async { + storage = IOLocalStorage(d.sandbox); + }); + + tearDown(() async { + // Clean up is handled automatically by test_descriptor + // No manual cleanup needed + }); + + group('saveFile and readFile', () { + test('saves and reads binary data stream successfully', () async { + const filePath = 'test_file'; + final data = Uint8List.fromList([1, 2, 3, 4, 5]); + final dataStream = Stream.fromIterable([data]); + + final size = await storage.saveFile(filePath, dataStream); + expect(size, equals(data.length)); + + final resultStream = storage.readFile(filePath); + final result = await resultStream.toList(); + expect(result, equals([data])); + + // Assert filesystem state using test_descriptor + await d.file(filePath, data).validate(); + }); + + test('throws when reading non-existent file', () async { + const filePath = 'non_existent'; + expect( + () => storage.readFile(filePath).toList(), + throwsA(isA()), + ); + + // Assert file does not exist using Dart's File API + expect(await File(p.join(d.sandbox, filePath)).exists(), isFalse); + }); + + test('creates parent directories if they do not exist', () async { + const filePath = 'subdir/nested/test'; + final nonExistentDir = Directory(p.join(d.sandbox, 'subdir', 'nested')); + final data = Uint8List.fromList([1, 2, 3]); + final dataStream = Stream.fromIterable([data]); + + expect(await nonExistentDir.exists(), isFalse); + + final size = await storage.saveFile(filePath, dataStream); + expect(size, equals(data.length)); + expect(await nonExistentDir.exists(), isTrue); + + final resultStream = storage.readFile(filePath); + final result = await resultStream.toList(); + expect(result, equals([data])); + + // Assert directory structure + await d.dir('subdir/nested', [d.file('test', data)]).validate(); + }); + + test('creates all parent directories for deeply nested file', () async { + const filePath = 'a/b/c/d/e/f/g/h/i/j/testfile'; + final nestedDir = Directory( + p.join(d.sandbox, 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j'), + ); + final data = Uint8List.fromList([42, 43, 44]); + final dataStream = Stream.fromIterable([data]); + + expect(await nestedDir.exists(), isFalse); + + final size = await storage.saveFile(filePath, dataStream); + expect(size, equals(data.length)); + expect(await nestedDir.exists(), isTrue); + + final resultStream = storage.readFile(filePath); + final result = await resultStream.toList(); + expect(result, equals([data])); + + // Assert deep directory structure + await d.dir('a/b/c/d/e/f/g/h/i/j', [ + d.file('testfile', data), + ]).validate(); + }); + + test('overwrites existing file', () async { + const filePath = 'overwrite_test'; + final originalData = Uint8List.fromList([1, 2, 3]); + final newData = Uint8List.fromList([4, 5, 6, 7]); + final originalStream = Stream.fromIterable([originalData]); + final newStream = Stream.fromIterable([newData]); + + await storage.saveFile(filePath, originalStream); + final size = await storage.saveFile(filePath, newStream); + expect(size, equals(newData.length)); + + final resultStream = storage.readFile(filePath); + final result = await resultStream.toList(); + expect(result, equals([newData])); + + // Assert file content + await d.file(filePath, newData).validate(); + }); + }); + + group('edge cases and robustness', () { + test('saveFile with empty stream writes empty file and returns 0 size', () async { + const filePath = 'empty_file'; + final emptyStream = Stream.fromIterable(const []); + + final size = await storage.saveFile(filePath, emptyStream); + expect(size, 0); + + final resultStream = storage.readFile(filePath); + final chunks = await resultStream.toList(); + expect(chunks, isEmpty); + + final file = File(p.join(d.sandbox, filePath)); + expect(await file.exists(), isTrue); + expect(await file.length(), 0); + }); + + test('readFile preserves byte order (chunking may differ)', () async { + const filePath = 'ordered_chunks'; + final chunks = [ + Uint8List.fromList([0, 1, 2]), + Uint8List.fromList([3, 4]), + Uint8List.fromList([5, 6, 7, 8]), + ]; + await storage.saveFile(filePath, Stream.fromIterable(chunks)); + + final outChunks = await storage.readFile(filePath).toList(); + final outBytes = Uint8List.fromList( + outChunks.expand((c) => c).toList(), + ); + final expectedBytes = Uint8List.fromList( + chunks.expand((c) => c).toList(), + ); + expect(outBytes, equals(expectedBytes)); + }); + + test('fileExists becomes false after deleteFile', () async { + const filePath = 'exists_then_delete'; + await storage.saveFile(filePath, Stream.value(Uint8List.fromList([1]))); + expect(await storage.fileExists(filePath), isTrue); + await storage.deleteFile(filePath); + expect(await storage.fileExists(filePath), isFalse); + }); + + test('initialize is idempotent', () async { + await storage.initialize(); + await storage.initialize(); + + // Create a file, then re-initialize again + const filePath = 'idempotent_test'; + await storage.saveFile(filePath, Stream.value(Uint8List.fromList([9]))); + await storage.initialize(); + + // File should still exist (initialize should not clear data) + expect(await storage.fileExists(filePath), isTrue); + }); + + test('clear works even if base directory was removed externally', () async { + await storage.initialize(); + + // Remove the base dir manually + final baseDir = Directory(d.sandbox); + if (await baseDir.exists()) { + await baseDir.delete(recursive: true); + } + + // Calling clear should recreate base dir + await storage.clear(); + expect(await baseDir.exists(), isTrue); + }); + + test('supports unicode and emoji filenames', () async { + const filePath = '測試_файл_📷.bin'; + final bytes = Uint8List.fromList([10, 20, 30, 40]); + await storage.saveFile(filePath, Stream.value(bytes)); + + final out = await storage.readFile(filePath).toList(); + expect(out, equals([bytes])); + + await d.file(filePath, bytes).validate(); + }); + + test('readFile accepts mediaType parameter (ignored by IO impl)', () async { + const filePath = 'with_media_type'; + final data = Uint8List.fromList([1, 2, 3]); + await storage.saveFile(filePath, Stream.value(data)); + + final result = await storage.readFile(filePath, mediaType: 'image/jpeg').toList(); + expect(result, equals([data])); + }); + }); + + group('deleteFile', () { + test('deletes existing file', () async { + const filePath = 'delete_test'; + final data = Uint8List.fromList([1, 2, 3]); + final dataStream = Stream.fromIterable([data]); + + await storage.saveFile(filePath, dataStream); + expect(await storage.fileExists(filePath), isTrue); + + await storage.deleteFile(filePath); + expect(await storage.fileExists(filePath), isFalse); + + // Assert file does not exist + expect(await File(p.join(d.sandbox, filePath)).exists(), isFalse); + }); + + test('does not throw when deleting non-existent file', () async { + const filePath = 'non_existent'; + await storage.deleteFile(filePath); + expect(await File(p.join(d.sandbox, filePath)).exists(), isFalse); + }); + }); + + group('initialize and clear', () { + test('initialize creates the base directory', () async { + final newStorage = IOLocalStorage(p.join(d.sandbox, 'new_dir')); + final baseDir = Directory(p.join(d.sandbox, 'new_dir')); + + expect(await baseDir.exists(), isFalse); + + await newStorage.initialize(); + + expect(await baseDir.exists(), isTrue); + }); + + test('clear removes and recreates the base directory', () async { + await storage.initialize(); + final testFile = p.join(d.sandbox, 'test_file'); + await File(testFile).writeAsString('test'); + + expect(await File(testFile).exists(), isTrue); + + await storage.clear(); + + expect(await Directory(d.sandbox).exists(), isTrue); + expect(await File(testFile).exists(), isFalse); + }); + }); + + group('fileExists', () { + test('returns true for existing file', () async { + const filePath = 'exists_test'; + final data = Uint8List.fromList([1, 2, 3]); + final dataStream = Stream.fromIterable([data]); + + await storage.saveFile(filePath, dataStream); + expect(await storage.fileExists(filePath), isTrue); + + await d.file(filePath, data).validate(); + }); + + test('returns false for non-existent file', () async { + const filePath = 'non_existent'; + expect(await storage.fileExists(filePath), isFalse); + expect(await File(p.join(d.sandbox, filePath)).exists(), isFalse); + }); + }); + + group('file system integration', () { + test('handles special characters in file path', () async { + const filePath = 'file with spaces & symbols!@#'; + final data = Uint8List.fromList([1, 2, 3]); + final dataStream = Stream.fromIterable([data]); + + final size = await storage.saveFile(filePath, dataStream); + expect(size, equals(data.length)); + + final resultStream = storage.readFile(filePath); + final result = await resultStream.toList(); + expect(result, equals([data])); + + await d.file(filePath, data).validate(); + }); + + test('handles large binary data stream', () async { + const filePath = 'large_file'; + final data = Uint8List.fromList(List.generate(10000, (i) => i % 256)); + final chunkSize = 1000; + final chunks = []; + for (var i = 0; i < data.length; i += chunkSize) { + chunks.add( + Uint8List.fromList( + data.sublist( + i, + i + chunkSize < data.length ? i + chunkSize : data.length, + ), + ), + ); + } + final dataStream = Stream.fromIterable(chunks); + + final size = await storage.saveFile(filePath, dataStream); + expect(size, equals(data.length)); + + final resultStream = storage.readFile(filePath); + final result = Uint8List.fromList( + (await resultStream.toList()).expand((chunk) => chunk).toList(), + ); + expect(result, equals(data)); + + await d.file(filePath, data).validate(); + }); + }); + + group('concurrent operations', () { + test('handles concurrent saves to different files', () async { + final futures = >[]; + final fileCount = 10; + + for (int i = 0; i < fileCount; i++) { + final data = Uint8List.fromList([i, i + 1, i + 2]); + futures.add(storage.saveFile('file_$i', Stream.fromIterable([data]))); + } + + await Future.wait(futures); + + for (int i = 0; i < fileCount; i++) { + final resultStream = storage.readFile('file_$i'); + final result = await resultStream.toList(); + expect( + result, + equals([ + Uint8List.fromList([i, i + 1, i + 2]), + ]), + ); + await d + .file('file_$i', Uint8List.fromList([i, i + 1, i + 2])) + .validate(); + } + }); + + test('handles concurrent saves to the same file', () async { + const filePath = 'concurrent_test'; + final data1 = Uint8List.fromList([1, 2, 3]); + final data2 = Uint8List.fromList([4, 5, 6]); + final futures = [ + storage.saveFile(filePath, Stream.fromIterable([data1])), + storage.saveFile(filePath, Stream.fromIterable([data2])), + ]; + + await Future.wait(futures); + + final resultStream = storage.readFile(filePath); + final result = await resultStream.toList(); + expect(result, anyOf(equals([data1]), equals([data2]))); + + // Assert one of the possible outcomes + final file = File(p.join(d.sandbox, filePath)); + final fileData = await file.readAsBytes(); + expect(fileData, anyOf(equals(data1), equals(data2))); + }); + }); + }); +} \ No newline at end of file