diff --git a/integration/test/ParseLiveQueryTest.js b/integration/test/ParseLiveQueryTest.js index 5c2a89e53..cf3bd0a34 100644 --- a/integration/test/ParseLiveQueryTest.js +++ b/integration/test/ParseLiveQueryTest.js @@ -33,6 +33,27 @@ describe('Parse LiveQuery', () => { await object.save(); }); + it('can subscribe to query with client', async (done) => { + const object = new TestObject(); + await object.save(); + + const query = new Parse.Query(TestObject); + query.equalTo('objectId', object.id); + const client = await Parse.CoreManager.getLiveQueryController().getDefaultLiveQueryClient(); + if (client.shouldOpen()) { + client.open(); + } + const subscription = client.subscribe(query); + + subscription.on('update', object => { + assert.equal(object.get('foo'), 'bar'); + done(); + }); + await subscription.subscribePromise; + object.set({ foo: 'bar' }); + await object.save(); + }); + it('can subscribe to multiple queries', async () => { const objectA = new TestObject(); const objectB = new TestObject(); diff --git a/src/LiveQueryClient.js b/src/LiveQueryClient.js index 39f7cc87f..d96a9c08a 100644 --- a/src/LiveQueryClient.js +++ b/src/LiveQueryClient.js @@ -178,9 +178,9 @@ class LiveQueryClient extends EventEmitter { * * @param {Object} query - the ParseQuery you want to subscribe to * @param {string} sessionToken (optional) - * @return {Object} subscription + * @return {LiveQuerySubscription} subscription */ - subscribe(query: Object, sessionToken: ?string): Object { + subscribe(query: Object, sessionToken: ?string): LiveQuerySubscription { if (!query) { return; } @@ -309,6 +309,7 @@ class LiveQueryClient extends EventEmitter { this.socket.close(); // Notify each subscription about the close for (const subscription of this.subscriptions.values()) { + subscription.subscribed = false; subscription.emit(SUBSCRIPTION_EMMITER_TYPES.CLOSE); } this._handleReset(); @@ -358,12 +359,15 @@ class LiveQueryClient extends EventEmitter { break; case OP_EVENTS.SUBSCRIBED: if (subscription) { + subscription.subscribed = true; + subscription.subscribePromise.resolve(); subscription.emit(SUBSCRIPTION_EMMITER_TYPES.OPEN); } break; case OP_EVENTS.ERROR: if (data.requestId) { if (subscription) { + subscription.subscribePromise.resolve(); subscription.emit(SUBSCRIPTION_EMMITER_TYPES.ERROR, data.error); } } else { @@ -435,7 +439,7 @@ class LiveQueryClient extends EventEmitter { // handle case when both close/error occur at frequent rates we ensure we do not reconnect unnecessarily. // we're unable to distinguish different between close/error when we're unable to reconnect therefore - // we try to reonnect in both cases + // we try to reconnect in both cases // server side ws and browser WebSocket behave differently in when close/error get triggered if (this.reconnectHandle) { diff --git a/src/LiveQuerySubscription.js b/src/LiveQuerySubscription.js index 8ea25d751..a6ed986f9 100644 --- a/src/LiveQuerySubscription.js +++ b/src/LiveQuerySubscription.js @@ -10,6 +10,7 @@ import EventEmitter from './EventEmitter'; import CoreManager from './CoreManager'; +import { resolvingPromise } from './promiseUtils'; /** * Creates a new LiveQuery Subscription. @@ -101,6 +102,8 @@ class Subscription extends EventEmitter { this.id = id; this.query = query; this.sessionToken = sessionToken; + this.subscribePromise = resolvingPromise(); + this.subscribed = false; // adding listener so process does not crash // best practice is for developer to register their own listener diff --git a/src/ParseQuery.js b/src/ParseQuery.js index 93eceefd0..de8fbc5b1 100644 --- a/src/ParseQuery.js +++ b/src/ParseQuery.js @@ -1618,7 +1618,9 @@ class ParseQuery { liveQueryClient.open(); } const subscription = liveQueryClient.subscribe(this, sessionToken); - return subscription; + return subscription.subscribePromise.then(() => { + return subscription; + }); } /** diff --git a/src/__tests__/LiveQueryClient-test.js b/src/__tests__/LiveQueryClient-test.js index d8e04e455..d32131991 100644 --- a/src/__tests__/LiveQueryClient-test.js +++ b/src/__tests__/LiveQueryClient-test.js @@ -51,6 +51,18 @@ const events = require('events'); CoreManager.setLocalDatastore(mockLocalDatastore); +function resolvingPromise() { + let res; + let rej; + const promise = new Promise((resolve, reject) => { + res = resolve; + rej = reject; + }); + promise.resolve = res; + promise.reject = rej; + return promise; +} + describe('LiveQueryClient', () => { beforeEach(() => { mockLocalDatastore.isEnabled = false; @@ -152,6 +164,8 @@ describe('LiveQueryClient', () => { }); // Add mock subscription const subscription = new events.EventEmitter(); + subscription.subscribePromise = resolvingPromise(); + liveQueryClient.subscriptions.set(1, subscription); const data = { op: 'subscribed', @@ -200,6 +214,39 @@ describe('LiveQueryClient', () => { expect(isChecked).toBe(true); }); + it('can handle WebSocket error while subscribing', () => { + const liveQueryClient = new LiveQueryClient({ + applicationId: 'applicationId', + serverURL: 'ws://test', + javascriptKey: 'javascriptKey', + masterKey: 'masterKey', + sessionToken: 'sessionToken' + }); + const subscription = new events.EventEmitter(); + subscription.subscribePromise = resolvingPromise(); + liveQueryClient.subscriptions.set(1, subscription); + + const data = { + op: 'error', + clientId: 1, + requestId: 1, + error: 'error thrown' + }; + const event = { + data: JSON.stringify(data) + } + // Register checked in advance + let isChecked = false; + subscription.on('error', function(error) { + isChecked = true; + expect(error).toEqual('error thrown'); + }); + + liveQueryClient._handleWebSocketMessage(event); + + expect(isChecked).toBe(true); + }); + it('can handle WebSocket event response message', () => { const liveQueryClient = new LiveQueryClient({ applicationId: 'applicationId', @@ -457,9 +504,13 @@ describe('LiveQueryClient', () => { const query = new ParseQuery('Test'); query.equalTo('key', 'value'); - const subscription = liveQueryClient.subscribe(query); + const subscribePromise = liveQueryClient.subscribe(query); + const clientSub = liveQueryClient.subscriptions.get(1); + clientSub.subscribePromise.resolve(); + + const subscription = await subscribePromise; liveQueryClient.connectPromise.resolve(); - expect(subscription).toBe(liveQueryClient.subscriptions.get(1)); + expect(subscription).toBe(clientSub); expect(liveQueryClient.requestId).toBe(2); await liveQueryClient.connectPromise; const messageStr = liveQueryClient.socket.send.mock.calls[0][0]; diff --git a/src/__tests__/ParseLiveQuery-test.js b/src/__tests__/ParseLiveQuery-test.js index 74ae1f37b..be9c89446 100644 --- a/src/__tests__/ParseLiveQuery-test.js +++ b/src/__tests__/ParseLiveQuery-test.js @@ -157,7 +157,6 @@ describe('ParseLiveQuery', () => { }); it('subscribes to all subscription events', (done) => { - CoreManager.set('UserController', { currentUserAsync() { return Promise.resolve({ @@ -235,7 +234,6 @@ describe('ParseLiveQuery', () => { } }, 1); }); - }); it('should not throw on usubscribe', (done) => { diff --git a/src/__tests__/ParseQuery-test.js b/src/__tests__/ParseQuery-test.js index 0eed5193d..97462cef8 100644 --- a/src/__tests__/ParseQuery-test.js +++ b/src/__tests__/ParseQuery-test.js @@ -2849,7 +2849,9 @@ describe('ParseQuery LocalDatastore', () => { return false; }, subscribe: function(query, sessionToken) { - return new LiveQuerySubscription('0', query, sessionToken); + const subscription = new LiveQuerySubscription('0', query, sessionToken); + subscription.subscribePromise.resolve(); + return subscription; }, }; CoreManager.set('UserController', { @@ -2880,7 +2882,9 @@ describe('ParseQuery LocalDatastore', () => { }, open: function() {}, subscribe: function(query, sessionToken) { - return new LiveQuerySubscription('0', query, sessionToken); + const subscription = new LiveQuerySubscription('0', query, sessionToken); + subscription.subscribePromise.resolve(); + return subscription; }, }; CoreManager.set('UserController', { @@ -2911,7 +2915,9 @@ describe('ParseQuery LocalDatastore', () => { }, open: function() {}, subscribe: function(query, sessionToken) { - return new LiveQuerySubscription('0', query, sessionToken); + const subscription = new LiveQuerySubscription('0', query, sessionToken); + subscription.subscribePromise.resolve(); + return subscription; }, }; CoreManager.set('UserController', { @@ -2938,7 +2944,9 @@ describe('ParseQuery LocalDatastore', () => { }, open: function() {}, subscribe: function(query, sessionToken) { - return new LiveQuerySubscription('0', query, sessionToken); + const subscription = new LiveQuerySubscription('0', query, sessionToken); + subscription.subscribePromise.resolve(); + return subscription; }, }; CoreManager.set('UserController', {