From d21f2eb8633756b8bba38464fdf50edd592ae7fb Mon Sep 17 00:00:00 2001 From: Steven Ontong Date: Fri, 20 Sep 2024 09:53:59 +0200 Subject: [PATCH 1/3] fix: crud upload queue monitoring --- .changeset/afraid-apples-learn.md | 7 ++ .../AbstractStreamingSyncImplementation.ts | 4 +- packages/web/src/db/PowerSyncDatabase.ts | 2 +- packages/web/tests/stream.test.ts | 60 ++++++--- packages/web/tests/uploads.test.ts | 116 ++++++++++++++++++ .../web/tests/utils/MockStreamOpenFactory.ts | 3 +- 6 files changed, 170 insertions(+), 22 deletions(-) create mode 100644 .changeset/afraid-apples-learn.md create mode 100644 packages/web/tests/uploads.test.ts diff --git a/.changeset/afraid-apples-learn.md b/.changeset/afraid-apples-learn.md new file mode 100644 index 000000000..7e318c7a8 --- /dev/null +++ b/.changeset/afraid-apples-learn.md @@ -0,0 +1,7 @@ +--- +'@powersync/common': patch +'@powersync/web': patch +'@powersync/react-native': patch +--- + +Fixed issue where sequentially mutating the same row multiple times could cause the CRUD upload queue monitoring to think CRUD operations have not been processed correctly by the `BackendConnector` `uploadData` method. diff --git a/packages/common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts b/packages/common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts index 658fc61da..40553d459 100644 --- a/packages/common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts +++ b/packages/common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts @@ -3,6 +3,7 @@ import Logger, { ILogger } from 'js-logger'; import { SyncStatus, SyncStatusOptions } from '../../../db/crud/SyncStatus.js'; import { AbortOperation } from '../../../utils/AbortOperation.js'; import { BaseListener, BaseObserver, Disposable } from '../../../utils/BaseObserver.js'; +import { throttleLeadingTrailing } from '../../../utils/throttle.js'; import { BucketChecksum, BucketStorageAdapter, Checkpoint } from '../bucket/BucketStorageAdapter.js'; import { CrudEntry } from '../bucket/CrudEntry.js'; import { SyncDataBucket } from '../bucket/SyncDataBucket.js'; @@ -16,7 +17,6 @@ import { isStreamingSyncCheckpointDiff, isStreamingSyncData } from './streaming-sync-types.js'; -import { throttleLeadingTrailing } from '../../../utils/throttle.js'; export enum LockType { CRUD = 'crud', @@ -230,7 +230,7 @@ export abstract class AbstractStreamingSyncImplementation */ const nextCrudItem = await this.options.adapter.nextCrudItem(); if (nextCrudItem) { - if (nextCrudItem.id == checkedCrudItem?.id) { + if (nextCrudItem.clientId == checkedCrudItem?.clientId) { // This will force a higher log level than exceptions which are caught here. this.logger.warn(`Potentially previously uploaded CRUD entries are still present in the upload queue. Make sure to handle uploads and complete CRUD transactions or batches by calling and awaiting their [.complete()] method. diff --git a/packages/web/src/db/PowerSyncDatabase.ts b/packages/web/src/db/PowerSyncDatabase.ts index 243ff49f7..8ac280005 100644 --- a/packages/web/src/db/PowerSyncDatabase.ts +++ b/packages/web/src/db/PowerSyncDatabase.ts @@ -16,8 +16,8 @@ import { import { Mutex } from 'async-mutex'; import { WASQLiteOpenFactory } from './adapters/wa-sqlite/WASQLiteOpenFactory'; import { - ResolvedWebSQLOpenOptions, DEFAULT_WEB_SQL_FLAGS, + ResolvedWebSQLOpenOptions, resolveWebSQLFlags, WebSQLFlags } from './adapters/web-sql-flags'; diff --git a/packages/web/tests/stream.test.ts b/packages/web/tests/stream.test.ts index f8119d419..1d195d211 100644 --- a/packages/web/tests/stream.test.ts +++ b/packages/web/tests/stream.test.ts @@ -1,12 +1,39 @@ -import { Schema, TableV2, column } from '@powersync/common'; +import { Schema, Table, column } from '@powersync/common'; +import { WebPowerSyncOpenFactoryOptions } from '@powersync/web'; import Logger from 'js-logger'; import { v4 as uuid } from 'uuid'; import { beforeAll, describe, expect, it, vi } from 'vitest'; import { MockRemote, MockStreamOpenFactory, TestConnector } from './utils/MockStreamOpenFactory'; +type UnwrapPromise = T extends Promise ? U : T; + +export type ConnectedDatabaseUtils = UnwrapPromise>; +export type GenerateConnectedDatabaseOptions = { + powerSyncOptions: Partial; +}; + const UPLOAD_TIMEOUT_MS = 3000; -export async function generateConnectedDatabase({ useWebWorker } = { useWebWorker: true }) { +export const DEFAULT_CONNECTED_POWERSYNC_OPTIONS = { + powerSyncOptions: { + dbFilename: 'test-stream-connection.db', + flags: { + enableMultiTabs: false, + useWebWorker: true + }, + // Makes tests faster + crudUploadThrottleMs: 0, + schema: new Schema({ + users: new Table({ name: column.text }) + }) + } +}; + +export async function generateConnectedDatabase( + options: GenerateConnectedDatabaseOptions = DEFAULT_CONNECTED_POWERSYNC_OPTIONS +) { + const { powerSyncOptions } = options; + const { powerSyncOptions: defaultPowerSyncOptions } = DEFAULT_CONNECTED_POWERSYNC_OPTIONS; /** * Very basic implementation of a listener pattern. * Required since we cannot extend multiple classes. @@ -16,24 +43,14 @@ export async function generateConnectedDatabase({ useWebWorker } = { useWebWorke const uploadSpy = vi.spyOn(connector, 'uploadData'); const remote = new MockRemote(connector, () => callbacks.forEach((c) => c())); - const users = new TableV2({ - name: column.text - }); - - const schema = new Schema({ - users - }); - const factory = new MockStreamOpenFactory( { - dbFilename: 'test-stream-connection.db', + ...defaultPowerSyncOptions, + ...powerSyncOptions, flags: { - enableMultiTabs: false, - useWebWorker - }, - // Makes tests faster - crudUploadThrottleMs: 0, - schema + ...(defaultPowerSyncOptions.flags ?? {}), + ...(powerSyncOptions.flags ?? {}) + } }, remote ); @@ -83,7 +100,14 @@ describe('Streaming', () => { test: (createConnectedDatabase: () => ReturnType) => Promise ) => { const funcWithWebWorker = generateConnectedDatabase; - const funcWithoutWebWorker = () => generateConnectedDatabase({ useWebWorker: false }); + const funcWithoutWebWorker = () => + generateConnectedDatabase({ + powerSyncOptions: { + flags: { + useWebWorker: false + } + } + }); it(`${name} - with web worker`, () => test(funcWithWebWorker)); it(`${name} - without web worker`, () => test(funcWithoutWebWorker)); diff --git a/packages/web/tests/uploads.test.ts b/packages/web/tests/uploads.test.ts new file mode 100644 index 000000000..c61207e2b --- /dev/null +++ b/packages/web/tests/uploads.test.ts @@ -0,0 +1,116 @@ +import Logger from 'js-logger'; +import p from 'p-defer'; +import { afterEach, beforeAll, beforeEach, describe, expect, it, vi } from 'vitest'; +import { ConnectedDatabaseUtils, generateConnectedDatabase } from './stream.test'; + +describe('CRUD Uploads', () => { + let connectedUtils: ConnectedDatabaseUtils; + const logger = Logger.get('crud-logger'); + + beforeAll(() => Logger.useDefaults()); + + beforeEach(async () => { + connectedUtils = await generateConnectedDatabase({ + powerSyncOptions: { + logger, + /** + * The timeout here is set to longer than the default test timeout + * A retry wil cause tests to fail. + */ + crudUploadThrottleMs: 10_000, + flags: { + enableMultiTabs: false + } + } + }); + }); + + afterEach(async () => { + connectedUtils.remote.streamController?.close(); + await connectedUtils.powersync.disconnectAndClear(); + await connectedUtils.powersync.close(); + }); + + it('should warn for missing upload operations in uploadData', async () => { + const { powersync, uploadSpy } = connectedUtils; + const loggerSpy = vi.spyOn(logger, 'warn'); + + const deferred = p(); + + uploadSpy.mockImplementation(async (db) => { + // This upload method does not perform an upload + deferred.resolve(); + }); + + // Create something with CRUD in it. + await powersync.execute('INSERT into users (id, name) VALUES (uuid(), ?)', ['steven']); + + // The empty upload handler should have been called + // Timeouts seem to be weird in Vitest Browser mode. + // This makes the check below more stable. + await deferred.promise; + + await vi.waitFor( + () => { + expect( + loggerSpy.mock.calls.find((logArgs) => + logArgs[0].includes('Potentially previously uploaded CRUD entries are still present') + ) + ).exist; + }, + { + timeout: 500 + } + ); + }); + + it('should immediately upload sequential transactions', async () => { + const { powersync, uploadSpy } = connectedUtils; + const deferred = p(); + + uploadSpy.mockImplementation(async (db) => { + // This upload method does not perform an upload + const nextTransaction = await db.getNextCrudTransaction(); + console.log('uploading trans', nextTransaction); + if (!nextTransaction) { + return; + } + + // Mockingly delete the crud op in order to progress through the CRUD queue + for (const op of nextTransaction.crud) { + await db.execute(`DELETE FROM ps_crud WHERE id = ?`, [op.clientId]); + } + + deferred.resolve(); + }); + + // Create the first item + await powersync.execute('INSERT into users (id, name) VALUES (uuid(), ?)', ['steven']); + + // Modify the first item in a new transaction + await powersync.execute(` + UPDATE + users + SET + name = 'Mugi' + WHERE + name = 'steven'`); + + // Create a second item + await powersync.execute('INSERT into users (id, name) VALUES (uuid(), ?)', ['steven2']); + + // The empty upload handler should have been called + // Timeouts seem to be weird in Vitest Browser mode. + // This makes the check below more stable. + await deferred.promise; + + await vi.waitFor( + () => { + expect(uploadSpy.mock.calls.length).eq(3); + }, + { + timeout: 5_000 + } + ); + }); +}); diff --git a/packages/web/tests/utils/MockStreamOpenFactory.ts b/packages/web/tests/utils/MockStreamOpenFactory.ts index 71a6d0266..991bee99b 100644 --- a/packages/web/tests/utils/MockStreamOpenFactory.ts +++ b/packages/web/tests/utils/MockStreamOpenFactory.ts @@ -133,6 +133,7 @@ export class MockedStreamPowerSync extends PowerSyncDatabase { connector: PowerSyncBackendConnector ): AbstractStreamingSyncImplementation { return new WebStreamingSyncImplementation({ + logger: this.options.logger, adapter: this.bucketStorageAdapter, remote: this.remote, uploadCrud: async () => { @@ -140,7 +141,7 @@ export class MockedStreamPowerSync extends PowerSyncDatabase { await connector.uploadData(this); }, identifier: this.database.name, - retryDelayMs: 0 + retryDelayMs: this.options.crudUploadThrottleMs ?? 0 // The zero here makes tests faster }); } } From c7b303186a3794fc495b7e3cc4768e4a638276dc Mon Sep 17 00:00:00 2001 From: Steven Ontong Date: Fri, 20 Sep 2024 10:06:21 +0200 Subject: [PATCH 2/3] cleanup --- packages/web/tests/uploads.test.ts | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/packages/web/tests/uploads.test.ts b/packages/web/tests/uploads.test.ts index c61207e2b..434f63fd0 100644 --- a/packages/web/tests/uploads.test.ts +++ b/packages/web/tests/uploads.test.ts @@ -71,7 +71,6 @@ describe('CRUD Uploads', () => { uploadSpy.mockImplementation(async (db) => { // This upload method does not perform an upload const nextTransaction = await db.getNextCrudTransaction(); - console.log('uploading trans', nextTransaction); if (!nextTransaction) { return; } @@ -99,7 +98,7 @@ describe('CRUD Uploads', () => { // Create a second item await powersync.execute('INSERT into users (id, name) VALUES (uuid(), ?)', ['steven2']); - // The empty upload handler should have been called + // The empty upload handler should have been called. // Timeouts seem to be weird in Vitest Browser mode. // This makes the check below more stable. await deferred.promise; From e70477b8b76b0768b1010e67402fce0434225f50 Mon Sep 17 00:00:00 2001 From: Steven Ontong Date: Fri, 20 Sep 2024 10:27:40 +0200 Subject: [PATCH 3/3] Fix failing test due to lock timeout --- packages/web/tests/uploads.test.ts | 21 +++++++++------------ 1 file changed, 9 insertions(+), 12 deletions(-) diff --git a/packages/web/tests/uploads.test.ts b/packages/web/tests/uploads.test.ts index 434f63fd0..982fcf8e9 100644 --- a/packages/web/tests/uploads.test.ts +++ b/packages/web/tests/uploads.test.ts @@ -3,6 +3,9 @@ import p from 'p-defer'; import { afterEach, beforeAll, beforeEach, describe, expect, it, vi } from 'vitest'; import { ConnectedDatabaseUtils, generateConnectedDatabase } from './stream.test'; +// Don't want to actually export the warning string from the package +const PARTIAL_WARNING = 'Potentially previously uploaded CRUD entries are still present'; + describe('CRUD Uploads', () => { let connectedUtils: ConnectedDatabaseUtils; const logger = Logger.get('crud-logger'); @@ -13,11 +16,7 @@ describe('CRUD Uploads', () => { connectedUtils = await generateConnectedDatabase({ powerSyncOptions: { logger, - /** - * The timeout here is set to longer than the default test timeout - * A retry wil cause tests to fail. - */ - crudUploadThrottleMs: 10_000, + crudUploadThrottleMs: 1_000, flags: { enableMultiTabs: false } @@ -52,11 +51,7 @@ describe('CRUD Uploads', () => { await vi.waitFor( () => { - expect( - loggerSpy.mock.calls.find((logArgs) => - logArgs[0].includes('Potentially previously uploaded CRUD entries are still present') - ) - ).exist; + expect(loggerSpy.mock.calls.find((logArgs) => logArgs[0].includes(PARTIAL_WARNING))).exist; }, { timeout: 500 @@ -66,15 +61,15 @@ describe('CRUD Uploads', () => { it('should immediately upload sequential transactions', async () => { const { powersync, uploadSpy } = connectedUtils; + const loggerSpy = vi.spyOn(logger, 'warn'); + const deferred = p(); uploadSpy.mockImplementation(async (db) => { - // This upload method does not perform an upload const nextTransaction = await db.getNextCrudTransaction(); if (!nextTransaction) { return; } - // Mockingly delete the crud op in order to progress through the CRUD queue for (const op of nextTransaction.crud) { await db.execute(`DELETE FROM ps_crud WHERE id = ?`, [op.clientId]); @@ -111,5 +106,7 @@ describe('CRUD Uploads', () => { timeout: 5_000 } ); + + expect(loggerSpy.mock.calls.find((logArgs) => logArgs[0].includes(PARTIAL_WARNING))).toBeUndefined; }); });