diff --git a/.changeset/heavy-turkeys-end.md b/.changeset/heavy-turkeys-end.md new file mode 100644 index 00000000..c3cb2cf7 --- /dev/null +++ b/.changeset/heavy-turkeys-end.md @@ -0,0 +1,31 @@ +--- +'@powersync/common': minor +'@powersync/node': minor +'@powersync/web': minor +'@powersync/react-native': minor +--- + +This adds a new (and currently experimental) sync client that is +implemented natively in the PowerSync SQLite extension. + +This implementation will eventually become the default, but we encourage +interested users to try it out. In particular, we expect that it can improve +sync performance (especially on platforms with challenging JS performance, +like React Native). + +On all our JavaScript SDKs, the new implementation can be enabled with a +sync option when connecting: + +```JS +await db.connect(new MyConnector(), { + clientImplementation: SyncClientImplementation.RUST +}); +``` + +Since the new client implements the same protocol, you can also migrate back +to the JavaScript client later by removing the `clientImplementation` option. + +__However__: After enabling the `RUST` client, you cannot downgrade your +PowerSync SDK below this version. When enabled for the first time, databases +will be migrated. The JavaScript sync client from this and later SDK versions +understands the new format, but the client from an older SDK version will not! diff --git a/demos/example-node/src/main.ts b/demos/example-node/src/main.ts index c68c0136..96e86301 100644 --- a/demos/example-node/src/main.ts +++ b/demos/example-node/src/main.ts @@ -1,7 +1,13 @@ import { once } from 'node:events'; import repl_factory from 'node:repl'; -import { createBaseLogger, createLogger, PowerSyncDatabase, SyncStreamConnectionMethod } from '@powersync/node'; +import { + createBaseLogger, + createLogger, + PowerSyncDatabase, + SyncClientImplementation, + SyncStreamConnectionMethod +} from '@powersync/node'; import { exit } from 'node:process'; import { AppSchema, DemoConnector } from './powersync.js'; import { enableUncidiDiagnostics } from './UndiciDiagnostics.js'; @@ -34,7 +40,8 @@ const main = async () => { console.log(await db.get('SELECT powersync_rs_version();')); await db.connect(new DemoConnector(), { - connectionMethod: SyncStreamConnectionMethod.WEB_SOCKET + connectionMethod: SyncStreamConnectionMethod.WEB_SOCKET, + clientImplementation: SyncClientImplementation.RUST }); // Example using a proxy agent for more control over the connection: // const proxyAgent = new (await import('undici')).ProxyAgent({ diff --git a/packages/common/package.json b/packages/common/package.json index 6a8d0f92..e14d6195 100644 --- a/packages/common/package.json +++ b/packages/common/package.json @@ -51,7 +51,6 @@ "async-mutex": "^0.4.0", "bson": "^6.6.0", "buffer": "^6.0.3", - "can-ndjson-stream": "^1.0.2", "cross-fetch": "^4.0.0", "event-iterator": "^2.0.0", "rollup": "4.14.3", diff --git a/packages/common/src/client/sync/bucket/BucketStorageAdapter.ts b/packages/common/src/client/sync/bucket/BucketStorageAdapter.ts index c7bf525e..76e92e4f 100644 --- a/packages/common/src/client/sync/bucket/BucketStorageAdapter.ts +++ b/packages/common/src/client/sync/bucket/BucketStorageAdapter.ts @@ -59,13 +59,22 @@ export enum PSInternalTable { UNTYPED = 'ps_untyped' } +export enum PowerSyncControlCommand { + PROCESS_TEXT_LINE = 'line_text', + PROCESS_BSON_LINE = 'line_binary', + STOP = 'stop', + START = 'start', + NOTIFY_TOKEN_REFRESHED = 'refreshed_token', + NOTIFY_CRUD_UPLOAD_COMPLETED = 'completed_upload' +} +
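(Aside: the commands above map onto `powersync_control` invocations. Roughly, one sync iteration issues `start`, feeds every received line into the extension, and ends with `stop`. A simplified sketch against the `BucketStorageAdapter.control` method added below — `adapter` and `lines` are hypothetical placeholders; the real loop lives in `rustSyncIteration` later in this patch:)

```ts
import { BucketStorageAdapter, PowerSyncControlCommand } from './BucketStorageAdapter.js';

// Sketch only: start the core sync client, feed it every received text line,
// and stop it when the iteration ends. Each control() call returns a
// JSON-encoded list of instructions for the SDK to interpret.
async function sketchControlLoop(adapter: BucketStorageAdapter, lines: AsyncIterable<string>) {
  await adapter.control(PowerSyncControlCommand.START, JSON.stringify({ parameters: {} }));
  try {
    for await (const line of lines) {
      const instructions = JSON.parse(await adapter.control(PowerSyncControlCommand.PROCESS_TEXT_LINE, line));
      // ...interpret `instructions` here (see handleInstructions in the patch below).
    }
  } finally {
    await adapter.control(PowerSyncControlCommand.STOP, null);
  }
}
```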
export interface BucketStorageListener extends BaseListener { crudUpdate: () => void; } export interface BucketStorageAdapter extends BaseObserver<BucketStorageListener>, Disposable { init(): Promise<void>; - saveSyncData(batch: SyncDataBatch): Promise<void>; + saveSyncData(batch: SyncDataBatch, fixedKeyFormat?: boolean): Promise<void>; removeBuckets(buckets: string[]): Promise<void>; setTargetCheckpoint(checkpoint: Checkpoint): Promise<void>; @@ -73,6 +82,8 @@ export interface BucketStorageAdapter extends BaseObserver getBucketOperationProgress(): Promise<BucketOperationProgress>; + hasMigratedSubkeys(): Promise<boolean>; + migrateToFixedSubkeys(): Promise<void>; syncLocalDatabase( checkpoint: Checkpoint, @@ -101,4 +112,9 @@ export interface BucketStorageAdapter extends BaseObserver + + /** + * Invokes the `powersync_control` function for the sync client. + */ + control(op: PowerSyncControlCommand, payload: string | ArrayBuffer | null): Promise<string>; } diff --git a/packages/common/src/client/sync/bucket/OplogEntry.ts b/packages/common/src/client/sync/bucket/OplogEntry.ts index cc834452..a8e3893a 100644 --- a/packages/common/src/client/sync/bucket/OplogEntry.ts +++ b/packages/common/src/client/sync/bucket/OplogEntry.ts @@ -8,7 +8,7 @@ export interface OplogEntryJSON { object_type?: string; op_id: string; op: OpTypeJSON; - subkey?: string | object; + subkey?: string; } export class OplogEntry { @@ -17,7 +17,7 @@ export class OplogEntry { row.op_id, OpType.fromJSON(row.op), row.checksum, - typeof row.subkey == 'string' ? row.subkey : JSON.stringify(row.subkey), + row.subkey, row.object_type, row.object_id, row.data @@ -28,13 +28,13 @@ export class OplogEntry { public op_id: OpId, public op: OpType, public checksum: number, - public subkey: string, + public subkey?: string, public object_type?: string, public object_id?: string, public data?: string ) {} - toJSON(): OplogEntryJSON { + toJSON(fixedKeyEncoding = false): OplogEntryJSON { return { op_id: this.op_id, op: this.op.toJSON(), @@ -42,7 +42,9 @@ export class OplogEntry { object_id: this.object_id, checksum: this.checksum, data: this.data, - subkey: JSON.stringify(this.subkey) + // Older versions of the JS SDK used to always JSON.stringify here. That has always been wrong, + // but we need to migrate gradually to not break existing databases. + subkey: fixedKeyEncoding ?
this.subkey : JSON.stringify(this.subkey) }; } } diff --git a/packages/common/src/client/sync/bucket/SqliteBucketStorage.ts b/packages/common/src/client/sync/bucket/SqliteBucketStorage.ts index d626f06d..a2bd3014 100644 --- a/packages/common/src/client/sync/bucket/SqliteBucketStorage.ts +++ b/packages/common/src/client/sync/bucket/SqliteBucketStorage.ts @@ -10,6 +10,7 @@ import { BucketStorageAdapter, BucketStorageListener, Checkpoint, + PowerSyncControlCommand, PSInternalTable, SyncLocalDatabaseResult } from './BucketStorageAdapter.js'; @@ -99,13 +100,13 @@ export class SqliteBucketStorage extends BaseObserver<BucketStorageListener> imp return Object.fromEntries(rows.map((r) => [r.name, { atLast: r.count_at_last, sinceLast: r.count_since_last }])); } - async saveSyncData(batch: SyncDataBatch) { + async saveSyncData(batch: SyncDataBatch, fixedKeyFormat: boolean = false) { await this.writeTransaction(async (tx) => { let count = 0; for (const b of batch.buckets) { const result = await tx.execute('INSERT INTO powersync_operations(op, data) VALUES(?, ?)', [ 'save', - JSON.stringify({ buckets: [b.toJSON()] }) + JSON.stringify({ buckets: [b.toJSON(fixedKeyFormat)] }) ]); this.logger.debug('saveSyncData', JSON.stringify(result)); count += b.data.length; @@ -413,6 +414,32 @@ export class SqliteBucketStorage extends BaseObserver<BucketStorageListener> imp async setTargetCheckpoint(checkpoint: Checkpoint) { // No-op for now } + + async control(op: PowerSyncControlCommand, payload: string | ArrayBuffer | null): Promise<string> { + return await this.writeTransaction(async (tx) => { + const [[raw]] = await tx.executeRaw('SELECT powersync_control(?, ?)', [op, payload]); + return raw; + }); + } + + async hasMigratedSubkeys(): Promise<boolean> { + const { r } = await this.db.get<{ r: number }>('SELECT EXISTS(SELECT * FROM ps_kv WHERE key = ?)
as r', [ + SqliteBucketStorage._subkeyMigrationKey + ]); + return r != 0; + } + + async migrateToFixedSubkeys(): Promise<void> { + await this.writeTransaction(async (tx) => { + await tx.execute('UPDATE ps_oplog SET key = powersync_remove_duplicate_key_encoding(key);'); + await tx.execute('INSERT OR REPLACE INTO ps_kv (key, value) VALUES (?, ?);', [ + SqliteBucketStorage._subkeyMigrationKey, + '1' + ]); + }); + } + + static _subkeyMigrationKey = 'powersync_js_migrated_subkeys'; } function hasMatchingPriority(priority: number, bucket: BucketChecksum) { diff --git a/packages/common/src/client/sync/bucket/SyncDataBucket.ts b/packages/common/src/client/sync/bucket/SyncDataBucket.ts index 4f9c6edb..45fe8de2 100644 --- a/packages/common/src/client/sync/bucket/SyncDataBucket.ts +++ b/packages/common/src/client/sync/bucket/SyncDataBucket.ts @@ -37,13 +37,13 @@ export class SyncDataBucket { public next_after?: OpId ) {} - toJSON(): SyncDataBucketJSON { + toJSON(fixedKeyEncoding = false): SyncDataBucketJSON { return { bucket: this.bucket, has_more: this.has_more, after: this.after, next_after: this.next_after, - data: this.data.map((entry) => entry.toJSON()) + data: this.data.map((entry) => entry.toJSON(fixedKeyEncoding)) }; } } diff --git a/packages/common/src/client/sync/stream/AbstractRemote.ts b/packages/common/src/client/sync/stream/AbstractRemote.ts index 1d0b4993..adb4cef8 100644 --- a/packages/common/src/client/sync/stream/AbstractRemote.ts +++ b/packages/common/src/client/sync/stream/AbstractRemote.ts @@ -1,6 +1,5 @@ import type { BSON } from 'bson'; import { Buffer } from 'buffer'; -import ndjsonStream from 'can-ndjson-stream'; import { type fetch } from 'cross-fetch'; import Logger, { ILogger } from 'js-logger'; import { RSocket, RSocketConnector, Requestable } from 'rsocket-core'; @@ -253,40 +252,6 @@ export abstract class AbstractRemote { return res.json(); } - async postStreaming( - path: string, - data: any, - headers: Record = {}, - signal?: AbortSignal - ): Promise { - const request = await this.buildRequest(path); - - const res = await this.fetch(request.url, { - method: 'POST', - headers: { ...headers, ...request.headers }, - body: JSON.stringify(data), - signal, - cache: 'no-store' - }).catch((ex) => { - this.logger.error(`Caught ex when POST streaming to ${path}`, ex); - throw ex; - }); - - if (res.status === 401) { - this.invalidateCredentials(); - } - - if (!res.ok) { - const text = await res.text(); - this.logger.error(`Could not POST streaming to ${path} - ${res.status} - ${res.statusText}: ${text}`); - const error: any = new Error(`HTTP ${res.statusText}: ${text}`); - error.status = res.status; - throw error; - } - - return res; - } - /** * Provides a BSON implementation. The import nature of this varies depending on the platform */ @@ -297,16 +262,43 @@ export abstract class AbstractRemote { } /** - * Connects to the sync/stream websocket endpoint + * Connects to the sync/stream websocket endpoint and delivers sync lines by decoding the BSON events + * sent by the server. */ async socketStream(options: SocketSyncStreamOptions): Promise<DataStream<StreamingSyncLine>> { + const bson = await this.getBSON(); + return await this.socketStreamRaw(options, (data) => bson.deserialize(data), bson); + } + + /** + * Returns a data stream of sync line data. + * + * @param map Maps received payload frames to the typed event value. + * @param bson A BSON encoder and decoder. When set, the data stream will be requested with a BSON payload + * (required for compatibility with older sync services).
+ */ + async socketStreamRaw<T>( + options: SocketSyncStreamOptions, + map: (buffer: Buffer) => T, + bson?: typeof BSON + ): Promise<DataStream<T>> { const { path, fetchStrategy = FetchStrategy.Buffered } = options; + const mimeType = bson == null ? 'application/json' : 'application/bson'; + + function toBuffer(js: any): Buffer { + let contents: any; + if (bson != null) { + contents = bson.serialize(js); + } else { + contents = JSON.stringify(js); + } + + return Buffer.from(contents); + } const syncQueueRequestSize = fetchStrategy == FetchStrategy.Buffered ? 10 : 1; const request = await this.buildRequest(path); - const bson = await this.getBSON(); - // Add the user agent in the setup payload - we can't set custom // headers with websockets on web. The browser userAgent is however added // automatically as a header. @@ -323,16 +315,14 @@ export abstract class AbstractRemote { setup: { keepAlive: KEEP_ALIVE_MS, lifetime: KEEP_ALIVE_LIFETIME_MS, - dataMimeType: 'application/bson', - metadataMimeType: 'application/bson', + dataMimeType: mimeType, + metadataMimeType: mimeType, payload: { data: null, - metadata: Buffer.from( - bson.serialize({ - token: request.headers.Authorization, - user_agent: userAgent - }) - ) + metadata: toBuffer({ + token: request.headers.Authorization, + user_agent: userAgent + }) } } }); @@ -377,12 +367,10 @@ export abstract class AbstractRemote { const res = rsocket.requestStream( { - data: Buffer.from(bson.serialize(options.data)), - metadata: Buffer.from( - bson.serialize({ - path - }) - ) + data: toBuffer(options.data), + metadata: toBuffer({ + path + }) }, syncQueueRequestSize, // The initial N amount { @@ -423,8 +411,7 @@ export abstract class AbstractRemote { return; } - const deserializedData = bson.deserialize(data); - stream.enqueueData(deserializedData); + stream.enqueueData(map(data)); }, onComplete: () => { stream.close(); @@ -464,9 +451,18 @@ export abstract class AbstractRemote { } /** - * Connects to the sync/stream http endpoint + * Connects to the sync/stream http endpoint, parsing lines as JSON. */ async postStream(options: SyncStreamOptions): Promise<DataStream<StreamingSyncLine>> { + return await this.postStreamRaw(options, (line) => { + return JSON.parse(line) as StreamingSyncLine; + }); + } + + /** + * Connects to the sync/stream http endpoint, mapping and emitting each received string line. + */ + async postStreamRaw<T>(options: SyncStreamOptions, mapLine: (line: string) => T): Promise<DataStream<T>> { const { data, path, headers, abortSignal } = options; const request = await this.buildRequest(path); @@ -521,11 +517,8 @@ export abstract class AbstractRemote { throw error; } - /** - * The can-ndjson-stream does not handle aborted streams well. - * This will intercept the readable stream and close the stream if - * aborted. - */ + // Create a new stream splitting the response at line endings while also handling cancellations + // by closing the reader.
const reader = res.body.getReader(); // This will close the network request and read stream const closeReader = async () => { @@ -541,52 +534,44 @@ export abstract class AbstractRemote { closeReader(); }); - const outputStream = new ReadableStream({ - start: (controller) => { - const processStream = async () => { - while (!abortSignal?.aborted) { - try { - const { done, value } = await reader.read(); - // When no more data needs to be consumed, close the stream - if (done) { - break; - } - // Enqueue the next data chunk into our target stream - controller.enqueue(value); - } catch (ex) { - this.logger.error('Caught exception when reading sync stream', ex); - break; - } - } - if (!abortSignal?.aborted) { - // Close the downstream readable stream - await closeReader(); - } - controller.close(); - }; - processStream(); - } - }); - - const jsonS = ndjsonStream(outputStream); + const decoder = new TextDecoder(); + let buffer = ''; - const stream = new DataStream<StreamingSyncLine>({ + const stream = new DataStream<T>({ logger: this.logger }); - const r = jsonS.getReader(); - const l = stream.registerListener({ lowWater: async () => { try { - const { done, value } = await r.read(); - // Exit if we're done - if (done) { - stream.close(); - l?.(); - return; + let didCompleteLine = false; + while (!didCompleteLine) { + const { done, value } = await reader.read(); + if (done) { + const remaining = buffer.trim(); + if (remaining.length != 0) { + stream.enqueueData(mapLine(remaining)); + } + + stream.close(); + await closeReader(); + return; + } + + const data = decoder.decode(value, { stream: true }); + buffer += data; + + const lines = buffer.split('\n'); + for (let i = 0; i < lines.length - 1; i++) { + const line = lines[i].trim(); + if (line.length > 0) { + stream.enqueueData(mapLine(line)); + didCompleteLine = true; + } + } + + buffer = lines[lines.length - 1]; } - stream.enqueueData(value); } catch (ex) { stream.close(); throw ex; diff --git a/packages/common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts b/packages/common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts index 9997da2f..f24e72e7 100644 --- a/packages/common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts +++ b/packages/common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts @@ -1,12 +1,19 @@ import Logger, { ILogger } from 'js-logger'; -import { InternalProgressInformation } from 'src/db/crud/SyncProgress.js'; -import { DataStream } from 'src/utils/DataStream.js'; +import { DataStream } from '../../../utils/DataStream.js'; import { SyncStatus, SyncStatusOptions } from '../../../db/crud/SyncStatus.js'; +import { FULL_SYNC_PRIORITY, InternalProgressInformation } from '../../../db/crud/SyncProgress.js'; +import * as sync_status from '../../../db/crud/SyncStatus.js'; import { AbortOperation } from '../../../utils/AbortOperation.js'; import { BaseListener, BaseObserver, Disposable } from '../../../utils/BaseObserver.js'; import { onAbortPromise, throttleLeadingTrailing } from '../../../utils/async.js'; -import { BucketChecksum, BucketDescription, BucketStorageAdapter, Checkpoint } from '../bucket/BucketStorageAdapter.js'; +import { + BucketChecksum, + BucketDescription, + BucketStorageAdapter, + Checkpoint, + PowerSyncControlCommand +} from '../bucket/BucketStorageAdapter.js'; import { CrudEntry } from '../bucket/CrudEntry.js'; import { SyncDataBucket } from '../bucket/SyncDataBucket.js'; import { AbstractRemote, FetchStrategy, SyncStreamOptions } from './AbstractRemote.js'; @@ -21,6 +28,7 @@ import {
isStreamingSyncCheckpointPartiallyComplete, isStreamingSyncData } from './streaming-sync-types.js'; +import { EstablishSyncStream, Instruction, SyncPriorityStatus } from './core-instruction.js'; export enum LockType { CRUD = 'crud', @@ -32,6 +40,50 @@ export enum SyncStreamConnectionMethod { WEB_SOCKET = 'web-socket' } +export enum SyncClientImplementation { + /** + * Decodes and handles sync lines received from the sync service in JavaScript. + * + * This is the default option. + * + * @deprecated Don't use {@link SyncClientImplementation.JAVASCRIPT} directly. Instead, use + * {@link DEFAULT_SYNC_CLIENT_IMPLEMENTATION} or omit the option. The explicit choice to use + * the JavaScript-based sync implementation will be removed in a future version of the SDK. + */ + JAVASCRIPT = 'js', + /** + * This implementation offloads the sync line decoding and handling into the PowerSync + * core extension. + * + * @experimental + * While this implementation is more performant than {@link SyncClientImplementation.JAVASCRIPT}, + * it has seen less real-world testing and is marked as __experimental__ at the moment. + * + * ## Compatibility warning + * + * The Rust sync client stores sync data in a format that is slightly different from the one used + * by the old {@link JAVASCRIPT} implementation. When adopting the {@link RUST} client on existing + * databases, the PowerSync SDK will migrate the format automatically. + * Further, the {@link JAVASCRIPT} client in recent versions of the PowerSync JS SDK (starting from + * the version introducing {@link RUST} as an option) also supports the new format, so you can switch + * back to {@link JAVASCRIPT} later. + * + * __However__: Upgrading the SDK version, then adopting {@link RUST} as a sync client and later + * downgrading the SDK to an older version (necessarily using the JavaScript-based implementation then) + * can lead to sync issues. + */ + RUST = 'rust' +} + +/** + * The default {@link SyncClientImplementation} to use. + * + * Please use this field instead of {@link SyncClientImplementation.JAVASCRIPT} directly. A future version + * of the PowerSync SDK will enable {@link SyncClientImplementation.RUST} by default and remove the JavaScript + * option. + */ +export const DEFAULT_SYNC_CLIENT_IMPLEMENTATION = SyncClientImplementation.JAVASCRIPT; + /** * Abstract Lock to be implemented by various JS environments */ @@ -73,6 +125,15 @@ export interface PowerSyncConnectionOptions extends BaseConnectionOptions, Addit /** @internal */ export interface BaseConnectionOptions { + /** + * Whether to use a JavaScript implementation to handle received sync lines from the sync + * service, or whether this work should be offloaded to the PowerSync core extension. + * + * This defaults to the JavaScript implementation ({@link SyncClientImplementation.JAVASCRIPT}) + * since the {@link SyncClientImplementation.RUST} implementation is experimental at the moment. + */ + clientImplementation?: SyncClientImplementation; + /** * The connection method to use when streaming updates from * the PowerSync backend instance.
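(Putting the new option together with the existing connection options, opting in — and later back out — looks roughly like this. A sketch only: `db` and `MyConnector` stand in for an application's `PowerSyncDatabase` and backend connector:)

```ts
import { SyncClientImplementation, SyncStreamConnectionMethod } from '@powersync/common';

// Opt in to the experimental Rust client; omitting clientImplementation
// (or passing DEFAULT_SYNC_CLIENT_IMPLEMENTATION) keeps the JavaScript client.
await db.connect(new MyConnector(), {
  connectionMethod: SyncStreamConnectionMethod.WEB_SOCKET,
  clientImplementation: SyncClientImplementation.RUST
});

// Switching back later is supported, but only on SDK versions that already
// understand the fixed subkey format (see the compatibility warning above).
await db.disconnect();
await db.connect(new MyConnector(), {
  clientImplementation: SyncClientImplementation.JAVASCRIPT
});
```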
@@ -143,6 +204,7 @@ export type RequiredPowerSyncConnectionOptions = Required export const DEFAULT_STREAM_CONNECTION_OPTIONS: RequiredPowerSyncConnectionOptions = { connectionMethod: SyncStreamConnectionMethod.WEB_SOCKET, + clientImplementation: DEFAULT_SYNC_CLIENT_IMPLEMENTATION, fetchStrategy: FetchStrategy.Buffered, params: {} }; @@ -162,7 +224,9 @@ export abstract class AbstractStreamingSyncImplementation protected abortController: AbortController | null; protected crudUpdateListener?: () => void; protected streamingSyncPromise?: Promise; + private pendingCrudUpload?: Promise; + private notifyCompletedUploads?: () => void; syncStatus: SyncStatus; triggerCrudUpload: () => void; @@ -189,6 +253,7 @@ export abstract class AbstractStreamingSyncImplementation this.pendingCrudUpload = new Promise((resolve) => { this._uploadAllCrud().finally(() => { + this.notifyCompletedUploads?.(); this.pendingCrudUpload = undefined; resolve(); }); @@ -509,6 +574,31 @@ The next upload iteration will be delayed.`); return [req, localDescriptions]; } + /** + * Older versions of the JS SDK used to encode subkeys as JSON in {@link OplogEntry.toJSON}. + * Because subkeys are always strings, this leads to quotes being added around them in `ps_oplog`. + * While this is not a problem as long as it's done consistently, it causes issues when a database + * created by the JS SDK is used with other SDKs, or (more likely) when the new Rust sync client + * is enabled. + * + * So, we add a migration from the old key format (with quotes) to the new one (no quotes). The + * migration is only triggered when necessary (for now). The function returns whether the new format + * should be used, so that the JS SDK is able to write to updated databases. + * + * @param requireFixedKeyFormat Whether we require the new format or also support the old one. + * The Rust client requires the new subkey format. + * @returns Whether the database is now using the new, fixed subkey format. + */ + private async requireKeyFormat(requireFixedKeyFormat: boolean): Promise { + const hasMigrated = await this.options.adapter.hasMigratedSubkeys(); + if (requireFixedKeyFormat && !hasMigrated) { + await this.options.adapter.migrateToFixedSubkeys(); + return true; + } else { + return hasMigrated; + } + } + protected async streamingSyncIteration(signal: AbortSignal, options?: PowerSyncConnectionOptions): Promise { await this.obtainLock({ type: LockType.SYNC, @@ -519,229 +609,397 @@ The next upload iteration will be delayed.`); ...(options ?? 
{}) }; - this.logger.debug('Streaming sync iteration started'); - this.options.adapter.startSession(); - let [req, bucketMap] = await this.collectLocalBucketState(); + if (resolvedOptions.clientImplementation == SyncClientImplementation.JAVASCRIPT) { + await this.legacyStreamingSyncIteration(signal, resolvedOptions); + } else { + await this.requireKeyFormat(true); + await this.rustSyncIteration(signal, resolvedOptions); + } + } + }); + } + + private async legacyStreamingSyncIteration(signal: AbortSignal, resolvedOptions: RequiredPowerSyncConnectionOptions) { + this.logger.debug('Streaming sync iteration started'); + this.options.adapter.startSession(); + let [req, bucketMap] = await this.collectLocalBucketState(); + + // These are compared by reference + let targetCheckpoint: Checkpoint | null = null; + let validatedCheckpoint: Checkpoint | null = null; + let appliedCheckpoint: Checkpoint | null = null; - // These are compared by reference - let targetCheckpoint: Checkpoint | null = null; - let validatedCheckpoint: Checkpoint | null = null; - let appliedCheckpoint: Checkpoint | null = null; + const clientId = await this.options.adapter.getClientId(); + const usingFixedKeyFormat = await this.requireKeyFormat(false); + + this.logger.debug('Requesting stream from server'); + + const syncOptions: SyncStreamOptions = { + path: '/sync/stream', + abortSignal: signal, + data: { + buckets: req, + include_checksum: true, + raw_data: true, + parameters: resolvedOptions.params, + client_id: clientId + } + }; - const clientId = await this.options.adapter.getClientId(); + let stream: DataStream; + if (resolvedOptions?.connectionMethod == SyncStreamConnectionMethod.HTTP) { + stream = await this.options.remote.postStream(syncOptions); + } else { + stream = await this.options.remote.socketStream({ + ...syncOptions, + ...{ fetchStrategy: resolvedOptions.fetchStrategy } + }); + } + + this.logger.debug('Stream established. Processing events'); + + while (!stream.closed) { + const line = await stream.read(); + if (!line) { + // The stream has closed while waiting + return; + } - if (signal.aborted) { + // A connection is active and messages are being received + if (!this.syncStatus.connected) { + // There is a connection now + Promise.resolve().then(() => this.triggerCrudUpload()); + this.updateSyncStatus({ + connected: true + }); + } + + if (isStreamingSyncCheckpoint(line)) { + targetCheckpoint = line.checkpoint; + const bucketsToDelete = new Set(bucketMap.keys()); + const newBuckets = new Map(); + for (const checksum of line.checkpoint.buckets) { + newBuckets.set(checksum.bucket, { + name: checksum.bucket, + priority: checksum.priority ?? 
FALLBACK_PRIORITY + }); + bucketsToDelete.delete(checksum.bucket); + } + if (bucketsToDelete.size > 0) { + this.logger.debug('Removing buckets', [...bucketsToDelete]); + } + bucketMap = newBuckets; + await this.options.adapter.removeBuckets([...bucketsToDelete]); + await this.options.adapter.setTargetCheckpoint(targetCheckpoint); + await this.updateSyncStatusForStartingCheckpoint(targetCheckpoint); + } else if (isStreamingSyncCheckpointComplete(line)) { + const result = await this.applyCheckpoint(targetCheckpoint!, signal); + if (result.endIteration) { return; + } else if (result.applied) { + appliedCheckpoint = targetCheckpoint; } + validatedCheckpoint = targetCheckpoint; + } else if (isStreamingSyncCheckpointPartiallyComplete(line)) { + const priority = line.partial_checkpoint_complete.priority; + this.logger.debug('Partial checkpoint complete', priority); + const result = await this.options.adapter.syncLocalDatabase(targetCheckpoint!, priority); + if (!result.checkpointValid) { + // This means checksums failed. Start again with a new checkpoint. + // TODO: better back-off + await new Promise((resolve) => setTimeout(resolve, 50)); + return; + } else if (!result.ready) { + // If we have pending uploads, we can't complete new checkpoints outside of priority 0. + // We'll resolve this for a complete checkpoint. + } else { + // We'll keep on downloading, but can report that this priority is synced now. + this.logger.debug('partial checkpoint validation succeeded'); + + // All states with a higher priority can be deleted since this partial sync includes them. + const priorityStates = this.syncStatus.priorityStatusEntries.filter((s) => s.priority <= priority); + priorityStates.push({ + priority, + lastSyncedAt: new Date(), + hasSynced: true + }); - this.logger.debug('Requesting stream from server'); - - const syncOptions: SyncStreamOptions = { - path: '/sync/stream', - abortSignal: signal, - data: { - buckets: req, - include_checksum: true, - raw_data: true, - parameters: resolvedOptions.params, - client_id: clientId - } + this.updateSyncStatus({ + connected: true, + priorityStatusEntries: priorityStates + }); + } + } else if (isStreamingSyncCheckpointDiff(line)) { + // TODO: It may be faster to just keep track of the diff, instead of the entire checkpoint + if (targetCheckpoint == null) { + throw new Error('Checkpoint diff without previous checkpoint'); + } + const diff = line.checkpoint_diff; + const newBuckets = new Map(); + for (const checksum of targetCheckpoint.buckets) { + newBuckets.set(checksum.bucket, checksum); + } + for (const checksum of diff.updated_buckets) { + newBuckets.set(checksum.bucket, checksum); + } + for (const bucket of diff.removed_buckets) { + newBuckets.delete(bucket); + } + + const newCheckpoint: Checkpoint = { + last_op_id: diff.last_op_id, + buckets: [...newBuckets.values()], + write_checkpoint: diff.write_checkpoint }; + targetCheckpoint = newCheckpoint; + await this.updateSyncStatusForStartingCheckpoint(targetCheckpoint); + + bucketMap = new Map(); + newBuckets.forEach((checksum, name) => + bucketMap.set(name, { + name: checksum.bucket, + priority: checksum.priority ?? 
FALLBACK_PRIORITY + }) + ); + + const bucketsToDelete = diff.removed_buckets; + if (bucketsToDelete.length > 0) { + this.logger.debug('Remove buckets', bucketsToDelete); + } + await this.options.adapter.removeBuckets(bucketsToDelete); + await this.options.adapter.setTargetCheckpoint(targetCheckpoint); + } else if (isStreamingSyncData(line)) { + const { data } = line; + const previousProgress = this.syncStatus.dataFlowStatus.downloadProgress; + let updatedProgress: InternalProgressInformation | null = null; + if (previousProgress) { + updatedProgress = { ...previousProgress }; + const progressForBucket = updatedProgress[data.bucket]; + if (progressForBucket) { + updatedProgress[data.bucket] = { + ...progressForBucket, + since_last: progressForBucket.since_last + data.data.length + }; + } + } - let stream: DataStream; - if (resolvedOptions?.connectionMethod == SyncStreamConnectionMethod.HTTP) { - stream = await this.options.remote.postStream(syncOptions); - } else { - stream = await this.options.remote.socketStream({ - ...syncOptions, - ...{ fetchStrategy: resolvedOptions.fetchStrategy } + this.updateSyncStatus({ + dataFlow: { + downloading: true, + downloadProgress: updatedProgress + } + }); + await this.options.adapter.saveSyncData({ buckets: [SyncDataBucket.fromRow(data)] }, usingFixedKeyFormat); + } else if (isStreamingKeepalive(line)) { + const remaining_seconds = line.token_expires_in; + if (remaining_seconds == 0) { + // Connection would be closed automatically right after this + this.logger.debug('Token expiring; reconnect'); + this.options.remote.invalidateCredentials(); + + /** + * For a rare case where the backend connector does not update the token + * (uses the same one), this should have some delay. + */ + await this.delayRetry(); + return; + } else if (remaining_seconds < 30) { + this.logger.debug('Token will expire soon; reconnect'); + // Pre-emptively refresh the token + this.options.remote.invalidateCredentials(); + return; + } + this.triggerCrudUpload(); + } else { + this.logger.debug('Sync complete'); + + if (targetCheckpoint === appliedCheckpoint) { + this.updateSyncStatus({ + connected: true, + lastSyncedAt: new Date(), + priorityStatusEntries: [], + dataFlow: { + downloadError: undefined + } }); + } else if (validatedCheckpoint === targetCheckpoint) { + const result = await this.applyCheckpoint(targetCheckpoint!, signal); + if (result.endIteration) { + return; + } else if (result.applied) { + appliedCheckpoint = targetCheckpoint; + } } + } + } + this.logger.debug('Stream input empty'); + // Connection closed. Likely due to auth issue. + return; + } + + private async rustSyncIteration(signal: AbortSignal, resolvedOptions: RequiredPowerSyncConnectionOptions) { + const syncImplementation = this; + const adapter = this.options.adapter; + const remote = this.options.remote; + let receivingLines: Promise<void> | null = null; + + const abortController = new AbortController(); + signal.addEventListener('abort', () => abortController.abort()); + + // Pending sync lines received from the service, as well as local events that trigger a powersync_control + // invocation (local events include refreshed tokens and completed uploads). + // This is a single data stream so that we can handle all control calls from a single place.
+ let controlInvocations: DataStream<{ + command: PowerSyncControlCommand; + payload?: ArrayBuffer | string; + }> | null = null; + + async function connect(instr: EstablishSyncStream) { + const syncOptions: SyncStreamOptions = { + path: '/sync/stream', + abortSignal: abortController.signal, + data: instr.request + }; - this.logger.debug('Stream established. Processing events'); + if (resolvedOptions.connectionMethod == SyncStreamConnectionMethod.HTTP) { + controlInvocations = await remote.postStreamRaw(syncOptions, (line) => ({ + command: PowerSyncControlCommand.PROCESS_TEXT_LINE, + payload: line + })); + } else { + controlInvocations = await remote.socketStreamRaw( + { + ...syncOptions, + fetchStrategy: resolvedOptions.fetchStrategy + }, + (buffer) => ({ + command: PowerSyncControlCommand.PROCESS_BSON_LINE, + payload: buffer + }) + ); + } - while (!stream.closed) { - const line = await stream.read(); - if (!line) { - // The stream has closed while waiting + try { + while (!controlInvocations.closed) { + const line = await controlInvocations.read(); + if (line == null) { return; } - // A connection is active and messages are being received - if (!this.syncStatus.connected) { - // There is a connection now - Promise.resolve().then(() => this.triggerCrudUpload()); - this.updateSyncStatus({ - connected: true - }); - } + await control(line.command, line.payload); + } + } finally { + const activeInstructions = controlInvocations; + // We concurrently add events to the active data stream when e.g. a CRUD upload is completed or a token is + // refreshed. That would throw after closing (and we can't handle those events either way), so set this back + // to null. + controlInvocations = null; + await activeInstructions.close(); + } + } - if (isStreamingSyncCheckpoint(line)) { - targetCheckpoint = line.checkpoint; - const bucketsToDelete = new Set(bucketMap.keys()); - const newBuckets = new Map(); - for (const checksum of line.checkpoint.buckets) { - newBuckets.set(checksum.bucket, { - name: checksum.bucket, - priority: checksum.priority ?? FALLBACK_PRIORITY - }); - bucketsToDelete.delete(checksum.bucket); - } - if (bucketsToDelete.size > 0) { - this.logger.debug('Removing buckets', [...bucketsToDelete]); - } - bucketMap = newBuckets; - await this.options.adapter.removeBuckets([...bucketsToDelete]); - await this.options.adapter.setTargetCheckpoint(targetCheckpoint); - await this.updateSyncStatusForStartingCheckpoint(targetCheckpoint); - } else if (isStreamingSyncCheckpointComplete(line)) { - const result = await this.applyCheckpoint(targetCheckpoint!, signal); - if (result.endIteration) { - return; - } else if (result.applied) { - appliedCheckpoint = targetCheckpoint; - } - validatedCheckpoint = targetCheckpoint; - } else if (isStreamingSyncCheckpointPartiallyComplete(line)) { - const priority = line.partial_checkpoint_complete.priority; - this.logger.debug('Partial checkpoint complete', priority); - const result = await this.options.adapter.syncLocalDatabase(targetCheckpoint!, priority); - if (!result.checkpointValid) { - // This means checksums failed. Start again with a new checkpoint. - // TODO: better back-off - await new Promise((resolve) => setTimeout(resolve, 50)); - return; - } else if (!result.ready) { - // If we have pending uploads, we can't complete new checkpoints outside of priority 0. - // We'll resolve this for a complete checkpoint. - } else { - // We'll keep on downloading, but can report that this priority is synced now. 
- this.logger.debug('partial checkpoint validation succeeded'); - - // All states with a higher priority can be deleted since this partial sync includes them. - const priorityStates = this.syncStatus.priorityStatusEntries.filter((s) => s.priority <= priority); - priorityStates.push({ - priority, - lastSyncedAt: new Date(), - hasSynced: true - }); + async function stop() { + await control(PowerSyncControlCommand.STOP); + } - this.updateSyncStatus({ - connected: true, - priorityStatusEntries: priorityStates - }); - } - } else if (isStreamingSyncCheckpointDiff(line)) { - // TODO: It may be faster to just keep track of the diff, instead of the entire checkpoint - if (targetCheckpoint == null) { - throw new Error('Checkpoint diff without previous checkpoint'); - } - const diff = line.checkpoint_diff; - const newBuckets = new Map(); - for (const checksum of targetCheckpoint.buckets) { - newBuckets.set(checksum.bucket, checksum); - } - for (const checksum of diff.updated_buckets) { - newBuckets.set(checksum.bucket, checksum); - } - for (const bucket of diff.removed_buckets) { - newBuckets.delete(bucket); - } + async function control(op: PowerSyncControlCommand, payload?: ArrayBuffer | string) { + const rawResponse = await adapter.control(op, payload ?? null); + await handleInstructions(JSON.parse(rawResponse)); + } - const newCheckpoint: Checkpoint = { - last_op_id: diff.last_op_id, - buckets: [...newBuckets.values()], - write_checkpoint: diff.write_checkpoint - }; - targetCheckpoint = newCheckpoint; - await this.updateSyncStatusForStartingCheckpoint(targetCheckpoint); - - bucketMap = new Map(); - newBuckets.forEach((checksum, name) => - bucketMap.set(name, { - name: checksum.bucket, - priority: checksum.priority ?? FALLBACK_PRIORITY - }) - ); + async function handleInstruction(instruction: Instruction) { + if ('LogLine' in instruction) { + switch (instruction.LogLine.severity) { + case 'DEBUG': + syncImplementation.logger.debug(instruction.LogLine.line); + break; + case 'INFO': + syncImplementation.logger.info(instruction.LogLine.line); + break; + case 'WARNING': + syncImplementation.logger.warn(instruction.LogLine.line); + break; + } + } else if ('UpdateSyncStatus' in instruction) { + function coreStatusToJs(status: SyncPriorityStatus): sync_status.SyncPriorityStatus { + return { + priority: status.priority, + hasSynced: status.has_synced ?? undefined, + lastSyncedAt: status?.last_synced_at != null ? new Date(status!.last_synced_at!) : undefined + }; + } - const bucketsToDelete = diff.removed_buckets; - if (bucketsToDelete.length > 0) { - this.logger.debug('Remove buckets', bucketsToDelete); - } - await this.options.adapter.removeBuckets(bucketsToDelete); - await this.options.adapter.setTargetCheckpoint(targetCheckpoint); - } else if (isStreamingSyncData(line)) { - const { data } = line; - const previousProgress = this.syncStatus.dataFlowStatus.downloadProgress; - let updatedProgress: InternalProgressInformation | null = null; - if (previousProgress) { - updatedProgress = { ...previousProgress }; - const progressForBucket = updatedProgress[data.bucket]; - if (progressForBucket) { - updatedProgress[data.bucket] = { - ...progressForBucket, - sinceLast: Math.min( - progressForBucket.sinceLast + data.data.length, - progressForBucket.targetCount - progressForBucket.atLast - ) - }; - } - } + const info = instruction.UpdateSyncStatus.status; + const coreCompleteSync = info.priority_status.find((s) => s.priority == FULL_SYNC_PRIORITY); + const completeSync = coreCompleteSync != null ? 
coreStatusToJs(coreCompleteSync) : null; - this.updateSyncStatus({ - dataFlow: { - downloading: true, - downloadProgress: updatedProgress - } - }); - await this.options.adapter.saveSyncData({ buckets: [SyncDataBucket.fromRow(data)] }); - } else if (isStreamingKeepalive(line)) { - const remaining_seconds = line.token_expires_in; - if (remaining_seconds == 0) { - // Connection would be closed automatically right after this - this.logger.debug('Token expiring; reconnect'); - this.options.remote.invalidateCredentials(); - - /** - * For a rare case where the backend connector does not update the token - * (uses the same one), this should have some delay. - */ - await this.delayRetry(); - return; - } else if (remaining_seconds < 30) { - this.logger.debug('Token will expire soon; reconnect'); - // Pre-emptively refresh the token - this.options.remote.invalidateCredentials(); - return; - } - this.triggerCrudUpload(); - } else { - this.logger.debug('Sync complete'); + syncImplementation.updateSyncStatus({ + connected: info.connected, + connecting: info.connecting, + dataFlow: { + downloading: info.downloading != null, + downloadProgress: info.downloading?.buckets + }, + lastSyncedAt: completeSync?.lastSyncedAt, + hasSynced: completeSync?.hasSynced, + priorityStatusEntries: info.priority_status.map(coreStatusToJs) + }); + } else if ('EstablishSyncStream' in instruction) { + if (receivingLines != null) { + // Already connected, this shouldn't happen during a single iteration. + throw 'Unexpected request to establish sync stream, already connected'; + } - if (targetCheckpoint === appliedCheckpoint) { - this.updateSyncStatus({ - connected: true, - lastSyncedAt: new Date(), - priorityStatusEntries: [], - dataFlow: { - downloadError: undefined - } - }); - } else if (validatedCheckpoint === targetCheckpoint) { - const result = await this.applyCheckpoint(targetCheckpoint!, signal); - if (result.endIteration) { - return; - } else if (result.applied) { - appliedCheckpoint = targetCheckpoint; - } + receivingLines = connect(instruction.EstablishSyncStream); + } else if ('FetchCredentials' in instruction) { + if (instruction.FetchCredentials.did_expire) { + remote.invalidateCredentials(); + } else { + remote.invalidateCredentials(); + + // Restart iteration after the credentials have been refreshed. + remote.fetchCredentials().then( + (_) => { + controlInvocations?.enqueueData({ command: PowerSyncControlCommand.NOTIFY_TOKEN_REFRESHED }); + }, + (err) => { + syncImplementation.logger.warn('Could not prefetch credentials', err); } - } + ); } - this.logger.debug('Stream input empty'); - // Connection closed. Likely due to auth issue. - return; + } else if ('CloseSyncStream' in instruction) { + abortController.abort(); + } else if ('FlushFileSystem' in instruction) { + // Not necessary on JS platforms. 
+ } else if ('DidCompleteSync' in instruction) { + syncImplementation.updateSyncStatus({ + dataFlow: { + downloadError: undefined + } + }); } - }); + } + + async function handleInstructions(instructions: Instruction[]) { + for (const instr of instructions) { + await handleInstruction(instr); + } + } + + try { + await control( + PowerSyncControlCommand.START, + JSON.stringify({ + parameters: resolvedOptions.params + }) + ); + + this.notifyCompletedUploads = () => { + controlInvocations?.enqueueData({ command: PowerSyncControlCommand.NOTIFY_CRUD_UPLOAD_COMPLETED }); + }; + await receivingLines; + } finally { + this.notifyCompletedUploads = undefined; + await stop(); + } } private async updateSyncStatusForStartingCheckpoint(checkpoint: Checkpoint) { @@ -758,9 +1016,9 @@ The next upload iteration will be delayed.`); // The fallback priority doesn't matter here, but 3 is the one newer versions of the sync service // will use by default. priority: bucket.priority ?? 3, - atLast: atLast, - sinceLast: sinceLast, - targetCount: bucket.count ?? 0 + at_last: atLast, + since_last: sinceLast, + target_count: bucket.count ?? 0 }; if (bucket.count != null && bucket.count < atLast + sinceLast) { @@ -774,8 +1032,8 @@ The next upload iteration will be delayed.`); if (invalidated) { for (const bucket in progress) { const bucketProgress = progress[bucket]; - bucketProgress.atLast = 0; - bucketProgress.sinceLast = 0; + bucketProgress.at_last = 0; + bucketProgress.since_last = 0; } } diff --git a/packages/common/src/client/sync/stream/core-instruction.ts b/packages/common/src/client/sync/stream/core-instruction.ts new file mode 100644 index 00000000..db1f56e8 --- /dev/null +++ b/packages/common/src/client/sync/stream/core-instruction.ts @@ -0,0 +1,55 @@ +import { StreamingSyncRequest } from './streaming-sync-types.js'; + +/** + * An internal instruction emitted by the sync client in the core extension in response to the JS + * SDK passing sync data into the extension. 
+ */ +export type Instruction = + | { LogLine: LogLine } + | { UpdateSyncStatus: UpdateSyncStatus } + | { EstablishSyncStream: EstablishSyncStream } + | { FetchCredentials: FetchCredentials } + | { CloseSyncStream: any } + | { FlushFileSystem: any } + | { DidCompleteSync: any }; + +export interface LogLine { + severity: 'DEBUG' | 'INFO' | 'WARNING'; + line: string; +} + +export interface EstablishSyncStream { + request: StreamingSyncRequest; +} + +export interface UpdateSyncStatus { + status: CoreSyncStatus; +} + +export interface CoreSyncStatus { + connected: boolean; + connecting: boolean; + priority_status: SyncPriorityStatus[]; + downloading: DownloadProgress | null; +} + +export interface SyncPriorityStatus { + priority: number; + last_synced_at: number | null; + has_synced: boolean | null; +} + +export interface DownloadProgress { + buckets: Record<string, BucketProgress>; +} + +export interface BucketProgress { + priority: number; + at_last: number; + since_last: number; + target_count: number; +} + +export interface FetchCredentials { + did_expire: boolean; +} diff --git a/packages/common/src/db/crud/SyncProgress.ts b/packages/common/src/db/crud/SyncProgress.ts index 63e9a938..2de28821 100644 --- a/packages/common/src/db/crud/SyncProgress.ts +++ b/packages/common/src/db/crud/SyncProgress.ts @@ -1,16 +1,9 @@ +import { BucketProgress } from '../../client/sync/stream/core-instruction.js'; import type { SyncStatus } from './SyncStatus.js'; // (bucket, progress) pairs /** @internal */ -export type InternalProgressInformation = Record< - string, - { - priority: number; // Priority of the associated buckets - atLast: number; // Total ops at last completed sync, or 0 - sinceLast: number; // Total ops _since_ the last completed sync. - targetCount: number; // Total opcount for next checkpoint as indicated by service. - } ->; +export type InternalProgressInformation = Record<string, BucketProgress>; /** * @internal The priority used by the core extension to indicate that a full sync was completed. @@ -38,7 +31,7 @@ export interface ProgressWithOperations { * Relative progress, as {@link downloadedOperations} of {@link totalOperations}. * * This will be a number between `0.0` and `1.0` (inclusive). - * + * * When this number reaches `1.0`, all changes have been received from the sync service. * Actually applying these changes happens before the `downloadProgress` field is cleared from * {@link SyncStatus}, so progress can stay at `1.0` for a short while before completing. @@ -92,8 +85,8 @@ export class SyncProgress implements ProgressWithOperations { for (const progress of Object.values(this.internal)) { // Include higher-priority buckets, which are represented by lower numbers.
if (progress.priority <= priority) { - downloaded += progress.sinceLast; - total += progress.targetCount - progress.atLast; + downloaded += progress.since_last; + total += progress.target_count - progress.at_last; } } diff --git a/packages/node/tests/sync.test.ts b/packages/node/tests/sync.test.ts index a1acc8e5..14e12349 100644 --- a/packages/node/tests/sync.test.ts +++ b/packages/node/tests/sync.test.ts @@ -5,7 +5,9 @@ import { AbstractPowerSyncDatabase, BucketChecksum, OplogEntryJSON, + PowerSyncConnectionOptions, ProgressWithOperations, + SyncClientImplementation, SyncStreamConnectionMethod } from '@powersync/common'; import Logger from 'js-logger'; @@ -13,6 +15,103 @@ import Logger from 'js-logger'; Logger.useDefaults({ defaultLevel: Logger.WARN }); describe('Sync', () => { + describe('js client', () => { + defineSyncTests(SyncClientImplementation.JAVASCRIPT); + }); + + describe('rust client', () => { + defineSyncTests(SyncClientImplementation.RUST); + }); + + mockSyncServiceTest('can migrate between sync implementations', async ({ syncService }) => { + function addData(id: string) { + syncService.pushLine({ + data: { + bucket: 'a', + data: [ + { + checksum: 0, + op_id: id, + op: 'PUT', + object_id: id, + object_type: 'lists', + subkey: `subkey_${id}`, + data: '{}' + } + ] + } + }); + } + const checkpoint = { + checkpoint: { + last_op_id: '3', + buckets: [bucket('a', 3)] + } + }; + + let database = await syncService.createDatabase(); + database.connect(new TestConnector(), { + clientImplementation: SyncClientImplementation.JAVASCRIPT, + connectionMethod: SyncStreamConnectionMethod.HTTP + }); + await vi.waitFor(() => expect(syncService.connectedListeners).toHaveLength(1)); + syncService.pushLine(checkpoint); + addData('1'); + + await vi.waitFor(async () => { + expect(await database.getAll('SELECT * FROM ps_oplog')).toHaveLength(1); + }); + await database.disconnect(); + // The JavaScript client encodes subkeys to JSON when it shouldn't... + expect(await database.getAll('SELECT * FROM ps_oplog')).toEqual([ + expect.objectContaining({ key: 'lists/1/"subkey_1"' }) + ]); + + // Connecting again with the new client should fix the format + database.connect(new TestConnector(), { + clientImplementation: SyncClientImplementation.RUST, + connectionMethod: SyncStreamConnectionMethod.HTTP + }); + await vi.waitFor(() => expect(syncService.connectedListeners).toHaveLength(1)); + syncService.pushLine(checkpoint); + addData('2'); + await vi.waitFor(async () => { + expect(await database.getAll('SELECT * FROM ps_oplog')).toHaveLength(2); + }); + await database.disconnect(); + expect(await database.getAll('SELECT * FROM ps_oplog')).toEqual([ + // Existing entry should be fixed too! + expect.objectContaining({ key: 'lists/1/subkey_1' }), + expect.objectContaining({ key: 'lists/2/subkey_2' }) + ]); + + // Finally, connecting with JS again should keep the fixed subkey format. + database.connect(new TestConnector(), { + clientImplementation: SyncClientImplementation.JAVASCRIPT, + connectionMethod: SyncStreamConnectionMethod.HTTP + }); + await vi.waitFor(() => expect(syncService.connectedListeners).toHaveLength(1)); + syncService.pushLine(checkpoint); + addData('3'); + await vi.waitFor(async () => { + expect(await database.getAll('SELECT * FROM ps_oplog')).toHaveLength(3); + }); + await database.disconnect(); + expect(await database.getAll('SELECT * FROM ps_oplog')).toEqual([ + // The fixed subkey format should be kept.
+ expect.objectContaining({ key: 'lists/1/subkey_1' }), + expect.objectContaining({ key: 'lists/2/subkey_2' }), + expect.objectContaining({ key: 'lists/3/subkey_3' }) + ]); + }); +}); + +function defineSyncTests(impl: SyncClientImplementation) { + const options: PowerSyncConnectionOptions = { + clientImplementation: impl, + connectionMethod: SyncStreamConnectionMethod.HTTP + }; + describe('reports progress', () => { let lastOpId = 0; @@ -60,7 +159,7 @@ describe('Sync', () => { mockSyncServiceTest('without priorities', async ({ syncService }) => { const database = await syncService.createDatabase(); - database.connect(new TestConnector(), { connectionMethod: SyncStreamConnectionMethod.HTTP }); + database.connect(new TestConnector(), options); await vi.waitFor(() => expect(syncService.connectedListeners).toHaveLength(1)); syncService.pushLine({ @@ -95,7 +194,7 @@ describe('Sync', () => { mockSyncServiceTest('interrupted sync', async ({ syncService }) => { let database = await syncService.createDatabase(); - database.connect(new TestConnector(), { connectionMethod: SyncStreamConnectionMethod.HTTP }); + database.connect(new TestConnector(), options); await vi.waitFor(() => expect(syncService.connectedListeners).toHaveLength(1)); syncService.pushLine({ @@ -115,7 +214,7 @@ describe('Sync', () => { // And open a new one database = await syncService.createDatabase(); - database.connect(new TestConnector(), { connectionMethod: SyncStreamConnectionMethod.HTTP }); + database.connect(new TestConnector(), options); await vi.waitFor(() => expect(syncService.connectedListeners).toHaveLength(1)); // Send same checkpoint again @@ -134,7 +233,7 @@ describe('Sync', () => { mockSyncServiceTest('interrupted sync with new checkpoint', async ({ syncService }) => { let database = await syncService.createDatabase(); - database.connect(new TestConnector(), { connectionMethod: SyncStreamConnectionMethod.HTTP }); + database.connect(new TestConnector(), options); await vi.waitFor(() => expect(syncService.connectedListeners).toHaveLength(1)); syncService.pushLine({ @@ -152,7 +251,7 @@ describe('Sync', () => { await database.close(); await vi.waitFor(() => expect(syncService.connectedListeners).toHaveLength(0)); database = await syncService.createDatabase(); - database.connect(new TestConnector(), { connectionMethod: SyncStreamConnectionMethod.HTTP }); + database.connect(new TestConnector(), options); await vi.waitFor(() => expect(syncService.connectedListeners).toHaveLength(1)); // Send checkpoint with new data @@ -170,7 +269,7 @@ describe('Sync', () => { mockSyncServiceTest('different priorities', async ({ syncService }) => { let database = await syncService.createDatabase(); - database.connect(new TestConnector(), { connectionMethod: SyncStreamConnectionMethod.HTTP }); + database.connect(new TestConnector(), options); await vi.waitFor(() => expect(syncService.connectedListeners).toHaveLength(1)); syncService.pushLine({ @@ -272,7 +371,7 @@ describe('Sync', () => { mockSyncServiceTest('uses correct state when reconnecting', async ({ syncService }) => { let database = await syncService.createDatabase(); - database.connect(new TestConnector(), { connectionMethod: SyncStreamConnectionMethod.HTTP }); + database.connect(new TestConnector(), options); await vi.waitFor(() => expect(syncService.connectedListeners).toHaveLength(1)); syncService.pushLine({ @@ -291,7 +390,7 @@ describe('Sync', () => { await database.close(); await vi.waitFor(() => expect(syncService.connectedListeners).toHaveLength(0)); database = await 
syncService.createDatabase(); - database.connect(new TestConnector(), { connectionMethod: SyncStreamConnectionMethod.HTTP }); + database.connect(new TestConnector(), options); await vi.waitFor(() => expect(syncService.connectedListeners).toHaveLength(1)); expect(syncService.connectedListeners[0].buckets).toStrictEqual([ @@ -337,7 +436,7 @@ describe('Sync', () => { await waitForSyncStatus(database, (s) => s.downloadProgress == null); }); }); -}); +} function bucket(name: string, count: number, options: { priority: number } = { priority: 3 }): BucketChecksum { return { @@ -354,6 +453,10 @@ async function waitForProgress( forPriorities: [number, [number, number]][] = [] ) { await waitForSyncStatus(database, (status) => { + if (status.dataFlowStatus.downloadError != null) { + throw `Unexpected sync error: ${status.dataFlowStatus.downloadError}`; + } + const progress = status.downloadProgress; if (!progress) { return false; diff --git a/packages/node/tests/utils.ts b/packages/node/tests/utils.ts index 6e85a3b9..9eebc48d 100644 --- a/packages/node/tests/utils.ts +++ b/packages/node/tests/utils.ts @@ -1,12 +1,15 @@ import os from 'node:os'; import fs from 'node:fs/promises'; import path from 'node:path'; +import { ReadableStream, TransformStream } from 'node:stream/web'; + import { onTestFinished, test } from 'vitest'; import { AbstractPowerSyncDatabase, column, NodePowerSyncDatabaseOptions, PowerSyncBackendConnector, + PowerSyncConnectionOptions, PowerSyncCredentials, PowerSyncDatabase, Schema, @@ -73,11 +76,13 @@ export const databaseTest = tempDirectoryTest.extend<{ database: PowerSyncDataba }); // TODO: Unify this with the test setup for the web SDK. -export const mockSyncServiceTest = tempDirectoryTest.extend<{ syncService: MockSyncService }>({ +export const mockSyncServiceTest = tempDirectoryTest.extend<{ + syncService: MockSyncService; +}>({ syncService: async ({ tmpdir }, use) => { interface Listener { - request: any, - stream: ReadableStreamDefaultController, + request: any; + stream: ReadableStreamDefaultController; } const listeners: Listener[] = []; @@ -92,7 +97,7 @@ export const mockSyncServiceTest = tempDirectoryTest.extend<{ syncService: MockS start(controller) { listener = { request: body, - stream: controller, + stream: controller }; listeners.push(listener); @@ -110,7 +115,7 @@ export const mockSyncServiceTest = tempDirectoryTest.extend<{ syncService: MockS } }); - return new Response(syncLines.pipeThrough(asLines), { status: 200 }); + return new Response(syncLines.pipeThrough(asLines) as any, { status: 200 }); } else { return new Response('Not found', { status: 404 }); } @@ -164,16 +169,21 @@ export function waitForSyncStatus( database: AbstractPowerSyncDatabase, matcher: (status: SyncStatus) => boolean ): Promise { - return new Promise((resolve) => { + return new Promise((resolve, reject) => { if (matcher(database.currentStatus)) { return resolve(); } const dispose = database.registerListener({ statusChanged: (status) => { - if (matcher(status)) { + try { + if (matcher(status)) { + dispose(); + resolve(); + } + } catch (e) { + reject(e); dispose(); - resolve(); } } }); diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 5fab235f..cd2d05ad 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -1682,9 +1682,6 @@ importers: buffer: specifier: ^6.0.3 version: 6.0.3 - can-ndjson-stream: - specifier: ^1.0.2 - version: 1.0.2 cross-fetch: specifier: ^4.0.0 version: 4.1.0(encoding@0.1.13) @@ -10398,12 +10395,6 @@ packages: resolution: {integrity: 
sha512-xlx1yCK2Oc1APsPXDL2LdlNP6+uu8OCDdhOBSVT279M/S+y75O30C2VuD8T2ogdePBBl7PfPF4504tnLgX3zfw==} engines: {node: '>=14.16'} - can-namespace@1.0.0: - resolution: {integrity: sha512-1sBY/SLwwcmxz3NhyVhLjt2uD/dZ7V1mII82/MIXSDn5QXnslnosJnjlP8+yTx2uTCRvw1jlFDElRs4pX7AG5w==} - - can-ndjson-stream@1.0.2: - resolution: {integrity: sha512-//tM8wcTV42SyD1JGua7WMVftZEeTwapcHJTTe3vJwuVywXD01CJbdEkgwRYjy2evIByVJV21ZKBdSv5ygIw1w==} - can-write-to-dir@1.1.1: resolution: {integrity: sha512-eOgiEWqjppB+3DN/5E82EQ8dTINus8d9GXMCbEsUnp2hcUIcXmBvzWmD3tXMk3CuBK0v+ddK9qw0EAF+JVRMjQ==} engines: {node: '>=10.13'} @@ -32899,12 +32890,6 @@ snapshots: camelcase@7.0.1: {} - can-namespace@1.0.0: {} - - can-ndjson-stream@1.0.2: - dependencies: - can-namespace: 1.0.0 - can-write-to-dir@1.1.1: dependencies: path-temp: 2.1.0
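A closing note on the `can-ndjson-stream` removal traced through the lockfile above: with the dependency gone, `postStreamRaw` splits NDJSON responses by hand. The buffering pattern it uses boils down to the following standalone sketch (not the SDK's exact code):

```ts
// Minimal NDJSON line splitter over a byte stream: buffer partial chunks,
// emit every completed line, and flush whatever remains when the stream ends.
async function* ndjsonLines(reader: ReadableStreamDefaultReader<Uint8Array>): AsyncGenerator<unknown> {
  const decoder = new TextDecoder();
  let buffer = '';
  while (true) {
    const { done, value } = await reader.read();
    if (done) break;
    buffer += decoder.decode(value, { stream: true });
    const lines = buffer.split('\n');
    buffer = lines.pop()!; // keep the trailing partial line for the next chunk
    for (const line of lines) {
      const trimmed = line.trim();
      if (trimmed.length > 0) yield JSON.parse(trimmed);
    }
  }
  const rest = buffer.trim();
  if (rest.length > 0) yield JSON.parse(rest);
}
```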