|
1 | 1 | import array
|
| 2 | +import os |
| 3 | +import struct |
| 4 | +import threading |
2 | 5 | import unittest
|
3 | 6 | from test.support import get_attribute
|
| 7 | +from test.support import threading_helper |
4 | 8 | from test.support.import_helper import import_module
|
5 |
| -import os, struct |
6 | 9 | fcntl = import_module('fcntl')
|
7 | 10 | termios = import_module('termios')
|
8 |
| -get_attribute(termios, 'TIOCGPGRP') #Can't run tests without this feature |
9 |
| - |
10 |
| -try: |
11 |
| - tty = open("/dev/tty", "rb") |
12 |
| -except OSError: |
13 |
| - raise unittest.SkipTest("Unable to open /dev/tty") |
14 |
| -else: |
15 |
| - with tty: |
16 |
| - # Skip if another process is in foreground |
17 |
| - r = fcntl.ioctl(tty, termios.TIOCGPGRP, struct.pack("i", 0)) |
18 |
| - rpgrp = struct.unpack("i", r)[0] |
19 |
| - if rpgrp not in (os.getpgrp(), os.getsid(0)): |
20 |
| - raise unittest.SkipTest("Neither the process group nor the session " |
21 |
| - "are attached to /dev/tty") |
22 |
| - del tty, r, rpgrp |
23 | 11 |
|
24 | 12 | try:
|
25 | 13 | import pty
|
26 | 14 | except ImportError:
|
27 | 15 | pty = None
|
28 | 16 |
|
29 |
| -class IoctlTests(unittest.TestCase): |
| 17 | +class IoctlTestsTty(unittest.TestCase): |
| 18 | + @classmethod |
| 19 | + def setUpClass(cls): |
| 20 | + TIOCGPGRP = get_attribute(termios, 'TIOCGPGRP') |
| 21 | + try: |
| 22 | + tty = open("/dev/tty", "rb") |
| 23 | + except OSError: |
| 24 | + raise unittest.SkipTest("Unable to open /dev/tty") |
| 25 | + with tty: |
| 26 | + # Skip if another process is in foreground |
| 27 | + r = fcntl.ioctl(tty, TIOCGPGRP, struct.pack("i", 0)) |
| 28 | + rpgrp = struct.unpack("i", r)[0] |
| 29 | + if rpgrp not in (os.getpgrp(), os.getsid(0)): |
| 30 | + raise unittest.SkipTest("Neither the process group nor the session " |
| 31 | + "are attached to /dev/tty") |
| 32 | + |
30 | 33 | def test_ioctl_immutable_buf(self):
|
31 | 34 | # If this process has been put into the background, TIOCGPGRP returns
|
32 | 35 | # the session ID instead of the process group id.
|
@@ -132,31 +135,34 @@ def test_ioctl_mutate_2048(self):
|
132 | 135 | self._check_ioctl_mutate_len(2048)
|
133 | 136 | self.assertRaises(ValueError, self._check_ioctl_not_mutate_len, 2048)
|
134 | 137 |
|
| 138 | + |
| 139 | +@unittest.skipIf(pty is None, 'pty module required') |
| 140 | +class IoctlTestsPty(unittest.TestCase): |
| 141 | + def setUp(self): |
| 142 | + self.master_fd, self.slave_fd = pty.openpty() |
| 143 | + self.addCleanup(os.close, self.slave_fd) |
| 144 | + self.addCleanup(os.close, self.master_fd) |
| 145 | + |
| 146 | + @unittest.skipUnless(hasattr(termios, 'TCFLSH'), 'requires termios.TCFLSH') |
135 | 147 | def test_ioctl_tcflush(self):
|
136 |
| - with open("/dev/tty", "rb") as tty: |
137 |
| - r = fcntl.ioctl(tty, termios.TCFLSH, termios.TCIFLUSH) |
138 |
| - self.assertEqual(r, 0) |
| 148 | + r = fcntl.ioctl(self.slave_fd, termios.TCFLSH, termios.TCIFLUSH) |
| 149 | + self.assertEqual(r, 0) |
| 150 | + r = fcntl.ioctl(self.slave_fd, termios.TCFLSH, termios.TCOFLUSH) |
| 151 | + self.assertEqual(r, 0) |
139 | 152 |
|
140 | 153 | def test_ioctl_signed_unsigned_code_param(self):
|
141 |
| - if not pty: |
142 |
| - raise unittest.SkipTest('pty module required') |
143 |
| - mfd, sfd = pty.openpty() |
144 |
| - try: |
145 |
| - if termios.TIOCSWINSZ < 0: |
146 |
| - set_winsz_opcode_maybe_neg = termios.TIOCSWINSZ |
147 |
| - set_winsz_opcode_pos = termios.TIOCSWINSZ & 0xffffffff |
148 |
| - else: |
149 |
| - set_winsz_opcode_pos = termios.TIOCSWINSZ |
150 |
| - set_winsz_opcode_maybe_neg, = struct.unpack("i", |
151 |
| - struct.pack("I", termios.TIOCSWINSZ)) |
152 |
| - |
153 |
| - our_winsz = struct.pack("HHHH",80,25,0,0) |
154 |
| - # test both with a positive and potentially negative ioctl code |
155 |
| - new_winsz = fcntl.ioctl(mfd, set_winsz_opcode_pos, our_winsz) |
156 |
| - new_winsz = fcntl.ioctl(mfd, set_winsz_opcode_maybe_neg, our_winsz) |
157 |
| - finally: |
158 |
| - os.close(mfd) |
159 |
| - os.close(sfd) |
| 154 | + if termios.TIOCSWINSZ < 0: |
| 155 | + set_winsz_opcode_maybe_neg = termios.TIOCSWINSZ |
| 156 | + set_winsz_opcode_pos = termios.TIOCSWINSZ & 0xffffffff |
| 157 | + else: |
| 158 | + set_winsz_opcode_pos = termios.TIOCSWINSZ |
| 159 | + set_winsz_opcode_maybe_neg, = struct.unpack("i", |
| 160 | + struct.pack("I", termios.TIOCSWINSZ)) |
| 161 | + |
| 162 | + our_winsz = struct.pack("HHHH",80,25,0,0) |
| 163 | + # test both with a positive and potentially negative ioctl code |
| 164 | + new_winsz = fcntl.ioctl(self.master_fd, set_winsz_opcode_pos, our_winsz) |
| 165 | + new_winsz = fcntl.ioctl(self.master_fd, set_winsz_opcode_maybe_neg, our_winsz) |
160 | 166 |
|
161 | 167 |
|
162 | 168 | if __name__ == "__main__":
|
|
0 commit comments