Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
*
* @author Soby Chacko
* @author Chris Bono
* @author Alexander Preuß
*/
@Target({ ElementType.TYPE, ElementType.METHOD, ElementType.ANNOTATION_TYPE })
@Retention(RetentionPolicy.RUNTIME)
Expand Down Expand Up @@ -166,8 +167,16 @@
* The bean name or a 'SpEL' expression that resolves to a
* {@link org.apache.pulsar.client.api.RedeliveryBackoff} to use on the consumer to
* control the redelivery backoff of messages after a negative ack.
* @return the bean name or empty string to not set the backoff
* @return the bean name or empty string to not set the backoff.
*/
String negativeAckRedeliveryBackoff() default "";

/**
* The bean name or a 'SpEL' expression that resolves to a
* {@link org.apache.pulsar.client.api.DeadLetterPolicy} to use on the consumer to
* configure a dead letter policy for message redelivery.
* @return the bean name or empty string to not set any dead letter policy.
*/
String deadLetterPolicy() default "";

}
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import java.util.stream.Collectors;

import org.apache.commons.logging.LogFactory;
import org.apache.pulsar.client.api.DeadLetterPolicy;
import org.apache.pulsar.client.api.RedeliveryBackoff;
import org.apache.pulsar.client.api.SubscriptionType;

Expand Down Expand Up @@ -363,6 +364,7 @@ private void processPulsarListenerAnnotation(MethodPulsarListenerEndpoint<?> end
endpoint.setBeanFactory(this.beanFactory);

resolveNegativeAckRedeliveryBackoff(endpoint, pulsarListener);
resolveDeadLetterPolicy(endpoint, pulsarListener);
}

private void resolveNegativeAckRedeliveryBackoff(MethodPulsarListenerEndpoint<?> endpoint,
Expand All @@ -381,6 +383,21 @@ private void resolveNegativeAckRedeliveryBackoff(MethodPulsarListenerEndpoint<?>
}
}

private void resolveDeadLetterPolicy(MethodPulsarListenerEndpoint<?> endpoint, PulsarListener pulsarListener) {
Object deadLetterPolicy = resolveExpression(pulsarListener.deadLetterPolicy());
if (deadLetterPolicy instanceof DeadLetterPolicy) {
endpoint.setDeadLetterPolicy((DeadLetterPolicy) deadLetterPolicy);
}
else {
String deadLetterPolicyBeanName = resolveExpressionAsString(pulsarListener.deadLetterPolicy(),
"deadLetterPolicy");
if (StringUtils.hasText(deadLetterPolicyBeanName)) {
endpoint.setDeadLetterPolicy(
this.beanFactory.getBean(deadLetterPolicyBeanName, DeadLetterPolicy.class));
}
}
}

