Skip to content

Commit 1ea845c

Browse files
committed
Copy pythonGH-98137, make some changes, add benchmarking script, trying to get it working
1 parent d4c410f commit 1ea845c

File tree

4 files changed

+243
-15
lines changed

4 files changed

+243
-15
lines changed

Lib/asyncio/__init__.py

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,36 @@
3636
tasks.__all__ +
3737
threads.__all__ +
3838
timeouts.__all__ +
39-
transports.__all__)
39+
transports.__all__ + (
40+
'create_eager_task_factory',
41+
'eager_task_factory',
42+
))
43+
44+
# throwing things here temporarily to defer premature dir layout bikeshedding
45+
46+
def create_eager_task_factory(custom_task_constructor):
    """Build a task factory that starts each coroutine before scheduling it.

    The returned ``factory(loop, coro, *, name=None, context=None)`` runs the
    first step of *coro* synchronously:

    * If the coroutine finishes during that step, a future created with
      ``loop.create_future()`` and already holding the result is returned.
    * If the step raises an ``Exception``, a future holding that exception
      is returned.
    * Otherwise the coroutine suspended; *custom_task_constructor* is called
      as ``custom_task_constructor(coro, loop=loop, name=name,
      context=context, yield_result=<first yielded value>)`` and its return
      value is returned.

    Exceptions that are not ``Exception`` subclasses are not caught here and
    propagate to the caller of the factory.
    """

    def factory(loop, coro, *, name=None, context=None):
        loop._check_closed()
        try:
            # Honour an explicitly supplied context for the eager first step,
            # so the coroutine sees the same context variables it would see
            # on later steps driven by the task.
            if context is None:
                result = coro.send(None)
            else:
                result = context.run(coro.send, None)
        except StopIteration as si:
            # The coroutine completed without suspending.
            fut = loop.create_future()
            fut.set_result(si.value)
            return fut
        except Exception as ex:
            # The coroutine failed before its first suspension point.
            fut = loop.create_future()
            fut.set_exception(ex)
            return fut
        else:
            # The coroutine suspended: hand the yielded value to the task
            # constructor so it can resume from that point.
            return custom_task_constructor(
                coro, loop=loop, name=name, context=context,
                yield_result=result)

    return factory
67+
68+
eager_task_factory = create_eager_task_factory(Task)
4069

4170
if sys.platform == 'win32': # pragma: no cover
4271
from .windows_events import *

Lib/asyncio/taskgroups.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -141,11 +141,14 @@ def create_task(self, coro, *, name=None, context=None):
141141
raise RuntimeError(f"TaskGroup {self!r} is finished")
142142
if self._aborting:
143143
raise RuntimeError(f"TaskGroup {self!r} is shutting down")
144-
if context is None:
144+
if hasattr(self._loop, "eager_task_factory"):
145+
task = self._loop.eager_task_factory(coro, name=name, context=context)
146+
elif context is None:
145147
task = self._loop.create_task(coro)
146148
else:
147149
task = self._loop.create_task(coro, context=context)
148-
tasks._set_task_name(task, name)
150+
if not task.done(): # If it's done already, it's a future
151+
tasks._set_task_name(task, name)
149152
task.add_done_callback(self._on_task_done)
150153
self._tasks.add(task)
151154
return task

Lib/asyncio/tasks.py

Lines changed: 21 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,8 @@ def _set_task_name(task, name):
7575
set_name(name)
7676

7777

78+
_NOT_SET = object()
79+
7880
class Task(futures._PyFuture): # Inherit Python Task implementation
7981
# from a Python Future implementation.
8082

@@ -93,7 +95,8 @@ class Task(futures._PyFuture): # Inherit Python Task implementation
9395
# status is still pending
9496
_log_destroy_pending = True
9597

96-
def __init__(self, coro, *, loop=None, name=None, context=None):
98+
def __init__(self, coro, *, loop=None, name=None, context=None,
99+
yield_result=_NOT_SET):
97100
super().__init__(loop=loop)
98101
if self._source_traceback:
99102
del self._source_traceback[-1]
@@ -117,7 +120,10 @@ def __init__(self, coro, *, loop=None, name=None, context=None):
117120
else:
118121
self._context = context
119122

120-
self._loop.call_soon(self.__step, context=self._context)
123+
if yield_result is _NOT_SET:
124+
self._loop.call_soon(self.__step, context=self._context)
125+
else:
126+
self.__step2(yield_result)
121127
_register_task(self)
122128

123129
def __del__(self):
@@ -287,6 +293,12 @@ def __step(self, exc=None):
287293
except BaseException as exc:
288294
super().set_exception(exc)
289295
else:
296+
self.__step2(result)
297+
finally:
298+
_leave_task(self._loop, self)
299+
self = None # Needed to break cycles when an exception occurs.
300+
301+
def __step2(self, result):
290302
blocking = getattr(result, '_asyncio_future_blocking', None)
291303
if blocking is not None:
292304
# Yielded Future must come from Future.__iter__().
@@ -333,9 +345,6 @@ def __step(self, exc=None):
333345
new_exc = RuntimeError(f'Task got bad yield: {result!r}')
334346
self._loop.call_soon(
335347
self.__step, new_exc, context=self._context)
336-
finally:
337-
_leave_task(self._loop, self)
338-
self = None # Needed to break cycles when an exception occurs.
339348

