Skip to content

Commit 876adfd

Browse files
authored
Bug fix for S3AsyncClient.putObject hangs if there is a connection reset set while uploading of object (#3535)
* Bug fix for S3AsyncClient.putObject hangs if there is a connection reset while uploading of objects * Changing LAST_HTTP_CONTENT_RECEIVED_KEY key name to STREAMING_COMPLETE_KEY and setting it irrespective of ignoreBodyRead flag * Added SimpleHttpContentPublisher and removed SdkTestHttpContentPublisher * Moving log in handleReadHttpContent() to log irrespective of ignoreBodyRead flag
1 parent 764d148 commit 876adfd

File tree

10 files changed

+253
-139
lines changed

10 files changed

+253
-139
lines changed
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
{
2+
"type": "bugfix",
3+
"category": "Netty NIO Http Client",
4+
"contributor": "",
5+
"description": "Fix for Netty based client request getting stuck if connection is reset after recieveing Http Continue response."
6+
}

http-clients/netty-nio-client/src/main/java/software/amazon/awssdk/http/nio/netty/internal/ChannelAttributeKey.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,13 @@ public final class ChannelAttributeKey {
7373
public static final AttributeKey<ChannelDiagnostics> CHANNEL_DIAGNOSTICS = NettyUtils.getOrCreateAttributeKey(
7474
"aws.http.nio.netty.async.channelDiagnostics");
7575

76+
/**
77+
* {@link AttributeKey} to keep track of whether the streaming is completed and this is set to true when we receive the *
78+
* {@link LastHttpContent}.
79+
*/
80+
public static final AttributeKey<Boolean> STREAMING_COMPLETE_KEY = NettyUtils.getOrCreateAttributeKey(
81+
"aws.http.nio.netty.async.streamingComplete");
82+
7683
/**
7784
* {@link AttributeKey} to keep track of whether we should close the connection after this request
7885
* has completed.
@@ -100,12 +107,6 @@ public final class ChannelAttributeKey {
100107
static final AttributeKey<Long> RESPONSE_DATA_READ = NettyUtils.getOrCreateAttributeKey(
101108
"aws.http.nio.netty.async.responseDataRead");
102109

103-
/**
104-
* {@link AttributeKey} to keep track of whether we have received the {@link LastHttpContent}.
105-
*/
106-
static final AttributeKey<Boolean> LAST_HTTP_CONTENT_RECEIVED_KEY = NettyUtils.getOrCreateAttributeKey(
107-
"aws.http.nio.netty.async.lastHttpContentReceived");
108-
109110
static final AttributeKey<CompletableFuture<Void>> EXECUTE_FUTURE_KEY = NettyUtils.getOrCreateAttributeKey(
110111
"aws.http.nio.netty.async.executeFuture");
111112

http-clients/netty-nio-client/src/main/java/software/amazon/awssdk/http/nio/netty/internal/HandlerRemovingChannelPoolListener.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,6 @@ public void channelReleased(Channel channel) {
4949
if (channel.isOpen() || channel.isRegistered()) {
5050
removeIfExists(channel.pipeline(),
5151
HttpStreamsClientHandler.class,
52-
LastHttpContentHandler.class,
5352
FlushOnReadHandler.class,
5453
ResponseHandler.class,
5554
ReadTimeoutHandler.class,

http-clients/netty-nio-client/src/main/java/software/amazon/awssdk/http/nio/netty/internal/LastHttpContentHandler.java

Lines changed: 0 additions & 49 deletions
This file was deleted.

http-clients/netty-nio-client/src/main/java/software/amazon/awssdk/http/nio/netty/internal/NettyRequestExecutor.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,11 @@
2121
import static software.amazon.awssdk.http.nio.netty.internal.ChannelAttributeKey.EXECUTION_ID_KEY;
2222
import static software.amazon.awssdk.http.nio.netty.internal.ChannelAttributeKey.IN_USE;
2323
import static software.amazon.awssdk.http.nio.netty.internal.ChannelAttributeKey.KEEP_ALIVE;
24-
import static software.amazon.awssdk.http.nio.netty.internal.ChannelAttributeKey.LAST_HTTP_CONTENT_RECEIVED_KEY;
2524
import static software.amazon.awssdk.http.nio.netty.internal.ChannelAttributeKey.REQUEST_CONTEXT_KEY;
2625
import static software.amazon.awssdk.http.nio.netty.internal.ChannelAttributeKey.RESPONSE_COMPLETE_KEY;
2726
import static software.amazon.awssdk.http.nio.netty.internal.ChannelAttributeKey.RESPONSE_CONTENT_LENGTH;
2827
import static software.amazon.awssdk.http.nio.netty.internal.ChannelAttributeKey.RESPONSE_DATA_READ;
28+
import static software.amazon.awssdk.http.nio.netty.internal.ChannelAttributeKey.STREAMING_COMPLETE_KEY;
2929
import static software.amazon.awssdk.http.nio.netty.internal.NettyRequestMetrics.measureTimeTaken;
3030

3131
import io.netty.buffer.ByteBuf;
@@ -195,7 +195,7 @@ private void configureChannel() {
195195
channel.attr(EXECUTE_FUTURE_KEY).set(executeFuture);
196196
channel.attr(REQUEST_CONTEXT_KEY).set(context);
197197
channel.attr(RESPONSE_COMPLETE_KEY).set(false);
198-
channel.attr(LAST_HTTP_CONTENT_RECEIVED_KEY).set(false);
198+
channel.attr(STREAMING_COMPLETE_KEY).set(false);
199199
channel.attr(RESPONSE_CONTENT_LENGTH).set(null);
200200
channel.attr(RESPONSE_DATA_READ).set(null);
201201
channel.attr(CHANNEL_DIAGNOSTICS).get().incrementRequestCount();
@@ -220,7 +220,6 @@ private void configurePipeline() throws IOException {
220220
throw new IOException("Unknown protocol: " + protocol);
221221
}
222222

223-
pipeline.addLast(LastHttpContentHandler.create());
224223
if (protocol == Protocol.HTTP2) {
225224
pipeline.addLast(FlushOnReadHandler.getInstance());
226225
}

http-clients/netty-nio-client/src/main/java/software/amazon/awssdk/http/nio/netty/internal/ResponseHandler.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,12 @@
1919
import static java.util.stream.Collectors.mapping;
2020
import static software.amazon.awssdk.http.nio.netty.internal.ChannelAttributeKey.EXECUTE_FUTURE_KEY;
2121
import static software.amazon.awssdk.http.nio.netty.internal.ChannelAttributeKey.KEEP_ALIVE;
22-
import static software.amazon.awssdk.http.nio.netty.internal.ChannelAttributeKey.LAST_HTTP_CONTENT_RECEIVED_KEY;
2322
import static software.amazon.awssdk.http.nio.netty.internal.ChannelAttributeKey.REQUEST_CONTEXT_KEY;
2423
import static software.amazon.awssdk.http.nio.netty.internal.ChannelAttributeKey.RESPONSE_COMPLETE_KEY;
2524
import static software.amazon.awssdk.http.nio.netty.internal.ChannelAttributeKey.RESPONSE_CONTENT_LENGTH;
2625
import static software.amazon.awssdk.http.nio.netty.internal.ChannelAttributeKey.RESPONSE_DATA_READ;
2726
import static software.amazon.awssdk.http.nio.netty.internal.ChannelAttributeKey.RESPONSE_STATUS_CODE;
27+
import static software.amazon.awssdk.http.nio.netty.internal.ChannelAttributeKey.STREAMING_COMPLETE_KEY;
2828
import static software.amazon.awssdk.http.nio.netty.internal.utils.ExceptionHandlingUtils.tryCatch;
2929
import static software.amazon.awssdk.http.nio.netty.internal.utils.ExceptionHandlingUtils.tryCatchFinally;
3030

@@ -463,10 +463,10 @@ public void cancel() {
463463
private void notifyIfResponseNotCompleted(ChannelHandlerContext handlerCtx) {
464464
RequestContext requestCtx = handlerCtx.channel().attr(REQUEST_CONTEXT_KEY).get();
465465
Boolean responseCompleted = handlerCtx.channel().attr(RESPONSE_COMPLETE_KEY).get();
466-
Boolean lastHttpContentReceived = handlerCtx.channel().attr(LAST_HTTP_CONTENT_RECEIVED_KEY).get();
466+
Boolean isStreamingComplete = handlerCtx.channel().attr(STREAMING_COMPLETE_KEY).get();
467467
handlerCtx.channel().attr(KEEP_ALIVE).set(false);
468468

469-
if (!Boolean.TRUE.equals(responseCompleted) && !Boolean.TRUE.equals(lastHttpContentReceived)) {
469+
if (!Boolean.TRUE.equals(responseCompleted) && !Boolean.TRUE.equals(isStreamingComplete)) {
470470
IOException err = new IOException(NettyUtils.closedChannelMessage(handlerCtx.channel()));
471471
runAndLogError(handlerCtx.channel(), () -> "Fail to execute SdkAsyncHttpResponseHandler#onError",
472472
() -> requestCtx.handler().onError(err));

http-clients/netty-nio-client/src/main/java/software/amazon/awssdk/http/nio/netty/internal/nrs/HttpStreamsHandler.java

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515

1616
package software.amazon.awssdk.http.nio.netty.internal.nrs;
1717

18+
import static software.amazon.awssdk.http.nio.netty.internal.ChannelAttributeKey.STREAMING_COMPLETE_KEY;
19+
1820
import io.netty.channel.ChannelDuplexHandler;
1921
import io.netty.channel.ChannelFuture;
2022
import io.netty.channel.ChannelFutureListener;
@@ -30,6 +32,7 @@
3032
import org.reactivestreams.Publisher;
3133
import org.reactivestreams.Subscriber;
3234
import software.amazon.awssdk.annotations.SdkInternalApi;
35+
import software.amazon.awssdk.http.nio.netty.internal.utils.NettyClientLogger;
3336

3437
/**
3538
* This class contains source imported from https://github.com/playframework/netty-reactive-streams,
@@ -42,6 +45,7 @@
4245
@SdkInternalApi
4346
abstract class HttpStreamsHandler<InT extends HttpMessage, OutT extends HttpMessage> extends ChannelDuplexHandler {
4447

48+
private static final NettyClientLogger logger = NettyClientLogger.getLogger(HttpStreamsHandler.class);
4549
private final Queue<Outgoing> outgoing = new LinkedList<>();
4650
private final Class<InT> inClass;
4751
private final Class<OutT> outClass;
@@ -209,11 +213,16 @@ private void handleCancelled(ChannelHandlerContext ctx, InT msg) {
209213
}
210214

211215
private void handleReadHttpContent(ChannelHandlerContext ctx, HttpContent content) {
216+
boolean lastHttpContent = content instanceof LastHttpContent;
217+
if (lastHttpContent) {
218+
logger.debug(ctx.channel(),
219+
() -> "Received LastHttpContent " + ctx.channel() + " with ignoreBodyRead as " + ignoreBodyRead);
220+
ctx.channel().attr(STREAMING_COMPLETE_KEY).set(true);
221+
}
212222
if (!ignoreBodyRead) {
213-
if (content instanceof LastHttpContent) {
214-
223+
if (lastHttpContent) {
215224
if (content.content().readableBytes() > 0 ||
216-
!((LastHttpContent) content).trailingHeaders().isEmpty()) {
225+
!((LastHttpContent) content).trailingHeaders().isEmpty()) {
217226
// It has data or trailing headers, send them
218227
ctx.fireChannelRead(content);
219228
} else {
@@ -230,7 +239,7 @@ private void handleReadHttpContent(ChannelHandlerContext ctx, HttpContent conten
230239

231240
} else {
232241
ReferenceCountUtil.release(content);
233-
if (content instanceof LastHttpContent) {
242+
if (lastHttpContent) {
234243
ignoreBodyRead = false;
235244
if (currentlyStreamedMessage != null) {
236245
removeHandlerIfActive(ctx, ctx.name() + "-body-publisher");

0 commit comments

Comments
 (0)