Skip to content

PYTHON-4864 - Create async version of SpecRunnerThread #2094

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 9 commits into from
Feb 4, 2025
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 8 additions & 8 deletions test/asynchronous/unified_format.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
client_knobs,
unittest,
)
from test.asynchronous.utils_spec_runner import SpecRunnerTask
from test.unified_format_shared import (
KMS_TLS_OPTS,
PLACEHOLDER_MAP,
Expand All @@ -58,7 +59,6 @@
snake_to_camel,
wait_until,
)
from test.utils_spec_runner import SpecRunnerThread
from test.version import Version
from typing import Any, Dict, List, Mapping, Optional

Expand Down Expand Up @@ -382,8 +382,8 @@ async def drop(self: AsyncGridFSBucket, *args: Any, **kwargs: Any) -> None:
return
elif entity_type == "thread":
name = spec["id"]
thread = SpecRunnerThread(name)
thread.start()
thread = SpecRunnerTask(name)
await thread.start()
self[name] = thread
return

Expand Down Expand Up @@ -1177,16 +1177,16 @@ def primary_changed() -> bool:

wait_until(primary_changed, "change primary", timeout=timeout)

def _testOperation_runOnThread(self, spec):
async def _testOperation_runOnThread(self, spec):
"""Run the 'runOnThread' operation."""
thread = self.entity_map[spec["thread"]]
thread.schedule(lambda: self.run_entity_operation(spec["operation"]))
await thread.schedule(functools.partial(self.run_entity_operation, spec["operation"]))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are we running any async unified tests that use runOnThread?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The SDAM unified tests are the only ones that use runOnThread. Those are currently slated to be converted to async, yes.


def _testOperation_waitForThread(self, spec):
async def _testOperation_waitForThread(self, spec):
"""Run the 'waitForThread' operation."""
thread = self.entity_map[spec["thread"]]
thread.stop()
thread.join(10)
await thread.stop()
await thread.join(10)
if thread.exc:
raise thread.exc
self.assertFalse(thread.is_alive(), "Thread {} is still running".format(spec["thread"]))
Expand Down
59 changes: 45 additions & 14 deletions test/asynchronous/utils_spec_runner.py
Copy link
Contributor

@sleepyStick sleepyStick Jan 29, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not loving the duplicated code between sync and async but i'm guessing its because the async version needs some more methods? If so, then i understand and can live with it >.<

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The async version doesn't implement threading.Thread, but it still needs to match the same API as the synchronous version. Let me see if I can reduce some of the duplication though.

Copy link
Contributor Author

@NoahStapp NoahStapp Jan 29, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did some refactoring, much less duplication now. Great call-out!

Original file line number Diff line number Diff line change
Expand Up @@ -47,46 +47,77 @@
from pymongo.asynchronous.command_cursor import AsyncCommandCursor
from pymongo.asynchronous.cursor import AsyncCursor
from pymongo.errors import AutoReconnect, BulkWriteError, OperationFailure, PyMongoError
from pymongo.lock import _async_cond_wait, _async_create_condition, _async_create_lock
from pymongo.read_concern import ReadConcern
from pymongo.read_preferences import ReadPreference
from pymongo.results import BulkWriteResult, _WriteResult
from pymongo.write_concern import WriteConcern

_IS_SYNC = False

if _IS_SYNC:
PARENT = threading.Thread
else:
PARENT = object

class SpecRunnerThread(threading.Thread):
def __init__(self, name):
super().__init__()

class ConcurrentRunner(PARENT):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's move this to utils.py since it will be used by non-spec tests.

Copy link
Contributor Author

@NoahStapp NoahStapp Feb 3, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since utils.py is not currently synchro'd, let's delay that move to PYTHON-5113, it'll involve a lot of git changes.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay then what about helpers.py?

def __init__(self, name, *args, **kwargs):
if _IS_SYNC:
super().__init__(*args, **kwargs)
self.name = name
self.stopped = False
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Stopped is never set to True by this class.

self.task = None
if "target" in kwargs:
self.target = kwargs["target"]

if not _IS_SYNC:

