1
+ import contextlib
2
+ import os
3
+ import sys
1
4
import unittest
2
5
from concurrent .futures .interpreter import ExecutionFailed
3
6
from test import support
7
10
from .util import BaseTestCase , InterpreterPoolMixin , setup_module
8
11
9
12
13
+ def write_msg (fd , msg ):
14
+ os .write (fd , msg + b'\0 ' )
15
+
16
+
17
+ def read_msg (fd ):
18
+ msg = b''
19
+ while ch := os .read (fd , 1 ):
20
+ if ch == b'\0 ' :
21
+ return msg
22
+ msg += ch
23
+
24
+
25
+ def get_current_name ():
26
+ return __name__
27
+
28
+
10
29
class InterpreterPoolExecutorTest (InterpreterPoolMixin , ExecutorTest , BaseTestCase ):
11
30
31
+ def pipe (self ):
32
+ r , w = os .pipe ()
33
+ self .addCleanup (lambda : os .close (r ))
34
+ self .addCleanup (lambda : os .close (w ))
35
+ return r , w
36
+
12
37
def assertTaskRaises (self , exctype ):
13
38
return self .assertRaisesRegex (ExecutionFailed , exctype .__name__ )
14
39
40
+ def test_init_func (self ):
41
+ msg = b'step: init'
42
+ r , w = self .pipe ()
43
+ os .write (w , b'\0 ' )
44
+
45
+ executor = self .executor_type (
46
+ initializer = write_msg , initargs = (w , msg ))
47
+ before = os .read (r , 100 )
48
+ executor .submit (mul , 10 , 10 )
49
+ after = read_msg (r )
50
+
51
+ self .assertEqual (before , b'\0 ' )
52
+ self .assertEqual (after , msg )
53
+
54
+ def test_init_script (self ):
55
+ msg1 = b'step: init'
56
+ msg2 = b'step: run'
57
+ r , w = self .pipe ()
58
+ initscript = f"""if True:
59
+ import os
60
+ msg = { msg2 !r}
61
+ os.write({ w } , { msg1 !r} + b'\\ 0')
62
+ """
63
+ script = f"""if True:
64
+ os.write({ w } , msg + b'\\ 0')
65
+ """
66
+ os .write (w , b'\0 ' )
67
+
68
+ executor = self .executor_type (initializer = initscript )
69
+ before_init = os .read (r , 100 )
70
+ fut = executor .submit (script )
71
+ after_init = read_msg (r )
72
+ write_msg (w , b'' )
73
+ before_run = read_msg (r )
74
+ fut .result ()
75
+ after_run = read_msg (r )
76
+
77
+ self .assertEqual (before_init , b'\0 ' )
78
+ self .assertEqual (after_init , msg1 )
79
+ self .assertEqual (before_run , b'' )
80
+ self .assertEqual (after_run , msg2 )
81
+
82
+ def test_init_script_args (self ):
83
+ with self .assertRaises (ValueError ):
84
+ self .executor_type (initializer = 'pass' , initargs = ('spam' ,))
85
+
86
+ def test_init_shared (self ):
87
+ msg = b'eggs'
88
+ r , w = self .pipe ()
89
+ script = f"""if True:
90
+ import os
91
+ os.write({ w } , spam + b'\\ 0')
92
+ """
93
+
94
+ executor = self .executor_type (shared = {'spam' : msg })
95
+ fut = executor .submit (script )
96
+ fut .result ()
97
+ after = read_msg (r )
98
+
99
+ self .assertEqual (after , msg )
100
+
101
+ def test_submit_script (self ):
102
+ msg = b'spam'
103
+ r , w = self .pipe ()
104
+ script = f"""if True:
105
+ import os
106
+ os.write({ w } , __name__.encode('utf-8') + b'\\ 0')
107
+ """
108
+ executor = self .executor_type ()
109
+
110
+ fut = executor .submit (script )
111
+ res = fut .result ()
112
+ after = read_msg (r )
113
+
114
+ self .assertEqual (after , b'__main__' )
115
+ self .assertIs (res , None )
116
+
117
+ def test_submit_func_globals (self ):
118
+ raise NotImplementedError
119
+ executor = self .executor_type ()
120
+ fut = executor .submit (get_current_name )
121
+ name = fut .result ()
122
+
123
+ self .assertEqual (name , '__main__' )
124
+ self .assertNotEqual (name , __name__ )
125
+
15
126
def test_saturation (self ):
16
127
blocker = queues .create ()
17
128
executor = self .executor_type (4 , shared = dict (blocker = blocker ))
@@ -32,35 +143,6 @@ def test_idle_thread_reuse(self):
32
143
self .assertEqual (len (executor ._threads ), 1 )
33
144
executor .shutdown (wait = True )
34
145
35
- # def test_executor_map_current_future_cancel(self):
36
- # blocker = queues.create()
37
- # log = queues.create()
38
- #
39
- # script = """if True:
40
- # def log_n_wait({ident}):
41
- # blocker(f"ident {ident} started")
42
- # try:
43
- # stop_event.wait()
44
- # finally:
45
- # log.append(f"ident {ident} stopped")
46
- # """
47
- #
48
- # with self.executor_type(max_workers=1) as pool:
49
- # # submit work to saturate the pool
50
- # fut = pool.submit(script.format(ident="first"))
51
- # gen = pool.map(log_n_wait, ["second", "third"], timeout=0)
52
- # try:
53
- # with self.assertRaises(TimeoutError):
54
- # next(gen)
55
- # finally:
56
- # gen.close()
57
- # blocker.put
58
- # stop_event.set()
59
- # fut.result()
60
- # # ident='second' is cancelled as a result of raising a TimeoutError
61
- # # ident='third' is cancelled because it remained in the collection of futures
62
- # self.assertListEqual(log, ["ident='first' started", "ident='first' stopped"])
63
-
64
146
65
147
def setUpModule ():
66
148
setup_module ()
0 commit comments