26
26
PENDING , RUNNING , CANCELLED , CANCELLED_AND_NOTIFIED , FINISHED , Future ,
27
27
BrokenExecutor )
28
28
from concurrent .futures .process import BrokenProcessPool , _check_system_limits
29
- from multiprocessing import get_context
30
29
31
30
import multiprocessing .process
32
31
import multiprocessing .util
32
+ import multiprocessing as mp
33
33
34
34
35
35
if support .check_sanitizer (address = True , memory = True ):
@@ -143,7 +143,7 @@ def tearDown(self):
143
143
super ().tearDown ()
144
144
145
145
def get_context (self ):
146
- return get_context (self .ctx )
146
+ return mp . get_context (self .ctx )
147
147
148
148
149
149
class ThreadPoolMixin (ExecutorMixin ):
@@ -922,7 +922,7 @@ def submit(pool):
922
922
pool .submit (submit , pool )
923
923
924
924
for _ in range (50 ):
925
- with futures .ProcessPoolExecutor (1 , mp_context = get_context ('fork' )) as workers :
925
+ with futures .ProcessPoolExecutor (1 , mp_context = mp . get_context ('fork' )) as workers :
926
926
workers .submit (tuple )
927
927
928
928
@@ -992,7 +992,7 @@ def test_traceback(self):
992
992
def test_ressources_gced_in_workers (self ):
993
993
# Ensure that argument for a job are correctly gc-ed after the job
994
994
# is finished
995
- mgr = get_context ( self .ctx ).Manager ()
995
+ mgr = self .get_context ( ).Manager ()
996
996
obj = EventfulGCObj (mgr )
997
997
future = self .executor .submit (id , obj )
998
998
future .result ()
@@ -1039,6 +1039,8 @@ def test_idle_process_reuse_multiple(self):
1039
1039
executor .shutdown ()
1040
1040
1041
1041
def test_max_tasks_per_child (self ):
1042
+ # not using self.executor as we need to control construction.
1043
+ # arguably this could go in another class w/o that mixin.
1042
1044
executor = self .executor_type (
1043
1045
1 , mp_context = self .get_context (), max_tasks_per_child = 3 )
1044
1046
f1 = executor .submit (os .getpid )
@@ -1060,6 +1062,8 @@ def test_max_tasks_per_child(self):
1060
1062
executor .shutdown ()
1061
1063
1062
1064
def test_max_tasks_early_shutdown (self ):
1065
+ # not using self.executor as we need to control construction.
1066
+ # arguably this could go in another class w/o that mixin.
1063
1067
executor = self .executor_type (
1064
1068
3 , mp_context = self .get_context (), max_tasks_per_child = 1 )
1065
1069
futures = []
@@ -1171,7 +1175,7 @@ def _check_crash(self, error, func, *args, ignore_stderr=False):
1171
1175
self .executor .shutdown (wait = True )
1172
1176
1173
1177
executor = self .executor_type (
1174
- max_workers = 2 , mp_context = get_context ( self .ctx ))
1178
+ max_workers = 2 , mp_context = self .get_context ( ))
1175
1179
res = executor .submit (func , * args )
1176
1180
1177
1181
if ignore_stderr :
@@ -1250,7 +1254,7 @@ def test_shutdown_deadlock(self):
1250
1254
# if a worker fails after the shutdown call.
1251
1255
self .executor .shutdown (wait = True )
1252
1256
with self .executor_type (max_workers = 2 ,
1253
- mp_context = get_context ( self .ctx )) as executor :
1257
+ mp_context = self .get_context ( )) as executor :
1254
1258
self .executor = executor # Allow clean up in fail_on_deadlock
1255
1259
f = executor .submit (_crash , delay = .1 )
1256
1260
executor .shutdown (wait = True )
@@ -1263,7 +1267,7 @@ def test_shutdown_deadlock_pickle(self):
1263
1267
# Reported in bpo-39104.
1264
1268
self .executor .shutdown (wait = True )
1265
1269
with self .executor_type (max_workers = 2 ,
1266
- mp_context = get_context ( self .ctx )) as executor :
1270
+ mp_context = self .get_context ( )) as executor :
1267
1271
self .executor = executor # Allow clean up in fail_on_deadlock
1268
1272
1269
1273
# Start the executor and get the executor_manager_thread to collect
0 commit comments