10
10
import tarfile
11
11
import tempfile
12
12
import urllib .parse
13
- from collections .abc import Callable
13
+ from collections .abc import Callable , Iterator
14
14
from dataclasses import dataclass
15
15
from datetime import datetime
16
16
31
31
logger : logging .Logger = logging .getLogger (__name__ )
32
32
33
33
34
def _handle_temp_dir_clean(function: Callable, path: str, onerror: tuple) -> None:
    """Turn a failed ``shutil.rmtree`` removal step into a SourceCodeError.

    This function is passed as the ``onerror`` callback of ``shutil.rmtree`` in this
    module, so that a failure to delete a temporary directory surfaces as a single
    exception type that the callers catch.

    Parameters
    ----------
    function: Callable
        The callable that was being executed when the removal failed.
    path: str
        The path that was passed to ``function``.
    onerror: tuple
        The exception information supplied by ``shutil.rmtree``; it is reported as
        ``excinfo`` in the error message.

    Raises
    ------
    SourceCodeError
        Always raised, carrying the function, path and exception information.
    """
    failure_details = f"Error removing with shutil. function={ function } , " f"path={ path } , excinfo={ onerror } "
    raise SourceCodeError(failure_details)
36
+
37
+
34
38
class PyPIRegistry (PackageRegistry ):
35
39
"""This class implements the pypi package registry."""
36
40
@@ -187,10 +191,7 @@ def download_package_json(self, url: str) -> dict:
187
191
188
192
return res_obj
189
193
190
- def _handle_temp_dir_clean (self , function : Callable , path : str , onerror : tuple ) -> None :
191
- raise SourceCodeError (f"Error removing with shutil. function={ function } , " f"path={ path } , excinfo={ onerror } " )
192
-
193
- def download_package_sourcecode (self , url : str ) -> tuple [dict [str , bytes ], str ]:
194
+ def download_package_sourcecode (self , url : str ) -> str :
194
195
"""Download the package source code from pypi registry.
195
196
196
197
Parameters
@@ -200,11 +201,14 @@ def download_package_sourcecode(self, url: str) -> tuple[dict[str, bytes], str]:
200
201
201
202
Returns
202
203
-------
203
- tuple[dict[str, bytes], str]
204
- A dictionary of filenames and file contents, and the temp directory with the source code.
205
- """
206
- sourcecode : dict = {}
204
+ str
205
+ The temp directory with the source code.
207
206
207
+ Raises
208
+ ------
209
+ InvalidHTTPResponseError
210
+ If the HTTP request to the registry fails or an unexpected response is returned.
211
+ """
208
212
# Get name of file.
209
213
_ , _ , file_name = url .rpartition ("/" )
210
214
package_name = re .sub (r"\.tar\.gz$" , "" , file_name )
@@ -216,7 +220,7 @@ def download_package_sourcecode(self, url: str) -> tuple[dict[str, bytes], str]:
216
220
error_msg = f"Unable to find package source code using URL: { url } "
217
221
logger .debug (error_msg )
218
222
try :
219
- shutil .rmtree (temp_dir , onerror = self . _handle_temp_dir_clean )
223
+ shutil .rmtree (temp_dir , onerror = _handle_temp_dir_clean )
220
224
except SourceCodeError as tempdir_exception :
221
225
tempdir_exception_msg = (
222
226
f"Unable to cleanup temporary directory { temp_dir } for source code: { tempdir_exception } "
@@ -235,7 +239,7 @@ def download_package_sourcecode(self, url: str) -> tuple[dict[str, bytes], str]:
235
239
error_msg = f"Error while streaming source file: { stream_error } "
236
240
logger .debug (error_msg )
237
241
try :
238
- shutil .rmtree (temp_dir , onerror = self . _handle_temp_dir_clean )
242
+ shutil .rmtree (temp_dir , onerror = _handle_temp_dir_clean )
239
243
except SourceCodeError as tempdir_exception :
240
244
tempdir_exception_msg = (
241
245
f"Unable to cleanup temporary directory { temp_dir } for source code: { tempdir_exception } "
@@ -249,15 +253,11 @@ def download_package_sourcecode(self, url: str) -> tuple[dict[str, bytes], str]:
249
253
with tarfile .open (source_file .name , "r:gz" ) as sourcecode_tar :
250
254
sourcecode_tar .extractall (temp_dir , filter = "data" )
251
255
252
- for member in sourcecode_tar .getmembers ():
253
- if member .isfile () and (file_obj := sourcecode_tar .extractfile (member )):
254
- sourcecode [member .name ] = file_obj .read ()
255
-
256
256
except tarfile .ReadError as read_error :
257
257
error_msg = f"Error reading source code tar file: { read_error } "
258
258
logger .debug (error_msg )
259
259
try :
260
- shutil .rmtree (temp_dir , onerror = self . _handle_temp_dir_clean )
260
+ shutil .rmtree (temp_dir , onerror = _handle_temp_dir_clean )
261
261
except SourceCodeError as tempdir_exception :
262
262
tempdir_exception_msg = (
263
263
f"Unable to cleanup temporary directory { temp_dir } for source code: { tempdir_exception } "
@@ -266,11 +266,16 @@ def download_package_sourcecode(self, url: str) -> tuple[dict[str, bytes], str]:
266
266
267
267
raise InvalidHTTPResponseError (error_msg ) from read_error
268
268
269
+ extracted_dir = os .listdir (temp_dir )
270
+ if len (extracted_dir ) == 1 and re .sub (".tar.gz$" , "" , file_name ) == extracted_dir [0 ]:
271
+ # structure used package name and version as top-level directory
272
+ temp_dir = os .path .join (temp_dir , extracted_dir [0 ])
273
+
269
274
else :
270
275
error_msg = f"Unable to extract source code from file { file_name } "
271
276
logger .debug (error_msg )
272
277
try :
273
- shutil .rmtree (temp_dir , onerror = self . _handle_temp_dir_clean )
278
+ shutil .rmtree (temp_dir , onerror = _handle_temp_dir_clean )
274
279
except SourceCodeError as tempdir_exception :
275
280
tempdir_exception_msg = (
276
281
f"Unable to cleanup temporary directory { temp_dir } for source code: { tempdir_exception } "
@@ -281,7 +286,7 @@ def download_package_sourcecode(self, url: str) -> tuple[dict[str, bytes], str]:
281
286
raise InvalidHTTPResponseError (error_msg )
282
287
283
288
logger .debug ("Temporary download and unzip of %s stored in %s" , file_name , temp_dir )
284
- return sourcecode , temp_dir
289
+ return temp_dir
285
290
286
291
def get_package_page (self , package_name : str ) -> str | None :
287
292
"""Implement custom API to get package main page.
@@ -401,9 +406,6 @@ class PyPIPackageJsonAsset:
401
406
#: The asset content.
402
407
package_json : dict
403
408
404
- #: The source code of the package hosted on PyPI
405
- package_sourcecode : dict
406
-
407
409
#: the source code temporary location name
408
410
package_sourcecode_path : str
409
411
@@ -537,7 +539,7 @@ def get_latest_release_upload_time(self) -> str | None:
537
539
return None
538
540
539
541
def download_sourcecode (self ) -> bool :
540
- """Get the source code of the package and store it in the package_sourcecode attribute .
542
+ """Get the source code of the package and store it in a temporary directory .
541
543
542
544
Returns
543
545
-------
@@ -547,26 +549,22 @@ def download_sourcecode(self) -> bool:
547
549
url = self .get_sourcecode_url ()
548
550
if url :
549
551
try :
550
- self .package_sourcecode , self .package_sourcecode_path = self .pypi_registry .download_package_sourcecode (
551
- url
552
- )
552
+ self .package_sourcecode_path = self .pypi_registry .download_package_sourcecode (url )
553
553
return True
554
554
except InvalidHTTPResponseError as error :
555
555
logger .debug (error )
556
556
return False
557
557
558
- def _handle_temp_dir_clean (self , function : Callable , path : str , onerror : tuple ) -> None :
559
- raise SourceCodeError (f"Error removing with shutil. function={ function } , " f"path={ path } , excinfo={ onerror } " )
560
-
561
558
def cleanup_sourcecode (self ) -> None :
562
559
"""
563
560
Delete the temporary directory created when downloading the source code.
564
561
565
- The package source code is no longer accessible after this.
562
+ The package source code is no longer accessible after this, and the package_sourcecode_path
563
+ attribute is set to an empty string.
566
564
"""
567
565
if self .package_sourcecode_path :
568
566
try :
569
- shutil .rmtree (self .package_sourcecode_path , onerror = self . _handle_temp_dir_clean )
567
+ shutil .rmtree (self .package_sourcecode_path , onerror = _handle_temp_dir_clean )
570
568
self .package_sourcecode_path = ""
571
569
except SourceCodeError as tempdir_exception :
572
570
tempdir_exception_msg = (
@@ -575,3 +573,77 @@ def cleanup_sourcecode(self) -> None:
575
573
)
576
574
logger .debug (tempdir_exception_msg )
577
575
raise tempdir_exception
576
+
577
def get_sourcecode_file_contents(self, path: str) -> bytes:
    """
    Read one downloaded source code file and return its raw bytes.

    A relative path is resolved against the package_sourcecode_path attribute; an
    absolute path is used unchanged.

    Parameters
    ----------
    path: str
        The file to open, either absolute or relative to package_sourcecode_path.

    Returns
    -------
    bytes
        The raw contents of the source code file.

    Raises
    ------
    SourceCodeError
        if the source code has not been downloaded, the file does not exist, or the
        file cannot be opened or read.
    """
    source_root = self.package_sourcecode_path
    if not source_root:
        message = "No source code files have been downloaded"
        logger.debug(message)
        raise SourceCodeError(message)

    # Only relative paths are anchored at the downloaded source directory.
    target = path if os.path.isabs(path) else os.path.join(source_root, path)

    if not os.path.exists(target):
        message = f"Unable to locate file { target } "
        logger.debug(message)
        raise SourceCodeError(message)

    try:
        with open(target, "rb") as stream:
            return stream.read()
    except OSError as read_error:
        # Covers anything that exists but cannot be read (directories, permissions, ...).
        message = f"Unable to read file { target } : { read_error } "
        logger.debug(message)
        raise SourceCodeError(message) from read_error
618
+
619
def iter_sourcecode(self) -> Iterator[tuple[str, bytes]]:
    """
    Iterate through all source code files.

    This is a generator: nothing is checked or read until iteration starts, so the
    SourceCodeError below is raised on the first ``next()`` call, not when this
    method is called.

    Yields
    ------
    tuple[str, bytes]
        The source code file path, and the raw contents of the source code file.

    Raises
    ------
    SourceCodeError
        if the source code has not been downloaded.
    OSError
        if a file found during the walk cannot be opened or read; this is not caught here.
    """
    if not self.package_sourcecode_path:
        error_msg = "No source code files have been downloaded"
        logger.debug(error_msg)
        raise SourceCodeError(error_msg)

    for root, _directories, files in os.walk(self.package_sourcecode_path):
        # When walking ".", report top-level files under the current working directory.
        # os.path.join supplies the separator; os.linesep is a line terminator and
        # must not be appended to a path.
        root_path = os.getcwd() if root == "." else root
        for file in files:
            filepath = os.path.join(root_path, file)

            with open(filepath, "rb") as handle:
                contents = handle.read()

            yield filepath, contents
0 commit comments