|
| 1 | +import io |
1 | 2 | import pickle
|
2 | 3 | import pickletools
|
3 | 4 | from test import support
|
@@ -62,6 +63,315 @@ def test_optimize_binput_and_memoize(self):
|
62 | 63 | self.assertNotIn(pickle.BINPUT, pickled2)
|
63 | 64 |
|
64 | 65 |
|
| 66 | +class SimpleReader: |
| 67 | + def __init__(self, data): |
| 68 | + self.data = data |
| 69 | + self.pos = 0 |
| 70 | + |
| 71 | + def read(self, n): |
| 72 | + data = self.data[self.pos: self.pos + n] |
| 73 | + self.pos += n |
| 74 | + return data |
| 75 | + |
| 76 | + def readline(self): |
| 77 | + nl = self.data.find(b'\n', self.pos) + 1 |
| 78 | + if not nl: |
| 79 | + nl = len(self.data) |
| 80 | + data = self.data[self.pos: nl] |
| 81 | + self.pos = nl |
| 82 | + return data |
| 83 | + |
| 84 | + |
| 85 | +class GenopsTests(unittest.TestCase): |
| 86 | + def test_genops(self): |
| 87 | + it = pickletools.genops(b'(I123\nK\x12J\x12\x34\x56\x78t.') |
| 88 | + self.assertEqual([(item[0].name,) + item[1:] for item in it], [ |
| 89 | + ('MARK', None, 0), |
| 90 | + ('INT', 123, 1), |
| 91 | + ('BININT1', 0x12, 6), |
| 92 | + ('BININT', 0x78563412, 8), |
| 93 | + ('TUPLE', None, 13), |
| 94 | + ('STOP', None, 14), |
| 95 | + ]) |
| 96 | + |
| 97 | + def test_from_file(self): |
| 98 | + f = io.BytesIO(b'prefix(I123\nK\x12J\x12\x34\x56\x78t.suffix') |
| 99 | + self.assertEqual(f.read(6), b'prefix') |
| 100 | + it = pickletools.genops(f) |
| 101 | + self.assertEqual([(item[0].name,) + item[1:] for item in it], [ |
| 102 | + ('MARK', None, 6), |
| 103 | + ('INT', 123, 7), |
| 104 | + ('BININT1', 0x12, 12), |
| 105 | + ('BININT', 0x78563412, 14), |
| 106 | + ('TUPLE', None, 19), |
| 107 | + ('STOP', None, 20), |
| 108 | + ]) |
| 109 | + self.assertEqual(f.read(), b'suffix') |
| 110 | + |
| 111 | + def test_without_pos(self): |
| 112 | + f = SimpleReader(b'(I123\nK\x12J\x12\x34\x56\x78t.') |
| 113 | + it = pickletools.genops(f) |
| 114 | + self.assertEqual([(item[0].name,) + item[1:] for item in it], [ |
| 115 | + ('MARK', None, None), |
| 116 | + ('INT', 123, None), |
| 117 | + ('BININT1', 0x12, None), |
| 118 | + ('BININT', 0x78563412, None), |
| 119 | + ('TUPLE', None, None), |
| 120 | + ('STOP', None, None), |
| 121 | + ]) |
| 122 | + |
| 123 | + def test_no_stop(self): |
| 124 | + it = pickletools.genops(b'N') |
| 125 | + item = next(it) |
| 126 | + self.assertEqual(item[0].name, 'NONE') |
| 127 | + with self.assertRaisesRegex(ValueError, |
| 128 | + 'pickle exhausted before seeing STOP'): |
| 129 | + next(it) |
| 130 | + |
| 131 | + def test_truncated_data(self): |
| 132 | + it = pickletools.genops(b'I123') |
| 133 | + with self.assertRaisesRegex(ValueError, |
| 134 | + 'no newline found when trying to read stringnl'): |
| 135 | + next(it) |
| 136 | + it = pickletools.genops(b'J\x12\x34') |
| 137 | + with self.assertRaisesRegex(ValueError, |
| 138 | + 'not enough data in stream to read int4'): |
| 139 | + next(it) |
| 140 | + |
| 141 | + def test_unknown_opcode(self): |
| 142 | + it = pickletools.genops(b'N\xff') |
| 143 | + item = next(it) |
| 144 | + self.assertEqual(item[0].name, 'NONE') |
| 145 | + with self.assertRaisesRegex(ValueError, |
| 146 | + r"at position 1, opcode b'\\xff' unknown"): |
| 147 | + next(it) |
| 148 | + |
| 149 | + def test_unknown_opcode_without_pos(self): |
| 150 | + f = SimpleReader(b'N\xff') |
| 151 | + it = pickletools.genops(f) |
| 152 | + item = next(it) |
| 153 | + self.assertEqual(item[0].name, 'NONE') |
| 154 | + with self.assertRaisesRegex(ValueError, |
| 155 | + r"at position <unknown>, opcode b'\\xff' unknown"): |
| 156 | + next(it) |
| 157 | + |
| 158 | + |
| 159 | +class DisTests(unittest.TestCase): |
| 160 | + maxDiff = None |
| 161 | + |
| 162 | + def check_dis(self, data, expected, **kwargs): |
| 163 | + out = io.StringIO() |
| 164 | + pickletools.dis(data, out=out, **kwargs) |
| 165 | + self.assertEqual(out.getvalue(), expected) |
| 166 | + |
| 167 | + def check_dis_error(self, data, expected, expected_error, **kwargs): |
| 168 | + out = io.StringIO() |
| 169 | + with self.assertRaisesRegex(ValueError, expected_error): |
| 170 | + pickletools.dis(data, out=out, **kwargs) |
| 171 | + self.assertEqual(out.getvalue(), expected) |
| 172 | + |
| 173 | + def test_mark(self): |
| 174 | + self.check_dis(b'(N(tl.', '''\ |
| 175 | + 0: ( MARK |
| 176 | + 1: N NONE |
| 177 | + 2: ( MARK |
| 178 | + 3: t TUPLE (MARK at 2) |
| 179 | + 4: l LIST (MARK at 0) |
| 180 | + 5: . STOP |
| 181 | +highest protocol among opcodes = 0 |
| 182 | +''') |
| 183 | + |
| 184 | + def test_indentlevel(self): |
| 185 | + self.check_dis(b'(N(tl.', '''\ |
| 186 | + 0: ( MARK |
| 187 | + 1: N NONE |
| 188 | + 2: ( MARK |
| 189 | + 3: t TUPLE (MARK at 2) |
| 190 | + 4: l LIST (MARK at 0) |
| 191 | + 5: . STOP |
| 192 | +highest protocol among opcodes = 0 |
| 193 | +''', indentlevel=2) |
| 194 | + |
| 195 | + def test_mark_without_pos(self): |
| 196 | + self.check_dis(SimpleReader(b'(N(tl.'), '''\ |
| 197 | +( MARK |
| 198 | +N NONE |
| 199 | +( MARK |
| 200 | +t TUPLE (MARK at unknown opcode offset) |
| 201 | +l LIST (MARK at unknown opcode offset) |
| 202 | +. STOP |
| 203 | +highest protocol among opcodes = 0 |
| 204 | +''') |
| 205 | + |
| 206 | + def test_no_mark(self): |
| 207 | + self.check_dis_error(b'Nt.', '''\ |
| 208 | + 0: N NONE |
| 209 | + 1: t TUPLE no MARK exists on stack |
| 210 | +''', 'no MARK exists on stack') |
| 211 | + |
| 212 | + def test_put(self): |
| 213 | + self.check_dis(b'Np0\nq\x01r\x02\x00\x00\x00\x94.', '''\ |
| 214 | + 0: N NONE |
| 215 | + 1: p PUT 0 |
| 216 | + 4: q BINPUT 1 |
| 217 | + 6: r LONG_BINPUT 2 |
| 218 | + 11: \\x94 MEMOIZE (as 3) |
| 219 | + 12: . STOP |
| 220 | +highest protocol among opcodes = 4 |
| 221 | +''') |
| 222 | + |
| 223 | + def test_put_redefined(self): |
| 224 | + self.check_dis_error(b'Np1\np1\n.', '''\ |
| 225 | + 0: N NONE |
| 226 | + 1: p PUT 1 |
| 227 | + 4: p PUT 1 |
| 228 | +''', 'memo key 1 already defined') |
| 229 | + self.check_dis_error(b'Np1\nq\x01.', '''\ |
| 230 | + 0: N NONE |
| 231 | + 1: p PUT 1 |
| 232 | + 4: q BINPUT 1 |
| 233 | +''', 'memo key 1 already defined') |
| 234 | + self.check_dis_error(b'Np1\nr\x01\x00\x00\x00.', '''\ |
| 235 | + 0: N NONE |
| 236 | + 1: p PUT 1 |
| 237 | + 4: r LONG_BINPUT 1 |
| 238 | +''', 'memo key 1 already defined') |
| 239 | + self.check_dis_error(b'Np1\n\x94.', '''\ |
| 240 | + 0: N NONE |
| 241 | + 1: p PUT 1 |
| 242 | + 4: \\x94 MEMOIZE (as 1) |
| 243 | +''', 'memo key None already defined') |
| 244 | + |
| 245 | + def test_put_empty_stack(self): |
| 246 | + self.check_dis_error(b'p0\n', '''\ |
| 247 | + 0: p PUT 0 |
| 248 | +''', "stack is empty -- can't store into memo") |
| 249 | + |
| 250 | + def test_put_markobject(self): |
| 251 | + self.check_dis_error(b'(p0\n', '''\ |
| 252 | + 0: ( MARK |
| 253 | + 1: p PUT 0 |
| 254 | +''', "can't store markobject in the memo") |
| 255 | + |
| 256 | + def test_get(self): |
| 257 | + self.check_dis(b'(Np1\ng1\nh\x01j\x01\x00\x00\x00t.', '''\ |
| 258 | + 0: ( MARK |
| 259 | + 1: N NONE |
| 260 | + 2: p PUT 1 |
| 261 | + 5: g GET 1 |
| 262 | + 8: h BINGET 1 |
| 263 | + 10: j LONG_BINGET 1 |
| 264 | + 15: t TUPLE (MARK at 0) |
| 265 | + 16: . STOP |
| 266 | +highest protocol among opcodes = 1 |
| 267 | +''') |
| 268 | + |
| 269 | + def test_get_without_put(self): |
| 270 | + self.check_dis_error(b'g1\n.', '''\ |
| 271 | + 0: g GET 1 |
| 272 | +''', 'memo key 1 has never been stored into') |
| 273 | + self.check_dis_error(b'h\x01.', '''\ |
| 274 | + 0: h BINGET 1 |
| 275 | +''', 'memo key 1 has never been stored into') |
| 276 | + self.check_dis_error(b'j\x01\x00\x00\x00.', '''\ |
| 277 | + 0: j LONG_BINGET 1 |
| 278 | +''', 'memo key 1 has never been stored into') |
| 279 | + |
| 280 | + def test_memo(self): |
| 281 | + memo = {} |
| 282 | + self.check_dis(b'Np1\n.', '''\ |
| 283 | + 0: N NONE |
| 284 | + 1: p PUT 1 |
| 285 | + 4: . STOP |
| 286 | +highest protocol among opcodes = 0 |
| 287 | +''', memo=memo) |
| 288 | + self.check_dis(b'g1\n.', '''\ |
| 289 | + 0: g GET 1 |
| 290 | + 3: . STOP |
| 291 | +highest protocol among opcodes = 0 |
| 292 | +''', memo=memo) |
| 293 | + |
| 294 | + def test_mark_pop(self): |
| 295 | + self.check_dis(b'(N00N.', '''\ |
| 296 | + 0: ( MARK |
| 297 | + 1: N NONE |
| 298 | + 2: 0 POP |
| 299 | + 3: 0 POP (MARK at 0) |
| 300 | + 4: N NONE |
| 301 | + 5: . STOP |
| 302 | +highest protocol among opcodes = 0 |
| 303 | +''') |
| 304 | + |
| 305 | + def test_too_small_stack(self): |
| 306 | + self.check_dis_error(b'a', '''\ |
| 307 | + 0: a APPEND |
| 308 | +''', 'tries to pop 2 items from stack with only 0 items') |
| 309 | + self.check_dis_error(b']a', '''\ |
| 310 | + 0: ] EMPTY_LIST |
| 311 | + 1: a APPEND |
| 312 | +''', 'tries to pop 2 items from stack with only 1 items') |
| 313 | + |
| 314 | + def test_no_stop(self): |
| 315 | + self.check_dis_error(b'N', '''\ |
| 316 | + 0: N NONE |
| 317 | +''', 'pickle exhausted before seeing STOP') |
| 318 | + |
| 319 | + def test_truncated_data(self): |
| 320 | + self.check_dis_error(b'NI123', '''\ |
| 321 | + 0: N NONE |
| 322 | +''', 'no newline found when trying to read stringnl') |
| 323 | + self.check_dis_error(b'NJ\x12\x34', '''\ |
| 324 | + 0: N NONE |
| 325 | +''', 'not enough data in stream to read int4') |
| 326 | + |
| 327 | + def test_unknown_opcode(self): |
| 328 | + self.check_dis_error(b'N\xff', '''\ |
| 329 | + 0: N NONE |
| 330 | +''', r"at position 1, opcode b'\\xff' unknown") |
| 331 | + |
| 332 | + def test_stop_not_empty_stack(self): |
| 333 | + self.check_dis_error(b']N.', '''\ |
| 334 | + 0: ] EMPTY_LIST |
| 335 | + 1: N NONE |
| 336 | + 2: . STOP |
| 337 | +highest protocol among opcodes = 1 |
| 338 | +''', r'stack not empty after STOP: \[list\]') |
| 339 | + |
| 340 | + def test_annotate(self): |
| 341 | + self.check_dis(b'(Nt.', '''\ |
| 342 | + 0: ( MARK Push markobject onto the stack. |
| 343 | + 1: N NONE Push None on the stack. |
| 344 | + 2: t TUPLE (MARK at 0) Build a tuple out of the topmost stack slice, after markobject. |
| 345 | + 3: . STOP Stop the unpickling machine. |
| 346 | +highest protocol among opcodes = 0 |
| 347 | +''', annotate=1) |
| 348 | + self.check_dis(b'(Nt.', '''\ |
| 349 | + 0: ( MARK Push markobject onto the stack. |
| 350 | + 1: N NONE Push None on the stack. |
| 351 | + 2: t TUPLE (MARK at 0) Build a tuple out of the topmost stack slice, after markobject. |
| 352 | + 3: . STOP Stop the unpickling machine. |
| 353 | +highest protocol among opcodes = 0 |
| 354 | +''', annotate=20) |
| 355 | + self.check_dis(b'(((((((ttttttt.', '''\ |
| 356 | + 0: ( MARK Push markobject onto the stack. |
| 357 | + 1: ( MARK Push markobject onto the stack. |
| 358 | + 2: ( MARK Push markobject onto the stack. |
| 359 | + 3: ( MARK Push markobject onto the stack. |
| 360 | + 4: ( MARK Push markobject onto the stack. |
| 361 | + 5: ( MARK Push markobject onto the stack. |
| 362 | + 6: ( MARK Push markobject onto the stack. |
| 363 | + 7: t TUPLE (MARK at 6) Build a tuple out of the topmost stack slice, after markobject. |
| 364 | + 8: t TUPLE (MARK at 5) Build a tuple out of the topmost stack slice, after markobject. |
| 365 | + 9: t TUPLE (MARK at 4) Build a tuple out of the topmost stack slice, after markobject. |
| 366 | + 10: t TUPLE (MARK at 3) Build a tuple out of the topmost stack slice, after markobject. |
| 367 | + 11: t TUPLE (MARK at 2) Build a tuple out of the topmost stack slice, after markobject. |
| 368 | + 12: t TUPLE (MARK at 1) Build a tuple out of the topmost stack slice, after markobject. |
| 369 | + 13: t TUPLE (MARK at 0) Build a tuple out of the topmost stack slice, after markobject. |
| 370 | + 14: . STOP Stop the unpickling machine. |
| 371 | +highest protocol among opcodes = 0 |
| 372 | +''', annotate=20) |
| 373 | + |
| 374 | + |
65 | 375 | class MiscTestCase(unittest.TestCase):
|
66 | 376 | def test__all__(self):
|
67 | 377 | not_exported = {
|
|
0 commit comments