Skip to content

Commit 45377c9

Browse files
authored
chore(python): Add Support for Legacy DBEC and Migration Examples (#1938)
1 parent 50c6617 commit 45377c9

File tree

81 files changed

+4712
-6
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

81 files changed

+4712
-6
lines changed

.github/workflows/ci_examples_python.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,4 +95,4 @@ jobs:
9595
# Run simple examples
9696
tox -e dynamodbencryption
9797
# Run migration examples
98-
# tox -e migration
98+
tox -e migration

.github/workflows/ci_test_python.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,7 @@ jobs:
121121
shell: bash
122122
run: |
123123
tox -e integ
124+
tox -e legacyinteg
124125
125126
- name: Test ${{ matrix.library }} Python coverage
126127
working-directory: ./${{ matrix.library }}/runtimes/python

DynamoDbEncryption/runtimes/python/pyproject.toml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,12 @@ include = ["**/internaldafny/generated/*.py"]
1313
[tool.poetry.dependencies]
1414
python = "^3.11.0"
1515
aws-cryptographic-material-providers = { path = "../../../submodules/MaterialProviders/AwsCryptographicMaterialProviders/runtimes/python", develop = false}
16+
# Optional dependencies
17+
# Should only include the legacy library if migrating from the legacy library
18+
dynamodb_encryption_sdk = { version = "^3.3.0", optional = true }
19+
20+
[tool.poetry.extras]
21+
legacy-ddbec = ["dynamodb_encryption_sdk"]
1622

1723
# Package testing
1824

Lines changed: 268 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,268 @@
1+
# Copyright Amazon.com Inc. or its affiliates. All Rights Reserved.
2+
# SPDX-License-Identifier: Apache-2.0
3+
from _dafny import Seq
4+
5+
import aws_dbesdk_dynamodb.internaldafny.generated.InternalLegacyOverride
6+
from aws_dbesdk_dynamodb.internaldafny.generated.AwsCryptographyDbEncryptionSdkDynamoDbItemEncryptorTypes import (
7+
DynamoDbItemEncryptorConfig_DynamoDbItemEncryptorConfig,
8+
Error_DynamoDbItemEncryptorException,
9+
Error_Opaque,
10+
DecryptItemInput_DecryptItemInput,
11+
EncryptItemInput_EncryptItemInput,
12+
)
13+
from aws_dbesdk_dynamodb.internaldafny.generated.AwsCryptographyDbEncryptionSdkStructuredEncryptionTypes import (
14+
CryptoAction_ENCRYPT__AND__SIGN,
15+
CryptoAction_SIGN__ONLY,
16+
CryptoAction_DO__NOTHING,
17+
)
18+
from aws_dbesdk_dynamodb.smithygenerated.aws_cryptography_dbencryptionsdk_dynamodb.references import (
19+
ILegacyDynamoDbEncryptor,
20+
)
21+
from aws_dbesdk_dynamodb.smithygenerated.aws_cryptography_dbencryptionsdk_dynamodb_itemencryptor.models import (
22+
EncryptItemInput,
23+
EncryptItemOutput,
24+
DecryptItemOutput,
25+
DecryptItemInput,
26+
)
27+
28+
try:
29+
from dynamodb_encryption_sdk.encrypted.client import EncryptedClient
30+
from dynamodb_encryption_sdk.encrypted.table import EncryptedTable
31+
from dynamodb_encryption_sdk.encrypted.resource import EncryptedResource
32+
from dynamodb_encryption_sdk.encrypted.client import EncryptedPaginator
33+
from dynamodb_encryption_sdk.encrypted.item import encrypt_dynamodb_item, decrypt_dynamodb_item
34+
from dynamodb_encryption_sdk.structures import EncryptionContext, AttributeActions
35+
from dynamodb_encryption_sdk.identifiers import CryptoAction
36+
from dynamodb_encryption_sdk.encrypted import CryptoConfig
37+
from dynamodb_encryption_sdk.internal.identifiers import ReservedAttributes
38+
39+
_HAS_LEGACY_DDBEC = True
40+
except ImportError:
41+
_HAS_LEGACY_DDBEC = False
42+
43+
44+
class InternalLegacyOverride(aws_dbesdk_dynamodb.internaldafny.generated.InternalLegacyOverride.InternalLegacyOverride):
45+
def __init__(self):
46+
super().__init__()
47+
self.crypto_config = None
48+
self.policy = None
49+
50+
@staticmethod
51+
def Build(config: DynamoDbItemEncryptorConfig_DynamoDbItemEncryptorConfig):
52+
# Check for early return (Postcondition): If there is no legacyOverride there is nothing to do.
53+
if config.legacyOverride.is_None:
54+
return InternalLegacyOverride.CreateBuildSuccess(InternalLegacyOverride.CreateInternalLegacyOverrideNone())
55+
56+
legacy_override = config.legacyOverride.value
57+
58+
# Precondition: The encryptor MUST be a DynamoDBEncryptor
59+
if not _HAS_LEGACY_DDBEC:
60+
return InternalLegacyOverride.CreateBuildFailure(
61+
InternalLegacyOverride.CreateError("Could not find aws-dynamodb-encryption-python installation")
62+
)
63+
64+
# Precondition: The encryptor MUST be one of the supported legacy types
65+
if not (
66+
isinstance(legacy_override.encryptor, EncryptedClient)
67+
or isinstance(legacy_override.encryptor, EncryptedTable)
68+
or isinstance(legacy_override.encryptor, EncryptedResource)
69+
):
70+
return InternalLegacyOverride.CreateBuildFailure(
71+
InternalLegacyOverride.CreateError("Legacy encryptor is not supported")
72+
)
73+
74+
# Preconditions: MUST be able to create valid encryption context
75+
maybe_encryption_context = InternalLegacyOverride.legacyEncryptionContext(config)
76+
if maybe_encryption_context.is_Failure:
77+
return maybe_encryption_context
78+
79+
# Precondition: All actions MUST be supported types
80+
maybe_actions = InternalLegacyOverride.legacyActions(legacy_override.attributeActionsOnEncrypt)
81+
if maybe_actions.is_Failure:
82+
return maybe_actions
83+
84+
# Create and return the legacy override instance
85+
legacy_instance = InternalLegacyOverride()
86+
legacy_instance.policy = legacy_override.policy
87+
legacy_instance.crypto_config = CryptoConfig(
88+
materials_provider=legacy_override.encryptor._materials_provider,
89+
encryption_context=maybe_encryption_context.value,
90+
attribute_actions=maybe_actions.value,
91+
)
92+
return InternalLegacyOverride.CreateBuildSuccess(
93+
InternalLegacyOverride.CreateInternalLegacyOverrideSome(legacy_instance)
94+
)
95+
96+
@staticmethod
97+
def legacyEncryptionContext(config: DynamoDbItemEncryptorConfig_DynamoDbItemEncryptorConfig):
98+
"""Create the legacy encryption context from the config."""
99+
try:
100+
# Convert Dafny types to Python strings for the encryption context
101+
table_name = InternalLegacyOverride.DafnyStringToNativeString(config.logicalTableName)
102+
partition_key_name = InternalLegacyOverride.DafnyStringToNativeString(config.partitionKeyName)
103+
sort_key_name = (
104+
InternalLegacyOverride.DafnyStringToNativeString(config.sortKeyName.value)
105+
if config.sortKeyName.is_Some
106+
else None
107+
)
108+
109+
# Create the legacy encryption context with the extracted values
110+
encryption_context = EncryptionContext(
111+
table_name=table_name,
112+
partition_key_name=partition_key_name,
113+
sort_key_name=sort_key_name,
114+
)
115+
116+
return InternalLegacyOverride.CreateBuildSuccess(encryption_context)
117+
except Exception as ex:
118+
return InternalLegacyOverride.CreateBuildFailure(Error_Opaque(ex))
119+
120+
@staticmethod
121+
def legacyActions(attribute_actions_on_encrypt):
122+
"""Create the legacy attribute actions from the config."""
123+
try:
124+
# Create a new AttributeActions with default ENCRYPT_AND_SIGN
125+
# Default Action to take if no specific action is defined in ``attribute_actions``
126+
# https://docs.aws.amazon.com/database-encryption-sdk/latest/devguide/DDBEC-legacy-concepts.html#legacy-attribute-actions
127+
legacy_actions = AttributeActions(default_action=CryptoAction.ENCRYPT_AND_SIGN)
128+
129+
# Map the action from the config to legacy actions
130+
attribute_actions = {}
131+
for key, action in attribute_actions_on_encrypt.items:
132+
key_str = InternalLegacyOverride.DafnyStringToNativeString(key)
133+
134+
# Map the action type to the appropriate CryptoAction
135+
if action == CryptoAction_ENCRYPT__AND__SIGN():
136+
attribute_actions[key_str] = CryptoAction.ENCRYPT_AND_SIGN
137+
elif action == CryptoAction_SIGN__ONLY():
138+
attribute_actions[key_str] = CryptoAction.SIGN_ONLY
139+
elif action == CryptoAction_DO__NOTHING():
140+
attribute_actions[key_str] = CryptoAction.DO_NOTHING
141+
else:
142+
return InternalLegacyOverride.CreateBuildFailure(
143+
InternalLegacyOverride.CreateError(f"Unknown action type: {action}")
144+
)
145+
146+
# Update the attribute_actions dictionary
147+
legacy_actions.attribute_actions = attribute_actions
148+
return InternalLegacyOverride.CreateBuildSuccess(legacy_actions)
149+
except Exception as ex:
150+
return InternalLegacyOverride.CreateBuildFailure(Error_Opaque(ex))
151+
152+
def EncryptItem(self, input: EncryptItemInput_EncryptItemInput):
153+
"""Encrypt an item using the legacy DynamoDB encryptor.
154+
155+
:param input: EncryptItemInput containing the plaintext item to encrypt
156+
:returns Result containing the encrypted item or an error
157+
"""
158+
try:
159+
# Precondition: Policy MUST allow the caller to encrypt.
160+
if not self.policy.is_FORCE__LEGACY__ENCRYPT__ALLOW__LEGACY__DECRYPT:
161+
return self.CreateEncryptItemFailure(
162+
InternalLegacyOverride.CreateError("Legacy policy does not support encrypt")
163+
)
164+
165+
# Get the Native Plaintext Item
166+
native_input = aws_dbesdk_dynamodb.smithygenerated.aws_cryptography_dbencryptionsdk_dynamodb_itemencryptor.dafny_to_smithy.aws_cryptography_dbencryptionsdk_dynamodb_itemencryptor_EncryptItemInput(
167+
input
168+
)
169+
170+
# Encrypt the item using the instance attributes
171+
encrypted_item = encrypt_dynamodb_item(
172+
item=native_input.plaintext_item,
173+
crypto_config=self.crypto_config.with_item(native_input.plaintext_item),
174+
)
175+
176+
# Return the encrypted item
177+
# The legacy encryption method returns items in the format that Dafny expects,
178+
# so no additional conversion is needed here
179+
native_output = EncryptItemOutput(encrypted_item=encrypted_item, parsed_header=None)
180+
dafny_output = aws_dbesdk_dynamodb.smithygenerated.aws_cryptography_dbencryptionsdk_dynamodb_itemencryptor.smithy_to_dafny.aws_cryptography_dbencryptionsdk_dynamodb_itemencryptor_EncryptItemOutput(
181+
native_output
182+
)
183+
return self.CreateEncryptItemSuccess(dafny_output)
184+
185+
except Exception as ex:
186+
return self.CreateEncryptItemFailure(InternalLegacyOverride.CreateError(Error_Opaque(ex)))
187+
188+
def DecryptItem(self, input: DecryptItemInput_DecryptItemInput):
189+
"""Decrypt an item using the legacy DynamoDB encryptor.
190+
191+
:param input: DecryptItemInput containing the encrypted item to decrypt
192+
:returns Result containing the decrypted item or an error
193+
"""
194+
try:
195+
# Precondition: Policy MUST allow the caller to decrypt.
196+
# = specification/dynamodb-encryption-client/decrypt-item.md#behavior
197+
## If a [Legacy Policy](./ddb-table-encryption-config.md#legacy-policy) of
198+
## `FORBID_LEGACY_ENCRYPT_FORBID_LEGACY_DECRYPT` is configured,
199+
## and the input item [is an item written in the legacy format](#determining-legacy-items),
200+
## this operation MUST fail.
201+
if not (
202+
self.policy.is_FORCE__LEGACY__ENCRYPT__ALLOW__LEGACY__DECRYPT
203+
or self.policy.is_FORBID__LEGACY__ENCRYPT__ALLOW__LEGACY__DECRYPT
204+
):
205+
return self.CreateDecryptItemFailure(
206+
InternalLegacyOverride.CreateError("Legacy policy does not support decrypt")
207+
)
208+
209+
# Get the Native DecryptItemInput
210+
native_input: DecryptItemInput = (
211+
aws_dbesdk_dynamodb.smithygenerated.aws_cryptography_dbencryptionsdk_dynamodb_itemencryptor.dafny_to_smithy.aws_cryptography_dbencryptionsdk_dynamodb_itemencryptor_DecryptItemInput(
212+
input
213+
)
214+
)
215+
# Decrypt the item using the instance attributes
216+
decrypted_item = decrypt_dynamodb_item(
217+
item=native_input.encrypted_item,
218+
crypto_config=self.crypto_config.with_item(native_input.encrypted_item),
219+
)
220+
221+
native_output = DecryptItemOutput(plaintext_item=decrypted_item, parsed_header=None)
222+
dafny_output = aws_dbesdk_dynamodb.smithygenerated.aws_cryptography_dbencryptionsdk_dynamodb_itemencryptor.smithy_to_dafny.aws_cryptography_dbencryptionsdk_dynamodb_itemencryptor_DecryptItemOutput(
223+
native_output
224+
)
225+
return self.CreateDecryptItemSuccess(dafny_output)
226+
except Exception as ex:
227+
return self.CreateDecryptItemFailure(InternalLegacyOverride.CreateError(Error_Opaque(ex)))
228+
229+
def IsLegacyInput(self, input: DecryptItemInput_DecryptItemInput):
230+
"""
231+
Determine if the input is from a legacy client.
232+
233+
:param input: The decrypt item input to check
234+
:returns Boolean indicating if the input is from a legacy client
235+
"""
236+
if not input.is_DecryptItemInput:
237+
return False
238+
239+
# Get the Native DecryptItemInput
240+
native_input: DecryptItemInput = (
241+
aws_dbesdk_dynamodb.smithygenerated.aws_cryptography_dbencryptionsdk_dynamodb_itemencryptor.dafny_to_smithy.aws_cryptography_dbencryptionsdk_dynamodb_itemencryptor_DecryptItemInput(
242+
input
243+
)
244+
)
245+
# = specification/dynamodb-encryption-client/decrypt-item.md#determining-legacy-items
246+
## An item MUST be determined to be encrypted under the legacy format if it contains
247+
## attributes for the material description and the signature.
248+
return (
249+
"*amzn-ddb-map-desc*" in native_input.encrypted_item and "*amzn-ddb-map-sig*" in native_input.encrypted_item
250+
)
251+
252+
@staticmethod
253+
def DafnyStringToNativeString(dafny_input):
254+
return b"".join(ord(c).to_bytes(2, "big") for c in dafny_input).decode("utf-16-be")
255+
256+
@staticmethod
257+
def NativeStringToDafnyString(native_input):
258+
return Seq(
259+
"".join([chr(int.from_bytes(pair, "big")) for pair in zip(*[iter(native_input.encode("utf-16-be"))] * 2)])
260+
)
261+
262+
@staticmethod
263+
def CreateError(message):
264+
"""Create an Error with the given message."""
265+
return Error_DynamoDbItemEncryptorException(InternalLegacyOverride.NativeStringToDafnyString(message))
266+
267+
268+
aws_dbesdk_dynamodb.internaldafny.generated.InternalLegacyOverride.InternalLegacyOverride = InternalLegacyOverride
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
# Copyright Amazon.com Inc. or its affiliates. All Rights Reserved.
2+
# SPDX-License-Identifier: Apache-2.0
3+
4+
5+
def sort_dynamodb_json_lists(obj):
6+
"""
7+
Utility that recursively sorts all lists in a DynamoDB JSON-like structure.
8+
DynamoDB JSON uses lists to represent sets, so strict equality can fail.
9+
Sort lists to ensure consistent ordering when comparing expected and actual items.
10+
"""
11+
if isinstance(obj, dict):
12+
return {k: sort_dynamodb_json_lists(v) for k, v in obj.items()}
13+
elif isinstance(obj, list):
14+
try:
15+
return sorted(obj) # Sort lists for consistent comparison
16+
except TypeError:
17+
return obj # Not all lists are sortable; ex. complex_item_ddb's "list" attribute
18+
return obj

0 commit comments

Comments
 (0)