diff --git a/Lib/codecs.py b/Lib/codecs.py
index a887e5d4c94a38..e7d06054aa27fb 100644
--- a/Lib/codecs.py
+++ b/Lib/codecs.py
@@ -1107,6 +1107,154 @@ def make_encoding_map(decoding_map):
             m[v] = None
     return m
 
+_surrogates_re = None
+
+def rehandle_surrogatepass(string, errors):
+    """Re-handle lone surrogates in *string* with the *errors* handler.
+
+    Every run of surrogate code points (U+D800-U+DFFF) is reported to the
+    error handler named *errors* as a UnicodeTranslateError and replaced
+    by whatever the handler returns.  A string without surrogates is
+    returned unchanged, without even looking the handler up.
+    """
+    handler = None
+    global _surrogates_re
+    if not _surrogates_re:
+        import re
+        # Surrogates end at U+DFFF; U+E000 and up are private-use characters.
+        _surrogates_re = re.compile('[\ud800-\udfff]+')
+    pos = 0
+    res = []
+    while True:
+        m = _surrogates_re.search(string, pos)
+        if m:
+            if handler is None:
+                handler = lookup_error(errors)
+            res.append(string[pos: m.start()])
+            repl, pos = handler(UnicodeTranslateError(string, m.start(), m.end(),
+                                                      'lone surrogates'))
+            res.append(repl)
+        elif pos:
+            res.append(string[pos:])
+            return ''.join(res)
+        else:
+            return string[:]
+
+def rehandle_surrogateescape(string, errors):
+    """Re-handle surrogate-escaped bytes with the *errors* handler.
+
+    Every run of surrogates in *string* is encoded back to bytes with the
+    'surrogateescape' handler and reported to the error handler named
+    *errors* as a UnicodeDecodeError; the run is replaced by whatever the
+    handler returns.  Surrogates outside U+DC80-U+DCFF, and a
+    UnicodeDecodeError raised by the handler, are re-raised as
+    UnicodeTranslateError with positions relative to *string*.
+    """
+    handler = None
+    global _surrogates_re
+    if not _surrogates_re:
+        import re
+        # Surrogates end at U+DFFF; U+E000 and up are private-use characters.
+        _surrogates_re = re.compile('[\ud800-\udfff]+')
+    pos = 0
+    res = []
+    while True:
+        m = _surrogates_re.search(string, pos)
+        if m:
+            if handler is None:
+                handler = lookup_error(errors)
+            start = m.start()
+            res.append(string[pos: start])
+            try:
+                baddata = string[start: m.end()].encode('ascii', 'surrogateescape')
+            except UnicodeEncodeError as err:
+                raise UnicodeTranslateError(string,
+                                            err.start + start, err.end + start,
+                                            r'surrogates not in range \udc80-\udcff') from None
+            try:
+                repl, pos = handler(UnicodeDecodeError('unicode', baddata,
+                                                       0, len(baddata),
+                                                       'lone surrogates'))
+            except UnicodeDecodeError as err:
+                raise UnicodeTranslateError(string,
+                                            err.start + start,
+                                            err.end + start,
+                                            err.reason) from None
+            pos += start
+            res.append(repl)
+        elif pos:
+            res.append(string[pos:])
+            return ''.join(res)
+        else:
+            return string[:]
+
+_astral_re = None
+
+def handle_astrals(string, errors):
+    """Handle non-BMP characters in *string* with the *errors* handler.
+
+    Every run of characters above U+FFFF is reported to the error handler
+    named *errors* as a UnicodeTranslateError and replaced by whatever the
+    handler returns.  A string without such characters is returned unchanged.
+    """
+    handler = None
+    global _astral_re
+    if not _astral_re:
+        import re
+        _astral_re = re.compile(r'[^\u0000-\uffff]+')
+    pos = 0
+    res = []
+    while True:
+        m = _astral_re.search(string, pos)
+        if m:
+            if handler is None:
+                handler = lookup_error(errors)
+            res.append(string[pos: m.start()])
+            repl, pos = handler(UnicodeTranslateError(string, m.start(), m.end(),
+                                                      'astral characters'))
+            res.append(repl)
+        elif pos:
+            res.append(string[pos:])
+            return ''.join(res)
+        else:
+            return string[:]
+
+def _decompose_astral(match):
+    """Return the surrogate pairs for a matched run of non-BMP characters."""
+    res = []
+    for c in match.group():
+        k = ord(c) - 0x10000
+        res.append('%c%c' % (0xd800 + (k >> 10), 0xdc00 + (k & 0x3ff)))
+    return ''.join(res)
+
+def decompose_astrals(string):
+    """Replace every non-BMP character in *string* with a surrogate pair."""
+    global _astral_re
+    if not _astral_re:
+        import re
+        _astral_re = re.compile(r'[^\u0000-\uffff]+')
+    return _astral_re.sub(_decompose_astral, string)
+
+_surrogate_pair_re = None
+
+def _compose_surrogate_pair(match):
+    """Return the non-BMP character for a matched surrogate pair."""
+    hi, lo = match.group()
+    hi = ord(hi) - 0xd800
+    lo = ord(lo) - 0xdc00
+    return chr(0x10000 + (hi << 10) + lo)
+
+def compose_surrogate_pairs(string):
+    """Combine surrogate pairs in *string* into non-BMP characters.
+
+    Lone surrogates are left unchanged.
+    """
+    global _surrogate_pair_re
+    if not _surrogate_pair_re:
+        import re
+        _surrogate_pair_re = re.compile(r'[\ud800-\udbff][\udc00-\udfff]')
+    return _surrogate_pair_re.sub(_compose_surrogate_pair, string)
+
 ### error handlers
 
 try:
