Skip to content

SignalR Java Client LongPolling Transport #6856

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 15 commits into from
Feb 13, 2019
Original file line number Diff line number Diff line change
Expand Up @@ -8,87 +8,105 @@
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import io.reactivex.Single;
import io.reactivex.subjects.SingleSubject;
import okhttp3.Call;
import okhttp3.Callback;
import okhttp3.Cookie;
import okhttp3.CookieJar;
import okhttp3.HttpUrl;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.RequestBody;
import okhttp3.Response;
import okhttp3.ResponseBody;
import okhttp3.*;

final class DefaultHttpClient extends HttpClient {
private final OkHttpClient client;
private OkHttpClient client = null;

public DefaultHttpClient() {
this.client = new OkHttpClient.Builder().cookieJar(new CookieJar() {
private List<Cookie> cookieList = new ArrayList<>();
private Lock cookieLock = new ReentrantLock();
this(5000, null);
}

@Override
public void saveFromResponse(HttpUrl url, List<Cookie> cookies) {
cookieLock.lock();
try {
for (Cookie cookie : cookies) {
boolean replacedCookie = false;
for (int i = 0; i < cookieList.size(); i++) {
Cookie innerCookie = cookieList.get(i);
if (cookie.name().equals(innerCookie.name()) && innerCookie.matches(url)) {
// We have a new cookie that matches an older one so we replace the older one.
cookieList.set(i, innerCookie);
replacedCookie = true;
break;
public DefaultHttpClient cloneWithTimeOut(int timeoutInMilliseconds) {
OkHttpClient newClient = client.newBuilder().readTimeout(timeoutInMilliseconds, TimeUnit.MILLISECONDS)
.build();
return new DefaultHttpClient(timeoutInMilliseconds, newClient);
}

public DefaultHttpClient(int timeoutInMilliseconds, OkHttpClient client) {
if (client != null) {
this.client = client;
} else {
this.client = new OkHttpClient.Builder().cookieJar(new CookieJar() {
private List<Cookie> cookieList = new ArrayList<>();
private Lock cookieLock = new ReentrantLock();

@Override
public void saveFromResponse(HttpUrl url, List<Cookie> cookies) {
cookieLock.lock();
try {
for (Cookie cookie : cookies) {
boolean replacedCookie = false;
for (int i = 0; i < cookieList.size(); i++) {
Cookie innerCookie = cookieList.get(i);
if (cookie.name().equals(innerCookie.name()) && innerCookie.matches(url)) {
// We have a new cookie that matches an older one so we replace the older one.
cookieList.set(i, innerCookie);
replacedCookie = true;
break;
}
}
if (!replacedCookie) {
cookieList.add(cookie);
}
}
if (!replacedCookie) {
cookieList.add(cookie);
}
} finally {
cookieLock.unlock();
}
} finally {
cookieLock.unlock();
}
}

@Override
public List<Cookie> loadForRequest(HttpUrl url) {
cookieLock.lock();
try {
List<Cookie> matchedCookies = new ArrayList<>();
List<Cookie> expiredCookies = new ArrayList<>();
for (Cookie cookie : cookieList) {
if (cookie.expiresAt() < System.currentTimeMillis()) {
expiredCookies.add(cookie);
} else if (cookie.matches(url)) {
matchedCookies.add(cookie);
@Override
public List<Cookie> loadForRequest(HttpUrl url) {
cookieLock.lock();
try {
List<Cookie> matchedCookies = new ArrayList<>();
List<Cookie> expiredCookies = new ArrayList<>();
for (Cookie cookie : cookieList) {
if (cookie.expiresAt() < System.currentTimeMillis()) {
expiredCookies.add(cookie);
} else if (cookie.matches(url)) {
matchedCookies.add(cookie);
}
}
}

cookieList.removeAll(expiredCookies);
return matchedCookies;
} finally {
cookieLock.unlock();
cookieList.removeAll(expiredCookies);
return matchedCookies;
} finally {
cookieLock.unlock();
}
}
}
}).build();
}).readTimeout(timeoutInMilliseconds, TimeUnit.MILLISECONDS)
.build();
}
}

@Override
public Single<HttpResponse> send(HttpRequest httpRequest) {
return send(httpRequest, null);
}

@Override
public Single<HttpResponse> send(HttpRequest httpRequest, String bodyContent) {
Request.Builder requestBuilder = new Request.Builder().url(httpRequest.getUrl());

switch (httpRequest.getMethod()) {
case "GET":
requestBuilder.get();
break;
case "POST":
RequestBody body = RequestBody.create(null, new byte[]{});
RequestBody body;
if (bodyContent != null) {
body = RequestBody.create(MediaType.parse("text/plain"), bodyContent);
} else {
body = RequestBody.create(null, new byte[]{});
}

requestBuilder.post(body);
break;
case "DELETE":
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,13 @@ public Single<HttpResponse> post(String url) {
return this.send(request);
}

public Single<HttpResponse> post(String url, String body) {
HttpRequest request = new HttpRequest();
request.setUrl(url);
request.setMethod("POST");
return this.send(request, body);
}

public Single<HttpResponse> post(String url, HttpRequest options) {
options.setUrl(url);
options.setMethod("POST");
Expand All @@ -116,5 +123,9 @@ public Single<HttpResponse> delete(String url, HttpRequest options) {

public abstract Single<HttpResponse> send(HttpRequest request);

public abstract Single<HttpResponse> send(HttpRequest request, String body);

public abstract WebSocketWrapper createWebSocket(String url, Map<String, String> headers);
}

public abstract HttpClient cloneWithTimeOut(int timeoutInMilliseconds);
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,26 @@ public class HttpHubConnectionBuilder {
private Single<String> accessTokenProvider;
private long handshakeResponseTimeout = 0;
private Map<String, String> headers;
private TransportEnum transportEnum;

HttpHubConnectionBuilder(String url) {
this.url = url;
}

//For testing purposes. The Transport interface isn't public.
HttpHubConnectionBuilder withTransportImplementation(Transport transport) {
this.transport = transport;
return this;
}

/**
* Sets the transport to be used by the {@link HubConnection}.
* Sets the transport type to indicate which transport to be used by the {@link HubConnection}.
*
* @param transport The transport to be used.
* @param transportEnum The type of transport to be used.
* @return This instance of the HttpHubConnectionBuilder.
*/
HttpHubConnectionBuilder withTransport(Transport transport) {
this.transport = transport;
public HttpHubConnectionBuilder withTransport(TransportEnum transportEnum) {
this.transportEnum = transportEnum;
return this;
}

Expand Down Expand Up @@ -112,6 +119,6 @@ public HttpHubConnectionBuilder withHeader(String name, String value) {
* @return A new instance of {@link HubConnection}.
*/
public HubConnection build() {
return new HubConnection(url, transport, skipNegotiate, httpClient, accessTokenProvider, handshakeResponseTimeout, headers);
return new HubConnection(url, transport, skipNegotiate, httpClient, accessTokenProvider, handshakeResponseTimeout, headers, transportEnum);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,7 @@

package com.microsoft.signalr;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
Expand Down Expand Up @@ -46,7 +39,7 @@ public class HubConnection {
private Single<String> accessTokenProvider;
private final Map<String, String> headers = new HashMap<>();
private ConnectionState connectionState = null;
private final HttpClient httpClient;
private HttpClient httpClient;
private String stopError;
private Timer pingTimer = null;
private final AtomicLong nextServerTimeout = new AtomicLong();
Expand All @@ -56,6 +49,7 @@ public class HubConnection {
private long tickRate = 1000;
private CompletableSubject handshakeResponseSubject;
private long handshakeResponseTimeout = 15*1000;
private TransportEnum transportEnum = TransportEnum.ALL;
private final Logger logger = LoggerFactory.getLogger(HubConnection.class);

/**
Expand Down Expand Up @@ -100,7 +94,7 @@ void setTickRate(long tickRateInMilliseconds) {
}

HubConnection(String url, Transport transport, boolean skipNegotiate, HttpClient httpClient,
Single<String> accessTokenProvider, long handshakeResponseTimeout, Map<String, String> headers) {
Single<String> accessTokenProvider, long handshakeResponseTimeout, Map<String, String> headers, TransportEnum transportEnum) {
if (url == null || url.isEmpty()) {
throw new IllegalArgumentException("A valid url is required.");
}
Expand All @@ -122,6 +116,8 @@ void setTickRate(long tickRateInMilliseconds) {

if (transport != null) {
this.transport = transport;
} else if (transportEnum != null) {
this.transportEnum = transportEnum;
}

if (handshakeResponseTimeout > 0) {
Expand Down Expand Up @@ -301,7 +297,13 @@ public Completable start() {
negotiate.flatMapCompletable(url -> {
logger.debug("Starting HubConnection.");
if (transport == null) {
transport = new WebSocketTransport(headers, httpClient);
switch (transportEnum) {
case LONG_POLLING:
transport = new LongPollingTransport(headers, httpClient, accessTokenProvider);
break;
default:
transport = new WebSocketTransport(headers, httpClient);
}
}

transport.setOnReceive(this.callback);
Expand All @@ -311,37 +313,40 @@ public Completable start() {
String handshake = HandshakeProtocol.createHandshakeRequestMessage(
new HandshakeRequestMessage(protocol.getName(), protocol.getVersion()));

connectionState = new ConnectionState(this);

return transport.send(handshake).andThen(Completable.defer(() -> {
timeoutHandshakeResponse(handshakeResponseTimeout, TimeUnit.MILLISECONDS);
return handshakeResponseSubject.andThen(Completable.defer(() -> {
hubConnectionStateLock.lock();
try {
connectionState = new ConnectionState(this);
hubConnectionState = HubConnectionState.CONNECTED;
logger.info("HubConnection started.");

resetServerTimeout();
this.pingTimer = new Timer();
this.pingTimer.schedule(new TimerTask() {
@Override
public void run() {
try {
if (System.currentTimeMillis() > nextServerTimeout.get()) {
stop("Server timeout elapsed without receiving a message from the server.");
return;
//Don't send pings if we're using long polling.
if (transportEnum != TransportEnum.LONG_POLLING) {
this.pingTimer = new Timer();
this.pingTimer.schedule(new TimerTask() {
@Override
public void run() {
try {
if (System.currentTimeMillis() > nextServerTimeout.get()) {
stop("Server timeout elapsed without receiving a message from the server.");
return;
}

if (System.currentTimeMillis() > nextPingActivation.get()) {
sendHubMessage(PingMessage.getInstance());
}
} catch (Exception e) {
logger.warn("Error sending ping: {}.", e.getMessage());
// The connection is probably in a bad or closed state now, cleanup the timer so
// it stops triggering
pingTimer.cancel();
}

if (System.currentTimeMillis() > nextPingActivation.get()) {
sendHubMessage(PingMessage.getInstance());
}
} catch (Exception e) {
logger.warn("Error sending ping: {}.", e.getMessage());
// The connection is probably in a bad or closed state now, cleanup the timer so
// it stops triggering
pingTimer.cancel();
}
}
}, new Date(0), tickRate);
}, new Date(0), tickRate);
}
} finally {
hubConnectionStateLock.unlock();
}
Expand All @@ -367,7 +372,10 @@ private Single<String> startNegotiate(String url, int negotiateAttempts) {
}

if (response.getRedirectUrl() == null) {
if (!response.getAvailableTransports().contains("WebSockets")) {
Set<String> transports = response.getAvailableTransports();
if ((this.transportEnum == TransportEnum.ALL && !(transports.contains("WebSockets") || transports.contains("LongPolling"))) ||
(this.transportEnum == TransportEnum.WEBSOCKETS && !transports.contains("WebSockets")) ||
(this.transportEnum == TransportEnum.LONG_POLLING && !transports.contains("LongPolling"))) {
throw new RuntimeException("There were no compatible transports on the server.");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,10 @@ public TransferFormat getTransferFormat() {

@Override
public HubMessage[] parseMessages(String payload, InvocationBinder binder) {
if (payload != null && !payload.substring(payload.length() - 1).equals(RECORD_SEPARATOR)) {
if (payload.length() == 0 ) {
return new HubMessage[]{};
}
if (!(payload.substring(payload.length() - 1).equals(RECORD_SEPARATOR))) {
throw new RuntimeException("Message is incomplete.");
}

Expand Down
Loading