Skip to content

Commit 3a26b7d

Browse files
jku and ramonpetgrave64
authored
Parallel signing (#1468)
* Sign multiple artifacts in threads This is (especially) useful with rekor 2 where the service only responds after log inclusion: We would prefer to get all signatures in the same inclusion batch. The change still affects both rekor v1 and rekor v2. This commit is a rebase of a bunch of Ramon's commits. Signed-off-by: Ramon Petgrave <[email protected]> Signed-off-by: Jussi Kukkonen <[email protected]> * rekor: Make create_entry() conservative WRT session use * Session thread safety is ambiguous: it may now be safe but the situation is unclear * Use a single Session per create_entry() call: This may have a little more overhead but seems like the safest option * Note that RekorLog (v1) other methods may not be thread safe: I only reviewed the create_entry() flow Signed-off-by: Jussi Kukkonen <[email protected]> * timestamp: Don't reuse session It's a little unclear if Session is now thread safe or not: avoid reuse just in case Signed-off-by: Jussi Kukkonen <[email protected]> * CLI: Refactor terminal output * Separate the thread method so it's easier to see potential safety issues * Using print() with the same file object is generally not thread safe: Avoid it from the threaded method The output remains effectively the same except: * The b64 encoded signature is no longer printed to terminal * Some print()s are now logger.info(): e.g. Transparency log entry created at index: 5562 * Other print()s happen in a batch now, after the signing has finished Signed-off-by: Jussi Kukkonen <[email protected]> * tests: Test signing multiple artifacts Signed-off-by: Jussi Kukkonen <[email protected]> * Add test that signs multiple artifacts with rekor2 Signed-off-by: Jussi Kukkonen <[email protected]> * tests: lint fixes Signed-off-by: Jussi Kukkonen <[email protected]> * rekor: Refactor Session handling in RekorClient Make every RekorLog have a Session of their own by default. This means RekorClient no longer needs to manage that. 
Signed-off-by: Jussi Kukkonen <[email protected]> * cli: Let Python pick number of signing threads This number does affect the number of concurrent rekor POST requests we have in flight, but we are unlikely to hit rate limits as they are defined in "requests from same host per minute". Signed-off-by: Jussi Kukkonen <[email protected]> --------- Signed-off-by: Ramon Petgrave <[email protected]> Signed-off-by: Jussi Kukkonen <[email protected]> Co-authored-by: Ramon Petgrave <[email protected]>
1 parent 995e7c8 commit 3a26b7d

File tree

7 files changed

+210
-98
lines changed

7 files changed

+210
-98
lines changed

sigstore/_cli.py

Lines changed: 78 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,10 @@
2020
import logging
2121
import os
2222
import sys
23+
from concurrent import futures
2324
from dataclasses import dataclass
2425
from pathlib import Path
25-
from typing import Any, NoReturn, TextIO, Union
26+
from typing import Any, NoReturn, Union
2627

2728
from cryptography.hazmat.primitives.serialization import Encoding
2829
from cryptography.x509 import load_pem_x509_certificate
@@ -56,7 +57,7 @@
5657
Issuer,
5758
detect_credential,
5859
)
59-
from sigstore.sign import SigningContext
60+
from sigstore.sign import Signer, SigningContext
6061
from sigstore.verify import (
6162
Verifier,
6263
policy,
@@ -636,6 +637,57 @@ def _get_identity_token(args: argparse.Namespace) -> None:
636637
_invalid_arguments(args, "No identity token supplied or detected!")
637638

638639

640+
def _sign_file_threaded(
    signer: Signer,
    predicate_type: str | None,
    predicate: dict[str, Any] | None,
    file: Path,
    outputs: SigningOutputs,
) -> None:
    """
    Sign one input file and write whichever outputs were requested for it.

    Meant to be called from a signing thread, so progress is reported through
    the module logger instead of being printed to the terminal.

    With no `predicate` the file's SHA-256 digest is signed directly;
    otherwise a statement with the file as its single subject is built from
    `predicate_type` and `predicate` and signed as DSSE.

    `ExpiredIdentity` and `ExpiredCertificate` are logged and re-raised.
    """
    _logger.debug(f"signing for {file.name}")

    # Inputs can be arbitrarily large: hash them as a stream and sign the
    # prehash instead of buffering the whole file.
    with file.open(mode="rb") as stream:
        digest = sha256_digest(stream)

    try:
        if predicate is not None:
            subject = Subject(name=file.name, digest={"sha256": digest.digest.hex()})
            statement = StatementBuilder(
                subjects=[subject],
                predicate_type=predicate_type,
                predicate=predicate,
            ).build()
            result = signer.sign_dsse(statement)
        else:
            result = signer.sign_artifact(input_=digest)
    except ExpiredIdentity as exp_identity:
        _logger.error("Signature failed: identity token has expired")
        raise exp_identity
    except ExpiredCertificate as exp_certificate:
        _logger.error("Signature failed: Fulcio signing certificate has expired")
        raise exp_certificate

    _logger.info(
        f"Transparency log entry created at index: {result.log_entry.log_index}"
    )

    # Each output is optional; only the paths that were requested are written.
    if (signature_path := outputs.signature) is not None:
        encoded_signature = base64.b64encode(result.signature).decode()
        with signature_path.open(mode="w") as sink:
            print(encoded_signature, file=sink)

    if (certificate_path := outputs.certificate) is not None:
        pem_text = signer._signing_cert().public_bytes(Encoding.PEM).decode()
        with certificate_path.open(mode="w") as sink:
            print(pem_text, file=sink)

    if (bundle_path := outputs.bundle) is not None:
        with bundle_path.open(mode="w") as sink:
            print(result.to_json(), file=sink)
689+
690+
639691
def _sign_common(
640692
args: argparse.Namespace, output_map: OutputMap, predicate: dict[str, Any] | None
641693
) -> None:
@@ -666,63 +718,37 @@ def _sign_common(
666718
if not identity:
667719
_invalid_arguments(args, "No identity token supplied or detected!")
668720

669-
with signing_ctx.signer(identity) as signer:
670-
for file, outputs in output_map.items():
671-
_logger.debug(f"signing for {file.name}")
672-
with file.open(mode="rb") as io:
673-
# The input can be indefinitely large, so we perform a streaming
674-
# digest and sign the prehash rather than buffering it fully.
675-
digest = sha256_digest(io)
676-
try:
677-
if predicate is None:
678-
result = signer.sign_artifact(input_=digest)
679-
else:
680-
subject = Subject(
681-
name=file.name, digest={"sha256": digest.digest.hex()}
682-
)
683-
predicate_type = args.predicate_type
684-
statement_builder = StatementBuilder(
685-
subjects=[subject],
686-
predicate_type=predicate_type,
687-
predicate=predicate,
688-
)
689-
result = signer.sign_dsse(statement_builder.build())
690-
except ExpiredIdentity as exp_identity:
691-
print("Signature failed: identity token has expired")
692-
raise exp_identity
693-
694-
except ExpiredCertificate as exp_certificate:
695-
print("Signature failed: Fulcio signing certificate has expired")
696-
raise exp_certificate
697-
698-
print("Using ephemeral certificate:")
699-
cert = result.signing_certificate
700-
cert_pem = cert.public_bytes(Encoding.PEM).decode()
701-
print(cert_pem)
702-
703-
print(
704-
f"Transparency log entry created at index: {result.log_entry.log_index}"
705-
)
721+
# Not all commands provide --predicate-type
722+
predicate_type = getattr(args, "predicate_type", None)
706723

707-
sig_output: TextIO
708-
if outputs.signature is not None:
709-
sig_output = outputs.signature.open("w")
710-
else:
711-
sig_output = sys.stdout
724+
with signing_ctx.signer(identity) as signer:
725+
print("Using ephemeral certificate:")
726+
cert_pem = signer._signing_cert().public_bytes(Encoding.PEM).decode()
727+
print(cert_pem)
728+
729+
# sign in threads: this is especially relevant for Rekor v2 as otherwise we wait
730+
# for log inclusion for each signature separately
731+
with futures.ThreadPoolExecutor() as executor:
732+
jobs = [
733+
executor.submit(
734+
_sign_file_threaded,
735+
signer,
736+
predicate_type,
737+
predicate,
738+
file,
739+
outputs,
740+
)
741+
for file, outputs in output_map.items()
742+
]
743+
for job in futures.as_completed(jobs):
744+
job.result()
712745

713-
signature = base64.b64encode(result.signature).decode()
714-
print(signature, file=sig_output)
746+
for file, outputs in output_map.items():
715747
if outputs.signature is not None:
716748
print(f"Signature written to {outputs.signature}")
717-
718749
if outputs.certificate is not None:
719-
with outputs.certificate.open(mode="w") as io:
720-
print(cert_pem, file=io)
721750
print(f"Certificate written to {outputs.certificate}")
722-
723751
if outputs.bundle is not None:
724-
with outputs.bundle.open(mode="w") as io:
725-
print(result.to_json(), file=io)
726752
print(f"Sigstore bundle written to {outputs.bundle}")
727753

728754

sigstore/_internal/rekor/client.py

Lines changed: 15 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -73,8 +73,20 @@ def from_response(cls, dict_: dict[str, Any]) -> RekorLogInfo:
7373

7474

7575
class _Endpoint(ABC):
76-
def __init__(self, url: str, session: requests.Session) -> None:
76+
def __init__(self, url: str, session: requests.Session | None = None) -> None:
77+
# Note that _Endpoint may not be thread safe if the same Session is provided
78+
# to an _Endpoint in multiple threads
7779
self.url = url
80+
if session is None:
81+
session = requests.Session()
82+
session.headers.update(
83+
{
84+
"Content-Type": "application/json",
85+
"Accept": "application/json",
86+
"User-Agent": USER_AGENT,
87+
}
88+
)
89+
7890
self.session = session
7991

8092

@@ -210,20 +222,6 @@ def __init__(self, url: str) -> None:
210222
Create a new `RekorClient` from the given URL.
211223
"""
212224
self.url = f"{url}/api/v1"
213-
self.session = requests.Session()
214-
self.session.headers.update(
215-
{
216-
"Content-Type": "application/json",
217-
"Accept": "application/json",
218-
"User-Agent": USER_AGENT,
219-
}
220-
)
221-
222-
def __del__(self) -> None:
223-
"""
224-
Terminates the underlying network session.
225-
"""
226-
self.session.close()
227225

228226
@classmethod
229227
def production(cls) -> RekorClient:
@@ -246,7 +244,8 @@ def log(self) -> RekorLog:
246244
"""
247245
Returns a `RekorLog` adapter for making requests to a Rekor log.
248246
"""
249-
return RekorLog(f"{self.url}/log", session=self.session)
247+
248+
return RekorLog(f"{self.url}/log")
250249

251250
def create_entry(self, request: EntryRequestBody) -> LogEntry:
252251
"""

sigstore/_internal/rekor/client_v2.py

Lines changed: 13 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -54,20 +54,6 @@ def __init__(self, base_url: str) -> None:
5454
Create a new `RekorV2Client` from the given URL.
5555
"""
5656
self.url = f"{base_url}/api/v2"
57-
self.session = requests.Session()
58-
self.session.headers.update(
59-
{
60-
"Content-Type": "application/json",
61-
"Accept": "application/json",
62-
"User-Agent": USER_AGENT,
63-
}
64-
)
65-
66-
def __del__(self) -> None:
67-
"""
68-
Terminates the underlying network session.
69-
"""
70-
self.session.close()
7157

7258
def create_entry(self, payload: EntryRequestBody) -> LogEntry:
7359
"""
@@ -78,7 +64,19 @@ def create_entry(self, payload: EntryRequestBody) -> LogEntry:
7864
https://github.com/sigstore/rekor-tiles/blob/main/CLIENTS.md#handling-longer-requests
7965
"""
8066
_logger.debug(f"proposed: {json.dumps(payload)}")
81-
resp = self.session.post(
67+
68+
# Use a short lived session to avoid potential issues with multi-threading:
69+
# Session thread-safety is ambiguous
70+
session = requests.Session()
71+
session.headers.update(
72+
{
73+
"Content-Type": "application/json",
74+
"Accept": "application/json",
75+
"User-Agent": USER_AGENT,
76+
}
77+
)
78+
79+
resp = session.post(
8280
f"{self.url}/log/entries",
8381
json=payload,
8482
)

sigstore/_internal/timestamp.py

Lines changed: 10 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -68,19 +68,6 @@ def __init__(self, url: str) -> None:
6868
Create a new `TimestampAuthorityClient` from the given URL.
6969
"""
7070
self.url = url
71-
self.session = requests.Session()
72-
self.session.headers.update(
73-
{
74-
"Content-Type": "application/timestamp-query",
75-
"User-Agent": USER_AGENT,
76-
}
77-
)
78-
79-
def __del__(self) -> None:
80-
"""
81-
Terminates the underlying network session.
82-
"""
83-
self.session.close()
8471

8572
def request_timestamp(self, signature: bytes) -> TimeStampResponse:
8673
"""
@@ -104,9 +91,18 @@ def request_timestamp(self, signature: bytes) -> TimeStampResponse:
10491
msg = f"invalid request: {error}"
10592
raise TimestampError(msg)
10693

94+
# Use single use session to avoid potential Session thread safety issues
95+
session = requests.Session()
96+
session.headers.update(
97+
{
98+
"Content-Type": "application/timestamp-query",
99+
"User-Agent": USER_AGENT,
100+
}
101+
)
102+
107103
# Send it to the TSA for signing
108104
try:
109-
response = self.session.post(
105+
response = session.post(
110106
self.url,
111107
data=timestamp_request.as_bytes(),
112108
timeout=CLIENT_TIMEOUT,

test/assets/integration/b.txt

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
DO NOT MODIFY ME!
2+
3+
this is "b.txt", a sample input for sigstore-python's unit tests.
4+
5+
DO NOT MODIFY ME!

test/assets/integration/c.txt

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
DO NOT MODIFY ME!
2+
3+
this is "c.txt", a sample input for sigstore-python's unit tests.
4+
5+
DO NOT MODIFY ME!

0 commit comments

Comments
 (0)