Segmentation Fault on Windows when using asyncio.to_thread on consumer with test container #1953

Closed
trajano opened this issue Mar 30, 2025 · 12 comments

Comments

@trajano

trajano commented Mar 30, 2025

When doing

async def kafka_consumer():
    while True:
        message = await asyncio.to_thread(consumer.poll, timeout=timeout)

If I trigger a cancellation on kafka_consumer, on Windows I get a Segmentation Fault.

I have this workaround for now

async def _poll_message(consumer: Consumer, timeout: float = 1.0) -> None | Message:
    if sys.platform == "win32":
        # Synchronous call: blocks the event loop, so keep the timeout short
        return consumer.poll(timeout=0.200)
    else:
        return await asyncio.to_thread(consumer.poll, timeout=timeout)
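
As an aside, the `else` branch above originally returned `asyncio.to_thread(...)` without `await`, which hands the caller a coroutine object rather than the poll result. A minimal, self-contained sketch of the difference (`blocking_call` is an illustrative stand-in, not part of the issue):

```python
import asyncio

def blocking_call() -> int:
    return 42

async def wrong() -> object:
    # Missing await: this returns the coroutine object itself, not 42.
    return asyncio.to_thread(blocking_call)

async def right() -> int:
    return await asyncio.to_thread(blocking_call)

async def demo():
    pending = await wrong()
    is_coro = asyncio.iscoroutine(pending)
    value_from_wrong = await pending  # caller must await a second time
    value_from_right = await right()
    return is_coro, value_from_wrong, value_from_right

result = asyncio.run(demo())
print(result)  # (True, 42, 42)
```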

UPDATE: here's how I start up Kafka locally in a docker compose

services:
  kafka:
    image: confluentinc/cp-kafka
    ports:
      - "9092:9092"
    environment:
      CLUSTER_ID: 1
      KAFKA_NODE_ID: 1
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT,CONTROLLER:PLAINTEXT
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9091,PLAINTEXT_HOST://0.0.0.0:9092,CONTROLLER://kafka:9093
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9091,PLAINTEXT_HOST://localhost:9092
      KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
      KAFKA_CONTROLLER_QUORUM_VOTERS: '1@kafka:9093'
      KAFKA_PROCESS_ROLES: 'broker,controller'
      KAFKA_LOG4J_OPTS: "-Dlog4j.rootLogger=WARN,stdout"
@rohitsanj

Hi @trajano, it appears the issue might be coming from the asyncio library and not confluent-kafka-python. Upon a quick search through https://github.com/python/cpython issues, I found this resolved issue python/cpython#123321 which seems to be related.

What Python version are you running? One of the later patch versions of 3.12 (unsure which) and all versions of 3.13 should have this issue fixed.

@trajano
Author

trajano commented Apr 7, 2025

I'm on Windows with Python 3.13.2, installed via scoop.

@rohitsanj

How are you triggering the cancellation?

Just to try to rule out confluent-kafka-python's involvement, can you run this script and see if it causes the same behavior?

The script runs time.sleep in a worker thread via asyncio.to_thread, and we then try to cancel the awaiting asyncio task.

import asyncio
import time
import sys

async def sleep_in_thread(seconds: float):
    print("Sleeping for", seconds, "seconds...")
    await asyncio.to_thread(time.sleep, seconds)
    print("Finished sleeping")

async def main():
    task = asyncio.create_task(sleep_in_thread(10))

    await asyncio.sleep(3)
    print("Cancelling the task...")
    task.cancel()
    
    try:
        await task
    except asyncio.CancelledError:
        print("Task was cancelled")

if __name__ == "__main__":
    asyncio.run(main())

@trajano
Author

trajano commented Apr 7, 2025

No error with your code. Putting it in a test works as well.

import asyncio
import time
import sys

async def sleep_in_thread(seconds: float):
    print("Sleeping for", seconds, "seconds...")
    await asyncio.to_thread(time.sleep, seconds)
    print("Finished sleeping")

async def main():
    task = asyncio.create_task(sleep_in_thread(10))

    await asyncio.sleep(3)
    print("Cancelling the task...")
    task.cancel()

    try:
        await task
    except asyncio.CancelledError:
        print("Task was cancelled")

def test_sample():
    asyncio.run(main())

@rohitsanj

Thanks for running the snippet. I'm now more inclined to think that this is an issue with the confluent-kafka-python library.

BTW, I took another look at your workaround and wanted to point out that the call to consumer.poll in the async function will block the event loop. You probably want to use something like loop.run_in_executor(..).

Can you try adding {'debug': 'all'} to the consumer configuration and share the logs here for us to review? Thanks.
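
For illustration, the loop.run_in_executor pattern suggested above might look like the following sketch; `blocking_poll` is a stand-in for `consumer.poll`, not a real confluent-kafka call:

```python
import asyncio
import functools
import time

def blocking_poll(timeout: float) -> str:
    """Stand-in for consumer.poll: blocks the calling thread."""
    time.sleep(timeout)
    return "message"

async def poll_off_loop(timeout: float = 0.2) -> str:
    loop = asyncio.get_running_loop()
    # run_in_executor takes no keyword arguments, so functools.partial
    # bridges the timeout kwarg; None selects the default thread pool.
    return await loop.run_in_executor(
        None, functools.partial(blocking_poll, timeout)
    )

result = asyncio.run(poll_off_loop())
print(result)  # message
```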

@trajano
Author

trajano commented Apr 7, 2025

Yes, I know it would block the event loop; the idea was to use asyncio.to_thread, but that caused the crash. However, I created a new project with all the latest updates and I can't seem to recreate it anymore.

I'll probably close this bug shortly.

@trajano trajano changed the title Segmentation Fault on Windows when using asyncio.to_thread on consumer Segmentation Fault on Windows when using asyncio.to_thread on consumer with test container Apr 7, 2025
@trajano
Author

trajano commented Apr 7, 2025

OK, I got it to fail, but with a test container.

import asyncio
import time
import sys
from confluent_kafka import Consumer
import pytest
from testcontainers.core.waiting_utils import wait_for_logs
from testcontainers.kafka import KafkaContainer

@pytest.fixture(scope="module")
def kafka_container():
    """Provide a temporary Kafka container."""
    with KafkaContainer("mirror.gcr.io/confluentinc/cp-kafka:7.6.0") as container:
        wait_for_logs(container, "Awaiting socket connections on 0.0.0.0:9093")
        for topic in ["mytopic"]:
            container.exec(
                f"kafka-topics "
                f"--bootstrap-server localhost:9092 "
                f"--create "
                f"--topic {topic} "
                f"--partitions 3 "
                f"--replication-factor 1",
            )
        yield container

async def kafka_consumer(consumer, timeout=1.0):
    while True:
        msg = await asyncio.to_thread(consumer.poll, timeout=timeout)

        if msg is None:
            continue
        if msg.error():
            print("Consumer error: {}".format(msg.error()))
            continue

        print('Received message: {}'.format(msg.value().decode('utf-8')))

async def main(consumer):

    task = asyncio.create_task(kafka_consumer(consumer))

    await asyncio.sleep(10)
    print("Cancelling the task...")
    task.cancel()

    try:
        await task
    except asyncio.CancelledError:
        print("Task was cancelled")

@pytest.mark.asyncio(loop_scope="function")
async def test_sample(kafka_container):
    consumer = Consumer({
        'bootstrap.servers': kafka_container.get_bootstrap_server(),
        'group.id': 'mygroup',
        'auto.offset.reset': 'earliest',
        "debug":"all"})
    consumer.subscribe(['mytopic'])


    await main(consumer)
    consumer.close()  # it could be this one, because I am closing after everything else

Note: the test "passes" but yields a segfault afterwards.
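
One way to sidestep cancelling a task while a blocking poll is still in flight is to signal shutdown with an asyncio.Event and close the consumer only after the task has drained. A minimal sketch, using a hypothetical FakeConsumer standing in for confluent_kafka.Consumer (not from the issue):

```python
import asyncio
import time

class FakeConsumer:
    """Stand-in for confluent_kafka.Consumer to illustrate shutdown order."""
    def __init__(self):
        self.closed = False

    def poll(self, timeout: float):
        time.sleep(timeout)  # blocking, like librdkafka's poll
        return None

    def close(self):
        self.closed = True

async def consume(consumer: FakeConsumer, stop: asyncio.Event):
    # Check the stop flag between polls instead of relying on task
    # cancellation to interrupt an in-flight blocking call.
    while not stop.is_set():
        msg = await asyncio.to_thread(consumer.poll, 0.1)
        if msg is None:
            continue

async def main() -> bool:
    consumer = FakeConsumer()
    stop = asyncio.Event()
    task = asyncio.create_task(consume(consumer, stop))
    await asyncio.sleep(0.3)
    stop.set()
    await task            # the in-flight poll finishes before we proceed
    consumer.close()      # close only after no thread is inside poll()
    return consumer.closed

closed = asyncio.run(main())
print(closed)  # True
```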

@trajano
Author

trajano commented Apr 7, 2025

So it appears to be tied to the testcontainers example, because this works:

import asyncio
import time
import sys
from confluent_kafka import Consumer
import pytest

async def kafka_consumer(consumer, timeout=1.0):
    while True:
        msg = await asyncio.to_thread(consumer.poll, timeout=timeout)

        if msg is None:
            continue
        if msg.error():
            print("Consumer error: {}".format(msg.error()))
            continue

        print('Received message: {}'.format(msg.value().decode('utf-8')))

async def main(consumer):

    task = asyncio.create_task(kafka_consumer(consumer))

    await asyncio.sleep(15)
    print("Cancelling the task...")
    task.cancel()

    try:
        await task
    except asyncio.CancelledError:
        print("Task was cancelled")

@pytest.mark.asyncio(loop_scope="function")
async def test_sample():
    consumer = Consumer({
        'bootstrap.servers': 'localhost:9092',
        'group.id': 'mygroup',
        'auto.offset.reset': 'earliest',
        "debug":"all"})
    consumer.subscribe(['mytopic'])

    await main(consumer)
    consumer.close()

@trajano
Author

trajano commented Apr 7, 2025

I'll close this one for now; I suspect it's something in testcontainers rather than in this project. I'll reopen if the testcontainers team passes the blame back.

@trajano trajano closed this as completed Apr 7, 2025
@rohitsanj

I'm not sure if the synchronous fixture plays well with the async test.

@pytest.mark.asyncio(loop_scope="function")
async def test_sample(kafka_container):

@trajano
Author

trajano commented Apr 7, 2025

This works, though:

import asyncio
import time
import sys
from confluent_kafka import Consumer
import pytest
from testcontainers.core.waiting_utils import wait_for_logs
from testcontainers.kafka import KafkaContainer

@pytest.fixture(scope="module")
def kafka_container():
    """Provide a temporary Kafka container."""
    with KafkaContainer("mirror.gcr.io/confluentinc/cp-kafka:7.6.0") as container:
        wait_for_logs(container, "Awaiting socket connections on 0.0.0.0:9093")
        for topic in ["mytopic"]:
            container.exec(
                f"kafka-topics "
                f"--bootstrap-server localhost:9092 "
                f"--create "
                f"--topic {topic} "
                f"--partitions 3 "
                f"--replication-factor 1",
            )
        yield container

async def main(consumer):
    await asyncio.to_thread(consumer.poll, timeout=10)

@pytest.mark.asyncio(loop_scope="function")
async def test_sample(kafka_container):
    consumer = Consumer({
        'bootstrap.servers': kafka_container.get_bootstrap_server(),
        'group.id': 'mygroup',
        'auto.offset.reset': 'earliest',
        "debug":"all"})
    consumer.subscribe(['mytopic'])

    await main(consumer)
    consumer.close()

@trajano
Author

trajano commented Apr 7, 2025

The only way I can see the blame switching back to this project is if something in the C code didn't release some sort of connection or thread. At that point I am not certain what to do :)
