From 2e5325f12fce77e05b78e26511d4878725e26656 Mon Sep 17 00:00:00 2001 From: moritzmeister Date: Tue, 14 Jun 2022 09:41:31 +0200 Subject: [PATCH 1/3] add some logs --- python/hsfs/engine/python.py | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/python/hsfs/engine/python.py b/python/hsfs/engine/python.py index 9548b903a8..041fce6af1 100644 --- a/python/hsfs/engine/python.py +++ b/python/hsfs/engine/python.py @@ -716,6 +716,12 @@ def _write_dataframe_kafka( # setup row writer function writer = self._get_encoder_func(feature_group._get_encoded_avro_schema()) + def acked(err, msg): + if err is not None: + print("Failed to deliver message: %s: %s" % (str(msg), str(err))) + + print(feature_group._online_topic_name) + # loop over rows for r in dataframe.itertuples(index=False): # itertuples returns Python NamedTyple, to be able to serialize it using @@ -746,7 +752,10 @@ def _write_dataframe_kafka( # produce producer.produce( - topic=feature_group._online_topic_name, key=key, value=encoded_row + topic=feature_group._online_topic_name, + key=key, + value=encoded_row, + callback=acked, ) # Trigger internal callbacks to empty op queue From 901a1dfcd8d74af7bd88fba2344e9698c6f36d1d Mon Sep 17 00:00:00 2001 From: moritzmeister Date: Tue, 14 Jun 2022 15:59:49 +0200 Subject: [PATCH 2/3] handle BufferError --- python/hsfs/engine/python.py | 33 +++++++++++++++++++++++---------- 1 file changed, 23 insertions(+), 10 deletions(-) diff --git a/python/hsfs/engine/python.py b/python/hsfs/engine/python.py index 041fce6af1..2de607f9eb 100644 --- a/python/hsfs/engine/python.py +++ b/python/hsfs/engine/python.py @@ -750,16 +750,27 @@ def acked(err, msg): # assemble key key = "".join([str(row[pk]) for pk in sorted(feature_group.primary_key)]) - # produce - producer.produce( - topic=feature_group._online_topic_name, - key=key, - value=encoded_row, - callback=acked, - ) + while True: + # if BufferError is thrown, we can be sure the message hasn't been sent, so we retry
+ try: + # produce + producer.produce( + topic=feature_group._online_topic_name, + key=key, + value=encoded_row, + callback=acked + if offline_write_options.get("debug_kafka", False) + else None, + ) - # Trigger internal callbacks to empty op queue - producer.poll(0) + # Trigger internal callbacks to empty op queue + producer.poll(0) + break + except BufferError as e: + if offline_write_options.get("debug_kafka", False): + print("Caught: {}".format(e)) + # backoff for 1 second + producer.poll(1) # make sure producer blocks and everything is delivered producer.flush() @@ -804,12 +815,15 @@ def _get_encoder_func(self, writer_schema: str) -> callable: return lambda record, outf: writer.write(record, avro.io.BinaryEncoder(outf)) def _get_kafka_config(self, write_options: dict = {}) -> dict: + # producer configuration properties + # https://docs.confluent.io/platform/current/clients/librdkafka/html/md_CONFIGURATION.html config = { "security.protocol": "SSL", "ssl.ca.location": client.get_instance()._get_ca_chain_path(), "ssl.certificate.location": client.get_instance()._get_client_cert_path(), "ssl.key.location": client.get_instance()._get_client_key_path(), "client.id": socket.gethostname(), + **write_options.get("kafka_producer_config", {}), } if isinstance(client.get_instance(), hopsworks.Client) or write_options.get( @@ -832,5 +846,4 @@ def _get_kafka_config(self, write_options: dict = {}) -> dict: ) ] ) - return config From ecda33cbe587659989cf04f7ff59f9dd4b8b88b4 Mon Sep 17 00:00:00 2001 From: moritzmeister Date: Tue, 14 Jun 2022 16:18:08 +0200 Subject: [PATCH 3/3] remove leftover print --- python/hsfs/engine/python.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/python/hsfs/engine/python.py b/python/hsfs/engine/python.py index 2de607f9eb..3798aaf3f5 100644 --- a/python/hsfs/engine/python.py +++ b/python/hsfs/engine/python.py @@ -720,8 +720,6 @@ def acked(err, msg): if err is not None: print("Failed to deliver message: %s: %s" % (str(msg), str(err))) - 
print(feature_group._online_topic_name) - # loop over rows for r in dataframe.itertuples(index=False): # itertuples returns Python NamedTyple, to be able to serialize it using