340349
def __wakeup(self, future):
341350
try:
@@ -357,13 +366,13 @@ def __wakeup(self, future):
357366
_PyTask = Task
358367

359368

360-
try:
361-
import _asyncio
362-
except ImportError:
363-
pass
364-
else:
365-
# _CTask is needed for tests.
366-
Task = _CTask = _asyncio.Task
369+
# try:
370+
# import _asyncio
371+
# except ImportError:
372+
# pass
373+
# else:
374+
# # _CTask is needed for tests.
375+
# Task = _CTask = _asyncio.Task
367376

368377

369378
def create_task(coro, *, name=None, context=None):

async_tree.py

Lines changed: 187 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,187 @@
1+
# Copyright (c) Facebook, Inc. and its affiliates. (http://www.facebook.com)
2+
"""
3+
Benchmark script for recursive async tree workloads. This script includes the
4+
following microbenchmark scenarios:
5+
6+
1) "no_suspension": No suspension in the async tree.
7+
2) "suspense_all": Suspension (simulating IO) at all leaf nodes in the async tree.
8+
3) "memoization": Simulated IO calls at all leaf nodes, but with memoization. Only
9+
un-memoized IO calls will result in suspensions.
10+
4) "cpu_io_mixed": A mix of CPU-bound workload and IO-bound workload (with
11+
memoization) at the leaf nodes.
12+
13+
Use the commandline flag or choose the corresponding <Scenario>AsyncTree class
14+
to run the desired microbenchmark scenario.
15+
"""
16+
17+
18+
import asyncio
19+
import math
20+
import random
21+
import time
22+
from argparse import ArgumentParser
23+
24+
25+
NUM_RECURSE_LEVELS = 6
26+
NUM_RECURSE_BRANCHES = 6
27+
IO_SLEEP_TIME = 0.05
28+
DEFAULT_MEMOIZABLE_PERCENTAGE = 90
29+
DEFAULT_CPU_PROBABILITY = 0.5
30+
FACTORIAL_N = 500
31+
32+
33+
def parse_args(argv=None):
    """Parse the benchmark's command-line options.

    Args:
        argv: Optional list of argument strings. When ``None`` (the default)
            argparse reads the process command line, which preserves the
            original zero-argument call.

    Returns:
        The ``argparse.Namespace`` with ``scenario``,
        ``memoizable_percentage``, ``cpu_probability`` and ``print``.

    Exits via ``parser.error`` when ``--memoizable-percentage`` is outside
    0-100 or ``--cpu-probability`` is outside 0-1, the ranges stated in the
    options' help text.
    """
    parser = ArgumentParser(
        description="""\
Benchmark script for recursive async tree workloads. It can be run as a standalone
script, in which case you can specify the microbenchmark scenario to run and whether
to print the results.
"""
    )
    parser.add_argument(
        "-s",
        "--scenario",
        choices=["no_suspension", "suspense_all", "memoization", "cpu_io_mixed"],
        default="no_suspension",
        help="""\
Determines which microbenchmark scenario to run. Defaults to no_suspension. Options:
1) "no_suspension": No suspension in the async tree.
2) "suspense_all": Suspension (simulating IO) at all leaf nodes in the async tree.
3) "memoization": Simulated IO calls at all leaf nodes, but with memoization. Only
un-memoized IO calls will result in suspensions.
4) "cpu_io_mixed": A mix of CPU-bound workload and IO-bound workload (with
memoization) at the leaf nodes.
""",
    )
    parser.add_argument(
        "-m",
        "--memoizable-percentage",
        type=int,
        default=DEFAULT_MEMOIZABLE_PERCENTAGE,
        help="""\
Sets the percentage (0-100) of the data that should be memoized, defaults to 90. For
example, at the default 90 percent, data 1-90 will be memoized and data 91-100 will not.
""",
    )
    parser.add_argument(
        "-c",
        "--cpu-probability",
        type=float,
        default=DEFAULT_CPU_PROBABILITY,
        help="""\
Sets the probability (0-1) that a leaf node will execute a cpu-bound workload instead
of an io-bound workload. Defaults to 0.5. Only applies to the "cpu_io_mixed"
microbenchmark scenario.
""",
    )
    parser.add_argument(
        "-p",
        "--print",
        action="store_true",
        help="Print the results (runtime and number of Tasks created).",
    )
    args = parser.parse_args(argv)

    # Enforce the ranges the help text promises; the chained comparison also
    # rejects NaN for the probability.
    if not 0 <= args.memoizable_percentage <= 100:
        parser.error("--memoizable-percentage must be between 0 and 100")
    if not 0.0 <= args.cpu_probability <= 1.0:
        parser.error("--cpu-probability must be between 0 and 1")
    return args
