18
18
19
19
package io .undertow .conduits ;
20
20
21
- import java .io .IOException ;
22
- import java .nio .ByteBuffer ;
23
- import java .nio .channels .FileChannel ;
24
- import java .util .concurrent .TimeUnit ;
25
-
26
21
import io .undertow .UndertowLogger ;
27
22
import io .undertow .UndertowMessages ;
28
23
import io .undertow .UndertowOptions ;
41
36
import org .xnio .conduits .ReadReadyHandler ;
42
37
import org .xnio .conduits .StreamSourceConduit ;
43
38
39
+ import java .io .IOException ;
40
+ import java .nio .ByteBuffer ;
41
+ import java .nio .channels .FileChannel ;
42
+ import java .util .concurrent .TimeUnit ;
43
+
44
44
/**
45
45
* Wrapper for read timeout. This should always be the first wrapper applied to the underlying channel.
46
46
*
49
49
*/
50
50
public final class ReadTimeoutStreamSourceConduit extends AbstractStreamSourceConduit <StreamSourceConduit > {
51
51
52
- private XnioExecutor .Key handle ;
52
+ private volatile XnioExecutor .Key handle ;
53
53
private final StreamConnection connection ;
54
54
private volatile long expireTime = -1 ;
55
55
private final OpenListener openListener ;
@@ -60,14 +60,21 @@ public final class ReadTimeoutStreamSourceConduit extends AbstractStreamSourceCo
60
60
private final Runnable timeoutCommand = new Runnable () {
61
61
@ Override
62
62
public void run () {
63
- handle = null ;
64
- if (expireTime == -1 ) {
63
+ synchronized (ReadTimeoutStreamSourceConduit .this ) {
64
+ handle = null ;
65
+ }
66
+ if (expireTime == -1 || !connection .isOpen ()) {
65
67
return ;
66
68
}
67
69
long current = System .currentTimeMillis ();
68
70
if (current < expireTime ) {
69
71
//timeout has been bumped, re-schedule
70
- handle = WorkerUtils .executeAfter (connection .getIoThread (),timeoutCommand , (expireTime - current ) + FUZZ_FACTOR , TimeUnit .MILLISECONDS );
72
+ if (handle == null ) {
73
+ synchronized (ReadTimeoutStreamSourceConduit .this ) {
74
+ if (handle == null )
75
+ handle = WorkerUtils .executeAfter (connection .getIoThread (), timeoutCommand , (expireTime - current ) + FUZZ_FACTOR , TimeUnit .MILLISECONDS );
76
+ }
77
+ }
71
78
return ;
72
79
}
73
80
UndertowLogger .REQUEST_LOGGER .tracef ("Timing out channel %s due to inactivity" , connection .getSourceChannel ());
@@ -131,12 +138,16 @@ private void handleReadTimeout(final long ret) throws IOException {
131
138
final long expireTimeVar = expireTime ;
132
139
if (expireTimeVar != -1 && currentTime > expireTimeVar ) {
133
140
IoUtils .safeClose (connection );
134
- throw UndertowMessages .MESSAGES .readTimedOut (this .getTimeout ());
141
+ throw UndertowMessages .MESSAGES .readTimedOut (currentTime - ( expireTimeVar - this .getTimeout () ));
135
142
}
136
143
}
137
144
expireTime = currentTime + timeout ;
138
145
if (handle == null ) {
139
- handle = connection .getIoThread ().executeAfter (timeoutCommand , timeout , TimeUnit .MILLISECONDS );
146
+ synchronized (this ) {
147
+ if (handle == null )
148
+ handle = connection .getIoThread ().executeAfter (timeoutCommand , timeout , TimeUnit .MILLISECONDS );
149
+ }
150
+
140
151
}
141
152
}
142
153
@@ -232,9 +243,13 @@ public void terminateReads() throws IOException {
232
243
233
244
private void cleanup () {
234
245
if (handle != null ) {
235
- handle .remove ();
236
- handle = null ;
237
- expireTime = -1 ;
246
+ synchronized (this ) {
247
+ if (handle != null ) {
248
+ handle .remove ();
249
+ handle = null ;
250
+ expireTime = -1 ;
251
+ }
252
+ }
238
253
}
239
254
}
240
255
@@ -247,7 +262,7 @@ public void suspendReads() {
247
262
private void checkExpired () throws ReadTimeoutException {
248
263
synchronized (this ) {
249
264
if (expired ) {
250
- throw UndertowMessages .MESSAGES .readTimedOut (System .currentTimeMillis ());
265
+ throw UndertowMessages .MESSAGES .readTimedOut (System .currentTimeMillis () - ( expireTime - getTimeout ()) );
251
266
}
252
267
}
253
268
}
0 commit comments