async def start(self):
self.task = asyncio.create_task(self.run(), name=self.name)

async def join(self, timeout: float | None = 0): # type: ignore[override]
if self.task is not None:
await asyncio.wait([self.task], timeout=timeout)

def is_alive(self):
return not self.stopped

async def run(self):
if self.target:
await self.target()


class SpecRunnerTask(ConcurrentRunner):
def __init__(self, name):
super().__init__(name)
self.exc = None
self.daemon = True
self.cond = threading.Condition()
self.cond = _async_create_condition(_async_create_lock())
self.ops = []
self.stopped = False

def schedule(self, work):
async def schedule(self, work):
self.ops.append(work)
with self.cond:
async with self.cond:
self.cond.notify()

def stop(self):
async def stop(self):
self.stopped = True
with self.cond:
async with self.cond:
self.cond.notify()

def run(self):
async def run(self):
while not self.stopped or self.ops:
if not self.ops:
with self.cond:
self.cond.wait(10)
async with self.cond:
await _async_cond_wait(self.cond, 10)
if self.ops:
try:
work = self.ops.pop(0)
work()
await work()
except Exception as exc:
self.exc = exc
self.stop()
await self.stop()


class AsyncSpecTestCreator:
Expand Down
2 changes: 1 addition & 1 deletion test/unified_format.py
Original file line number Diff line number Diff line change
Expand Up @@ -1167,7 +1167,7 @@ def primary_changed() -> bool:
def _testOperation_runOnThread(self, spec):
"""Run the 'runOnThread' operation."""
thread = self.entity_map[spec["thread"]]
thread.schedule(lambda: self.run_entity_operation(spec["operation"]))
thread.schedule(functools.partial(self.run_entity_operation, spec["operation"]))

def _testOperation_waitForThread(self, spec):
"""Run the 'waitForThread' operation."""
Expand Down
43 changes: 37 additions & 6 deletions test/utils_spec_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
from gridfs import GridFSBucket
from gridfs.synchronous.grid_file import GridFSBucket
from pymongo.errors import AutoReconnect, BulkWriteError, OperationFailure, PyMongoError
from pymongo.lock import _cond_wait, _create_condition, _create_lock
from pymongo.read_concern import ReadConcern
from pymongo.read_preferences import ReadPreference
from pymongo.results import BulkWriteResult, _WriteResult
Expand All @@ -54,16 +55,46 @@

_IS_SYNC = True

if _IS_SYNC:
PARENT = threading.Thread
else:
PARENT = object

class SpecRunnerThread(threading.Thread):
def __init__(self, name):
super().__init__()

class ConcurrentRunner(PARENT):
def __init__(self, name, *args, **kwargs):
if _IS_SYNC:
super().__init__(*args, **kwargs)
self.name = name
self.stopped = False
self.task = None
if "target" in kwargs:
self.target = kwargs["target"]

if not _IS_SYNC:

def start(self):
self.task = asyncio.create_task(self.run(), name=self.name)

def join(self, timeout: float | None = 0): # type: ignore[override]
if self.task is not None:
asyncio.wait([self.task], timeout=timeout)

def is_alive(self):
return not self.stopped

def run(self):
if self.target:
self.target()


class SpecRunnerThread(ConcurrentRunner):
def __init__(self, name):
super().__init__(name)
self.exc = None
self.daemon = True
self.cond = threading.Condition()
self.cond = _create_condition(_create_lock())
self.ops = []
self.stopped = False

def schedule(self, work):
self.ops.append(work)
Expand All @@ -79,7 +110,7 @@ def run(self):
while not self.stopped or self.ops:
if not self.ops:
with self.cond:
self.cond.wait(10)
_cond_wait(self.cond, 10)
if self.ops:
try:
work = self.ops.pop(0)
Expand Down
1 change: 1 addition & 0 deletions tools/synchro.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@
"_async_create_lock": "_create_lock",
"_async_create_condition": "_create_condition",
"_async_cond_wait": "_cond_wait",
"SpecRunnerTask": "SpecRunnerThread",
"AsyncMockConnection": "MockConnection",
"AsyncMockPool": "MockPool",
}
Expand Down
Loading