85+
86+
87+
class AsyncTree:
    """Base class for the recursive async tree microbenchmarks.

    ``recurse`` fans out ``NUM_RECURSE_BRANCHES`` child coroutines per level
    through ``asyncio.gather`` until level 0, where ``suspense_func`` (supplied
    by each scenario subclass) is awaited.
    """

    def __init__(
        self,
        memoizable_percentage=DEFAULT_MEMOIZABLE_PERCENTAGE,
        cpu_probability=DEFAULT_CPU_PROBABILITY,
    ):
        # Counters reported by the script after a run.
        self.suspense_count = 0
        self.task_count = 0
        self.memoizable_percentage = memoizable_percentage
        self.cpu_probability = cpu_probability
        # Keys that have already triggered a simulated I/O call.
        self.cache = {}
        # set to deterministic random, so that the results are reproducible
        random.seed(0)

    async def mock_io_call(self):
        """Simulate an I/O call by sleeping ``IO_SLEEP_TIME`` seconds."""
        self.suspense_count += 1
        await asyncio.sleep(IO_SLEEP_TIME)

    def create_task(self, loop, coro):
        """Wrap *coro* in an ``asyncio.Task`` on *loop*, counting the creation."""
        self.task_count += 1
        return asyncio.Task(coro, loop=loop)

    async def suspense_func(self):
        """Leaf-node workload; every scenario subclass must override this."""
        raise NotImplementedError(
            "To be implemented by each microbenchmark's derived class."
        )

    async def recurse(self, recurse_level):
        """Run the subtree rooted at *recurse_level* (0 means a leaf)."""
        if recurse_level == 0:
            await self.suspense_func()
            return

        await asyncio.gather(
            *[self.recurse(recurse_level - 1) for _ in range(NUM_RECURSE_BRANCHES)]
        )

    def run(self):
        """Execute the whole tree on a fresh event loop, then close the loop."""
        loop = asyncio.new_event_loop()
        try:
            # NOTE: the module-level eager factory is installed directly, so
            # self.create_task is never invoked from here and task_count is
            # not incremented by this method.
            loop.set_task_factory(asyncio.eager_task_factory)
            loop.run_until_complete(self.recurse(NUM_RECURSE_LEVELS))
        finally:
            # Close the loop even when the workload raises, so it is not leaked.
            loop.close()
130+
131+
132+
class NoSuspensionAsyncTree(AsyncTree):
    """Scenario whose leaf nodes return without awaiting anything."""

    async def suspense_func(self):
        # Nothing is awaited here, so the leaf coroutine never suspends.
        return None
135+
136+
137+
class SuspenseAllAsyncTree(AsyncTree):
    """Scenario in which every leaf node awaits the simulated I/O call."""

    async def suspense_func(self):
        io_call = self.mock_io_call()
        await io_call
140+
141+
142+
class MemoizationAsyncTree(AsyncTree):
    """Scenario where leaf I/O is skipped for keys that were already seen.

    Each leaf draws an integer in 1..100. Draws not greater than
    ``memoizable_percentage`` are memoizable: the first occurrence records the
    key in ``self.cache`` and awaits the simulated I/O call; later occurrences
    return immediately. Larger draws always await the simulated I/O call.
    """

    async def suspense_func(self):
        # deterministic random (seed preset)
        value = random.randint(1, 100)
        memoizable = value <= self.memoizable_percentage

        # Cache hit: return without suspending.
        if memoizable and self.cache.get(value):
            return value

        # First sighting of a memoizable key: record it before the I/O call
        # is awaited.
        if memoizable:
            self.cache[value] = True

        await self.mock_io_call()
        return value
155+
156+
157+
class CpuIoMixedAsyncTree(MemoizationAsyncTree):
    """Scenario mixing CPU-bound leaves with memoized I/O leaves.

    With probability ``cpu_probability`` a leaf computes a factorial;
    otherwise it behaves like a ``MemoizationAsyncTree`` leaf.
    """

    async def suspense_func(self):
        take_cpu_path = random.random() < self.cpu_probability
        if not take_cpu_path:
            # I/O path: reuse the memoized simulated I/O behaviour.
            return await MemoizationAsyncTree.suspense_func(self)

        # mock cpu-bound call
        return math.factorial(FACTORIAL_N)
164+
165+
166+
if __name__ == "__main__":
    cli = parse_args()

    # Map each --scenario choice to the class that implements it.
    scenario_classes = {
        "no_suspension": NoSuspensionAsyncTree,
        "suspense_all": SuspenseAllAsyncTree,
        "memoization": MemoizationAsyncTree,
        "cpu_io_mixed": CpuIoMixedAsyncTree,
    }
    tree = scenario_classes[cli.scenario](
        cli.memoizable_percentage, cli.cpu_probability
    )

    # Time only the run itself, not argument parsing or tree construction.
    started = time.perf_counter()
    tree.run()
    elapsed = time.perf_counter() - started

    if cli.print:
        print(f"Scenario: {cli.scenario}")
        print(f"Time: {elapsed} s")
        print(f"Tasks created: {tree.task_count}")
        print(f"Suspense called: {tree.suspense_count}")

0 commit comments

Comments
 (0)