From a348751d10879893551927c187e7c713416bc273 Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Mon, 9 Dec 2024 15:33:34 +0200 Subject: [PATCH 1/4] Add failing test. --- .../test/src/large_batch.test.ts | 65 +++++++++++++++++++ 1 file changed, 65 insertions(+) diff --git a/modules/module-postgres/test/src/large_batch.test.ts b/modules/module-postgres/test/src/large_batch.test.ts index 3a3ea4db..bc08083a 100644 --- a/modules/module-postgres/test/src/large_batch.test.ts +++ b/modules/module-postgres/test/src/large_batch.test.ts @@ -176,6 +176,71 @@ function defineBatchTests(factory: StorageFactory) { console.log(`Truncated ${truncateCount} ops in ${truncateDuration}ms ${truncatePerSecond} ops/s. ${used}MB heap`); }); + test('large number of bucket_data docs', async () => { + // This tests that we don't run into this error: + // MongoBulkWriteError: BSONObj size: 16814023 (0x1008FC7) is invalid. Size must be between 0 and 16793600(16MB) First element: insert: "bucket_data" + // The test is quite sensitive to internals, since we need to + // generate an internal batch that is just below 16MB. + // + // For the test to work, we need a: + // 1. Large number of documents in the batch. + // 2. More bucket_data documents than current_data documents, + // otherwise other batch limiting thresholds are hit. + // 3. A large document to make sure we get to just below the 16MB + // limit. + // 4. Another document to make sure the internal batching overflows + // to a second batch. + + await using context = await WalStreamTestContext.open(factory); + await context.updateSyncRules(`bucket_definitions: + global: + data: + # Sync 4x so we get more bucket_data documents + - SELECT * FROM test_data + - SELECT * FROM test_data + - SELECT * FROM test_data + - SELECT * FROM test_data + `); + const { pool } = context; + + await pool.query(`CREATE TABLE test_data(id serial primary key, description text)`); + + const numDocs = 499; + let description = ''; + while (description.length < 2650) { + description += '.'; + } + + await pool.query({ + statement: `INSERT INTO test_data(description) SELECT $2 FROM generate_series(1, $1) i`, + params: [ + { type: 'int4', value: numDocs }, + { type: 'varchar', value: description } + ] + }); + + let largeDescription = ''; + + while (largeDescription.length < 2_768_000) { + largeDescription += '.'; + } + await pool.query({ + statement: 'INSERT INTO test_data(description) VALUES($1)', + params: [{ type: 'varchar', value: largeDescription }] + }); + await pool.query({ + statement: 'INSERT INTO test_data(description) VALUES($1)', + params: [{ type: 'varchar', value: 'testingthis' }] + }); + await context.replicateSnapshot(); + + context.startStreaming(); + + const checkpoint = await context.getCheckpoint({ timeout: 50_000 }); + const checksum = await context.storage!.getChecksums(checkpoint, ['global[]']); + expect(checksum.get('global[]')!.count).toEqual((numDocs + 2) * 4); + }); + test('resuming initial replication (1)', async () => { // Stop early - likely to not include deleted row in first replication attempt. await testResumingReplication(2000); From 33c7163a33075af25bf94a153545cbf1d97f82ee Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Mon, 9 Dec 2024 15:42:12 +0200 Subject: [PATCH 2/4] Update mongo driver. 
--- modules/module-mongodb/package.json | 2 +- packages/service-core/package.json | 2 +- pnpm-lock.yaml | 38 +++++++++++++++++------------ service/package.json | 2 +- 4 files changed, 25 insertions(+), 19 deletions(-) diff --git a/modules/module-mongodb/package.json b/modules/module-mongodb/package.json index 34888409..09d0999a 100644 --- a/modules/module-mongodb/package.json +++ b/modules/module-mongodb/package.json @@ -33,7 +33,7 @@ "@powersync/service-jsonbig": "workspace:*", "@powersync/service-sync-rules": "workspace:*", "@powersync/service-types": "workspace:*", - "mongodb": "^6.7.0", + "mongodb": "^6.11.0", "ts-codec": "^1.2.2", "uuid": "^9.0.1", "uri-js": "^4.4.1" diff --git a/packages/service-core/package.json b/packages/service-core/package.json index 90ec0adc..174a464a 100644 --- a/packages/service-core/package.json +++ b/packages/service-core/package.json @@ -37,7 +37,7 @@ "jose": "^4.15.1", "lodash": "^4.17.21", "lru-cache": "^10.2.2", - "mongodb": "^6.7.0", + "mongodb": "^6.11.0", "node-fetch": "^3.3.2", "ts-codec": "^1.2.2", "uri-js": "^4.4.1", diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index a7ccf60a..1987b9c7 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -118,8 +118,8 @@ importers: specifier: workspace:* version: link:../../packages/types mongodb: - specifier: ^6.7.0 - version: 6.8.0(socks@2.8.3) + specifier: ^6.11.0 + version: 6.11.0(socks@2.8.3) ts-codec: specifier: ^1.2.2 version: 1.2.2 @@ -344,8 +344,8 @@ importers: specifier: ^10.2.2 version: 10.4.3 mongodb: - specifier: ^6.7.0 - version: 6.8.0(socks@2.8.3) + specifier: ^6.11.0 + version: 6.11.0(socks@2.8.3) node-fetch: specifier: ^3.3.2 version: 3.3.2 @@ -496,8 +496,8 @@ importers: specifier: ^10.0.1 version: 10.4.3 mongodb: - specifier: ^6.7.0 - version: 6.8.0(socks@2.8.3) + specifier: ^6.11.0 + version: 6.11.0(socks@2.8.3) node-fetch: specifier: ^3.3.2 version: 3.3.2 @@ -830,8 +830,8 @@ packages: '@manypkg/get-packages@1.1.3': resolution: {integrity: sha512-fo+QhuU3qE/2TQMQmbVMqaQ6EWbMhi4ABWP+O4AM1NqPBuy0OrApV5LO6BrrgnhtAHS2NH6RrVk9OL181tTi8A==} - '@mongodb-js/saslprep@1.1.7': - resolution: {integrity: sha512-dCHW/oEX0KJ4NjDULBo3JiOaK5+6axtpBbS+ao2ZInoAL9/YRQLhXzSNAFz7hP4nzLkIqsfYAK/PDE3+XHny0Q==} + '@mongodb-js/saslprep@1.1.9': + resolution: {integrity: sha512-tVkljjeEaAhCqTzajSdgbQ6gE6f3oneVwa3iXR6csiEwXXOFsiC6Uh9iAjAhXPtqa/XMDHWjjeNH/77m/Yq2dw==} '@nodelib/fs.scandir@2.1.5': resolution: {integrity: sha512-vq24Bq3ym5HEQm2NKCr3yXDwjc7vTsEThRDnkp2DK9p1uqLR+DHurm/NOTo0KG7HYHU7eppKZj3MyqYuMBf62g==} @@ -1587,6 +1587,10 @@ packages: resolution: {integrity: sha512-yQbXgO/OSZVD2IsiLlro+7Hf6Q18EJrKSEsdoMzKePKXct3gvD8oLcOQdIzGupr5Fj+EDe8gO/lxc1BzfMpxvA==} engines: {node: '>=8'} + bson@6.10.1: + resolution: {integrity: sha512-P92xmHDQjSKPLHqFxefqMxASNq/aWJMEZugpCjf+AF/pgcUpMMQCg7t7+ewko0/u8AapvF3luf/FoehddEK+sA==} + engines: {node: '>=16.20.1'} + bson@6.8.0: resolution: {integrity: sha512-iOJg8pr7wq2tg/zSlCCHMi3hMm5JTOxLTagf3zxhcenHsFp+c6uOs6K7W5UE7A4QIJGtqh/ZovFNMP4mOPJynQ==} engines: {node: '>=16.20.1'} @@ -2571,8 +2575,8 @@ packages: mongodb-connection-string-url@3.0.1: resolution: {integrity: sha512-XqMGwRX0Lgn05TDB4PyG2h2kKO/FfWJyCzYQbIhXUxz7ETt0I/FqHjUeqj37irJ+Dl1ZtU82uYyj14u2XsZKfg==} - mongodb@6.8.0: - resolution: {integrity: sha512-HGQ9NWDle5WvwMnrvUxsFYPd3JEbqD3RgABHBQRuoCEND0qzhsd0iH5ypHsf1eJ+sXmvmyKpP+FLOKY8Il7jMw==} + mongodb@6.11.0: + resolution: {integrity: sha512-yVbPw0qT268YKhG241vAMLaDQAPbRyTgo++odSgGc9kXnzOujQI60Iyj23B9sQQFPSvmNPvMZ3dsFz0aN55KgA==} engines: {node: '>=16.20.1'} peerDependencies: 
'@aws-sdk/credential-providers': ^3.188.0 @@ -4100,7 +4104,7 @@ snapshots: globby: 11.1.0 read-yaml-file: 1.1.0 - '@mongodb-js/saslprep@1.1.7': + '@mongodb-js/saslprep@1.1.9': dependencies: sparse-bitfield: 3.0.3 @@ -4636,7 +4640,7 @@ snapshots: '@opentelemetry/semantic-conventions': 1.25.1 '@prisma/instrumentation': 5.16.1 '@sentry/core': 8.17.0 - '@sentry/opentelemetry': 8.17.0(@opentelemetry/api@1.9.0)(@opentelemetry/core@1.25.1(@opentelemetry/api@1.6.0))(@opentelemetry/instrumentation@0.52.1(@opentelemetry/api@1.6.0))(@opentelemetry/sdk-trace-base@1.25.1(@opentelemetry/api@1.6.0))(@opentelemetry/semantic-conventions@1.25.1) + '@sentry/opentelemetry': 8.17.0(@opentelemetry/api@1.9.0)(@opentelemetry/core@1.25.1(@opentelemetry/api@1.6.0))(@opentelemetry/instrumentation@0.52.1(@opentelemetry/api@1.9.0))(@opentelemetry/sdk-trace-base@1.25.1(@opentelemetry/api@1.9.0))(@opentelemetry/semantic-conventions@1.25.1) '@sentry/types': 8.17.0 '@sentry/utils': 8.17.0 optionalDependencies: @@ -4644,7 +4648,7 @@ snapshots: transitivePeerDependencies: - supports-color - '@sentry/opentelemetry@8.17.0(@opentelemetry/api@1.9.0)(@opentelemetry/core@1.25.1(@opentelemetry/api@1.6.0))(@opentelemetry/instrumentation@0.52.1(@opentelemetry/api@1.6.0))(@opentelemetry/sdk-trace-base@1.25.1(@opentelemetry/api@1.6.0))(@opentelemetry/semantic-conventions@1.25.1)': + '@sentry/opentelemetry@8.17.0(@opentelemetry/api@1.9.0)(@opentelemetry/core@1.25.1(@opentelemetry/api@1.6.0))(@opentelemetry/instrumentation@0.52.1(@opentelemetry/api@1.9.0))(@opentelemetry/sdk-trace-base@1.25.1(@opentelemetry/api@1.9.0))(@opentelemetry/semantic-conventions@1.25.1)': dependencies: '@opentelemetry/api': 1.9.0 '@opentelemetry/core': 1.25.1(@opentelemetry/api@1.9.0) @@ -4985,6 +4989,8 @@ snapshots: dependencies: fill-range: 7.1.1 + bson@6.10.1: {} + bson@6.8.0: {} buffer-from@1.1.2: {} @@ -6041,10 +6047,10 @@ snapshots: '@types/whatwg-url': 11.0.5 whatwg-url: 13.0.0 - mongodb@6.8.0(socks@2.8.3): + mongodb@6.11.0(socks@2.8.3): dependencies: - '@mongodb-js/saslprep': 1.1.7 - bson: 6.8.0 + '@mongodb-js/saslprep': 1.1.9 + bson: 6.10.1 mongodb-connection-string-url: 3.0.1 optionalDependencies: socks: 2.8.3 diff --git a/service/package.json b/service/package.json index d6337d5b..36d14be7 100644 --- a/service/package.json +++ b/service/package.json @@ -34,7 +34,7 @@ "ix": "^5.0.0", "jose": "^4.15.1", "lru-cache": "^10.0.1", - "mongodb": "^6.7.0", + "mongodb": "^6.11.0", "node-fetch": "^3.3.2", "pgwire": "github:kagis/pgwire#f1cb95f9a0f42a612bb5a6b67bb2eb793fc5fc87", "ts-codec": "^1.2.2", From 7d955d536f867de03dd55f423c4a61a10a9005ca Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Mon, 9 Dec 2024 15:40:05 +0200 Subject: [PATCH 3/4] Workaround for bulkWrite batching issues. 
--- .../src/storage/mongo/MongoCompactor.ts | 3 +- .../storage/mongo/MongoWriteCheckpointAPI.ts | 7 ++- .../src/storage/mongo/PersistedBatch.ts | 23 +++++++--- .../service-core/src/storage/mongo/util.ts | 45 +++++++++++++++++++ 4 files changed, 70 insertions(+), 8 deletions(-) diff --git a/packages/service-core/src/storage/mongo/MongoCompactor.ts b/packages/service-core/src/storage/mongo/MongoCompactor.ts index 102e1daf..7024e5e2 100644 --- a/packages/service-core/src/storage/mongo/MongoCompactor.ts +++ b/packages/service-core/src/storage/mongo/MongoCompactor.ts @@ -5,6 +5,7 @@ import { PowerSyncMongo } from './db.js'; import { BucketDataDocument, BucketDataKey } from './models.js'; import { CompactOptions } from '../BucketStorage.js'; import { cacheKey } from './OperationBatch.js'; +import { safeBulkWrite } from './util.js'; interface CurrentBucketState { /** Bucket name */ @@ -264,7 +265,7 @@ export class MongoCompactor { private async flush() { if (this.updates.length > 0) { logger.info(`Compacting ${this.updates.length} ops`); - await this.db.bucket_data.bulkWrite(this.updates, { + await safeBulkWrite(this.db.bucket_data, this.updates, { // Order is not important. // Since checksums are not affected, these operations can happen in any order, // and it's fine if the operations are partially applied. diff --git a/packages/service-core/src/storage/mongo/MongoWriteCheckpointAPI.ts b/packages/service-core/src/storage/mongo/MongoWriteCheckpointAPI.ts index 966eb77b..6bf4418a 100644 --- a/packages/service-core/src/storage/mongo/MongoWriteCheckpointAPI.ts +++ b/packages/service-core/src/storage/mongo/MongoWriteCheckpointAPI.ts @@ -9,6 +9,7 @@ import { WriteCheckpointMode } from '../WriteCheckpointAPI.js'; import { PowerSyncMongo } from './db.js'; +import { safeBulkWrite } from './util.js'; export type MongoCheckpointAPIOptions = { db: PowerSyncMongo; @@ -134,7 +135,8 @@ export async function batchCreateCustomWriteCheckpoints( return; } - await db.custom_write_checkpoints.bulkWrite( + await safeBulkWrite( + db.custom_write_checkpoints, checkpoints.map((checkpointOptions) => ({ updateOne: { filter: { user_id: checkpointOptions.user_id, sync_rules_id: checkpointOptions.sync_rules_id }, @@ -146,6 +148,7 @@ export async function batchCreateCustomWriteCheckpoints( }, upsert: true } - })) + })), + {} ); } diff --git a/packages/service-core/src/storage/mongo/PersistedBatch.ts b/packages/service-core/src/storage/mongo/PersistedBatch.ts index 106dcadf..77bf76fa 100644 --- a/packages/service-core/src/storage/mongo/PersistedBatch.ts +++ b/packages/service-core/src/storage/mongo/PersistedBatch.ts @@ -16,7 +16,7 @@ import { SourceKey, ReplicaId } from './models.js'; -import { replicaIdToSubkey, serializeLookup } from './util.js'; +import { replicaIdToSubkey, safeBulkWrite, serializeLookup } from './util.js'; import { logger } from '@powersync/lib-services-framework'; /** @@ -33,6 +33,13 @@ import { logger } from '@powersync/lib-services-framework'; */ const MAX_TRANSACTION_BATCH_SIZE = 30_000_000; +/** + * Limit number of documents to write in a single transaction. + * + * This has an effect on error message size in some cases. + */ +const MAX_TRANSACTION_DOC_COUNT = 2_000; + /** * Keeps track of bulkwrite operations within a transaction. 
* @@ -231,26 +238,32 @@ export class PersistedBatch { } shouldFlushTransaction() { - return this.currentSize >= MAX_TRANSACTION_BATCH_SIZE; + return ( + this.currentSize >= MAX_TRANSACTION_BATCH_SIZE || + this.bucketData.length >= MAX_TRANSACTION_DOC_COUNT || + this.currentData.length >= MAX_TRANSACTION_DOC_COUNT || + this.bucketParameters.length >= MAX_TRANSACTION_DOC_COUNT + ); } async flush(db: PowerSyncMongo, session: mongo.ClientSession) { if (this.bucketData.length > 0) { - await db.bucket_data.bulkWrite(this.bucketData, { + // calculate total size + await safeBulkWrite(db.bucket_data, this.bucketData, { session, // inserts only - order doesn't matter ordered: false }); } if (this.bucketParameters.length > 0) { - await db.bucket_parameters.bulkWrite(this.bucketParameters, { + await safeBulkWrite(db.bucket_parameters, this.bucketParameters, { session, // inserts only - order doesn't matter ordered: false }); } if (this.currentData.length > 0) { - await db.current_data.bulkWrite(this.currentData, { + await safeBulkWrite(db.current_data, this.currentData, { session, // may update and delete data within the same batch - order matters ordered: true diff --git a/packages/service-core/src/storage/mongo/util.ts b/packages/service-core/src/storage/mongo/util.ts index d6b36a7d..cbfe5e4d 100644 --- a/packages/service-core/src/storage/mongo/util.ts +++ b/packages/service-core/src/storage/mongo/util.ts @@ -156,3 +156,48 @@ export function isUUID(value: any): value is bson.UUID { const uuid = value as bson.UUID; return uuid._bsontype == 'Binary' && uuid.sub_type == bson.Binary.SUBTYPE_UUID; } + +/** + * MongoDB bulkWrite internally splits the operations into batches + * so that no batch exceeds 16MB. However, there are cases where + * the batch size is very close to 16MB, where additional metadata + * on the server pushes it over the limit, resulting in this error + * from the server: + * + * > MongoBulkWriteError: BSONObj size: 16814023 (0x1008FC7) is invalid. Size must be between 0 and 16793600(16MB) First element: insert: "bucket_data" + * + * We work around the issue by doing our own batching, limiting the + * batch size to 15MB. This does add additional overhead with + * BSON.calculateObjectSize. + */ +export async function safeBulkWrite( + collection: mongo.Collection, + operations: mongo.AnyBulkWriteOperation[], + options: mongo.BulkWriteOptions +) { + // Must be below 16MB. + // We could probably go a little closer, but 15MB is a safe threshold. + const BULK_WRITE_LIMIT = 15 * 1024 * 1024; + + let batch: mongo.AnyBulkWriteOperation[] = []; + let currentSize = 0; + // Estimated overhead per operation, should be smaller in reality. + const keySize = 8; + for (let op of operations) { + const bsonSize = + mongo.BSON.calculateObjectSize(op, { + checkKeys: false, + ignoreUndefined: true + } as any) + keySize; + if (batch.length > 0 && currentSize + bsonSize > BULK_WRITE_LIMIT) { + await collection.bulkWrite(batch, options); + currentSize = 0; + batch = []; + } + batch.push(op); + currentSize += bsonSize; + } + if (batch.length > 0) { + await collection.bulkWrite(batch, options); + } +} From 5f51e5ffe801a6b9608301e786a4ff2d2dc86ce0 Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Mon, 9 Dec 2024 16:07:07 +0200 Subject: [PATCH 4/4] Add changeset. 
--- .changeset/perfect-fireants-warn.md | 8 ++++++++ 1 file changed, 8 insertions(+) create mode 100644 .changeset/perfect-fireants-warn.md diff --git a/.changeset/perfect-fireants-warn.md b/.changeset/perfect-fireants-warn.md new file mode 100644 index 00000000..6e5bb246 --- /dev/null +++ b/.changeset/perfect-fireants-warn.md @@ -0,0 +1,8 @@ +--- +'@powersync/service-module-postgres': patch +'@powersync/service-module-mongodb': patch +'@powersync/service-core': patch +'@powersync/service-image': patch +--- + +Fix "BSONObj size is invalid" error during replication.
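
Note (illustrative, not part of the patch): the sketch below shows the client-side batching idea that patch 3 implements in safeBulkWrite, as a self-contained TypeScript program. The connection string, database name, and collection name ("bucket_data_demo") are placeholder assumptions for demonstration only; the authoritative implementation is the safeBulkWrite helper added in packages/service-core/src/storage/mongo/util.ts above. The idea is to estimate each operation's BSON size up front and flush a batch before the accumulated size approaches the server's 16MB BSONObj limit, which plain bulkWrite can otherwise exceed once server-side metadata is added.

// sketch.ts — minimal demonstration of size-limited bulk writes (assumptions noted above)
import { MongoClient, BSON, type AnyBulkWriteOperation, type Collection, type Document } from 'mongodb';

// Stay safely below the server's 16MB BSONObj limit (same threshold as safeBulkWrite).
const BULK_WRITE_LIMIT = 15 * 1024 * 1024;
// Rough allowance for per-operation framing overhead; the real overhead is smaller.
const PER_OP_OVERHEAD = 8;

async function batchedBulkWrite(
  collection: Collection<Document>,
  operations: AnyBulkWriteOperation<Document>[]
): Promise<void> {
  let batch: AnyBulkWriteOperation<Document>[] = [];
  let size = 0;
  for (const op of operations) {
    const opSize = BSON.calculateObjectSize(op, { ignoreUndefined: true }) + PER_OP_OVERHEAD;
    // Flush the current batch before it would cross the limit.
    if (batch.length > 0 && size + opSize > BULK_WRITE_LIMIT) {
      await collection.bulkWrite(batch, { ordered: false });
      batch = [];
      size = 0;
    }
    batch.push(op);
    size += opSize;
  }
  if (batch.length > 0) {
    await collection.bulkWrite(batch, { ordered: false });
  }
}

async function main() {
  const client = await MongoClient.connect('mongodb://localhost:27017');
  const collection = client.db('demo').collection('bucket_data_demo');
  // Many medium-sized documents add up to just under 16MB, mirroring the scenario
  // the failing test in patch 1 constructs with bucket_data documents.
  const ops: AnyBulkWriteOperation<Document>[] = Array.from({ length: 500 }, (_, i) => ({
    insertOne: { document: { _id: i, description: '.'.repeat(30_000) } }
  }));
  await batchedBulkWrite(collection, ops);
  await client.close();
}

main().catch(console.error);

Usage is a drop-in replacement for a direct collection.bulkWrite(ops, options) call, which is exactly how the patch swaps safeBulkWrite into MongoCompactor, MongoWriteCheckpointAPI, and PersistedBatch.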