Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import org.springframework.kafka.support.TransactionSupport;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;

/**
* The {@link ProducerFactory} implementation for a {@code singleton} shared
Expand Down Expand Up @@ -92,7 +93,7 @@ public class DefaultKafkaProducerFactory<K, V> implements ProducerFactory<K, V>,
*/
public static final Duration DEFAULT_PHYSICAL_CLOSE_TIMEOUT = Duration.ofSeconds(30);

private static final Log logger = LogFactory.getLog(DefaultKafkaProducerFactory.class); // NOSONAR
private static final Log LOGGER = LogFactory.getLog(DefaultKafkaProducerFactory.class);

private final Map<String, Object> configs;

Expand All @@ -102,8 +103,6 @@ public class DefaultKafkaProducerFactory<K, V> implements ProducerFactory<K, V>,

private final Map<String, CloseSafeProducer<K, V>> consumerProducers = new HashMap<>();

private volatile CloseSafeProducer<K, V> producer;

private Serializer<K> keySerializer;

private Serializer<V> valueSerializer;
Expand All @@ -116,6 +115,8 @@ public class DefaultKafkaProducerFactory<K, V> implements ProducerFactory<K, V>,

private boolean producerPerConsumerPartition = true;

private volatile CloseSafeProducer<K, V> producer;

/**
* Construct a factory with the provided configuration.
* @param configs the configuration.
Expand All @@ -126,16 +127,30 @@ public DefaultKafkaProducerFactory(Map<String, Object> configs) {

/**
* Construct a factory with the provided configuration and {@link Serializer}s.
* Also configures a {@link #transactionIdPrefix} as a value from the
* {@link ProducerConfig#TRANSACTIONAL_ID_CONFIG} if provided.
* This config is going to be overridden with a suffix for target {@link Producer} instance.
* @param configs the configuration.
* @param keySerializer the key {@link Serializer}.
* @param valueSerializer the value {@link Serializer}.
*/
public DefaultKafkaProducerFactory(Map<String, Object> configs,
@Nullable Serializer<K> keySerializer,
@Nullable Serializer<V> valueSerializer) {

this.configs = new HashMap<>(configs);
this.keySerializer = keySerializer;
this.valueSerializer = valueSerializer;

String txId = (String) this.configs.get(ProducerConfig.TRANSACTIONAL_ID_CONFIG);
if (StringUtils.hasText(txId)) {
setTransactionIdPrefix(txId);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Needs to be made final when calling from a CTOR. Perhaps add a protected getter if a subclass wants to sniff the value.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is there. See the change on line 186.

I'm OK to add getter.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doh, I missed it because of all the other noise 😦 sorry.

if (LOGGER.isInfoEnabled()) {
LOGGER.info("If 'setTransactionIdPrefix()' is not going to be configured, " +
"an existing 'transactional.id' config with value: '" + txId +
"' will be suffixed with the number for concurrent transactions support.");
}
}
}

@Override
Expand All @@ -152,7 +167,7 @@ public void setValueSerializer(@Nullable Serializer<V> valueSerializer) {
}

