@@ -28,7 +28,6 @@ import org.apache.kafka.clients.producer.Producer
2828import org .apache .kafka .clients .producer .ProducerRecord
2929import org .apache .kafka .clients .producer .RecordMetadata
3030import org .apache .kafka .common .PartitionInfo
31- import org .apache .kafka .common .errors .SerializationException
3231import org .apache .samza .system .OutgoingMessageEnvelope
3332import org .apache .samza .system .SystemProducer
3433import org .apache .samza .system .SystemProducerException
@@ -46,32 +45,30 @@ class KafkaSystemProducer(systemName: String,
4645
// Shared state of the system producer (diff context: `producer`/`producerLock` are replaced below).
4746 // Represents a fatal error that caused the producer to close.
4847 val fatalException : AtomicReference [SystemProducerException ] = new AtomicReference [SystemProducerException ]()
49- @ volatile var producer : Producer [Array [Byte ], Array [Byte ]] = null
50- val producerLock : Object = new Object
// Holds the current Kafka producer. It is set in start(), cleared with getAndSet(null) in stop(),
// cleared with compareAndSet(currentProducer, null) in handleFatalSendException, and repopulated
// in getOrCreateCurrentProducer. A null value means "no usable producer right now".
48+ val producerRef : AtomicReference [Producer [Array [Byte ], Array [Byte ]]] = new AtomicReference [Producer [Array [Byte ], Array [Byte ]]]()
// Guards only the creation of a replacement producer in getOrCreateCurrentProducer; the code paths
// that null out producerRef do not acquire it.
49+ val producerCreationLock : Object = new Object
// Set to true by stop(); getOrCreateCurrentProducer refuses to create a new producer once it is set.
50+ @ volatile var stopped = false
5151
// start(): obtains a producer from getProducer() and publishes it through producerRef.
// (The removed line assigned the result to the former volatile `producer` field instead.)
5252 def start (): Unit = {
53- producer = getProducer()
53+ producerRef.set( getProducer() )
5454 }
5555
// stop(): marks this system producer as stopped, detaches the Kafka producer from producerRef
// and closes it. Any Exception thrown while closing is logged and not rethrown.
5656 def stop () {
5757 info(" Stopping producer for system: " + this .systemName)
5858
59- // stop() should not happen often so no need to optimize locking
60- producerLock. synchronized {
61- try {
62- if (producer != null ) {
63- producer .close // Also performs the equivalent of a flush()
64- }
// Set the flag before detaching the producer: getOrCreateCurrentProducer only creates a
// replacement producer while `stopped` is false.
59+ stopped = true
// Atomically take the current producer and clear the shared reference in one step
// (this replaces the removed producerLock.synchronized block and its `finally { producer = null }`).
60+ val currentProducer = producerRef.getAndSet( null )
61+ try {
62+ if (currentProducer != null ) {
63+ currentProducer .close // Also performs the equivalent of a flush()
64+ }
6565
66- val exception = fatalException.get()
67- if (exception != null ) {
68- error(" Observed an earlier send() error while closing producer" , exception)
69- }
70- } catch {
71- case e : Exception => error(" Error while closing producer for system: " + systemName, e)
72- } finally {
73- producer = null
// Report a previously recorded fatal send() error in the log; it is only logged here, not thrown.
66+ val exception = fatalException.get()
67+ if (exception != null ) {
68+ error(" Observed an earlier send() error while closing producer" , exception)
7469 }
70+ } catch {
71+ case e : Exception => error(" Error while closing producer for system: " + systemName, e)
7572 }
7673 }
7774
@@ -82,7 +79,7 @@ class KafkaSystemProducer(systemName: String,
8279 trace(" Enqueuing message: %s, %s." format (source, envelope))
8380
8481 val topicName = envelope.getSystemStream.getStream
85- if (topicName == null || topicName == " " ) {
82+ if (topicName == null || topicName.isEmpty ) {
8683 throw new IllegalArgumentException (" Invalid system stream: " + envelope.getSystemStream)
8784 }
8885
@@ -92,10 +89,7 @@ class KafkaSystemProducer(systemName: String,
9289 throw new SystemProducerException (" Producer was unable to recover from previous exception." , globalProducerException)
9390 }
9491
95- val currentProducer = producer
96- if (currentProducer == null ) {
97- throw new SystemProducerException (" Kafka producer is null." )
98- }
92+ val currentProducer = getOrCreateCurrentProducer
9993
10094 // Java-based Kafka producer API requires an "Integer" type partitionKey and does not allow custom overriding of Partitioners
10195 // Any kind of custom partitioning has to be done on the client-side
@@ -115,7 +109,7 @@ class KafkaSystemProducer(systemName: String,
115109 val producerException = new SystemProducerException (" Failed to send message for Source: %s on System:%s Topic:%s Partition:%s"
116110 .format(source, systemName, topicName, partitionKey), exception)
117111
118- handleSendException (currentProducer, producerException, true )
112+ handleFatalSendException (currentProducer, producerException)
119113 }
120114 }
121115 })
@@ -125,18 +119,25 @@ class KafkaSystemProducer(systemName: String,
125119 val producerException = new SystemProducerException (" Failed to send message for Source: %s on System:%s Topic:%s Partition:%s"
126120 .format(source, systemName, topicName, partitionKey), originalException)
127121
128- handleSendException(currentProducer, producerException, isFatalException(originalException))
122+ metrics.sendFailed.inc
123+ error(" Got a synchronous error from Kafka producer." , producerException)
124+ // Synchronous exceptions are always recoverable so propagate it up and let the user decide
129125 throw producerException
130126 }
131127 }
132128
133-
134129 def flush (source : String ) {
135130 updateTimer(metrics.flushNs) {
136131 metrics.flushes.inc
137132
138- val currentProducer = producer
133+ val currentProducer = producerRef.get()
139134 if (currentProducer == null ) {
135+ if (dropProducerExceptions) {
136+ // No producer to flush, but we're ignoring exceptions so just return.
137+ warn(" Skipping flush because the Kafka producer is null." )
138+ metrics.flushFailed.inc
139+ return
140+ }
140141 throw new SystemProducerException (" Kafka producer is null." )
141142 }
142143
@@ -162,7 +163,14 @@ class KafkaSystemProducer(systemName: String,
162163 }
163164
164165
165- private def handleSendException (currentProducer : Producer [Array [Byte ], Array [Byte ]], producerException : SystemProducerException , isFatalException : Boolean ) = {
166+ /**
167+ * Handles a fatal exception by closing the producer and either recreating it or storing the exception
168+ * to rethrow later, depending on the value of dropProducerExceptions.
169+ *
170+ * @param currentProducer the current producer for which the exception occurred. Must not be null.
171+ * @param producerException the exception to handle.
172+ */
173+ private def handleFatalSendException (currentProducer : Producer [Array [Byte ], Array [Byte ]], producerException : SystemProducerException ): Unit = {
166174 metrics.sendFailed.inc
167175 error(producerException)
168176 // The SystemProducer API is synchronous, so there's no way for us to guarantee that an exception will
@@ -172,49 +180,56 @@ class KafkaSystemProducer(systemName: String,
172180 if (dropProducerExceptions) {
173181 warn(" Ignoring producer exception. All messages in the failed producer request will be dropped!" )
174182
175- if (isFatalException) {
176- producerLock.synchronized {
177- // Prevent each callback from recreating producer for the same failure.
178- if (currentProducer == producer) {
179- info(" Creating a new producer for system %s." format systemName)
180- try {
181- currentProducer.close(0 , TimeUnit .MILLISECONDS )
182- } catch {
183- case exception : Exception => error(" Exception while closing producer." , exception)
184- }
185- producer = getProducer()
186- }
187- }
188- }
189- } else {
190- // If there is an exception in the callback, it means that the Kafka producer has exhausted the max-retries
191- // Close producer to ensure messages queued in-flight are not sent and hence, avoid re-ordering
192- // This works because there is only 1 callback thread and no sends can complete until the callback returns.
193- if (isFatalException) {
194- fatalException.compareAndSet(null , producerException)
183+ // Prevent each callback from closing and nulling producer for the same failure.
184+ if (currentProducer == producerRef.get()) {
185+ info(" Closing producer for system %s." format systemName)
195186 try {
187+ // send()s can get ProducerClosedException if the producer is stopped after they get the currentProducer
188+ // reference but before producer.send() returns. That's ONLY ok when dropProducerExceptions is true.
189+ // Also, when producer.close(0) is invoked on the Kafka IO thread, when it returns there will be no more
190+ // messages sent over the wire. This is key to ensuring no out-of-order messages as a result of recreating
191+ // the producer.
196192 currentProducer.close(0 , TimeUnit .MILLISECONDS )
197193 } catch {
198194 case exception : Exception => error(" Exception while closing producer." , exception)
199195 }
196+ producerRef.compareAndSet(currentProducer, null )
197+ }
198+ } else {
199+ // If there is an exception in the callback, it means that the Kafka producer has exhausted the max-retries
200+ // Close producer to ensure messages queued in-flight are not sent and hence, avoid re-ordering
201+ // This works because there is only 1 IO thread and no IO can be done until the callback returns.
202+ // Do not create a new producer here! It cannot be done without data loss for all concurrency modes.
203+ fatalException.compareAndSet(null , producerException)
204+ try {
205+ currentProducer.close(0 , TimeUnit .MILLISECONDS )
206+ } catch {
207+ case exception : Exception => error(" Exception while closing producer." , exception)
200208 }
201209 }
202210 }
203211
// getOrCreateCurrentProducer (added; replaces the removed isFatalException helper shown below):
// returns the producer held in producerRef, creating a replacement when the reference is null,
// dropProducerExceptions is true and stop() has not been called. In every other null case it
// throws SystemProducerException("Kafka producer is null.").
204212 /**
205- * A fatal exception is one that corrupts the producer or otherwise makes it unusable.
206- * We want to handle non-fatal exceptions differently because they can often be handled by the user
207- * and that's preferable because it gives users that drop exceptions a way to do that with less
208- * data loss (no collateral damage from batches of messages getting dropped)
209- *
210- * @param exception the exception to check
211- * @return true if the exception is unrecoverable.
213+ * @return the current producer. Never returns null.
212214 */
213- private def isFatalException (exception : Exception ): Boolean = {
214- exception match {
215- case _ : SerializationException => false
216- case _ : ClassCastException => false
217- case _ => true
215+ private def getOrCreateCurrentProducer = {
// Fast path: read the reference without locking; a non-null value is returned as-is.
216+ var currentProducer = producerRef.get
217+
218+ if (currentProducer == null ) {
// NOTE(review): `stopped` is read before producerCreationLock is taken and stop() does not acquire
// that lock, so a stop() running between this check and producerRef.set(...) below could leave a
// freshly created producer that stop() never closes - confirm whether that window matters.
219+ if (dropProducerExceptions && ! stopped) {
220+ // Note: While this lock prevents others from creating a new producer, they could still set it to null.
221+ producerCreationLock.synchronized {
// Re-read under the lock so that only the first thread to get here creates the replacement;
// later threads pick up the producer that thread stored.
222+ currentProducer = producerRef.get
223+ if (currentProducer == null ) {
224+ currentProducer = getProducer()
225+ producerRef.set(currentProducer)
226+ }
227+ }
228+ // Invariant: currentProducer must not be null at this point.
229+ } else {
// Either exceptions are not being dropped or the producer was stopped: no recreation is attempted.
230+ throw new SystemProducerException (" Kafka producer is null." )
231+ }
218 }
233+ currentProducer
219234 }
220235}
0 commit comments