Skip to content

Commit 16879a2

Browse files
committed
Harden synchronization around SockJS heartbeats
Create an explicit heartbeat task with an experiration flag so that it can be cancelled reliably vs relying on the ScheduledFutureTask cancel method which may return true even if the task is already running. Issue: SPR-14356
1 parent 2aab08f commit 16879a2

File tree

2 files changed

+55
-58
lines changed

2 files changed

+55
-58
lines changed

spring-websocket/src/main/java/org/springframework/web/socket/sockjs/transport/session/AbstractSockJsSession.java

+54-57
Original file line numberDiff line numberDiff line change
@@ -27,11 +27,10 @@
2727
import java.util.Set;
2828
import java.util.concurrent.ConcurrentHashMap;
2929
import java.util.concurrent.ScheduledFuture;
30-
import java.util.concurrent.locks.Lock;
31-
import java.util.concurrent.locks.ReentrantLock;
3230

3331
import org.apache.commons.logging.Log;
3432
import org.apache.commons.logging.LogFactory;
33+
3534
import org.springframework.core.NestedCheckedException;
3635
import org.springframework.util.Assert;
3736
import org.springframework.web.socket.CloseStatus;
@@ -106,9 +105,11 @@ private enum State {NEW, OPEN, CLOSED}
106105

107106
private volatile long timeLastActive = this.timeCreated;
108107

109-
private volatile ScheduledFuture<?> heartbeatTask;
108+
private ScheduledFuture<?> heartbeatFuture;
109+
110+
private HeartbeatTask heartbeatTask;
110111

111-
private final Lock heartbeatLock = new ReentrantLock();
112+
private final Object heartbeatLock = new Object();
112113

113114
private volatile boolean heartbeatDisabled;
114115

@@ -248,16 +249,11 @@ public void disableHeartbeat() {
248249
cancelHeartbeat();
249250
}
250251

251-
public void sendHeartbeat() throws SockJsTransportFailureException {
252-
if (isActive()) {
253-
if (heartbeatLock.tryLock()) {
254-
try {
255-
writeFrame(SockJsFrame.heartbeatFrame());
256-
scheduleHeartbeat();
257-
}
258-
finally {
259-
heartbeatLock.unlock();
260-
}
252+
protected void sendHeartbeat() throws SockJsTransportFailureException {
253+
synchronized (this.heartbeatLock) {
254+
if (isActive() && !this.heartbeatDisabled) {
255+
writeFrame(SockJsFrame.heartbeatFrame());
256+
scheduleHeartbeat();
261257
}
262258
}
263259
}
@@ -266,56 +262,33 @@ protected void scheduleHeartbeat() {
266262
if (this.heartbeatDisabled) {
267263
return;
268264
}
269-
270-
Assert.state(this.config.getTaskScheduler() != null, "Expected SockJS TaskScheduler");
271-
cancelHeartbeat();
272-
if (!isActive()) {
273-
return;
274-
}
275-
276-
Date time = new Date(System.currentTimeMillis() + this.config.getHeartbeatTime());
277-
this.heartbeatTask = this.config.getTaskScheduler().schedule(new Runnable() {
278-
public void run() {
279-
try {
280-
sendHeartbeat();
281-
}
282-
catch (Throwable ex) {
283-
// ignore
284-
}
285-
}
286-
}, time);
287-
if (logger.isTraceEnabled()) {
288-
logger.trace("Scheduled heartbeat in session " + getId());
289-
}
290-
}
291-
292-
protected void cancelHeartbeat() {
293-
try {
294-
ScheduledFuture<?> task = this.heartbeatTask;
295-
this.heartbeatTask = null;
296-
if (task == null || task.isCancelled()) {
265+
synchronized (this.heartbeatLock) {
266+
cancelHeartbeat();
267+
if (!isActive()) {
297268
return;
298269
}
299-
270+
Date time = new Date(System.currentTimeMillis() + this.config.getHeartbeatTime());
271+
this.heartbeatTask = new HeartbeatTask();
272+
this.heartbeatFuture = this.config.getTaskScheduler().schedule(this.heartbeatTask, time);
300273
if (logger.isTraceEnabled()) {
301-
logger.trace("Cancelling heartbeat in session " + getId());
302-
}
303-
if (task.cancel(false)) {
304-
return;
274+
logger.trace("Scheduled heartbeat in session " + getId());
305275
}
276+
}
277+
}
306278

307-
if (logger.isTraceEnabled()) {
308-
logger.trace("Failed to cancel heartbeat, acquiring heartbeat write lock.");
279+
protected void cancelHeartbeat() {
280+
synchronized (this.heartbeatLock) {
281+
if (this.heartbeatFuture != null) {
282+
if (logger.isTraceEnabled()) {
283+
logger.trace("Cancelling heartbeat in session " + getId());
284+
}
285+
this.heartbeatFuture.cancel(false);
286+
this.heartbeatFuture = null;
309287
}
310-
this.heartbeatLock.lock();
311-
312-
if (logger.isTraceEnabled()) {
313-
logger.trace("Releasing heartbeat lock.");
288+
if (this.heartbeatTask != null) {
289+
this.heartbeatTask.cancel();
290+
this.heartbeatTask = null;
314291
}
315-
this.heartbeatLock.unlock();
316-
}
317-
catch (Throwable ex) {
318-
logger.debug("Failure while cancelling heartbeat in session " + getId(), ex);
319292
}
320293
}
321294

@@ -465,4 +438,28 @@ public String toString() {
465438
return getClass().getSimpleName() + "[id=" + getId() + "]";
466439
}
467440

441+
442+
private class HeartbeatTask implements Runnable {
443+
444+
private boolean expired;
445+
446+
@Override
447+
public void run() {
448+
synchronized (heartbeatLock) {
449+
if (!this.expired) {
450+
try {
451+
sendHeartbeat();
452+
}
453+
finally {
454+
this.expired = true;
455+
}
456+
}
457+
}
458+
}
459+
460+
void cancel() {
461+
this.expired = true;
462+
}
463+
}
464+
468465
}

spring-websocket/src/test/java/org/springframework/web/socket/sockjs/transport/session/SockJsSessionTests.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -270,6 +270,7 @@ public void scheduleHeartbeatNotActive() throws Exception {
270270
@Test
271271
public void sendHeartbeatWhenDisabled() throws Exception {
272272
this.session.disableHeartbeat();
273+
this.session.setActive(true);
273274
this.session.sendHeartbeat();
274275

275276
assertEquals(Collections.emptyList(), this.session.getSockJsFramesWritten());
@@ -292,7 +293,6 @@ public void scheduleAndCancelHeartbeat() throws Exception {
292293

293294
this.session.cancelHeartbeat();
294295

295-
verify(task).isCancelled();
296296
verify(task).cancel(false);
297297
verifyNoMoreInteractions(task);
298298
}

0 commit comments

Comments
 (0)