Safe synchronous cancellation in asyncio #103486
This feels very similar to gh-102847, which was just closed. Since that one has much more discussion, we should probably keep it open instead?
I think this issue shares some context with gh-102847, but targets a more specific use case: safe synchronous cancellation. Could we reopen it? It may be better to collect the discussion there. I'd love to see some improvements to the "canceller"-side APIs and ergonomics, as the OP says.
I don't think that adding a new API is going to help much. Realistic use cases of cancellation (e.g. as used in task groups and the timeout context manager) wouldn't be able to use it. Most use cases are better off using something like timeout anyway.

You may have made a case for updating the example, but I resent the suggestion that the author of the example "probably made a mistake". As you mention, in the example there is no way for `main()` itself to be cancelled.
Let me describe the context where I encountered this question. I am building a system composed of multiple services that run in parallel in separate Python instances. To provide synchronization between the services where necessary, I use a shared database with a "locks" table, where the lock's name field, constructed from a resource's global id, is covered by a unique index. When a service takes a lock, it tries to create a row in this table; to release the lock, the row is deleted. In addition, each lock is equipped with a watchdog timer, so that if a service bugs out, crashes, or "forgets" to unlock the resource in any other way, the lock times out by itself. When a service genuinely needs to hold the lock for a long time, however, it can periodically reset the watchdog, thus preventing it from timing out for as long as the service is actually functional.

And so, what I wanted to have is a context manager that takes care of both creating/removing the row and resetting the watchdog timer, by spawning an asyncio task for the duration of the "managed" code. An important caveat is that the "resetter" task must be stopped and joined before the row is removed; because, if the "resetter" discovers that the row has been removed and there is nothing to update anymore, it's treated as if the lock has timed out: the resource may already be in the middle of another process, it's no longer safe to continue, so it raises an exception and interrupts the parent task.

In the end, though, I implemented this behavior by constructing a TaskGroup from within the lock manager and calling its `__aenter__()`/`__aexit__()` methods manually:

```python
class LockConflict(RuntimeError):
    pass


class LockLost(RuntimeError):
    pass


class Lock:
    def __init__(self, name):
        self.name = name
        self.nonce = random.randbytes(8)
        self.task_group = None
        self.resetter_task = None

    async def take(self):
        created_successfully = await db.transaction(
            lambda c: db_try_create_lock(c, self.nonce, self.name),
        )
        if not created_successfully:
            raise LockConflict(
                f"Lock {self.name} is already taken. Are you trying to run "
                f"multiple instances of a non-multiplicable service at once?"
            )

    async def touch(self):
        updated_successfully = await db.transaction(
            lambda c: db_update_lock_watchdog(c, self.nonce, self.name),
        )
        if not updated_successfully:
            raise LockLost(f"Lock {self.name} timed out")

    async def release(self):
        await db.transaction(
            lambda c: db_release_lock(c, self.nonce, self.name),
        )

    async def async_touch_loop(self):
        while True:
            await asyncio.sleep(settings.service_watchdog_timer_seconds // 2)
            await self.touch()

    async def __aenter__(self):
        if self.task_group is not None:
            raise RuntimeError('Lock context manager entered twice')
        try:
            await self.take()
            self.task_group = await asyncio.TaskGroup().__aenter__()
            self.resetter_task = self.task_group.create_task(self.async_touch_loop())
            return self
        except:
            # handle KeyboardInterrupt during initialization
            await self.release()
            raise

    async def __aexit__(self, *exc_info):
        try:
            self.resetter_task.cancel()
            await self.task_group.__aexit__(None, None, None)
        finally:
            await self.release()


# example usage:
async with Lock(f"record:{my_record.id}"):
    await process_record(my_record)
```

