Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions integration/test/ParseLiveQueryTest.js
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,27 @@ describe('Parse LiveQuery', () => {
await object.save();
});

it('can subscribe to query with client', async (done) => {
const object = new TestObject();
await object.save();

const query = new Parse.Query(TestObject);
query.equalTo('objectId', object.id);
const client = await Parse.CoreManager.getLiveQueryController().getDefaultLiveQueryClient();
if (client.shouldOpen()) {
client.open();
}
const subscription = client.subscribe(query);

subscription.on('update', object => {
assert.equal(object.get('foo'), 'bar');
done();
});
await subscription.subscribePromise;
object.set({ foo: 'bar' });
await object.save();
});

it('can subscribe to multiple queries', async () => {
const objectA = new TestObject();
const objectB = new TestObject();
Expand Down
10 changes: 7 additions & 3 deletions src/LiveQueryClient.js
Original file line number Diff line number Diff line change
Expand Up @@ -178,9 +178,9 @@ class LiveQueryClient extends EventEmitter {
*
* @param {Object} query - the ParseQuery you want to subscribe to
* @param {string} sessionToken (optional)
* @return {Object} subscription
* @return {LiveQuerySubscription} subscription
*/
subscribe(query: Object, sessionToken: ?string): Object {
subscribe(query: Object, sessionToken: ?string): LiveQuerySubscription {
if (!query) {
return;
}
Expand Down Expand Up @@ -309,6 +309,7 @@ class LiveQueryClient extends EventEmitter {
this.socket.close();
// Notify each subscription about the close
for (const subscription of this.subscriptions.values()) {
subscription.subscribed = false;
subscription.emit(SUBSCRIPTION_EMMITER_TYPES.CLOSE);
}
this._handleReset();
Expand Down Expand Up @@ -358,12 +359,15 @@ class LiveQueryClient extends EventEmitter {
break;
case OP_EVENTS.SUBSCRIBED:
if (subscription) {
subscription.subscribed = true;
subscription.subscribePromise.resolve();
subscription.emit(SUBSCRIPTION_EMMITER_TYPES.OPEN);
}
break;
case OP_EVENTS.ERROR:
if (data.requestId) {
if (subscription) {
subscription.subscribePromise.resolve();
subscription.emit(SUBSCRIPTION_EMMITER_TYPES.ERROR, data.error);
}
} else {
Expand Down Expand Up @@ -435,7 +439,7 @@ class LiveQueryClient extends EventEmitter {

// handle case when both close/error occur at frequent rates we ensure we do not reconnect unnecessarily.
// we're unable to distinguish different between close/error when we're unable to reconnect therefore
// we try to reonnect in both cases
// we try to reconnect in both cases
// server side ws and browser WebSocket behave differently in when close/error get triggered

if (this.reconnectHandle) {
Expand Down
3 changes: 3 additions & 0 deletions src/LiveQuerySubscription.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

import EventEmitter from './EventEmitter';
import CoreManager from './CoreManager';
import { resolvingPromise } from './promiseUtils';

/**
* Creates a new LiveQuery Subscription.
Expand Down Expand Up @@ -101,6 +102,8 @@ class Subscription extends EventEmitter {
this.id = id;
this.query = query;
this.sessionToken = sessionToken;
this.subscribePromise = resolvingPromise();
this.subscribed = false;

// adding listener so process does not crash
// best practice is for developer to register their own listener
Expand Down
4 changes: 3 additions & 1 deletion src/ParseQuery.js
Original file line number Diff line number Diff line change
Expand Up @@ -1618,7 +1618,9 @@ class ParseQuery {
liveQueryClient.open();
}
const subscription = liveQueryClient.subscribe(this, sessionToken);
return subscription;
return subscription.subscribePromise.then(() => {
return subscription;
});
}

/**
Expand Down
55 changes: 53 additions & 2 deletions src/__tests__/LiveQueryClient-test.js
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,18 @@ const events = require('events');

CoreManager.setLocalDatastore(mockLocalDatastore);

function resolvingPromise() {
let res;
let rej;
const promise = new Promise((resolve, reject) => {
res = resolve;
rej = reject;
});
promise.resolve = res;
promise.reject = rej;
return promise;
}

describe('LiveQueryClient', () => {
beforeEach(() => {
mockLocalDatastore.isEnabled = false;
Expand Down Expand Up @@ -152,6 +164,8 @@ describe('LiveQueryClient', () => {
});
// Add mock subscription
const subscription = new events.EventEmitter();
subscription.subscribePromise = resolvingPromise();

liveQueryClient.subscriptions.set(1, subscription);
const data = {
op: 'subscribed',
Expand Down Expand Up @@ -200,6 +214,39 @@ describe('LiveQueryClient', () => {
expect(isChecked).toBe(true);
});

it('can handle WebSocket error while subscribing', () => {
const liveQueryClient = new LiveQueryClient({
applicationId: 'applicationId',
serverURL: 'ws://test',
javascriptKey: 'javascriptKey',
masterKey: 'masterKey',
sessionToken: 'sessionToken'
});
const subscription = new events.EventEmitter();
subscription.subscribePromise = resolvingPromise();
liveQueryClient.subscriptions.set(1, subscription);

const data = {
op: 'error',
clientId: 1,
requestId: 1,
error: 'error thrown'
};
const event = {
data: JSON.stringify(data)
}
// Register checked in advance
let isChecked = false;
subscription.on('error', function(error) {
isChecked = true;
expect(error).toEqual('error thrown');
});

liveQueryClient._handleWebSocketMessage(event);

expect(isChecked).toBe(true);
});

it('can handle WebSocket event response message', () => {
const liveQueryClient = new LiveQueryClient({
applicationId: 'applicationId',
Expand Down Expand Up @@ -457,9 +504,13 @@ describe('LiveQueryClient', () => {
const query = new ParseQuery('Test');
query.equalTo('key', 'value');

const subscription = liveQueryClient.subscribe(query);
const subscribePromise = liveQueryClient.subscribe(query);
const clientSub = liveQueryClient.subscriptions.get(1);
clientSub.subscribePromise.resolve();

const subscription = await subscribePromise;
liveQueryClient.connectPromise.resolve();
expect(subscription).toBe(liveQueryClient.subscriptions.get(1));
expect(subscription).toBe(clientSub);
expect(liveQueryClient.requestId).toBe(2);
await liveQueryClient.connectPromise;
const messageStr = liveQueryClient.socket.send.mock.calls[0][0];
Expand Down
2 changes: 0 additions & 2 deletions src/__tests__/ParseLiveQuery-test.js
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,6 @@ describe('ParseLiveQuery', () => {
});

it('subscribes to all subscription events', (done) => {

CoreManager.set('UserController', {
currentUserAsync() {
return Promise.resolve({
Expand Down Expand Up @@ -235,7 +234,6 @@ describe('ParseLiveQuery', () => {
}
}, 1);
});

});

it('should not throw on usubscribe', (done) => {
Expand Down
16 changes: 12 additions & 4 deletions src/__tests__/ParseQuery-test.js
Original file line number Diff line number Diff line change
Expand Up @@ -2849,7 +2849,9 @@ describe('ParseQuery LocalDatastore', () => {
return false;
},
subscribe: function(query, sessionToken) {
return new LiveQuerySubscription('0', query, sessionToken);
const subscription = new LiveQuerySubscription('0', query, sessionToken);
subscription.subscribePromise.resolve();
return subscription;
},
};
CoreManager.set('UserController', {
Expand Down Expand Up @@ -2880,7 +2882,9 @@ describe('ParseQuery LocalDatastore', () => {
},
open: function() {},
subscribe: function(query, sessionToken) {
return new LiveQuerySubscription('0', query, sessionToken);
const subscription = new LiveQuerySubscription('0', query, sessionToken);
subscription.subscribePromise.resolve();
return subscription;
},
};
CoreManager.set('UserController', {
Expand Down Expand Up @@ -2911,7 +2915,9 @@ describe('ParseQuery LocalDatastore', () => {
},
open: function() {},
subscribe: function(query, sessionToken) {
return new LiveQuerySubscription('0', query, sessionToken);
const subscription = new LiveQuerySubscription('0', query, sessionToken);
subscription.subscribePromise.resolve();
return subscription;
},
};
CoreManager.set('UserController', {
Expand All @@ -2938,7 +2944,9 @@ describe('ParseQuery LocalDatastore', () => {
},
open: function() {},
subscribe: function(query, sessionToken) {
return new LiveQuerySubscription('0', query, sessionToken);
const subscription = new LiveQuerySubscription('0', query, sessionToken);
subscription.subscribePromise.resolve();
return subscription;
},
};
CoreManager.set('UserController', {
Expand Down