Skip to content

gh-63014: Add utilities to transform surrogate codes and astral characters #121219

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
117 changes: 117 additions & 0 deletions Lib/codecs.py
Original file line number Diff line number Diff line change
Expand Up @@ -1107,6 +1107,123 @@ def make_encoding_map(decoding_map):
m[v] = None
return m

_surrogates_re = None

def _get_surrogates_re():
    """Return the lazily compiled pattern matching runs of surrogates."""
    global _surrogates_re
    if _surrogates_re is None:
        # Imported lazily so that importing this module does not pull in re.
        import re
        # Surrogate code points are exactly U+D800..U+DFFF.  The upper bound
        # must not reach into the private use area (U+E000 and above), which
        # holds ordinary characters that have to be left untouched.
        _surrogates_re = re.compile(r'[\ud800-\udfff]+')
    return _surrogates_re

def rehandle_surrogatepass(string, errors):
    """Apply the error handler *errors* to the surrogates found in *string*.

    Every maximal run of surrogate code points (U+D800..U+DFFF) is reported
    to the handler as a UnicodeTranslateError with the reason
    'lone surrogates'.  The run is replaced by the string the handler
    returns and scanning resumes at the position the handler returns.

    The handler is looked up only when the first run is found, so a string
    without surrogates is returned as is, whatever the value of *errors*.

    Return the resulting string.  Exceptions raised by the handler (for
    example UnicodeTranslateError for 'strict') are propagated.
    """
    surrogates = _get_surrogates_re()
    handler = None
    pos = 0
    res = []
    while True:
        m = surrogates.search(string, pos)
        if m is None:
            break
        if handler is None:
            handler = lookup_error(errors)
        res.append(string[pos:m.start()])
        repl, pos = handler(UnicodeTranslateError(string, m.start(), m.end(),
                                                  'lone surrogates'))
        res.append(repl)
    if not pos:
        # Nothing was replaced.
        return string[:]
    res.append(string[pos:])
    return ''.join(res)

def rehandle_surrogateescape(string, errors):
    """Apply the error handler *errors* to the escaped bytes in *string*.

    Every maximal run of surrogate code points is converted back to bytes
    by encoding it with the 'surrogateescape' error handler, and these
    bytes are reported to the handler as a UnicodeDecodeError with the
    reason 'lone surrogates'.  The run is replaced by the string the
    handler returns and scanning resumes at the position the handler
    returns.

    The handler is looked up only when the first run is found, so a string
    without surrogates is returned as is, whatever the value of *errors*.

    Return the resulting string.  Raise UnicodeTranslateError if a surrogate
    is not in the range U+DC80..U+DCFF, or if the handler raises
    UnicodeDecodeError.  Other exceptions raised by the handler are
    propagated.
    """
    surrogates = _get_surrogates_re()
    handler = None
    pos = 0
    res = []
    while True:
        m = surrogates.search(string, pos)
        if m is None:
            break
        if handler is None:
            handler = lookup_error(errors)
        start = m.start()
        res.append(string[pos:start])
        try:
            baddata = string[start:m.end()].encode('ascii', 'surrogateescape')
        except UnicodeEncodeError as err:
            # err.start and err.end are relative to the encoded slice.
            raise UnicodeTranslateError(
                string, err.start + start, err.end + start,
                r'surrogates not in range \udc80-\udcff') from None
        try:
            repl, pos = handler(UnicodeDecodeError('unicode', baddata,
                                                   0, len(baddata),
                                                   'lone surrogates'))
        except UnicodeDecodeError as err:
            # Report the error in terms of the original string rather than
            # the intermediate bytes.
            raise UnicodeTranslateError(
                string, err.start + start, err.end + start,
                err.reason) from None
        # The handler returns an offset into baddata; each byte stands for
        # one surrogate, so shifting by start gives the offset in string.
        pos += start
        res.append(repl)
    if not pos:
        # Nothing was replaced.
        return string[:]
    res.append(string[pos:])
    return ''.join(res)

_astral_re = None