/**
* The time to wait when physically closing the producer (when {@link #stop()} or {@link #destroy()} is invoked).
* The time to wait when physically closing the producer (when {@link #reset()} or {@link #destroy()} is invoked).
* Specified in seconds; default {@link #DEFAULT_PHYSICAL_CLOSE_TIMEOUT}.
* @param physicalCloseTimeout the timeout in seconds.
* @since 1.0.7
Expand All @@ -162,11 +177,13 @@ public void setPhysicalCloseTimeout(int physicalCloseTimeout) {
}

/**
* Set the transactional.id prefix.
* Set a prefix for the {@link ProducerConfig#TRANSACTIONAL_ID_CONFIG} config.
* By default a {@link ProducerConfig#TRANSACTIONAL_ID_CONFIG} value from configs is used as a prefix
* in the target producer configs.
* @param transactionIdPrefix the prefix.
* @since 1.3
*/
public void setTransactionIdPrefix(String transactionIdPrefix) {
public final void setTransactionIdPrefix(String transactionIdPrefix) {
Assert.notNull(transactionIdPrefix, "'transactionIdPrefix' cannot be null");
this.transactionIdPrefix = transactionIdPrefix;
enableIdempotentBehaviour();
Expand All @@ -177,8 +194,8 @@ public void setTransactionIdPrefix(String transactionIdPrefix) {
*/
private void enableIdempotentBehaviour() {
Object previousValue = this.configs.putIfAbsent(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
if (logger.isDebugEnabled() && Boolean.FALSE.equals(previousValue)) {
logger.debug("The '" + ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG +
if (LOGGER.isDebugEnabled() && Boolean.FALSE.equals(previousValue)) {
LOGGER.debug("The '" + ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG +
"' is set to false, may result in duplicate messages");
}
}
Expand Down Expand Up @@ -233,13 +250,13 @@ public void destroy() {
producerToClose.delegate.close(this.physicalCloseTimeout);
}
catch (Exception e) {
logger.error("Exception while closing producer", e);
LOGGER.error("Exception while closing producer", e);
}
producerToClose = this.cache.poll();
}
synchronized (this.consumerProducers) {
this.consumerProducers.forEach(
(k, v) -> v.delegate.close(this.physicalCloseTimeout));
(k, v) -> v.delegate.close(this.physicalCloseTimeout));
this.consumerProducers.clear();
}
}
Expand All @@ -251,47 +268,13 @@ public void onApplicationEvent(ContextStoppedEvent event) {
}
}

/**
* NoOp.
* @deprecated {@link org.springframework.context.Lifecycle} is no longer implemented.
*/
@Deprecated
public void start() {
// NOSONAR
}

/**
* NoOp.
* @deprecated {@link org.springframework.context.Lifecycle} is no longer implemented;
* use {@link #reset()} to close the {@link Producer}(s).
*/
@Deprecated
public void stop() {
reset();
}

/**
* Close the {@link Producer}(s) and clear the cache of transactional
* {@link Producer}(s).
* @since 2.2
*/
public void reset() {
try {
destroy();
}
catch (Exception e) {
logger.error("Exception while closing producer", e);
}
}

/**
* NoOp.
* @return always true.
* @deprecated {@link org.springframework.context.Lifecycle} is no longer implemented.
*/
@Deprecated
public boolean isRunning() {
return true;
destroy();
}

@Override
Expand All @@ -307,7 +290,7 @@ public Producer<K, V> createProducer() {
if (this.producer == null) {
synchronized (this) {
if (this.producer == null) {
this.producer = new CloseSafeProducer<K, V>(createKafkaProducer());
this.producer = new CloseSafeProducer<>(createKafkaProducer());
}
}
}
Expand All @@ -320,7 +303,7 @@ public Producer<K, V> createProducer() {
* @return the producer.
*/
protected Producer<K, V> createKafkaProducer() {
return new KafkaProducer<K, V>(this.configs, this.keySerializer, this.valueSerializer);
return new KafkaProducer<>(this.configs, this.keySerializer, this.valueSerializer);
}

Producer<K, V> createTransactionalProducerForPartition() {
Expand Down Expand Up @@ -370,13 +353,15 @@ protected Producer<K, V> createTransactionalProducer() {
}
}

private CloseSafeProducer<K, V> doCreateTxProducer(String suffix, Consumer<CloseSafeProducer<K, V>> remover) {
private CloseSafeProducer<K, V> doCreateTxProducer(String suffix,
@Nullable Consumer<CloseSafeProducer<K, V>> remover) {

Producer<K, V> newProducer;
Map<String, Object> newProducerConfigs = new HashMap<>(this.configs);
newProducerConfigs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, this.transactionIdPrefix + suffix);
newProducer = new KafkaProducer<K, V>(newProducerConfigs, this.keySerializer, this.valueSerializer);
newProducer = new KafkaProducer<>(newProducerConfigs, this.keySerializer, this.valueSerializer);
newProducer.initTransactions();
return new CloseSafeProducer<K, V>(newProducer, this.cache, remover,
return new CloseSafeProducer<>(newProducer, this.cache, remover,
(String) newProducerConfigs.get(ProducerConfig.TRANSACTIONAL_ID_CONFIG));
}

Expand Down Expand Up @@ -471,15 +456,15 @@ public void initTransactions() {

@Override
public void beginTransaction() throws ProducerFencedException {
if (logger.isDebugEnabled()) {
logger.debug("beginTransaction: " + this);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("beginTransaction: " + this);
}
try {
this.delegate.beginTransaction();
}
catch (RuntimeException e) {
if (logger.isErrorEnabled()) {
logger.error("beginTransaction failed: " + this, e);
if (LOGGER.isErrorEnabled()) {
LOGGER.error("beginTransaction failed: " + this, e);
}
this.txFailed = true;
throw e;
Expand All @@ -495,15 +480,15 @@ public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offs

@Override
public void commitTransaction() throws ProducerFencedException {
if (logger.isDebugEnabled()) {
logger.debug("commitTransaction: " + this);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("commitTransaction: " + this);
}
try {
this.delegate.commitTransaction();
}
catch (RuntimeException e) {
if (logger.isErrorEnabled()) {
logger.error("commitTransaction failed: " + this, e);
if (LOGGER.isErrorEnabled()) {
LOGGER.error("commitTransaction failed: " + this, e);
}
this.txFailed = true;
throw e;
Expand All @@ -512,15 +497,15 @@ public void commitTransaction() throws ProducerFencedException {

@Override
public void abortTransaction() throws ProducerFencedException {
if (logger.isDebugEnabled()) {
logger.debug("abortTransaction: " + this);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("abortTransaction: " + this);
}
try {
this.delegate.abortTransaction();
}
catch (RuntimeException e) {
if (logger.isErrorEnabled()) {
logger.error("Abort failed: " + this, e);
if (LOGGER.isErrorEnabled()) {
LOGGER.error("Abort failed: " + this, e);
}
this.txFailed = true;
throw e;
Expand All @@ -543,9 +528,10 @@ public void close(long timeout, @Nullable TimeUnit unit) {
public void close(@Nullable Duration timeout) {
if (this.cache != null) {
if (this.txFailed) {
if (logger.isWarnEnabled()) {
logger.warn("Error during transactional operation; producer removed from cache; possible cause: "
+ "broker restarted during transaction: " + this);
if (LOGGER.isWarnEnabled()) {
LOGGER.warn("Error during transactional operation; producer removed from cache; possible " +
"cause: "
+ "broker restarted during transaction: " + this);
}
if (timeout == null) {
this.delegate.close();
Expand Down
Loading