Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
180 changes: 180 additions & 0 deletions setuptools/_wheelbuilder.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
import csv
import hashlib
import io
import itertools
import logging
import os
import stat
import time
from base64 import urlsafe_b64encode
from pathlib import Path
from typing import Dict, Iterable, Optional, Tuple, Union
from zipfile import ZIP_DEFLATED, ZipFile, ZipInfo

from .discovery import _Filter

_Path = Union[str, Path]
_Timestamp = Tuple[int, int, int, int, int, int]
_StrOrIter = Union[str, Iterable[str]]

_HASH_ALG = "sha256"
_HASH_BUF_SIZE = 65536
_MINIMUM_TIMESTAMP = 315532800 # 1980-01-01 00:00:00 UTC
_DEFAULT_COMPRESSION = ZIP_DEFLATED
_WHEEL_VERSION = "1.0"
_META_TEMPLATE = f"""\
Wheel-Version: {_WHEEL_VERSION}
Generator: {{generator}}
Root-Is-Purelib: {{root_is_purelib}}
"""

_logger = logging.getLogger(__name__)


class WheelBuilder:
"""Wrapper around ZipFile that abstracts some aspects of creating a ``.whl`` file
It should be used as a context manager and before closing will add the ``WHEEL``
and ``RECORD`` files to the end of the archive.
The caller is responsible for writing files/dirs in a suitable order,
(which includes ensuring ``.dist-info`` is written last).
"""

def __init__(
self,
path: _Path,
root_is_purelib: bool = True,
compression: int = _DEFAULT_COMPRESSION,
generator: Optional[str] = None,
timestamp: Optional[int] = None,
):
self._path = Path(path)
self._root_is_purelib = root_is_purelib
self._generator = generator
self._compression = compression
self._zip = ZipFile(self._path, "w", compression=compression)
self._records: Dict[str, Tuple[str, int]] = {}

basename = str(self._path.with_suffix("").name)
parts = basename.split("-")
self._distribution, self._version = parts[:2]
self._tags = parts[-3:]
self._build = parts[2] if len(parts) > 5 else ""
self._dist_info = f"{self._distribution}-{self._version}.dist-info"
self._timestamp = _get_timestamp(timestamp, int(time.time()))
assert len(self._tags), f"Invalid wheel name: {self._path}"

def __enter__(self) -> "WheelBuilder":
self._zip.__enter__()
_logger.debug(f"creating '{str(self._path)!r}'")
return self

def __exit__(self, exc_type, exc_value, traceback):
self._add_wheel_meta()
self._save_record()
return self._zip.__exit__(exc_type, exc_value, traceback)

def _default_generator(self) -> str:
from setuptools.version import __version__

return f"setuptools ({__version__})"

def add_existing_file(self, arcname: str, file: _Path):
"""Add a file that already exists in the file system to the wheel."""
hashsum = hashlib.new(_HASH_ALG)
file_stat = os.stat(file)
zipinfo = ZipInfo(arcname, self._timestamp)
attr = stat.S_IMODE(file_stat.st_mode) | stat.S_IFMT(file_stat.st_mode)
zipinfo.external_attr = attr << 16
zipinfo.compress_type = self._compression

with open(file, "rb") as src, self._zip.open(zipinfo, "w") as dst:
while True:
buffer = src.read(_HASH_BUF_SIZE)
if not buffer:
file_size = src.tell()
break
dst.write(buffer)
hashsum.update(buffer)

_logger.debug(f"adding {str(arcname)!r} [{attr:o}]")
hash_digest = urlsafe_b64encode(hashsum.digest()).decode('ascii').rstrip('=')
self._records[arcname] = (hash_digest, file_size)

def add_tree(
self, path: _Path, prefix: Optional[str] = None, exclude: Iterable[str] = ()
):
"""
Add the file tree **UNDER** ``path`` to the wheel file (does not include
the parent directory itself).
You can use ``prefix`` to create a new parent directory.
"""
should_exclude = _Filter(*exclude)
for root, dirs, files in os.walk(path):
# Use sorted to improve determinism.
dirs[:] = [x for x in sorted(dirs) if x != "__pycache__"]
for name in sorted(files):
file = os.path.normpath(os.path.join(root, name))
arcname = os.path.relpath(file, path).replace(os.path.sep, "/")
if not os.path.isfile(file) or should_exclude(arcname):
continue
if prefix:
arcname = os.path.join(prefix, arcname)
self.add_existing_file(arcname, file)

def new_file(self, arcname: str, contents: _StrOrIter, permissions: int = 0o664):
"""
Create a new entry in the wheel named ``arcname`` that contains
the UTF-8 text specified by ``contents``.
"""
zipinfo = ZipInfo(arcname, self._timestamp)
zipinfo.external_attr = (permissions | stat.S_IFREG) << 16
zipinfo.compress_type = self._compression
hashsum = hashlib.new(_HASH_ALG)
file_size = 0
iter_contents = [contents] if isinstance(contents, str) else contents
with self._zip.open(zipinfo, "w") as fp:
for part in iter_contents:
bpart = bytes(part, "utf-8")
file_size += fp.write(bpart)
hashsum.update(bpart)
hash_digest = urlsafe_b64encode(hashsum.digest()).decode('ascii').rstrip('=')
self._records[arcname] = (hash_digest, file_size)

