Skip to content

trust: Support operator field, support multiple services #1407

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 13 commits into from
May 27, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ All versions prior to 0.9.0 are untracked.
* ClientTrustConfig now provides methods `production()`, `staging()`and `from_tuf()`
to get access to current client configuration (trusted keys & certificates,
URLs and their validity periods). [#1363](https://github.com/sigstore/sigstore-python/pull/1363)
* SigningConfig now has methods that return actual clients (like `RekorClient`) instead of
just URLs. The returned clients are also filtered according to SigningConfig contents.
[#1407](https://github.com/sigstore/sigstore-python/pull/1407)
Comment on lines +53 to +55
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nitpick: SigningConfig is a private API, so perhaps we should omit it from the CHANGELOG? Alternatively, if you think it's important to include from a developer/maintainer visibility perspective, maybe we should include "this is a private API" in some form in the CHANGELOG entry so users don't think we're committing to a public interface here.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, I was thinking of this as well while adding the entry. In a way SigningConfig semi-public since the signing_config property in ClientTrustConfig is currently public -- and ClientTrustConfig is in practice exposed through the sign and verify modules...

I think the intent of the API now is that ClientTrustConfig is public (so users can e.g. call from_production()) but the internals of ClientTrustConfig are not... so in that way this is not an API change but it's debatable.

I might leave it as is for now: We need to rewrite the changelog anyway for next release (combine related changes, try to make a coherent log).

* `--trust-config` now requires a file with SigningConfig v0.2, and is able to fully
configure the used Sigstore instance [#1358]/(https://github.com/sigstore/sigstore-python/pull/1358)
* By default (when `--trust-config` is not used) the whole trust configuration now
Expand Down
120 changes: 78 additions & 42 deletions sigstore/_internal/trust.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

from __future__ import annotations

from collections import defaultdict
from collections.abc import Iterable
from dataclasses import dataclass
from datetime import datetime, timezone
Expand Down Expand Up @@ -46,6 +47,7 @@
)
from sigstore_protobuf_specs.dev.sigstore.trustroot.v1 import (
Service,
ServiceConfiguration,
ServiceSelector,
TransparencyLogInstance,
)
Expand All @@ -56,6 +58,9 @@
TrustedRoot as _TrustedRoot,
)

from sigstore._internal.fulcio.client import FulcioClient
from sigstore._internal.rekor.client import RekorClient
from sigstore._internal.timestamp import TimestampAuthorityClient
from sigstore._internal.tuf import DEFAULT_TUF_URL, STAGING_TUF_URL, TrustUpdater
from sigstore._utils import (
KeyID,
Expand All @@ -66,6 +71,12 @@
)
from sigstore.errors import Error, MetadataError, TUFError, VerificationError

# Versions supported by this client
REKOR_VERSIONS = [1]
TSA_VERSIONS = [1]
FULCIO_VERSIONS = [1]
OIDC_VERSIONS = [1]


def _is_timerange_valid(period: TimeRange | None, *, allow_expired: bool) -> bool:
"""
Expand Down Expand Up @@ -323,28 +334,34 @@ def __init__(self, inner: _SigningConfig):
@api private
"""
self._inner = inner
self._verify()

def _verify(self) -> None:
"""
Performs various feats of heroism to ensure that the signing config
is well-formed.
"""

# must have a recognized media type.
try:
SigningConfig.SigningConfigType(self._inner.media_type)
except ValueError:
raise Error(f"unsupported signing config format: {self._inner.media_type}")

# currently not supporting other select modes
# TODO: Support other modes ensuring tsa_urls() and tlog_urls() work
if self._inner.rekor_tlog_config.selector != ServiceSelector.ANY:
raise Error(
f"unsupported tlog selector {self._inner.rekor_tlog_config.selector}"
)
if self._inner.tsa_config.selector != ServiceSelector.ANY:
raise Error(f"unsupported TSA selector {self._inner.tsa_config.selector}")
# Create lists of service protos that are valid, selected by the service
# configuration & supported by this client
self._tlogs = self._get_valid_services(
self._inner.rekor_tlog_urls, REKOR_VERSIONS, self._inner.rekor_tlog_config
)
if not self._tlogs:
raise Error("No valid Rekor transparency log found in signing config")

self._tsas = self._get_valid_services(
self._inner.tsa_urls, TSA_VERSIONS, self._inner.tsa_config
)

self._fulcios = self._get_valid_services(
self._inner.ca_urls, FULCIO_VERSIONS, None
)
if not self._fulcios:
raise Error("No valid Fulcio CA found in signing config")

self._oidcs = self._get_valid_services(
self._inner.oidc_urls, OIDC_VERSIONS, None
)

@classmethod
def from_file(
Expand All @@ -356,54 +373,73 @@ def from_file(
return cls(inner)

@staticmethod
def _get_valid_service_url(services: list[Service]) -> str | None:
def _get_valid_services(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jku Sorry about the late review. The only change I might suggest is to sort by newest to oldest rather than assuming it's already sorted. In the Sigstore TUF trusted root, we'll have them sorted already.

This is something I am adding to sigstore-go's implementation now, sigstore-java already added this - sigstore/sigstore-go@main...haydentherapper:sigstore-go:operator#diff-35123d504dffe1b547e48e14acb7c7174416a48566d50e5a6b75e898c5667a4fR121-R123

services: list[Service],
supported_versions: list[int],
config: ServiceConfiguration | None,
) -> list[Service]:
"""Return supported services, taking SigningConfig restrictions into account"""

# split services by operator, only include valid services
services_by_operator: dict[str, list[Service]] = defaultdict(list)
for service in services:
if service.major_api_version != 1:
if service.major_api_version not in supported_versions:
continue

if not _is_timerange_valid(service.valid_for, allow_expired=False):
continue
return service.url
return None

def get_tlog_urls(self) -> list[str]:
services_by_operator[service.operator].append(service)

# build a list of services but make sure we only include one service per operator
# and use the highest version available for that operator
result: list[Service] = []
for op_services in services_by_operator.values():
op_services.sort(key=lambda s: s.major_api_version)
result.append(op_services[-1])

# Depending on ServiceSelector, prune the result list
if not config or config.selector == ServiceSelector.ALL:
return result

if config.selector == ServiceSelector.UNDEFINED:
raise ValueError("Undefined is not a valid signing config ServiceSelector")

# handle EXACT and ANY selectors
count = config.count if config.selector == ServiceSelector.EXACT else 1
if len(result) < count:
raise ValueError(
f"Expected {count} services in signing config, found {len(result)}"
)

return result[:count]

def get_tlogs(self) -> list[RekorClient]:
"""
Returns the rekor transparency logs that client should sign with.
Currently only returns a single one but could in future return several
Returns the rekor transparency log clients to sign with.
"""
return [RekorClient(tlog.url) for tlog in self._tlogs]

url = self._get_valid_service_url(self._inner.rekor_tlog_urls)
if not url:
raise Error("No valid Rekor transparency log found in signing config")
return [url]

def get_fulcio_url(self) -> str:
def get_fulcio(self) -> FulcioClient:
"""
Returns url for the fulcio instance that client should use to get a
signing certificate from
Returns a Fulcio client to get a signing certificate from
"""
url = self._get_valid_service_url(self._inner.ca_urls)
if not url:
raise Error("No valid Fulcio CA found in signing config")
return url
return FulcioClient(self._fulcios[0].url)

def get_oidc_url(self) -> str:
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

get_oidc_url() still returns string, not Issuer object.

This method is now the odd one out: it could return Issuer but as mentioned that will need a small refactoring in Issuer -- I can handle that in a followup if the design here is reasonable to reviewers

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This design looks reasonable to me; doing it in a followup sounds good!

"""
Returns url for the OIDC provider that client should use to interactively
authenticate.
"""
url = self._get_valid_service_url(self._inner.oidc_urls)
if not url:
if not self._oidcs:
raise Error("No valid OIDC provider found in signing config")
return url
return self._oidcs[0].url

def get_tsa_urls(self) -> list[str]:
def get_tsas(self) -> list[TimestampAuthorityClient]:
"""
Returns timestamp authority API end points. Currently returns a single one
but may return more in future.
Returns timestamp authority clients for urls configured in signing config.
"""
url = self._get_valid_service_url(self._inner.tsa_urls)
return [] if url is None else [url]
return [TimestampAuthorityClient(s.url) for s in self._tsas]


class TrustedRoot:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,6 @@
"selector": "ANY"
},
"tsaConfig": {
"selector": "ANY"
"selector": "ALL"
}
}
8 changes: 3 additions & 5 deletions sigstore/sign.py
Original file line number Diff line number Diff line change
Expand Up @@ -332,12 +332,10 @@ def from_trust_config(cls, trust_config: ClientTrustConfig) -> SigningContext:
"""
signing_config = trust_config.signing_config
return cls(
fulcio=FulcioClient(signing_config.get_fulcio_url()),
rekor=RekorClient(signing_config.get_tlog_urls()[0]),
fulcio=signing_config.get_fulcio(),
rekor=signing_config.get_tlogs()[0],
trusted_root=trust_config.trusted_root,
tsa_clients=[
TimestampAuthorityClient(url) for url in signing_config.get_tsa_urls()
],
tsa_clients=signing_config.get_tsas(),
)

@contextmanager
Expand Down
144 changes: 139 additions & 5 deletions test/unit/internal/test_trust.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,15 @@
from cryptography.hazmat.primitives.serialization import Encoding, PublicFormat
from cryptography.x509 import load_pem_x509_certificate
from sigstore_protobuf_specs.dev.sigstore.common.v1 import TimeRange
from sigstore_protobuf_specs.dev.sigstore.trustroot.v1 import (
Service,
ServiceConfiguration,
ServiceSelector,
)

from sigstore._internal.fulcio.client import FulcioClient
from sigstore._internal.rekor.client import RekorClient
from sigstore._internal.timestamp import TimestampAuthorityClient
from sigstore._internal.trust import (
CertificateAuthority,
ClientTrustConfig,
Expand All @@ -32,6 +40,19 @@
from sigstore._utils import load_pem_public_key
from sigstore.errors import Error

# Test data for TestSigningcconfig
_service_v1_op1 = Service("url1", major_api_version=1, operator="op1")
_service2_v1_op1 = Service("url2", major_api_version=1, operator="op1")
_service_v2_op1 = Service("url3", major_api_version=2, operator="op1")
_service_v1_op2 = Service("url4", major_api_version=1, operator="op2")
_service_v1_op3 = Service("url5", major_api_version=1, operator="op3")
_service_v1_op4 = Service(
"url6",
major_api_version=1,
operator="op4",
valid_for=TimeRange(datetime(3000, 1, 1, tzinfo=timezone.utc)),
)


class TestCertificateAuthority:
def test_good(self, asset):
Expand All @@ -56,12 +77,125 @@ def test_good(self, asset):
signing_config._inner.media_type
== SigningConfig.SigningConfigType.SIGNING_CONFIG_0_2.value
)
assert signing_config.get_fulcio_url() == "https://fulcio.example.com"

fulcio = signing_config.get_fulcio()
assert isinstance(fulcio, FulcioClient)
assert fulcio.url == "https://fulcio.example.com"
assert signing_config.get_oidc_url() == "https://oauth2.example.com/auth"
assert signing_config.get_tlog_urls() == ["https://rekor.example.com"]
assert signing_config.get_tsa_urls() == [
"https://timestamp.example.com/api/v1/timestamp"
]

tlogs = signing_config.get_tlogs()
assert len(tlogs) == 1
assert isinstance(tlogs[0], RekorClient)
assert tlogs[0].url == "https://rekor.example.com/api/v1"

tsas = signing_config.get_tsas()
assert len(tsas) == 1
assert isinstance(tsas[0], TimestampAuthorityClient)
assert tsas[0].url == "https://timestamp.example.com/api/v1/timestamp"

@pytest.mark.parametrize(
"services, versions, config, expected_result",
[
pytest.param(
[_service_v1_op1],
[1],
ServiceConfiguration(ServiceSelector.ALL),
[_service_v1_op1],
id="base case",
),
pytest.param(
[_service_v1_op1, _service2_v1_op1],
[1],
ServiceConfiguration(ServiceSelector.ALL),
[_service2_v1_op1],
id="multiple services, same operator: expect 1 service in result",
),
pytest.param(
[_service_v1_op1, _service_v1_op2],
[1],
ServiceConfiguration(ServiceSelector.ALL),
[_service_v1_op1, _service_v1_op2],
id="2 services, different operator: expect 2 services in result",
),
pytest.param(
[_service_v1_op1, _service_v1_op2, _service_v1_op4],
[1],
ServiceConfiguration(ServiceSelector.ALL),
[_service_v1_op1, _service_v1_op2],
id="3 services, one is not yet valid: expect 2 services in result",
),
pytest.param(
[_service_v1_op1, _service_v1_op2],
[1],
ServiceConfiguration(ServiceSelector.ANY),
[_service_v1_op1],
id="ANY selector: expect 1 service only in result",
),
pytest.param(
[_service_v1_op1, _service_v1_op2, _service_v1_op3],
[1],
ServiceConfiguration(ServiceSelector.EXACT, 2),
[_service_v1_op1, _service_v1_op2],
id="EXACT selector: expect configured number of services in result",
),
pytest.param(
[_service_v1_op1, _service_v2_op1],
[1, 2],
ServiceConfiguration(ServiceSelector.ALL),
[_service_v2_op1],
id="services with different version: expect highest version",
),
pytest.param(
[_service_v1_op1, _service_v2_op1],
[1],
ServiceConfiguration(ServiceSelector.ALL),
[_service_v1_op1],
id="services with different version: expect the supported version",
),
pytest.param(
[_service_v1_op1, _service_v1_op2],
[2],
ServiceConfiguration(ServiceSelector.ALL),
[],
id="No supported versions: expect no results",
),
pytest.param(
[_service_v1_op1, _service_v2_op1, _service_v1_op2],
[1],
None,
[_service_v1_op1, _service_v1_op2],
id="services without ServiceConfiguration: expect all supported",
),
],
)
def test_get_valid_services(self, services, versions, config, expected_result):
result = SigningConfig._get_valid_services(services, versions, config)

assert result == expected_result

@pytest.mark.parametrize(
"services, versions, config",
[
( # ANY selector without services
[],
[1],
ServiceConfiguration(ServiceSelector.ANY),
),
( # EXACT selector without enough services
[_service_v1_op1],
[1],
ServiceConfiguration(ServiceSelector.EXACT, 2),
),
( # UNDEFINED selector
[_service_v1_op1],
[1],
ServiceConfiguration(ServiceSelector.UNDEFINED, 1),
),
],
)
def test_get_valid_services_fail(self, services, versions, config):
with pytest.raises(ValueError):
SigningConfig._get_valid_services(services, versions, config)


class TestTrustedRoot:
Expand Down