Description
Describe the bug
I get the same defect as described in #3335 when I use the SubscribeToShard API. It hangs, and no further visitor.visit() call is executed after the Kinesis async client throws:
java.io.IOException: Response had content-length of 28 bytes, but only received 0 bytes before the connection was closed.
Stream consumption halts afterwards, even though the response handler defines an onError callback.
Expected Behavior
The SubscribeToShard API should be able to resume stream consumption after encountering an IOException.
Current Behavior
The thread hangs and the client does not continue to consume the stream.
Reproduction Steps
It is difficult to reproduce: it happens randomly, about once per week, when the response fails the content-length validation.
My dependencies:
    <dependency>
        <groupId>software.amazon.kinesis</groupId>
        <artifactId>amazon-kinesis-client</artifactId>
        <version>2.5.2</version>
    </dependency>
    <dependency>
        <groupId>software.amazon.awssdk</groupId>
        <artifactId>url-connection-client</artifactId>
        <version>2.20.123</version>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>software.amazon.awssdk</groupId>
        <artifactId>sts</artifactId>
        <version>2.20.123</version>
    </dependency>
Here is my setup.
I call (in Kotlin):
kinesisClient.subscribeToShard(requestToSubscribe, responseHandler).get()
where:
- `kinesisClient` is a `software.amazon.awssdk.services.kinesis.KinesisAsyncClient`.
- `requestToSubscribe` is defined as:
    SubscribeToShardRequest.builder()
        .consumerARN(<some efo consumer arn>)
        .shardId(<some shard name>)
        .startingPosition(<some starting position>)
        .build()
- `responseHandler` is defined as:
    SubscribeToShardResponseHandler
        .builder()
        .onResponse {
            log.info("Subscription to Kinesis stream started {}", it)
        }
        .onComplete {
            log.info("Subscription to Kinesis stream completed")
        }
        .onError { t: Throwable ->
            when (t) {
                is ResourceInUseException -> log.debug("Other instance is already subscribed to Kinesis: " + t.message)
                else -> log.error("Error when reading data from processor: stack={}", t.stackTrace)
            }
        }
        .subscriber(<some visitor>)
        .build()
Possible Solution
No response
Additional Information/Context
Stack Trace:
java.io.IOException: Response had content-length of 28 bytes, but only received 0 bytes before the connection was closed.
    software.amazon.awssdk.http.nio.netty.internal.ResponseHandler.validateResponseContentLength(ResponseHandler.java:163)
    software.amazon.awssdk.http.nio.netty.internal.ResponseHandler.access$700(ResponseHandler.java:75)
    software.amazon.awssdk.http.nio.netty.internal.ResponseHandler$PublisherAdapter$1.onComplete(ResponseHandler.java:369)
    software.amazon.awssdk.http.nio.netty.internal.nrs.HandlerPublisher.complete(HandlerPublisher.java:447)
    software.amazon.awssdk.http.nio.netty.internal.nrs.HandlerPublisher.channelInactive(HandlerPublisher.java:430)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:303)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:281)
    io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:274)
    io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:81)
    io.netty.handler.timeout.IdleStateHandler.channelInactive(IdleStateHandler.java:277)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:303)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:281)
    io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:274)
    io.netty.channel.DefaultChannelPipeline$HeadContext.channelInactive(DefaultChannelPipeline.java:1405)
    io.netty.channel.AbstractChannelHandlerContext
AWS Java SDK version used
2.20.43 / kcl 2.5.2
JDK version used
11.0.9
Operating System and version
linux (different versions) x86_64