Skip to content

Conversation

fcfangcc
Copy link
Contributor

@fcfangcc fcfangcc commented Mar 5, 2025

Description

When using the send_and_wait method, a TypeError occurs:
TypeError: original_send() got multiple values for argument 'headers'

This happens because of the following lines in the OpenTelemetry AIoKafka instrumentation code:
[opentelemetry-instrumentation-aiokafka/utils.py#L263-L266](

headers = _extract_send_headers(args, kwargs)
if headers is None:
headers = []
kwargs["headers"] = headers
).

If headers is present in args[5], these lines modify kwargs, causing headers to appear in both args and kwargs, leading to the error.

Example code triggering the issue(run with otel):

import asyncio

from aiokafka import AIOKafkaProducer


async def test():
    producer = AIOKafkaProducer(
        bootstrap_servers='bootstrap_servers',
    )
    await producer.start()
    await producer.send_and_wait('topic', value=b'test', headers=[])  # that's ok
    await producer.send_and_wait('topic', value=b'test')  # error here


asyncio.run(test())

Fixes # (issue)

Type of change

Please delete options that are not relevant.

  • Bug fix (non-breaking change which fixes an issue)
  • New feature (non-breaking change which adds functionality)
  • Breaking change (fix or feature that would cause existing functionality to not work as expected)
  • This change requires a documentation update

How Has This Been Tested?

Please describe the tests that you ran to verify your changes. Provide instructions so we can reproduce. Please also list any relevant details for your test configuration

  • test_wrap_send_with_headers_as_args

Does This PR Require a Core Repo Change?

  • Yes. - Link to PR:
  • No.

Checklist:

See contributing.md for styleguide, changelog guidelines, and more.

  • Followed the style guidelines of this project
  • Changelogs have been updated
  • Unit tests have been added
  • Documentation has been updated

Copy link

linux-foundation-easycla bot commented Mar 5, 2025

CLA Signed

The committers listed above are authorized under a signed CLA.

@dimastbk
Copy link
Contributor

Hi. I suggest add smth to tests:

    async def test_send_and_wait(self) -> None:
        AIOKafkaInstrumentor().uninstrument()
        AIOKafkaInstrumentor().instrument(tracer_provider=self.tracer_provider)

        producer = await self.producer_factory()
        add_message_mock: mock.AsyncMock = (
            producer._message_accumulator.add_message
        )
        add_message_mock.side_effect = [mock.AsyncMock()(), mock.AsyncMock()()]

        tracer = self.tracer_provider.get_tracer(__name__)
        with tracer.start_as_current_span("test_span") as span:
            await producer.send_and_wait("topic_1", b"value_1")

        add_message_mock.assert_awaited_with(
            TopicPartition(topic="topic_1", partition=1),
            None,
            b"value_1",
            40.0,
            timestamp_ms=None,
            headers=[("traceparent", mock.ANY)],
        )
        assert (
            add_message_mock.call_args_list[0]
            .kwargs["headers"][0][1]
            .startswith(
                f"00-{format_trace_id(span.get_span_context().trace_id)}-".encode()
            )
        )

        await producer.send_and_wait("topic_2", b"value_2")
        add_message_mock.assert_awaited_with(
            TopicPartition(topic="topic_2", partition=1),
            None,
            b"value_2",
            40.0,
            timestamp_ms=None,
            headers=[("traceparent", mock.ANY)],
        )

@fcfangcc
Copy link
Contributor Author

@dimastbk Thank you for your suggestion. This test is more reasonable

@xrmx xrmx moved this to Ready for review in @xrmx's Python PR digest Apr 9, 2025
@fcfangcc fcfangcc requested a review from a team as a code owner April 10, 2025 06:05
@xrmx xrmx enabled auto-merge (squash) April 10, 2025 10:17
@xrmx xrmx merged commit bb85f98 into open-telemetry:main Apr 10, 2025
720 checks passed
@github-project-automation github-project-automation bot moved this from Ready for review to Done in @xrmx's Python PR digest Apr 10, 2025
@fcfangcc fcfangcc deleted the fix-aiokafka branch April 11, 2025 02:02
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
Status: Done
Development

Successfully merging this pull request may close these issues.

4 participants