Bug: Segmentation fault when using with confluent kafka consumer #805

Open
trajano opened this issue Apr 7, 2025 · 1 comment

trajano commented Apr 7, 2025

Describe the bug

When an asyncio test case uses the Kafka container, the pytest process segfaults once the test completes, even though the test itself passes.

To Reproduce

Provide a self-contained code snippet that illustrates the bug or unexpected behavior. Ideally, send a Pull Request to illustrate with a test that illustrates the problem.

import asyncio
import time
import sys
from confluent_kafka import Consumer
import pytest
from testcontainers.core.waiting_utils import wait_for_logs
from testcontainers.kafka import KafkaContainer

@pytest.fixture(scope="module")
def kafka_container():
    """Provide a temporary Redis container."""
    with KafkaContainer("mirror.gcr.io/confluentinc/cp-kafka:7.6.0") as container:
        wait_for_logs(container, "Awaiting socket connections on 0.0.0.0:9093")
        for topic in ["mytopic"]:
            container.exec(
                f"kafka-topics "
                f"--bootstrap-server localhost:9092 "
                f"--create "
                f"--topic {topic} "
                f"--partitions 3 "
                f"--replication-factor 1",
            )
        yield container

async def kafka_consumer(consumer, timeout=1.0):
    while True:
        msg = await asyncio.to_thread(consumer.poll, timeout=timeout)

        if msg is None:
            continue
        if msg.error():
            print("Consumer error: {}".format(msg.error()))
            continue

        print('Received message: {}'.format(msg.value().decode('utf-8')))

async def main(consumer):

    task = asyncio.create_task(kafka_consumer(consumer))

    await asyncio.sleep(10)
    print("Cancelling the task...")
    task.cancel()

    try:
        await task
    except asyncio.CancelledError:
        print("Task was cancelled")

@pytest.mark.asyncio(loop_scope="function")
async def test_sample(kafka_container):
    consumer = Consumer({
        'bootstrap.servers': kafka_container.get_bootstrap_server(),
        'group.id': 'mygroup',
        'auto.offset.reset': 'earliest',
        "debug":"all"})
    consumer.subscribe(['mytopic'])


    await main(consumer)
    consumer.close() 
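
A note on the reproduction above: cancelling a task that is awaiting `asyncio.to_thread(...)` does not interrupt the worker thread, so `consumer.poll` may still be running when `consumer.close()` is called. Whether that overlap is what triggers the segfault is an assumption and is not confirmed here. The stdlib-only sketch below shows just the threading behaviour; `blocking_poll` and `demo` are hypothetical names, and `blocking_poll` is only a stand-in for `consumer.poll`.

```python
import asyncio
import threading
import time


def blocking_poll(timeout, finished):
    # Hypothetical stand-in for consumer.poll(timeout): block, then record
    # that the call has returned.
    time.sleep(timeout)
    finished.set()
    return None


async def demo(timeout=0.5):
    finished = threading.Event()
    task = asyncio.create_task(asyncio.to_thread(blocking_poll, timeout, finished))

    await asyncio.sleep(0.1)  # give the worker thread time to start
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass

    # The task is cancelled, but the thread is still inside blocking_poll.
    still_running_after_cancel = not finished.is_set()

    # Wait (in another worker thread) until the blocking call really returns.
    await asyncio.to_thread(finished.wait, timeout * 4)
    return still_running_after_cancel, finished.is_set()
```

`asyncio.run(demo())` is expected to return `(True, True)`: the blocking call outlives the cancelled task and only finishes later on its own.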

Runtime environment

Provide a summary of your runtime environment. Which operating system, python version, and docker version are you using? What is the version of testcontainers-python you are using? You can run the following commands to get the relevant information.

[project]
name = "kafka-test"
version = "0.1.0"
description = "Add your description here"
readme = "README.md"
requires-python = ">=3.13"
dependencies = [
    "confluent-kafka>=2.9.0",
]

[dependency-groups]
dev = [
    "pytest>=8.3.5",
    "pytest-asyncio>=0.26.0",
    "testcontainers>=4.10.0",
]

More info

confluentinc/confluent-kafka-python#1953 (comment)

trajano commented Apr 7, 2025

Without the asyncio task cancellation, it works fine:

import asyncio
import time
import sys
from confluent_kafka import Consumer
import pytest
from testcontainers.core.waiting_utils import wait_for_logs
from testcontainers.kafka import KafkaContainer

@pytest.fixture(scope="module")
def kafka_container():
    """Provide a temporary Redis container."""
    with KafkaContainer("mirror.gcr.io/confluentinc/cp-kafka:7.6.0") as container:
        wait_for_logs(container, "Awaiting socket connections on 0.0.0.0:9093")
        for topic in ["mytopic"]:
            container.exec(
                f"kafka-topics "
                f"--bootstrap-server localhost:9092 "
                f"--create "
                f"--topic {topic} "
                f"--partitions 3 "
                f"--replication-factor 1",
            )
        yield container

async def main(consumer):
    await asyncio.to_thread(consumer.poll, timeout=10)

@pytest.mark.asyncio(loop_scope="function")
async def test_sample(kafka_container):
    consumer = Consumer({
        'bootstrap.servers': kafka_container.get_bootstrap_server(),
        'group.id': 'mygroup',
        'auto.offset.reset': 'earliest',
        "debug":"all"})
    consumer.subscribe(['mytopic'])


    await main(consumer)
    consumer.close()
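
Since the version without cancellation works, one thing that might be worth trying is to keep the polling loop but stop it with a flag instead of `task.cancel()`, so that no `poll` call is in flight when `close()` runs. This is only a sketch and has not been run against the setup in this issue. `FakeConsumer` is a hypothetical stand-in (not the confluent_kafka API) that lets the example run without a broker, and the `stop` event, `run_for` parameter, and short timeouts are assumptions added for illustration.

```python
import asyncio
import threading
import time


class FakeConsumer:
    """Hypothetical stand-in for confluent_kafka.Consumer, for illustration only."""

    def __init__(self):
        self._lock = threading.Lock()
        self.polls_in_flight = 0
        self.closed_during_poll = False

    def poll(self, timeout=1.0):
        with self._lock:
            self.polls_in_flight += 1
        time.sleep(timeout)  # pretend to block waiting for a message
        with self._lock:
            self.polls_in_flight -= 1
        return None

    def close(self):
        # Record whether close() overlapped with a poll() in another thread.
        with self._lock:
            if self.polls_in_flight:
                self.closed_during_poll = True


async def kafka_consumer(consumer, stop, timeout=1.0):
    # Check the stop flag between polls instead of relying on cancellation.
    while not stop.is_set():
        msg = await asyncio.to_thread(consumer.poll, timeout=timeout)
        if msg is None:
            continue
        if msg.error():
            print("Consumer error: {}".format(msg.error()))
            continue
        print("Received message: {}".format(msg.value().decode("utf-8")))


async def main(consumer, run_for=0.3, timeout=0.1):
    stop = asyncio.Event()
    task = asyncio.create_task(kafka_consumer(consumer, stop, timeout))

    await asyncio.sleep(run_for)
    print("Stopping the task...")
    stop.set()
    await task  # completes only after the in-flight poll has returned

    consumer.close()
```

With a real `Consumer`, the trade-off is that shutdown waits for up to one poll `timeout` before `close()` is reached.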
