14
14
import io # readline
15
15
import unittest
16
16
17
+ import struct
18
+ import tty
19
+ import fcntl
20
+ import platform
21
+ import warnings
22
+
17
23
TEST_STRING_1 = b"I wish to buy a fish license.\n "
18
24
TEST_STRING_2 = b"For my pet fish, Eric.\n "
19
25
26
+ try :
27
+ _TIOCGWINSZ = tty .TIOCGWINSZ
28
+ _TIOCSWINSZ = tty .TIOCSWINSZ
29
+ _HAVE_WINSZ = True
30
+ except AttributeError :
31
+ _HAVE_WINSZ = False
32
+
20
33
if verbose :
21
34
def debug (msg ):
22
35
print (msg )
@@ -60,6 +73,27 @@ def _readline(fd):
60
73
reader = io .FileIO (fd , mode = 'rb' , closefd = False )
61
74
return reader .readline ()
62
75
76
+ def expectedFailureIfStdinIsTTY (fun ):
77
+ # avoid isatty() for now
78
+ try :
79
+ tty .tcgetattr (pty .STDIN_FILENO )
80
+ return unittest .expectedFailure (fun )
81
+ except tty .error :
82
+ pass
83
+ return fun
84
+
85
+ def expectedFailureOnBSD (fun ):
86
+ PLATFORM = platform .system ()
87
+ if PLATFORM .endswith ("BSD" ) or PLATFORM == "Darwin" :
88
+ return unittest .expectedFailure (fun )
89
+ return fun
90
+
91
+ def _get_term_winsz (fd ):
92
+ s = struct .pack ("HHHH" , 0 , 0 , 0 , 0 )
93
+ return fcntl .ioctl (fd , _TIOCGWINSZ , s )
94
+
95
+ def _set_term_winsz (fd , winsz ):
96
+ fcntl .ioctl (fd , _TIOCSWINSZ , winsz )
63
97
64
98
65
99
# Marginal testing of pty suite. Cannot do extensive 'do or fail' testing
@@ -78,6 +112,20 @@ def setUp(self):
78
112
self .addCleanup (signal .alarm , 0 )
79
113
signal .alarm (10 )
80
114
115
+ # Save original stdin window size
116
+ self .stdin_rows = None
117
+ self .stdin_cols = None
118
+ if _HAVE_WINSZ :
119
+ try :
120
+ stdin_dim = os .get_terminal_size (pty .STDIN_FILENO )
121
+ self .stdin_rows = stdin_dim .lines
122
+ self .stdin_cols = stdin_dim .columns
123
+ old_stdin_winsz = struct .pack ("HHHH" , self .stdin_rows ,
124
+ self .stdin_cols , 0 , 0 )
125
+ self .addCleanup (_set_term_winsz , pty .STDIN_FILENO , old_stdin_winsz )
126
+ except OSError :
127
+ pass
128
+
81
129
def handle_sig (self , sig , frame ):
82
130
self .fail ("isatty hung" )
83
131
@@ -86,26 +134,65 @@ def handle_sighup(signum, frame):
86
134
# bpo-38547: if the process is the session leader, os.close(master_fd)
87
135
# of "master_fd, slave_name = pty.master_open()" raises SIGHUP
88
136
# signal: just ignore the signal.
137
+ #
138
+ # NOTE: the above comment is from an older version of the test;
139
+ # master_open() is not being used anymore.
89
140
pass
90
141
91
- def test_basic (self ):
142
+ @expectedFailureIfStdinIsTTY
143
+ def test_openpty (self ):
92
144
try :
93
- debug ("Calling master_open()" )
94
- master_fd , slave_name = pty .master_open ()
95
- debug ("Got master_fd '%d', slave_name '%s'" %
96
- (master_fd , slave_name ))
97
- debug ("Calling slave_open(%r)" % (slave_name ,))
98
- slave_fd = pty .slave_open (slave_name )
99
- debug ("Got slave_fd '%d'" % slave_fd )
145
+ mode = tty .tcgetattr (pty .STDIN_FILENO )
146
+ except tty .error :
147
+ # not a tty or bad/closed fd
148
+ debug ("tty.tcgetattr(pty.STDIN_FILENO) failed" )
149
+ mode = None
150
+
151
+ new_stdin_winsz = None
152
+ if self .stdin_rows != None and self .stdin_cols != None :
153
+ try :
154
+ debug ("Setting pty.STDIN_FILENO window size" )
155
+ # Set number of columns and rows to be the
156
+ # floors of 1/5 of respective original values
157
+ target_stdin_winsz = struct .pack ("HHHH" , self .stdin_rows // 5 ,
158
+ self .stdin_cols // 5 , 0 , 0 )
159
+ _set_term_winsz (pty .STDIN_FILENO , target_stdin_winsz )
160
+
161
+ # Were we able to set the window size
162
+ # of pty.STDIN_FILENO successfully?
163
+ new_stdin_winsz = _get_term_winsz (pty .STDIN_FILENO )
164
+ self .assertEqual (new_stdin_winsz , target_stdin_winsz ,
165
+ "pty.STDIN_FILENO window size unchanged" )
166
+ except OSError :
167
+ warnings .warn ("Failed to set pty.STDIN_FILENO window size" )
168
+ pass
169
+
170
+ try :
171
+ debug ("Calling pty.openpty()" )
172
+ try :
173
+ master_fd , slave_fd = pty .openpty (mode , new_stdin_winsz )
174
+ except TypeError :
175
+ master_fd , slave_fd = pty .openpty ()
176
+ debug (f"Got master_fd '{ master_fd } ', slave_fd '{ slave_fd } '" )
100
177
except OSError :
101
178
# " An optional feature could not be imported " ... ?
102
179
raise unittest .SkipTest ("Pseudo-terminals (seemingly) not functional." )
103
180
104
- self .assertTrue (os .isatty (slave_fd ), 'slave_fd is not a tty' )
181
+ self .assertTrue (os .isatty (slave_fd ), "slave_fd is not a tty" )
182
+
183
+ if mode :
184
+ self .assertEqual (tty .tcgetattr (slave_fd ), mode ,
185
+ "openpty() failed to set slave termios" )
186
+ if new_stdin_winsz :
187
+ self .assertEqual (_get_term_winsz (slave_fd ), new_stdin_winsz ,
188
+ "openpty() failed to set slave window size" )
105
189
106
190
# Solaris requires reading the fd before anything is returned.
107
191
# My guess is that since we open and close the slave fd
108
192
# in master_open(), we need to read the EOF.
193
+ #
194
+ # NOTE: the above comment is from an older version of the test;
195
+ # master_open() is not being used anymore.
109
196
110
197
# Ensure the fd is non-blocking in case there's nothing to read.
111
198
blocking = os .get_blocking (master_fd )
@@ -222,8 +309,20 @@ def test_fork(self):
222
309
223
310
os .close (master_fd )
224
311
225
- # pty.fork() passed.
312
+ @expectedFailureOnBSD
313
+ def test_master_read (self ):
314
+ debug ("Calling pty.openpty()" )
315
+ master_fd , slave_fd = pty .openpty ()
316
+ debug (f"Got master_fd '{ master_fd } ', slave_fd '{ slave_fd } '" )
317
+
318
+ debug ("Closing slave_fd" )
319
+ os .close (slave_fd )
226
320
321
+ debug ("Reading from master_fd" )
322
+ with self .assertRaises (OSError ):
323
+ os .read (master_fd , 1 )
324
+
325
+ os .close (master_fd )
227
326
228
327
class SmallPtyTests (unittest .TestCase ):
229
328
"""These tests don't spawn children or hang."""
@@ -262,8 +361,9 @@ def _socketpair(self):
262
361
self .files .extend (socketpair )
263
362
return socketpair
264
363
265
- def _mock_select (self , rfds , wfds , xfds ):
364
+ def _mock_select (self , rfds , wfds , xfds , timeout = 0 ):
266
365
# This will raise IndexError when no more expected calls exist.
366
+ # This ignores the timeout
267
367
self .assertEqual (self .select_rfds_lengths .pop (0 ), len (rfds ))
268
368
return self .select_rfds_results .pop (0 ), [], []
269
369
0 commit comments