Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions docs/built-in-pipelines.rst
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,14 @@ Scan Single Package
:members:
:member-order: bysource

.. _pipeline_scan_maven_package:

Scan Maven Package
-------------------
.. autoclass:: scanpipe.pipelines.scan_maven_package.ScanMavenPackage()
:members:
:member-order: bysource

Fetch Scores (addon)
--------------------
.. warning::
Expand Down
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ resolve_dependencies = "scanpipe.pipelines.resolve_dependencies:ResolveDependenc
scan_codebase = "scanpipe.pipelines.scan_codebase:ScanCodebase"
scan_for_virus = "scanpipe.pipelines.scan_for_virus:ScanForVirus"
scan_single_package = "scanpipe.pipelines.scan_single_package:ScanSinglePackage"
scan_maven_package = "scanpipe.pipelines.scan_maven_package:ScanMavenPackage"

[tool.setuptools.packages.find]
where = ["."]
Expand Down
70 changes: 70 additions & 0 deletions scanpipe/pipelines/scan_maven_package.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
# SPDX-License-Identifier: Apache-2.0
#
# http://nexb.com and https://github.com/aboutcode-org/scancode.io
# The ScanCode.io software is licensed under the Apache License version 2.0.
# Data generated with ScanCode.io is provided as-is without warranties.
# ScanCode is a trademark of nexB Inc.
#
# You may not use this software except in compliance with the License.
# You may obtain a copy of the License at: http://apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software distributed
# under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
# CONDITIONS OF ANY KIND, either express or implied. See the License for the
# specific language governing permissions and limitations under the License.
#
# Data Generated with ScanCode.io is provided on an "AS IS" BASIS, WITHOUT WARRANTIES
# OR CONDITIONS OF ANY KIND, either express or implied. No content created from
# ScanCode.io should be considered or used as legal advice. Consult an Attorney
# for any legal advice.
#
# ScanCode.io is a free software code scanning tool from nexB Inc. and others.
# Visit https://github.com/aboutcode-org/scancode.io for support and download.

from scanpipe.pipelines.scan_single_package import ScanSinglePackage
from scanpipe.pipes.maven import fetch_and_scan_remote_pom
from scanpipe.pipes.maven import update_package_license_from_resource_if_missing


class ScanMavenPackage(ScanSinglePackage):
    """
    Scan a single maven package archive.

    This pipeline scans a single maven package for package metadata,
    declared dependencies, licenses, license clarity score and copyrights.

    The output is a summary of the scan results in JSON format.
    """

    @classmethod
    def steps(cls):
        return (
            cls.get_package_input,
            cls.collect_input_information,
            cls.extract_input_to_codebase_directory,
            cls.extract_archives,
            cls.run_scan,
            cls.fetch_and_scan_remote_pom,
            cls.load_inventory_from_toolkit_scan,
            cls.update_package_license_from_resource_if_missing,
            cls.make_summary_from_scan_results,
        )

    def fetch_and_scan_remote_pom(self):
        """Fetch and scan remote POM files."""
        scan_errors = fetch_and_scan_remote_pom(
            self.project, self.scan_output_location
        )
        # The pipe may return None when no scan was needed; `or []` keeps the
        # loop a no-op in that case.
        for errors_by_path in scan_errors or []:
            for path, messages in errors_by_path.items():
                self.project.add_error(
                    description="\n".join(messages),
                    model=self.pipeline_name,
                    details={"resource_path": path.removeprefix("codebase/")},
                )

    def update_package_license_from_resource_if_missing(self):
        """Update PACKAGE license from the license detected in RESOURCES if missing."""
        update_package_license_from_resource_if_missing(self.project)
264 changes: 264 additions & 0 deletions scanpipe/pipes/maven.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,264 @@
import json
import logging
import re
from urllib.parse import urlparse

import requests

from scanpipe.pipes import fetch
from scanpipe.pipes import scancode


def fetch_and_scan_remote_pom(project, scan_output_location):
    """
    Fetch the .pom file from maven.org if not present in the codebase.

    Scan the fetched POM file(s) and merge the resulting packages into the
    toolkit scan results stored at ``scan_output_location``. Note that the
    "dependencies" section is fully replaced by the POM scan results (see the
    comment below).

    Return a list of scanning-error mappings, one per failing resource path
    (an empty list when nothing was fetched or no error occurred).
    """
    with open(scan_output_location) as output_file:
        data = json.load(output_file)

    # Nothing to do if a pom.xml is already present in the scanned codebase.
    # Return an empty list (not None) so the return type is consistent.
    if any("pom.xml" in entry["path"] for entry in data["files"]):
        return []

    packages = data.get("packages", [])

    # NOTE(review): assumes the project has at least one input source —
    # confirm callers guarantee this before reaching this pipe.
    pom_url_list = get_pom_url_list(project.input_sources[0], packages)
    pom_file_list = download_pom_files(pom_url_list)
    scanning_errors = scan_pom_files(pom_file_list)

    scanned_pom_packages, scanned_dependencies = update_datafile_paths(pom_file_list)

    # Replace/Update the package and dependencies section.
    data["packages"] = packages + scanned_pom_packages
    data["dependencies"] = scanned_dependencies
    with open(scan_output_location, "w") as output_file:
        json.dump(data, output_file, indent=2)
    return scanning_errors


def parse_maven_filename(filename):
    """
    Parse a Maven jar/aar filename to extract its artifactId and version.

    Return an ``(artifact_id, version)`` tuple, or ``(None, None)`` when the
    filename does not follow the ``<artifactId>-<version>[-<classifier>]``
    convention (the version must start with a digit).
    """
    # Remove the file extension (e.g. ".jar").
    base = filename.rsplit(".", 1)[0]

    # Known classifier suffixes. They are checked longest-first so compound
    # classifiers such as "test-sources" are stripped before their shorter
    # suffixes ("sources"). An ordered sequence is required here: iterating a
    # set with `break` is nondeterministic under hash randomization.
    common_classifiers = sorted(
        (
            "sources",
            "javadoc",
            "tests",
            "test",
            "test-sources",
            "src",
            "bin",
            "docs",
            "javadocs",
            "client",
            "server",
            "linux",
            "windows",
            "macos",
            "linux-x86_64",
            "windows-x86_64",
        ),
        key=len,
        reverse=True,
    )

    # Remove the (longest) known classifier if present.
    for classifier in common_classifiers:
        if base.endswith(f"-{classifier}"):
            base = base[: -(len(classifier) + 1)]
            break

    # Split the remainder into artifactId and version.
    match = re.match(r"^(.*?)-((\d[\w.\-]*))$", base)
    if match:
        return match.group(1), match.group(2)
    return None, None


def get_pom_url_list(input_source, packages):
    """
    Generate Maven POM URLs from package metadata or from the input source.

    Maven-type entries in ``packages`` take precedence. Otherwise the URL is
    derived from the input ``download_url`` (when hosted on a known Maven
    repository host) or constructed from the input ``filename``.

    Return a list of candidate POM URLs (possibly empty).
    """
    pom_url_list = []
    for package in packages or []:
        if package.get("type") == "maven":
            package_ns = package.get("namespace", "")
            package_name = package.get("name", "")
            package_version = package.get("version", "")
            pom_url = (
                f"https://repo1.maven.org/maven2/{package_ns.replace('.', '/')}/"
                f"{package_name}/{package_version}/"
                f"{package_name}-{package_version}.pom".lower()
            )
            pom_url_list.append(pom_url)
    if pom_url_list:
        return pom_url_list

    # No maven package data: fall back to the input source itself.
    input_source_url = input_source.get("download_url", "")
    parsed_url = urlparse(input_source_url)
    maven_hosts = {
        "repo1.maven.org",
        "repo.maven.apache.org",
        "maven.google.com",
    }
    if input_source_url and parsed_url.netloc in maven_hosts:
        # For ".../<artifactId>/<version>/<file>.jar" the POM lives next to
        # the jar as "<artifactId>-<version>.pom".
        base_url = input_source_url.rsplit("/", 1)[0]
        pom_url = (
            base_url + "/" + "-".join(base_url.rstrip("/").split("/")[-2:]) + ".pom"
        )
        pom_url_list.append(pom_url)
    elif input_source.get("filename", "").endswith((".jar", ".aar")):
        # Construct a pom_url from the filename.
        artifact_id, version = parse_maven_filename(input_source["filename"])
        if not artifact_id or not version:
            return []
        pom_url_list = construct_pom_url_from_filename(artifact_id, version)
    # Inputs that are neither hosted on a Maven repo nor a .jar/.aar file
    # yield an empty list.
    return pom_url_list


def construct_pom_url_from_filename(artifact_id, version):
    """
    Construct a pom.xml URL from the given Maven artifactId and version.

    Search Maven Central for the artifactId to discover candidate groupIds,
    then return a single-element list with the verified POM URL, or an empty
    list when the groupId cannot be determined unambiguously or the search
    fails.
    """
    logger = logging.getLogger(__name__)
    # Search Maven Central for the artifact to get its groupId.
    url = f"https://search.maven.org/solrsearch/select?q=a:{artifact_id}&wt=json"
    pom_url_list = []
    try:
        response = requests.get(url, timeout=5)
        response.raise_for_status()
        data = response.json()
        # Extract all 'g' fields from the docs array that represent groupIds.
        group_ids = [doc["g"] for doc in data["response"]["docs"]]
    except requests.RequestException as e:
        # Library code should log, not print, so failures reach worker logs.
        logger.error("Error fetching data: %s", e)
        return []
    except KeyError as e:
        logger.error("Error parsing JSON: %s", e)
        return []

    for group_id in group_ids:
        pom_url = (
            f"https://repo1.maven.org/maven2/{group_id.replace('.', '/')}/"
            f"{artifact_id}/{version}/{artifact_id}-{version}.pom".lower()
        )
        if is_maven_pom_url(pom_url):
            pom_url_list.append(pom_url)
        if len(pom_url_list) > 1:
            # If multiple valid POM URLs are found, it means the same
            # artifactId and version exist under different groupIds. Since we
            # can't confidently determine the correct groupId, we return an
            # empty list to avoid fetching the wrong POM.
            return []

    return pom_url_list


