Skip to content

Commit 3d3ad96

Browse files
authored
SignalR Java Client LongPolling Transport (#6856)
1 parent 2ac4619 commit 3d3ad96

12 files changed

+733
-110
lines changed

src/SignalR/clients/java/signalr/src/main/java/com/microsoft/signalr/DefaultHttpClient.java

Lines changed: 74 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -8,87 +8,110 @@
88
import java.util.Collection;
99
import java.util.List;
1010
import java.util.Map;
11+
import java.util.concurrent.TimeUnit;
1112
import java.util.concurrent.locks.Lock;
1213
import java.util.concurrent.locks.ReentrantLock;
1314

1415
import io.reactivex.Single;
1516
import io.reactivex.subjects.SingleSubject;
16-
import okhttp3.Call;
17-
import okhttp3.Callback;
18-
import okhttp3.Cookie;
19-
import okhttp3.CookieJar;
20-
import okhttp3.HttpUrl;
21-
import okhttp3.OkHttpClient;
22-
import okhttp3.Request;
23-
import okhttp3.RequestBody;
24-
import okhttp3.Response;
25-
import okhttp3.ResponseBody;
17+
import okhttp3.*;
2618

2719
final class DefaultHttpClient extends HttpClient {
28-
private final OkHttpClient client;
20+
private OkHttpClient client = null;
2921

3022
public DefaultHttpClient() {
31-
this.client = new OkHttpClient.Builder().cookieJar(new CookieJar() {
32-
private List<Cookie> cookieList = new ArrayList<>();
33-
private Lock cookieLock = new ReentrantLock();
23+
this(0, null);
24+
}
3425

35-
@Override
36-
public void saveFromResponse(HttpUrl url, List<Cookie> cookies) {
37-
cookieLock.lock();
38-
try {
39-
for (Cookie cookie : cookies) {
40-
boolean replacedCookie = false;
41-
for (int i = 0; i < cookieList.size(); i++) {
42-
Cookie innerCookie = cookieList.get(i);
43-
if (cookie.name().equals(innerCookie.name()) && innerCookie.matches(url)) {
44-
// We have a new cookie that matches an older one so we replace the older one.
45-
cookieList.set(i, innerCookie);
46-
replacedCookie = true;
47-
break;
26+
public DefaultHttpClient cloneWithTimeOut(int timeoutInMilliseconds) {
27+
OkHttpClient newClient = client.newBuilder().readTimeout(timeoutInMilliseconds, TimeUnit.MILLISECONDS)
28+
.build();
29+
return new DefaultHttpClient(timeoutInMilliseconds, newClient);
30+
}
31+
32+
public DefaultHttpClient(int timeoutInMilliseconds, OkHttpClient client) {
33+
if (client != null) {
34+
this.client = client;
35+
} else {
36+
37+
OkHttpClient.Builder builder = new OkHttpClient.Builder().cookieJar(new CookieJar() {
38+
private List<Cookie> cookieList = new ArrayList<>();
39+
private Lock cookieLock = new ReentrantLock();
40+
41+
@Override
42+
public void saveFromResponse(HttpUrl url, List<Cookie> cookies) {
43+
cookieLock.lock();
44+
try {
45+
for (Cookie cookie : cookies) {
46+
boolean replacedCookie = false;
47+
for (int i = 0; i < cookieList.size(); i++) {
48+
Cookie innerCookie = cookieList.get(i);
49+
if (cookie.name().equals(innerCookie.name()) && innerCookie.matches(url)) {
50+
// We have a new cookie that matches an older one so we replace the older one.
51+
cookieList.set(i, innerCookie);
52+
replacedCookie = true;
53+
break;
54+
}
55+
}
56+
if (!replacedCookie) {
57+
cookieList.add(cookie);
4858
}
4959
}
50-
if (!replacedCookie) {
51-
cookieList.add(cookie);
52-
}
60+
} finally {
61+
cookieLock.unlock();
5362
}
54-
} finally {
55-
cookieLock.unlock();
5663
}
57-
}
5864

59-
@Override
60-
public List<Cookie> loadForRequest(HttpUrl url) {
61-
cookieLock.lock();
62-
try {
63-
List<Cookie> matchedCookies = new ArrayList<>();
64-
List<Cookie> expiredCookies = new ArrayList<>();
65-
for (Cookie cookie : cookieList) {
66-
if (cookie.expiresAt() < System.currentTimeMillis()) {
67-
expiredCookies.add(cookie);
68-
} else if (cookie.matches(url)) {
69-
matchedCookies.add(cookie);
65+
@Override
66+
public List<Cookie> loadForRequest(HttpUrl url) {
67+
cookieLock.lock();
68+
try {
69+
List<Cookie> matchedCookies = new ArrayList<>();
70+
List<Cookie> expiredCookies = new ArrayList<>();
71+
for (Cookie cookie : cookieList) {
72+
if (cookie.expiresAt() < System.currentTimeMillis()) {
73+
expiredCookies.add(cookie);
74+
} else if (cookie.matches(url)) {
75+
matchedCookies.add(cookie);
76+
}
7077
}
71-
}
7278

73-
cookieList.removeAll(expiredCookies);
74-
return matchedCookies;
75-
} finally {
76-
cookieLock.unlock();
79+
cookieList.removeAll(expiredCookies);
80+
return matchedCookies;
81+
} finally {
82+
cookieLock.unlock();
83+
}
7784
}
85+
});
86+
87+
if (timeoutInMilliseconds > 0) {
88+
builder.readTimeout(timeoutInMilliseconds, TimeUnit.MILLISECONDS);
7889
}
79-
}).build();
90+
this.client = builder.build();
91+
}
8092
}
8193

8294
@Override
8395
public Single<HttpResponse> send(HttpRequest httpRequest) {
96+
return send(httpRequest, null);
97+
}
98+
99+
@Override
100+
public Single<HttpResponse> send(HttpRequest httpRequest, String bodyContent) {
84101
Request.Builder requestBuilder = new Request.Builder().url(httpRequest.getUrl());
85102

86103
switch (httpRequest.getMethod()) {
87104
case "GET":
88105
requestBuilder.get();
89106
break;
90107
case "POST":
91-
RequestBody body = RequestBody.create(null, new byte[]{});
108+
RequestBody body;
109+
if (bodyContent != null) {
110+
body = RequestBody.create(MediaType.parse("text/plain"), bodyContent);
111+
} else {
112+
body = RequestBody.create(null, new byte[]{});
113+
}
114+
92115
requestBuilder.post(body);
93116
break;
94117
case "DELETE":

src/SignalR/clients/java/signalr/src/main/java/com/microsoft/signalr/HttpClient.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,12 @@ public Single<HttpResponse> post(String url) {
9595
return this.send(request);
9696
}
9797

98+
public Single<HttpResponse> post(String url, String body, HttpRequest options) {
99+
options.setUrl(url);
100+
options.setMethod("POST");
101+
return this.send(options, body);
102+
}
103+
98104
public Single<HttpResponse> post(String url, HttpRequest options) {
99105
options.setUrl(url);
100106
options.setMethod("POST");
@@ -116,5 +122,9 @@ public Single<HttpResponse> delete(String url, HttpRequest options) {
116122

117123
public abstract Single<HttpResponse> send(HttpRequest request);
118124

125+
public abstract Single<HttpResponse> send(HttpRequest request, String body);
126+
119127
public abstract WebSocketWrapper createWebSocket(String url, Map<String, String> headers);
120-
}
128+
129+
public abstract HttpClient cloneWithTimeOut(int timeoutInMilliseconds);
130+
}

src/SignalR/clients/java/signalr/src/main/java/com/microsoft/signalr/HttpHubConnectionBuilder.java

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,19 +19,26 @@ public class HttpHubConnectionBuilder {
1919
private Single<String> accessTokenProvider;
2020
private long handshakeResponseTimeout = 0;
2121
private Map<String, String> headers;
22+
private TransportEnum transportEnum;
2223

2324
HttpHubConnectionBuilder(String url) {
2425
this.url = url;
2526
}
2627

28+
//For testing purposes. The Transport interface isn't public.
29+
HttpHubConnectionBuilder withTransportImplementation(Transport transport) {
30+
this.transport = transport;
31+
return this;
32+
}
33+
2734
/**
28-
* Sets the transport to be used by the {@link HubConnection}.
35+
* Sets the transport type to indicate which transport to be used by the {@link HubConnection}.
2936
*
30-
* @param transport The transport to be used.
37+
* @param transportEnum The type of transport to be used.
3138
* @return This instance of the HttpHubConnectionBuilder.
3239
*/
33-
HttpHubConnectionBuilder withTransport(Transport transport) {
34-
this.transport = transport;
40+
public HttpHubConnectionBuilder withTransport(TransportEnum transportEnum) {
41+
this.transportEnum = transportEnum;
3542
return this;
3643
}
3744

@@ -112,6 +119,6 @@ public HttpHubConnectionBuilder withHeader(String name, String value) {
112119
* @return A new instance of {@link HubConnection}.
113120
*/
114121
public HubConnection build() {
115-
return new HubConnection(url, transport, skipNegotiate, httpClient, accessTokenProvider, handshakeResponseTimeout, headers);
122+
return new HubConnection(url, transport, skipNegotiate, httpClient, accessTokenProvider, handshakeResponseTimeout, headers, transportEnum);
116123
}
117124
}

src/SignalR/clients/java/signalr/src/main/java/com/microsoft/signalr/HubConnection.java

Lines changed: 48 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,7 @@
33

44
package com.microsoft.signalr;
55

6-
import java.util.ArrayList;
7-
import java.util.Collection;
8-
import java.util.Date;
9-
import java.util.HashMap;
10-
import java.util.List;
11-
import java.util.Map;
12-
import java.util.Timer;
13-
import java.util.TimerTask;
6+
import java.util.*;
147
import java.util.concurrent.*;
158
import java.util.concurrent.atomic.AtomicInteger;
169
import java.util.concurrent.atomic.AtomicLong;
@@ -46,7 +39,7 @@ public class HubConnection {
4639
private Single<String> accessTokenProvider;
4740
private final Map<String, String> headers = new HashMap<>();
4841
private ConnectionState connectionState = null;
49-
private final HttpClient httpClient;
42+
private HttpClient httpClient;
5043
private String stopError;
5144
private Timer pingTimer = null;
5245
private final AtomicLong nextServerTimeout = new AtomicLong();
@@ -56,6 +49,7 @@ public class HubConnection {
5649
private long tickRate = 1000;
5750
private CompletableSubject handshakeResponseSubject;
5851
private long handshakeResponseTimeout = 15*1000;
52+
private TransportEnum transportEnum = TransportEnum.ALL;
5953
private final Logger logger = LoggerFactory.getLogger(HubConnection.class);
6054

6155
/**
@@ -100,7 +94,7 @@ void setTickRate(long tickRateInMilliseconds) {
10094
}
10195

10296
HubConnection(String url, Transport transport, boolean skipNegotiate, HttpClient httpClient,
103-
Single<String> accessTokenProvider, long handshakeResponseTimeout, Map<String, String> headers) {
97+
Single<String> accessTokenProvider, long handshakeResponseTimeout, Map<String, String> headers, TransportEnum transportEnum) {
10498
if (url == null || url.isEmpty()) {
10599
throw new IllegalArgumentException("A valid url is required.");
106100
}
@@ -122,6 +116,8 @@ void setTickRate(long tickRateInMilliseconds) {
122116

123117
if (transport != null) {
124118
this.transport = transport;
119+
} else if (transportEnum != null) {
120+
this.transportEnum = transportEnum;
125121
}
126122

127123
if (handshakeResponseTimeout > 0) {
@@ -301,7 +297,13 @@ public Completable start() {
301297
negotiate.flatMapCompletable(url -> {
302298
logger.debug("Starting HubConnection.");
303299
if (transport == null) {
304-
transport = new WebSocketTransport(headers, httpClient);
300+
switch (transportEnum) {
301+
case LONG_POLLING:
302+
transport = new LongPollingTransport(headers, httpClient, accessTokenProvider);
303+
break;
304+
default:
305+
transport = new WebSocketTransport(headers, httpClient);
306+
}
305307
}
306308

307309
transport.setOnReceive(this.callback);
@@ -311,37 +313,20 @@ public Completable start() {
311313
String handshake = HandshakeProtocol.createHandshakeRequestMessage(
312314
new HandshakeRequestMessage(protocol.getName(), protocol.getVersion()));
313315

316+
connectionState = new ConnectionState(this);
317+
314318
return transport.send(handshake).andThen(Completable.defer(() -> {
315319
timeoutHandshakeResponse(handshakeResponseTimeout, TimeUnit.MILLISECONDS);
316320
return handshakeResponseSubject.andThen(Completable.defer(() -> {
317321
hubConnectionStateLock.lock();
318322
try {
319-
connectionState = new ConnectionState(this);
320323
hubConnectionState = HubConnectionState.CONNECTED;
321324
logger.info("HubConnection started.");
322-
323325
resetServerTimeout();
324-
this.pingTimer = new Timer();
325-
this.pingTimer.schedule(new TimerTask() {
326-
@Override
327-
public void run() {
328-
try {
329-
if (System.currentTimeMillis() > nextServerTimeout.get()) {
330-
stop("Server timeout elapsed without receiving a message from the server.");
331-
return;
332-
}
333-
334-
if (System.currentTimeMillis() > nextPingActivation.get()) {
335-
sendHubMessage(PingMessage.getInstance());
336-
}
337-
} catch (Exception e) {
338-
logger.warn("Error sending ping: {}.", e.getMessage());
339-
// The connection is probably in a bad or closed state now, cleanup the timer so
340-
// it stops triggering
341-
pingTimer.cancel();
342-
}
343-
}
344-
}, new Date(0), tickRate);
326+
//Don't send pings if we're using long polling.
327+
if (transportEnum != TransportEnum.LONG_POLLING) {
328+
activatePingTimer();
329+
}
345330
} finally {
346331
hubConnectionStateLock.unlock();
347332
}
@@ -356,6 +341,30 @@ public void run() {
356341
return start;
357342
}
358343

344+
private void activatePingTimer() {
345+
this.pingTimer = new Timer();
346+
this.pingTimer.schedule(new TimerTask() {
347+
@Override
348+
public void run() {
349+
try {
350+
if (System.currentTimeMillis() > nextServerTimeout.get()) {
351+
stop("Server timeout elapsed without receiving a message from the server.");
352+
return;
353+
}
354+
355+
if (System.currentTimeMillis() > nextPingActivation.get()) {
356+
sendHubMessage(PingMessage.getInstance());
357+
}
358+
} catch (Exception e) {
359+
logger.warn("Error sending ping: {}.", e.getMessage());
360+
// The connection is probably in a bad or closed state now, cleanup the timer so
361+
// it stops triggering
362+
pingTimer.cancel();
363+
}
364+
}
365+
}, new Date(0), tickRate);
366+
}
367+
359368
private Single<String> startNegotiate(String url, int negotiateAttempts) {
360369
if (hubConnectionState != HubConnectionState.DISCONNECTED) {
361370
return Single.just(null);
@@ -367,7 +376,10 @@ private Single<String> startNegotiate(String url, int negotiateAttempts) {
367376
}
368377

369378
if (response.getRedirectUrl() == null) {
370-
if (!response.getAvailableTransports().contains("WebSockets")) {
379+
Set<String> transports = response.getAvailableTransports();
380+
if ((this.transportEnum == TransportEnum.ALL && !(transports.contains("WebSockets") || transports.contains("LongPolling"))) ||
381+
(this.transportEnum == TransportEnum.WEBSOCKETS && !transports.contains("WebSockets")) ||
382+
(this.transportEnum == TransportEnum.LONG_POLLING && !transports.contains("LongPolling"))) {
371383
throw new RuntimeException("There were no compatible transports on the server.");
372384
}
373385

@@ -563,7 +575,7 @@ private void sendHubMessage(HubMessage message) {
563575
} else {
564576
logger.debug("Sending {} message.", message.getMessageType().name());
565577
}
566-
transport.send(serializedMessage);
578+
transport.send(serializedMessage).subscribeWith(CompletableSubject.create());
567579

568580
resetKeepAlive();
569581
}

src/SignalR/clients/java/signalr/src/main/java/com/microsoft/signalr/JsonHubProtocol.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,10 @@ public TransferFormat getTransferFormat() {
3737

3838
@Override
3939
public HubMessage[] parseMessages(String payload, InvocationBinder binder) {
40-
if (payload != null && !payload.substring(payload.length() - 1).equals(RECORD_SEPARATOR)) {
40+
if (payload.length() == 0) {
41+
return new HubMessage[]{};
42+
}
43+
if (!(payload.substring(payload.length() - 1).equals(RECORD_SEPARATOR))) {
4144
throw new RuntimeException("Message is incomplete.");
4245
}
4346

0 commit comments

Comments
 (0)