Skip to content

Commit 1355e79

Browse files
committed
Resolve race condition by ensuring that op=connected has been received
before sending a new subscribe event. Fixes #46
1 parent b376ee1 commit 1355e79

File tree

2 files changed

+18
-15
lines changed

2 files changed

+18
-15
lines changed

ParseLiveQuery/src/main/java/com/parse/ParseLiveQueryClientImpl.java

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
private WebSocketClient webSocketClient;
3939
private int requestIdCount = 1;
4040
private boolean userInitiatedDisconnect = false;
41+
private boolean hasReceivedConnected = false;
4142

4243
/* package */ ParseLiveQueryClientImpl() {
4344
this(getDefaultUri());
@@ -88,8 +89,7 @@ public <T extends ParseObject> SubscriptionHandling<T> subscribe(ParseQuery<T> q
8889
Subscription<T> subscription = new Subscription<>(requestId, query);
8990
subscriptions.append(requestId, subscription);
9091

91-
// TODO: differentiate between state=CONNECTED, vs received op=connected response
92-
if (inAnyState(WebSocketClient.State.CONNECTED)) {
92+
if (isConnected()) {
9393
sendSubscription(subscription);
9494
} else if (userInitiatedDisconnect) {
9595
Log.w(LOG_TAG, "Warning: The client was explicitly disconnected! You must explicitly call .reconnect() in order to process your subscriptions.");
@@ -151,18 +151,21 @@ public void reconnect() {
151151
webSocketClient.close();
152152
}
153153

154+
userInitiatedDisconnect = false;
155+
hasReceivedConnected = false;
154156
webSocketClient = webSocketClientFactory.createInstance(webSocketClientCallback, uri);
155157
webSocketClient.open();
156-
userInitiatedDisconnect = false;
157158
}
158159

159160
@Override
160161
public void disconnect() {
161162
if (webSocketClient != null) {
162-
userInitiatedDisconnect = true;
163163
webSocketClient.close();
164164
webSocketClient = null;
165165
}
166+
167+
userInitiatedDisconnect = true;
168+
hasReceivedConnected = false;
166169
}
167170

168171
@Override
@@ -186,6 +189,10 @@ private WebSocketClient.State getWebSocketState() {
186189
return state == null ? WebSocketClient.State.NONE : state;
187190
}
188191

192+
private boolean isConnected() {
193+
return hasReceivedConnected && inAnyState(WebSocketClient.State.CONNECTED);
194+
}
195+
189196
private boolean inAnyState(WebSocketClient.State... states) {
190197
return Arrays.asList(states).contains(getWebSocketState());
191198
}
@@ -220,6 +227,7 @@ private void parseMessage(String message) throws LiveQueryException {
220227

221228
switch (rawOperation) {
222229
case "connected":
230+
hasReceivedConnected = true;
223231
dispatchConnected();
224232
Log.v(LOG_TAG, "Connected, sending pending subscription");
225233
for (int i = 0; i < subscriptions.size(); i++) {
@@ -371,6 +379,7 @@ private WebSocketClient.WebSocketClientCallback getWebSocketClientCallback() {
371379
return new WebSocketClient.WebSocketClientCallback() {
372380
@Override
373381
public void onOpen() {
382+
hasReceivedConnected = false;
374383
Log.v(LOG_TAG, "Socket opened");
375384
ParseUser.getCurrentSessionTokenAsync().onSuccessTask(new Continuation<String, Task<Void>>() {
376385
@Override
@@ -406,12 +415,14 @@ public Void then(Task<Void> task) {
406415
@Override
407416
public void onClose() {
408417
Log.v(LOG_TAG, "Socket onClose");
418+
hasReceivedConnected = false;
409419
dispatchDisconnected();
410420
}
411421

412422
@Override
413423
public void onError(Throwable exception) {
414424
Log.e(LOG_TAG, "Socket onError", exception);
425+
hasReceivedConnected = false;
415426
dispatchSocketError(exception);
416427
}
417428

ParseLiveQuery/src/test/java/com/parse/TestParseLiveQueryClient.java

Lines changed: 3 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,8 @@ public void tearDown() throws Exception {
8484
}
8585

8686
@Test
87-
public void testSubscribeBetweenSocketConnectAndConnectResponse() throws Exception {
87+
public void testSubscribeAfterSocketConnectBeforeConnectedOp() throws Exception {
88+
// Bug: https://github.com/parse-community/ParseLiveQuery-Android/issues/46
8889
ParseQuery<ParseObject> queryA = ParseQuery.getQuery("objA");
8990
ParseQuery<ParseObject> queryB = ParseQuery.getQuery("objB");
9091
clearConnection();
@@ -102,21 +103,12 @@ public void testSubscribeBetweenSocketConnectAndConnectResponse() throws Excepti
102103
verify(webSocketClient, times(1)).send(contains("\"op\":\"connect\""));
103104

104105
// Now if we subscribe to queryB, we SHOULD NOT send the subscribe yet, until we get op=connected
105-
SubscriptionHandling<ParseObject> subB = parseLiveQueryClient.subscribe(queryB); // TODO: fix this state
106+
SubscriptionHandling<ParseObject> subB = parseLiveQueryClient.subscribe(queryB);
106107
verify(webSocketClient, never()).send(contains("\"op\":\"subscribe\""));
107108

108109
// on op=connected, _then_ we should send both subscriptions
109110
webSocketClientCallback.onMessage(createConnectedMessage().toString());
110111
verify(webSocketClient, times(2)).send(contains("\"op\":\"subscribe\""));
111-
112-
// 1. Subscribe to queryA
113-
// - it is not connected yet, so it will trigger reconnect.
114-
// 2. Socket opens & connects; initiate op=connect
115-
// 3. subscribe to queryB
116-
// - SOCKET is connected, but we haven't received op=connected yet.
117-
// - BUG: it will call sendSubscription now
118-
// 4. Server responds to #2 with op=connected
119-
// 5. On op=connected, we replay pending subscriptions, including the one that was already sent in #3
120112
}
121113

122114
@Test

0 commit comments

Comments
 (0)