From ea890a02ab8595aea09bf295286901c059f85af1 Mon Sep 17 00:00:00 2001
From: Ramon Petgrave
Date: Tue, 1 Jul 2025 19:10:01 +0000
Subject: [PATCH 1/9] Sign multiple artifacts in threads

This is (especially) useful with rekor v2, where the service only
responds after log inclusion: we would prefer to get all signatures
into the same inclusion batch. The change still affects both rekor v1
and rekor v2.

This commit is a rebase of a bunch of Ramon's commits.

Signed-off-by: Ramon Petgrave
Signed-off-by: Jussi Kukkonen
---
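Note: the core pattern here is the one sketched below. This is a
minimal, self-contained illustration in which sign_one() is a
hypothetical stand-in for the real per-file signing closure added in
this patch:

    from concurrent import futures
    from pathlib import Path

    def sign_one(file: Path) -> None:
        print(f"signing {file}")  # placeholder for the actual signing work

    def sign_all(files: list[Path]) -> None:
        with futures.ThreadPoolExecutor(max_workers=10) as executor:
            jobs = [executor.submit(sign_one, f) for f in files]
            # as_completed() yields jobs as they finish; result()
            # re-raises any exception raised in the worker thread.
            for job in futures.as_completed(jobs):
                job.result()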
 sigstore/_cli.py | 122 +++++++++++++++++++++++++----------------------
 1 file changed, 66 insertions(+), 56 deletions(-)

diff --git a/sigstore/_cli.py b/sigstore/_cli.py
index fab51ac65..b2a96fa24 100644
--- a/sigstore/_cli.py
+++ b/sigstore/_cli.py
@@ -20,6 +20,7 @@
 import logging
 import os
 import sys
+from concurrent import futures
 from dataclasses import dataclass
 from pathlib import Path
 from typing import Any, NoReturn, TextIO, Union
@@ -667,63 +668,72 @@ def _sign_common(
         _invalid_arguments(args, "No identity token supplied or detected!")
 
     with signing_ctx.signer(identity) as signer:
-        for file, outputs in output_map.items():
-            _logger.debug(f"signing for {file.name}")
-            with file.open(mode="rb") as io:
-                # The input can be indefinitely large, so we perform a streaming
-                # digest and sign the prehash rather than buffering it fully.
-                digest = sha256_digest(io)
-            try:
-                if predicate is None:
-                    result = signer.sign_artifact(input_=digest)
-                else:
-                    subject = Subject(
-                        name=file.name, digest={"sha256": digest.digest.hex()}
-                    )
-                    predicate_type = args.predicate_type
-                    statement_builder = StatementBuilder(
-                        subjects=[subject],
-                        predicate_type=predicate_type,
-                        predicate=predicate,
-                    )
-                    result = signer.sign_dsse(statement_builder.build())
-            except ExpiredIdentity as exp_identity:
-                print("Signature failed: identity token has expired")
-                raise exp_identity
-
-            except ExpiredCertificate as exp_certificate:
-                print("Signature failed: Fulcio signing certificate has expired")
-                raise exp_certificate
-
-            print("Using ephemeral certificate:")
-            cert = result.signing_certificate
-            cert_pem = cert.public_bytes(Encoding.PEM).decode()
-            print(cert_pem)
-
-            print(
-                f"Transparency log entry created at index: {result.log_entry.log_index}"
-            )
+        with futures.ThreadPoolExecutor(max_workers=10) as executor:
+
+            def _sign_file(file: Path, outputs: SigningOutputs) -> None:
+                _logger.debug(f"signing for {file.name}")
+                with file.open(mode="rb") as io:
+                    # The input can be indefinitely large, so we perform a streaming
+                    # digest and sign the prehash rather than buffering it fully.
+                    digest = sha256_digest(io)
+                try:
+                    if predicate is None:
+                        result = signer.sign_artifact(input_=digest)
+                    else:
+                        subject = Subject(
+                            name=file.name, digest={"sha256": digest.digest.hex()}
+                        )
+                        predicate_type = args.predicate_type
+                        statement_builder = StatementBuilder(
+                            subjects=[subject],
+                            predicate_type=predicate_type,
+                            predicate=predicate,
+                        )
+                        result = signer.sign_dsse(statement_builder.build())
+                except ExpiredIdentity as exp_identity:
+                    print("Signature failed: identity token has expired")
+                    raise exp_identity
+
+                except ExpiredCertificate as exp_certificate:
+                    print("Signature failed: Fulcio signing certificate has expired")
+                    raise exp_certificate
+
+                print("Using ephemeral certificate:")
+                cert = result.signing_certificate
+                cert_pem = cert.public_bytes(Encoding.PEM).decode()
+                print(cert_pem)
+
+                print(
+                    f"Transparency log entry created at index: {result.log_entry.log_index}"
+                )
 
-            sig_output: TextIO
-            if outputs.signature is not None:
-                sig_output = outputs.signature.open("w")
-            else:
-                sig_output = sys.stdout
-
-            signature = base64.b64encode(result.signature).decode()
-            print(signature, file=sig_output)
-            if outputs.signature is not None:
-                print(f"Signature written to {outputs.signature}")
-
-            if outputs.certificate is not None:
-                with outputs.certificate.open(mode="w") as io:
-                    print(cert_pem, file=io)
-                print(f"Certificate written to {outputs.certificate}")
-
-            if outputs.bundle is not None:
-                with outputs.bundle.open(mode="w") as io:
-                    print(result.to_json(), file=io)
-                print(f"Sigstore bundle written to {outputs.bundle}")
+                sig_output: TextIO
+                if outputs.signature is not None:
+                    sig_output = outputs.signature.open("w")
+                else:
+                    sig_output = sys.stdout
+
+                signature = base64.b64encode(result.signature).decode()
+                print(signature, file=sig_output)
+                if outputs.signature is not None:
+                    print(f"Signature written to {outputs.signature}")
+
+                if outputs.certificate is not None:
+                    with outputs.certificate.open(mode="w") as io:
+                        print(cert_pem, file=io)
+                    print(f"Certificate written to {outputs.certificate}")
+
+                if outputs.bundle is not None:
+                    with outputs.bundle.open(mode="w") as io:
+                        print(result.to_json(), file=io)
+                    print(f"Sigstore bundle written to {outputs.bundle}")
+
+            jobs = [
+                executor.submit(_sign_file, file, outputs)
+                for file, outputs in output_map.items()
+            ]
+            for job in futures.as_completed(jobs):
+                job.result()
 
 
 def _attest(args: argparse.Namespace) -> None:

From e8e0298782118a8b71a3f0e892e57ef3e3f33757 Mon Sep 17 00:00:00 2001
From: Jussi Kukkonen
Date: Thu, 10 Jul 2025 10:55:46 +0300
Subject: [PATCH 2/9] rekor: Make create_entry() conservative WRT session use

* Session thread safety is ambiguous: it may now be safe but the
  situation is unclear
* Use a single Session per create_entry() call: this may have a little
  more overhead but seems like the safest option
* Note that RekorLog's (v1) other methods may not be thread safe: I
  only reviewed the create_entry() flow

Signed-off-by: Jussi Kukkonen
---
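Note: the session-per-call approach boils down to the sketch below
(the URL and payload here are illustrative, not the actual Rekor API
surface):

    import requests

    def create_entry(url: str, payload: dict) -> requests.Response:
        # A fresh Session per call sidesteps the ambiguity around
        # sharing one Session across threads; the context manager
        # closes the connection pool when the call is done.
        with requests.Session() as session:
            session.headers.update({"Content-Type": "application/json"})
            return session.post(url, json=payload)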
 sigstore/_internal/rekor/client.py    | 22 ++++++++++++---------
 sigstore/_internal/rekor/client_v2.py | 28 +++++++++++++--------------
 2 files changed, 26 insertions(+), 24 deletions(-)

diff --git a/sigstore/_internal/rekor/client.py b/sigstore/_internal/rekor/client.py
index dfa6da446..cb2e5f872 100644
--- a/sigstore/_internal/rekor/client.py
+++ b/sigstore/_internal/rekor/client.py
@@ -74,6 +74,8 @@ def from_response(cls, dict_: dict[str, Any]) -> RekorLogInfo:
 
 class _Endpoint(ABC):
     def __init__(self, url: str, session: requests.Session) -> None:
+        # Note that _Endpoint may not be thread safe if the same Session is provided
+        # to an _Endpoint in multiple threads
         self.url = url
         self.session = session
 
@@ -210,20 +212,19 @@ def __init__(self, url: str) -> None:
         Create a new `RekorClient` from the given URL.
         """
         self.url = f"{url}/api/v1"
-        self.session = requests.Session()
-        self.session.headers.update(
+
+    def _session(self) -> requests.Session:
+        # We do not use a long-lived session to avoid potential thread safety issues:
+        # submitting entries via create_entry() should be thread safe.
+        session = requests.Session()
+        session.headers.update(
             {
                 "Content-Type": "application/json",
                 "Accept": "application/json",
                 "User-Agent": USER_AGENT,
             }
         )
-
-    def __del__(self) -> None:
-        """
-        Terminates the underlying network session.
-        """
-        self.session.close()
+        return session
 
     @classmethod
     def production(cls) -> RekorClient:
@@ -246,7 +247,10 @@ def log(self) -> RekorLog:
         """
         Returns a `RekorLog` adapter for making requests to a Rekor log.
         """
-        return RekorLog(f"{self.url}/log", session=self.session)
+
+        # Each RekorLog gets its own session
+        with self._session() as s:
+            return RekorLog(f"{self.url}/log", session=s)
 
     def create_entry(self, request: EntryRequestBody) -> LogEntry:
         """
diff --git a/sigstore/_internal/rekor/client_v2.py b/sigstore/_internal/rekor/client_v2.py
index a7d4e9327..94c76c630 100644
--- a/sigstore/_internal/rekor/client_v2.py
+++ b/sigstore/_internal/rekor/client_v2.py
@@ -55,20 +55,6 @@ def __init__(self, base_url: str) -> None:
         Create a new `RekorV2Client` from the given URL.
         """
         self.url = f"{base_url}/api/v2"
-        self.session = requests.Session()
-        self.session.headers.update(
-            {
-                "Content-Type": "application/json",
-                "Accept": "application/json",
-                "User-Agent": USER_AGENT,
-            }
-        )
-
-    def __del__(self) -> None:
-        """
-        Terminates the underlying network session.
-        """
-        self.session.close()
 
     def create_entry(self, payload: EntryRequestBody) -> LogEntry:
         """
@@ -79,7 +65,19 @@ def create_entry(self, payload: EntryRequestBody) -> LogEntry:
         https://github.com/sigstore/rekor-tiles/blob/main/CLIENTS.md#handling-longer-requests
         """
         _logger.debug(f"proposed: {json.dumps(payload)}")
-        resp = self.session.post(
+
+        # Use a short-lived session to avoid potential issues with multi-threading:
+        # Session thread-safety is ambiguous
+        session = requests.Session()
+        session.headers.update(
+            {
+                "Content-Type": "application/json",
+                "Accept": "application/json",
+                "User-Agent": USER_AGENT,
+            }
+        )
+
+        resp = session.post(
             f"{self.url}/log/entries",
             json=payload,
         )

From 064d24c4922c23a726273316c0b43e0f7667115f Mon Sep 17 00:00:00 2001
From: Jussi Kukkonen
Date: Thu, 10 Jul 2025 11:17:41 +0300
Subject: [PATCH 3/9] timestamp: Don't reuse session

It's a little unclear if Session is now thread safe or not: avoid
reuse just in case

Signed-off-by: Jussi Kukkonen
---
 sigstore/_internal/timestamp.py | 24 ++++++++++--------------
 1 file changed, 10 insertions(+), 14 deletions(-)

diff --git a/sigstore/_internal/timestamp.py b/sigstore/_internal/timestamp.py
index fe210f4fc..62883636e 100644
--- a/sigstore/_internal/timestamp.py
+++ b/sigstore/_internal/timestamp.py
@@ -68,19 +68,6 @@ def __init__(self, url: str) -> None:
         Create a new `TimestampAuthorityClient` from the given URL.
         """
         self.url = url
-        self.session = requests.Session()
-        self.session.headers.update(
-            {
-                "Content-Type": "application/timestamp-query",
-                "User-Agent": USER_AGENT,
-            }
-        )
-
-    def __del__(self) -> None:
-        """
-        Terminates the underlying network session.
-        """
-        self.session.close()
 
     def request_timestamp(self, signature: bytes) -> TimeStampResponse:
         """
@@ -104,9 +91,18 @@
             msg = f"invalid request: {error}"
             raise TimestampError(msg)
 
+        # Use a single-use session to avoid potential Session thread safety issues
+        session = requests.Session()
+        session.headers.update(
+            {
+                "Content-Type": "application/timestamp-query",
+                "User-Agent": USER_AGENT,
+            }
+        )
+
         # Send it to the TSA for signing
         try:
-            response = self.session.post(
+            response = session.post(
                 self.url,
                 data=timestamp_request.as_bytes(),
                 timeout=CLIENT_TIMEOUT,

From d92680312d884b3fa5c9a4a234dc5f700730dc30 Mon Sep 17 00:00:00 2001
From: Jussi Kukkonen
Date: Thu, 10 Jul 2025 11:49:38 +0300
Subject: [PATCH 4/9] CLI: Refactor terminal output

* Separate the thread method so it's easier to see potential safety
  issues
* Using print() with the same file object is generally not thread
  safe: avoid it in the threaded method

The output remains effectively the same except:
* The b64-encoded signature is no longer printed to the terminal
* Some print()s are now logger.info():
  e.g. Transparency log entry created at index: 5562
* Other print()s happen in a batch now, after the signing has finished

Signed-off-by: Jussi Kukkonen
---
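Note: the reason for batching output is that interleaved print() calls
from multiple threads can garble a shared stream. The safe shape, in a
minimal sketch with a hypothetical worker standing in for the per-file
signing:

    from concurrent import futures

    def work(item: str) -> str:
        return item.upper()  # stand-in for the per-file signing work

    items = ["a.txt", "b.txt", "c.txt"]
    with futures.ThreadPoolExecutor() as executor:
        jobs = [executor.submit(work, item) for item in items]
        for job in futures.as_completed(jobs):
            job.result()

    # All terminal output happens here, on the main thread, after the
    # pool has drained -- no interleaving between threads is possible.
    for item in items:
        print(f"processed {item}")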
 sigstore/_cli.py | 140 ++++++++++++++++++++++++++---------------------
 1 file changed, 78 insertions(+), 62 deletions(-)

diff --git a/sigstore/_cli.py b/sigstore/_cli.py
index b2a96fa24..e0a00dd9b 100644
--- a/sigstore/_cli.py
+++ b/sigstore/_cli.py
@@ -23,7 +23,7 @@
 from concurrent import futures
 from dataclasses import dataclass
 from pathlib import Path
-from typing import Any, NoReturn, TextIO, Union
+from typing import Any, NoReturn, Union
 
 from cryptography.hazmat.primitives.serialization import Encoding
 from cryptography.x509 import load_pem_x509_certificate
@@ -57,7 +57,7 @@
     Issuer,
     detect_credential,
 )
-from sigstore.sign import SigningContext
+from sigstore.sign import Signer, SigningContext
 from sigstore.verify import (
     Verifier,
     policy,
@@ -637,6 +637,57 @@ def _get_identity_token(args: argparse.Namespace) -> None:
         _invalid_arguments(args, "No identity token supplied or detected!")
 
 
+def _sign_file_threaded(
+    signer: Signer,
+    predicate_type: str | None,
+    predicate: dict[str, Any] | None,
+    file: Path,
+    outputs: SigningOutputs,
+) -> None:
+    """Sign method to be called from a signing thread."""
+    _logger.debug(f"signing for {file.name}")
+    with file.open(mode="rb") as io:
+        # The input can be indefinitely large, so we perform a streaming
+        # digest and sign the prehash rather than buffering it fully.
+        digest = sha256_digest(io)
+    try:
+        if predicate is None:
+            result = signer.sign_artifact(input_=digest)
+        else:
+            subject = Subject(name=file.name, digest={"sha256": digest.digest.hex()})
+            statement_builder = StatementBuilder(
+                subjects=[subject],
+                predicate_type=predicate_type,
+                predicate=predicate,
+            )
+            result = signer.sign_dsse(statement_builder.build())
+    except ExpiredIdentity as exp_identity:
+        _logger.error("Signature failed: identity token has expired")
+        raise exp_identity
+
+    except ExpiredCertificate as exp_certificate:
+        _logger.error("Signature failed: Fulcio signing certificate has expired")
+        raise exp_certificate
+
+    _logger.info(
+        f"Transparency log entry created at index: {result.log_entry.log_index}"
+    )
+
+    if outputs.signature is not None:
+        signature = base64.b64encode(result.signature).decode()
+        with outputs.signature.open(mode="w") as io:
+            print(signature, file=io)
+
+    if outputs.certificate is not None:
+        cert_pem = signer._signing_cert().public_bytes(Encoding.PEM).decode()
+        with outputs.certificate.open(mode="w") as io:
+            print(cert_pem, file=io)
+
+    if outputs.bundle is not None:
+        with outputs.bundle.open(mode="w") as io:
+            print(result.to_json(), file=io)
+
+
 def _sign_common(
     args: argparse.Namespace, output_map: OutputMap, predicate: dict[str, Any] | None
 ) -> None:
@@ -667,74 +718,39 @@ def _sign_common(
     if not identity:
         _invalid_arguments(args, "No identity token supplied or detected!")
 
-    with signing_ctx.signer(identity) as signer:
-        with futures.ThreadPoolExecutor(max_workers=10) as executor:
-
-            def _sign_file(file: Path, outputs: SigningOutputs) -> None:
-                _logger.debug(f"signing for {file.name}")
-                with file.open(mode="rb") as io:
-                    # The input can be indefinitely large, so we perform a streaming
-                    # digest and sign the prehash rather than buffering it fully.
-                    digest = sha256_digest(io)
-                try:
-                    if predicate is None:
-                        result = signer.sign_artifact(input_=digest)
-                    else:
-                        subject = Subject(
-                            name=file.name, digest={"sha256": digest.digest.hex()}
-                        )
-                        predicate_type = args.predicate_type
-                        statement_builder = StatementBuilder(
-                            subjects=[subject],
-                            predicate_type=predicate_type,
-                            predicate=predicate,
-                        )
-                        result = signer.sign_dsse(statement_builder.build())
-                except ExpiredIdentity as exp_identity:
-                    print("Signature failed: identity token has expired")
-                    raise exp_identity
-
-                except ExpiredCertificate as exp_certificate:
-                    print("Signature failed: Fulcio signing certificate has expired")
-                    raise exp_certificate
-
-                print("Using ephemeral certificate:")
-                cert = result.signing_certificate
-                cert_pem = cert.public_bytes(Encoding.PEM).decode()
-                print(cert_pem)
-
-                print(
-                    f"Transparency log entry created at index: {result.log_entry.log_index}"
-                )
-
-                sig_output: TextIO
-                if outputs.signature is not None:
-                    sig_output = outputs.signature.open("w")
-                else:
-                    sig_output = sys.stdout
-
-                signature = base64.b64encode(result.signature).decode()
-                print(signature, file=sig_output)
-                if outputs.signature is not None:
-                    print(f"Signature written to {outputs.signature}")
+    # Not all commands provide --predicate-type
+    predicate_type = getattr(args, "predicate_type", None)
 
-                if outputs.certificate is not None:
-                    with outputs.certificate.open(mode="w") as io:
-                        print(cert_pem, file=io)
-                    print(f"Certificate written to {outputs.certificate}")
-
-                if outputs.bundle is not None:
-                    with outputs.bundle.open(mode="w") as io:
-                        print(result.to_json(), file=io)
-                    print(f"Sigstore bundle written to {outputs.bundle}")
+    with signing_ctx.signer(identity) as signer:
+        print("Using ephemeral certificate:")
+        cert_pem = signer._signing_cert().public_bytes(Encoding.PEM).decode()
+        print(cert_pem)
 
+        # sign in threads: this is relevant especially for Rekor v2 as otherwise we wait
+        # for log inclusion for each signature separately
+        with futures.ThreadPoolExecutor(max_workers=10) as executor:
             jobs = [
-                executor.submit(_sign_file, file, outputs)
+                executor.submit(
+                    _sign_file_threaded,
+                    signer,
+                    predicate_type,
+                    predicate,
+                    file,
+                    outputs,
+                )
                 for file, outputs in output_map.items()
             ]
             for job in futures.as_completed(jobs):
                 job.result()
 
+    for file, outputs in output_map.items():
+        if outputs.signature is not None:
+            print(f"Signature written to {outputs.signature}")
+        if outputs.certificate is not None:
+            print(f"Certificate written to {outputs.certificate}")
+        if outputs.bundle is not None:
+            print(f"Sigstore bundle written to {outputs.bundle}")
+
 
 def _attest(args: argparse.Namespace) -> None:
     predicate_path = args.predicate

From 9d6455e968aa2f22feea3b66ca44eb6d7b84428b Mon Sep 17 00:00:00 2001
From: Jussi Kukkonen
Date: Thu, 10 Jul 2025 12:43:33 +0300
Subject: [PATCH 5/9] tests: Test signing multiple artifacts

Signed-off-by: Jussi Kukkonen
---
 test/assets/integration/b.txt     |  5 +++++
 test/assets/integration/c.txt     |  5 +++++
 test/integration/cli/test_sign.py | 35 +++++++++++++++++++++++++++++++
 3 files changed, 45 insertions(+)
 create mode 100644 test/assets/integration/b.txt
 create mode 100644 test/assets/integration/c.txt

diff --git a/test/assets/integration/b.txt b/test/assets/integration/b.txt
new file mode 100644
index 000000000..51c9c73f2
--- /dev/null
+++ b/test/assets/integration/b.txt
@@ -0,0 +1,5 @@
+DO NOT MODIFY ME!
+
+this is "b.txt", a sample input for sigstore-python's unit tests.
+
+DO NOT MODIFY ME!
diff --git a/test/assets/integration/c.txt b/test/assets/integration/c.txt
new file mode 100644
index 000000000..5e897d322
--- /dev/null
+++ b/test/assets/integration/c.txt
@@ -0,0 +1,5 @@
+DO NOT MODIFY ME!
+
+this is "c.txt", a sample input for sigstore-python's unit tests.
+
+DO NOT MODIFY ME!
diff --git a/test/integration/cli/test_sign.py b/test/integration/cli/test_sign.py
index 4d0953db7..e05e998a9 100644
--- a/test/integration/cli/test_sign.py
+++ b/test/integration/cli/test_sign.py
@@ -81,6 +81,41 @@ def test_sign_success_default_output_bundle(capsys, sigstore, asset_integration
         )
 
 
+@pytest.mark.staging
+@pytest.mark.ambient_oidc
+def test_sign_success_multiple_artifacts(capsys, sigstore, asset_integration):
+    artifacts = [
+        asset_integration("a.txt"),
+        asset_integration("b.txt"),
+        asset_integration("c.txt"),
+    ]
+
+    sigstore(
+        *get_cli_params(
+            artifact_paths=artifacts,
+        )
+    )
+
+    captures = capsys.readouterr()
+
+    for artifact in artifacts:
+        expected_output_bundle = Path(f"{artifact}.sigstore.json")
+
+        assert f"Sigstore bundle written to {expected_output_bundle}\n" in captures.out
+
+        assert expected_output_bundle.exists()
+        verifier = Verifier.staging()
+        with (
+            open(expected_output_bundle, "r") as bundle_file,
+            open(artifact, "rb") as input_file,
+        ):
+            bundle = Bundle.from_json(bundle_file.read())
+            expected_output_bundle.unlink()
+            verifier.verify_artifact(
+                input_=input_file.read(), bundle=bundle, policy=UnsafeNoOp()
+            )
+
+
 @pytest.mark.staging
 @pytest.mark.ambient_oidc
 def test_sign_success_custom_outputs(capsys, sigstore, asset_integration, tmp_path):

From 761a76de700cf7413b953f6770eb1652e2f5f81f Mon Sep 17 00:00:00 2001
From: Jussi Kukkonen
Date: Wed, 16 Jul 2025 10:44:45 +0300
Subject: [PATCH 6/9] Add test that signs multiple artifacts with rekor v2

Signed-off-by: Jussi Kukkonen
---
 test/integration/cli/test_sign.py | 48 ++++++++++++++++++++++++++++++-
 1 file changed, 47 insertions(+), 1 deletion(-)

diff --git a/test/integration/cli/test_sign.py b/test/integration/cli/test_sign.py
index e05e998a9..dab1aa4d0 100644
--- a/test/integration/cli/test_sign.py
+++ b/test/integration/cli/test_sign.py
@@ -29,8 +29,13 @@ def get_cli_params(
     bundle_path: Optional[Path] = None,
     signature_path: Optional[Path] = None,
     certificate_path: Optional[Path] = None,
+    trust_config_path: Optional[Path] = None,
 ) -> list[str]:
-    cli_params = ["--staging", "sign"]
+    if trust_config_path is not None:
+        cli_params = ["--trust-config", str(trust_config_path), "sign"]
+    else:
+        cli_params = ["--staging", "sign"]
+
     if output_directory is not None:
         cli_params.extend(["--output-directory", str(output_directory)])
     if bundle_path is not None:
@@ -116,6 +121,47 @@ def test_sign_success_multiple_artifacts(capsys, sigstore, asset_integration):
         )
 
 
+@pytest.mark.staging
+@pytest.mark.ambient_oidc
+def test_sign_success_multiple_artifacts_rekor_v2(capsys, sigstore, asset_integration, asset):
+    """This is a copy of test_sign_success_multiple_artifacts that exists to ensure the
+    multi-threaded signing works with rekor v2 as well: this test can be removed when v2
+    is the default
+    """
+
+    artifacts = [
+        asset_integration("a.txt"),
+        asset_integration("b.txt"),
+        asset_integration("c.txt"),
+    ]
+
+    sigstore(
+        *get_cli_params(
+            artifact_paths=artifacts,
+            trust_config_path=asset("trust_config/staging-but-sign-with-rekor-v2.json")
+        )
+    )
+
+    captures = capsys.readouterr()
+
+    for artifact in artifacts:
+        expected_output_bundle = Path(f"{artifact}.sigstore.json")
+
+        assert f"Sigstore bundle written to {expected_output_bundle}\n" in captures.out
+
+        assert expected_output_bundle.exists()
+        verifier = Verifier.staging()
+        with (
+            open(expected_output_bundle, "r") as bundle_file,
+            open(artifact, "rb") as input_file,
+        ):
+            bundle = Bundle.from_json(bundle_file.read())
+            expected_output_bundle.unlink()
+            verifier.verify_artifact(
+                input_=input_file.read(), bundle=bundle, policy=UnsafeNoOp()
+            )
+
+
 @pytest.mark.staging
 @pytest.mark.ambient_oidc
 def test_sign_success_custom_outputs(capsys, sigstore, asset_integration, tmp_path):

From 05168c39c7f0bbb586e11d0dd9dd5d11ea2c6e9a Mon Sep 17 00:00:00 2001
From: Jussi Kukkonen
Date: Wed, 16 Jul 2025 14:25:44 +0300
Subject: [PATCH 7/9] tests: lint fixes

Signed-off-by: Jussi Kukkonen
---
 test/integration/cli/test_sign.py | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/test/integration/cli/test_sign.py b/test/integration/cli/test_sign.py
index dab1aa4d0..f90fce641 100644
--- a/test/integration/cli/test_sign.py
+++ b/test/integration/cli/test_sign.py
@@ -123,7 +123,9 @@
 
 @pytest.mark.staging
 @pytest.mark.ambient_oidc
-def test_sign_success_multiple_artifacts_rekor_v2(capsys, sigstore, asset_integration, asset):
+def test_sign_success_multiple_artifacts_rekor_v2(
+    capsys, sigstore, asset_integration, asset
+):
     """This is a copy of test_sign_success_multiple_artifacts that exists to ensure the
     multi-threaded signing works with rekor v2 as well: this test can be removed when v2
     is the default
@@ -138,7 +140,7 @@
     sigstore(
         *get_cli_params(
             artifact_paths=artifacts,
-            trust_config_path=asset("trust_config/staging-but-sign-with-rekor-v2.json")
+            trust_config_path=asset("trust_config/staging-but-sign-with-rekor-v2.json"),
         )
     )

From 7a9e1e57ad9ee3b4d381f6c71d1ad6adbb49e635 Mon Sep 17 00:00:00 2001
From: Jussi Kukkonen
Date: Mon, 21 Jul 2025 09:30:06 +0300
Subject: [PATCH 8/9] rekor: Refactor Session handling in RekorClient

Make every RekorLog have a Session of its own by default. This means
RekorClient no longer needs to manage that.

Signed-off-by: Jussi Kukkonen
---
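Note: the shape of the change is the common optional-dependency
pattern -- accept an injected Session (e.g. for tests), otherwise
build and own a private one. A minimal sketch, with an illustrative
Endpoint class rather than the real _Endpoint:

    import requests

    class Endpoint:
        def __init__(self, url: str, session: requests.Session | None = None) -> None:
            self.url = url
            if session is None:
                # Each instance builds (and therefore owns) its own
                # Session, so threads never share one unless a caller
                # explicitly injects a shared Session.
                session = requests.Session()
                session.headers.update({"Accept": "application/json"})
            self.session = session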
 sigstore/_internal/rekor/client.py | 29 ++++++++++++-----------------
 1 file changed, 12 insertions(+), 17 deletions(-)

diff --git a/sigstore/_internal/rekor/client.py b/sigstore/_internal/rekor/client.py
index cb2e5f872..4dc8e09c6 100644
--- a/sigstore/_internal/rekor/client.py
+++ b/sigstore/_internal/rekor/client.py
@@ -73,10 +73,20 @@ def from_response(cls, dict_: dict[str, Any]) -> RekorLogInfo:
 
 class _Endpoint(ABC):
-    def __init__(self, url: str, session: requests.Session) -> None:
+    def __init__(self, url: str, session: requests.Session | None = None) -> None:
         # Note that _Endpoint may not be thread safe if the same Session is provided
         # to an _Endpoint in multiple threads
         self.url = url
+        if session is None:
+            session = requests.Session()
+            session.headers.update(
+                {
+                    "Content-Type": "application/json",
+                    "Accept": "application/json",
+                    "User-Agent": USER_AGENT,
+                }
+            )
+
         self.session = session
 
@@ -213,19 +223,6 @@ def __init__(self, url: str) -> None:
         """
         self.url = f"{url}/api/v1"
 
-    def _session(self) -> requests.Session:
-        # We do not use a long-lived session to avoid potential thread safety issues:
-        # submitting entries via create_entry() should be thread safe.
-        session = requests.Session()
-        session.headers.update(
-            {
-                "Content-Type": "application/json",
-                "Accept": "application/json",
-                "User-Agent": USER_AGENT,
-            }
-        )
-        return session
-
     @classmethod
     def production(cls) -> RekorClient:
         """
@@ -248,9 +245,7 @@ def log(self) -> RekorLog:
         Returns a `RekorLog` adapter for making requests to a Rekor log.
         """
 
-        # Each RekorLog gets its own session
-        with self._session() as s:
-            return RekorLog(f"{self.url}/log", session=s)
+        return RekorLog(f"{self.url}/log")
 
     def create_entry(self, request: EntryRequestBody) -> LogEntry:
         """

From 83106d1d53ee229cd98645bc31cc0887eade1f63 Mon Sep 17 00:00:00 2001
From: Jussi Kukkonen
Date: Wed, 23 Jul 2025 15:45:49 +0300
Subject: [PATCH 9/9] cli: Let Python pick number of signing threads

This number does affect the number of concurrent rekor POST requests
we have in flight, but we are unlikely to hit rate limits as they are
defined as "requests from same host per minute".

Signed-off-by: Jussi Kukkonen
---
 sigstore/_cli.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sigstore/_cli.py b/sigstore/_cli.py
index e0a00dd9b..4ed1219b7 100644
--- a/sigstore/_cli.py
+++ b/sigstore/_cli.py
@@ -728,7 +728,7 @@ def _sign_common(
 
         # sign in threads: this is relevant especially for Rekor v2 as otherwise we wait
         # for log inclusion for each signature separately
-        with futures.ThreadPoolExecutor(max_workers=10) as executor:
+        with futures.ThreadPoolExecutor() as executor:
             jobs = [
                 executor.submit(
                     _sign_file_threaded,
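Note: with max_workers omitted, ThreadPoolExecutor sizes itself; on
CPython 3.8+ the default is min(32, os.cpu_count() + 4) (newer
releases substitute os.process_cpu_count()). A small sketch for
checking what a given host actually picks:

    import os
    from concurrent.futures import ThreadPoolExecutor

    print("cpu count:", os.cpu_count())
    with ThreadPoolExecutor() as executor:
        # _max_workers is a private attribute, read here purely to
        # illustrate the pool size the executor chose.
        print("pool size:", executor._max_workers)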