Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/3128.change.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
In deprecated easy_install, reload and merge the pth file before saving.
94 changes: 61 additions & 33 deletions setuptools/command/easy_install.py
Original file line number Diff line number Diff line change
Expand Up @@ -1567,50 +1567,79 @@ def get_exe_prefixes(exe_filename):
class PthDistributions(Environment):
"""A .pth file with Distribution paths in it"""

dirty = False

def __init__(self, filename, sitedirs=()):
self.filename = filename
self.sitedirs = list(map(normalize_path, sitedirs))
self.basedir = normalize_path(os.path.dirname(self.filename))
self._load()
self.paths, self.dirty = self._load()
# keep a copy if someone manually updates the paths attribute on the instance
self._init_paths = self.paths[:]
super().__init__([], None, None)
for path in yield_lines(self.paths):
list(map(self.add, find_distributions(path, True)))

def _load(self):
self.paths = []
saw_import = False
def _load_raw(self):
paths = []
dirty = saw_import = False
seen = dict.fromkeys(self.sitedirs)
if os.path.isfile(self.filename):
f = open(self.filename, 'rt')
for line in f:
if line.startswith('import'):
saw_import = True
continue
path = line.rstrip()
self.paths.append(path)
if not path.strip() or path.strip().startswith('#'):
continue
# skip non-existent paths, in case somebody deleted a package
# manually, and duplicate paths as well
path = self.paths[-1] = normalize_path(
os.path.join(self.basedir, path)
)
if not os.path.exists(path) or path in seen:
self.paths.pop() # skip it
self.dirty = True # we cleaned up, so we're dirty now :)
continue
seen[path] = 1
f.close()
f = open(self.filename, 'rt')
for line in f:
path = line.rstrip()
# still keep imports and empty/commented lines for formatting
paths.append(path)
if line.startswith(('import ', 'from ')):
saw_import = True
continue
stripped_path = path.strip()
if not stripped_path or stripped_path.startswith('#'):
continue
# skip non-existent paths, in case somebody deleted a package
# manually, and duplicate paths as well
normalized_path = normalize_path(os.path.join(self.basedir, path))
if normalized_path in seen or not os.path.exists(normalized_path):
log.debug("cleaned up dirty or duplicated %r", path)
dirty = True
paths.pop()
continue
seen[normalized_path] = 1
f.close()
# remove any trailing empty/blank line
while paths and not paths[-1].strip():
paths.pop()
dirty = True
return paths, dirty or (paths and saw_import)

if self.paths and not saw_import:
self.dirty = True # ensure anything we touch has import wrappers
while self.paths and not self.paths[-1].strip():
self.paths.pop()
def _load(self):
if os.path.isfile(self.filename):
return self._load_raw()
return [], False

def save(self):
"""Write changed .pth file back to disk"""
# first reload the file
last_paths, last_dirty = self._load()
# and check that there are no difference with what we have.
# there can be difference if someone else has written to the file
# since we first loaded it.
# we don't want to lose the eventual new paths added since then.
for path in last_paths[:]:
if path not in self.paths:
self.paths.append(path)
log.info("detected new path %r", path)
last_dirty = True
else:
last_paths.remove(path)
# also, re-check that all paths are still valid before saving them
for path in self.paths[:]:
if path not in last_paths \
and not path.startswith(('import ', 'from ', '#')):
absolute_path = os.path.join(self.basedir, path)
if not os.path.exists(absolute_path):
self.paths.remove(path)
log.info("removing now non-existent path %r", path)
last_dirty = True

self.dirty |= last_dirty or self.paths != self._init_paths
if not self.dirty:
return

Expand All @@ -1619,17 +1648,16 @@ def save(self):
log.debug("Saving %s", self.filename)
lines = self._wrap_lines(rel_paths)
data = '\n'.join(lines) + '\n'

if os.path.islink(self.filename):
os.unlink(self.filename)
with open(self.filename, 'wt') as f:
f.write(data)

elif os.path.exists(self.filename):
log.debug("Deleting empty %s", self.filename)
os.unlink(self.filename)

self.dirty = False
self._init_paths[:] = self.paths[:]

@staticmethod
def _wrap_lines(lines):
Expand Down
43 changes: 43 additions & 0 deletions setuptools/tests/test_easy_install.py
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,49 @@ def test_add_from_site_is_ignored(self):
pth.add(PRDistribution(location))
assert not pth.dirty

def test_many_pth_distributions_merge_together(self, tmpdir):
"""
If the pth file is modified under the hood, then PthDistribution
will refresh its content before saving, merging contents when
necessary.
"""
# putting the pth file in a dedicated sub-folder,
pth_subdir = tmpdir.join("pth_subdir")
pth_subdir.mkdir()
pth_path = str(pth_subdir.join("file1.pth"))
pth1 = PthDistributions(pth_path)
pth2 = PthDistributions(pth_path)
assert (
pth1.paths == pth2.paths == []
), "unless there would be some default added at some point"
# and so putting the src_subdir in folder distinct than the pth one,
# so to keep it absolute by PthDistributions
new_src_path = tmpdir.join("src_subdir")
new_src_path.mkdir() # must exist to be accounted
new_src_path_str = str(new_src_path)
pth1.paths.append(new_src_path_str)
pth1.save()
assert (
pth1.paths
), "the new_src_path added must still be present/valid in pth1 after save"
# now,
assert (
new_src_path_str not in pth2.paths
), "right before we save the entry should still not be present"
pth2.save()
assert (
new_src_path_str in pth2.paths
), "the new_src_path entry should have been added by pth2 with its save() call"
assert pth2.paths[-1] == new_src_path, (
"and it should match exactly on the last entry actually "
"given we append to it in save()"
)
# finally,
assert PthDistributions(pth_path).paths == pth2.paths, (
"and we should have the exact same list at the end "
"with a fresh PthDistributions instance"
)


@pytest.fixture
def setup_context(tmpdir):
Expand Down