16
16
17
17
package org .springframework .http .client ;
18
18
19
+ import java .io .FilterInputStream ;
19
20
import java .io .IOException ;
20
21
import java .io .InputStream ;
21
22
import java .io .UncheckedIOException ;
22
23
import java .net .URI ;
23
24
import java .net .http .HttpClient ;
24
25
import java .net .http .HttpRequest ;
25
26
import java .net .http .HttpResponse ;
27
+ import java .net .http .HttpTimeoutException ;
26
28
import java .nio .ByteBuffer ;
27
29
import java .time .Duration ;
28
30
import java .util .Collections ;
29
31
import java .util .Set ;
30
32
import java .util .TreeSet ;
33
+ import java .util .concurrent .CancellationException ;
34
+ import java .util .concurrent .CompletableFuture ;
31
35
import java .util .concurrent .ExecutionException ;
32
36
import java .util .concurrent .Executor ;
33
37
import java .util .concurrent .Flow ;
34
38
import java .util .concurrent .TimeUnit ;
35
- import java .util .concurrent .TimeoutException ;
36
-
37
39
import org .springframework .http .HttpHeaders ;
38
40
import org .springframework .http .HttpMethod ;
39
41
import org .springframework .lang .Nullable ;
@@ -92,28 +94,46 @@ public URI getURI() {
92
94
@ Override
93
95
@ SuppressWarnings ("NullAway" )
94
96
protected ClientHttpResponse executeInternal (HttpHeaders headers , @ Nullable Body body ) throws IOException {
97
+ HttpRequest request = buildRequest (headers , body );
98
+ CompletableFuture <HttpResponse <InputStream >> responsefuture =
99
+ this .httpClient .sendAsync (request , HttpResponse .BodyHandlers .ofInputStream ());
95
100
try {
96
- HttpRequest request = buildRequest (headers , body );
97
- HttpResponse <InputStream > response ;
98
101
if (this .timeout != null ) {
99
- response = this .httpClient .sendAsync (request , HttpResponse .BodyHandlers .ofInputStream ())
100
- .get (this .timeout .toMillis (), TimeUnit .MILLISECONDS );
101
- }
102
- else {
103
- response = this .httpClient .send (request , HttpResponse .BodyHandlers .ofInputStream ());
102
+ CompletableFuture <Void > timeoutFuture = new CompletableFuture <Void >()
103
+ .completeOnTimeout (null , this .timeout .toMillis (), TimeUnit .MILLISECONDS );
104
+ timeoutFuture .thenRun (() -> {
105
+ if (!responsefuture .cancel (true ) && !responsefuture .isCompletedExceptionally ()) {
106
+ try {
107
+ responsefuture .resultNow ().body ().close ();
108
+ } catch (IOException ignored ) {}
109
+ }
110
+ });
111
+ var response = responsefuture .get ();
112
+ return new JdkClientHttpResponse (response .statusCode (), response .headers (), new FilterInputStream (response .body ()) {
113
+
114
+ @ Override
115
+ public void close () throws IOException {
116
+ timeoutFuture .cancel (false );
117
+ super .close ();
118
+ }
119
+ });
120
+
121
+ } else {
122
+ var response = responsefuture .get ();
123
+ return new JdkClientHttpResponse (response .statusCode (), response .headers (), response .body ());
104
124
}
105
- return new JdkClientHttpResponse (response );
106
- }
107
- catch (UncheckedIOException ex ) {
108
- throw ex .getCause ();
109
125
}
110
126
catch (InterruptedException ex ) {
111
127
Thread .currentThread ().interrupt ();
128
+ responsefuture .cancel (true );
112
129
throw new IOException ("Request was interrupted: " + ex .getMessage (), ex );
113
130
}
114
131
catch (ExecutionException ex ) {
115
132
Throwable cause = ex .getCause ();
116
133
134
+ if (cause instanceof CancellationException caEx ) {
135
+ throw new HttpTimeoutException ("Request timed out" );
136
+ }
117
137
if (cause instanceof UncheckedIOException uioEx ) {
118
138
throw uioEx .getCause ();
119
139
}
@@ -127,17 +147,11 @@ else if (cause instanceof IOException ioEx) {
127
147
throw new IOException (cause .getMessage (), cause );
128
148
}
129
149
}
130
- catch (TimeoutException ex ) {
131
- throw new IOException ("Request timed out: " + ex .getMessage (), ex );
132
- }
133
150
}
134
151
135
152
136
153
private HttpRequest buildRequest (HttpHeaders headers , @ Nullable Body body ) {
137
154
HttpRequest .Builder builder = HttpRequest .newBuilder ().uri (this .uri );
138
- if (this .timeout != null ) {
139
- builder .timeout (this .timeout );
140
- }
141
155
142
156
headers .forEach ((headerName , headerValues ) -> {
143
157
if (!DISALLOWED_HEADERS .contains (headerName .toLowerCase ())) {
0 commit comments