diff --git a/setuptools/_wheelbuilder.py b/setuptools/_wheelbuilder.py new file mode 100644 index 0000000000..b223edae0b --- /dev/null +++ b/setuptools/_wheelbuilder.py @@ -0,0 +1,180 @@ +import csv +import hashlib +import io +import itertools +import logging +import os +import stat +import time +from base64 import urlsafe_b64encode +from pathlib import Path +from typing import Dict, Iterable, Optional, Tuple, Union +from zipfile import ZIP_DEFLATED, ZipFile, ZipInfo + +from .discovery import _Filter + +_Path = Union[str, Path] +_Timestamp = Tuple[int, int, int, int, int, int] +_StrOrIter = Union[str, Iterable[str]] + +_HASH_ALG = "sha256" +_HASH_BUF_SIZE = 65536 +_MINIMUM_TIMESTAMP = 315532800 # 1980-01-01 00:00:00 UTC +_DEFAULT_COMPRESSION = ZIP_DEFLATED +_WHEEL_VERSION = "1.0" +_META_TEMPLATE = f"""\ +Wheel-Version: {_WHEEL_VERSION} +Generator: {{generator}} +Root-Is-Purelib: {{root_is_purelib}} +""" + +_logger = logging.getLogger(__name__) + + +class WheelBuilder: + """Wrapper around ZipFile that abstracts some aspects of creating a ``.whl`` file + It should be used as a context manager and before closing will add the ``WHEEL`` + and ``RECORD`` files to the end of the archive. + The caller is responsible for writing files/dirs in a suitable order, + (which includes ensuring ``.dist-info`` is written last). + """ + + def __init__( + self, + path: _Path, + root_is_purelib: bool = True, + compression: int = _DEFAULT_COMPRESSION, + generator: Optional[str] = None, + timestamp: Optional[int] = None, + ): + self._path = Path(path) + self._root_is_purelib = root_is_purelib + self._generator = generator + self._compression = compression + self._zip = ZipFile(self._path, "w", compression=compression) + self._records: Dict[str, Tuple[str, int]] = {} + + basename = str(self._path.with_suffix("").name) + parts = basename.split("-") + self._distribution, self._version = parts[:2] + self._tags = parts[-3:] + self._build = parts[2] if len(parts) > 5 else "" + self._dist_info = f"{self._distribution}-{self._version}.dist-info" + self._timestamp = _get_timestamp(timestamp, int(time.time())) + assert len(self._tags), f"Invalid wheel name: {self._path}" + + def __enter__(self) -> "WheelBuilder": + self._zip.__enter__() + _logger.debug(f"creating '{str(self._path)!r}'") + return self + + def __exit__(self, exc_type, exc_value, traceback): + self._add_wheel_meta() + self._save_record() + return self._zip.__exit__(exc_type, exc_value, traceback) + + def _default_generator(self) -> str: + from setuptools.version import __version__ + + return f"setuptools ({__version__})" + + def add_existing_file(self, arcname: str, file: _Path): + """Add a file that already exists in the file system to the wheel.""" + hashsum = hashlib.new(_HASH_ALG) + file_stat = os.stat(file) + zipinfo = ZipInfo(arcname, self._timestamp) + attr = stat.S_IMODE(file_stat.st_mode) | stat.S_IFMT(file_stat.st_mode) + zipinfo.external_attr = attr << 16 + zipinfo.compress_type = self._compression + + with open(file, "rb") as src, self._zip.open(zipinfo, "w") as dst: + while True: + buffer = src.read(_HASH_BUF_SIZE) + if not buffer: + file_size = src.tell() + break + dst.write(buffer) + hashsum.update(buffer) + + _logger.debug(f"adding {str(arcname)!r} [{attr:o}]") + hash_digest = urlsafe_b64encode(hashsum.digest()).decode('ascii').rstrip('=') + self._records[arcname] = (hash_digest, file_size) + + def add_tree( + self, path: _Path, prefix: Optional[str] = None, exclude: Iterable[str] = () + ): + """ + Add the file tree **UNDER** ``path`` to the wheel file (does not include + the parent directory itself). + You can use ``prefix`` to create a new parent directory. + """ + should_exclude = _Filter(*exclude) + for root, dirs, files in os.walk(path): + # Use sorted to improve determinism. + dirs[:] = [x for x in sorted(dirs) if x != "__pycache__"] + for name in sorted(files): + file = os.path.normpath(os.path.join(root, name)) + arcname = os.path.relpath(file, path).replace(os.path.sep, "/") + if not os.path.isfile(file) or should_exclude(arcname): + continue + if prefix: + arcname = os.path.join(prefix, arcname) + self.add_existing_file(arcname, file) + + def new_file(self, arcname: str, contents: _StrOrIter, permissions: int = 0o664): + """ + Create a new entry in the wheel named ``arcname`` that contains + the UTF-8 text specified by ``contents``. + """ + zipinfo = ZipInfo(arcname, self._timestamp) + zipinfo.external_attr = (permissions | stat.S_IFREG) << 16 + zipinfo.compress_type = self._compression + hashsum = hashlib.new(_HASH_ALG) + file_size = 0 + iter_contents = [contents] if isinstance(contents, str) else contents + with self._zip.open(zipinfo, "w") as fp: + for part in iter_contents: + bpart = bytes(part, "utf-8") + file_size += fp.write(bpart) + hashsum.update(bpart) + hash_digest = urlsafe_b64encode(hashsum.digest()).decode('ascii').rstrip('=') + self._records[arcname] = (hash_digest, file_size) + + def _save_record(self): + arcname = f"{self._dist_info}/RECORD" + zipinfo = ZipInfo(arcname, self._timestamp) + zipinfo.external_attr = (0o664 | stat.S_IFREG) << 16 + zipinfo.compress_type = self._compression + out = self._zip.open(zipinfo, "w") + buf = io.TextIOWrapper(out, encoding="utf-8") + with out, buf: + writer = csv.writer(buf, delimiter=",", quotechar='"', lineterminator="\n") + for file, (hash_digest, size) in self._records.items(): + writer.writerow((file, f"{_HASH_ALG}={hash_digest}", size)) + writer.writerow((arcname, "", "")) + + def _add_wheel_meta(self): + arcname = f"{self._dist_info}/WHEEL" + beginning = _META_TEMPLATE.format( + generator=self._generator or self._default_generator(), + root_is_purelib=self._root_is_purelib, + ) + impl_tag, abi_tag, plat_tag = self._tags + tags = ( + f"Tag: {impl}-{abi}-{plat}\n" + for impl in impl_tag.split(".") + for abi in abi_tag.split(".") + for plat in plat_tag.split(".") + ) + build = [f"Build: {self._build}\n"] if self._build else [] + contents = itertools.chain([beginning], tags, build) + self.new_file(arcname, contents) + + +def _get_timestamp( + given: Optional[int] = None, + fallback: int = _MINIMUM_TIMESTAMP, +) -> _Timestamp: + timestamp = given or int(os.environ.get("SOURCE_DATE_EPOCH", fallback)) + timestamp = max(timestamp, _MINIMUM_TIMESTAMP) + return time.gmtime(timestamp)[0:6] diff --git a/setuptools/tests/test_wheelbuilder.py b/setuptools/tests/test_wheelbuilder.py new file mode 100644 index 0000000000..046fccc486 --- /dev/null +++ b/setuptools/tests/test_wheelbuilder.py @@ -0,0 +1,86 @@ +# This test is based on the `test_wheelfile.py` from pypa/wheel, +# which was initially distributed under the MIT License: +# Copyright (c) 2012 Daniel Holth and contributors +import stat +import sys +import textwrap +from zipfile import ZipFile, ZIP_DEFLATED + +import pytest + +from setuptools._wheelbuilder import WheelBuilder + + +def test_write_str(tmp_path): + with WheelBuilder(tmp_path / "test-1.0-py3-none-any.whl") as builder: + builder.new_file("hello/héllö.py", 'print("Héllö, world!")\n') + builder.new_file("hello/h,ll,.py", 'print("Héllö, world!")\n') + + with ZipFile(tmp_path / "test-1.0-py3-none-any.whl", "r") as zf: + infolist = zf.infolist() + assert len(infolist) == 4 # RECORD + WHEEL + assert infolist[0].filename == "hello/héllö.py" + assert infolist[0].file_size == 25 + assert infolist[1].filename == "hello/h,ll,.py" + assert infolist[1].file_size == 25 + assert infolist[2].filename == "test-1.0.dist-info/WHEEL" + assert infolist[3].filename == "test-1.0.dist-info/RECORD" + + record = "\n".join( + line + for line in str(zf.read("test-1.0.dist-info/RECORD"), "utf-8").splitlines() + if not line.startswith("test-1.0.dist-info/WHEEL") + # Avoid changes in setuptools versions messing with the test + ) + + expected = """\ + hello/héllö.py,sha256=bv-QV3RciQC2v3zL8Uvhd_arp40J5A9xmyubN34OVwo,25 + "hello/h,ll,.py",sha256=bv-QV3RciQC2v3zL8Uvhd_arp40J5A9xmyubN34OVwo,25 + test-1.0.dist-info/RECORD,, + """ + assert record.strip() == textwrap.dedent(expected).strip() + + +def test_timestamp(tmp_path_factory, tmp_path, monkeypatch): + build_dir = tmp_path_factory.mktemp("build") + for filename in ("one", "two", "three"): + (build_dir / filename).write_text(filename + "\n", encoding="utf-8") + + # The earliest date representable in TarInfos, 1980-01-01 + monkeypatch.setenv("SOURCE_DATE_EPOCH", "315576060") + + wheel_path = tmp_path / "test-1.0-py3-none-any.whl" + with WheelBuilder(wheel_path) as builder: + builder.add_tree(build_dir) + + with ZipFile(wheel_path, "r") as zf: + for info in zf.infolist(): + assert info.date_time[:3] == (1980, 1, 1) + assert info.compress_type == ZIP_DEFLATED + + +@pytest.mark.skipif( + sys.platform == "win32", reason="Windows does not support UNIX-like permissions" +) +def test_attributes(tmp_path_factory, tmp_path): + # With the change from ZipFile.write() to .writestr(), we need to manually + # set member attributes. + build_dir = tmp_path_factory.mktemp("build") + files = (("foo", 0o644), ("bar", 0o755)) + for filename, mode in files: + path = build_dir / filename + path.write_text(filename + "\n", encoding="utf-8") + path.chmod(mode) + + wheel_path = tmp_path / "test-1.0-py3-none-any.whl" + with WheelBuilder(wheel_path) as builder: + builder.add_tree(build_dir) + + with ZipFile(wheel_path, "r") as zf: + for filename, mode in files: + info = zf.getinfo(filename) + assert info.external_attr == (mode | stat.S_IFREG) << 16 + assert info.compress_type == ZIP_DEFLATED + + info = zf.getinfo("test-1.0.dist-info/RECORD") + assert info.external_attr == (0o664 | stat.S_IFREG) << 16