def handle_astrals(string, errors):
    """Apply the error handler *errors* to the astral characters in *string*.

    Every maximal run of characters outside the range U+0000..U+FFFF is
    reported to the handler as a UnicodeTranslateError with the reason
    'astral characters'.  The run is replaced by the string the handler
    returns and scanning resumes at the position the handler returns.

    The handler is looked up only when the first run is found.  Return the
    resulting string.
    """
    global _astral_re
    if _astral_re is None:
        # Compiled on first use and cached at module level.
        import re
        _astral_re = re.compile(r'[^\u0000-\uffff]+')
    find_run = _astral_re.search
    callback = None
    chunks = []
    offset = 0
    while True:
        found = find_run(string, offset)
        if found is None:
            break
        if callback is None:
            callback = lookup_error(errors)
        begin, end = found.span()
        chunks.append(string[offset:begin])
        exc = UnicodeTranslateError(string, begin, end, 'astral characters')
        replacement, offset = callback(exc)
        chunks.append(replacement)
    if offset:
        chunks.append(string[offset:])
        return ''.join(chunks)
    return string[:]

def _decompose_astral(match):
res = []
for c in match.group():
k = ord(c) - 0x10000
res.append('%c%c' % (0xd800 + (k >> 10), 0xdc00 + (k & 0x3ff)))
return ''.join(res)

def decompose_astrals(string):
    """Return *string* with the astral characters split into surrogate pairs.

    Every run of characters outside the range U+0000..U+FFFF is rewritten
    by _decompose_astral(); the rest of the string is left unchanged.
    """
    global _astral_re
    pattern = _astral_re
    if not pattern:
        # Compiled on first use and cached at module level.
        import re
        pattern = _astral_re = re.compile(r'[^\u0000-\uffff]+')
    return pattern.sub(_decompose_astral, string)

_surrogate_pair_re = None

def _compose_surrogate_pair(match):
    """Return the single character encoded by the surrogate pair *match*."""
    high, low = map(ord, match.group())
    return chr(0x10000 + ((high - 0xd800) << 10) + (low - 0xdc00))

def compose_surrogate_pairs(string):
    """Return *string* with the surrogate pairs merged into single characters.

    Only a high surrogate (U+D800..U+DBFF) immediately followed by a low
    surrogate (U+DC00..U+DFFF) is merged; other surrogates are left
    unchanged.
    """
    global _surrogate_pair_re
    if _surrogate_pair_re is None:
        # Compiled on first use and cached at module level.
        import re
        _surrogate_pair_re = re.compile(r'[\ud800-\udbff][\udc00-\udfff]')
    return _surrogate_pair_re.sub(_compose_surrogate_pair, string)

### error handlers

try:
Expand Down
137 changes: 137 additions & 0 deletions Lib/test/test_codecs.py
Original file line number Diff line number Diff line change
Expand Up @@ -1945,6 +1945,143 @@ def test_pickle(self):
self.assertFalse(unpickled_codec_info._is_text_encoding)


def test_rehandle_surrogatepass(self):
    """Check codecs.rehandle_surrogatepass() with several error handlers."""
    rehandle = codecs.rehandle_surrogatepass

    # Calling it without arguments is an error.
    self.assertRaises(TypeError, rehandle)

    # Strings without surrogates are returned unchanged.
    unaffected = ('', 'abc', '\xe0\xdf\xe7', '\u03b1\u03b2\u03b3',
                  '\U00010480\U0001d4ff')
    for s in unaffected:
        with self.subTest(str=s):
            self.assertEqual(rehandle(s, 'strict'), s)

    # 'strict' reports the whole run of adjacent surrogates.
    with self.assertRaises(UnicodeTranslateError) as cm:
        rehandle('a\ud801\udc80b', 'strict')
    exc = cm.exception
    self.assertEqual(exc.encoding, None)
    self.assertEqual(exc.object, 'a\ud801\udc80b')
    self.assertEqual(exc.start, 1)
    self.assertEqual(exc.end, 3)

    # Handler name -> expected replacements for '\ud801' and '\udc80'.
    # The commented out entries are disabled in the original change.
    replacements = [
        ('ignore', ('', '')),
        ('replace', ('\ufffd', '\ufffd')),
        ('backslashreplace', ('\\ud801', '\\udc80')),
#        ('namereplace', ('\\ud801', '\\udc80')),
#        ('xmlcharrefreplace', ('&#55297;', '&#56448;')),
#        ('surrogatepass', ('\ud801', '\udc80')),
    ]
    templates = ('a{}b', 'a{}b{}', 'a{}{}c', 'a{}b{}c')
    for error, args in replacements:
        for tmpl in templates:
            data = tmpl.format('\ud801', '\udc80')
            expected = tmpl.format(*args)
            with self.subTest(error=error, data=data):
                self.assertEqual(rehandle(data, error), expected)

