From 93420e58189dc2ce071c1a44924c5128e3f86105 Mon Sep 17 00:00:00 2001 From: MikaelMengistu Date: Tue, 8 Jan 2019 14:06:00 -0800 Subject: [PATCH 01/15] LongPolling transport for Java client --- .../microsoft/signalr/DefaultHttpClient.java | 126 ++++++++++-------- .../com/microsoft/signalr/HttpClient.java | 13 +- .../signalr/HttpHubConnectionBuilder.java | 17 ++- .../com/microsoft/signalr/HubConnection.java | 74 +++++----- .../microsoft/signalr/JsonHubProtocol.java | 5 +- .../signalr/LongPollingTransport.java | 123 +++++++++++++++++ .../com/microsoft/signalr/TransportEnum.java | 10 ++ .../microsoft/signalr/HubConnectionTest.java | 65 +++++++-- .../signalr/LongPollingTransportTest.java | 120 +++++++++++++++++ .../com/microsoft/signalr/TestHttpClient.java | 12 +- .../java/com/microsoft/signalr/TestUtils.java | 4 +- .../signalr/WebSocketTransportTest.java | 10 ++ 12 files changed, 472 insertions(+), 107 deletions(-) create mode 100644 src/SignalR/clients/java/signalr/src/main/java/com/microsoft/signalr/LongPollingTransport.java create mode 100644 src/SignalR/clients/java/signalr/src/main/java/com/microsoft/signalr/TransportEnum.java create mode 100644 src/SignalR/clients/java/signalr/src/test/java/com/microsoft/signalr/LongPollingTransportTest.java diff --git a/src/SignalR/clients/java/signalr/src/main/java/com/microsoft/signalr/DefaultHttpClient.java b/src/SignalR/clients/java/signalr/src/main/java/com/microsoft/signalr/DefaultHttpClient.java index d8eee27c87a4..cbd9feee7f91 100644 --- a/src/SignalR/clients/java/signalr/src/main/java/com/microsoft/signalr/DefaultHttpClient.java +++ b/src/SignalR/clients/java/signalr/src/main/java/com/microsoft/signalr/DefaultHttpClient.java @@ -8,79 +8,95 @@ import java.util.Collection; import java.util.List; import java.util.Map; +import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import io.reactivex.Single; import io.reactivex.subjects.SingleSubject; -import okhttp3.Call; -import okhttp3.Callback; -import okhttp3.Cookie; -import okhttp3.CookieJar; -import okhttp3.HttpUrl; -import okhttp3.OkHttpClient; -import okhttp3.Request; -import okhttp3.RequestBody; -import okhttp3.Response; -import okhttp3.ResponseBody; +import okhttp3.*; final class DefaultHttpClient extends HttpClient { - private final OkHttpClient client; + private OkHttpClient client = null; public DefaultHttpClient() { - this.client = new OkHttpClient.Builder().cookieJar(new CookieJar() { - private List cookieList = new ArrayList<>(); - private Lock cookieLock = new ReentrantLock(); + this(5000, null); + } - @Override - public void saveFromResponse(HttpUrl url, List cookies) { - cookieLock.lock(); - try { - for (Cookie cookie : cookies) { - boolean replacedCookie = false; - for (int i = 0; i < cookieList.size(); i++) { - Cookie innerCookie = cookieList.get(i); - if (cookie.name().equals(innerCookie.name()) && innerCookie.matches(url)) { - // We have a new cookie that matches an older one so we replace the older one. - cookieList.set(i, innerCookie); - replacedCookie = true; - break; + public DefaultHttpClient cloneWithTimeOut(int timeoutInMilliseconds) { + OkHttpClient newClient = client.newBuilder().readTimeout(timeoutInMilliseconds, TimeUnit.MILLISECONDS) + .build(); + return new DefaultHttpClient(timeoutInMilliseconds, newClient); + } + + public DefaultHttpClient(int timeoutInMiliseconds, OkHttpClient client) { + if (client != null) { + this.client = client; + } else { + this.client = new OkHttpClient.Builder().cookieJar(new CookieJar() { + private List cookieList = new ArrayList<>(); + private Lock cookieLock = new ReentrantLock(); + + @Override + public void saveFromResponse(HttpUrl url, List cookies) { + cookieLock.lock(); + try { + for (Cookie cookie : cookies) { + boolean replacedCookie = false; + for (int i = 0; i < cookieList.size(); i++) { + Cookie innerCookie = cookieList.get(i); + if (cookie.name().equals(innerCookie.name()) && innerCookie.matches(url)) { + // We have a new cookie that matches an older one so we replace the older one. + cookieList.set(i, innerCookie); + replacedCookie = true; + break; + } + } + if (!replacedCookie) { + cookieList.add(cookie); } } - if (!replacedCookie) { - cookieList.add(cookie); - } + } finally { + cookieLock.unlock(); } - } finally { - cookieLock.unlock(); } - } - @Override - public List loadForRequest(HttpUrl url) { - cookieLock.lock(); - try { - List matchedCookies = new ArrayList<>(); - List expiredCookies = new ArrayList<>(); - for (Cookie cookie : cookieList) { - if (cookie.expiresAt() < System.currentTimeMillis()) { - expiredCookies.add(cookie); - } else if (cookie.matches(url)) { - matchedCookies.add(cookie); + @Override + public List loadForRequest(HttpUrl url) { + cookieLock.lock(); + try { + List matchedCookies = new ArrayList<>(); + List expiredCookies = new ArrayList<>(); + for (Cookie cookie : cookieList) { + if (cookie.expiresAt() < System.currentTimeMillis()) { + expiredCookies.add(cookie); + } else if (cookie.matches(url)) { + matchedCookies.add(cookie); + } } - } - cookieList.removeAll(expiredCookies); - return matchedCookies; - } finally { - cookieLock.unlock(); + cookieList.removeAll(expiredCookies); + return matchedCookies; + } finally { + cookieLock.unlock(); + } } - } - }).build(); + }).readTimeout(timeoutInMiliseconds, TimeUnit.MILLISECONDS) + .build(); + } + } + + public DefaultHttpClient(int timeoutInMiliseonds) { + } @Override public Single send(HttpRequest httpRequest) { + return send(httpRequest, null); + } + + @Override + public Single send(HttpRequest httpRequest, String bodyContent) { Request.Builder requestBuilder = new Request.Builder().url(httpRequest.getUrl()); switch (httpRequest.getMethod()) { @@ -88,7 +104,13 @@ public Single send(HttpRequest httpRequest) { requestBuilder.get(); break; case "POST": - RequestBody body = RequestBody.create(null, new byte[]{}); + RequestBody body; + if(bodyContent != null) { + body = RequestBody.create(MediaType.parse("text/plain"), bodyContent); + } else { + body = RequestBody.create(null, new byte[]{}); + } + requestBuilder.post(body); break; case "DELETE": diff --git a/src/SignalR/clients/java/signalr/src/main/java/com/microsoft/signalr/HttpClient.java b/src/SignalR/clients/java/signalr/src/main/java/com/microsoft/signalr/HttpClient.java index 9b7a1b4fded8..000c93519f44 100644 --- a/src/SignalR/clients/java/signalr/src/main/java/com/microsoft/signalr/HttpClient.java +++ b/src/SignalR/clients/java/signalr/src/main/java/com/microsoft/signalr/HttpClient.java @@ -95,6 +95,13 @@ public Single post(String url) { return this.send(request); } + public Single post(String url, String body) { + HttpRequest request = new HttpRequest(); + request.setUrl(url); + request.setMethod("POST"); + return this.send(request, body); + } + public Single post(String url, HttpRequest options) { options.setUrl(url); options.setMethod("POST"); @@ -116,5 +123,9 @@ public Single delete(String url, HttpRequest options) { public abstract Single send(HttpRequest request); + public abstract Single send(HttpRequest request, String body); + public abstract WebSocketWrapper createWebSocket(String url, Map headers); -} \ No newline at end of file + + public abstract HttpClient cloneWithTimeOut(int timeoutInMilliseconds); +} diff --git a/src/SignalR/clients/java/signalr/src/main/java/com/microsoft/signalr/HttpHubConnectionBuilder.java b/src/SignalR/clients/java/signalr/src/main/java/com/microsoft/signalr/HttpHubConnectionBuilder.java index e2a8cceccc9e..d91e382ed915 100644 --- a/src/SignalR/clients/java/signalr/src/main/java/com/microsoft/signalr/HttpHubConnectionBuilder.java +++ b/src/SignalR/clients/java/signalr/src/main/java/com/microsoft/signalr/HttpHubConnectionBuilder.java @@ -19,19 +19,26 @@ public class HttpHubConnectionBuilder { private Single accessTokenProvider; private long handshakeResponseTimeout = 0; private Map headers; + private TransportEnum transportEnum; HttpHubConnectionBuilder(String url) { this.url = url; } + //For testing purposes. The Transport interface isn't public. + HttpHubConnectionBuilder withTransportImplementation(Transport transport) { + this.transport = transport; + return this; + } + /** - * Sets the transport to be used by the {@link HubConnection}. + * Sets the transport type to indicate which transport to be used by the {@link HubConnection}. * - * @param transport The transport to be used. + * @param transportEnum The type of transport to be used. * @return This instance of the HttpHubConnectionBuilder. */ - HttpHubConnectionBuilder withTransport(Transport transport) { - this.transport = transport; + public HttpHubConnectionBuilder withTransport(TransportEnum transportEnum) { + this.transportEnum = transportEnum; return this; } @@ -112,6 +119,6 @@ public HttpHubConnectionBuilder withHeader(String name, String value) { * @return A new instance of {@link HubConnection}. */ public HubConnection build() { - return new HubConnection(url, transport, skipNegotiate, httpClient, accessTokenProvider, handshakeResponseTimeout, headers); + return new HubConnection(url, transport, skipNegotiate, httpClient, accessTokenProvider, handshakeResponseTimeout, headers, transportEnum); } } diff --git a/src/SignalR/clients/java/signalr/src/main/java/com/microsoft/signalr/HubConnection.java b/src/SignalR/clients/java/signalr/src/main/java/com/microsoft/signalr/HubConnection.java index d8c9b931edac..ee8bdfbf217d 100644 --- a/src/SignalR/clients/java/signalr/src/main/java/com/microsoft/signalr/HubConnection.java +++ b/src/SignalR/clients/java/signalr/src/main/java/com/microsoft/signalr/HubConnection.java @@ -3,14 +3,7 @@ package com.microsoft.signalr; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Date; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Timer; -import java.util.TimerTask; +import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -46,7 +39,7 @@ public class HubConnection { private Single accessTokenProvider; private final Map headers = new HashMap<>(); private ConnectionState connectionState = null; - private final HttpClient httpClient; + private HttpClient httpClient; private String stopError; private Timer pingTimer = null; private final AtomicLong nextServerTimeout = new AtomicLong(); @@ -56,6 +49,7 @@ public class HubConnection { private long tickRate = 1000; private CompletableSubject handshakeResponseSubject; private long handshakeResponseTimeout = 15*1000; + private TransportEnum transportEnum = TransportEnum.ALL; private final Logger logger = LoggerFactory.getLogger(HubConnection.class); /** @@ -100,7 +94,7 @@ void setTickRate(long tickRateInMilliseconds) { } HubConnection(String url, Transport transport, boolean skipNegotiate, HttpClient httpClient, - Single accessTokenProvider, long handshakeResponseTimeout, Map headers) { + Single accessTokenProvider, long handshakeResponseTimeout, Map headers, TransportEnum transportEnum) { if (url == null || url.isEmpty()) { throw new IllegalArgumentException("A valid url is required."); } @@ -122,6 +116,8 @@ void setTickRate(long tickRateInMilliseconds) { if (transport != null) { this.transport = transport; + } else if (transportEnum != null) { + this.transportEnum = transportEnum; } if (handshakeResponseTimeout > 0) { @@ -301,7 +297,13 @@ public Completable start() { negotiate.flatMapCompletable(url -> { logger.debug("Starting HubConnection."); if (transport == null) { - transport = new WebSocketTransport(headers, httpClient); + switch (transportEnum){ + case LONG_POLLING: + transport = new LongPollingTransport(headers, httpClient); + break; + default: + transport = new WebSocketTransport(headers, httpClient); + } } transport.setOnReceive(this.callback); @@ -311,37 +313,40 @@ public Completable start() { String handshake = HandshakeProtocol.createHandshakeRequestMessage( new HandshakeRequestMessage(protocol.getName(), protocol.getVersion())); + connectionState = new ConnectionState(this); + return transport.send(handshake).andThen(Completable.defer(() -> { timeoutHandshakeResponse(handshakeResponseTimeout, TimeUnit.MILLISECONDS); return handshakeResponseSubject.andThen(Completable.defer(() -> { hubConnectionStateLock.lock(); try { - connectionState = new ConnectionState(this); hubConnectionState = HubConnectionState.CONNECTED; logger.info("HubConnection started."); - resetServerTimeout(); - this.pingTimer = new Timer(); - this.pingTimer.schedule(new TimerTask() { - @Override - public void run() { - try { - if (System.currentTimeMillis() > nextServerTimeout.get()) { - stop("Server timeout elapsed without receiving a message from the server."); - return; + //Don't send pings if we're using long polling. + if(transportEnum != TransportEnum.LONG_POLLING) { + this.pingTimer = new Timer(); + this.pingTimer.schedule(new TimerTask() { + @Override + public void run() { + try { + if (System.currentTimeMillis() > nextServerTimeout.get()) { + stop("Server timeout elapsed without receiving a message from the server."); + return; + } + + if (System.currentTimeMillis() > nextPingActivation.get()) { + sendHubMessage(PingMessage.getInstance()); + } + } catch (Exception e) { + logger.warn("Error sending ping: {}.", e.getMessage()); + // The connection is probably in a bad or closed state now, cleanup the timer so + // it stops triggering + pingTimer.cancel(); } - - if (System.currentTimeMillis() > nextPingActivation.get()) { - sendHubMessage(PingMessage.getInstance()); - } - } catch (Exception e) { - logger.warn("Error sending ping: {}.", e.getMessage()); - // The connection is probably in a bad or closed state now, cleanup the timer so - // it stops triggering - pingTimer.cancel(); } - } - }, new Date(0), tickRate); + }, new Date(0), tickRate); + } } finally { hubConnectionStateLock.unlock(); } @@ -367,7 +372,10 @@ private Single startNegotiate(String url, int negotiateAttempts) { } if (response.getRedirectUrl() == null) { - if (!response.getAvailableTransports().contains("WebSockets")) { + Set transports = response.getAvailableTransports(); + if ((this.transportEnum == TransportEnum.ALL && !(transports.contains("WebSockets") || transports.contains("LongPolling"))) || + (this.transportEnum == TransportEnum.WEBSOCKETS && !transports.contains("WebSockets")) || + (this.transportEnum == TransportEnum.LONG_POLLING && !transports.contains("LongPolling"))) { throw new RuntimeException("There were no compatible transports on the server."); } diff --git a/src/SignalR/clients/java/signalr/src/main/java/com/microsoft/signalr/JsonHubProtocol.java b/src/SignalR/clients/java/signalr/src/main/java/com/microsoft/signalr/JsonHubProtocol.java index 57bb5d1a7b26..4a2b928ba20e 100644 --- a/src/SignalR/clients/java/signalr/src/main/java/com/microsoft/signalr/JsonHubProtocol.java +++ b/src/SignalR/clients/java/signalr/src/main/java/com/microsoft/signalr/JsonHubProtocol.java @@ -37,7 +37,10 @@ public TransferFormat getTransferFormat() { @Override public HubMessage[] parseMessages(String payload, InvocationBinder binder) { - if (payload != null && !payload.substring(payload.length() - 1).equals(RECORD_SEPARATOR)) { + if(payload.length() == 0 ){ + return new HubMessage[]{}; + } + if (!(payload.substring(payload.length() - 1).equals(RECORD_SEPARATOR))) { throw new RuntimeException("Message is incomplete."); } diff --git a/src/SignalR/clients/java/signalr/src/main/java/com/microsoft/signalr/LongPollingTransport.java b/src/SignalR/clients/java/signalr/src/main/java/com/microsoft/signalr/LongPollingTransport.java new file mode 100644 index 000000000000..f54c4c89c2b3 --- /dev/null +++ b/src/SignalR/clients/java/signalr/src/main/java/com/microsoft/signalr/LongPollingTransport.java @@ -0,0 +1,123 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. + +package com.microsoft.signalr; + +import io.reactivex.Completable; +import io.reactivex.subjects.CompletableSubject; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; + +class LongPollingTransport implements Transport { + private OnReceiveCallBack onReceiveCallBack; + private TransportOnClosedCallback onClose; + private String url; + private final HttpClient client; + private final HttpClient pollingClient; + private final Map headers; + private static final int POLL_TIMEOUT = 100*1000; + private volatile Boolean active; + private String pollUrl; + private String closeError; + private CompletableSubject pollCompletableSubject = CompletableSubject.create(); + private final Logger logger = LoggerFactory.getLogger(LongPollingTransport.class); + + public LongPollingTransport(Map headers, HttpClient client) { + this.headers = headers; + this.client = client; + this.pollingClient = client.cloneWithTimeOut(POLL_TIMEOUT); + } + + //Package private active accessor for testing. + boolean isActive(){ + return this.active; + } + + @Override + public Completable start(String url) { + this.active = true; + logger.info("Starting LongPolling transport"); + this.url = url; + pollUrl = url + "&_=" + System.currentTimeMillis(); + logger.info("Polling {}", pollUrl); + HttpRequest request = new HttpRequest(); + request.addHeaders(headers); + return this.pollingClient.get(pollUrl, request).flatMapCompletable(response -> { + if (response.getStatusCode() != 200){ + logger.error("Unexpected response code {}", response.getStatusCode()); + this.active = false; + return Completable.error(new Exception("Failed to connect.")); + } else { + logger.info("Activating poll loop", response.getStatusCode()); + this.active = true; + } + poll(url).subscribeWith(pollCompletableSubject); + + return Completable.complete(); + }); + } + + private Completable poll(String url){ + if (this.active) { + pollUrl = url + "&_=" + System.currentTimeMillis(); + logger.info("Polling {}", pollUrl); + HttpRequest request = new HttpRequest(); + request.addHeaders(headers); + Completable pollingCompletable = this.pollingClient.get(pollUrl, request).flatMapCompletable(response -> { + if (response.getStatusCode() == 204) { + logger.info("LongPolling transport terminated by server."); + this.active = false; + } else if (response.getStatusCode() != 200) { + logger.error("Unexpected response code {}", response.getStatusCode()); + this.active = false; + this.closeError = "Unexpected response code " + response.getStatusCode(); + } else { + logger.info("Message received"); + new Thread(() -> this.onReceive(response.getContent())).start(); + } + return poll(url); }); + return pollingCompletable; + } else { + logger.info("Long Polling transport polling complete."); + pollCompletableSubject.onComplete(); + this.stop(); + } + + return Completable.complete(); + } + + @Override + public Completable send(String message) { + return Completable.fromSingle(this.client.post(url, message)); + } + + @Override + public void setOnReceive(OnReceiveCallBack callback) { + this.onReceiveCallBack = callback; + } + + @Override + public void onReceive(String message) { + this.onReceiveCallBack.invoke(message); + logger.debug("OnReceived callback has been invoked."); + } + + @Override + public void setOnClose(TransportOnClosedCallback onCloseCallback) { + this.onClose = onCloseCallback; + } + + @Override + public Completable stop() { + this.active = false; + this.pollingClient.delete(this.url); + CompletableSubject stopCompletableSubject = CompletableSubject.create(); + return this.pollCompletableSubject.andThen(Completable.defer(() -> { + logger.info("LongPolling transport stopped."); + this.onClose.invoke(this.closeError); + return Completable.complete(); + })).subscribeWith(stopCompletableSubject); + } +} diff --git a/src/SignalR/clients/java/signalr/src/main/java/com/microsoft/signalr/TransportEnum.java b/src/SignalR/clients/java/signalr/src/main/java/com/microsoft/signalr/TransportEnum.java new file mode 100644 index 000000000000..129ced21b5dc --- /dev/null +++ b/src/SignalR/clients/java/signalr/src/main/java/com/microsoft/signalr/TransportEnum.java @@ -0,0 +1,10 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. + +package com.microsoft.signalr; + +public enum TransportEnum { + ALL, + WEBSOCKETS, + LONG_POLLING +} diff --git a/src/SignalR/clients/java/signalr/src/test/java/com/microsoft/signalr/HubConnectionTest.java b/src/SignalR/clients/java/signalr/src/test/java/com/microsoft/signalr/HubConnectionTest.java index a5590d0381ef..f61d0d221a74 100644 --- a/src/SignalR/clients/java/signalr/src/test/java/com/microsoft/signalr/HubConnectionTest.java +++ b/src/SignalR/clients/java/signalr/src/test/java/com/microsoft/signalr/HubConnectionTest.java @@ -66,7 +66,7 @@ public void transportCloseWithErrorTriggersStopInHubConnection() { public void checkHubConnectionStateNoHandShakeResponse() { MockTransport mockTransport = new MockTransport(false); HubConnection hubConnection = HubConnectionBuilder.create("http://example.com") - .withTransport(mockTransport) + .withTransportImplementation(mockTransport) .withHttpClient(new TestHttpClient()) .shouldSkipNegotiate(true) .withHandshakeResponseTimeout(100) @@ -1179,7 +1179,7 @@ public void negotiateThatRedirectsForeverFailsAfter100Tries() { } @Test - public void afterSuccessfulNegotiateConnectsWithTransport() { + public void afterSuccessfulNegotiateConnectsWithWebsocketsTransport() { TestHttpClient client = new TestHttpClient().on("POST", "http://example.com/negotiate", (req) -> Single.just(new HttpResponse(200, "", "{\"connectionId\":\"bVOiRPG8-6YiJ6d7ZcTOVQ\",\"" @@ -1188,7 +1188,7 @@ public void afterSuccessfulNegotiateConnectsWithTransport() { MockTransport transport = new MockTransport(true); HubConnection hubConnection = HubConnectionBuilder .create("http://example.com") - .withTransport(transport) + .withTransportImplementation(transport) .withHttpClient(client) .build(); @@ -1199,6 +1199,47 @@ public void afterSuccessfulNegotiateConnectsWithTransport() { assertEquals("{\"protocol\":\"json\",\"version\":1}" + RECORD_SEPARATOR, sentMessages[0]); } + @Test + public void afterSuccessfulNegotiateConnectsWithLongPollingTransport() { + TestHttpClient client = new TestHttpClient().on("POST", "http://example.com/negotiate", + (req) -> Single.just(new HttpResponse(200, "", + "{\"connectionId\":\"bVOiRPG8-6YiJ6d7ZcTOVQ\",\"" + + "availableTransports\":[{\"transport\":\"LongPolling\",\"transferFormats\":[\"Text\",\"Binary\"]}]}"))); + + MockTransport transport = new MockTransport(true); + HubConnection hubConnection = HubConnectionBuilder + .create("http://example.com") + .withTransportImplementation(transport) + .withHttpClient(client) + .build(); + + hubConnection.start().timeout(1, TimeUnit.SECONDS).blockingAwait(); + + String[] sentMessages = transport.getSentMessages(); + assertEquals(1, sentMessages.length); + assertEquals("{\"protocol\":\"json\",\"version\":1}" + RECORD_SEPARATOR, sentMessages[0]); + } + + @Test + public void receivingServerSentEventsTransportFromNegotiateFails() { + TestHttpClient client = new TestHttpClient().on("POST", "http://example.com/negotiate", + (req) -> Single.just(new HttpResponse(200, "", + "{\"connectionId\":\"bVOiRPG8-6YiJ6d7ZcTOVQ\",\"" + + "availableTransports\":[{\"transport\":\"ServerSentEvents\",\"transferFormats\":[\"Text\",\"Binary\"]}]}"))); + + MockTransport transport = new MockTransport(true); + HubConnection hubConnection = HubConnectionBuilder + .create("http://example.com") + .withTransportImplementation(transport) + .withHttpClient(client) + .build(); + + RuntimeException exception = assertThrows(RuntimeException.class, + () -> hubConnection.start().timeout(1, TimeUnit.SECONDS).blockingAwait()); + + assertEquals(exception.getMessage(), "There were no compatible transports on the server."); + } + @Test public void negotiateThatReturnsErrorThrowsFromStart() { TestHttpClient client = new TestHttpClient().on("POST", "http://example.com/negotiate", @@ -1208,7 +1249,7 @@ public void negotiateThatReturnsErrorThrowsFromStart() { HubConnection hubConnection = HubConnectionBuilder .create("http://example.com") .withHttpClient(client) - .withTransport(transport) + .withTransportImplementation(transport) .build(); RuntimeException exception = assertThrows(RuntimeException.class, @@ -1227,7 +1268,7 @@ public void negotiateRedirectIsFollowed() { MockTransport transport = new MockTransport(true); HubConnection hubConnection = HubConnectionBuilder .create("http://example.com") - .withTransport(transport) + .withTransportImplementation(transport) .withHttpClient(client) .build(); @@ -1250,7 +1291,7 @@ public void accessTokenProviderIsUsedForNegotiate() { MockTransport transport = new MockTransport(true); HubConnection hubConnection = HubConnectionBuilder .create("http://example.com") - .withTransport(transport) + .withTransportImplementation(transport) .withHttpClient(client) .withAccessTokenProvider(Single.just("secretToken")) .build(); @@ -1275,7 +1316,7 @@ public void accessTokenProviderIsOverriddenFromRedirectNegotiate() { MockTransport transport = new MockTransport(true); HubConnection hubConnection = HubConnectionBuilder .create("http://example.com") - .withTransport(transport) + .withTransportImplementation(transport) .withHttpClient(client) .withAccessTokenProvider(Single.just("secretToken")) .build(); @@ -1335,7 +1376,7 @@ public void headersAreSetAndSentThroughBuilder() { MockTransport transport = new MockTransport(); HubConnection hubConnection = HubConnectionBuilder.create("http://example.com") - .withTransport(transport) + .withTransportImplementation(transport) .withHttpClient(client) .withHeader("ExampleHeader", "ExampleValue") .build(); @@ -1360,7 +1401,7 @@ public void sameHeaderSetTwiceGetsOverwritten() { MockTransport transport = new MockTransport(); HubConnection hubConnection = HubConnectionBuilder.create("http://example.com") - .withTransport(transport) + .withTransportImplementation(transport) .withHttpClient(client) .withHeader("ExampleHeader", "ExampleValue") .withHeader("ExampleHeader", "New Value") @@ -1377,7 +1418,7 @@ public void hubConnectionCanBeStartedAfterBeingStopped() { MockTransport transport = new MockTransport(); HubConnection hubConnection = HubConnectionBuilder .create("http://example.com") - .withTransport(transport) + .withTransportImplementation(transport) .shouldSkipNegotiate(true) .build(); @@ -1401,7 +1442,7 @@ public void hubConnectionCanBeStartedAfterBeingStoppedAndRedirected() { HubConnection hubConnection = HubConnectionBuilder .create("http://example.com") - .withTransport(mockTransport) + .withTransportImplementation(mockTransport) .withHttpClient(client) .build(); @@ -1424,7 +1465,7 @@ public void non200FromNegotiateThrowsError() { MockTransport transport = new MockTransport(); HubConnection hubConnection = HubConnectionBuilder .create("http://example.com") - .withTransport(transport) + .withTransportImplementation(transport) .withHttpClient(client) .build(); diff --git a/src/SignalR/clients/java/signalr/src/test/java/com/microsoft/signalr/LongPollingTransportTest.java b/src/SignalR/clients/java/signalr/src/test/java/com/microsoft/signalr/LongPollingTransportTest.java new file mode 100644 index 000000000000..5ea01c021664 --- /dev/null +++ b/src/SignalR/clients/java/signalr/src/test/java/com/microsoft/signalr/LongPollingTransportTest.java @@ -0,0 +1,120 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. + +package com.microsoft.signalr; + +import io.reactivex.Single; +import org.junit.jupiter.api.Test; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.junit.jupiter.api.Assertions.*; + +public class LongPollingTransportTest { + + @Test + public void LongPollingFailsToConnectWith404Response() { + TestHttpClient client = new TestHttpClient() + .on("GET", (req) -> Single.just(new HttpResponse(404, "", ""))); + + Map headers = new HashMap<>(); + LongPollingTransport transport = new LongPollingTransport(headers, client); + Throwable exception = assertThrows(RuntimeException.class, () -> transport.start("http://example.com").timeout(1, TimeUnit.SECONDS).blockingAwait()); + assertEquals(Exception.class, exception.getCause().getClass()); + assertEquals("Failed to connect.", exception.getCause().getMessage()); + assertFalse(transport.isActive()); + } + + @Test + public void StatusCode204StopsLongPolling() { + AtomicBoolean firstPoll = new AtomicBoolean(true); + TestHttpClient client = new TestHttpClient() + .on("GET", (req) -> { + if (firstPoll.get()) { + firstPoll.set(false); + return Single.just(new HttpResponse(200, "", "")); + } + return Single.just(new HttpResponse(204, "", "")); + }); + + Map headers = new HashMap<>(); + LongPollingTransport transport = new LongPollingTransport(headers, client); + transport.start("http://example.com").timeout(100, TimeUnit.SECONDS).blockingAwait(); + assertFalse(transport.isActive()); + } + + @Test + public void StatusCode204StopsLongPollingTriggersOnClosed() { + AtomicBoolean firstPoll = new AtomicBoolean(true); + TestHttpClient client = new TestHttpClient() + .on("GET", (req) -> { + if (firstPoll.get()) { + firstPoll.set(false); + return Single.just(new HttpResponse(200, "", "")); + } + return Single.just(new HttpResponse(204, "", "")); + }); + + Map headers = new HashMap<>(); + LongPollingTransport transport = new LongPollingTransport(headers, client); + AtomicBoolean onClosedRan = new AtomicBoolean(false); + transport.setOnClose((error) -> { + onClosedRan.set(true); + }); + + assertFalse(onClosedRan.get()); + transport.start("http://example.com").timeout(100, TimeUnit.SECONDS).blockingAwait(); + assertTrue(onClosedRan.get()); + assertFalse(transport.isActive()); + } + + @Test + public void LongPollingFailsWhenReceivingUnexpectedErrorCode() { + AtomicBoolean firstPoll = new AtomicBoolean(true); + TestHttpClient client = new TestHttpClient() + .on("GET", (req) -> { + if (firstPoll.get()) { + firstPoll.set(false); + return Single.just(new HttpResponse(200, "", "")); + } + return Single.just(new HttpResponse(999, "", "")); + }); + + Map headers = new HashMap<>(); + LongPollingTransport transport = new LongPollingTransport(headers, client); + AtomicBoolean onClosedRan = new AtomicBoolean(false); + transport.setOnClose((error) -> { + onClosedRan.set(true); + assertEquals("Unexpected response code 999", error); + }); + + transport.start("http://example.com").timeout(100, TimeUnit.SECONDS).blockingAwait(); + assertFalse(transport.isActive()); + assertTrue(onClosedRan.get()); + } + + @Test + public void CanSetAndTriggerOnReceive() { + TestHttpClient client = new TestHttpClient() + .on("GET", (req) -> Single.just(new HttpResponse(200, "", ""))); + + Map headers = new HashMap<>(); + LongPollingTransport transport = new LongPollingTransport(headers, client); + + AtomicBoolean onReceivedRan = new AtomicBoolean(false); + transport.setOnReceive((message) -> { + onReceivedRan.set(true); + assertEquals("TEST", message); + }); + + // The transport doesn't need to be active to trigger onReceive for the case + // when we are handling the last outstanding poll. + transport.onReceive("TEST"); + assertTrue(onReceivedRan.get()); + transport.stop(); + } +} diff --git a/src/SignalR/clients/java/signalr/src/test/java/com/microsoft/signalr/TestHttpClient.java b/src/SignalR/clients/java/signalr/src/test/java/com/microsoft/signalr/TestHttpClient.java index 7c95f25c00bb..cf969638298b 100644 --- a/src/SignalR/clients/java/signalr/src/test/java/com/microsoft/signalr/TestHttpClient.java +++ b/src/SignalR/clients/java/signalr/src/test/java/com/microsoft/signalr/TestHttpClient.java @@ -26,6 +26,11 @@ public Single send(HttpRequest request) { return this.handler.invoke(request); } + @Override + public Single send(HttpRequest request, String body) { + return null; + } + public List getSentRequests() { return sentRequests; } @@ -66,7 +71,12 @@ public WebSocketWrapper createWebSocket(String url, Map headers) throw new RuntimeException("WebSockets isn't supported in testing currently."); } + @Override + public HttpClient cloneWithTimeOut(int timeoutInMilliseconds) { + return this; + } + interface TestHttpRequestHandler { Single invoke(HttpRequest request); } -} \ No newline at end of file +} diff --git a/src/SignalR/clients/java/signalr/src/test/java/com/microsoft/signalr/TestUtils.java b/src/SignalR/clients/java/signalr/src/test/java/com/microsoft/signalr/TestUtils.java index 9026c392b642..795df060e448 100644 --- a/src/SignalR/clients/java/signalr/src/test/java/com/microsoft/signalr/TestUtils.java +++ b/src/SignalR/clients/java/signalr/src/test/java/com/microsoft/signalr/TestUtils.java @@ -14,10 +14,10 @@ static HubConnection createHubConnection(String url, Transport transport) { static HubConnection createHubConnection(String url, Transport transport, boolean skipNegotiate, HttpClient client) { HttpHubConnectionBuilder builder = HubConnectionBuilder.create(url) - .withTransport(transport) + .withTransportImplementation(transport) .withHttpClient(client) .shouldSkipNegotiate(skipNegotiate); return builder.build(); } -} \ No newline at end of file +} diff --git a/src/SignalR/clients/java/signalr/src/test/java/com/microsoft/signalr/WebSocketTransportTest.java b/src/SignalR/clients/java/signalr/src/test/java/com/microsoft/signalr/WebSocketTransportTest.java index a68e69195d11..239bc80f5b0a 100644 --- a/src/SignalR/clients/java/signalr/src/test/java/com/microsoft/signalr/WebSocketTransportTest.java +++ b/src/SignalR/clients/java/signalr/src/test/java/com/microsoft/signalr/WebSocketTransportTest.java @@ -41,10 +41,20 @@ public Single send(HttpRequest request) { return null; } + @Override + public Single send(HttpRequest request, String body) { + return null; + } + @Override public WebSocketWrapper createWebSocket(String url, Map headers) { return new TestWrapper(); } + + @Override + public HttpClient cloneWithTimeOut(int timeoutInMiliseconds) { + return null; + } } class TestWrapper extends WebSocketWrapper { From 32e5b680e54eec34543fc94919f919d7efefa6e3 Mon Sep 17 00:00:00 2001 From: MikaelMengistu Date: Wed, 6 Feb 2019 10:39:46 -0800 Subject: [PATCH 02/15] Java Spotless Apply --- .../com/microsoft/signalr/DefaultHttpClient.java | 2 +- .../java/com/microsoft/signalr/HubConnection.java | 4 ++-- .../java/com/microsoft/signalr/JsonHubProtocol.java | 2 +- .../com/microsoft/signalr/LongPollingTransport.java | 13 +++++++------ .../com/microsoft/signalr/HubConnectionTest.java | 1 - .../microsoft/signalr/LongPollingTransportTest.java | 8 ++++---- 6 files changed, 15 insertions(+), 15 deletions(-) diff --git a/src/SignalR/clients/java/signalr/src/main/java/com/microsoft/signalr/DefaultHttpClient.java b/src/SignalR/clients/java/signalr/src/main/java/com/microsoft/signalr/DefaultHttpClient.java index cbd9feee7f91..6b2ba9bd7211 100644 --- a/src/SignalR/clients/java/signalr/src/main/java/com/microsoft/signalr/DefaultHttpClient.java +++ b/src/SignalR/clients/java/signalr/src/main/java/com/microsoft/signalr/DefaultHttpClient.java @@ -105,7 +105,7 @@ public Single send(HttpRequest httpRequest, String bodyContent) { break; case "POST": RequestBody body; - if(bodyContent != null) { + if (bodyContent != null) { body = RequestBody.create(MediaType.parse("text/plain"), bodyContent); } else { body = RequestBody.create(null, new byte[]{}); diff --git a/src/SignalR/clients/java/signalr/src/main/java/com/microsoft/signalr/HubConnection.java b/src/SignalR/clients/java/signalr/src/main/java/com/microsoft/signalr/HubConnection.java index ee8bdfbf217d..4810a9def012 100644 --- a/src/SignalR/clients/java/signalr/src/main/java/com/microsoft/signalr/HubConnection.java +++ b/src/SignalR/clients/java/signalr/src/main/java/com/microsoft/signalr/HubConnection.java @@ -297,7 +297,7 @@ public Completable start() { negotiate.flatMapCompletable(url -> { logger.debug("Starting HubConnection."); if (transport == null) { - switch (transportEnum){ + switch (transportEnum) { case LONG_POLLING: transport = new LongPollingTransport(headers, httpClient); break; @@ -324,7 +324,7 @@ public Completable start() { logger.info("HubConnection started."); resetServerTimeout(); //Don't send pings if we're using long polling. - if(transportEnum != TransportEnum.LONG_POLLING) { + if (transportEnum != TransportEnum.LONG_POLLING) { this.pingTimer = new Timer(); this.pingTimer.schedule(new TimerTask() { @Override diff --git a/src/SignalR/clients/java/signalr/src/main/java/com/microsoft/signalr/JsonHubProtocol.java b/src/SignalR/clients/java/signalr/src/main/java/com/microsoft/signalr/JsonHubProtocol.java index 4a2b928ba20e..48dc04f52e9e 100644 --- a/src/SignalR/clients/java/signalr/src/main/java/com/microsoft/signalr/JsonHubProtocol.java +++ b/src/SignalR/clients/java/signalr/src/main/java/com/microsoft/signalr/JsonHubProtocol.java @@ -37,7 +37,7 @@ public TransferFormat getTransferFormat() { @Override public HubMessage[] parseMessages(String payload, InvocationBinder binder) { - if(payload.length() == 0 ){ + if (payload.length() == 0 ) { return new HubMessage[]{}; } if (!(payload.substring(payload.length() - 1).equals(RECORD_SEPARATOR))) { diff --git a/src/SignalR/clients/java/signalr/src/main/java/com/microsoft/signalr/LongPollingTransport.java b/src/SignalR/clients/java/signalr/src/main/java/com/microsoft/signalr/LongPollingTransport.java index f54c4c89c2b3..ed048cde4a3b 100644 --- a/src/SignalR/clients/java/signalr/src/main/java/com/microsoft/signalr/LongPollingTransport.java +++ b/src/SignalR/clients/java/signalr/src/main/java/com/microsoft/signalr/LongPollingTransport.java @@ -3,12 +3,13 @@ package com.microsoft.signalr; -import io.reactivex.Completable; -import io.reactivex.subjects.CompletableSubject; +import java.util.Map; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.Map; +import io.reactivex.Completable; +import io.reactivex.subjects.CompletableSubject; class LongPollingTransport implements Transport { private OnReceiveCallBack onReceiveCallBack; @@ -31,7 +32,7 @@ public LongPollingTransport(Map headers, HttpClient client) { } //Package private active accessor for testing. - boolean isActive(){ + boolean isActive() { return this.active; } @@ -45,7 +46,7 @@ public Completable start(String url) { HttpRequest request = new HttpRequest(); request.addHeaders(headers); return this.pollingClient.get(pollUrl, request).flatMapCompletable(response -> { - if (response.getStatusCode() != 200){ + if (response.getStatusCode() != 200) { logger.error("Unexpected response code {}", response.getStatusCode()); this.active = false; return Completable.error(new Exception("Failed to connect.")); @@ -59,7 +60,7 @@ public Completable start(String url) { }); } - private Completable poll(String url){ + private Completable poll(String url) { if (this.active) { pollUrl = url + "&_=" + System.currentTimeMillis(); logger.info("Polling {}", pollUrl); diff --git a/src/SignalR/clients/java/signalr/src/test/java/com/microsoft/signalr/HubConnectionTest.java b/src/SignalR/clients/java/signalr/src/test/java/com/microsoft/signalr/HubConnectionTest.java index f61d0d221a74..cd4c5bbe5ced 100644 --- a/src/SignalR/clients/java/signalr/src/test/java/com/microsoft/signalr/HubConnectionTest.java +++ b/src/SignalR/clients/java/signalr/src/test/java/com/microsoft/signalr/HubConnectionTest.java @@ -8,7 +8,6 @@ import java.util.Iterator; import java.util.List; import java.util.concurrent.CancellationException; -import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; diff --git a/src/SignalR/clients/java/signalr/src/test/java/com/microsoft/signalr/LongPollingTransportTest.java b/src/SignalR/clients/java/signalr/src/test/java/com/microsoft/signalr/LongPollingTransportTest.java index 5ea01c021664..7bbef1990310 100644 --- a/src/SignalR/clients/java/signalr/src/test/java/com/microsoft/signalr/LongPollingTransportTest.java +++ b/src/SignalR/clients/java/signalr/src/test/java/com/microsoft/signalr/LongPollingTransportTest.java @@ -3,16 +3,16 @@ package com.microsoft.signalr; -import io.reactivex.Single; -import org.junit.jupiter.api.Test; +import static org.junit.jupiter.api.Assertions.*; import java.util.HashMap; import java.util.Map; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; -import static org.junit.jupiter.api.Assertions.*; +import org.junit.jupiter.api.Test; + +import io.reactivex.Single; public class LongPollingTransportTest { From be5e833eea120164f5c150b6f31a49b60e1b497c Mon Sep 17 00:00:00 2001 From: MikaelMengistu Date: Wed, 6 Feb 2019 15:46:53 -0800 Subject: [PATCH 03/15] Some pr feedback --- .../microsoft/signalr/DefaultHttpClient.java | 4 -- .../com/microsoft/signalr/HubConnection.java | 2 +- .../signalr/LongPollingTransport.java | 47 +++++++++++++------ .../signalr/LongPollingTransportTest.java | 17 ++++--- 4 files changed, 42 insertions(+), 28 deletions(-) diff --git a/src/SignalR/clients/java/signalr/src/main/java/com/microsoft/signalr/DefaultHttpClient.java b/src/SignalR/clients/java/signalr/src/main/java/com/microsoft/signalr/DefaultHttpClient.java index 6b2ba9bd7211..91c4f10a4771 100644 --- a/src/SignalR/clients/java/signalr/src/main/java/com/microsoft/signalr/DefaultHttpClient.java +++ b/src/SignalR/clients/java/signalr/src/main/java/com/microsoft/signalr/DefaultHttpClient.java @@ -86,10 +86,6 @@ public List loadForRequest(HttpUrl url) { } } - public DefaultHttpClient(int timeoutInMiliseonds) { - - } - @Override public Single send(HttpRequest httpRequest) { return send(httpRequest, null); diff --git a/src/SignalR/clients/java/signalr/src/main/java/com/microsoft/signalr/HubConnection.java b/src/SignalR/clients/java/signalr/src/main/java/com/microsoft/signalr/HubConnection.java index 4810a9def012..29d183a1d34e 100644 --- a/src/SignalR/clients/java/signalr/src/main/java/com/microsoft/signalr/HubConnection.java +++ b/src/SignalR/clients/java/signalr/src/main/java/com/microsoft/signalr/HubConnection.java @@ -299,7 +299,7 @@ public Completable start() { if (transport == null) { switch (transportEnum) { case LONG_POLLING: - transport = new LongPollingTransport(headers, httpClient); + transport = new LongPollingTransport(headers, httpClient, accessTokenProvider); break; default: transport = new WebSocketTransport(headers, httpClient); diff --git a/src/SignalR/clients/java/signalr/src/main/java/com/microsoft/signalr/LongPollingTransport.java b/src/SignalR/clients/java/signalr/src/main/java/com/microsoft/signalr/LongPollingTransport.java index ed048cde4a3b..1a004a619e60 100644 --- a/src/SignalR/clients/java/signalr/src/main/java/com/microsoft/signalr/LongPollingTransport.java +++ b/src/SignalR/clients/java/signalr/src/main/java/com/microsoft/signalr/LongPollingTransport.java @@ -5,6 +5,7 @@ import java.util.Map; +import io.reactivex.Single; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -22,13 +23,15 @@ class LongPollingTransport implements Transport { private volatile Boolean active; private String pollUrl; private String closeError; - private CompletableSubject pollCompletableSubject = CompletableSubject.create(); + private Single accessTokenProvider; + private CompletableSubject receiveLoop = CompletableSubject.create(); private final Logger logger = LoggerFactory.getLogger(LongPollingTransport.class); - public LongPollingTransport(Map headers, HttpClient client) { + public LongPollingTransport(Map headers, HttpClient client, Single accessTokenProvider) { this.headers = headers; this.client = client; this.pollingClient = client.cloneWithTimeOut(POLL_TIMEOUT); + this.accessTokenProvider = accessTokenProvider; } //Package private active accessor for testing. @@ -36,25 +39,35 @@ boolean isActive() { return this.active; } + private void updateHeaderToken() { + this.accessTokenProvider.flatMap((token) -> { + if(!token.isEmpty()) { + this.headers.put("Authorization", "Bearer " + token); + } + return Single.just(""); + }); + + } + @Override public Completable start(String url) { this.active = true; logger.info("Starting LongPolling transport"); this.url = url; pollUrl = url + "&_=" + System.currentTimeMillis(); - logger.info("Polling {}", pollUrl); + logger.debug("Polling {}", pollUrl); + this.updateHeaderToken(); HttpRequest request = new HttpRequest(); request.addHeaders(headers); return this.pollingClient.get(pollUrl, request).flatMapCompletable(response -> { if (response.getStatusCode() != 200) { - logger.error("Unexpected response code {}", response.getStatusCode()); + logger.error("Unexpected response code {}.", response.getStatusCode()); this.active = false; return Completable.error(new Exception("Failed to connect.")); } else { - logger.info("Activating poll loop", response.getStatusCode()); this.active = true; } - poll(url).subscribeWith(pollCompletableSubject); + poll(url).subscribeWith(receiveLoop); return Completable.complete(); }); @@ -64,6 +77,7 @@ private Completable poll(String url) { if (this.active) { pollUrl = url + "&_=" + System.currentTimeMillis(); logger.info("Polling {}", pollUrl); + this.updateHeaderToken(); HttpRequest request = new HttpRequest(); request.addHeaders(headers); Completable pollingCompletable = this.pollingClient.get(pollUrl, request).flatMapCompletable(response -> { @@ -71,26 +85,30 @@ private Completable poll(String url) { logger.info("LongPolling transport terminated by server."); this.active = false; } else if (response.getStatusCode() != 200) { - logger.error("Unexpected response code {}", response.getStatusCode()); + logger.error("Unexpected response code {}.", response.getStatusCode()); this.active = false; this.closeError = "Unexpected response code " + response.getStatusCode(); } else { - logger.info("Message received"); - new Thread(() -> this.onReceive(response.getContent())).start(); + if(response.getContent() != null) { + logger.debug("Message received."); + new Thread(() -> this.onReceive(response.getContent())).start(); + } else { + logger.debug("Poll timed out, reissuing."); + } + } return poll(url); }); return pollingCompletable; } else { logger.info("Long Polling transport polling complete."); - pollCompletableSubject.onComplete(); - this.stop(); + receiveLoop.onComplete(); + return this.stop(); } - - return Completable.complete(); } @Override public Completable send(String message) { + this.updateHeaderToken(); return Completable.fromSingle(this.client.post(url, message)); } @@ -113,9 +131,10 @@ public void setOnClose(TransportOnClosedCallback onCloseCallback) { @Override public Completable stop() { this.active = false; + this.updateHeaderToken(); this.pollingClient.delete(this.url); CompletableSubject stopCompletableSubject = CompletableSubject.create(); - return this.pollCompletableSubject.andThen(Completable.defer(() -> { + return this.receiveLoop.andThen(Completable.defer(() -> { logger.info("LongPolling transport stopped."); this.onClose.invoke(this.closeError); return Completable.complete(); diff --git a/src/SignalR/clients/java/signalr/src/test/java/com/microsoft/signalr/LongPollingTransportTest.java b/src/SignalR/clients/java/signalr/src/test/java/com/microsoft/signalr/LongPollingTransportTest.java index 7bbef1990310..f91c72ef452d 100644 --- a/src/SignalR/clients/java/signalr/src/test/java/com/microsoft/signalr/LongPollingTransportTest.java +++ b/src/SignalR/clients/java/signalr/src/test/java/com/microsoft/signalr/LongPollingTransportTest.java @@ -22,7 +22,7 @@ public void LongPollingFailsToConnectWith404Response() { .on("GET", (req) -> Single.just(new HttpResponse(404, "", ""))); Map headers = new HashMap<>(); - LongPollingTransport transport = new LongPollingTransport(headers, client); + LongPollingTransport transport = new LongPollingTransport(headers, client, Single.just("")); Throwable exception = assertThrows(RuntimeException.class, () -> transport.start("http://example.com").timeout(1, TimeUnit.SECONDS).blockingAwait()); assertEquals(Exception.class, exception.getCause().getClass()); assertEquals("Failed to connect.", exception.getCause().getMessage()); @@ -42,8 +42,8 @@ public void StatusCode204StopsLongPolling() { }); Map headers = new HashMap<>(); - LongPollingTransport transport = new LongPollingTransport(headers, client); - transport.start("http://example.com").timeout(100, TimeUnit.SECONDS).blockingAwait(); + LongPollingTransport transport = new LongPollingTransport(headers, client, Single.just("")); + transport.start("http://example.com").timeout(1, TimeUnit.SECONDS).blockingAwait(); assertFalse(transport.isActive()); } @@ -60,14 +60,14 @@ public void StatusCode204StopsLongPollingTriggersOnClosed() { }); Map headers = new HashMap<>(); - LongPollingTransport transport = new LongPollingTransport(headers, client); + LongPollingTransport transport = new LongPollingTransport(headers, client, Single.just("")); AtomicBoolean onClosedRan = new AtomicBoolean(false); transport.setOnClose((error) -> { onClosedRan.set(true); }); assertFalse(onClosedRan.get()); - transport.start("http://example.com").timeout(100, TimeUnit.SECONDS).blockingAwait(); + transport.start("http://example.com").timeout(1, TimeUnit.SECONDS).blockingAwait(); assertTrue(onClosedRan.get()); assertFalse(transport.isActive()); } @@ -85,14 +85,14 @@ public void LongPollingFailsWhenReceivingUnexpectedErrorCode() { }); Map headers = new HashMap<>(); - LongPollingTransport transport = new LongPollingTransport(headers, client); + LongPollingTransport transport = new LongPollingTransport(headers, client, Single.just("")); AtomicBoolean onClosedRan = new AtomicBoolean(false); transport.setOnClose((error) -> { onClosedRan.set(true); assertEquals("Unexpected response code 999", error); }); - transport.start("http://example.com").timeout(100, TimeUnit.SECONDS).blockingAwait(); + transport.start("http://example.com").timeout(1, TimeUnit.SECONDS).blockingAwait(); assertFalse(transport.isActive()); assertTrue(onClosedRan.get()); } @@ -103,7 +103,7 @@ public void CanSetAndTriggerOnReceive() { .on("GET", (req) -> Single.just(new HttpResponse(200, "", ""))); Map headers = new HashMap<>(); - LongPollingTransport transport = new LongPollingTransport(headers, client); + LongPollingTransport transport = new LongPollingTransport(headers, client, Single.just("")); AtomicBoolean onReceivedRan = new AtomicBoolean(false); transport.setOnReceive((message) -> { @@ -115,6 +115,5 @@ public void CanSetAndTriggerOnReceive() { // when we are handling the last outstanding poll. transport.onReceive("TEST"); assertTrue(onReceivedRan.get()); - transport.stop(); } } From d4929021eed8017aeded9132cad647f7dd3bb2e9 Mon Sep 17 00:00:00 2001 From: MikaelMengistu Date: Wed, 6 Feb 2019 17:35:24 -0800 Subject: [PATCH 04/15] spellcheck --- .../main/java/com/microsoft/signalr/DefaultHttpClient.java | 4 ++-- .../java/com/microsoft/signalr/WebSocketTransportTest.java | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/SignalR/clients/java/signalr/src/main/java/com/microsoft/signalr/DefaultHttpClient.java b/src/SignalR/clients/java/signalr/src/main/java/com/microsoft/signalr/DefaultHttpClient.java index 91c4f10a4771..1d45898241b7 100644 --- a/src/SignalR/clients/java/signalr/src/main/java/com/microsoft/signalr/DefaultHttpClient.java +++ b/src/SignalR/clients/java/signalr/src/main/java/com/microsoft/signalr/DefaultHttpClient.java @@ -29,7 +29,7 @@ public DefaultHttpClient cloneWithTimeOut(int timeoutInMilliseconds) { return new DefaultHttpClient(timeoutInMilliseconds, newClient); } - public DefaultHttpClient(int timeoutInMiliseconds, OkHttpClient client) { + public DefaultHttpClient(int timeoutInMilliseconds, OkHttpClient client) { if (client != null) { this.client = client; } else { @@ -81,7 +81,7 @@ public List loadForRequest(HttpUrl url) { cookieLock.unlock(); } } - }).readTimeout(timeoutInMiliseconds, TimeUnit.MILLISECONDS) + }).readTimeout(timeoutInMilliseconds, TimeUnit.MILLISECONDS) .build(); } } diff --git a/src/SignalR/clients/java/signalr/src/test/java/com/microsoft/signalr/WebSocketTransportTest.java b/src/SignalR/clients/java/signalr/src/test/java/com/microsoft/signalr/WebSocketTransportTest.java index 239bc80f5b0a..a3a35955503f 100644 --- a/src/SignalR/clients/java/signalr/src/test/java/com/microsoft/signalr/WebSocketTransportTest.java +++ b/src/SignalR/clients/java/signalr/src/test/java/com/microsoft/signalr/WebSocketTransportTest.java @@ -52,7 +52,7 @@ public WebSocketWrapper createWebSocket(String url, Map headers) } @Override - public HttpClient cloneWithTimeOut(int timeoutInMiliseconds) { + public HttpClient cloneWithTimeOut(int timeoutInMilliseconds) { return null; } } From cf5831ffe7466835e98a66a99d13a2ccd1bfbee9 Mon Sep 17 00:00:00 2001 From: MikaelMengistu Date: Wed, 6 Feb 2019 19:25:01 -0800 Subject: [PATCH 05/15] tests --- .../signalr/LongPollingTransport.java | 5 +- .../signalr/LongPollingTransportTest.java | 86 +++++++++++++++++++ 2 files changed, 90 insertions(+), 1 deletion(-) diff --git a/src/SignalR/clients/java/signalr/src/main/java/com/microsoft/signalr/LongPollingTransport.java b/src/SignalR/clients/java/signalr/src/main/java/com/microsoft/signalr/LongPollingTransport.java index 1a004a619e60..23822b293dcc 100644 --- a/src/SignalR/clients/java/signalr/src/main/java/com/microsoft/signalr/LongPollingTransport.java +++ b/src/SignalR/clients/java/signalr/src/main/java/com/microsoft/signalr/LongPollingTransport.java @@ -20,7 +20,7 @@ class LongPollingTransport implements Transport { private final HttpClient pollingClient; private final Map headers; private static final int POLL_TIMEOUT = 100*1000; - private volatile Boolean active; + private volatile Boolean active = false; private String pollUrl; private String closeError; private Single accessTokenProvider; @@ -108,6 +108,9 @@ private Completable poll(String url) { @Override public Completable send(String message) { + if (!this.active) { + return Completable.error(new Exception("Cannot send unless the transport is active.")); + } this.updateHeaderToken(); return Completable.fromSingle(this.client.post(url, message)); } diff --git a/src/SignalR/clients/java/signalr/src/test/java/com/microsoft/signalr/LongPollingTransportTest.java b/src/SignalR/clients/java/signalr/src/test/java/com/microsoft/signalr/LongPollingTransportTest.java index f91c72ef452d..8e824c9cbe95 100644 --- a/src/SignalR/clients/java/signalr/src/test/java/com/microsoft/signalr/LongPollingTransportTest.java +++ b/src/SignalR/clients/java/signalr/src/test/java/com/microsoft/signalr/LongPollingTransportTest.java @@ -9,6 +9,8 @@ import java.util.Map; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import org.junit.jupiter.api.Test; @@ -29,6 +31,19 @@ public void LongPollingFailsToConnectWith404Response() { assertFalse(transport.isActive()); } + @Test + public void LongPollingTransportCantSendBeforeStart() { + TestHttpClient client = new TestHttpClient() + .on("GET", (req) -> Single.just(new HttpResponse(404, "", ""))); + + Map headers = new HashMap<>(); + LongPollingTransport transport = new LongPollingTransport(headers, client, Single.just("")); + Throwable exception = assertThrows(RuntimeException.class, () -> transport.send("First").timeout(1, TimeUnit.SECONDS).blockingAwait()); + assertEquals(Exception.class, exception.getCause().getClass()); + assertEquals("Cannot send unless the transport is active.", exception.getCause().getMessage()); + assertFalse(transport.isActive()); + } + @Test public void StatusCode204StopsLongPolling() { AtomicBoolean firstPoll = new AtomicBoolean(true); @@ -116,4 +131,75 @@ public void CanSetAndTriggerOnReceive() { transport.onReceive("TEST"); assertTrue(onReceivedRan.get()); } + + @Test + public void LongPollingTransportOnReceiveGetsCalled() { + AtomicInteger requestCount = new AtomicInteger(); + TestHttpClient client = new TestHttpClient() + .on("GET", (req) -> { + if (requestCount.get() == 0) { + requestCount.incrementAndGet(); + return Single.just(new HttpResponse(200, "", "")); + } else if (requestCount.get() == 1){ + requestCount.incrementAndGet(); + return Single.just(new HttpResponse(200, "", "TEST")); + } + + return Single.just(new HttpResponse(204, "", "")); + }); + + Map headers = new HashMap<>(); + LongPollingTransport transport = new LongPollingTransport(headers, client, Single.just("")); + + AtomicBoolean onReceiveCalled = new AtomicBoolean(false); + AtomicReference message = new AtomicReference<>(); + transport.setOnReceive((msg -> { + onReceiveCalled.set(true); + message.set(msg); + }) ); + + transport.setOnClose((error) -> {}); + + transport.start("http://example.com").timeout(1, TimeUnit.SECONDS).blockingAwait(); + + assertTrue(onReceiveCalled.get()); + assertEquals("TEST", message.get()); + } + + @Test + public void LongPollingTransportOnReceiveGetsCalledMultipleTimes() { + AtomicInteger requestCount = new AtomicInteger(); + TestHttpClient client = new TestHttpClient() + .on("GET", (req) -> { + if (requestCount.get() == 0) { + requestCount.incrementAndGet(); + return Single.just(new HttpResponse(200, "", "")); + } else if (requestCount.get() == 1){ + requestCount.incrementAndGet(); + return Single.just(new HttpResponse(200, "", "FIRST")); + } else if (requestCount.get() == 2) { + requestCount.incrementAndGet(); + return Single.just(new HttpResponse(200, "", "SECOND")); + } + + return Single.just(new HttpResponse(204, "", "")); + }); + + Map headers = new HashMap<>(); + LongPollingTransport transport = new LongPollingTransport(headers, client, Single.just("")); + + AtomicBoolean onReceiveCalled = new AtomicBoolean(false); + AtomicReference message = new AtomicReference<>(""); + transport.setOnReceive((msg -> { + onReceiveCalled.set(true); + message.set(message.get() + msg); + }) ); + + transport.setOnClose((error) -> {}); + + transport.start("http://example.com").timeout(1, TimeUnit.SECONDS).blockingAwait(); + + assertTrue(onReceiveCalled.get()); + assertEquals("FIRSTSECOND", message.get()); + } } From e9139bb1c42cb3ddb9c30d6d2e5c3a94bfc4b429 Mon Sep 17 00:00:00 2001 From: MikaelMengistu Date: Thu, 7 Feb 2019 13:39:32 -0800 Subject: [PATCH 06/15] thread pool --- .../java/com/microsoft/signalr/LongPollingTransport.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/SignalR/clients/java/signalr/src/main/java/com/microsoft/signalr/LongPollingTransport.java b/src/SignalR/clients/java/signalr/src/main/java/com/microsoft/signalr/LongPollingTransport.java index 23822b293dcc..5569f400b831 100644 --- a/src/SignalR/clients/java/signalr/src/main/java/com/microsoft/signalr/LongPollingTransport.java +++ b/src/SignalR/clients/java/signalr/src/main/java/com/microsoft/signalr/LongPollingTransport.java @@ -4,6 +4,8 @@ package com.microsoft.signalr; import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import io.reactivex.Single; import org.slf4j.Logger; @@ -25,6 +27,8 @@ class LongPollingTransport implements Transport { private String closeError; private Single accessTokenProvider; private CompletableSubject receiveLoop = CompletableSubject.create(); + private final ExecutorService threadPool = Executors.newCachedThreadPool(); + private final Logger logger = LoggerFactory.getLogger(LongPollingTransport.class); public LongPollingTransport(Map headers, HttpClient client, Single accessTokenProvider) { @@ -91,7 +95,7 @@ private Completable poll(String url) { } else { if(response.getContent() != null) { logger.debug("Message received."); - new Thread(() -> this.onReceive(response.getContent())).start(); + threadPool.execute(() -> this.onReceive(response.getContent())); } else { logger.debug("Poll timed out, reissuing."); } From f87bf4672545e2dbf1e17533690c5a8dd8f175b1 Mon Sep 17 00:00:00 2001 From: MikaelMengistu Date: Thu, 7 Feb 2019 13:42:17 -0800 Subject: [PATCH 07/15] clean --- .../main/java/com/microsoft/signalr/LongPollingTransport.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/SignalR/clients/java/signalr/src/main/java/com/microsoft/signalr/LongPollingTransport.java b/src/SignalR/clients/java/signalr/src/main/java/com/microsoft/signalr/LongPollingTransport.java index 5569f400b831..81d386d34eb6 100644 --- a/src/SignalR/clients/java/signalr/src/main/java/com/microsoft/signalr/LongPollingTransport.java +++ b/src/SignalR/clients/java/signalr/src/main/java/com/microsoft/signalr/LongPollingTransport.java @@ -27,7 +27,7 @@ class LongPollingTransport implements Transport { private String closeError; private Single accessTokenProvider; private CompletableSubject receiveLoop = CompletableSubject.create(); - private final ExecutorService threadPool = Executors.newCachedThreadPool(); + private ExecutorService threadPool; private final Logger logger = LoggerFactory.getLogger(LongPollingTransport.class); @@ -71,6 +71,7 @@ public Completable start(String url) { } else { this.active = true; } + this.threadPool = Executors.newCachedThreadPool(); poll(url).subscribeWith(receiveLoop); return Completable.complete(); From 71c1a0bc0e9a6533f5d1abfc77c0c511437fc0b6 Mon Sep 17 00:00:00 2001 From: MikaelMengistu Date: Thu, 7 Feb 2019 13:44:33 -0800 Subject: [PATCH 08/15] spotless --- .../java/com/microsoft/signalr/LongPollingTransport.java | 6 +++--- .../com/microsoft/signalr/LongPollingTransportTest.java | 4 ++-- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/src/SignalR/clients/java/signalr/src/main/java/com/microsoft/signalr/LongPollingTransport.java b/src/SignalR/clients/java/signalr/src/main/java/com/microsoft/signalr/LongPollingTransport.java index 81d386d34eb6..eadc5e78268a 100644 --- a/src/SignalR/clients/java/signalr/src/main/java/com/microsoft/signalr/LongPollingTransport.java +++ b/src/SignalR/clients/java/signalr/src/main/java/com/microsoft/signalr/LongPollingTransport.java @@ -7,11 +7,11 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import io.reactivex.Single; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import io.reactivex.Completable; +import io.reactivex.Single; import io.reactivex.subjects.CompletableSubject; class LongPollingTransport implements Transport { @@ -45,7 +45,7 @@ boolean isActive() { private void updateHeaderToken() { this.accessTokenProvider.flatMap((token) -> { - if(!token.isEmpty()) { + if (!token.isEmpty()) { this.headers.put("Authorization", "Bearer " + token); } return Single.just(""); @@ -94,7 +94,7 @@ private Completable poll(String url) { this.active = false; this.closeError = "Unexpected response code " + response.getStatusCode(); } else { - if(response.getContent() != null) { + if (response.getContent() != null) { logger.debug("Message received."); threadPool.execute(() -> this.onReceive(response.getContent())); } else { diff --git a/src/SignalR/clients/java/signalr/src/test/java/com/microsoft/signalr/LongPollingTransportTest.java b/src/SignalR/clients/java/signalr/src/test/java/com/microsoft/signalr/LongPollingTransportTest.java index 8e824c9cbe95..712428e6f86d 100644 --- a/src/SignalR/clients/java/signalr/src/test/java/com/microsoft/signalr/LongPollingTransportTest.java +++ b/src/SignalR/clients/java/signalr/src/test/java/com/microsoft/signalr/LongPollingTransportTest.java @@ -140,7 +140,7 @@ public void LongPollingTransportOnReceiveGetsCalled() { if (requestCount.get() == 0) { requestCount.incrementAndGet(); return Single.just(new HttpResponse(200, "", "")); - } else if (requestCount.get() == 1){ + } else if (requestCount.get() == 1) { requestCount.incrementAndGet(); return Single.just(new HttpResponse(200, "", "TEST")); } @@ -174,7 +174,7 @@ public void LongPollingTransportOnReceiveGetsCalledMultipleTimes() { if (requestCount.get() == 0) { requestCount.incrementAndGet(); return Single.just(new HttpResponse(200, "", "")); - } else if (requestCount.get() == 1){ + } else if (requestCount.get() == 1) { requestCount.incrementAndGet(); return Single.just(new HttpResponse(200, "", "FIRST")); } else if (requestCount.get() == 2) { From 34c836ff88ee1547b7c0b65dafb077c12253fc85 Mon Sep 17 00:00:00 2001 From: MikaelMengistu Date: Thu, 7 Feb 2019 17:33:34 -0800 Subject: [PATCH 09/15] pr fb --- .../microsoft/signalr/DefaultHttpClient.java | 13 ++- .../com/microsoft/signalr/HubConnection.java | 48 +++++----- .../microsoft/signalr/JsonHubProtocol.java | 2 +- .../signalr/LongPollingTransport.java | 96 ++++++++++--------- .../signalr/LongPollingTransportTest.java | 40 ++++++++ 5 files changed, 126 insertions(+), 73 deletions(-) diff --git a/src/SignalR/clients/java/signalr/src/main/java/com/microsoft/signalr/DefaultHttpClient.java b/src/SignalR/clients/java/signalr/src/main/java/com/microsoft/signalr/DefaultHttpClient.java index 1d45898241b7..713c5a9f343d 100644 --- a/src/SignalR/clients/java/signalr/src/main/java/com/microsoft/signalr/DefaultHttpClient.java +++ b/src/SignalR/clients/java/signalr/src/main/java/com/microsoft/signalr/DefaultHttpClient.java @@ -20,7 +20,7 @@ final class DefaultHttpClient extends HttpClient { private OkHttpClient client = null; public DefaultHttpClient() { - this(5000, null); + this(0, null); } public DefaultHttpClient cloneWithTimeOut(int timeoutInMilliseconds) { @@ -33,7 +33,8 @@ public DefaultHttpClient(int timeoutInMilliseconds, OkHttpClient client) { if (client != null) { this.client = client; } else { - this.client = new OkHttpClient.Builder().cookieJar(new CookieJar() { + + OkHttpClient.Builder builder = new OkHttpClient.Builder().cookieJar(new CookieJar() { private List cookieList = new ArrayList<>(); private Lock cookieLock = new ReentrantLock(); @@ -81,8 +82,12 @@ public List loadForRequest(HttpUrl url) { cookieLock.unlock(); } } - }).readTimeout(timeoutInMilliseconds, TimeUnit.MILLISECONDS) - .build(); + }); + + if (timeoutInMilliseconds > 0){ + builder.readTimeout(timeoutInMilliseconds, TimeUnit.MILLISECONDS); + } + this.client = builder.build(); } } diff --git a/src/SignalR/clients/java/signalr/src/main/java/com/microsoft/signalr/HubConnection.java b/src/SignalR/clients/java/signalr/src/main/java/com/microsoft/signalr/HubConnection.java index 29d183a1d34e..6bac938c72a2 100644 --- a/src/SignalR/clients/java/signalr/src/main/java/com/microsoft/signalr/HubConnection.java +++ b/src/SignalR/clients/java/signalr/src/main/java/com/microsoft/signalr/HubConnection.java @@ -325,27 +325,7 @@ public Completable start() { resetServerTimeout(); //Don't send pings if we're using long polling. if (transportEnum != TransportEnum.LONG_POLLING) { - this.pingTimer = new Timer(); - this.pingTimer.schedule(new TimerTask() { - @Override - public void run() { - try { - if (System.currentTimeMillis() > nextServerTimeout.get()) { - stop("Server timeout elapsed without receiving a message from the server."); - return; - } - - if (System.currentTimeMillis() > nextPingActivation.get()) { - sendHubMessage(PingMessage.getInstance()); - } - } catch (Exception e) { - logger.warn("Error sending ping: {}.", e.getMessage()); - // The connection is probably in a bad or closed state now, cleanup the timer so - // it stops triggering - pingTimer.cancel(); - } - } - }, new Date(0), tickRate); + activatePingTimer(); } } finally { hubConnectionStateLock.unlock(); @@ -361,6 +341,30 @@ public void run() { return start; } + private void activatePingTimer() { + this.pingTimer = new Timer(); + this.pingTimer.schedule(new TimerTask() { + @Override + public void run() { + try { + if (System.currentTimeMillis() > nextServerTimeout.get()) { + stop("Server timeout elapsed without receiving a message from the server."); + return; + } + + if (System.currentTimeMillis() > nextPingActivation.get()) { + sendHubMessage(PingMessage.getInstance()); + } + } catch (Exception e) { + logger.warn("Error sending ping: {}.", e.getMessage()); + // The connection is probably in a bad or closed state now, cleanup the timer so + // it stops triggering + pingTimer.cancel(); + } + } + }, new Date(0), tickRate); + } + private Single startNegotiate(String url, int negotiateAttempts) { if (hubConnectionState != HubConnectionState.DISCONNECTED) { return Single.just(null); @@ -571,7 +575,7 @@ private void sendHubMessage(HubMessage message) { } else { logger.debug("Sending {} message.", message.getMessageType().name()); } - transport.send(serializedMessage); + transport.send(serializedMessage).subscribeWith(CompletableSubject.create()); resetKeepAlive(); } diff --git a/src/SignalR/clients/java/signalr/src/main/java/com/microsoft/signalr/JsonHubProtocol.java b/src/SignalR/clients/java/signalr/src/main/java/com/microsoft/signalr/JsonHubProtocol.java index 48dc04f52e9e..fa4d3212739d 100644 --- a/src/SignalR/clients/java/signalr/src/main/java/com/microsoft/signalr/JsonHubProtocol.java +++ b/src/SignalR/clients/java/signalr/src/main/java/com/microsoft/signalr/JsonHubProtocol.java @@ -37,7 +37,7 @@ public TransferFormat getTransferFormat() { @Override public HubMessage[] parseMessages(String payload, InvocationBinder binder) { - if (payload.length() == 0 ) { + if (payload.length() == 0) { return new HubMessage[]{}; } if (!(payload.substring(payload.length() - 1).equals(RECORD_SEPARATOR))) { diff --git a/src/SignalR/clients/java/signalr/src/main/java/com/microsoft/signalr/LongPollingTransport.java b/src/SignalR/clients/java/signalr/src/main/java/com/microsoft/signalr/LongPollingTransport.java index eadc5e78268a..72d6e7958080 100644 --- a/src/SignalR/clients/java/signalr/src/main/java/com/microsoft/signalr/LongPollingTransport.java +++ b/src/SignalR/clients/java/signalr/src/main/java/com/microsoft/signalr/LongPollingTransport.java @@ -43,14 +43,13 @@ boolean isActive() { return this.active; } - private void updateHeaderToken() { - this.accessTokenProvider.flatMap((token) -> { + private Single updateHeaderToken() { + return this.accessTokenProvider.flatMap((token) -> { if (!token.isEmpty()) { this.headers.put("Authorization", "Bearer " + token); } return Single.just(""); }); - } @Override @@ -60,21 +59,22 @@ public Completable start(String url) { this.url = url; pollUrl = url + "&_=" + System.currentTimeMillis(); logger.debug("Polling {}", pollUrl); - this.updateHeaderToken(); - HttpRequest request = new HttpRequest(); - request.addHeaders(headers); - return this.pollingClient.get(pollUrl, request).flatMapCompletable(response -> { - if (response.getStatusCode() != 200) { - logger.error("Unexpected response code {}.", response.getStatusCode()); - this.active = false; - return Completable.error(new Exception("Failed to connect.")); - } else { - this.active = true; - } - this.threadPool = Executors.newCachedThreadPool(); - poll(url).subscribeWith(receiveLoop); + return this.updateHeaderToken().flatMapCompletable((r) -> { + HttpRequest request = new HttpRequest(); + request.addHeaders(headers); + return this.pollingClient.get(pollUrl, request).flatMapCompletable(response -> { + if (response.getStatusCode() != 200) { + logger.error("Unexpected response code {}.", response.getStatusCode()); + this.active = false; + return Completable.error(new Exception("Failed to connect.")); + } else { + this.active = true; + } + this.threadPool = Executors.newCachedThreadPool(); + poll(url).subscribeWith(receiveLoop); - return Completable.complete(); + return Completable.complete(); + }); }); } @@ -82,28 +82,30 @@ private Completable poll(String url) { if (this.active) { pollUrl = url + "&_=" + System.currentTimeMillis(); logger.info("Polling {}", pollUrl); - this.updateHeaderToken(); - HttpRequest request = new HttpRequest(); - request.addHeaders(headers); - Completable pollingCompletable = this.pollingClient.get(pollUrl, request).flatMapCompletable(response -> { - if (response.getStatusCode() == 204) { - logger.info("LongPolling transport terminated by server."); - this.active = false; - } else if (response.getStatusCode() != 200) { - logger.error("Unexpected response code {}.", response.getStatusCode()); - this.active = false; - this.closeError = "Unexpected response code " + response.getStatusCode(); - } else { - if (response.getContent() != null) { - logger.debug("Message received."); - threadPool.execute(() -> this.onReceive(response.getContent())); + return this.updateHeaderToken().flatMapCompletable((x) -> { + HttpRequest request = new HttpRequest(); + request.addHeaders(headers); + Completable pollingCompletable = this.pollingClient.get(pollUrl, request).flatMapCompletable(response -> { + if (response.getStatusCode() == 204) { + logger.info("LongPolling transport terminated by server."); + this.active = false; + } else if (response.getStatusCode() != 200) { + logger.error("Unexpected response code {}.", response.getStatusCode()); + this.active = false; + this.closeError = "Unexpected response code " + response.getStatusCode(); } else { - logger.debug("Poll timed out, reissuing."); + if (response.getContent() != null) { + logger.debug("Message received."); + threadPool.execute(() -> this.onReceive(response.getContent())); + } else { + logger.debug("Poll timed out, reissuing."); + } } + return poll(url); + }); - } - return poll(url); }); - return pollingCompletable; + return pollingCompletable; + }); } else { logger.info("Long Polling transport polling complete."); receiveLoop.onComplete(); @@ -116,8 +118,9 @@ public Completable send(String message) { if (!this.active) { return Completable.error(new Exception("Cannot send unless the transport is active.")); } - this.updateHeaderToken(); - return Completable.fromSingle(this.client.post(url, message)); + return this.updateHeaderToken().flatMapCompletable((x) -> { + return Completable.fromSingle(this.client.post(url, message)); + }); } @Override @@ -139,13 +142,14 @@ public void setOnClose(TransportOnClosedCallback onCloseCallback) { @Override public Completable stop() { this.active = false; - this.updateHeaderToken(); - this.pollingClient.delete(this.url); - CompletableSubject stopCompletableSubject = CompletableSubject.create(); - return this.receiveLoop.andThen(Completable.defer(() -> { - logger.info("LongPolling transport stopped."); - this.onClose.invoke(this.closeError); - return Completable.complete(); - })).subscribeWith(stopCompletableSubject); + return this.updateHeaderToken().flatMapCompletable((x) -> { + this.pollingClient.delete(this.url); + CompletableSubject stopCompletableSubject = CompletableSubject.create(); + return this.receiveLoop.andThen(Completable.defer(() -> { + logger.info("LongPolling transport stopped."); + this.onClose.invoke(this.closeError); + return Completable.complete(); + })).subscribeWith(stopCompletableSubject); + }); } } diff --git a/src/SignalR/clients/java/signalr/src/test/java/com/microsoft/signalr/LongPollingTransportTest.java b/src/SignalR/clients/java/signalr/src/test/java/com/microsoft/signalr/LongPollingTransportTest.java index 712428e6f86d..f2e83311cc35 100644 --- a/src/SignalR/clients/java/signalr/src/test/java/com/microsoft/signalr/LongPollingTransportTest.java +++ b/src/SignalR/clients/java/signalr/src/test/java/com/microsoft/signalr/LongPollingTransportTest.java @@ -12,6 +12,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import io.reactivex.subjects.CompletableSubject; import org.junit.jupiter.api.Test; import io.reactivex.Single; @@ -202,4 +203,43 @@ public void LongPollingTransportOnReceiveGetsCalledMultipleTimes() { assertTrue(onReceiveCalled.get()); assertEquals("FIRSTSECOND", message.get()); } + + @Test + public void LongPollingTransportSendsHeaders() { + AtomicInteger requestCount = new AtomicInteger(); + AtomicReference headerValue = new AtomicReference<>(); + TestHttpClient client = new TestHttpClient() + .on("GET", (req) -> { + if (requestCount.get() == 0) { + requestCount.incrementAndGet(); + return Single.just(new HttpResponse(200, "", "")); + } else if (requestCount.get() == 1) { + requestCount.incrementAndGet(); + return Single.just(new HttpResponse(200, "", "FIRST")); + } + + return Single.just(new HttpResponse(204, "", "")); + }) + .on("POST", (req) -> { + assertFalse(req.getHeaders().isEmpty()); + headerValue.set(req.getHeaders().get("KEY")); + return Single.just(new HttpResponse(200, "", "")); + }); + + Map headers = new HashMap<>(); + headers.put("KEY", "VALUE"); + LongPollingTransport transport = new LongPollingTransport(headers, client, Single.just("")); + + AtomicBoolean onReceiveCalled = new AtomicBoolean(false); + transport.setOnReceive((msg -> { + onReceiveCalled.set(true); + }) ); + + transport.setOnClose((error) -> {}); + + transport.start("http://example.com").timeout(1, TimeUnit.SECONDS).blockingAwait(); + transport.send("TEST"); + assertEquals("VALUE", client.getSentRequests().get(2).getHeaders().get("KEY")); + assertTrue(onReceiveCalled.get()); + } } From 55fdfab3cefc5b50cc8244c4df3db20c8db4a74e Mon Sep 17 00:00:00 2001 From: MikaelMengistu Date: Thu, 7 Feb 2019 17:35:37 -0800 Subject: [PATCH 10/15] spotless --- .../src/main/java/com/microsoft/signalr/DefaultHttpClient.java | 2 +- .../java/com/microsoft/signalr/LongPollingTransportTest.java | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/src/SignalR/clients/java/signalr/src/main/java/com/microsoft/signalr/DefaultHttpClient.java b/src/SignalR/clients/java/signalr/src/main/java/com/microsoft/signalr/DefaultHttpClient.java index 713c5a9f343d..a264f5d09f13 100644 --- a/src/SignalR/clients/java/signalr/src/main/java/com/microsoft/signalr/DefaultHttpClient.java +++ b/src/SignalR/clients/java/signalr/src/main/java/com/microsoft/signalr/DefaultHttpClient.java @@ -84,7 +84,7 @@ public List loadForRequest(HttpUrl url) { } }); - if (timeoutInMilliseconds > 0){ + if (timeoutInMilliseconds > 0) { builder.readTimeout(timeoutInMilliseconds, TimeUnit.MILLISECONDS); } this.client = builder.build(); diff --git a/src/SignalR/clients/java/signalr/src/test/java/com/microsoft/signalr/LongPollingTransportTest.java b/src/SignalR/clients/java/signalr/src/test/java/com/microsoft/signalr/LongPollingTransportTest.java index f2e83311cc35..abb06f1442e5 100644 --- a/src/SignalR/clients/java/signalr/src/test/java/com/microsoft/signalr/LongPollingTransportTest.java +++ b/src/SignalR/clients/java/signalr/src/test/java/com/microsoft/signalr/LongPollingTransportTest.java @@ -12,7 +12,6 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; -import io.reactivex.subjects.CompletableSubject; import org.junit.jupiter.api.Test; import io.reactivex.Single; From 10c34802be223074c87c80b620c10f51c8426f61 Mon Sep 17 00:00:00 2001 From: MikaelMengistu Date: Fri, 8 Feb 2019 22:19:04 -0800 Subject: [PATCH 11/15] I miss tasks --- .../com/microsoft/signalr/HttpClient.java | 9 +- .../signalr/LongPollingTransport.java | 24 +++-- .../microsoft/signalr/HubConnectionTest.java | 2 +- .../signalr/LongPollingTransportTest.java | 94 +++++++++++-------- .../com/microsoft/signalr/TestHttpClient.java | 8 +- 5 files changed, 83 insertions(+), 54 deletions(-) diff --git a/src/SignalR/clients/java/signalr/src/main/java/com/microsoft/signalr/HttpClient.java b/src/SignalR/clients/java/signalr/src/main/java/com/microsoft/signalr/HttpClient.java index 000c93519f44..534457367ff6 100644 --- a/src/SignalR/clients/java/signalr/src/main/java/com/microsoft/signalr/HttpClient.java +++ b/src/SignalR/clients/java/signalr/src/main/java/com/microsoft/signalr/HttpClient.java @@ -95,11 +95,10 @@ public Single post(String url) { return this.send(request); } - public Single post(String url, String body) { - HttpRequest request = new HttpRequest(); - request.setUrl(url); - request.setMethod("POST"); - return this.send(request, body); + public Single post(String url, String body, HttpRequest options) { + options.setUrl(url); + options.setMethod("POST"); + return this.send(options, body); } public Single post(String url, HttpRequest options) { diff --git a/src/SignalR/clients/java/signalr/src/main/java/com/microsoft/signalr/LongPollingTransport.java b/src/SignalR/clients/java/signalr/src/main/java/com/microsoft/signalr/LongPollingTransport.java index 72d6e7958080..728562dc9aff 100644 --- a/src/SignalR/clients/java/signalr/src/main/java/com/microsoft/signalr/LongPollingTransport.java +++ b/src/SignalR/clients/java/signalr/src/main/java/com/microsoft/signalr/LongPollingTransport.java @@ -28,6 +28,7 @@ class LongPollingTransport implements Transport { private Single accessTokenProvider; private CompletableSubject receiveLoop = CompletableSubject.create(); private ExecutorService threadPool; + private Boolean stopCalled = false; private final Logger logger = LoggerFactory.getLogger(LongPollingTransport.class); @@ -55,10 +56,10 @@ private Single updateHeaderToken() { @Override public Completable start(String url) { this.active = true; - logger.info("Starting LongPolling transport"); + logger.debug("Starting LongPolling transport."); this.url = url; pollUrl = url + "&_=" + System.currentTimeMillis(); - logger.debug("Polling {}", pollUrl); + logger.debug("Polling {}.", pollUrl); return this.updateHeaderToken().flatMapCompletable((r) -> { HttpRequest request = new HttpRequest(); request.addHeaders(headers); @@ -71,7 +72,7 @@ public Completable start(String url) { this.active = true; } this.threadPool = Executors.newCachedThreadPool(); - poll(url).subscribeWith(receiveLoop); + threadPool.execute(() -> poll(url).subscribeWith(receiveLoop)); return Completable.complete(); }); @@ -81,7 +82,7 @@ public Completable start(String url) { private Completable poll(String url) { if (this.active) { pollUrl = url + "&_=" + System.currentTimeMillis(); - logger.info("Polling {}", pollUrl); + logger.debug("Polling {}", pollUrl); return this.updateHeaderToken().flatMapCompletable((x) -> { HttpRequest request = new HttpRequest(); request.addHeaders(headers); @@ -107,9 +108,12 @@ private Completable poll(String url) { return pollingCompletable; }); } else { - logger.info("Long Polling transport polling complete."); + logger.debug("Long Polling transport polling complete."); receiveLoop.onComplete(); - return this.stop(); + if (!stopCalled) { + return this.stop(); + } + return Completable.complete(); } } @@ -119,7 +123,9 @@ public Completable send(String message) { return Completable.error(new Exception("Cannot send unless the transport is active.")); } return this.updateHeaderToken().flatMapCompletable((x) -> { - return Completable.fromSingle(this.client.post(url, message)); + HttpRequest request = new HttpRequest(); + request.addHeaders(headers); + return Completable.fromSingle(this.client.post(url, message, request)); }); } @@ -143,7 +149,9 @@ public void setOnClose(TransportOnClosedCallback onCloseCallback) { public Completable stop() { this.active = false; return this.updateHeaderToken().flatMapCompletable((x) -> { - this.pollingClient.delete(this.url); + HttpRequest request = new HttpRequest(); + request.addHeaders(headers); + this.pollingClient.delete(this.url, request); CompletableSubject stopCompletableSubject = CompletableSubject.create(); return this.receiveLoop.andThen(Completable.defer(() -> { logger.info("LongPolling transport stopped."); diff --git a/src/SignalR/clients/java/signalr/src/test/java/com/microsoft/signalr/HubConnectionTest.java b/src/SignalR/clients/java/signalr/src/test/java/com/microsoft/signalr/HubConnectionTest.java index cd4c5bbe5ced..ea6e4742f72e 100644 --- a/src/SignalR/clients/java/signalr/src/test/java/com/microsoft/signalr/HubConnectionTest.java +++ b/src/SignalR/clients/java/signalr/src/test/java/com/microsoft/signalr/HubConnectionTest.java @@ -1224,7 +1224,7 @@ public void receivingServerSentEventsTransportFromNegotiateFails() { TestHttpClient client = new TestHttpClient().on("POST", "http://example.com/negotiate", (req) -> Single.just(new HttpResponse(200, "", "{\"connectionId\":\"bVOiRPG8-6YiJ6d7ZcTOVQ\",\"" - + "availableTransports\":[{\"transport\":\"ServerSentEvents\",\"transferFormats\":[\"Text\",\"Binary\"]}]}"))); + + "availableTransports\":[{\"transport\":\"ServerSentEvents\",\"transferFormats\":[\"Text\"]}]}"))); MockTransport transport = new MockTransport(true); HubConnection hubConnection = HubConnectionBuilder diff --git a/src/SignalR/clients/java/signalr/src/test/java/com/microsoft/signalr/LongPollingTransportTest.java b/src/SignalR/clients/java/signalr/src/test/java/com/microsoft/signalr/LongPollingTransportTest.java index abb06f1442e5..caf8977450f4 100644 --- a/src/SignalR/clients/java/signalr/src/test/java/com/microsoft/signalr/LongPollingTransportTest.java +++ b/src/SignalR/clients/java/signalr/src/test/java/com/microsoft/signalr/LongPollingTransportTest.java @@ -12,6 +12,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import io.reactivex.subjects.CompletableSubject; import org.junit.jupiter.api.Test; import io.reactivex.Single; @@ -44,27 +45,10 @@ public void LongPollingTransportCantSendBeforeStart() { assertFalse(transport.isActive()); } - @Test - public void StatusCode204StopsLongPolling() { - AtomicBoolean firstPoll = new AtomicBoolean(true); - TestHttpClient client = new TestHttpClient() - .on("GET", (req) -> { - if (firstPoll.get()) { - firstPoll.set(false); - return Single.just(new HttpResponse(200, "", "")); - } - return Single.just(new HttpResponse(204, "", "")); - }); - - Map headers = new HashMap<>(); - LongPollingTransport transport = new LongPollingTransport(headers, client, Single.just("")); - transport.start("http://example.com").timeout(1, TimeUnit.SECONDS).blockingAwait(); - assertFalse(transport.isActive()); - } - @Test public void StatusCode204StopsLongPollingTriggersOnClosed() { AtomicBoolean firstPoll = new AtomicBoolean(true); + CompletableSubject block = CompletableSubject.create(); TestHttpClient client = new TestHttpClient() .on("GET", (req) -> { if (firstPoll.get()) { @@ -79,10 +63,12 @@ public void StatusCode204StopsLongPollingTriggersOnClosed() { AtomicBoolean onClosedRan = new AtomicBoolean(false); transport.setOnClose((error) -> { onClosedRan.set(true); + block.onComplete(); }); assertFalse(onClosedRan.get()); - transport.start("http://example.com").timeout(1, TimeUnit.SECONDS).blockingAwait(); + transport.start("http://example.com").timeout(100, TimeUnit.SECONDS).blockingAwait(); + block.blockingAwait(); assertTrue(onClosedRan.get()); assertFalse(transport.isActive()); } @@ -90,6 +76,7 @@ public void StatusCode204StopsLongPollingTriggersOnClosed() { @Test public void LongPollingFailsWhenReceivingUnexpectedErrorCode() { AtomicBoolean firstPoll = new AtomicBoolean(true); + CompletableSubject blocker = CompletableSubject.create(); TestHttpClient client = new TestHttpClient() .on("GET", (req) -> { if (firstPoll.get()) { @@ -105,9 +92,12 @@ public void LongPollingFailsWhenReceivingUnexpectedErrorCode() { transport.setOnClose((error) -> { onClosedRan.set(true); assertEquals("Unexpected response code 999", error); + blocker.onComplete(); + }); transport.start("http://example.com").timeout(1, TimeUnit.SECONDS).blockingAwait(); + blocker.blockingAwait(); assertFalse(transport.isActive()); assertTrue(onClosedRan.get()); } @@ -135,6 +125,7 @@ public void CanSetAndTriggerOnReceive() { @Test public void LongPollingTransportOnReceiveGetsCalled() { AtomicInteger requestCount = new AtomicInteger(); + CompletableSubject block = CompletableSubject.create(); TestHttpClient client = new TestHttpClient() .on("GET", (req) -> { if (requestCount.get() == 0) { @@ -156,12 +147,13 @@ public void LongPollingTransportOnReceiveGetsCalled() { transport.setOnReceive((msg -> { onReceiveCalled.set(true); message.set(msg); + block.onComplete(); }) ); transport.setOnClose((error) -> {}); transport.start("http://example.com").timeout(1, TimeUnit.SECONDS).blockingAwait(); - + block.blockingAwait(1,TimeUnit.SECONDS); assertTrue(onReceiveCalled.get()); assertEquals("TEST", message.get()); } @@ -169,6 +161,7 @@ public void LongPollingTransportOnReceiveGetsCalled() { @Test public void LongPollingTransportOnReceiveGetsCalledMultipleTimes() { AtomicInteger requestCount = new AtomicInteger(); + CompletableSubject blocker = CompletableSubject.create(); TestHttpClient client = new TestHttpClient() .on("GET", (req) -> { if (requestCount.get() == 0) { @@ -190,15 +183,19 @@ public void LongPollingTransportOnReceiveGetsCalledMultipleTimes() { AtomicBoolean onReceiveCalled = new AtomicBoolean(false); AtomicReference message = new AtomicReference<>(""); - transport.setOnReceive((msg -> { + AtomicInteger messageCount = new AtomicInteger(); + transport.setOnReceive((msg) -> { onReceiveCalled.set(true); message.set(message.get() + msg); - }) ); + if (messageCount.incrementAndGet() == 2) { + blocker.onComplete(); + } + }); transport.setOnClose((error) -> {}); transport.start("http://example.com").timeout(1, TimeUnit.SECONDS).blockingAwait(); - + blocker.blockingAwait(1, TimeUnit.SECONDS); assertTrue(onReceiveCalled.get()); assertEquals("FIRSTSECOND", message.get()); } @@ -207,19 +204,16 @@ public void LongPollingTransportOnReceiveGetsCalledMultipleTimes() { public void LongPollingTransportSendsHeaders() { AtomicInteger requestCount = new AtomicInteger(); AtomicReference headerValue = new AtomicReference<>(); + CompletableSubject close = CompletableSubject.create(); TestHttpClient client = new TestHttpClient() .on("GET", (req) -> { if (requestCount.get() == 0) { requestCount.incrementAndGet(); return Single.just(new HttpResponse(200, "", "")); - } else if (requestCount.get() == 1) { - requestCount.incrementAndGet(); - return Single.just(new HttpResponse(200, "", "FIRST")); } - + close.blockingAwait(); return Single.just(new HttpResponse(204, "", "")); - }) - .on("POST", (req) -> { + }).on("POST", (req) -> { assertFalse(req.getHeaders().isEmpty()); headerValue.set(req.getHeaders().get("KEY")); return Single.just(new HttpResponse(200, "", "")); @@ -228,17 +222,43 @@ public void LongPollingTransportSendsHeaders() { Map headers = new HashMap<>(); headers.put("KEY", "VALUE"); LongPollingTransport transport = new LongPollingTransport(headers, client, Single.just("")); + transport.setOnClose((error) -> {}); - AtomicBoolean onReceiveCalled = new AtomicBoolean(false); - transport.setOnReceive((msg -> { - onReceiveCalled.set(true); - }) ); + transport.start("http://example.com").timeout(1, TimeUnit.SECONDS).blockingAwait(); + transport.send("TEST").blockingAwait(); + close.onComplete(); + assertEquals(headerValue.get(), "VALUE"); + } + + @Test + public void LongPollingTransportSetsAuthorizationHeader() { + AtomicInteger requestCount = new AtomicInteger(); + AtomicReference headerValue = new AtomicReference<>(); + CompletableSubject close = CompletableSubject.create(); + TestHttpClient client = new TestHttpClient() + .on("GET", (req) -> { + if (requestCount.get() == 0) { + requestCount.incrementAndGet(); + return Single.just(new HttpResponse(200, "", "")); + } + close.blockingAwait(); + return Single.just(new HttpResponse(204, "", "")); + }) + .on("POST", (req) -> { + assertFalse(req.getHeaders().isEmpty()); + headerValue.set(req.getHeaders().get("Authorization")); + return Single.just(new HttpResponse(200, "", "")); + }); + Map headers = new HashMap<>(); + Single tokenProvider = Single.just("TOKEN"); + LongPollingTransport transport = new LongPollingTransport(headers, client, tokenProvider); transport.setOnClose((error) -> {}); - transport.start("http://example.com").timeout(1, TimeUnit.SECONDS).blockingAwait(); - transport.send("TEST"); - assertEquals("VALUE", client.getSentRequests().get(2).getHeaders().get("KEY")); - assertTrue(onReceiveCalled.get()); + transport.start("http://example.com").timeout(100, TimeUnit.SECONDS).blockingAwait(); + transport.send("TEST").blockingAwait(); + assertEquals(headerValue.get(), "Bearer TOKEN"); + assertEquals("Bearer TOKEN", client.getSentRequests().get(2).getHeaders().get("Authorization")); + close.onComplete(); } } diff --git a/src/SignalR/clients/java/signalr/src/test/java/com/microsoft/signalr/TestHttpClient.java b/src/SignalR/clients/java/signalr/src/test/java/com/microsoft/signalr/TestHttpClient.java index cf969638298b..0c94a14ea78b 100644 --- a/src/SignalR/clients/java/signalr/src/test/java/com/microsoft/signalr/TestHttpClient.java +++ b/src/SignalR/clients/java/signalr/src/test/java/com/microsoft/signalr/TestHttpClient.java @@ -7,7 +7,9 @@ import java.util.List; import java.util.Map; +import io.reactivex.Completable; import io.reactivex.Single; +import io.reactivex.subjects.SingleSubject; class TestHttpClient extends HttpClient { private TestHttpRequestHandler handler; @@ -22,13 +24,13 @@ public TestHttpClient() { @Override public Single send(HttpRequest request) { - this.sentRequests.add(request); - return this.handler.invoke(request); + return send(request, null); } @Override public Single send(HttpRequest request, String body) { - return null; + this.sentRequests.add(request); + return this.handler.invoke(request); } public List getSentRequests() { From d8910e1b68380d9692bd1e3e123718fbf0e69d8c Mon Sep 17 00:00:00 2001 From: MikaelMengistu Date: Fri, 8 Feb 2019 23:03:40 -0800 Subject: [PATCH 12/15] spotless --- .../main/java/com/microsoft/signalr/LongPollingTransport.java | 2 +- .../java/com/microsoft/signalr/LongPollingTransportTest.java | 4 ++-- .../src/test/java/com/microsoft/signalr/TestHttpClient.java | 2 -- 3 files changed, 3 insertions(+), 5 deletions(-) diff --git a/src/SignalR/clients/java/signalr/src/main/java/com/microsoft/signalr/LongPollingTransport.java b/src/SignalR/clients/java/signalr/src/main/java/com/microsoft/signalr/LongPollingTransport.java index 728562dc9aff..43efc11f15c1 100644 --- a/src/SignalR/clients/java/signalr/src/main/java/com/microsoft/signalr/LongPollingTransport.java +++ b/src/SignalR/clients/java/signalr/src/main/java/com/microsoft/signalr/LongPollingTransport.java @@ -93,7 +93,7 @@ private Completable poll(String url) { } else if (response.getStatusCode() != 200) { logger.error("Unexpected response code {}.", response.getStatusCode()); this.active = false; - this.closeError = "Unexpected response code " + response.getStatusCode(); + this.closeError = "Unexpected response code " + response.getStatusCode() + "."; } else { if (response.getContent() != null) { logger.debug("Message received."); diff --git a/src/SignalR/clients/java/signalr/src/test/java/com/microsoft/signalr/LongPollingTransportTest.java b/src/SignalR/clients/java/signalr/src/test/java/com/microsoft/signalr/LongPollingTransportTest.java index caf8977450f4..3cddcf3333a4 100644 --- a/src/SignalR/clients/java/signalr/src/test/java/com/microsoft/signalr/LongPollingTransportTest.java +++ b/src/SignalR/clients/java/signalr/src/test/java/com/microsoft/signalr/LongPollingTransportTest.java @@ -12,10 +12,10 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; -import io.reactivex.subjects.CompletableSubject; import org.junit.jupiter.api.Test; import io.reactivex.Single; +import io.reactivex.subjects.CompletableSubject; public class LongPollingTransportTest { @@ -91,7 +91,7 @@ public void LongPollingFailsWhenReceivingUnexpectedErrorCode() { AtomicBoolean onClosedRan = new AtomicBoolean(false); transport.setOnClose((error) -> { onClosedRan.set(true); - assertEquals("Unexpected response code 999", error); + assertEquals("Unexpected response code 999.", error); blocker.onComplete(); }); diff --git a/src/SignalR/clients/java/signalr/src/test/java/com/microsoft/signalr/TestHttpClient.java b/src/SignalR/clients/java/signalr/src/test/java/com/microsoft/signalr/TestHttpClient.java index 0c94a14ea78b..eea04535046d 100644 --- a/src/SignalR/clients/java/signalr/src/test/java/com/microsoft/signalr/TestHttpClient.java +++ b/src/SignalR/clients/java/signalr/src/test/java/com/microsoft/signalr/TestHttpClient.java @@ -7,9 +7,7 @@ import java.util.List; import java.util.Map; -import io.reactivex.Completable; import io.reactivex.Single; -import io.reactivex.subjects.SingleSubject; class TestHttpClient extends HttpClient { private TestHttpRequestHandler handler; From a0acbcc4b18ec41a30b4f35e4f74d7bb4f7f988a Mon Sep 17 00:00:00 2001 From: MikaelMengistu Date: Mon, 11 Feb 2019 00:25:59 -0800 Subject: [PATCH 13/15] pr feedback + tests --- .../signalr/LongPollingTransport.java | 35 ++++---- .../signalr/LongPollingTransportTest.java | 90 ++++++++++++++++--- 2 files changed, 99 insertions(+), 26 deletions(-) diff --git a/src/SignalR/clients/java/signalr/src/main/java/com/microsoft/signalr/LongPollingTransport.java b/src/SignalR/clients/java/signalr/src/main/java/com/microsoft/signalr/LongPollingTransport.java index 43efc11f15c1..087867bace42 100644 --- a/src/SignalR/clients/java/signalr/src/main/java/com/microsoft/signalr/LongPollingTransport.java +++ b/src/SignalR/clients/java/signalr/src/main/java/com/microsoft/signalr/LongPollingTransport.java @@ -6,6 +6,7 @@ import java.util.Map; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicBoolean; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -28,7 +29,7 @@ class LongPollingTransport implements Transport { private Single accessTokenProvider; private CompletableSubject receiveLoop = CompletableSubject.create(); private ExecutorService threadPool; - private Boolean stopCalled = false; + private AtomicBoolean stopCalled = new AtomicBoolean(false); private final Logger logger = LoggerFactory.getLogger(LongPollingTransport.class); @@ -82,7 +83,7 @@ public Completable start(String url) { private Completable poll(String url) { if (this.active) { pollUrl = url + "&_=" + System.currentTimeMillis(); - logger.debug("Polling {}", pollUrl); + logger.debug("Polling {}.", pollUrl); return this.updateHeaderToken().flatMapCompletable((x) -> { HttpRequest request = new HttpRequest(); request.addHeaders(headers); @@ -110,7 +111,7 @@ private Completable poll(String url) { } else { logger.debug("Long Polling transport polling complete."); receiveLoop.onComplete(); - if (!stopCalled) { + if (!stopCalled.get()) { return this.stop(); } return Completable.complete(); @@ -147,17 +148,21 @@ public void setOnClose(TransportOnClosedCallback onCloseCallback) { @Override public Completable stop() { - this.active = false; - return this.updateHeaderToken().flatMapCompletable((x) -> { - HttpRequest request = new HttpRequest(); - request.addHeaders(headers); - this.pollingClient.delete(this.url, request); - CompletableSubject stopCompletableSubject = CompletableSubject.create(); - return this.receiveLoop.andThen(Completable.defer(() -> { - logger.info("LongPolling transport stopped."); - this.onClose.invoke(this.closeError); - return Completable.complete(); - })).subscribeWith(stopCompletableSubject); - }); + if (!stopCalled.get()) { + this.stopCalled.set(true); + this.active = false; + return this.updateHeaderToken().flatMapCompletable((x) -> { + HttpRequest request = new HttpRequest(); + request.addHeaders(headers); + this.pollingClient.delete(this.url, request); + CompletableSubject stopCompletableSubject = CompletableSubject.create(); + return this.receiveLoop.andThen(Completable.defer(() -> { + logger.info("LongPolling transport stopped."); + this.onClose.invoke(this.closeError); + return Completable.complete(); + })).subscribeWith(stopCompletableSubject); + }); + } + return Completable.complete(); } } diff --git a/src/SignalR/clients/java/signalr/src/test/java/com/microsoft/signalr/LongPollingTransportTest.java b/src/SignalR/clients/java/signalr/src/test/java/com/microsoft/signalr/LongPollingTransportTest.java index 3cddcf3333a4..99781978e44d 100644 --- a/src/SignalR/clients/java/signalr/src/test/java/com/microsoft/signalr/LongPollingTransportTest.java +++ b/src/SignalR/clients/java/signalr/src/test/java/com/microsoft/signalr/LongPollingTransportTest.java @@ -5,6 +5,7 @@ import static org.junit.jupiter.api.Assertions.*; +import java.sql.Time; import java.util.HashMap; import java.util.Map; import java.util.concurrent.TimeUnit; @@ -67,8 +68,8 @@ public void StatusCode204StopsLongPollingTriggersOnClosed() { }); assertFalse(onClosedRan.get()); - transport.start("http://example.com").timeout(100, TimeUnit.SECONDS).blockingAwait(); - block.blockingAwait(); + transport.start("http://example.com").timeout(1, TimeUnit.SECONDS).blockingAwait(); + assertTrue(block.blockingAwait(1, TimeUnit.SECONDS)); assertTrue(onClosedRan.get()); assertFalse(transport.isActive()); } @@ -93,11 +94,10 @@ public void LongPollingFailsWhenReceivingUnexpectedErrorCode() { onClosedRan.set(true); assertEquals("Unexpected response code 999.", error); blocker.onComplete(); - }); transport.start("http://example.com").timeout(1, TimeUnit.SECONDS).blockingAwait(); - blocker.blockingAwait(); + assertTrue(blocker.blockingAwait(1, TimeUnit.SECONDS)); assertFalse(transport.isActive()); assertTrue(onClosedRan.get()); } @@ -153,7 +153,7 @@ public void LongPollingTransportOnReceiveGetsCalled() { transport.setOnClose((error) -> {}); transport.start("http://example.com").timeout(1, TimeUnit.SECONDS).blockingAwait(); - block.blockingAwait(1,TimeUnit.SECONDS); + assertTrue(block.blockingAwait(1,TimeUnit.SECONDS)); assertTrue(onReceiveCalled.get()); assertEquals("TEST", message.get()); } @@ -195,7 +195,7 @@ public void LongPollingTransportOnReceiveGetsCalledMultipleTimes() { transport.setOnClose((error) -> {}); transport.start("http://example.com").timeout(1, TimeUnit.SECONDS).blockingAwait(); - blocker.blockingAwait(1, TimeUnit.SECONDS); + assertTrue(blocker.blockingAwait(1, TimeUnit.SECONDS)); assertTrue(onReceiveCalled.get()); assertEquals("FIRSTSECOND", message.get()); } @@ -211,7 +211,7 @@ public void LongPollingTransportSendsHeaders() { requestCount.incrementAndGet(); return Single.just(new HttpResponse(200, "", "")); } - close.blockingAwait(); + assertTrue(close.blockingAwait(1, TimeUnit.SECONDS)); return Single.just(new HttpResponse(204, "", "")); }).on("POST", (req) -> { assertFalse(req.getHeaders().isEmpty()); @@ -225,7 +225,7 @@ public void LongPollingTransportSendsHeaders() { transport.setOnClose((error) -> {}); transport.start("http://example.com").timeout(1, TimeUnit.SECONDS).blockingAwait(); - transport.send("TEST").blockingAwait(); + assertTrue(transport.send("TEST").blockingAwait(1, TimeUnit.SECONDS)); close.onComplete(); assertEquals(headerValue.get(), "VALUE"); } @@ -241,7 +241,7 @@ public void LongPollingTransportSetsAuthorizationHeader() { requestCount.incrementAndGet(); return Single.just(new HttpResponse(200, "", "")); } - close.blockingAwait(); + assertTrue(close.blockingAwait(1, TimeUnit.SECONDS)); return Single.just(new HttpResponse(204, "", "")); }) .on("POST", (req) -> { @@ -255,10 +255,78 @@ public void LongPollingTransportSetsAuthorizationHeader() { LongPollingTransport transport = new LongPollingTransport(headers, client, tokenProvider); transport.setOnClose((error) -> {}); - transport.start("http://example.com").timeout(100, TimeUnit.SECONDS).blockingAwait(); - transport.send("TEST").blockingAwait(); + transport.start("http://example.com").timeout(1, TimeUnit.SECONDS).blockingAwait(); + assertTrue(transport.send("TEST").blockingAwait(1, TimeUnit.SECONDS)); assertEquals(headerValue.get(), "Bearer TOKEN"); assertEquals("Bearer TOKEN", client.getSentRequests().get(2).getHeaders().get("Authorization")); close.onComplete(); } + + @Test + public void After204StopDoesNotTriggerOnClose() { + AtomicBoolean firstPoll = new AtomicBoolean(true); + CompletableSubject block = CompletableSubject.create(); + TestHttpClient client = new TestHttpClient() + .on("GET", (req) -> { + if (firstPoll.get()) { + firstPoll.set(false); + return Single.just(new HttpResponse(200, "", "")); + } + return Single.just(new HttpResponse(204, "", "")); + }); + + Map headers = new HashMap<>(); + LongPollingTransport transport = new LongPollingTransport(headers, client, Single.just("")); + AtomicBoolean onClosedRan = new AtomicBoolean(false); + AtomicInteger onCloseCount = new AtomicInteger(0); + transport.setOnClose((error) -> { + onClosedRan.set(true); + onCloseCount.incrementAndGet(); + block.onComplete(); + }); + + assertFalse(onClosedRan.get()); + transport.start("http://example.com").timeout(1, TimeUnit.SECONDS).blockingAwait(); + assertTrue(block.blockingAwait(1, TimeUnit.SECONDS)); + assertEquals(1, onCloseCount.get()); + assertTrue(onClosedRan.get()); + assertFalse(transport.isActive()); + + assertTrue(transport.stop().blockingAwait(1, TimeUnit.SECONDS)); + assertEquals(1, onCloseCount.get()); + } + + @Test + public void StoppingTransportRunsCloseHandlersOnce() { + AtomicBoolean firstPoll = new AtomicBoolean(true); + CompletableSubject block = CompletableSubject.create(); + TestHttpClient client = new TestHttpClient() + .on("GET", (req) -> { + if (firstPoll.get()) { + firstPoll.set(false); + return Single.just(new HttpResponse(200, "", "")); + } else { + assertTrue(block.blockingAwait(1, TimeUnit.SECONDS)); + return Single.just(new HttpResponse(204, "", "")); + } + }) + .on("DELETE", (req) ->{ + //Unblock the last poll when we sent the DELETE request. + block.onComplete(); + return Single.just(new HttpResponse(200, "", "")); + }); + + Map headers = new HashMap<>(); + LongPollingTransport transport = new LongPollingTransport(headers, client, Single.just("")); + AtomicInteger onCloseCount = new AtomicInteger(0); + transport.setOnClose((error) -> { + onCloseCount.incrementAndGet(); + }); + + assertEquals(0, onCloseCount.get()); + transport.start("http://example.com").timeout(1, TimeUnit.SECONDS).blockingAwait(); + assertTrue(transport.stop().blockingAwait(1, TimeUnit.SECONDS)); + assertEquals(1, onCloseCount.get()); + assertFalse(transport.isActive()); + } } From e2d66928eff6a6c1169d6671abdcaa5e74e6421c Mon Sep 17 00:00:00 2001 From: MikaelMengistu Date: Mon, 11 Feb 2019 00:27:55 -0800 Subject: [PATCH 14/15] Spotless --- .../java/com/microsoft/signalr/LongPollingTransportTest.java | 1 - 1 file changed, 1 deletion(-) diff --git a/src/SignalR/clients/java/signalr/src/test/java/com/microsoft/signalr/LongPollingTransportTest.java b/src/SignalR/clients/java/signalr/src/test/java/com/microsoft/signalr/LongPollingTransportTest.java index 99781978e44d..c8dd9aa5e91b 100644 --- a/src/SignalR/clients/java/signalr/src/test/java/com/microsoft/signalr/LongPollingTransportTest.java +++ b/src/SignalR/clients/java/signalr/src/test/java/com/microsoft/signalr/LongPollingTransportTest.java @@ -5,7 +5,6 @@ import static org.junit.jupiter.api.Assertions.*; -import java.sql.Time; import java.util.HashMap; import java.util.Map; import java.util.concurrent.TimeUnit; From 575178445e8d5de5d4409398dbcfffe959be4c3a Mon Sep 17 00:00:00 2001 From: MikaelMengistu Date: Mon, 11 Feb 2019 21:17:47 -0800 Subject: [PATCH 15/15] Remove redundant header check --- .../java/com/microsoft/signalr/LongPollingTransportTest.java | 1 - 1 file changed, 1 deletion(-) diff --git a/src/SignalR/clients/java/signalr/src/test/java/com/microsoft/signalr/LongPollingTransportTest.java b/src/SignalR/clients/java/signalr/src/test/java/com/microsoft/signalr/LongPollingTransportTest.java index c8dd9aa5e91b..f2a0076dd6b3 100644 --- a/src/SignalR/clients/java/signalr/src/test/java/com/microsoft/signalr/LongPollingTransportTest.java +++ b/src/SignalR/clients/java/signalr/src/test/java/com/microsoft/signalr/LongPollingTransportTest.java @@ -257,7 +257,6 @@ public void LongPollingTransportSetsAuthorizationHeader() { transport.start("http://example.com").timeout(1, TimeUnit.SECONDS).blockingAwait(); assertTrue(transport.send("TEST").blockingAwait(1, TimeUnit.SECONDS)); assertEquals(headerValue.get(), "Bearer TOKEN"); - assertEquals("Bearer TOKEN", client.getSentRequests().get(2).getHeaders().get("Authorization")); close.onComplete(); }