1+ import logging
2+ import re
3+ from typing import Iterable , List , Optional
4+ from urllib .parse import urljoin
5+
6+ import requests
7+ from packageurl import PackageURL
8+
9+ from vulnerabilities .importer import AdvisoryData , AffectedPackage , Importer , Reference
10+ from vulnerabilities .utils import nearest_patched_package
11+
12+ logger = logging .getLogger (__name__ )
13+
14+
15+ class AlpineImporter (Importer ):
16+ spdx_license_expression = "MIT"
17+ license_url = "https://gitlab.alpinelinux.org/alpine/aports/-/blob/master/LICENSE"
18+
19+ # Alpine Git repository base URL
20+ ALPINE_GIT_BASE = "https://git.alpinelinux.org/aports/tree/"
21+
22+ # Regex patterns for parsing APKBUILD secfixes
23+ SECFIXES_START = re .compile (r'^\s*#\s*secfixes:\s*$' , re .IGNORECASE )
24+ VERSION_LINE = re .compile (r'^\s*#\s+([0-9]+[^:]+):\s*$' )
25+ CVE_LINE = re .compile (r'^\s*#\s+-\s+(CVE-\d{4}-\d+)\s*$' , re .IGNORECASE )
26+
27+ def __init__ (self , * args , ** kwargs ):
28+ super ().__init__ (* args , ** kwargs )
29+ # List of packages to process - this would typically come from crawling
30+ # the Alpine repository or a configuration file
31+ self .packages_to_process = []
32+
33+ def advisory_data (self ) -> Iterable [AdvisoryData ]:
34+ # In a real implementation, you would:
35+ # 1. Fetch the list of all packages from Alpine repositories
36+ # 2. For each package, fetch its APKBUILD file
37+ # 3. Parse the secfixes section
38+
39+ # For now, this demonstrates the structure with a known example
40+ example_packages = [
41+ ('main' , 'asterisk' , '9d426cf7a7701ee6707224d3e9f6d07553a56de1' ),
42+ ]
43+
44+ for branch , package , commit in example_packages :
45+ try :
46+ url = self ._get_apkbuild_url (branch , package , commit )
47+ secfixes = self ._parse_apkbuild_from_url (url )
48+
49+ # Group by CVE to create advisories
50+ cve_to_versions = self ._group_by_cve (secfixes )
51+
52+ for cve_id , versions in cve_to_versions .items ():
53+ advisory = self ._create_advisory (
54+ cve_id = cve_id ,
55+ package_name = package ,
56+ fixed_versions = versions ,
57+ branch = branch ,
58+ apkbuild_url = url
59+ )
60+ yield advisory
61+
62+ except Exception as e :
63+ logger .error (f"Error processing { package } from { branch } : { e } " )
64+ continue
65+
66+ def _get_apkbuild_url (self , branch : str , package : str , commit : Optional [str ] = None ) -> str :
67+ """Construct URL to an APKBUILD file."""
68+ url = f"{ self .ALPINE_GIT_BASE } { branch } /{ package } /APKBUILD"
69+ if commit :
70+ url += f"?id={ commit } "
71+ return url
72+
73+ def _parse_apkbuild_from_url (self , url : str ) -> dict :
74+ try :
75+ response = requests .get (url , timeout = 30 )
76+ response .raise_for_status ()
77+ return self ._parse_secfixes (response .text )
78+ except requests .RequestException as e :
79+ logger .error (f"Failed to fetch APKBUILD from { url } : { e } " )
80+ return {}
81+
82+ def _parse_secfixes (self , content : str ) -> dict :
83+ lines = content .split ('\n ' )
84+ secfixes = {}
85+ in_secfixes_section = False
86+ current_version = None
87+
88+ for line in lines :
89+ # Check if we're entering the secfixes section
90+ if self .SECFIXES_START .match (line ):
91+ in_secfixes_section = True
92+ continue
93+
94+ if in_secfixes_section :
95+ # Check if we've left the secfixes section
96+ if not line .strip ().startswith ('#' ):
97+ break
98+
99+ # Check for version line
100+ version_match = self .VERSION_LINE .match (line )
101+ if version_match :
102+ current_version = version_match .group (1 ).strip ()
103+ secfixes [current_version ] = []
104+ continue
105+
106+ # Check for CVE line
107+ if current_version :
108+ cve_match = self .CVE_LINE .match (line )
109+ if cve_match :
110+ cve_id = cve_match .group (1 ).upper ()
111+ secfixes [current_version ].append (cve_id )
112+
113+ return secfixes
114+
115+ def _group_by_cve (self , secfixes : dict ) -> dict :
116+ cve_to_versions = {}
117+ for version , cve_list in secfixes .items ():
118+ for cve_id in cve_list :
119+ if cve_id not in cve_to_versions :
120+ cve_to_versions [cve_id ] = []
121+ cve_to_versions [cve_id ].append (version )
122+ return cve_to_versions
123+
124+ def _create_advisory (
125+ self ,
126+ cve_id : str ,
127+ package_name : str ,
128+ fixed_versions : List [str ],
129+ branch : str ,
130+ apkbuild_url : str
131+ ) -> AdvisoryData :
132+ # Create references
133+ references = [
134+ Reference (
135+ url = apkbuild_url ,
136+ reference_id = f"alpine-{ branch } -{ package_name } " ,
137+ )
138+ ]
139+
140+ # Add NVD reference for the CVE
141+ references .append (
142+ Reference (
143+ url = f"https://nvd.nist.gov/vuln/detail/{ cve_id } " ,
144+ reference_id = cve_id ,
145+ severities = [],
146+ )
147+ )
148+
149+ # Create affected packages (all versions before the fix are affected)
150+ # In a real implementation, you would need to determine the actual
151+ # affected version range. For now, we mark the fixed versions.
152+ affected_packages = []
153+ for fixed_version in fixed_versions :
154+ purl = PackageURL (
155+ type = "alpine" ,
156+ namespace = branch ,
157+ name = package_name ,
158+ version = fixed_version ,
159+ )
160+
161+ affected_package = AffectedPackage (
162+ package = purl ,
163+ fixed_version = fixed_version ,
164+ )
165+ affected_packages .append (affected_package )
166+
167+ return AdvisoryData (
168+ aliases = [cve_id ],
169+ summary = f"{ cve_id } fixed in { package_name } { ', ' .join (fixed_versions )} " ,
170+ affected_packages = affected_packages ,
171+ references = references ,
172+ url = apkbuild_url ,
173+ )
174+
175+
176+ class AlpineAPKBUILDCrawler :
177+
178+ ALPINE_PACKAGES_API = "https://pkgs.alpinelinux.org/packages"
179+
180+ def get_all_packages (self , branch : str = "main" ) -> List [str ]:
181+ # This is a placeholder - actual implementation would need to:
182+ # 1. Query Alpine's package API or
183+ # 2. Clone the aports repository and find all APKBUILD files or
184+ # 3. Use the Alpine package database
185+
186+ # Example packages that are known to have secfixes
187+ known_packages = [
188+ 'asterisk' ,
189+ 'expat' ,
190+ 'openssl' ,
191+ 'python3' ,
192+ 'nginx' ,
193+ 'apache2' ,
194+ ]
195+
196+ return known_packages
197+
198+
199+ # Example test function
200+ def test_alpine_importer ():
201+ """Test the Alpine importer with a known APKBUILD file."""
202+ importer = AlpineImporter ()
203+
204+ print ("Testing Alpine APKBUILD Importer" )
205+ print ("=" * 60 )
206+
207+ advisories = list (importer .advisory_data ())
208+
209+ print (f"\n Found { len (advisories )} advisories" )
210+
211+ for advisory in advisories :
212+ print (f"\n Advisory: { advisory .aliases } " )
213+ print (f" Summary: { advisory .summary } " )
214+ print (f" URL: { advisory .url } " )
215+ print (f" Affected packages: { len (advisory .affected_packages )} " )
216+ for pkg in advisory .affected_packages :
217+ print (f" - { pkg .package } (fixed in { pkg .fixed_version } )" )
218+ print (f" References: { len (advisory .references )} " )
219+
220+
221+ if __name__ == '__main__' :
222+ test_alpine_importer ()
0 commit comments