def test_rehandle_surrogateescape(self):
    """Check codecs.rehandle_surrogateescape() with several error handlers."""
    rehandle = codecs.rehandle_surrogateescape

    # Calling it without arguments is an error.
    self.assertRaises(TypeError, rehandle)

    # Strings without surrogates are returned unchanged.
    unaffected = ('', 'abc', '\xe0\xdf\xe7', '\u03b1\u03b2\u03b3',
                  '\U00010480\U0001d4ff')
    for s in unaffected:
        with self.subTest(str=s):
            self.assertEqual(rehandle(s, 'strict'), s)

    # 'strict' reports the whole run of adjacent surrogates.
    with self.assertRaises(UnicodeTranslateError) as cm:
        rehandle('a\udc80\udcffb', 'strict')
    exc = cm.exception
    self.assertEqual(exc.encoding, None)
    self.assertEqual(exc.object, 'a\udc80\udcffb')
    self.assertEqual(exc.start, 1)
    self.assertEqual(exc.end, 3)

    # Handlers expected to fail on the escaped byte.
    for error in ('namereplace', 'xmlcharrefreplace'):
        with self.assertRaises(TypeError):
            rehandle('a\udc80b', error)
    with self.assertRaises(UnicodeTranslateError):
        rehandle('a\udc80b', 'surrogatepass')

    # Handler name -> expected replacements for '\udc80' and '\udcff'.
    replacements = [
        ('ignore', ('', '')),
        ('replace', ('\ufffd', '\ufffd')),
        ('backslashreplace', ('\\x80', '\\xff')),
        ('surrogateescape', ('\udc80', '\udcff')),
    ]
    templates = ('a{}b', 'a{}b{}', 'a{}{}c', 'a{}b{}c')
    for error, args in replacements:
        for tmpl in templates:
            data = tmpl.format('\udc80', '\udcff')
            expected = tmpl.format(*args)
            if error == 'replace':
                # Two adjacent escapes are expected to give one U+FFFD.
                expected = expected.replace('\ufffd\ufffd', '\ufffd')
            with self.subTest(error=error, data=data):
                self.assertEqual(rehandle(data, error), expected)

    # Surrogates just outside \udc80-\udcff are rejected by every handler.
    all_handlers = ('strict', 'ignore', 'replace',
                    'backslashreplace', 'namereplace', 'xmlcharrefreplace',
                    'surrogatepass', 'surrogateescape')
    for error in all_handlers:
        with self.assertRaises(UnicodeTranslateError):
            rehandle('\udc7f', error)
        with self.assertRaises(UnicodeTranslateError):
            rehandle('\udd00', error)

def test_handle_astrals(self):
    """Check codecs.handle_astrals() with several error handlers."""
    handle = codecs.handle_astrals

    # Calling it without arguments is an error.
    self.assertRaises(TypeError, handle)

    # Strings without astral characters are returned unchanged.
    unaffected = ('', 'abc', '\xe0\xdf\xe7', '\u03b1\u03b2\u03b3',
                  '\ud801\udc80', '\udc80')
    for s in unaffected:
        with self.subTest(str=s):
            self.assertEqual(handle(s, 'strict'), s)

    # 'strict' reports the whole run of adjacent astral characters.
    with self.assertRaises(UnicodeTranslateError) as cm:
        handle('a\U00010480\U0001d4ffb', 'strict')
    exc = cm.exception
    self.assertEqual(exc.encoding, None)
    self.assertEqual(exc.object, 'a\U00010480\U0001d4ffb')
    self.assertEqual(exc.start, 1)
    self.assertEqual(exc.end, 3)

    # The commented out check is disabled in the original change.