def is_maven_pom_url(url):
    """
    Return True if the URL serves an accessible Maven POM file, False otherwise.

    Maven Central has a fallback mechanism that may serve a generic/error page
    with a 200 status instead of returning a proper 404, so both the
    content-type and the response content itself are checked.
    """
    try:
        response = requests.get(url, timeout=5)
        if response.status_code != 200:
            return False

        # A single substring check covers both "application/xml" and
        # "text/xml" (the original `or "text/xml" in ...` was redundant).
        content_type = response.headers.get("content-type", "").lower()
        is_xml = "xml" in content_type

        # A POM is an XML document with a <project> root element; the error
        # page fails this check.
        content = response.text.strip()
        is_pom = content.startswith("<?xml") and "<project" in content

        return is_xml and is_pom
    except requests.RequestException:
        return False


def download_pom_files(pom_url_list):
    """
    Fetch every POM file in ``pom_url_list`` over HTTP.

    Return a list of dicts with the local download path, the path where the
    scan output for that POM will be written, and the original URL.
    """
    pom_file_list = []
    for pom_url in pom_url_list:
        downloaded_pom = fetch.fetch_http(pom_url)
        pom_path = str(downloaded_pom.path)
        pom_file_list.append(
            {
                "pom_file_path": pom_path,
                "output_path": pom_path + "-output.json",
                "pom_url": pom_url,
            }
        )
    return pom_file_list


def scan_pom_files(pom_file_list):
    """
    Run a package scan on each downloaded POM file.

    Return the accumulated list of scanning errors (empty when all scans
    succeed).
    """
    scan_errors = []
    for pom_file_dict in pom_file_list:
        # Run a package scan on the fetched pom.xml, writing results to the
        # per-POM output path prepared by download_pom_files().
        errors = scancode.run_scan(
            location=pom_file_dict.get("pom_file_path", ""),
            output_file=pom_file_dict.get("output_path", ""),
            run_scan_args={"package": True},
        )
        if errors:
            scan_errors.extend(errors)
    return scan_errors


def update_datafile_paths(pom_file_list):
    """
    Update datafile paths in scanned packages and dependencies.

    For each scanned POM output, replace every package's ``datafile_paths``
    with the POM URL it was fetched from, and blank every dependency's
    ``datafile_path``.

    Return a ``(packages, dependencies)`` tuple of the updated mappings.
    """
    scanned_pom_packages = []
    scanned_pom_deps = []
    for pom_file_dict in pom_file_list:
        pom_url = pom_file_dict.get("pom_url", "")

        # Close the file before processing; only the parse needs it open.
        with open(pom_file_dict.get("output_path", "")) as scanned_pom_file:
            scanned_pom_data = json.load(scanned_pom_file)

        for scanned_package in scanned_pom_data.get("packages", []):
            # Point the package back at the remote POM it was scanned from.
            scanned_package["datafile_paths"] = [pom_url]
            scanned_pom_packages.append(scanned_package)

        for scanned_dep in scanned_pom_data.get("dependencies", []):
            # Replace the 'datafile_path' with empty string.
            # See https://github.com/aboutcode-org/scancode.io/issues/1763#issuecomment-3525165830
            scanned_dep["datafile_path"] = ""
            scanned_pom_deps.append(scanned_dep)
    return scanned_pom_packages, scanned_pom_deps


def update_package_license_from_resource_if_missing(project):
    """
    Populate missing declared licenses on packages from their resources.

    For each discovered package without a declared license expression, collect
    the distinct license expressions detected on the resources assigned to it
    and combine them with AND, then save the package.
    """
    from license_expression import Licensing

    # Build a package_uid -> [license expressions] mapping in a single pass
    # over the resources. The original re-evaluated the resource queryset
    # inside the per-package loop, issuing one query per package (N+1).
    licenses_by_package_uid = {}
    for resource in project.codebaseresources.has_license_expression():
        detected_lic_exp = resource.detected_license_expression
        for package_uid in resource.for_packages:
            detected = licenses_by_package_uid.setdefault(package_uid, [])
            if detected_lic_exp not in detected:
                detected.append(detected_lic_exp)

    licensing = Licensing()
    for package in project.discoveredpackages.all():
        if package.get_declared_license_expression():
            continue
        detected_lic_list = licenses_by_package_uid.get(package.package_uid, [])
        if not detected_lic_list:
            continue
        if len(detected_lic_list) > 1:
            # Parenthesize each sub-expression so a detected "a OR b" keeps
            # its precedence when combined: "(a OR b) AND (c)", not
            # "a OR b AND c".
            lic_exp = " AND ".join(f"({exp})" for exp in detected_lic_list)
        else:
            lic_exp = detected_lic_list[0]
        package.declared_license_expression = str(licensing.dedup(lic_exp))
        package.save()
Loading