Skip to content

Commit 8d076f8

Browse files
committed
Bug fix for S3AsyncClient.putObject hangs if there is a connection reset while uploading of objects
1 parent d1a23e2 commit 8d076f8

File tree

9 files changed

+206
-131
lines changed

9 files changed

+206
-131
lines changed
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
{
2+
"type": "bugfix",
3+
"category": "Netty NIO Http Client",
4+
"contributor": "",
5+
"description": "Fix for Netty based client request getting stuck if connection is reset after recieveing Http Continue response."
6+
}

http-clients/netty-nio-client/src/main/java/software/amazon/awssdk/http/nio/netty/internal/ChannelAttributeKey.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,12 @@ public final class ChannelAttributeKey {
7373
public static final AttributeKey<ChannelDiagnostics> CHANNEL_DIAGNOSTICS = NettyUtils.getOrCreateAttributeKey(
7474
"aws.http.nio.netty.async.channelDiagnostics");
7575

76+
/**
77+
* {@link AttributeKey} to keep track of whether we have received the {@link LastHttpContent}.
78+
*/
79+
public static final AttributeKey<Boolean> LAST_HTTP_CONTENT_RECEIVED_KEY = NettyUtils.getOrCreateAttributeKey(
80+
"aws.http.nio.netty.async.lastHttpContentReceived");
81+
7682
/**
7783
* {@link AttributeKey} to keep track of whether we should close the connection after this request
7884
* has completed.
@@ -100,12 +106,6 @@ public final class ChannelAttributeKey {
100106
static final AttributeKey<Long> RESPONSE_DATA_READ = NettyUtils.getOrCreateAttributeKey(
101107
"aws.http.nio.netty.async.responseDataRead");
102108

103-
/**
104-
* {@link AttributeKey} to keep track of whether we have received the {@link LastHttpContent}.
105-
*/
106-
static final AttributeKey<Boolean> LAST_HTTP_CONTENT_RECEIVED_KEY = NettyUtils.getOrCreateAttributeKey(
107-
"aws.http.nio.netty.async.lastHttpContentReceived");
108-
109109
static final AttributeKey<CompletableFuture<Void>> EXECUTE_FUTURE_KEY = NettyUtils.getOrCreateAttributeKey(
110110
"aws.http.nio.netty.async.executeFuture");
111111

http-clients/netty-nio-client/src/main/java/software/amazon/awssdk/http/nio/netty/internal/HandlerRemovingChannelPoolListener.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,6 @@ public void channelReleased(Channel channel) {
4949
if (channel.isOpen() || channel.isRegistered()) {
5050
removeIfExists(channel.pipeline(),
5151
HttpStreamsClientHandler.class,
52-
LastHttpContentHandler.class,
5352
FlushOnReadHandler.class,
5453
ResponseHandler.class,
5554
ReadTimeoutHandler.class,

http-clients/netty-nio-client/src/main/java/software/amazon/awssdk/http/nio/netty/internal/LastHttpContentHandler.java

Lines changed: 0 additions & 49 deletions
This file was deleted.

http-clients/netty-nio-client/src/main/java/software/amazon/awssdk/http/nio/netty/internal/NettyRequestExecutor.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -220,7 +220,6 @@ private void configurePipeline() throws IOException {
220220
throw new IOException("Unknown protocol: " + protocol);
221221
}
222222

223-
pipeline.addLast(LastHttpContentHandler.create());
224223
if (protocol == Protocol.HTTP2) {
225224
pipeline.addLast(FlushOnReadHandler.getInstance());
226225
}

http-clients/netty-nio-client/src/main/java/software/amazon/awssdk/http/nio/netty/internal/nrs/HttpStreamsHandler.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515

1616
package software.amazon.awssdk.http.nio.netty.internal.nrs;
1717

18+
import static software.amazon.awssdk.http.nio.netty.internal.ChannelAttributeKey.LAST_HTTP_CONTENT_RECEIVED_KEY;
19+
1820
import io.netty.channel.ChannelDuplexHandler;
1921
import io.netty.channel.ChannelFuture;
2022
import io.netty.channel.ChannelFutureListener;
@@ -211,7 +213,7 @@ private void handleCancelled(ChannelHandlerContext ctx, InT msg) {
211213
private void handleReadHttpContent(ChannelHandlerContext ctx, HttpContent content) {
212214
if (!ignoreBodyRead) {
213215
if (content instanceof LastHttpContent) {
214-
216+
ctx.channel().attr(LAST_HTTP_CONTENT_RECEIVED_KEY).set(true);
215217
if (content.content().readableBytes() > 0 ||
216218
!((LastHttpContent) content).trailingHeaders().isEmpty()) {
217219
// It has data or trailing headers, send them
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
package software.amazon.awssdk.http.nio.netty.fault;
2+
3+
import java.nio.ByteBuffer;
4+
import java.util.Optional;
5+
import java.util.concurrent.atomic.AtomicBoolean;
6+
import java.util.concurrent.atomic.AtomicReference;
7+
import org.reactivestreams.Subscriber;
8+
import org.reactivestreams.Subscription;
9+
import software.amazon.awssdk.http.async.SdkHttpContentPublisher;
10+
11+
public class SdkTestHttpContentPublisher implements SdkHttpContentPublisher {
12+
private final byte[] body;
13+
private final AtomicReference<Subscriber<? super ByteBuffer>> subscriber = new AtomicReference<>(null);
14+
private final AtomicBoolean complete = new AtomicBoolean(false);
15+
16+
public SdkTestHttpContentPublisher(byte[] body) {
17+
this.body = body;
18+
}
19+
20+
@Override
21+
public void subscribe(Subscriber<? super ByteBuffer> s) {
22+
boolean wasFirstSubscriber = subscriber.compareAndSet(null, s);
23+
24+
SdkTestHttpContentPublisher publisher = this;
25+
26+
if (wasFirstSubscriber) {
27+
s.onSubscribe(new Subscription() {
28+
@Override
29+
public void request(long n) {
30+
publisher.request(n);
31+
}
32+
33+
@Override
34+
public void cancel() {
35+
// Do nothing
36+
}
37+
});
38+
} else {
39+
s.onError(new RuntimeException("Only allow one subscriber"));
40+
}
41+
}
42+
43+
protected void request(long n) {
44+
// Send the whole body if they request >0 ByteBuffers
45+
if (n > 0 && !complete.get()) {
46+
complete.set(true);
47+
subscriber.get().onNext(ByteBuffer.wrap(body));
48+
subscriber.get().onComplete();
49+
}
50+
}
51+
52+
@Override
53+
public Optional<Long> contentLength() {
54+
return Optional.of((long)body.length);
55+
}
56+
}

0 commit comments

Comments
 (0)