@@ -724,6 +724,10 @@ def _write_dataframe_kafka(
724
724
# setup row writer function
725
725
writer = self ._get_encoder_func (feature_group ._get_encoded_avro_schema ())
726
726
727
+ def acked (err , msg ):
728
+ if err is not None :
729
+ print ("Failed to deliver message: %s: %s" % (str (msg ), str (err )))
730
+
727
731
# loop over rows
728
732
for r in dataframe .itertuples (index = False ):
729
733
# itertuples returns Python NamedTyple, to be able to serialize it using
@@ -752,13 +756,27 @@ def _write_dataframe_kafka(
752
756
# assemble key
753
757
key = "" .join ([str (row [pk ]) for pk in sorted (feature_group .primary_key )])
754
758
755
- # produce
756
- producer .produce (
757
- topic = feature_group ._online_topic_name , key = key , value = encoded_row
758
- )
759
+ while True :
760
+ # if BufferError is thrown, we can be sure, message hasn't been send so we retry
761
+ try :
762
+ # produce
763
+ producer .produce (
764
+ topic = feature_group ._online_topic_name ,
765
+ key = key ,
766
+ value = encoded_row ,
767
+ callback = acked
768
+ if offline_write_options .get ("debug_kafka" , False )
769
+ else None ,
770
+ )
759
771
760
- # Trigger internal callbacks to empty op queue
761
- producer .poll (0 )
772
+ # Trigger internal callbacks to empty op queue
773
+ producer .poll (0 )
774
+ break
775
+ except BufferError as e :
776
+ if offline_write_options .get ("debug_kafka" , False ):
777
+ print ("Caught: {}" .format (e ))
778
+ # backoff for 1 second
779
+ producer .poll (1 )
762
780
763
781
# make sure producer blocks and everything is delivered
764
782
producer .flush ()
@@ -803,12 +821,15 @@ def _get_encoder_func(self, writer_schema: str) -> callable:
803
821
return lambda record , outf : writer .write (record , avro .io .BinaryEncoder (outf ))
804
822
805
823
def _get_kafka_config (self , write_options : dict = {}) -> dict :
824
+ # producer configuration properties
825
+ # https://docs.confluent.io/platform/current/clients/librdkafka/html/md_CONFIGURATION.html
806
826
config = {
807
827
"security.protocol" : "SSL" ,
808
828
"ssl.ca.location" : client .get_instance ()._get_ca_chain_path (),
809
829
"ssl.certificate.location" : client .get_instance ()._get_client_cert_path (),
810
830
"ssl.key.location" : client .get_instance ()._get_client_key_path (),
811
831
"client.id" : socket .gethostname (),
832
+ ** write_options .get ("kafka_producer_config" , {}),
812
833
}
813
834
814
835
if isinstance (client .get_instance (), hopsworks .Client ) or write_options .get (
@@ -831,5 +852,4 @@ def _get_kafka_config(self, write_options: dict = {}) -> dict:
831
852
)
832
853
]
833
854
)
834
-
835
855
return config
0 commit comments