diff --git a/Lib/test/test_codecs.py b/Lib/test/test_codecs.py
index 428036e1765b8f..ac2ce440ecba9d 100644
--- a/Lib/test/test_codecs.py
+++ b/Lib/test/test_codecs.py
@@ -1945,6 +1945,143 @@ def test_pickle(self):
                 self.assertFalse(unpickled_codec_info._is_text_encoding)
 
 
+    def test_rehandle_surrogatepass(self):
+        self.assertRaises(TypeError, codecs.rehandle_surrogatepass)
+        for s in ('', 'abc', '\xe0\xdf\xe7', '\u03b1\u03b2\u03b3',
+                  '\U00010480\U0001d4ff', '\ue000\uefff'):
+            with self.subTest(str=s):
+                self.assertEqual(codecs.rehandle_surrogatepass(s, 'strict'), s)
+        with self.assertRaises(UnicodeTranslateError) as cm:
+            codecs.rehandle_surrogatepass('a\ud801\udc80b', 'strict')
+        self.assertEqual(cm.exception.encoding, None)
+        self.assertEqual(cm.exception.object, 'a\ud801\udc80b')
+        self.assertEqual(cm.exception.start, 1)
+        self.assertEqual(cm.exception.end, 3)
+        tests = [
+            ('ignore', ('', '')),
+            ('replace', ('\ufffd','\ufffd')),
+            ('backslashreplace', ('\\ud801', '\\udc80')),
+            # ('namereplace', ('\\ud801', '\\udc80')),
+            # ('xmlcharrefreplace', ('&#55297;', '&#56448;')),
+            # ('surrogatepass', ('\ud801', '\udc80')),
+        ]
+        for (error, args) in tests:
+            for tmpl in ('a{}b', 'a{}b{}', 'a{}{}c', 'a{}b{}c'):
+                data = tmpl.format('\ud801', '\udc80')
+                expected = tmpl.format(*args)
+                with self.subTest(error=error, data=data):
+                    self.assertEqual(codecs.rehandle_surrogatepass(data, error),
+                                     expected)
+
+    def test_rehandle_surrogateescape(self):
+        self.assertRaises(TypeError, codecs.rehandle_surrogateescape)
+        for s in ('', 'abc', '\xe0\xdf\xe7', '\u03b1\u03b2\u03b3',
+                  '\U00010480\U0001d4ff', '\ue000\uefff'):
+            with self.subTest(str=s):
+                self.assertEqual(codecs.rehandle_surrogateescape(s, 'strict'), s)
+        with self.assertRaises(UnicodeTranslateError) as cm:
+            codecs.rehandle_surrogateescape('a\udc80\udcffb', 'strict')
+        self.assertEqual(cm.exception.encoding, None)
+        self.assertEqual(cm.exception.object, 'a\udc80\udcffb')
+        self.assertEqual(cm.exception.start, 1)
+        self.assertEqual(cm.exception.end, 3)
+        with self.assertRaises(TypeError):
+            codecs.rehandle_surrogateescape('a\udc80b', 'namereplace')
+        with self.assertRaises(TypeError):
+            codecs.rehandle_surrogateescape('a\udc80b', 'xmlcharrefreplace')
+        with self.assertRaises(UnicodeTranslateError):
+            codecs.rehandle_surrogateescape('a\udc80b', 'surrogatepass')
+        tests = [
+            ('ignore', ('', '')),
+            ('replace', ('\ufffd','\ufffd')),
+            ('backslashreplace', ('\\x80','\\xff')),
+            ('surrogateescape', ('\udc80','\udcff')),
+        ]
+        for (error, args) in tests:
+            for tmpl in ('a{}b', 'a{}b{}', 'a{}{}c', 'a{}b{}c'):
+                data = tmpl.format('\udc80', '\udcff')
+                expected = tmpl.format(*args)
+                if error == 'replace':
+                    expected = expected.replace('\ufffd\ufffd', '\ufffd')
+                with self.subTest(error=error, data=data):
+                    self.assertEqual(codecs.rehandle_surrogateescape(data, error),
+                                     expected)
+        for error in ('strict', 'ignore', 'replace',
+                      'backslashreplace', 'namereplace', 'xmlcharrefreplace',
+                      'surrogatepass', 'surrogateescape'):
+            with self.assertRaises(UnicodeTranslateError):
+                codecs.rehandle_surrogateescape('\udc7f', error)
+            with self.assertRaises(UnicodeTranslateError):
+                codecs.rehandle_surrogateescape('\udd00', error)
+
+    def test_handle_astrals(self):
+        self.assertRaises(TypeError, codecs.handle_astrals)
+        for s in ('', 'abc', '\xe0\xdf\xe7', '\u03b1\u03b2\u03b3',
+                  '\ud801\udc80', '\udc80'):
+            with self.subTest(str=s):
+                self.assertEqual(codecs.handle_astrals(s, 'strict'), s)
+        with self.assertRaises(UnicodeTranslateError) as cm:
+            codecs.handle_astrals('a\U00010480\U0001d4ffb', 'strict')
+        self.assertEqual(cm.exception.encoding, None)
+        self.assertEqual(cm.exception.object, 'a\U00010480\U0001d4ffb')
+        self.assertEqual(cm.exception.start, 1)
+        self.assertEqual(cm.exception.end, 3)
+        # with self.assertRaises(UnicodeTranslateError):
+        #     codecs.handle_astrals('a\U00010480b', 'surrogatepass')
+        with self.assertRaises(TypeError):
+            codecs.handle_astrals('a\U00010480b', 'surrogateescape')
+        tests = [
+            ('ignore', ('', '')),
+            ('replace', ('\ufffd','\ufffd')),
+            ('backslashreplace', ('\\U00010480', '\\U0001d4ff')),
+            # ('namereplace', ('\\N{OSMANYA LETTER ALEF}',
+            #                  '\\N{MATHEMATICAL BOLD SCRIPT SMALL V}')),
+            # ('xmlcharrefreplace', ('&#66688;','&#120063;')),
+        ]
+        for (error, args) in tests:
+            for tmpl in ('a{}b', 'a{}b{}', 'a{}{}c', 'a{}b{}c'):
+                data = tmpl.format('\U00010480', '\U0001d4ff')
+                expected = tmpl.format(*args)
+                with self.subTest(error=error, data=data):
+                    self.assertEqual(codecs.handle_astrals(data, error),
+                                     expected)
+
+    def test_decompose_astrals(self):
+        self.assertRaises(TypeError, codecs.decompose_astrals)
+        tests = [
+            ('abc', 'abc'),
+            ('\xe0\xdf\xe7', '\xe0\xdf\xe7'),
+            ('\u03b1\u03b2\u03b3', '\u03b1\u03b2\u03b3'),
+            ('a\U00010480b', 'a\ud801\udc80b'),
+            ('a\U00010480b\U0001d4ff', 'a\ud801\udc80b\ud835\udcff'),
+            ('a\U00010480\U0001d4ffc', 'a\ud801\udc80\ud835\udcffc'),
+            ('a\U00010480b\U0001d4ffc', 'a\ud801\udc80b\ud835\udcffc'),
+            ('a\ud801\udc80b', 'a\ud801\udc80b'),
+            ('a\udc80b', 'a\udc80b'),
+        ]
+        for s, r in tests:
+            with self.subTest(str=s):
+                self.assertEqual(codecs.decompose_astrals(s), r)
+
+    def test_compose_surrogate_pairs(self):
+        self.assertRaises(TypeError, codecs.compose_surrogate_pairs)
+        tests = [
+            ('abc', 'abc'),
+            ('\xe0\xdf\xe7', '\xe0\xdf\xe7'),
+            ('\u03b1\u03b2\u03b3', '\u03b1\u03b2\u03b3'),
+            ('a\ud801\udc80b', 'a\U00010480b'),
+            ('a\ud801\udc80b\ud835\udcff', 'a\U00010480b\U0001d4ff'),
+            ('a\ud801\udc80\ud835\udcffc', 'a\U00010480\U0001d4ffc'),
+            ('a\ud801\udc80b\ud835\udcffc', 'a\U00010480b\U0001d4ffc'),
+            ('a\udc80\ud801\ud801\udc80b', 'a\udc80\ud801\U00010480b'),
+            ('a\ud801\udc80\udc80\ud801b', 'a\U00010480\udc80\ud801b'),
+            ('a\udc80b', 'a\udc80b'),
+        ]
+        for s, r in tests:
+            with self.subTest(str=s):
+                self.assertEqual(codecs.compose_surrogate_pairs(s), r)
+
+
 class StreamReaderTest(unittest.TestCase):
 
     def setUp(self):