6
6
'wait' , 'wait_for' , 'as_completed' , 'sleep' ,
7
7
'gather' , 'shield' , 'ensure_future' , 'run_coroutine_threadsafe' ,
8
8
'current_task' , 'all_tasks' ,
9
+ 'create_eager_task_factory' , 'eager_task_factory' ,
9
10
'_register_task' , '_unregister_task' , '_enter_task' , '_leave_task' ,
10
11
)
11
12
@@ -43,22 +44,26 @@ def all_tasks(loop=None):
43
44
"""Return a set of all tasks for the loop."""
44
45
if loop is None :
45
46
loop = events .get_running_loop ()
46
- # Looping over a WeakSet (_all_tasks) isn't safe as it can be updated from another
47
- # thread while we do so. Therefore we cast it to list prior to filtering. The list
48
- # cast itself requires iteration, so we repeat it several times ignoring
49
- # RuntimeErrors (which are not very likely to occur). See issues 34970 and 36607 for
50
- # details.
47
+ # capturing the set of eager tasks first, so if an eager task "graduates"
48
+ # to a regular task in another thread, we don't risk missing it.
49
+ eager_tasks = list (_eager_tasks )
50
+ # Looping over the WeakSet isn't safe as it can be updated from another
51
+ # thread, therefore we cast it to list prior to filtering. The list cast
52
+ # itself requires iteration, so we repeat it several times ignoring
53
+ # RuntimeErrors (which are not very likely to occur).
54
+ # See issues 34970 and 36607 for details.
55
+ scheduled_tasks = None
51
56
i = 0
52
57
while True :
53
58
try :
54
- tasks = list (_all_tasks )
59
+ scheduled_tasks = list (_scheduled_tasks )
55
60
except RuntimeError :
56
61
i += 1
57
62
if i >= 1000 :
58
63
raise
59
64
else :
60
65
break
61
- return {t for t in tasks
66
+ return {t for t in itertools . chain ( scheduled_tasks , eager_tasks )
62
67
if futures ._get_loop (t ) is loop and not t .done ()}
63
68
64
69
@@ -93,7 +98,8 @@ class Task(futures._PyFuture): # Inherit Python Task implementation
93
98
# status is still pending
94
99
_log_destroy_pending = True
95
100
96
- def __init__ (self , coro , * , loop = None , name = None , context = None ):
101
+ def __init__ (self , coro , * , loop = None , name = None , context = None ,
102
+ eager_start = False ):
97
103
super ().__init__ (loop = loop )
98
104
if self ._source_traceback :
99
105
del self ._source_traceback [- 1 ]
@@ -117,8 +123,11 @@ def __init__(self, coro, *, loop=None, name=None, context=None):
117
123
else :
118
124
self ._context = context
119
125
120
- self ._loop .call_soon (self .__step , context = self ._context )
121
- _register_task (self )
126
+ if eager_start and self ._loop .is_running ():
127
+ self .__eager_start ()
128
+ else :
129
+ self ._loop .call_soon (self .__step , context = self ._context )
130
+ _register_task (self )
122
131
123
132
def __del__ (self ):
124
133
if self ._state == futures ._PENDING and self ._log_destroy_pending :
@@ -250,6 +259,25 @@ def uncancel(self):
250
259
self ._num_cancels_requested -= 1
251
260
return self ._num_cancels_requested
252
261
262
+ def __eager_start (self ):
263
+ prev_task = _swap_current_task (self ._loop , self )
264
+ try :
265
+ _register_eager_task (self )
266
+ try :
267
+ self ._context .run (self .__step_run_and_handle_result , None )
268
+ finally :
269
+ _unregister_eager_task (self )
270
+ finally :
271
+ try :
272
+ curtask = _swap_current_task (self ._loop , prev_task )
273
+ assert curtask is self
274
+ finally :
275
+ if self .done ():
276
+ self ._coro = None
277
+ self = None # Needed to break cycles when an exception occurs.
278
+ else :
279
+ _register_task (self )
280
+
253
281
def __step (self , exc = None ):
254
282
if self .done ():
255
283
raise exceptions .InvalidStateError (
@@ -258,11 +286,17 @@ def __step(self, exc=None):
258
286
if not isinstance (exc , exceptions .CancelledError ):
259
287
exc = self ._make_cancelled_error ()
260
288
self ._must_cancel = False
261
- coro = self ._coro
262
289
self ._fut_waiter = None
263
290
264
291
_enter_task (self ._loop , self )
265
- # Call either coro.throw(exc) or coro.send(None).
292
+ try :
293
+ self .__step_run_and_handle_result (exc )
294
+ finally :
295
+ _leave_task (self ._loop , self )
296
+ self = None # Needed to break cycles when an exception occurs.
297
+
298
+ def __step_run_and_handle_result (self , exc ):
299
+ coro = self ._coro
266
300
try :
267
301
if exc is None :
268
302
# We use the `send` method directly, because coroutines
@@ -334,7 +368,6 @@ def __step(self, exc=None):
334
368
self ._loop .call_soon (
335
369
self .__step , new_exc , context = self ._context )
336
370
finally :
337
- _leave_task (self ._loop , self )
338
371
self = None # Needed to break cycles when an exception occurs.
339
372
340
373
def __wakeup (self , future ):
@@ -897,17 +930,41 @@ def callback():
897
930
return future
898
931
899
932
900
- # WeakSet containing all alive tasks.
901
- _all_tasks = weakref .WeakSet ()
933
+ def create_eager_task_factory (custom_task_constructor ):
934
+
935
+ if "eager_start" not in inspect .signature (custom_task_constructor ).parameters :
936
+ raise TypeError (
937
+ "Provided constructor does not support eager task execution" )
938
+
939
+ def factory (loop , coro , * , name = None , context = None ):
940
+ return custom_task_constructor (
941
+ coro , loop = loop , name = name , context = context , eager_start = True )
942
+
943
+
944
+ return factory
945
+
946
+ eager_task_factory = create_eager_task_factory (Task )
947
+
948
+
949
+ # Collectively these two sets hold references to the complete set of active
950
+ # tasks. Eagerly executed tasks use a faster regular set as an optimization
951
+ # but may graduate to a WeakSet if the task blocks on IO.
952
+ _scheduled_tasks = weakref .WeakSet ()
953
+ _eager_tasks = set ()
902
954
903
955
# Dictionary containing tasks that are currently active in
904
956
# all running event loops. {EventLoop: Task}
905
957
_current_tasks = {}
906
958
907
959
908
960
def _register_task (task ):
909
- """Register a new task in asyncio as executed by loop."""
910
- _all_tasks .add (task )
961
+ """Register an asyncio Task scheduled to run on an event loop."""
962
+ _scheduled_tasks .add (task )
963
+
964
+
965
+ def _register_eager_task (task ):
966
+ """Register an asyncio Task about to be eagerly executed."""
967
+ _eager_tasks .add (task )
911
968
912
969
913
970
def _enter_task (loop , task ):
@@ -926,28 +983,49 @@ def _leave_task(loop, task):
926
983
del _current_tasks [loop ]
927
984
928
985
986
+ def _swap_current_task (loop , task ):
987
+ prev_task = _current_tasks .get (loop )
988
+ if task is None :
989
+ del _current_tasks [loop ]
990
+ else :
991
+ _current_tasks [loop ] = task
992
+ return prev_task
993
+
994
+
929
995
def _unregister_task (task ):
930
- """Unregister a task."""
931
- _all_tasks .discard (task )
996
+ """Unregister a completed, scheduled Task."""
997
+ _scheduled_tasks .discard (task )
998
+
999
+
1000
+ def _unregister_eager_task (task ):
1001
+ """Unregister a task which finished its first eager step."""
1002
+ _eager_tasks .discard (task )
932
1003
933
1004
934
1005
_py_current_task = current_task
935
1006
_py_register_task = _register_task
1007
+ _py_register_eager_task = _register_eager_task
936
1008
_py_unregister_task = _unregister_task
1009
+ _py_unregister_eager_task = _unregister_eager_task
937
1010
_py_enter_task = _enter_task
938
1011
_py_leave_task = _leave_task
1012
+ _py_swap_current_task = _swap_current_task
939
1013
940
1014
941
1015
try :
942
- from _asyncio import (_register_task , _unregister_task ,
943
- _enter_task , _leave_task ,
944
- _all_tasks , _current_tasks ,
1016
+ from _asyncio import (_register_task , _register_eager_task ,
1017
+ _unregister_task , _unregister_eager_task ,
1018
+ _enter_task , _leave_task , _swap_current_task ,
1019
+ _scheduled_tasks , _eager_tasks , _current_tasks ,
945
1020
current_task )
946
1021
except ImportError :
947
1022
pass
948
1023
else :
949
1024
_c_current_task = current_task
950
1025
_c_register_task = _register_task
1026
+ _c_register_eager_task = _register_eager_task
951
1027
_c_unregister_task = _unregister_task
1028
+ _c_unregister_eager_task = _unregister_eager_task
952
1029
_c_enter_task = _enter_task
953
1030
_c_leave_task = _leave_task
1031
+ _c_swap_current_task = _swap_current_task
0 commit comments