def _save_record(self):
arcname = f"{self._dist_info}/RECORD"
zipinfo = ZipInfo(arcname, self._timestamp)
zipinfo.external_attr = (0o664 | stat.S_IFREG) << 16
zipinfo.compress_type = self._compression
out = self._zip.open(zipinfo, "w")
buf = io.TextIOWrapper(out, encoding="utf-8")
with out, buf:
writer = csv.writer(buf, delimiter=",", quotechar='"', lineterminator="\n")
for file, (hash_digest, size) in self._records.items():
writer.writerow((file, f"{_HASH_ALG}={hash_digest}", size))
writer.writerow((arcname, "", ""))

def _add_wheel_meta(self):
arcname = f"{self._dist_info}/WHEEL"
beginning = _META_TEMPLATE.format(
generator=self._generator or self._default_generator(),
root_is_purelib=self._root_is_purelib,
)
impl_tag, abi_tag, plat_tag = self._tags
tags = (
f"Tag: {impl}-{abi}-{plat}\n"
for impl in impl_tag.split(".")
for abi in abi_tag.split(".")
for plat in plat_tag.split(".")
)
build = [f"Build: {self._build}\n"] if self._build else []
contents = itertools.chain([beginning], tags, build)
self.new_file(arcname, contents)


def _get_timestamp(
given: Optional[int] = None,
fallback: int = _MINIMUM_TIMESTAMP,
) -> _Timestamp:
timestamp = given or int(os.environ.get("SOURCE_DATE_EPOCH", fallback))
timestamp = max(timestamp, _MINIMUM_TIMESTAMP)
return time.gmtime(timestamp)[0:6]
86 changes: 86 additions & 0 deletions setuptools/tests/test_wheelbuilder.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
# This test is based on the `test_wheelfile.py` from pypa/wheel,
# which was initially distributed under the MIT License:
# Copyright (c) 2012 Daniel Holth <[email protected]> and contributors
import stat
import sys
import textwrap
from zipfile import ZipFile, ZIP_DEFLATED

import pytest

from setuptools._wheelbuilder import WheelBuilder


def test_write_str(tmp_path):
with WheelBuilder(tmp_path / "test-1.0-py3-none-any.whl") as builder:
builder.new_file("hello/héllö.py", 'print("Héllö, world!")\n')
builder.new_file("hello/h,ll,.py", 'print("Héllö, world!")\n')

with ZipFile(tmp_path / "test-1.0-py3-none-any.whl", "r") as zf:
infolist = zf.infolist()
assert len(infolist) == 4 # RECORD + WHEEL
assert infolist[0].filename == "hello/héllö.py"
assert infolist[0].file_size == 25
assert infolist[1].filename == "hello/h,ll,.py"
assert infolist[1].file_size == 25
assert infolist[2].filename == "test-1.0.dist-info/WHEEL"
assert infolist[3].filename == "test-1.0.dist-info/RECORD"

record = "\n".join(
line
for line in str(zf.read("test-1.0.dist-info/RECORD"), "utf-8").splitlines()
if not line.startswith("test-1.0.dist-info/WHEEL")
# Avoid changes in setuptools versions messing with the test
)

expected = """\
hello/héllö.py,sha256=bv-QV3RciQC2v3zL8Uvhd_arp40J5A9xmyubN34OVwo,25
"hello/h,ll,.py",sha256=bv-QV3RciQC2v3zL8Uvhd_arp40J5A9xmyubN34OVwo,25
test-1.0.dist-info/RECORD,,
"""
assert record.strip() == textwrap.dedent(expected).strip()


def test_timestamp(tmp_path_factory, tmp_path, monkeypatch):
build_dir = tmp_path_factory.mktemp("build")
for filename in ("one", "two", "three"):
(build_dir / filename).write_text(filename + "\n", encoding="utf-8")

# The earliest date representable in TarInfos, 1980-01-01
monkeypatch.setenv("SOURCE_DATE_EPOCH", "315576060")

wheel_path = tmp_path / "test-1.0-py3-none-any.whl"
with WheelBuilder(wheel_path) as builder:
builder.add_tree(build_dir)

with ZipFile(wheel_path, "r") as zf:
for info in zf.infolist():
assert info.date_time[:3] == (1980, 1, 1)
assert info.compress_type == ZIP_DEFLATED


@pytest.mark.skipif(
sys.platform == "win32", reason="Windows does not support UNIX-like permissions"
)
def test_attributes(tmp_path_factory, tmp_path):
# With the change from ZipFile.write() to .writestr(), we need to manually
# set member attributes.
build_dir = tmp_path_factory.mktemp("build")
files = (("foo", 0o644), ("bar", 0o755))
for filename, mode in files:
path = build_dir / filename
path.write_text(filename + "\n", encoding="utf-8")
path.chmod(mode)

wheel_path = tmp_path / "test-1.0-py3-none-any.whl"
with WheelBuilder(wheel_path) as builder:
builder.add_tree(build_dir)

with ZipFile(wheel_path, "r") as zf:
for filename, mode in files:
info = zf.getinfo(filename)
assert info.external_attr == (mode | stat.S_IFREG) << 16
assert info.compress_type == ZIP_DEFLATED

info = zf.getinfo("test-1.0.dist-info/RECORD")
assert info.external_attr == (0o664 | stat.S_IFREG) << 16