Description
Affects Version(s): 5.1.1.RELEASE +
Bug report
TimeoutCountSequenceSizeReleaseStrategy appears to have a bug where it does not respect the timeout when there are multiple messages in the MessageGroup.
The bug appears to have been introduced in 5.1.1.RELEASE when a null check was added. The change makes findEarliestTimestamp fall back to Long.MAX_VALUE if any of the timestamps is null, which effectively disables the timeout. Unfortunately, the same fallback is also triggered whenever a message in the group has a timestamp that is not smaller than the earliest one seen so far, which is the normal case for every message after the first.
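To make the effect of the fallback concrete, here is a minimal sketch of the arithmetic, assuming the strategy releases on time when "now minus the earliest timestamp" exceeds the configured timeout. The class and method names (ElapsedTimeSketch, timedOut) are hypothetical and not part of Spring Integration.

```java
// Hypothetical stand-in for the time-based part of the release check.
// Assumption: the strategy compares (now - earliestTimestamp) against the timeout.
class ElapsedTimeSketch {

    static boolean timedOut(long now, long earliestTimestamp, long timeoutMillis) {
        long elapsed = now - earliestTimestamp;
        return elapsed > timeoutMillis;
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        // A message that arrived 2 seconds ago exceeds a 1 second timeout.
        System.out.println(timedOut(now, now - 2000, 1000));
        // With the Long.MAX_VALUE fallback the elapsed time is hugely negative,
        // so the timeout can never be reached.
        System.out.println(timedOut(now, Long.MAX_VALUE, 1000));
    }
}
```

Under that assumption, a fallback value of Long.MAX_VALUE means the group is never released on time, regardless of how long the messages have been waiting.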
The behaviour is demonstrated in this simple example. Using Spring Integration 5.1.0.RELEASE, it prints out a message with payload [hello, world]. In 5.1.1.RELEASE and later versions it does not print any message, because the release strategy does not release the group even though the timeout has been reached.
package com.example;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.aggregator.TimeoutCountSequenceSizeReleaseStrategy;
import org.springframework.integration.annotation.Gateway;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.dsl.IntegrationFlow;

@SpringBootApplication
@Configuration
public class TimeoutCountSequenceBugApplication {

    public static void main(String[] args) throws InterruptedException {
        ConfigurableApplicationContext context = SpringApplication.run(TimeoutCountSequenceBugApplication.class, args);
        Example gateway = context.getBean(Example.class);
        gateway.submit("hello");
        Thread.sleep(2000);
        gateway.submit("world");
    }

    @MessagingGateway
    public interface Example {

        @Gateway(requestChannel = "example.input")
        void submit(String s);

    }

    @Bean
    IntegrationFlow example() {
        return f -> f
                .aggregate(
                        agg -> agg
                                .correlationStrategy(s -> "")
                                .releaseStrategy(new TimeoutCountSequenceSizeReleaseStrategy(Integer.MAX_VALUE, 1000))
                )
                .handle(msg -> System.out.println(msg));
    }

}
The offending code appears to be this method within the TimeoutCountSequenceSizeReleaseStrategy class:
private long findEarliestTimestamp(MessageGroup messages) {
    long result = Long.MAX_VALUE;
    for (Message<?> message : messages.getMessages()) {
        Long timestamp = message.getHeaders().getTimestamp();
        if (timestamp != null && timestamp < result) {
            result = timestamp;
        }
        else {
            return Long.MAX_VALUE; // can't release based on time if there is no timestamp
        }
    }
    return result;
}
Simply removing the else block should solve the issue. The method would then return Long.MAX_VALUE only if none of the messages has a timestamp header, rather than if any of them lacks one, and would return the earliest non-null timestamp otherwise.
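The difference between the current loop and the proposed one can be sketched outside of Spring Integration. The example below is only an illustration: it uses a plain List<Long> of nullable timestamps as a stand-in for the message headers, and the names EarliestTimestampSketch, findEarliestCurrent and findEarliestProposed are hypothetical.

```java
import java.util.Arrays;
import java.util.List;

// Stand-alone illustration; a List<Long> of nullable values replaces the MessageGroup.
class EarliestTimestampSketch {

    // Mirrors the loop from the issue: the else branch also fires for any
    // non-null timestamp that is not smaller than the running minimum.
    static long findEarliestCurrent(List<Long> timestamps) {
        long result = Long.MAX_VALUE;
        for (Long timestamp : timestamps) {
            if (timestamp != null && timestamp < result) {
                result = timestamp;
            }
            else {
                return Long.MAX_VALUE;
            }
        }
        return result;
    }

    // Proposed variant with the else block removed: null or later timestamps
    // are skipped, and Long.MAX_VALUE is returned only if nothing was found.
    static long findEarliestProposed(List<Long> timestamps) {
        long result = Long.MAX_VALUE;
        for (Long timestamp : timestamps) {
            if (timestamp != null && timestamp < result) {
                result = timestamp;
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Two messages sent 2 seconds apart, as in the reproduction above.
        List<Long> group = Arrays.asList(1000L, 3000L);
        System.out.println(findEarliestCurrent(group) == Long.MAX_VALUE);
        System.out.println(findEarliestProposed(group) == 1000L);
    }
}
```

Note that the proposed variant also changes the behaviour for groups where only some timestamps are missing: those groups become eligible for time-based release using the earliest timestamp that is present.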