|
| 1 | +""" |
| 2 | +Extended MetadataInjector that supports sdist (.tar.gz) and zip (.zip) formats. |
| 3 | +
|
| 4 | +This extends SimpleRepository's MetadataInjectorRepository to provide metadata extraction |
| 5 | +for package formats beyond wheels. |
| 6 | +
|
| 7 | +""" |
| 8 | + |
1 | 9 | from dataclasses import replace
|
2 | 10 | import pathlib
|
3 | 11 | import tarfile
|
| 12 | +import typing |
4 | 13 | import zipfile
|
5 | 14 |
|
6 | 15 | from simple_repository import model
|
7 | 16 | from simple_repository.components.metadata_injector import MetadataInjectorRepository
|
8 | 17 |
|
9 | 18 |
|
def _extract_pkg_info_from_archive(
    archive_names: typing.List[str],
    extract_func: typing.Callable[[str], typing.Optional[typing.IO[bytes]]],
    package_name: str,
) -> str:
    """
    Return the text of the first usable PKG-INFO file found in an archive.

    Args:
        archive_names: Member names of the archive, using "/" as separator.
        extract_func: Callable that opens a member by name and returns a
            binary file object, or None when the member cannot be opened.
        package_name: Name of the package, used only in the error message.

    Returns:
        The decoded content of the first candidate that contains the text
        "Metadata-Version".

    Raises:
        ValueError: If no candidate yields valid metadata.
    """
    # Only accept members whose final path component is exactly "PKG-INFO";
    # a substring match would also pick up files such as "PKG-INFO.orig".
    pkg_info_files = [
        parts
        for parts in (name.split("/") for name in archive_names)
        if parts[-1] == "PKG-INFO"
    ]
    # Candidates with more path components are tried first (stable order
    # among candidates of equal depth).
    ordered_pkg_info = sorted(pkg_info_files, key=len, reverse=True)

    for path in ordered_pkg_info:
        candidate = "/".join(path)
        f = extract_func(candidate)
        if f is None:
            continue
        try:
            data = f.read().decode("utf-8")
        except (UnicodeDecodeError, OSError):
            # Skip files that can't be decoded or read
            continue
        finally:
            # Always release the member handle, whether or not it was usable.
            f.close()
        if "Metadata-Version" in data:
            return data

    raise ValueError(f"No valid PKG-INFO metadata found in {package_name}")


def get_metadata_from_sdist(package_path: pathlib.Path) -> str:
    """
    Extract metadata from a source distribution (.tar.gz file).

    Args:
        package_path: Path to the tar archive on disk.

    Returns:
        The content of the PKG-INFO file selected from the archive.

    Raises:
        ValueError: If the archive holds no PKG-INFO file with valid metadata.
    """
    with tarfile.open(package_path) as archive:
        names = archive.getnames()

        def extract_func(candidate: str) -> typing.Optional[typing.IO[bytes]]:
            try:
                return archive.extractfile(candidate)
            except KeyError:
                # Raised for a missing member or a link whose target is not
                # in the archive; treat it as "not extractable".
                return None

        return _extract_pkg_info_from_archive(names, extract_func, package_path.name)
26 | 67 |
|
27 | 68 |
|
def get_metadata_from_zip(package_path: pathlib.Path) -> str:
    """
    Extract metadata from a zip file (legacy format, used by packages like pyreadline).

    Args:
        package_path: Path to the zip archive on disk.

    Returns:
        The PKG-INFO content chosen by _extract_pkg_info_from_archive.

    Raises:
        ValueError: Propagated from _extract_pkg_info_from_archive when no
            valid PKG-INFO file is found.
    """
    with zipfile.ZipFile(package_path) as zf:
        member_names = zf.namelist()

        def open_member(member: str) -> typing.Optional[typing.IO[bytes]]:
            # A member that is missing or corrupt is reported as None so the
            # caller moves on to the next candidate.
            try:
                handle = zf.open(member, mode="r")
            except (KeyError, zipfile.BadZipFile):
                return None
            return handle

        return _extract_pkg_info_from_archive(
            member_names,
            open_member,
            package_path.name,
        )
45 | 81 |
|
46 | 82 |
|
class MetadataInjector(MetadataInjectorRepository):
    """
    Extended MetadataInjector that supports multiple package formats.

    This class extends SimpleRepository's MetadataInjectorRepository to provide
    metadata extraction for:
    - Wheel files (.whl) - delegated to the inherited _get_metadata_from_wheel
    - Source distributions (.tar.gz) - read from a PKG-INFO file
    - Zip files (.zip) - legacy format used by some packages
    """

    # Maps a supported filename suffix to the callable that extracts metadata
    # for it. Each callable receives the injector instance and the file path.
    _EXTRACTORS: typing.ClassVar[
        typing.Dict[str, typing.Callable[["MetadataInjector", pathlib.Path], str]]
    ] = {
        ".whl": lambda self, path: self._get_metadata_from_wheel(path),
        ".tar.gz": lambda self, path: get_metadata_from_sdist(path),
        ".zip": lambda self, path: get_metadata_from_zip(path),
    }

    def _get_metadata_from_package(self, package_path: pathlib.Path) -> str:
        """
        Extract metadata from a package file based on its extension.

        Args:
            package_path: Path to the package file; only its name is used to
                choose the extractor.

        Returns:
            The metadata text produced by the matching extractor.

        Raises:
            ValueError: If the file name ends with none of the supported
                suffixes.
        """
        package_name = package_path.name

        for extension, extractor in self._EXTRACTORS.items():
            if package_name.endswith(extension):
                return extractor(self, package_path)

        # Name the accepted suffixes so the failure is actionable.
        supported_formats = ", ".join(self._EXTRACTORS)
        raise ValueError(
            f"Unsupported package format: {package_name}. "
            f"Supported formats: {supported_formats}"
        )

    def _add_metadata_attribute(
        self,
        project_page: model.ProjectDetail,
    ) -> model.ProjectDetail:
        """
        Add the data-core-metadata attribute to all supported package files.

        Every file whose filename ends with one of the suffixes in
        _EXTRACTORS, and which does not already have dist_info_metadata set,
        is replaced by a copy with dist_info_metadata=True. All other files
        are passed through unchanged.

        Args:
            project_page: The project detail page whose files are inspected.

        Returns:
            A copy of project_page with the updated tuple of files.
        """
        # Build the suffix tuple once; str.endswith accepts a tuple of options.
        supported_suffixes = tuple(self._EXTRACTORS)
        files = tuple(
            replace(file, dist_info_metadata=True)
            if file.filename.endswith(supported_suffixes)
            and not file.dist_info_metadata
            else file
            for file in project_page.files
        )
        return replace(project_page, files=files)
0 commit comments