private Integer resolveExpressionAsInteger(String value, String attribute) {
Object resolved = resolveExpression(value);
Integer result = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

import org.apache.commons.logging.LogFactory;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.DeadLetterPolicy;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Messages;
import org.apache.pulsar.client.api.RedeliveryBackoff;
Expand Down Expand Up @@ -77,6 +78,8 @@ public class MethodPulsarListenerEndpoint<V> extends AbstractPulsarListenerEndpo

private RedeliveryBackoff negativeAckRedeliveryBackoff;

private DeadLetterPolicy deadLetterPolicy;

public void setBean(Object bean) {
this.bean = bean;
}
Expand Down Expand Up @@ -176,6 +179,7 @@ protected PulsarMessagingMessageListenerAdapter<V> createMessageListener(PulsarM
pulsarContainerProperties.setSchemaType(type);

container.setNegativeAckRedeliveryBackoff(this.negativeAckRedeliveryBackoff);
container.setDeadLetterPolicy(this.deadLetterPolicy);

return messageListener;
}
Expand Down Expand Up @@ -254,4 +258,8 @@ public void setNegativeAckRedeliveryBackoff(RedeliveryBackoff negativeAckRedeliv
this.negativeAckRedeliveryBackoff = negativeAckRedeliveryBackoff;
}

public void setDeadLetterPolicy(DeadLetterPolicy deadLetterPolicy) {
this.deadLetterPolicy = deadLetterPolicy;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.pulsar.client.api.BatchReceivePolicy;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.DeadLetterPolicy;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.RedeliveryBackoff;
Expand All @@ -36,6 +37,7 @@
*
* @param <T> underlying payload type for the consumer.
* @author Soby Chacko
* @author Alexander Preuß
*/
public class DefaultPulsarConsumerFactory<T> implements PulsarConsumerFactory<T> {

Expand Down Expand Up @@ -77,10 +79,21 @@ public Consumer<T> createConsumer(Schema<T> schema, BatchReceivePolicy batchRece
final Map<String, Object> properties = new HashMap<>(this.consumerConfig);
properties.putAll(propertiesToOverride);

// Remove deadLetterPolicy from the properties here and save it to re-apply after
// calling `loadConf` (https://github.com/apache/pulsar/issues/11646)
DeadLetterPolicy deadLetterPolicy = null;
if (properties.containsKey("deadLetterPolicy")) {
deadLetterPolicy = (DeadLetterPolicy) properties.remove("deadLetterPolicy");
}

if (!CollectionUtils.isEmpty(properties)) {
consumerBuilder.loadConf(properties);
}

if (deadLetterPolicy != null) {
consumerBuilder.deadLetterPolicy(deadLetterPolicy);
}

if (properties.containsKey("negativeAckRedeliveryBackoff")) {
final RedeliveryBackoff negativeAckRedeliveryBackoff = (RedeliveryBackoff) properties
.get("negativeAckRedeliveryBackoff");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.springframework.pulsar.listener;

import org.apache.commons.logging.LogFactory;
import org.apache.pulsar.client.api.DeadLetterPolicy;
import org.apache.pulsar.client.api.RedeliveryBackoff;

import org.springframework.beans.BeansException;
Expand All @@ -35,6 +36,7 @@
*
* @param <T> message type.
* @author Soby Chacko
* @author Alexander Preuß
*/
public abstract class AbstractPulsarMessageListenerContainer<T> implements PulsarMessageListenerContainer,
BeanNameAware, ApplicationEventPublisherAware, ApplicationContextAware {
Expand All @@ -61,6 +63,8 @@ public abstract class AbstractPulsarMessageListenerContainer<T> implements Pulsa

protected RedeliveryBackoff negativeAckRedeliveryBackoff;

protected DeadLetterPolicy deadLetterPolicy;

@SuppressWarnings("unchecked")
protected AbstractPulsarMessageListenerContainer(PulsarConsumerFactory<? super T> pulsarConsumerFactory,
PulsarContainerProperties pulsarContainerProperties) {
Expand Down Expand Up @@ -185,4 +189,13 @@ public RedeliveryBackoff getNegativeAckRedeliveryBackoff() {
return this.negativeAckRedeliveryBackoff;
}

@Override
public void setDeadLetterPolicy(DeadLetterPolicy deadLetterPolicy) {
this.deadLetterPolicy = deadLetterPolicy;
}

public DeadLetterPolicy getDeadLetterPolicy() {
return this.deadLetterPolicy;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
*
* @param <T> the payload type.
* @author Soby Chacko
* @author Alexander Preuß
*/
public class ConcurrentPulsarMessageListenerContainer<T> extends AbstractPulsarMessageListenerContainer<T> {

Expand Down Expand Up @@ -109,6 +110,7 @@ private void configureChildContainer(int index, DefaultPulsarMessageListenerCont
container.getContainerProperties().setConsumerTaskExecutor(exec);
}
container.setNegativeAckRedeliveryBackoff(this.negativeAckRedeliveryBackoff);
container.setDeadLetterPolicy(this.deadLetterPolicy);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@

import org.apache.pulsar.client.api.BatchReceivePolicy;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.DeadLetterPolicy;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageListener;
Expand Down Expand Up @@ -239,6 +240,10 @@ private void populateAllNecessaryPropertiesIfNeedBe(Map<String, Object> currentP
if (negativeAckRedeliveryBackoff != null) {
currentProperties.put("negativeAckRedeliveryBackoff", negativeAckRedeliveryBackoff);
}
final DeadLetterPolicy deadLetterPolicy = DefaultPulsarMessageListenerContainer.this.deadLetterPolicy;
if (deadLetterPolicy != null) {
currentProperties.put("deadLetterPolicy", deadLetterPolicy);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package org.springframework.pulsar.listener;

import org.apache.pulsar.client.api.DeadLetterPolicy;
import org.apache.pulsar.client.api.RedeliveryBackoff;

import org.springframework.beans.factory.DisposableBean;
Expand Down Expand Up @@ -46,4 +47,6 @@ default PulsarContainerProperties getContainerProperties() {

void setNegativeAckRedeliveryBackoff(RedeliveryBackoff redeliveryBackoff);

void setDeadLetterPolicy(DeadLetterPolicy deadLetterPolicy);

}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@

import org.apache.pulsar.client.api.BatchReceivePolicy;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.DeadLetterPolicy;
import org.apache.pulsar.client.api.Messages;
import org.apache.pulsar.client.api.RedeliveryBackoff;
import org.apache.pulsar.client.api.Schema;
Expand All @@ -41,28 +42,28 @@

/**
* @author Soby Chacko
* @author Alexander Preuß
*/
public class ConcurrentPulsarMessageListenerContainerTests {

@Test
@SuppressWarnings("unchecked")
void nackRedeliveryBackoffAppliedOnChildContainer() throws Exception {
PulsarConsumerFactory<String> pulsarConsumerFactory = mock(PulsarConsumerFactory.class);
Consumer<String> consumer = mock(Consumer.class);
void deadLetterPolicyAppliedOnChildContainer() throws Exception {
PulsarListenerMockComponents env = setupListenerMockComponents(SubscriptionType.Shared);
ConcurrentPulsarMessageListenerContainer<String> concurrentContainer = env.concurrentContainer();
DeadLetterPolicy deadLetterPolicy = DeadLetterPolicy.builder().maxRedeliverCount(5).deadLetterTopic("dlq-topic")
.retryLetterTopic("retry-topic").build();
concurrentContainer.setDeadLetterPolicy(deadLetterPolicy);

when(pulsarConsumerFactory.createConsumer(any(Schema.class), any(BatchReceivePolicy.class), any(Map.class)))
.thenReturn(consumer);

when(consumer.batchReceive()).thenReturn(mock(Messages.class));
concurrentContainer.start();

PulsarContainerProperties pulsarContainerProperties = new PulsarContainerProperties();
pulsarContainerProperties.setSchema(Schema.STRING);
pulsarContainerProperties.setSubscriptionType(SubscriptionType.Shared);
pulsarContainerProperties.setMessageListener((PulsarRecordMessageListener<?>) (cons, msg) -> {
});
final DefaultPulsarMessageListenerContainer<String> childContainer = concurrentContainer.getContainers().get(0);
assertThat(childContainer.getDeadLetterPolicy()).isEqualTo(deadLetterPolicy);
}

ConcurrentPulsarMessageListenerContainer<String> concurrentContainer = new ConcurrentPulsarMessageListenerContainer<>(
pulsarConsumerFactory, pulsarContainerProperties);
@Test
void nackRedeliveryBackoffAppliedOnChildContainer() throws Exception {
PulsarListenerMockComponents env = setupListenerMockComponents(SubscriptionType.Shared);
ConcurrentPulsarMessageListenerContainer<String> concurrentContainer = env.concurrentContainer();
RedeliveryBackoff redeliveryBackoff = MultiplierRedeliveryBackoff.builder().minDelayMs(1000)
.maxDelayMs(5 * 1000).build();
concurrentContainer.setNegativeAckRedeliveryBackoff(redeliveryBackoff);
Expand All @@ -76,35 +77,34 @@ void nackRedeliveryBackoffAppliedOnChildContainer() throws Exception {
@Test
@SuppressWarnings("unchecked")
void basicConcurrencyTesting() throws Exception {
PulsarConsumerFactory<String> pulsarConsumerFactory = mock(PulsarConsumerFactory.class);
Consumer<String> consumer = mock(Consumer.class);

when(pulsarConsumerFactory.createConsumer(any(Schema.class), any(BatchReceivePolicy.class), any(Map.class)))
.thenReturn(consumer);

when(consumer.batchReceive()).thenReturn(mock(Messages.class));

PulsarContainerProperties pulsarContainerProperties = new PulsarContainerProperties();
pulsarContainerProperties.setSchema(Schema.STRING);
pulsarContainerProperties.setSubscriptionType(SubscriptionType.Failover);
pulsarContainerProperties.setMessageListener((PulsarRecordMessageListener<?>) (cons, msg) -> {
});

ConcurrentPulsarMessageListenerContainer<String> container = new ConcurrentPulsarMessageListenerContainer<>(
pulsarConsumerFactory, pulsarContainerProperties);
PulsarListenerMockComponents env = setupListenerMockComponents(SubscriptionType.Failover);
PulsarConsumerFactory<String> pulsarConsumerFactory = env.consumerFactory();
Consumer<String> consumer = env.consumer();
ConcurrentPulsarMessageListenerContainer<String> concurrentContainer = env.concurrentContainer();

container.setConcurrency(3);
concurrentContainer.setConcurrency(3);

container.start();
concurrentContainer.start();

await().atMost(Duration.ofSeconds(10)).untilAsserted(() -> verify(pulsarConsumerFactory, times(3))
.createConsumer(any(Schema.class), any(BatchReceivePolicy.class), any(Map.class)));
await().atMost(Duration.ofSeconds(10)).untilAsserted(() -> verify(consumer, times(3)).batchReceive());
}

@Test
@SuppressWarnings("unchecked")
void exclusiveSubscriptionMustUseSingleThread() throws Exception {
PulsarListenerMockComponents env = setupListenerMockComponents(SubscriptionType.Exclusive);
ConcurrentPulsarMessageListenerContainer<String> concurrentContainer = env.concurrentContainer();

concurrentContainer.setConcurrency(3);

assertThatThrownBy(concurrentContainer::start).isInstanceOf(IllegalStateException.class)
.hasMessage("concurrency > 1 is not allowed on Exclusive subscription type");
}

@SuppressWarnings("unchecked")
private PulsarListenerMockComponents setupListenerMockComponents(SubscriptionType subscriptionType)
throws Exception {
PulsarConsumerFactory<String> pulsarConsumerFactory = mock(PulsarConsumerFactory.class);
Consumer<String> consumer = mock(Consumer.class);

Expand All @@ -115,16 +115,18 @@ void exclusiveSubscriptionMustUseSingleThread() throws Exception {

PulsarContainerProperties pulsarContainerProperties = new PulsarContainerProperties();
pulsarContainerProperties.setSchema(Schema.STRING);
pulsarContainerProperties.setSubscriptionType(subscriptionType);
pulsarContainerProperties.setMessageListener((PulsarRecordMessageListener<?>) (cons, msg) -> {
});

ConcurrentPulsarMessageListenerContainer<String> container = new ConcurrentPulsarMessageListenerContainer<>(
ConcurrentPulsarMessageListenerContainer<String> concurrentContainer = new ConcurrentPulsarMessageListenerContainer<>(
pulsarConsumerFactory, pulsarContainerProperties);

container.setConcurrency(3);
return new PulsarListenerMockComponents(pulsarConsumerFactory, consumer, concurrentContainer);
}

assertThatThrownBy(container::start).isInstanceOf(IllegalStateException.class)
.hasMessage("concurrency > 1 is not allowed on Exclusive subscription type");
private record PulsarListenerMockComponents(PulsarConsumerFactory<String> consumerFactory,
Consumer<String> consumer, ConcurrentPulsarMessageListenerContainer<String> concurrentContainer) {
}

}
Loading