#    with self.assertRaises(UnicodeTranslateError):
#        codecs.handle_astrals('a\U00010480b', 'surrogatepass')
    with self.assertRaises(TypeError):
        handle('a\U00010480b', 'surrogateescape')

    # Handler name -> expected replacements for U+10480 and U+1D4FF.
    # The commented out entries are disabled in the original change.
    replacements = [
        ('ignore', ('', '')),
        ('replace', ('\ufffd', '\ufffd')),
        ('backslashreplace', ('\\U00010480', '\\U0001d4ff')),
#        ('namereplace', ('\\N{OSMANYA LETTER ALEF}',
#                         '\\N{MATHEMATICAL BOLD SCRIPT SMALL V}')),
#        ('xmlcharrefreplace', ('&#66688;','&#120063;')),
    ]
    templates = ('a{}b', 'a{}b{}', 'a{}{}c', 'a{}b{}c')
    for error, args in replacements:
        for tmpl in templates:
            data = tmpl.format('\U00010480', '\U0001d4ff')
            expected = tmpl.format(*args)
            with self.subTest(error=error, data=data):
                self.assertEqual(handle(data, error), expected)

def test_decompose_astrals(self):
    """Check that codecs.decompose_astrals() produces surrogate pairs."""
    decompose = codecs.decompose_astrals

    # Calling it without arguments is an error.
    self.assertRaises(TypeError, decompose)

    # (input, expected output)
    cases = [
        # No astral characters: unchanged.
        ('abc', 'abc'),
        ('\xe0\xdf\xe7', '\xe0\xdf\xe7'),
        ('\u03b1\u03b2\u03b3', '\u03b1\u03b2\u03b3'),
        # Astral characters become surrogate pairs.
        ('a\U00010480b', 'a\ud801\udc80b'),
        ('a\U00010480b\U0001d4ff', 'a\ud801\udc80b\ud835\udcff'),
        ('a\U00010480\U0001d4ffc', 'a\ud801\udc80\ud835\udcffc'),
        ('a\U00010480b\U0001d4ffc', 'a\ud801\udc80b\ud835\udcffc'),
        # Existing surrogates are left unchanged.
        ('a\ud801\udc80b', 'a\ud801\udc80b'),
        ('a\udc80b', 'a\udc80b'),
    ]
    for source, expected in cases:
        with self.subTest(str=source):
            self.assertEqual(decompose(source), expected)

def test_compose_surrogate_pairs(self):
    """Check that codecs.compose_surrogate_pairs() merges surrogate pairs."""
    compose = codecs.compose_surrogate_pairs

    # Calling it without arguments is an error.
    self.assertRaises(TypeError, compose)

    # (input, expected output)
    cases = [
        # No surrogates: unchanged.
        ('abc', 'abc'),
        ('\xe0\xdf\xe7', '\xe0\xdf\xe7'),
        ('\u03b1\u03b2\u03b3', '\u03b1\u03b2\u03b3'),
        # Surrogate pairs become astral characters.
        ('a\ud801\udc80b', 'a\U00010480b'),
        ('a\ud801\udc80b\ud835\udcff', 'a\U00010480b\U0001d4ff'),
        ('a\ud801\udc80\ud835\udcffc', 'a\U00010480\U0001d4ffc'),
        ('a\ud801\udc80b\ud835\udcffc', 'a\U00010480b\U0001d4ffc'),
        # Surrogates that are not part of a pair are left unchanged.
        ('a\udc80\ud801\ud801\udc80b', 'a\udc80\ud801\U00010480b'),
        ('a\ud801\udc80\udc80\ud801b', 'a\U00010480\udc80\ud801b'),
        ('a\udc80b', 'a\udc80b'),
    ]
    for source, expected in cases:
        with self.subTest(str=source):
            self.assertEqual(compose(source), expected)


class StreamReaderTest(unittest.TestCase):

def setUp(self):
Expand Down
Loading