Skip to content

Commit 9cbf55b

Browse files
[Java] Fix close with LongPolling (#25582)
1 parent 475fc56 commit 9cbf55b

File tree

4 files changed

+113
-6
lines changed

4 files changed

+113
-6
lines changed

src/SignalR/clients/java/signalr/core/src/main/java/com/microsoft/signalr/HubConnection.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -527,7 +527,9 @@ private Completable stop(String errorMessage) {
527527
hubConnectionStateLock.unlock();
528528
}
529529

530-
return transport.stop();
530+
Completable stop = transport.stop();
531+
stop.onErrorComplete().subscribe();
532+
return stop;
531533
}
532534

533535
/**

src/SignalR/clients/java/signalr/core/src/main/java/com/microsoft/signalr/LongPollingTransport.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919

2020
class LongPollingTransport implements Transport {
2121
private OnReceiveCallBack onReceiveCallBack;
22-
private TransportOnClosedCallback onClose;
22+
private TransportOnClosedCallback onClose = (reason) -> {};
2323
private String url;
2424
private final HttpClient client;
2525
private final HttpClient pollingClient;
@@ -30,6 +30,7 @@ class LongPollingTransport implements Transport {
3030
private String pollUrl;
3131
private String closeError;
3232
private CompletableSubject receiveLoop = CompletableSubject.create();
33+
private CompletableSubject closeSubject = CompletableSubject.create();
3334
private ExecutorService threadPool;
3435
private ExecutorService onReceiveThread;
3536
private AtomicBoolean stopCalled = new AtomicBoolean(false);
@@ -157,7 +158,7 @@ public void setOnClose(TransportOnClosedCallback onCloseCallback) {
157158
public Completable stop() {
158159
if (stopCalled.compareAndSet(false, true)) {
159160
this.active = false;
160-
return this.updateHeaderToken().andThen(Completable.defer(() -> {
161+
Completable stopCompletable = this.updateHeaderToken().andThen(Completable.defer(() -> {
161162
HttpRequest request = new HttpRequest();
162163
request.addHeaders(headers);
163164
return this.pollingClient.delete(this.url, request).ignoreElement()
@@ -168,8 +169,10 @@ public Completable stop() {
168169
})).doOnError(e -> {
169170
cleanup(e.getMessage());
170171
});
172+
173+
stopCompletable.subscribe(closeSubject);
171174
}
172-
return Completable.complete();
175+
return closeSubject;
173176
}
174177

175178
private void cleanup(String error) {

src/SignalR/clients/java/signalr/test/src/main/java/com/microsoft/signalr/HubConnectionTest.java

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3127,6 +3127,84 @@ public void LongPollingTransportAccessTokenProviderThrowsDuringStop() {
31273127
assertEquals(HubConnectionState.DISCONNECTED, hubConnection.getConnectionState());
31283128
}
31293129

3130+
@Test
3131+
public void stopWithoutObservingWithLongPollingTransportStops() {
3132+
AtomicInteger requestCount = new AtomicInteger(0);
3133+
CompletableSubject blockGet = CompletableSubject.create();
3134+
TestHttpClient client = new TestHttpClient()
3135+
.on("POST", "http://example.com/negotiate?negotiateVersion=1",
3136+
(req) -> Single.just(new HttpResponse(200, "",
3137+
TestUtils.stringToByteBuffer("{\"connectionId\":\"bVOiRPG8-6YiJ6d7ZcTOVQ\",\""
3138+
+ "availableTransports\":[{\"transport\":\"LongPolling\",\"transferFormats\":[\"Text\",\"Binary\"]}]}"))))
3139+
.on("GET", (req) -> {
3140+
if (requestCount.getAndIncrement() > 1) {
3141+
blockGet.blockingAwait();
3142+
}
3143+
return Single.just(new HttpResponse(200, "", TestUtils.stringToByteBuffer("{}" + RECORD_SEPARATOR)));
3144+
})
3145+
.on("POST", "http://example.com?id=bVOiRPG8-6YiJ6d7ZcTOVQ", (req) -> {
3146+
return Single.just(new HttpResponse(200, "", TestUtils.stringToByteBuffer("")));
3147+
});
3148+
3149+
HubConnection hubConnection = HubConnectionBuilder
3150+
.create("http://example.com")
3151+
.withTransport(TransportEnum.LONG_POLLING)
3152+
.withHttpClient(client)
3153+
.build();
3154+
3155+
CompletableSubject closed = CompletableSubject.create();
3156+
hubConnection.onClosed((e) -> {
3157+
closed.onComplete();
3158+
});
3159+
3160+
hubConnection.start().timeout(1, TimeUnit.SECONDS).blockingAwait();
3161+
3162+
hubConnection.stop();
3163+
closed.timeout(1, TimeUnit.SECONDS).blockingAwait();
3164+
blockGet.onComplete();
3165+
assertEquals(HubConnectionState.DISCONNECTED, hubConnection.getConnectionState());
3166+
}
3167+
3168+
@Test
3169+
public void hubConnectionClosesAndRunsOnClosedCallbackAfterCloseMessageWithLongPolling() {
3170+
AtomicInteger requestCount = new AtomicInteger(0);
3171+
CompletableSubject blockGet = CompletableSubject.create();
3172+
TestHttpClient client = new TestHttpClient()
3173+
.on("POST", "http://example.com/negotiate?negotiateVersion=1",
3174+
(req) -> Single.just(new HttpResponse(200, "",
3175+
TestUtils.stringToByteBuffer("{\"connectionId\":\"bVOiRPG8-6YiJ6d7ZcTOVQ\",\""
3176+
+ "availableTransports\":[{\"transport\":\"LongPolling\",\"transferFormats\":[\"Text\",\"Binary\"]}]}"))))
3177+
.on("GET", (req) -> {
3178+
if (requestCount.getAndIncrement() > 1) {
3179+
blockGet.blockingAwait();
3180+
return Single.just(new HttpResponse(200, "", TestUtils.stringToByteBuffer("{\"type\":7}" + RECORD_SEPARATOR)));
3181+
}
3182+
return Single.just(new HttpResponse(200, "", TestUtils.stringToByteBuffer("{}" + RECORD_SEPARATOR)));
3183+
})
3184+
.on("POST", "http://example.com?id=bVOiRPG8-6YiJ6d7ZcTOVQ", (req) -> {
3185+
return Single.just(new HttpResponse(200, "", TestUtils.stringToByteBuffer("")));
3186+
});
3187+
3188+
HubConnection hubConnection = HubConnectionBuilder
3189+
.create("http://example.com")
3190+
.withTransport(TransportEnum.LONG_POLLING)
3191+
.withHttpClient(client)
3192+
.build();
3193+
3194+
CompletableSubject closed = CompletableSubject.create();
3195+
hubConnection.onClosed((ex) -> {
3196+
closed.onComplete();
3197+
});
3198+
hubConnection.start().timeout(1, TimeUnit.SECONDS).blockingAwait();
3199+
3200+
assertEquals(HubConnectionState.CONNECTED, hubConnection.getConnectionState());
3201+
blockGet.onComplete();
3202+
3203+
closed.timeout(1, TimeUnit.SECONDS).blockingAwait();
3204+
3205+
assertEquals(HubConnectionState.DISCONNECTED, hubConnection.getConnectionState());
3206+
}
3207+
31303208
@Test
31313209
public void receivingServerSentEventsTransportFromNegotiateFails() {
31323210
TestHttpClient client = new TestHttpClient().on("POST", "http://example.com/negotiate?negotiateVersion=1",

src/SignalR/clients/java/signalr/test/src/main/java/com/microsoft/signalr/LongPollingTransportTest.java

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -307,7 +307,7 @@ public void LongPollingTransportRunsAccessTokenProviderEveryRequest() {
307307
}
308308

309309
@Test
310-
public void After204StopDoesNotTriggerOnClose() {
310+
public void After204StopDoesNotTriggerOnCloseAgain() {
311311
AtomicBoolean firstPoll = new AtomicBoolean(true);
312312
CompletableSubject block = CompletableSubject.create();
313313
TestHttpClient client = new TestHttpClient()
@@ -317,6 +317,9 @@ public void After204StopDoesNotTriggerOnClose() {
317317
return Single.just(new HttpResponse(200, "", TestUtils.emptyByteBuffer));
318318
}
319319
return Single.just(new HttpResponse(204, "", TestUtils.emptyByteBuffer));
320+
})
321+
.on("DELETE", (req) -> {
322+
return Single.just(new HttpResponse(200, "", TestUtils.emptyByteBuffer));
320323
});
321324

322325
Map<String, String> headers = new HashMap<>();
@@ -356,7 +359,7 @@ public void StoppingTransportRunsCloseHandlersOnce() {
356359
})
357360
.on("DELETE", (req) ->{
358361
//Unblock the last poll when we sent the DELETE request.
359-
block.onComplete();
362+
block.onComplete();
360363
return Single.just(new HttpResponse(200, "", TestUtils.emptyByteBuffer));
361364
});
362365

@@ -373,4 +376,25 @@ public void StoppingTransportRunsCloseHandlersOnce() {
373376
assertEquals(1, onCloseCount.get());
374377
assertFalse(transport.isActive());
375378
}
379+
380+
@Test
381+
public void ErrorFromClosePropagatesOnSecondStopCall() {
382+
AtomicBoolean firstPoll = new AtomicBoolean(true);
383+
TestHttpClient client = new TestHttpClient()
384+
.on("GET", (req) -> {
385+
if (firstPoll.get()) {
386+
firstPoll.set(false);
387+
return Single.just(new HttpResponse(200, "", TestUtils.emptyByteBuffer));
388+
}
389+
return Single.just(new HttpResponse(204, "", TestUtils.emptyByteBuffer));
390+
});
391+
392+
Map<String, String> headers = new HashMap<>();
393+
LongPollingTransport transport = new LongPollingTransport(headers, client, Single.just(""));
394+
395+
transport.start("http://example.com").timeout(100, TimeUnit.SECONDS).blockingAwait();
396+
397+
RuntimeException exception = assertThrows(RuntimeException.class, () -> transport.stop().blockingAwait(100, TimeUnit.SECONDS));
398+
assertEquals("Request has no handler: DELETE http://example.com", exception.getMessage());
399+
}
376400
}

0 commit comments

Comments
 (0)