diff --git a/CHANGELOG.md b/CHANGELOG.md index e27928a38..3191d797a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -50,6 +50,9 @@ All versions prior to 0.9.0 are untracked. * ClientTrustConfig now provides methods `production()`, `staging()`and `from_tuf()` to get access to current client configuration (trusted keys & certificates, URLs and their validity periods). [#1363](https://github.com/sigstore/sigstore-python/pull/1363) + * SigningConfig now has methods that return actual clients (like `RekorClient`) instead of + just URLs. The returned clients are also filtered according to SigningConfig contents. + [#1407](https://github.com/sigstore/sigstore-python/pull/1407) * `--trust-config` now requires a file with SigningConfig v0.2, and is able to fully configure the used Sigstore instance [#1358]/(https://github.com/sigstore/sigstore-python/pull/1358) * By default (when `--trust-config` is not used) the whole trust configuration now diff --git a/sigstore/_internal/trust.py b/sigstore/_internal/trust.py index f77181ef0..35e3e7fd7 100644 --- a/sigstore/_internal/trust.py +++ b/sigstore/_internal/trust.py @@ -18,6 +18,7 @@ from __future__ import annotations +from collections import defaultdict from collections.abc import Iterable from dataclasses import dataclass from datetime import datetime, timezone @@ -46,6 +47,7 @@ ) from sigstore_protobuf_specs.dev.sigstore.trustroot.v1 import ( Service, + ServiceConfiguration, ServiceSelector, TransparencyLogInstance, ) @@ -56,6 +58,9 @@ TrustedRoot as _TrustedRoot, ) +from sigstore._internal.fulcio.client import FulcioClient +from sigstore._internal.rekor.client import RekorClient +from sigstore._internal.timestamp import TimestampAuthorityClient from sigstore._internal.tuf import DEFAULT_TUF_URL, STAGING_TUF_URL, TrustUpdater from sigstore._utils import ( KeyID, @@ -66,6 +71,12 @@ ) from sigstore.errors import Error, MetadataError, TUFError, VerificationError +# Versions supported by this client +REKOR_VERSIONS = [1] +TSA_VERSIONS = [1] +FULCIO_VERSIONS = [1] +OIDC_VERSIONS = [1] + def _is_timerange_valid(period: TimeRange | None, *, allow_expired: bool) -> bool: """ @@ -323,13 +334,6 @@ def __init__(self, inner: _SigningConfig): @api private """ self._inner = inner - self._verify() - - def _verify(self) -> None: - """ - Performs various feats of heroism to ensure that the signing config - is well-formed. - """ # must have a recognized media type. try: @@ -337,14 +341,27 @@ def _verify(self) -> None: except ValueError: raise Error(f"unsupported signing config format: {self._inner.media_type}") - # currently not supporting other select modes - # TODO: Support other modes ensuring tsa_urls() and tlog_urls() work - if self._inner.rekor_tlog_config.selector != ServiceSelector.ANY: - raise Error( - f"unsupported tlog selector {self._inner.rekor_tlog_config.selector}" - ) - if self._inner.tsa_config.selector != ServiceSelector.ANY: - raise Error(f"unsupported TSA selector {self._inner.tsa_config.selector}") + # Create lists of service protos that are valid, selected by the service + # configuration & supported by this client + self._tlogs = self._get_valid_services( + self._inner.rekor_tlog_urls, REKOR_VERSIONS, self._inner.rekor_tlog_config + ) + if not self._tlogs: + raise Error("No valid Rekor transparency log found in signing config") + + self._tsas = self._get_valid_services( + self._inner.tsa_urls, TSA_VERSIONS, self._inner.tsa_config + ) + + self._fulcios = self._get_valid_services( + self._inner.ca_urls, FULCIO_VERSIONS, None + ) + if not self._fulcios: + raise Error("No valid Fulcio CA found in signing config") + + self._oidcs = self._get_valid_services( + self._inner.oidc_urls, OIDC_VERSIONS, None + ) @classmethod def from_file( @@ -356,54 +373,73 @@ def from_file( return cls(inner) @staticmethod - def _get_valid_service_url(services: list[Service]) -> str | None: + def _get_valid_services( + services: list[Service], + supported_versions: list[int], + config: ServiceConfiguration | None, + ) -> list[Service]: + """Return supported services, taking SigningConfig restrictions into account""" + + # split services by operator, only include valid services + services_by_operator: dict[str, list[Service]] = defaultdict(list) for service in services: - if service.major_api_version != 1: + if service.major_api_version not in supported_versions: continue if not _is_timerange_valid(service.valid_for, allow_expired=False): continue - return service.url - return None - def get_tlog_urls(self) -> list[str]: + services_by_operator[service.operator].append(service) + + # build a list of services but make sure we only include one service per operator + # and use the highest version available for that operator + result: list[Service] = [] + for op_services in services_by_operator.values(): + op_services.sort(key=lambda s: s.major_api_version) + result.append(op_services[-1]) + + # Depending on ServiceSelector, prune the result list + if not config or config.selector == ServiceSelector.ALL: + return result + + if config.selector == ServiceSelector.UNDEFINED: + raise ValueError("Undefined is not a valid signing config ServiceSelector") + + # handle EXACT and ANY selectors + count = config.count if config.selector == ServiceSelector.EXACT else 1 + if len(result) < count: + raise ValueError( + f"Expected {count} services in signing config, found {len(result)}" + ) + + return result[:count] + + def get_tlogs(self) -> list[RekorClient]: """ - Returns the rekor transparency logs that client should sign with. - Currently only returns a single one but could in future return several + Returns the rekor transparency log clients to sign with. """ + return [RekorClient(tlog.url) for tlog in self._tlogs] - url = self._get_valid_service_url(self._inner.rekor_tlog_urls) - if not url: - raise Error("No valid Rekor transparency log found in signing config") - return [url] - - def get_fulcio_url(self) -> str: + def get_fulcio(self) -> FulcioClient: """ - Returns url for the fulcio instance that client should use to get a - signing certificate from + Returns a Fulcio client to get a signing certificate from """ - url = self._get_valid_service_url(self._inner.ca_urls) - if not url: - raise Error("No valid Fulcio CA found in signing config") - return url + return FulcioClient(self._fulcios[0].url) def get_oidc_url(self) -> str: """ Returns url for the OIDC provider that client should use to interactively authenticate. """ - url = self._get_valid_service_url(self._inner.oidc_urls) - if not url: + if not self._oidcs: raise Error("No valid OIDC provider found in signing config") - return url + return self._oidcs[0].url - def get_tsa_urls(self) -> list[str]: + def get_tsas(self) -> list[TimestampAuthorityClient]: """ - Returns timestamp authority API end points. Currently returns a single one - but may return more in future. + Returns timestamp authority clients for urls configured in signing config. """ - url = self._get_valid_service_url(self._inner.tsa_urls) - return [] if url is None else [url] + return [TimestampAuthorityClient(s.url) for s in self._tsas] class TrustedRoot: diff --git a/sigstore/_store/https%3A%2F%2Ftuf-repo-cdn.sigstore.dev/signing_config.v0.2.json b/sigstore/_store/https%3A%2F%2Ftuf-repo-cdn.sigstore.dev/signing_config.v0.2.json index 5a7b47cfb..8e72b7628 100644 --- a/sigstore/_store/https%3A%2F%2Ftuf-repo-cdn.sigstore.dev/signing_config.v0.2.json +++ b/sigstore/_store/https%3A%2F%2Ftuf-repo-cdn.sigstore.dev/signing_config.v0.2.json @@ -34,6 +34,6 @@ "selector": "ANY" }, "tsaConfig": { - "selector": "ANY" + "selector": "ALL" } } \ No newline at end of file diff --git a/sigstore/sign.py b/sigstore/sign.py index 643cc7960..9b6f58d9b 100644 --- a/sigstore/sign.py +++ b/sigstore/sign.py @@ -332,12 +332,10 @@ def from_trust_config(cls, trust_config: ClientTrustConfig) -> SigningContext: """ signing_config = trust_config.signing_config return cls( - fulcio=FulcioClient(signing_config.get_fulcio_url()), - rekor=RekorClient(signing_config.get_tlog_urls()[0]), + fulcio=signing_config.get_fulcio(), + rekor=signing_config.get_tlogs()[0], trusted_root=trust_config.trusted_root, - tsa_clients=[ - TimestampAuthorityClient(url) for url in signing_config.get_tsa_urls() - ], + tsa_clients=signing_config.get_tsas(), ) @contextmanager diff --git a/test/unit/internal/test_trust.py b/test/unit/internal/test_trust.py index 71f29af7b..d025307cb 100644 --- a/test/unit/internal/test_trust.py +++ b/test/unit/internal/test_trust.py @@ -20,7 +20,15 @@ from cryptography.hazmat.primitives.serialization import Encoding, PublicFormat from cryptography.x509 import load_pem_x509_certificate from sigstore_protobuf_specs.dev.sigstore.common.v1 import TimeRange +from sigstore_protobuf_specs.dev.sigstore.trustroot.v1 import ( + Service, + ServiceConfiguration, + ServiceSelector, +) +from sigstore._internal.fulcio.client import FulcioClient +from sigstore._internal.rekor.client import RekorClient +from sigstore._internal.timestamp import TimestampAuthorityClient from sigstore._internal.trust import ( CertificateAuthority, ClientTrustConfig, @@ -32,6 +40,19 @@ from sigstore._utils import load_pem_public_key from sigstore.errors import Error +# Test data for TestSigningcconfig +_service_v1_op1 = Service("url1", major_api_version=1, operator="op1") +_service2_v1_op1 = Service("url2", major_api_version=1, operator="op1") +_service_v2_op1 = Service("url3", major_api_version=2, operator="op1") +_service_v1_op2 = Service("url4", major_api_version=1, operator="op2") +_service_v1_op3 = Service("url5", major_api_version=1, operator="op3") +_service_v1_op4 = Service( + "url6", + major_api_version=1, + operator="op4", + valid_for=TimeRange(datetime(3000, 1, 1, tzinfo=timezone.utc)), +) + class TestCertificateAuthority: def test_good(self, asset): @@ -56,12 +77,125 @@ def test_good(self, asset): signing_config._inner.media_type == SigningConfig.SigningConfigType.SIGNING_CONFIG_0_2.value ) - assert signing_config.get_fulcio_url() == "https://fulcio.example.com" + + fulcio = signing_config.get_fulcio() + assert isinstance(fulcio, FulcioClient) + assert fulcio.url == "https://fulcio.example.com" assert signing_config.get_oidc_url() == "https://oauth2.example.com/auth" - assert signing_config.get_tlog_urls() == ["https://rekor.example.com"] - assert signing_config.get_tsa_urls() == [ - "https://timestamp.example.com/api/v1/timestamp" - ] + + tlogs = signing_config.get_tlogs() + assert len(tlogs) == 1 + assert isinstance(tlogs[0], RekorClient) + assert tlogs[0].url == "https://rekor.example.com/api/v1" + + tsas = signing_config.get_tsas() + assert len(tsas) == 1 + assert isinstance(tsas[0], TimestampAuthorityClient) + assert tsas[0].url == "https://timestamp.example.com/api/v1/timestamp" + + @pytest.mark.parametrize( + "services, versions, config, expected_result", + [ + pytest.param( + [_service_v1_op1], + [1], + ServiceConfiguration(ServiceSelector.ALL), + [_service_v1_op1], + id="base case", + ), + pytest.param( + [_service_v1_op1, _service2_v1_op1], + [1], + ServiceConfiguration(ServiceSelector.ALL), + [_service2_v1_op1], + id="multiple services, same operator: expect 1 service in result", + ), + pytest.param( + [_service_v1_op1, _service_v1_op2], + [1], + ServiceConfiguration(ServiceSelector.ALL), + [_service_v1_op1, _service_v1_op2], + id="2 services, different operator: expect 2 services in result", + ), + pytest.param( + [_service_v1_op1, _service_v1_op2, _service_v1_op4], + [1], + ServiceConfiguration(ServiceSelector.ALL), + [_service_v1_op1, _service_v1_op2], + id="3 services, one is not yet valid: expect 2 services in result", + ), + pytest.param( + [_service_v1_op1, _service_v1_op2], + [1], + ServiceConfiguration(ServiceSelector.ANY), + [_service_v1_op1], + id="ANY selector: expect 1 service only in result", + ), + pytest.param( + [_service_v1_op1, _service_v1_op2, _service_v1_op3], + [1], + ServiceConfiguration(ServiceSelector.EXACT, 2), + [_service_v1_op1, _service_v1_op2], + id="EXACT selector: expect configured number of services in result", + ), + pytest.param( + [_service_v1_op1, _service_v2_op1], + [1, 2], + ServiceConfiguration(ServiceSelector.ALL), + [_service_v2_op1], + id="services with different version: expect highest version", + ), + pytest.param( + [_service_v1_op1, _service_v2_op1], + [1], + ServiceConfiguration(ServiceSelector.ALL), + [_service_v1_op1], + id="services with different version: expect the supported version", + ), + pytest.param( + [_service_v1_op1, _service_v1_op2], + [2], + ServiceConfiguration(ServiceSelector.ALL), + [], + id="No supported versions: expect no results", + ), + pytest.param( + [_service_v1_op1, _service_v2_op1, _service_v1_op2], + [1], + None, + [_service_v1_op1, _service_v1_op2], + id="services without ServiceConfiguration: expect all supported", + ), + ], + ) + def test_get_valid_services(self, services, versions, config, expected_result): + result = SigningConfig._get_valid_services(services, versions, config) + + assert result == expected_result + + @pytest.mark.parametrize( + "services, versions, config", + [ + ( # ANY selector without services + [], + [1], + ServiceConfiguration(ServiceSelector.ANY), + ), + ( # EXACT selector without enough services + [_service_v1_op1], + [1], + ServiceConfiguration(ServiceSelector.EXACT, 2), + ), + ( # UNDEFINED selector + [_service_v1_op1], + [1], + ServiceConfiguration(ServiceSelector.UNDEFINED, 1), + ), + ], + ) + def test_get_valid_services_fail(self, services, versions, config): + with pytest.raises(ValueError): + SigningConfig._get_valid_services(services, versions, config) class TestTrustedRoot: