From 05679f45efe1ec487713a1617884e3acb57ec50c Mon Sep 17 00:00:00 2001 From: Rajashree Mandaogane Date: Wed, 1 Feb 2023 18:20:35 -0800 Subject: [PATCH 1/2] Allow Kafka producer headers to be dict or list --- CHANGELOG.md | 2 ++ .../instrumentation/confluent_kafka/utils.py | 7 ++++++- .../tests/test_instrumentation.py | 16 ++++++++++++++++ 3 files changed, 24 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 242dd4935c..2d8ca3e2df 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -61,6 +61,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ([#1512](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1512)) - Add commit method for ConfluentKafkaInstrumentor's ProxiedConsumer ([#1656](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1656)) +- Fix confluent-kafka instrumentation by allowing Producer headers to be dict or list + ([#1655](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1655)) ## Version 1.15.0/0.36b0 (2022-12-10) diff --git a/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/utils.py b/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/utils.py index ea304c81d3..89fad19c1b 100644 --- a/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/utils.py +++ b/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/utils.py @@ -60,7 +60,12 @@ def set(self, carrier: textmap.CarrierT, key: str, value: str) -> None: if value: value = value.encode() - carrier.append((key, value)) + + if isinstance(carrier, list): + carrier.append((key, value)) + + if isinstance(carrier, dict): + carrier[key] = value _kafka_getter = KafkaContextGetter() diff --git a/instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/test_instrumentation.py b/instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/test_instrumentation.py index d78d128760..aeccd96020 100644 --- a/instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/test_instrumentation.py +++ b/instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/test_instrumentation.py @@ -23,6 +23,9 @@ ProxiedConsumer, ProxiedProducer, ) +from opentelemetry.instrumentation.confluent_kafka.utils import ( + KafkaContextSetter, +) class TestConfluentKafka(TestCase): @@ -73,3 +76,16 @@ def test_consumer_commit_method_exists(self) -> None: consumer = instrumentation.instrument_consumer(consumer) self.assertEqual(consumer.__class__, ProxiedConsumer) self.assertTrue(hasattr(consumer, "commit")) + + def test_context_setter(self) -> None: + context_setter = KafkaContextSetter() + + carrier_dict = {"key1": "val1"} + context_setter.set(carrier_dict, "key2", "val2") + self.assertGreaterEqual( + carrier_dict.items(), {"key2": "val2".encode()}.items() + ) + + carrier_list = [("key1", "val1")] + context_setter.set(carrier_list, "key2", "val2") + self.assertTrue(("key2", "val2".encode()) in carrier_list) From 6fe759b76ca5144428d956572383f265c2200fb7 Mon Sep 17 00:00:00 2001 From: Rajashree Mandaogane Date: Fri, 17 Feb 2023 16:08:11 -0800 Subject: [PATCH 2/2] modify kafka context getter helper methods to work on dict and list --- CHANGELOG.md | 4 ++-- .../instrumentation/confluent_kafka/utils.py | 14 ++++++++++++-- .../tests/test_instrumentation.py | 15 +++++++++++++++ 3 files changed, 29 insertions(+), 4 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 2d8ca3e2df..631ffb7075 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,6 +16,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Fix Flask instrumentation to only close the span if it was created by the same thread. ([#1654](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1654)) +- Fix confluent-kafka instrumentation by allowing Producer headers to be dict or list + ([#1655](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1655)) ## Version 1.16.0/0.37b0 (2023-02-17) @@ -61,8 +63,6 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ([#1512](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1512)) - Add commit method for ConfluentKafkaInstrumentor's ProxiedConsumer ([#1656](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1656)) -- Fix confluent-kafka instrumentation by allowing Producer headers to be dict or list - ([#1655](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1655)) ## Version 1.15.0/0.36b0 (2022-12-10) diff --git a/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/utils.py b/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/utils.py index 89fad19c1b..77fce03cd8 100644 --- a/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/utils.py +++ b/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/utils.py @@ -41,16 +41,26 @@ class KafkaContextGetter(textmap.Getter): def get(self, carrier: textmap.CarrierT, key: str) -> Optional[List[str]]: if carrier is None: return None - for item_key, value in carrier: + + carrier_items = carrier + if isinstance(carrier, dict): + carrier_items = carrier.items() + + for item_key, value in carrier_items: if item_key == key: if value is not None: return [value.decode()] + return None def keys(self, carrier: textmap.CarrierT) -> List[str]: if carrier is None: return [] - return [key for (key, value) in carrier] + + carrier_items = carrier + if isinstance(carrier, dict): + carrier_items = carrier.items() + return [key for (key, value) in carrier_items] class KafkaContextSetter(textmap.Setter): diff --git a/instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/test_instrumentation.py b/instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/test_instrumentation.py index aeccd96020..1e3f304188 100644 --- a/instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/test_instrumentation.py +++ b/instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/test_instrumentation.py @@ -24,6 +24,7 @@ ProxiedProducer, ) from opentelemetry.instrumentation.confluent_kafka.utils import ( + KafkaContextGetter, KafkaContextSetter, ) @@ -89,3 +90,17 @@ def test_context_setter(self) -> None: carrier_list = [("key1", "val1")] context_setter.set(carrier_list, "key2", "val2") self.assertTrue(("key2", "val2".encode()) in carrier_list) + + def test_context_getter(self) -> None: + context_setter = KafkaContextSetter() + context_getter = KafkaContextGetter() + + carrier_dict = {} + context_setter.set(carrier_dict, "key1", "val1") + self.assertEqual(context_getter.get(carrier_dict, "key1"), ["val1"]) + self.assertEqual(["key1"], context_getter.keys(carrier_dict)) + + carrier_list = [] + context_setter.set(carrier_list, "key1", "val1") + self.assertEqual(context_getter.get(carrier_list, "key1"), ["val1"]) + self.assertEqual(["key1"], context_getter.keys(carrier_list))