Skip to content

Commit 2b5ca1b

Browse files
committed
Disallow task.yield under callback, provide callback way to yield
1 parent 04f7caf commit 2b5ca1b

File tree

3 files changed

+108
-60
lines changed

3 files changed

+108
-60
lines changed

design/mvp/CanonicalABI.md

Lines changed: 53 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -444,6 +444,27 @@ created by `canon_lift` and `Subtask`, which is created by `canon_lower`.
444444
Additional sync-/async-specialized mutable state is added by the `SyncTask`,
445445
`AsyncTask` and `AsyncSubtask` subclasses.
446446

447+
The `Task` class and its subclasses depend on the following two enums:
448+
```python
449+
class AsyncCallState(IntEnum):
450+
STARTING = 0
451+
STARTED = 1
452+
RETURNED = 2
453+
DONE = 3
454+
455+
class EventCode(IntEnum):
456+
CALL_STARTING = AsyncCallState.STARTING
457+
CALL_STARTED = AsyncCallState.STARTED
458+
CALL_RETURNED = AsyncCallState.RETURNED
459+
CALL_DONE = AsyncCallState.DONE
460+
YIELDED = 4
461+
```
462+
The `AsyncCallState` enum describes the linear sequence of states that an async
463+
call necessarily transitions through: [`STARTING`](Async.md#starting),
464+
`STARTED`, [`RETURNING`](Async.md#returning) and `DONE`. The `EventCode` enum
465+
shares common code values with `AsyncCallState` to define the set of integer
466+
event codes that are delivered to [waiting](Async.md#waiting) or polling tasks.
467+
447468
A `Task` object is created for each call to `canon_lift` and is implicitly
448469
threaded through all core function calls. This implicit `Task` parameter
449470
specifies a concept of [the current task](Async.md#current-task) and inherently
@@ -520,8 +541,7 @@ All `Task`s (whether lifted `async` or not) are allowed to call `async`-lowered
520541
imports. Calling an `async`-lowered import creates an `AsyncSubtask` (defined
521542
below) which is stored in the current component instance's `async_subtasks`
522543
table and tracked by the current task's `num_async_subtasks` counter, which is
523-
guarded to be `0` in `Task.exit` (below) to ensure the
524-
tree-structured-concurrency [component invariant].
544+
guarded to be `0` in `Task.exit` (below) to ensure [structured concurrency].
525545
```python
526546
def add_async_subtask(self, subtask):
527547
assert(subtask.supertask is None and subtask.index is None)
@@ -549,7 +569,7 @@ tree-structured-concurrency [component invariant].
549569
if subtask.state == AsyncCallState.DONE:
550570
self.inst.async_subtasks.remove(subtask.index)
551571
self.num_async_subtasks -= 1
552-
return (subtask.state, subtask.index)
572+
return (EventCode(subtask.state), subtask.index)
553573
```
554574
While a task is running, it may call `wait` (via `canon task.wait` or, when a
555575
`callback` is present, by returning to the event loop) to block until there is
@@ -573,6 +593,16 @@ another task:
573593
return self.process_event(self.events.get_nowait())
574594
```
575595

596+
A task may also cooperatively yield the current thread, explicitly allowing
597+
the runtime to switch to another ready task, but without blocking on I/O (as
598+
emulated in the Python code here by awaiting a `sleep(0)`).
599+
```python
600+
async def yield_(self):
601+
self.inst.thread.release()
602+
await asyncio.sleep(0)
603+
await self.inst.thread.acquire()
604+
```
605+
576606
Lastly, when a task exists, the runtime enforces the guard conditions mentioned
577607
above and releases the `thread` lock, allowing other tasks to start or make
578608
progress.
@@ -641,17 +671,6 @@ implementation should be able to avoid separately allocating
641671
`pending_sync_tasks` by instead embedding a "next pending" linked list in the
642672
`Subtask` table element of the caller.
643673

644-
The `AsyncTask` class dynamically checks that the task calls the
645-
`canon_task_start` and `canon_task_return` (defined below) in the right order
646-
before finishing the task. "The right order" is defined in terms of a simple
647-
linear state machine that progresses through the following 4 states:
648-
```python
649-
class AsyncCallState(IntEnum):
650-
STARTING = 0
651-
STARTED = 1
652-
RETURNED = 2
653-
DONE = 3
654-
```
655674
The first 3 fields of `AsyncTask` are simply immutable copies of
656675
arguments/immediates passed to `canon_lift` that are used later on. The last 2
657676
fields are used to check the above-mentioned state machine transitions and also
@@ -1950,10 +1969,16 @@ async def canon_lift(opts, inst, callee, ft, caller, start_thunk, return_thunk):
19501969
if not opts.callback:
19511970
[] = await call_and_trap_on_throw(callee, task, [])
19521971
else:
1953-
[ctx] = await call_and_trap_on_throw(callee, task, [])
1954-
while ctx != 0:
1955-
event, payload = await task.wait()
1956-
[ctx] = await call_and_trap_on_throw(opts.callback, task, [ctx, event, payload])
1972+
[packed_ctx] = await call_and_trap_on_throw(callee, task, [])
1973+
while packed_ctx != 0:
1974+
is_yield = bool(packed_ctx & 1)
1975+
ctx = packed_ctx & ~1
1976+
if is_yield:
1977+
await task.yield_()
1978+
event, payload = (EventCode.YIELDED, 0)
1979+
else:
1980+
event, payload = await task.wait()
1981+
[packed_ctx] = await call_and_trap_on_throw(opts.callback, task, [ctx, event, payload])
19571982

19581983
assert(opts.post_return is None)
19591984
task.exit()
@@ -1981,11 +2006,13 @@ allow the callee to reclaim any memory. An async call doesn't need a
19812006

19822007
Within the async case, there are two sub-cases depending on whether the
19832008
`callback` `canonopt` was set. When `callback` is present, waiting happens in
1984-
an "event loop" inside `canon_lift`. Otherwise, waiting must happen by calling
1985-
`task.wait` (defined below), which potentially requires the runtime
1986-
implementation to use a fiber (aka. stackful coroutine) to switch to another
1987-
task. Thus, `callback` is an optimization for avoiding fiber creation for async
1988-
languages that don't need it (e.g., JS, Python, C# and Rust).
2009+
an "event loop" inside `canon_lift` which also allows yielding (i.e., allowing
2010+
other tasks to run without blocking) by setting the LSB of the returned `i32`.
2011+
Otherwise, waiting must happen by calling `task.wait` (defined below), which
2012+
potentially requires the runtime implementation to use a fiber (aka. stackful
2013+
coroutine) to switch to another task. Thus, `callback` is an optimization for
2014+
avoiding fiber creation for async languages that don't need it (e.g., JS,
2015+
Python, C# and Rust).
19892016

19902017
Uncaught Core WebAssembly [exceptions] result in a trap at component
19912018
boundaries. Thus, if a component wishes to signal an error, it must use some
@@ -2330,9 +2357,8 @@ Python `asyncio.sleep(0)` in the middle to make it clear that other
23302357
coroutines are allowed to acquire the `lock` and execute.
23312358
```python
23322359
async def canon_task_yield(task):
2333-
task.inst.thread.release()
2334-
await asyncio.sleep(0)
2335-
await task.inst.thread.acquire()
2360+
trap_if(task.opts.callback is not None)
2361+
await task.yield_()
23362362
return []
23372363
```
23382364

@@ -2413,6 +2439,7 @@ def canon_thread_hw_concurrency():
24132439
[JavaScript Embedding]: Explainer.md#JavaScript-embedding
24142440
[Adapter Functions]: FutureFeatures.md#custom-abis-via-adapter-functions
24152441
[Shared-Everything Dynamic Linking]: examples/SharedEverythingDynamicLinking.md
2442+
[Structured Concurrency]: Async.md#structured-concurrency
24162443

24172444
[Administrative Instructions]: https://webassembly.github.io/spec/core/exec/runtime.html#syntax-instr-admin
24182445
[Implementation Limits]: https://webassembly.github.io/spec/core/appendix/implementation.html

design/mvp/canonical-abi/definitions.py

Lines changed: 31 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -384,6 +384,19 @@ def __init__(self, rep, own, scope = None):
384384
self.scope = scope
385385
self.lend_count = 0
386386

387+
class AsyncCallState(IntEnum):
388+
STARTING = 0
389+
STARTED = 1
390+
RETURNED = 2
391+
DONE = 3
392+
393+
class EventCode(IntEnum):
394+
CALL_STARTING = AsyncCallState.STARTING
395+
CALL_STARTED = AsyncCallState.STARTED
396+
CALL_RETURNED = AsyncCallState.RETURNED
397+
CALL_DONE = AsyncCallState.DONE
398+
YIELDED = 4
399+
387400
class Task(CallContext):
388401
caller: Optional[Task]
389402
borrow_count: int
@@ -440,13 +453,18 @@ def process_event(self, subtask):
440453
if subtask.state == AsyncCallState.DONE:
441454
self.inst.async_subtasks.remove(subtask.index)
442455
self.num_async_subtasks -= 1
443-
return (subtask.state, subtask.index)
456+
return (EventCode(subtask.state), subtask.index)
444457

445458
def poll(self):
446459
if self.events.empty():
447460
return None
448461
return self.process_event(self.events.get_nowait())
449462

463+
async def yield_(self):
464+
self.inst.thread.release()
465+
await asyncio.sleep(0)
466+
await self.inst.thread.acquire()
467+
450468
def exit(self):
451469
assert(self.events.empty())
452470
trap_if(self.borrow_count != 0)
@@ -486,12 +504,6 @@ def exit(self):
486504
if self.inst.pending_sync_tasks:
487505
self.inst.pending_sync_tasks.pop(0).set_result(None)
488506

489-
class AsyncCallState(IntEnum):
490-
STARTING = 0
491-
STARTED = 1
492-
RETURNED = 2
493-
DONE = 3
494-
495507
class AsyncTask(Task):
496508
ft: FuncType
497509
start_thunk: Callable
@@ -1365,10 +1377,16 @@ async def canon_lift(opts, inst, callee, ft, caller, start_thunk, return_thunk):
13651377
if not opts.callback:
13661378
[] = await call_and_trap_on_throw(callee, task, [])
13671379
else:
1368-
[ctx] = await call_and_trap_on_throw(callee, task, [])
1369-
while ctx != 0:
1370-
event, payload = await task.wait()
1371-
[ctx] = await call_and_trap_on_throw(opts.callback, task, [ctx, event, payload])
1380+
[packed_ctx] = await call_and_trap_on_throw(callee, task, [])
1381+
while packed_ctx != 0:
1382+
is_yield = bool(packed_ctx & 1)
1383+
ctx = packed_ctx & ~1
1384+
if is_yield:
1385+
await task.yield_()
1386+
event, payload = (EventCode.YIELDED, 0)
1387+
else:
1388+
event, payload = await task.wait()
1389+
[packed_ctx] = await call_and_trap_on_throw(opts.callback, task, [ctx, event, payload])
13721390

13731391
assert(opts.post_return is None)
13741392
task.exit()
@@ -1510,7 +1528,6 @@ async def canon_task_poll(task, ptr):
15101528
### `canon task.yield`
15111529

15121530
async def canon_task_yield(task):
1513-
task.inst.thread.release()
1514-
await asyncio.sleep(0)
1515-
await task.inst.thread.acquire()
1531+
trap_if(task.opts.callback is not None)
1532+
await task.yield_()
15161533
return []

design/mvp/canonical-abi/run_tests.py

Lines changed: 24 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -525,19 +525,19 @@ async def consumer(task, args):
525525
consumer_heap.memory[argp] = 83
526526
consumer_heap.memory[argp+1] = 84
527527
fut1.set_result(None)
528-
state, callidx = await task.wait()
529-
assert(state == AsyncCallState.STARTED)
528+
event, callidx = await task.wait()
529+
assert(event == EventCode.CALL_STARTED)
530530
assert(callidx == 1)
531531
assert(consumer_heap.memory[retp] == 15)
532532
fut2.set_result(None)
533-
state, callidx = await task.wait()
534-
assert(state == AsyncCallState.RETURNED)
533+
event, callidx = await task.wait()
534+
assert(event == EventCode.CALL_RETURNED)
535535
assert(callidx == 1)
536536
assert(consumer_heap.memory[retp] == 44)
537537
fut3.set_result(None)
538538
assert(task.num_async_subtasks == 1)
539-
state, callidx = await task.wait()
540-
assert(state == AsyncCallState.DONE)
539+
event, callidx = await task.wait()
540+
assert(event == EventCode.CALL_DONE)
541541
assert(callidx == 1)
542542
assert(task.num_async_subtasks == 0)
543543

@@ -558,8 +558,8 @@ async def dtor(task, args):
558558
assert(task.num_async_subtasks == 1)
559559
assert(dtor_value is None)
560560
dtor_fut.set_result(None)
561-
state, callidx = await task.wait()
562-
assert(state == AsyncCallState.DONE)
561+
event, callidx = await task.wait()
562+
assert(event == AsyncCallState.DONE)
563563
assert(callidx == 1)
564564
assert(task.num_async_subtasks == 0)
565565

@@ -614,13 +614,17 @@ async def consumer(task, args):
614614
async def callback(task, args):
615615
assert(len(args) == 3)
616616
if args[0] == 42:
617-
assert(args[1] == AsyncCallState.DONE)
617+
assert(args[1] == EventCode.CALL_DONE)
618618
assert(args[2] == 1)
619+
return [53]
620+
elif args[0] == 52:
621+
assert(args[1] == EventCode.YIELDED)
622+
assert(args[2] == 0)
619623
fut2.set_result(None)
620-
return [43]
624+
return [62]
621625
else:
622-
assert(args[0] == 43)
623-
assert(args[1] == AsyncCallState.DONE)
626+
assert(args[0] == 62)
627+
assert(args[1] == EventCode.CALL_DONE)
624628
assert(args[2] == 2)
625629
[] = await canon_task_return(task, CoreFuncType(['i32'],[]), [83])
626630
return [0]
@@ -684,16 +688,16 @@ async def consumer(task, args):
684688

685689
fut.set_result(None)
686690
assert(producer1_done == False)
687-
state, callidx = await task.wait()
688-
assert(state == AsyncCallState.DONE)
691+
event, callidx = await task.wait()
692+
assert(event == EventCode.CALL_DONE)
689693
assert(callidx == 1)
690694
assert(producer1_done == True)
691695

692696
assert(producer2_done == False)
693697
await canon_task_yield(task)
694698
assert(producer2_done == True)
695-
state, callidx = task.poll()
696-
assert(state == AsyncCallState.DONE)
699+
event, callidx = task.poll()
700+
assert(event == EventCode.CALL_DONE)
697701
assert(callidx == 2)
698702
assert(producer2_done == True)
699703

@@ -739,12 +743,12 @@ async def core_func(task, args):
739743
assert(ret == (2 | (AsyncCallState.STARTED << 30)))
740744

741745
fut1.set_result(None)
742-
state, callidx = await task.wait()
743-
assert(state == AsyncCallState.DONE)
746+
event, callidx = await task.wait()
747+
assert(event == EventCode.CALL_DONE)
744748
assert(callidx == 1)
745749
fut2.set_result(None)
746-
state, callidx = await task.wait()
747-
assert(state == AsyncCallState.DONE)
750+
event, callidx = await task.wait()
751+
assert(event == EventCode.CALL_DONE)
748752
assert(callidx == 2)
749753
return []
750754

0 commit comments

Comments
 (0)