From eb99a036bcfa1f1013a67a13ef070d68d0376340 Mon Sep 17 00:00:00 2001 From: Charlie Hsieh Date: Fri, 17 Jan 2025 16:53:18 +0800 Subject: [PATCH 1/2] feat: queue up ydoc sync task --- src/y-socket-io/y-socket-io.js | 55 +++++++++++++++++++++++++++------- 1 file changed, 45 insertions(+), 10 deletions(-) diff --git a/src/y-socket-io/y-socket-io.js b/src/y-socket-io/y-socket-io.js index 2993fd0..877cbfc 100644 --- a/src/y-socket-io/y-socket-io.js +++ b/src/y-socket-io/y-socket-io.js @@ -90,6 +90,12 @@ export class YSocketIO { */ namespaceMap = new Map() + /** + * @type {Promise[]} + * @private + */ + syncQueue = [] + /** * YSocketIO constructor. * @constructor @@ -157,16 +163,26 @@ export class YSocketIO { this.initAwarenessListeners(socket) this.initSocketListeners(socket) - const doc = await this.client.getDoc(namespace, 'index') - - if ( - api.isSmallerRedisId(doc.redisLastId, socket.user.initialRedisSubId) - ) { - // our subscription is newer than the content that we received from the api - // need to renew subscription id and make sure that we catch the latest content. - this.subscriber.ensureSubId(stream, doc.redisLastId) - } - this.startSynchronization(socket, doc) + /** + * @type {Promise} + */ + const task = new Promise((resolve) => { + assert(this.client) + this.client.getDoc(namespace, 'index').then((doc) => { + assert(socket.user) + assert(this.subscriber) + if ( + api.isSmallerRedisId(doc.redisLastId, socket.user.initialRedisSubId) + ) { + // our subscription is newer than the content that we received from the api + // need to renew subscription id and make sure that we catch the latest content. + this.subscriber.ensureSubId(stream, doc.redisLastId) + } + this.startSynchronization(socket, doc) + resolve() + }) + }) + this.queueUpSyncTask(task) }) return { client, subscriber } @@ -336,6 +352,25 @@ export class YSocketIO { } } + /** + * @private + * @param {Promise} task + */ + queueUpSyncTask (task) { + const len = this.syncQueue.push(task) + if (len === 1) this.consumeSyncQueue() + } + + /** + * @private + */ + async consumeSyncQueue () { + if (this.syncQueue.length === 0) return + const task = this.syncQueue.shift() + await task + this.consumeSyncQueue() + } + /** * @param {Namespace} namespace */ From 847c7e68ad8db77192acc3ca47bf364309a09e02 Mon Sep 17 00:00:00 2001 From: Charlie Hsieh Date: Fri, 17 Jan 2025 17:03:26 +0800 Subject: [PATCH 2/2] feat: delay client sync step until server is ready --- src/y-socket-io/client.js | 2 +- src/y-socket-io/y-socket-io.js | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/src/y-socket-io/client.js b/src/y-socket-io/client.js index ebeacde..d93a18c 100644 --- a/src/y-socket-io/client.js +++ b/src/y-socket-io/client.js @@ -157,7 +157,7 @@ export class SocketIOProvider extends Observable { this.doc.on('update', this.onUpdateDoc) - this.socket.on('connect', () => this.onSocketConnection(resyncInterval)) + this.socket.once('ready-for-sync', () => this.onSocketConnection(resyncInterval)) this.socket.on('disconnect', (event) => this.onSocketDisconnection(event)) diff --git a/src/y-socket-io/y-socket-io.js b/src/y-socket-io/y-socket-io.js index 877cbfc..1779033 100644 --- a/src/y-socket-io/y-socket-io.js +++ b/src/y-socket-io/y-socket-io.js @@ -171,6 +171,7 @@ export class YSocketIO { this.client.getDoc(namespace, 'index').then((doc) => { assert(socket.user) assert(this.subscriber) + socket.emit('ready-for-sync') if ( api.isSmallerRedisId(doc.redisLastId, socket.user.initialRedisSubId) ) {