So, in the end, it might indeed be better to just add a note to the documentation.
The same suggestion from another person: https://discuss.python.org/t/asyncio-cancel-a-cancellation-utility-as-a-coroutine-this-time-with-feeling/26304
My talk at PyCon US 2022 (https://youtu.be/XW7yv6HuWTE) brought this up as an issue I see a lot at Meta: people screwing up cancelling tasks. I even wrote my own utility into later: https://github.com/facebookincubator/later/blob/main/later/task.py#L59

Though looking at my implementation, you can't cancel it if the task it's trying to cancel won't finish :P So yeah, this is a work in progress. Would be nice to have this in the stdlib. And I can't just tell developers to use task groups, especially if they are interfacing with 3rd-party code.
I suspect this will need a PEP, since so many people have broken their heads over this (even @fried admits his solution isn't perfect. :-) See https://discuss.python.org/t/asyncio-cancel-a-cancellation-utility-as-a-coroutine-this-time-with-feeling/26304/3

(I'm not sure where we should discuss this -- here on GitHub, or there on Discourse.)
Speaking from my experience (yes, it's limited), this is an issue that should be solved while dealing with the whole task life-cycle, i.e. at a level above asyncio, not in asyncio. Very closely related to cancellation are other problems I've encountered in my work: task monitoring (essential tasks may not exit, or else let's shut down the whole app) and the shutdown management itself (task cancelling order, cleanup actions, etc.). I have solved it "well enough" for my projects, i.e. far from being suitable for general use, but that work is the reason I'm now quite sure that a fully fledged tool for managing tasks and taking care of all the details, edge cases, etc. would amount to a complete asyncio add-on library, and this particular issue belongs there.
The root problem here is that awaiting a cancelled task re-raises its `CancelledError` into the awaiter. So what you all really need is a way to just wait for the task to finish. Something like:

```python
task.cancel()
await task.wait_done()
```
For the record, I don't have any need for this. This is just some drive-by API design 😊
```python
async def cancel_and_wait(task, msg=None):
    task.cancel(msg)
    try:
        await task
    except asyncio.CancelledError:
        if asyncio.current_task().cancelling() == 0:
            raise
        else:
            return  # this is the only non-exceptional return
    else:
        raise RuntimeError("Cancelled task did not end with an exception")
```

Isn't the [edit: there was a second half to this comment that had a stupid idea which I have now deleted]
I think this would be the same as:

```python
task.cancel()
await asyncio.wait([task])
```

After that, it's guaranteed both that the task is done and that no `CancelledError` is re-raised into the caller.
Very interesting, I think you're right. I had assumed that...
So it seems a safer alternative to the example in the `cancel()` doc would be:

```python
import asyncio


async def cancel_me():
    print('cancel_me(): before sleep')
    try:
        await asyncio.sleep(3600)
    except asyncio.CancelledError:
        print('cancel_me(): cancel sleep')
        raise
    finally:
        print('cancel_me(): after sleep')


async def main():
    task = asyncio.create_task(cancel_me())
    await asyncio.sleep(1)
    task.cancel()
    await asyncio.wait([task])
    if task.cancelled():
        print("main(): cancel_me is cancelled now")


asyncio.run(main())
```

Does that look correct?
Yes, that looks correct and more robust against cancellation of `main()` itself.

As to what to do about the docs: they should probably be enhanced, but I'm not sure it makes sense to dump all the complexity into the first sample? I'm also receptive to the argument above. Maybe there should be some note after the existing sample explaining that the plain `cancel()`-then-`await` pattern is not safe when the awaiting task can itself be cancelled. That explanatory text could also make mention of `asyncio.wait()`.
Based on @cbornet's comment, I've made the following implementation. Is this overkill?

```python
import asyncio
import logging

logger = logging.getLogger(__name__)


async def cancel_task(task: asyncio.Task, task_name: str):
    """
    Cancel a task and wait for the cancelling to finish.

    Args:
        task (asyncio.Task): The task to cancel.
        task_name (str): Human-readable name of the task.
    """
    task_cancelled = task.cancelled()
    task_done = task.done()
    logger.debug(f"{task_name = } {task_cancelled = } {task_done = }")
    if task_cancelled or task_done:
        logger.debug(f"{task_name} task was already cancelled or done")
        return
    logger.info(f"Cancelling {task_name} task.")
    task_cancelling = task.cancel()
    if not task_cancelling:
        logger.debug(
            f"{task_name} task was already cancelled or done by the time we tried to cancel it"
        )
        return
    logger.debug(f"{task_name} task was not cancelled yet but is cancelling now")
    await asyncio.wait([task])
    logger.debug(f"Awaited cancelling of {task_name} task")
    if not task.cancelled() and not task.done():
        logger.error(
            f"{task_name} task was not cancelled or done. This is VERY UNEXPECTED.",
            stack_info=True,
        )
        return
    logger.info(f"{task_name} task is now cancelled or done")
```
I propose a better implementation of `cancel_and_wait`:

```python
async def cancel_and_wait(task: asyncio.Task, msg=None):
    # If the task is already done, return immediately.
    if task.done():
        return
    # After our cancellation, cancelling() should be incremented by 1.
    # If it is incremented by more than 1, cancel was also requested
    # externally; in that case CancelledError should be re-raised to
    # end the waiting task as well.
    cancelling_expected = task.cancelling() + 1
    task.cancel(msg)
    try:
        await task
    except asyncio.CancelledError:
        if task.cancelling() != cancelling_expected:
            raise
        else:
            return
    else:
        raise RuntimeError("Cancelled task did not end with an exception")
```

The original one has the following flaws:
Is it safe to assume...? Edit: answer: yes; see cpython/Lib/asyncio/timeouts.py, lines 155 to 157 (at 8ada7a9).
So the example in the docs, if one wishes to allow some time for the Task to cancel before moving on:

```python
import asyncio
import random


async def cancel_me():
    print('cancel_me(): before sleep')
    try:
        await asyncio.sleep(3600)
    except asyncio.CancelledError:
        print('cancel_me(): cancel sleep')
        print('cancel_me(): async cleanup before raise')
        await asyncio.sleep(random.randint(0, 10))
        # this line may or may not be reached, depending on whether we are
        # timed out and cancelled by the wait_for call
        print('cancel_me(): after async cleanup')
        raise
    finally:
        print('cancel_me(): after sleep')


async def main():
    task = asyncio.create_task(cancel_me())
    await asyncio.sleep(1)
    task.cancel()
    try:
        await asyncio.wait_for(task, 5)
    except asyncio.TimeoutError:
        pass
    finally:
        if task.cancelled():
            print("main(): cancel_me is cancelled now")


asyncio.run(main())
```

The shortcoming is that the code does not distinguish whether the wrapped coroutine propagated the `CancelledError` itself or whether the `wait_for` call timed out and cancelled it again.
Feature or enhancement

I propose adding a function to `asyncio` to cancel a task and then safely wait for the cancellation to complete.

The main difficulty with this is deciding whether to swallow or re-raise the `CancelledError` once we catch one. If we call the task that we're waiting on the "dependency", and the task doing the waiting the "controller", then there are two distinct possibilities:

1. The "dependency" responds to the cancellation by propagating the `CancelledError` all the way up its own stack, and then further up through the `await` in the "controller". In this case, the `CancelledError` should be swallowed, and the "controller" can continue its work normally.
2. The "controller" itself gets cancelled while it is blocked on that `await`. In this case, the `CancelledError` is the signal to cancel the "controller" itself, and should be either re-raised further up, or its swallowing should be accompanied by a call to `uncancel`.

The example in the documentation for `asyncio.Task.cancel`, in fact, does not make this decision correctly, which would be a bug. Copying the example, and changing the names to match this issue's terminology, shows the problem. If I'm not missing anything, the correct procedure would look like this:
Thus, I propose to make these changes:

1. Introduce a function to `asyncio` or `Task`. Having a specialized function would reduce the possibility of someone making this mistake in their code (like the author of the example probably did :) ), and allow the implementation to be changed or improved in the future. One such possible enhancement, for example, could be adding a `repeat` parameter to instruct the function, in case the task uncancels itself, to keep cancelling it again in a loop.
2. In the documentation for `asyncio`, add a warning for this kind of mistake, in the "Task Cancellation" section or in the description of `asyncio.Task.cancel`.
3. Change the code example for `asyncio.Task.cancel` to account for cancellation of `main`. I know that, in this specific snippet, it is impossible for `main` itself to be cancelled; but a developer unsuspecting of this issue may copy this example into a situation where the controller is indeed cancellable, and end up with a bug in their code.